| 
									
										
										
										
											1997-01-16 22:04:10 +00:00
										 |  |  | #! /usr/bin/env python | 
					
						
							| 
									
										
										
										
											2003-09-13 05:51:09 +00:00
										 |  |  | """Test script for the bsddb C module by Roger E. Masse
 | 
					
						
							|  |  |  |    Adapted to unittest format and expanded scope by Raymond Hettinger | 
					
						
							| 
									
										
										
										
											1997-01-16 22:04:10 +00:00
										 |  |  | """
 | 
					
						
							| 
									
										
										
										
											2003-11-02 09:10:16 +00:00
										 |  |  | import os, sys | 
					
						
							| 
									
										
										
										
											2003-11-03 01:04:41 +00:00
										 |  |  | import copy | 
					
						
							| 
									
										
										
										
											1997-01-16 22:04:10 +00:00
										 |  |  | import bsddb | 
					
						
							| 
									
										
										
										
											2001-12-07 16:43:19 +00:00
										 |  |  | import dbhash # Just so we know it's imported | 
					
						
							| 
									
										
										
										
											2003-09-13 05:51:09 +00:00
										 |  |  | import unittest | 
					
						
							|  |  |  | from test import test_support | 
					
						
							|  |  |  | from sets import Set | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestBSDDB(unittest.TestCase): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def setUp(self): | 
					
						
							|  |  |  |         self.f = self.openmethod[0](self.fname, 'c') | 
					
						
							|  |  |  |         self.d = dict(q='Guido', w='van', e='Rossum', r='invented', t='Python', y='') | 
					
						
							|  |  |  |         for k, v in self.d.iteritems(): | 
					
						
							|  |  |  |             self.f[k] = v | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def tearDown(self): | 
					
						
							|  |  |  |         self.f.sync() | 
					
						
							|  |  |  |         self.f.close() | 
					
						
							|  |  |  |         if self.fname is None: | 
					
						
							|  |  |  |             return | 
					
						
							| 
									
										
										
										
											2002-04-23 02:11:05 +00:00
										 |  |  |         try: | 
					
						
							| 
									
										
										
										
											2003-09-13 05:51:09 +00:00
										 |  |  |             os.remove(self.fname) | 
					
						
							| 
									
										
										
										
											2002-04-23 02:11:05 +00:00
										 |  |  |         except os.error: | 
					
						
							|  |  |  |             pass | 
					
						
							| 
									
										
										
										
											1997-01-16 22:04:10 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2003-09-13 05:51:09 +00:00
										 |  |  |     def test_getitem(self): | 
					
						
							|  |  |  |         for k, v in self.d.iteritems(): | 
					
						
							|  |  |  |             self.assertEqual(self.f[k], v) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_len(self): | 
					
						
							|  |  |  |         self.assertEqual(len(self.f), len(self.d)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_change(self): | 
					
						
							|  |  |  |         self.f['r'] = 'discovered' | 
					
						
							|  |  |  |         self.assertEqual(self.f['r'], 'discovered') | 
					
						
							|  |  |  |         self.assert_('r' in self.f.keys()) | 
					
						
							|  |  |  |         self.assert_('discovered' in self.f.values()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_close_and_reopen(self): | 
					
						
							|  |  |  |         if self.fname is None: | 
					
						
							|  |  |  |             # if we're using an in-memory only db, we can't reopen it | 
					
						
							|  |  |  |             # so finish here. | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         self.f.close() | 
					
						
							|  |  |  |         self.f = self.openmethod[0](self.fname, 'w') | 
					
						
							|  |  |  |         for k, v in self.d.iteritems(): | 
					
						
							|  |  |  |             self.assertEqual(self.f[k], v) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def assertSetEquals(self, seqn1, seqn2): | 
					
						
							|  |  |  |         self.assertEqual(Set(seqn1), Set(seqn2)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_mapping_iteration_methods(self): | 
					
						
							|  |  |  |         f = self.f | 
					
						
							|  |  |  |         d = self.d | 
					
						
							|  |  |  |         self.assertSetEquals(d, f) | 
					
						
							|  |  |  |         self.assertSetEquals(d.keys(), f.keys()) | 
					
						
							|  |  |  |         self.assertSetEquals(d.values(), f.values()) | 
					
						
							|  |  |  |         self.assertSetEquals(d.items(), f.items()) | 
					
						
							|  |  |  |         self.assertSetEquals(d.iterkeys(), f.iterkeys()) | 
					
						
							|  |  |  |         self.assertSetEquals(d.itervalues(), f.itervalues()) | 
					
						
							|  |  |  |         self.assertSetEquals(d.iteritems(), f.iteritems()) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2003-11-03 01:04:41 +00:00
										 |  |  |     def test_iter_while_modifying_values(self): | 
					
						
							|  |  |  |         if not hasattr(self.f, '__iter__'): | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         di = iter(self.d) | 
					
						
							|  |  |  |         while 1: | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 key = di.next() | 
					
						
							|  |  |  |                 self.d[key] = 'modified '+key | 
					
						
							|  |  |  |             except StopIteration: | 
					
						
							|  |  |  |                 break | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # it should behave the same as a dict.  modifying values | 
					
						
							|  |  |  |         # of existing keys should not break iteration.  (adding | 
					
						
							|  |  |  |         # or removing keys should) | 
					
						
							|  |  |  |         fi = iter(self.f) | 
					
						
							|  |  |  |         while 1: | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 key = fi.next() | 
					
						
							|  |  |  |                 self.f[key] = 'modified '+key | 
					
						
							|  |  |  |             except StopIteration: | 
					
						
							|  |  |  |                 break | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.test_mapping_iteration_methods() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_iteritems_while_modifying_values(self): | 
					
						
							|  |  |  |         if not hasattr(self.f, 'iteritems'): | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         di = self.d.iteritems() | 
					
						
							|  |  |  |         while 1: | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 k, v = di.next() | 
					
						
							|  |  |  |                 self.d[k] = 'modified '+v | 
					
						
							|  |  |  |             except StopIteration: | 
					
						
							|  |  |  |                 break | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # it should behave the same as a dict.  modifying values | 
					
						
							|  |  |  |         # of existing keys should not break iteration.  (adding | 
					
						
							|  |  |  |         # or removing keys should) | 
					
						
							|  |  |  |         fi = self.f.iteritems() | 
					
						
							|  |  |  |         while 1: | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 k, v = fi.next() | 
					
						
							|  |  |  |                 self.f[k] = 'modified '+v | 
					
						
							|  |  |  |             except StopIteration: | 
					
						
							|  |  |  |                 break | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.test_mapping_iteration_methods() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2003-09-13 05:51:09 +00:00
										 |  |  |     def test_first_next_looping(self): | 
					
						
							|  |  |  |         items = [self.f.first()] | 
					
						
							|  |  |  |         for i in xrange(1, len(self.f)): | 
					
						
							|  |  |  |             items.append(self.f.next()) | 
					
						
							|  |  |  |         self.assertSetEquals(items, self.d.items()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_previous_last_looping(self): | 
					
						
							|  |  |  |         items = [self.f.last()] | 
					
						
							|  |  |  |         for i in xrange(1, len(self.f)): | 
					
						
							|  |  |  |             items.append(self.f.previous()) | 
					
						
							|  |  |  |         self.assertSetEquals(items, self.d.items()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_set_location(self): | 
					
						
							|  |  |  |         self.assertEqual(self.f.set_location('e'), ('e', self.d['e'])) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_contains(self): | 
					
						
							|  |  |  |         for k in self.d: | 
					
						
							|  |  |  |             self.assert_(k in self.f) | 
					
						
							|  |  |  |         self.assert_('not here' not in self.f) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_has_key(self): | 
					
						
							|  |  |  |         for k in self.d: | 
					
						
							|  |  |  |             self.assert_(self.f.has_key(k)) | 
					
						
							|  |  |  |         self.assert_(not self.f.has_key('not here')) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_clear(self): | 
					
						
							|  |  |  |         self.f.clear() | 
					
						
							|  |  |  |         self.assertEqual(len(self.f), 0) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2003-11-02 09:10:16 +00:00
										 |  |  |     def test__no_deadlock_first(self, debug=0): | 
					
						
							|  |  |  |         # do this so that testers can see what function we're in in | 
					
						
							|  |  |  |         # verbose mode when we deadlock. | 
					
						
							|  |  |  |         sys.stdout.flush() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # in pybsddb's _DBWithCursor this causes an internal DBCursor | 
					
						
							|  |  |  |         # object is created.  Other test_ methods in this class could | 
					
						
							|  |  |  |         # inadvertently cause the deadlock but an explicit test is needed. | 
					
						
							|  |  |  |         if debug: print "A" | 
					
						
							|  |  |  |         k,v = self.f.first() | 
					
						
							|  |  |  |         if debug: print "B", k | 
					
						
							|  |  |  |         self.f[k] = "deadlock.  do not pass go.  do not collect $200." | 
					
						
							|  |  |  |         if debug: print "C" | 
					
						
							|  |  |  |         # if the bsddb implementation leaves the DBCursor open during | 
					
						
							|  |  |  |         # the database write and locking+threading support is enabled | 
					
						
							|  |  |  |         # the cursor's read lock will deadlock the write lock request.. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # test the iterator interface (if present) | 
					
						
							| 
									
										
										
										
											2003-11-03 01:04:41 +00:00
										 |  |  |         if hasattr(self.f, 'iteritems'): | 
					
						
							| 
									
										
										
										
											2003-11-02 09:10:16 +00:00
										 |  |  |             if debug: print "D" | 
					
						
							| 
									
										
										
										
											2003-11-03 01:04:41 +00:00
										 |  |  |             i = self.f.iteritems() | 
					
						
							|  |  |  |             k,v = i.next() | 
					
						
							| 
									
										
										
										
											2003-11-02 09:10:16 +00:00
										 |  |  |             if debug: print "E" | 
					
						
							|  |  |  |             self.f[k] = "please don't deadlock" | 
					
						
							|  |  |  |             if debug: print "F" | 
					
						
							|  |  |  |             while 1: | 
					
						
							|  |  |  |                 try: | 
					
						
							| 
									
										
										
										
											2003-11-03 01:04:41 +00:00
										 |  |  |                     k,v = i.next() | 
					
						
							| 
									
										
										
										
											2003-11-02 09:10:16 +00:00
										 |  |  |                 except StopIteration: | 
					
						
							|  |  |  |                     break | 
					
						
							|  |  |  |             if debug: print "F2" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             i = iter(self.f) | 
					
						
							|  |  |  |             if debug: print "G" | 
					
						
							|  |  |  |             while i: | 
					
						
							|  |  |  |                 try: | 
					
						
							|  |  |  |                     if debug: print "H" | 
					
						
							|  |  |  |                     k = i.next() | 
					
						
							|  |  |  |                     if debug: print "I" | 
					
						
							|  |  |  |                     self.f[k] = "deadlocks-r-us" | 
					
						
							|  |  |  |                     if debug: print "J" | 
					
						
							|  |  |  |                 except StopIteration: | 
					
						
							|  |  |  |                     i = None | 
					
						
							|  |  |  |             if debug: print "K" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # test the legacy cursor interface mixed with writes | 
					
						
							|  |  |  |         self.assert_(self.f.first()[0] in self.d) | 
					
						
							|  |  |  |         k = self.f.next()[0] | 
					
						
							|  |  |  |         self.assert_(k in self.d) | 
					
						
							|  |  |  |         self.f[k] = "be gone with ye deadlocks" | 
					
						
							|  |  |  |         self.assert_(self.f[k], "be gone with ye deadlocks") | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2003-11-03 01:04:41 +00:00
										 |  |  |     def test_for_cursor_memleak(self): | 
					
						
							|  |  |  |         if not hasattr(self.f, 'iteritems'): | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # do the bsddb._DBWithCursor _iter_mixin internals leak cursors? | 
					
						
							|  |  |  |         nc1 = len(self.f._cursor_refs) | 
					
						
							|  |  |  |         # create iterator | 
					
						
							|  |  |  |         i = self.f.iteritems() | 
					
						
							|  |  |  |         nc2 = len(self.f._cursor_refs) | 
					
						
							|  |  |  |         # use the iterator (should run to the first yeild, creating the cursor) | 
					
						
							|  |  |  |         k, v = i.next() | 
					
						
							|  |  |  |         nc3 = len(self.f._cursor_refs) | 
					
						
							|  |  |  |         # destroy the iterator; this should cause the weakref callback | 
					
						
							|  |  |  |         # to remove the cursor object from self.f._cursor_refs | 
					
						
							|  |  |  |         del i | 
					
						
							|  |  |  |         nc4 = len(self.f._cursor_refs) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(nc1, nc2) | 
					
						
							|  |  |  |         self.assertEqual(nc1, nc4) | 
					
						
							|  |  |  |         self.assert_(nc3 == nc1+1) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2003-09-13 05:51:09 +00:00
										 |  |  |     def test_popitem(self): | 
					
						
							|  |  |  |         k, v = self.f.popitem() | 
					
						
							|  |  |  |         self.assert_(k in self.d) | 
					
						
							|  |  |  |         self.assert_(v in self.d.values()) | 
					
						
							|  |  |  |         self.assert_(k not in self.f) | 
					
						
							|  |  |  |         self.assertEqual(len(self.d)-1, len(self.f)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_pop(self): | 
					
						
							|  |  |  |         k = 'w' | 
					
						
							|  |  |  |         v = self.f.pop(k) | 
					
						
							|  |  |  |         self.assertEqual(v, self.d[k]) | 
					
						
							|  |  |  |         self.assert_(k not in self.f) | 
					
						
							|  |  |  |         self.assert_(v not in self.f.values()) | 
					
						
							|  |  |  |         self.assertEqual(len(self.d)-1, len(self.f)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_get(self): | 
					
						
							|  |  |  |         self.assertEqual(self.f.get('NotHere'), None) | 
					
						
							|  |  |  |         self.assertEqual(self.f.get('NotHere', 'Default'), 'Default') | 
					
						
							|  |  |  |         self.assertEqual(self.f.get('q', 'Default'), self.d['q']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_setdefault(self): | 
					
						
							|  |  |  |         self.assertEqual(self.f.setdefault('new', 'dog'), 'dog') | 
					
						
							|  |  |  |         self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_update(self): | 
					
						
							|  |  |  |         new = dict(y='life', u='of', i='brian') | 
					
						
							|  |  |  |         self.f.update(new) | 
					
						
							|  |  |  |         self.d.update(new) | 
					
						
							|  |  |  |         for k, v in self.d.iteritems(): | 
					
						
							|  |  |  |             self.assertEqual(self.f[k], v) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_keyordering(self): | 
					
						
							|  |  |  |         if self.openmethod[0] is not bsddb.btopen: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         keys = self.d.keys() | 
					
						
							|  |  |  |         keys.sort() | 
					
						
							|  |  |  |         self.assertEqual(self.f.first()[0], keys[0]) | 
					
						
							|  |  |  |         self.assertEqual(self.f.next()[0], keys[1]) | 
					
						
							|  |  |  |         self.assertEqual(self.f.last()[0], keys[-1]) | 
					
						
							|  |  |  |         self.assertEqual(self.f.previous()[0], keys[-2]) | 
					
						
							|  |  |  |         self.assertEqual(list(self.f), keys) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestBTree(TestBSDDB): | 
					
						
							|  |  |  |     fname = test_support.TESTFN | 
					
						
							|  |  |  |     openmethod = [bsddb.btopen] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestBTree_InMemory(TestBSDDB): | 
					
						
							|  |  |  |     fname = None | 
					
						
							|  |  |  |     openmethod = [bsddb.btopen] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestHashTable(TestBSDDB): | 
					
						
							|  |  |  |     fname = test_support.TESTFN | 
					
						
							|  |  |  |     openmethod = [bsddb.hashopen] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestHashTable_InMemory(TestBSDDB): | 
					
						
							|  |  |  |     fname = None | 
					
						
							|  |  |  |     openmethod = [bsddb.hashopen] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | ##         # (bsddb.rnopen,'Record Numbers'), 'put' for RECNO for bsddb 1.85 | 
					
						
							|  |  |  | ##         #                                   appears broken... at least on | 
					
						
							|  |  |  | ##         #                                   Solaris Intel - rmasse 1/97 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def test_main(verbose=None): | 
					
						
							|  |  |  |     test_support.run_unittest( | 
					
						
							|  |  |  |         TestBTree, | 
					
						
							|  |  |  |         TestHashTable, | 
					
						
							|  |  |  |         TestBTree_InMemory, | 
					
						
							|  |  |  |         TestHashTable_InMemory, | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == "__main__": | 
					
						
							|  |  |  |     test_main(verbose=True) |