mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 23:21:29 +00:00 
			
		
		
		
	be compatible with Python 3.0, also. http://www.jcea.es/programacion/pybsddb.htm#bsddb3-4.7.3
		
			
				
	
	
		
			445 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			445 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""
 | 
						|
TestCases for DB.associate.
 | 
						|
"""
 | 
						|
 | 
						|
import sys, os, string
 | 
						|
import time
 | 
						|
from pprint import pprint
 | 
						|
 | 
						|
import unittest
 | 
						|
from test_all import db, dbshelve, test_support, verbose, have_threads, \
 | 
						|
        get_new_environment_path
 | 
						|
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
 | 
						|
musicdata = {
 | 
						|
1 : ("Bad English", "The Price Of Love", "Rock"),
 | 
						|
2 : ("DNA featuring Suzanne Vega", "Tom's Diner", "Rock"),
 | 
						|
3 : ("George Michael", "Praying For Time", "Rock"),
 | 
						|
4 : ("Gloria Estefan", "Here We Are", "Rock"),
 | 
						|
5 : ("Linda Ronstadt", "Don't Know Much", "Rock"),
 | 
						|
6 : ("Michael Bolton", "How Am I Supposed To Live Without You", "Blues"),
 | 
						|
7 : ("Paul Young", "Oh Girl", "Rock"),
 | 
						|
8 : ("Paula Abdul", "Opposites Attract", "Rock"),
 | 
						|
9 : ("Richard Marx", "Should've Known Better", "Rock"),
 | 
						|
10: ("Rod Stewart", "Forever Young", "Rock"),
 | 
						|
11: ("Roxette", "Dangerous", "Rock"),
 | 
						|
12: ("Sheena Easton", "The Lover In Me", "Rock"),
 | 
						|
13: ("Sinead O'Connor", "Nothing Compares 2 U", "Rock"),
 | 
						|
14: ("Stevie B.", "Because I Love You", "Rock"),
 | 
						|
15: ("Taylor Dayne", "Love Will Lead You Back", "Rock"),
 | 
						|
16: ("The Bangles", "Eternal Flame", "Rock"),
 | 
						|
17: ("Wilson Phillips", "Release Me", "Rock"),
 | 
						|
18: ("Billy Joel", "Blonde Over Blue", "Rock"),
 | 
						|
19: ("Billy Joel", "Famous Last Words", "Rock"),
 | 
						|
20: ("Billy Joel", "Lullabye (Goodnight, My Angel)", "Rock"),
 | 
						|
21: ("Billy Joel", "The River Of Dreams", "Rock"),
 | 
						|
22: ("Billy Joel", "Two Thousand Years", "Rock"),
 | 
						|
23: ("Janet Jackson", "Alright", "Rock"),
 | 
						|
24: ("Janet Jackson", "Black Cat", "Rock"),
 | 
						|
25: ("Janet Jackson", "Come Back To Me", "Rock"),
 | 
						|
26: ("Janet Jackson", "Escapade", "Rock"),
 | 
						|
27: ("Janet Jackson", "Love Will Never Do (Without You)", "Rock"),
 | 
						|
28: ("Janet Jackson", "Miss You Much", "Rock"),
 | 
						|
29: ("Janet Jackson", "Rhythm Nation", "Rock"),
 | 
						|
30: ("Janet Jackson", "State Of The World", "Rock"),
 | 
						|
31: ("Janet Jackson", "The Knowledge", "Rock"),
 | 
						|
32: ("Spyro Gyra", "End of Romanticism", "Jazz"),
 | 
						|
33: ("Spyro Gyra", "Heliopolis", "Jazz"),
 | 
						|
34: ("Spyro Gyra", "Jubilee", "Jazz"),
 | 
						|
35: ("Spyro Gyra", "Little Linda", "Jazz"),
 | 
						|
36: ("Spyro Gyra", "Morning Dance", "Jazz"),
 | 
						|
37: ("Spyro Gyra", "Song for Lorraine", "Jazz"),
 | 
						|
38: ("Yes", "Owner Of A Lonely Heart", "Rock"),
 | 
						|
39: ("Yes", "Rhythm Of Love", "Rock"),
 | 
						|
40: ("Cusco", "Dream Catcher", "New Age"),
 | 
						|
41: ("Cusco", "Geronimos Laughter", "New Age"),
 | 
						|
42: ("Cusco", "Ghost Dance", "New Age"),
 | 
						|
43: ("Blue Man Group", "Drumbone", "New Age"),
 | 
						|
44: ("Blue Man Group", "Endless Column", "New Age"),
 | 
						|
45: ("Blue Man Group", "Klein Mandelbrot", "New Age"),
 | 
						|
46: ("Kenny G", "Silhouette", "Jazz"),
 | 
						|
47: ("Sade", "Smooth Operator", "Jazz"),
 | 
						|
48: ("David Arkenstone", "Papillon (On The Wings Of The Butterfly)",
 | 
						|
     "New Age"),
 | 
						|
49: ("David Arkenstone", "Stepping Stars", "New Age"),
 | 
						|
50: ("David Arkenstone", "Carnation Lily Lily Rose", "New Age"),
 | 
						|
51: ("David Lanz", "Behind The Waterfall", "New Age"),
 | 
						|
52: ("David Lanz", "Cristofori's Dream", "New Age"),
 | 
						|
53: ("David Lanz", "Heartsounds", "New Age"),
 | 
						|
54: ("David Lanz", "Leaves on the Seine", "New Age"),
 | 
						|
99: ("unknown artist", "Unnamed song", "Unknown"),
 | 
						|
}
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
class AssociateErrorTestCase(unittest.TestCase):
 | 
						|
    def setUp(self):
 | 
						|
        self.filename = self.__class__.__name__ + '.db'
 | 
						|
        self.homeDir = get_new_environment_path()
 | 
						|
        self.env = db.DBEnv()
 | 
						|
        self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL)
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        self.env.close()
 | 
						|
        self.env = None
 | 
						|
        test_support.rmtree(self.homeDir)
 | 
						|
 | 
						|
    def test00_associateDBError(self):
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test00_associateDBError..." % \
 | 
						|
                  self.__class__.__name__
 | 
						|
 | 
						|
        dupDB = db.DB(self.env)
 | 
						|
        dupDB.set_flags(db.DB_DUP)
 | 
						|
        dupDB.open(self.filename, "primary", db.DB_BTREE, db.DB_CREATE)
 | 
						|
 | 
						|
        secDB = db.DB(self.env)
 | 
						|
        secDB.open(self.filename, "secondary", db.DB_BTREE, db.DB_CREATE)
 | 
						|
 | 
						|
        # dupDB has been configured to allow duplicates, it can't
 | 
						|
        # associate with a secondary.  Berkeley DB will return an error.
 | 
						|
        try:
 | 
						|
            def f(a,b): return a+b
 | 
						|
            dupDB.associate(secDB, f)
 | 
						|
        except db.DBError:
 | 
						|
            # good
 | 
						|
            secDB.close()
 | 
						|
            dupDB.close()
 | 
						|
        else:
 | 
						|
            secDB.close()
 | 
						|
            dupDB.close()
 | 
						|
            self.fail("DBError exception was expected")
 | 
						|
 | 
						|
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
 | 
						|
class AssociateTestCase(unittest.TestCase):
 | 
						|
    keytype = ''
 | 
						|
    envFlags = 0
 | 
						|
    dbFlags = 0
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        self.filename = self.__class__.__name__ + '.db'
 | 
						|
        self.homeDir = get_new_environment_path()
 | 
						|
        self.env = db.DBEnv()
 | 
						|
        self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL |
 | 
						|
                               db.DB_INIT_LOCK | db.DB_THREAD | self.envFlags)
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        self.closeDB()
 | 
						|
        self.env.close()
 | 
						|
        self.env = None
 | 
						|
        test_support.rmtree(self.homeDir)
 | 
						|
 | 
						|
    def addDataToDB(self, d, txn=None):
 | 
						|
        for key, value in musicdata.items():
 | 
						|
            if type(self.keytype) == type(''):
 | 
						|
                key = "%02d" % key
 | 
						|
            d.put(key, '|'.join(value), txn=txn)
 | 
						|
 | 
						|
    def createDB(self, txn=None):
 | 
						|
        self.cur = None
 | 
						|
        self.secDB = None
 | 
						|
        self.primary = db.DB(self.env)
 | 
						|
        self.primary.set_get_returns_none(2)
 | 
						|
        if db.version() >= (4, 1):
 | 
						|
            self.primary.open(self.filename, "primary", self.dbtype,
 | 
						|
                          db.DB_CREATE | db.DB_THREAD | self.dbFlags, txn=txn)
 | 
						|
        else:
 | 
						|
            self.primary.open(self.filename, "primary", self.dbtype,
 | 
						|
                          db.DB_CREATE | db.DB_THREAD | self.dbFlags)
 | 
						|
 | 
						|
    def closeDB(self):
 | 
						|
        if self.cur:
 | 
						|
            self.cur.close()
 | 
						|
            self.cur = None
 | 
						|
        if self.secDB:
 | 
						|
            self.secDB.close()
 | 
						|
            self.secDB = None
 | 
						|
        self.primary.close()
 | 
						|
        self.primary = None
 | 
						|
 | 
						|
    def getDB(self):
 | 
						|
        return self.primary
 | 
						|
 | 
						|
 | 
						|
    def test01_associateWithDB(self):
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test01_associateWithDB..." % \
 | 
						|
                  self.__class__.__name__
 | 
						|
 | 
						|
        self.createDB()
 | 
						|
 | 
						|
        self.secDB = db.DB(self.env)
 | 
						|
        self.secDB.set_flags(db.DB_DUP)
 | 
						|
        self.secDB.set_get_returns_none(2)
 | 
						|
        self.secDB.open(self.filename, "secondary", db.DB_BTREE,
 | 
						|
                   db.DB_CREATE | db.DB_THREAD | self.dbFlags)
 | 
						|
        self.getDB().associate(self.secDB, self.getGenre)
 | 
						|
 | 
						|
        self.addDataToDB(self.getDB())
 | 
						|
 | 
						|
        self.finish_test(self.secDB)
 | 
						|
 | 
						|
 | 
						|
    def test02_associateAfterDB(self):
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test02_associateAfterDB..." % \
 | 
						|
                  self.__class__.__name__
 | 
						|
 | 
						|
        self.createDB()
 | 
						|
        self.addDataToDB(self.getDB())
 | 
						|
 | 
						|
        self.secDB = db.DB(self.env)
 | 
						|
        self.secDB.set_flags(db.DB_DUP)
 | 
						|
        self.secDB.open(self.filename, "secondary", db.DB_BTREE,
 | 
						|
                   db.DB_CREATE | db.DB_THREAD | self.dbFlags)
 | 
						|
 | 
						|
        # adding the DB_CREATE flag will cause it to index existing records
 | 
						|
        self.getDB().associate(self.secDB, self.getGenre, db.DB_CREATE)
 | 
						|
 | 
						|
        self.finish_test(self.secDB)
 | 
						|
 | 
						|
 | 
						|
    def finish_test(self, secDB, txn=None):
 | 
						|
        # 'Blues' should not be in the secondary database
 | 
						|
        vals = secDB.pget('Blues', txn=txn)
 | 
						|
        self.assertEqual(vals, None, vals)
 | 
						|
 | 
						|
        vals = secDB.pget('Unknown', txn=txn)
 | 
						|
        self.assert_(vals[0] == 99 or vals[0] == '99', vals)
 | 
						|
        vals[1].index('Unknown')
 | 
						|
        vals[1].index('Unnamed')
 | 
						|
        vals[1].index('unknown')
 | 
						|
 | 
						|
        if verbose:
 | 
						|
            print "Primary key traversal:"
 | 
						|
        self.cur = self.getDB().cursor(txn)
 | 
						|
        count = 0
 | 
						|
        rec = self.cur.first()
 | 
						|
        while rec is not None:
 | 
						|
            if type(self.keytype) == type(''):
 | 
						|
                self.assert_(int(rec[0]))  # for primary db, key is a number
 | 
						|
            else:
 | 
						|
                self.assert_(rec[0] and type(rec[0]) == type(0))
 | 
						|
            count = count + 1
 | 
						|
            if verbose:
 | 
						|
                print rec
 | 
						|
            rec = getattr(self.cur, "next")()
 | 
						|
        self.assertEqual(count, len(musicdata))  # all items accounted for
 | 
						|
 | 
						|
 | 
						|
        if verbose:
 | 
						|
            print "Secondary key traversal:"
 | 
						|
        self.cur = secDB.cursor(txn)
 | 
						|
        count = 0
 | 
						|
 | 
						|
        # test cursor pget
 | 
						|
        vals = self.cur.pget('Unknown', flags=db.DB_LAST)
 | 
						|
        self.assert_(vals[1] == 99 or vals[1] == '99', vals)
 | 
						|
        self.assertEqual(vals[0], 'Unknown')
 | 
						|
        vals[2].index('Unknown')
 | 
						|
        vals[2].index('Unnamed')
 | 
						|
        vals[2].index('unknown')
 | 
						|
 | 
						|
        vals = self.cur.pget('Unknown', data='wrong value', flags=db.DB_GET_BOTH)
 | 
						|
        self.assertEqual(vals, None, vals)
 | 
						|
 | 
						|
        rec = self.cur.first()
 | 
						|
        self.assertEqual(rec[0], "Jazz")
 | 
						|
        while rec is not None:
 | 
						|
            count = count + 1
 | 
						|
            if verbose:
 | 
						|
                print rec
 | 
						|
            rec = getattr(self.cur, "next")()
 | 
						|
        # all items accounted for EXCEPT for 1 with "Blues" genre
 | 
						|
        self.assertEqual(count, len(musicdata)-1)
 | 
						|
 | 
						|
        self.cur = None
 | 
						|
 | 
						|
    def getGenre(self, priKey, priData):
 | 
						|
        self.assertEqual(type(priData), type(""))
 | 
						|
        genre = priData.split('|')[2]
 | 
						|
 | 
						|
        if verbose:
 | 
						|
            print 'getGenre key: %r data: %r' % (priKey, priData)
 | 
						|
 | 
						|
        if genre == 'Blues':
 | 
						|
            return db.DB_DONOTINDEX
 | 
						|
        else:
 | 
						|
            return genre
 | 
						|
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
 | 
						|
class AssociateHashTestCase(AssociateTestCase):
 | 
						|
    dbtype = db.DB_HASH
 | 
						|
 | 
						|
class AssociateBTreeTestCase(AssociateTestCase):
 | 
						|
    dbtype = db.DB_BTREE
 | 
						|
 | 
						|
class AssociateRecnoTestCase(AssociateTestCase):
 | 
						|
    dbtype = db.DB_RECNO
 | 
						|
    keytype = 0
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
class AssociateBTreeTxnTestCase(AssociateBTreeTestCase):
 | 
						|
    envFlags = db.DB_INIT_TXN
 | 
						|
    dbFlags = 0
 | 
						|
 | 
						|
    def txn_finish_test(self, sDB, txn):
 | 
						|
        try:
 | 
						|
            self.finish_test(sDB, txn=txn)
 | 
						|
        finally:
 | 
						|
            if self.cur:
 | 
						|
                self.cur.close()
 | 
						|
                self.cur = None
 | 
						|
            if txn:
 | 
						|
                txn.commit()
 | 
						|
 | 
						|
    def test13_associate_in_transaction(self):
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test13_associateAutoCommit..." % \
 | 
						|
                  self.__class__.__name__
 | 
						|
 | 
						|
        txn = self.env.txn_begin()
 | 
						|
        try:
 | 
						|
            self.createDB(txn=txn)
 | 
						|
 | 
						|
            self.secDB = db.DB(self.env)
 | 
						|
            self.secDB.set_flags(db.DB_DUP)
 | 
						|
            self.secDB.set_get_returns_none(2)
 | 
						|
            self.secDB.open(self.filename, "secondary", db.DB_BTREE,
 | 
						|
                       db.DB_CREATE | db.DB_THREAD, txn=txn)
 | 
						|
            if db.version() >= (4,1):
 | 
						|
                self.getDB().associate(self.secDB, self.getGenre, txn=txn)
 | 
						|
            else:
 | 
						|
                self.getDB().associate(self.secDB, self.getGenre)
 | 
						|
 | 
						|
            self.addDataToDB(self.getDB(), txn=txn)
 | 
						|
        except:
 | 
						|
            txn.abort()
 | 
						|
            raise
 | 
						|
 | 
						|
        self.txn_finish_test(self.secDB, txn=txn)
 | 
						|
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
class ShelveAssociateTestCase(AssociateTestCase):
 | 
						|
 | 
						|
    def createDB(self):
 | 
						|
        self.primary = dbshelve.open(self.filename,
 | 
						|
                                     dbname="primary",
 | 
						|
                                     dbenv=self.env,
 | 
						|
                                     filetype=self.dbtype)
 | 
						|
 | 
						|
    def addDataToDB(self, d):
 | 
						|
        for key, value in musicdata.items():
 | 
						|
            if type(self.keytype) == type(''):
 | 
						|
                key = "%02d" % key
 | 
						|
            d.put(key, value)    # save the value as is this time
 | 
						|
 | 
						|
 | 
						|
    def getGenre(self, priKey, priData):
 | 
						|
        self.assertEqual(type(priData), type(()))
 | 
						|
        if verbose:
 | 
						|
            print 'getGenre key: %r data: %r' % (priKey, priData)
 | 
						|
        genre = priData[2]
 | 
						|
        if genre == 'Blues':
 | 
						|
            return db.DB_DONOTINDEX
 | 
						|
        else:
 | 
						|
            return genre
 | 
						|
 | 
						|
 | 
						|
class ShelveAssociateHashTestCase(ShelveAssociateTestCase):
 | 
						|
    dbtype = db.DB_HASH
 | 
						|
 | 
						|
class ShelveAssociateBTreeTestCase(ShelveAssociateTestCase):
 | 
						|
    dbtype = db.DB_BTREE
 | 
						|
 | 
						|
class ShelveAssociateRecnoTestCase(ShelveAssociateTestCase):
 | 
						|
    dbtype = db.DB_RECNO
 | 
						|
    keytype = 0
 | 
						|
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
class ThreadedAssociateTestCase(AssociateTestCase):
 | 
						|
 | 
						|
    def addDataToDB(self, d):
 | 
						|
        t1 = Thread(target = self.writer1,
 | 
						|
                    args = (d, ))
 | 
						|
        t2 = Thread(target = self.writer2,
 | 
						|
                    args = (d, ))
 | 
						|
 | 
						|
        t1.setDaemon(True)
 | 
						|
        t2.setDaemon(True)
 | 
						|
        t1.start()
 | 
						|
        t2.start()
 | 
						|
        t1.join()
 | 
						|
        t2.join()
 | 
						|
 | 
						|
    def writer1(self, d):
 | 
						|
        for key, value in musicdata.items():
 | 
						|
            if type(self.keytype) == type(''):
 | 
						|
                key = "%02d" % key
 | 
						|
            d.put(key, '|'.join(value))
 | 
						|
 | 
						|
    def writer2(self, d):
 | 
						|
        for x in range(100, 600):
 | 
						|
            key = 'z%2d' % x
 | 
						|
            value = [key] * 4
 | 
						|
            d.put(key, '|'.join(value))
 | 
						|
 | 
						|
 | 
						|
class ThreadedAssociateHashTestCase(ShelveAssociateTestCase):
 | 
						|
    dbtype = db.DB_HASH
 | 
						|
 | 
						|
class ThreadedAssociateBTreeTestCase(ShelveAssociateTestCase):
 | 
						|
    dbtype = db.DB_BTREE
 | 
						|
 | 
						|
class ThreadedAssociateRecnoTestCase(ShelveAssociateTestCase):
 | 
						|
    dbtype = db.DB_RECNO
 | 
						|
    keytype = 0
 | 
						|
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
def test_suite():
 | 
						|
    suite = unittest.TestSuite()
 | 
						|
 | 
						|
    suite.addTest(unittest.makeSuite(AssociateErrorTestCase))
 | 
						|
 | 
						|
    suite.addTest(unittest.makeSuite(AssociateHashTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(AssociateBTreeTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(AssociateRecnoTestCase))
 | 
						|
 | 
						|
    if db.version() >= (4, 1):
 | 
						|
        suite.addTest(unittest.makeSuite(AssociateBTreeTxnTestCase))
 | 
						|
 | 
						|
    suite.addTest(unittest.makeSuite(ShelveAssociateHashTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(ShelveAssociateBTreeTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(ShelveAssociateRecnoTestCase))
 | 
						|
 | 
						|
    if have_threads:
 | 
						|
        suite.addTest(unittest.makeSuite(ThreadedAssociateHashTestCase))
 | 
						|
        suite.addTest(unittest.makeSuite(ThreadedAssociateBTreeTestCase))
 | 
						|
        suite.addTest(unittest.makeSuite(ThreadedAssociateRecnoTestCase))
 | 
						|
 | 
						|
    return suite
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    unittest.main(defaultTest='test_suite')
 |