| 
									
										
										
										
											2002-11-19 17:47:07 +00:00
										 |  |  | """
 | 
					
						
							|  |  |  | TestCases for checking dbShelve objects. | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import sys, os, string | 
					
						
							|  |  |  | import tempfile, random | 
					
						
							|  |  |  | from pprint import pprint | 
					
						
							|  |  |  | from types import * | 
					
						
							|  |  |  | import unittest | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2003-01-28 17:20:44 +00:00
										 |  |  | try: | 
					
						
							| 
									
										
										
										
											2003-09-21 00:08:14 +00:00
										 |  |  |     # For Pythons w/distutils pybsddb | 
					
						
							|  |  |  |     from bsddb3 import db, dbshelve | 
					
						
							|  |  |  | except ImportError: | 
					
						
							| 
									
										
										
										
											2003-01-28 17:20:44 +00:00
										 |  |  |     # For Python 2.3 | 
					
						
							|  |  |  |     from bsddb import db, dbshelve | 
					
						
							| 
									
										
										
										
											2002-11-19 17:47:07 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2002-12-30 20:53:52 +00:00
										 |  |  | from test_all import verbose | 
					
						
							| 
									
										
										
										
											2002-11-19 17:47:07 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | #---------------------------------------------------------------------- | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # We want the objects to be comparable so we can test dbshelve.values | 
					
						
							|  |  |  | # later on. | 
					
						
							|  |  |  | class DataClass: | 
					
						
							|  |  |  |     def __init__(self): | 
					
						
							|  |  |  |         self.value = random.random() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __cmp__(self, other): | 
					
						
							|  |  |  |         return cmp(self.value, other) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class DBShelveTestCase(unittest.TestCase): | 
					
						
							|  |  |  |     def setUp(self): | 
					
						
							|  |  |  |         self.filename = tempfile.mktemp() | 
					
						
							|  |  |  |         self.do_open() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def tearDown(self): | 
					
						
							|  |  |  |         self.do_close() | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             os.remove(self.filename) | 
					
						
							|  |  |  |         except os.error: | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def populateDB(self, d): | 
					
						
							|  |  |  |         for x in string.letters: | 
					
						
							|  |  |  |             d['S' + x] = 10 * x           # add a string | 
					
						
							|  |  |  |             d['I' + x] = ord(x)           # add an integer | 
					
						
							|  |  |  |             d['L' + x] = [x] * 10         # add a list | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             inst = DataClass()            # add an instance | 
					
						
							|  |  |  |             inst.S = 10 * x | 
					
						
							|  |  |  |             inst.I = ord(x) | 
					
						
							|  |  |  |             inst.L = [x] * 10 | 
					
						
							|  |  |  |             d['O' + x] = inst | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # overridable in derived classes to affect how the shelf is created/opened | 
					
						
							|  |  |  |     def do_open(self): | 
					
						
							|  |  |  |         self.d = dbshelve.open(self.filename) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # and closed... | 
					
						
							|  |  |  |     def do_close(self): | 
					
						
							|  |  |  |         self.d.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test01_basics(self): | 
					
						
							|  |  |  |         if verbose: | 
					
						
							|  |  |  |             print '\n', '-=' * 30 | 
					
						
							|  |  |  |             print "Running %s.test01_basics..." % self.__class__.__name__ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.populateDB(self.d) | 
					
						
							|  |  |  |         self.d.sync() | 
					
						
							|  |  |  |         self.do_close() | 
					
						
							|  |  |  |         self.do_open() | 
					
						
							|  |  |  |         d = self.d | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         l = len(d) | 
					
						
							|  |  |  |         k = d.keys() | 
					
						
							|  |  |  |         s = d.stat() | 
					
						
							|  |  |  |         f = d.fd() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if verbose: | 
					
						
							|  |  |  |             print "length:", l | 
					
						
							|  |  |  |             print "keys:", k | 
					
						
							|  |  |  |             print "stats:", s | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         assert 0 == d.has_key('bad key') | 
					
						
							|  |  |  |         assert 1 == d.has_key('IA') | 
					
						
							|  |  |  |         assert 1 == d.has_key('OA') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         d.delete('IA') | 
					
						
							|  |  |  |         del d['OA'] | 
					
						
							|  |  |  |         assert 0 == d.has_key('IA') | 
					
						
							|  |  |  |         assert 0 == d.has_key('OA') | 
					
						
							|  |  |  |         assert len(d) == l-2 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         values = [] | 
					
						
							|  |  |  |         for key in d.keys(): | 
					
						
							|  |  |  |             value = d[key] | 
					
						
							|  |  |  |             values.append(value) | 
					
						
							|  |  |  |             if verbose: | 
					
						
							|  |  |  |                 print "%s: %s" % (key, value) | 
					
						
							|  |  |  |             self.checkrec(key, value) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         dbvalues = d.values() | 
					
						
							|  |  |  |         assert len(dbvalues) == len(d.keys()) | 
					
						
							|  |  |  |         values.sort() | 
					
						
							|  |  |  |         dbvalues.sort() | 
					
						
							|  |  |  |         assert values == dbvalues | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         items = d.items() | 
					
						
							|  |  |  |         assert len(items) == len(values) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for key, value in items: | 
					
						
							|  |  |  |             self.checkrec(key, value) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         assert d.get('bad key') == None | 
					
						
							|  |  |  |         assert d.get('bad key', None) == None | 
					
						
							|  |  |  |         assert d.get('bad key', 'a string') == 'a string' | 
					
						
							|  |  |  |         assert d.get('bad key', [1, 2, 3]) == [1, 2, 3] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         d.set_get_returns_none(0) | 
					
						
							|  |  |  |         self.assertRaises(db.DBNotFoundError, d.get, 'bad key') | 
					
						
							|  |  |  |         d.set_get_returns_none(1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         d.put('new key', 'new data') | 
					
						
							|  |  |  |         assert d.get('new key') == 'new data' | 
					
						
							|  |  |  |         assert d['new key'] == 'new data' | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test02_cursors(self): | 
					
						
							|  |  |  |         if verbose: | 
					
						
							|  |  |  |             print '\n', '-=' * 30 | 
					
						
							|  |  |  |             print "Running %s.test02_cursors..." % self.__class__.__name__ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.populateDB(self.d) | 
					
						
							|  |  |  |         d = self.d | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         count = 0 | 
					
						
							|  |  |  |         c = d.cursor() | 
					
						
							|  |  |  |         rec = c.first() | 
					
						
							|  |  |  |         while rec is not None: | 
					
						
							|  |  |  |             count = count + 1 | 
					
						
							|  |  |  |             if verbose: | 
					
						
							|  |  |  |                 print rec | 
					
						
							|  |  |  |             key, value = rec | 
					
						
							|  |  |  |             self.checkrec(key, value) | 
					
						
							|  |  |  |             rec = c.next() | 
					
						
							| 
									
										
										
										
											2002-12-30 20:53:52 +00:00
										 |  |  |         del c | 
					
						
							| 
									
										
										
										
											2002-11-19 17:47:07 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |         assert count == len(d) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         count = 0 | 
					
						
							|  |  |  |         c = d.cursor() | 
					
						
							|  |  |  |         rec = c.last() | 
					
						
							|  |  |  |         while rec is not None: | 
					
						
							|  |  |  |             count = count + 1 | 
					
						
							|  |  |  |             if verbose: | 
					
						
							|  |  |  |                 print rec | 
					
						
							|  |  |  |             key, value = rec | 
					
						
							|  |  |  |             self.checkrec(key, value) | 
					
						
							|  |  |  |             rec = c.prev() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         assert count == len(d) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         c.set('SS') | 
					
						
							|  |  |  |         key, value = c.current() | 
					
						
							|  |  |  |         self.checkrec(key, value) | 
					
						
							| 
									
										
										
										
											2002-12-30 20:53:52 +00:00
										 |  |  |         del c | 
					
						
							| 
									
										
										
										
											2002-11-19 17:47:07 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def checkrec(self, key, value): | 
					
						
							|  |  |  |         x = key[1] | 
					
						
							|  |  |  |         if key[0] == 'S': | 
					
						
							|  |  |  |             assert type(value) == StringType | 
					
						
							|  |  |  |             assert value == 10 * x | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         elif key[0] == 'I': | 
					
						
							|  |  |  |             assert type(value) == IntType | 
					
						
							|  |  |  |             assert value == ord(x) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         elif key[0] == 'L': | 
					
						
							|  |  |  |             assert type(value) == ListType | 
					
						
							|  |  |  |             assert value == [x] * 10 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         elif key[0] == 'O': | 
					
						
							|  |  |  |             assert type(value) == InstanceType | 
					
						
							|  |  |  |             assert value.S == 10 * x | 
					
						
							|  |  |  |             assert value.I == ord(x) | 
					
						
							|  |  |  |             assert value.L == [x] * 10 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             raise AssertionError, 'Unknown key type, fix the test' | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | #---------------------------------------------------------------------- | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class BasicShelveTestCase(DBShelveTestCase): | 
					
						
							|  |  |  |     def do_open(self): | 
					
						
							|  |  |  |         self.d = dbshelve.DBShelf() | 
					
						
							|  |  |  |         self.d.open(self.filename, self.dbtype, self.dbflags) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def do_close(self): | 
					
						
							|  |  |  |         self.d.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class BTreeShelveTestCase(BasicShelveTestCase): | 
					
						
							|  |  |  |     dbtype = db.DB_BTREE | 
					
						
							|  |  |  |     dbflags = db.DB_CREATE | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class HashShelveTestCase(BasicShelveTestCase): | 
					
						
							| 
									
										
										
										
											2002-11-23 11:26:07 +00:00
										 |  |  |     dbtype = db.DB_HASH | 
					
						
							| 
									
										
										
										
											2002-11-19 17:47:07 +00:00
										 |  |  |     dbflags = db.DB_CREATE | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class ThreadBTreeShelveTestCase(BasicShelveTestCase): | 
					
						
							|  |  |  |     dbtype = db.DB_BTREE | 
					
						
							|  |  |  |     dbflags = db.DB_CREATE | db.DB_THREAD | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class ThreadHashShelveTestCase(BasicShelveTestCase): | 
					
						
							| 
									
										
										
										
											2002-11-23 11:26:07 +00:00
										 |  |  |     dbtype = db.DB_HASH | 
					
						
							| 
									
										
										
										
											2002-11-19 17:47:07 +00:00
										 |  |  |     dbflags = db.DB_CREATE | db.DB_THREAD | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | #---------------------------------------------------------------------- | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class BasicEnvShelveTestCase(DBShelveTestCase): | 
					
						
							|  |  |  |     def do_open(self): | 
					
						
							| 
									
										
										
										
											2002-12-30 20:53:52 +00:00
										 |  |  |         self.homeDir = homeDir = os.path.join( | 
					
						
							| 
									
										
										
										
											2007-01-05 15:51:24 +00:00
										 |  |  |             tempfile.gettempdir(), 'db_home') | 
					
						
							| 
									
										
										
										
											2002-11-19 17:47:07 +00:00
										 |  |  |         try: os.mkdir(homeDir) | 
					
						
							|  |  |  |         except os.error: pass | 
					
						
							|  |  |  |         self.env = db.DBEnv() | 
					
						
							|  |  |  |         self.env.open(homeDir, self.envflags | db.DB_INIT_MPOOL | db.DB_CREATE) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.filename = os.path.split(self.filename)[1] | 
					
						
							|  |  |  |         self.d = dbshelve.DBShelf(self.env) | 
					
						
							|  |  |  |         self.d.open(self.filename, self.dbtype, self.dbflags) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def do_close(self): | 
					
						
							|  |  |  |         self.d.close() | 
					
						
							|  |  |  |         self.env.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def tearDown(self): | 
					
						
							|  |  |  |         self.do_close() | 
					
						
							|  |  |  |         import glob | 
					
						
							|  |  |  |         files = glob.glob(os.path.join(self.homeDir, '*')) | 
					
						
							|  |  |  |         for file in files: | 
					
						
							|  |  |  |             os.remove(file) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class EnvBTreeShelveTestCase(BasicEnvShelveTestCase): | 
					
						
							|  |  |  |     envflags = 0 | 
					
						
							|  |  |  |     dbtype = db.DB_BTREE | 
					
						
							|  |  |  |     dbflags = db.DB_CREATE | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class EnvHashShelveTestCase(BasicEnvShelveTestCase): | 
					
						
							|  |  |  |     envflags = 0 | 
					
						
							| 
									
										
										
										
											2002-11-23 11:26:07 +00:00
										 |  |  |     dbtype = db.DB_HASH | 
					
						
							| 
									
										
										
										
											2002-11-19 17:47:07 +00:00
										 |  |  |     dbflags = db.DB_CREATE | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class EnvThreadBTreeShelveTestCase(BasicEnvShelveTestCase): | 
					
						
							|  |  |  |     envflags = db.DB_THREAD | 
					
						
							|  |  |  |     dbtype = db.DB_BTREE | 
					
						
							|  |  |  |     dbflags = db.DB_CREATE | db.DB_THREAD | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class EnvThreadHashShelveTestCase(BasicEnvShelveTestCase): | 
					
						
							|  |  |  |     envflags = db.DB_THREAD | 
					
						
							| 
									
										
										
										
											2002-11-23 11:26:07 +00:00
										 |  |  |     dbtype = db.DB_HASH | 
					
						
							| 
									
										
										
										
											2002-11-19 17:47:07 +00:00
										 |  |  |     dbflags = db.DB_CREATE | db.DB_THREAD | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | #---------------------------------------------------------------------- | 
					
						
							|  |  |  | # TODO:  Add test cases for a DBShelf in a RECNO DB. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | #---------------------------------------------------------------------- | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2002-12-30 20:53:52 +00:00
										 |  |  | def test_suite(): | 
					
						
							|  |  |  |     suite = unittest.TestSuite() | 
					
						
							| 
									
										
										
										
											2002-11-19 17:47:07 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2002-12-30 20:53:52 +00:00
										 |  |  |     suite.addTest(unittest.makeSuite(DBShelveTestCase)) | 
					
						
							|  |  |  |     suite.addTest(unittest.makeSuite(BTreeShelveTestCase)) | 
					
						
							|  |  |  |     suite.addTest(unittest.makeSuite(HashShelveTestCase)) | 
					
						
							|  |  |  |     suite.addTest(unittest.makeSuite(ThreadBTreeShelveTestCase)) | 
					
						
							|  |  |  |     suite.addTest(unittest.makeSuite(ThreadHashShelveTestCase)) | 
					
						
							|  |  |  |     suite.addTest(unittest.makeSuite(EnvBTreeShelveTestCase)) | 
					
						
							|  |  |  |     suite.addTest(unittest.makeSuite(EnvHashShelveTestCase)) | 
					
						
							|  |  |  |     suite.addTest(unittest.makeSuite(EnvThreadBTreeShelveTestCase)) | 
					
						
							|  |  |  |     suite.addTest(unittest.makeSuite(EnvThreadHashShelveTestCase)) | 
					
						
							| 
									
										
										
										
											2002-11-19 17:47:07 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2002-12-30 20:53:52 +00:00
										 |  |  |     return suite | 
					
						
							| 
									
										
										
										
											2002-11-19 17:47:07 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == '__main__': | 
					
						
							| 
									
										
										
										
											2002-12-30 20:53:52 +00:00
										 |  |  |     unittest.main(defaultTest='test_suite') |