mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 13:41:24 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			850 lines
		
	
	
	
		
			24 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			850 lines
		
	
	
	
		
			24 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """
 | |
| Basic TestCases for BTree and hash DBs, with and without a DBEnv, with
 | |
| various DB flags, etc.
 | |
| """
 | |
| 
 | |
| import os
 | |
| import sys
 | |
| import errno
 | |
| import shutil
 | |
| import string
 | |
| import tempfile
 | |
| from pprint import pprint
 | |
| import unittest
 | |
| 
 | |
| from bsddb import db
 | |
| 
 | |
| from test_all import verbose
 | |
| 
 | |
| DASH = '-'
 | |
| 
 | |
| 
 | |
| #----------------------------------------------------------------------
 | |
| 
 | |
| class VersionTestCase(unittest.TestCase):
 | |
|     def test00_version(self):
 | |
|         info = db.version()
 | |
|         if verbose:
 | |
|             print '\n', '-=' * 20
 | |
|             print 'bsddb.db.version(): %s' % (info, )
 | |
|             print db.DB_VERSION_STRING
 | |
|             print '-=' * 20
 | |
|         assert info == (db.DB_VERSION_MAJOR, db.DB_VERSION_MINOR,
 | |
|                         db.DB_VERSION_PATCH)
 | |
| 
 | |
| #----------------------------------------------------------------------
 | |
| 
 | |
| class BasicTestCase(unittest.TestCase):
 | |
|     dbtype       = db.DB_UNKNOWN  # must be set in derived class
 | |
|     dbopenflags  = 0
 | |
|     dbsetflags   = 0
 | |
|     dbmode       = 0660
 | |
|     dbname       = None
 | |
|     useEnv       = 0
 | |
|     envflags     = 0
 | |
|     envsetflags  = 0
 | |
| 
 | |
|     def setUp(self):
 | |
|         if self.useEnv:
 | |
|             homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
 | |
|             self.homeDir = homeDir
 | |
|             try:
 | |
|                 shutil.rmtree(homeDir)
 | |
|             except OSError, e:
 | |
|                 # unix returns ENOENT, windows returns ESRCH
 | |
|                 if e.errno not in (errno.ENOENT, errno.ESRCH): raise
 | |
|             os.mkdir(homeDir)
 | |
|             try:
 | |
|                 self.env = db.DBEnv()
 | |
|                 self.env.set_lg_max(1024*1024)
 | |
|                 self.env.set_flags(self.envsetflags, 1)
 | |
|                 self.env.open(homeDir, self.envflags | db.DB_CREATE)
 | |
|                 tempfile.tempdir = homeDir
 | |
|                 self.filename = os.path.split(tempfile.mktemp())[1]
 | |
|                 tempfile.tempdir = None
 | |
|             # Yes, a bare except is intended, since we're re-raising the exc.
 | |
|             except:
 | |
|                 shutil.rmtree(homeDir)
 | |
|                 raise
 | |
|         else:
 | |
|             self.env = None
 | |
|             self.filename = tempfile.mktemp()
 | |
| 
 | |
|         # create and open the DB
 | |
|         self.d = db.DB(self.env)
 | |
|         self.d.set_flags(self.dbsetflags)
 | |
|         if self.dbname:
 | |
|             self.d.open(self.filename, self.dbname, self.dbtype,
 | |
|                         self.dbopenflags|db.DB_CREATE, self.dbmode)
 | |
|         else:
 | |
|             self.d.open(self.filename,   # try out keyword args
 | |
|                         mode = self.dbmode,
 | |
|                         dbtype = self.dbtype,
 | |
|                         flags = self.dbopenflags|db.DB_CREATE)
 | |
| 
 | |
|         self.populateDB()
 | |
| 
 | |
| 
 | |
|     def tearDown(self):
 | |
|         self.d.close()
 | |
|         if self.env is not None:
 | |
|             self.env.close()
 | |
|             shutil.rmtree(self.homeDir)
 | |
|             ## Make a new DBEnv to remove the env files from the home dir.
 | |
|             ## (It can't be done while the env is open, nor after it has been
 | |
|             ## closed, so we make a new one to do it.)
 | |
|             #e = db.DBEnv()
 | |
|             #e.remove(self.homeDir)
 | |
|             #os.remove(os.path.join(self.homeDir, self.filename))
 | |
|         else:
 | |
|             os.remove(self.filename)
 | |
| 
 | |
| 
 | |
| 
 | |
|     def populateDB(self):
 | |
|         d = self.d
 | |
|         for x in range(500):
 | |
|             key = '%04d' % (1000 - x)  # insert keys in reverse order
 | |
|             data = self.makeData(key)
 | |
|             d.put(key, data)
 | |
| 
 | |
|         for x in range(500):
 | |
|             key = '%04d' % x  # and now some in forward order
 | |
|             data = self.makeData(key)
 | |
|             d.put(key, data)
 | |
| 
 | |
|         num = len(d)
 | |
|         if verbose:
 | |
|             print "created %d records" % num
 | |
| 
 | |
| 
 | |
|     def makeData(self, key):
 | |
|         return DASH.join([key] * 5)
 | |
| 
 | |
| 
 | |
| 
 | |
|     #----------------------------------------
 | |
| 
 | |
|     def test01_GetsAndPuts(self):
 | |
|         d = self.d
 | |
| 
 | |
|         if verbose:
 | |
|             print '\n', '-=' * 30
 | |
|             print "Running %s.test01_GetsAndPuts..." % self.__class__.__name__
 | |
| 
 | |
|         for key in ['0001', '0100', '0400', '0700', '0999']:
 | |
|             data = d.get(key)
 | |
|             if verbose:
 | |
|                 print data
 | |
| 
 | |
|         assert d.get('0321') == '0321-0321-0321-0321-0321'
 | |
| 
 | |
|         # By default non-existant keys return None...
 | |
|         assert d.get('abcd') == None
 | |
| 
 | |
|         # ...but they raise exceptions in other situations.  Call
 | |
|         # set_get_returns_none() to change it.
 | |
|         try:
 | |
|             d.delete('abcd')
 | |
|         except db.DBNotFoundError, val:
 | |
|             assert val[0] == db.DB_NOTFOUND
 | |
|             if verbose: print val
 | |
|         else:
 | |
|             self.fail("expected exception")
 | |
| 
 | |
| 
 | |
|         d.put('abcd', 'a new record')
 | |
|         assert d.get('abcd') == 'a new record'
 | |
| 
 | |
|         d.put('abcd', 'same key')
 | |
|         if self.dbsetflags & db.DB_DUP:
 | |
|             assert d.get('abcd') == 'a new record'
 | |
|         else:
 | |
|             assert d.get('abcd') == 'same key'
 | |
| 
 | |
| 
 | |
|         try:
 | |
|             d.put('abcd', 'this should fail', flags=db.DB_NOOVERWRITE)
 | |
|         except db.DBKeyExistError, val:
 | |
|             assert val[0] == db.DB_KEYEXIST
 | |
|             if verbose: print val
 | |
|         else:
 | |
|             self.fail("expected exception")
 | |
| 
 | |
|         if self.dbsetflags & db.DB_DUP:
 | |
|             assert d.get('abcd') == 'a new record'
 | |
|         else:
 | |
|             assert d.get('abcd') == 'same key'
 | |
| 
 | |
| 
 | |
|         d.sync()
 | |
|         d.close()
 | |
|         del d
 | |
| 
 | |
|         self.d = db.DB(self.env)
 | |
|         if self.dbname:
 | |
|             self.d.open(self.filename, self.dbname)
 | |
|         else:
 | |
|             self.d.open(self.filename)
 | |
|         d = self.d
 | |
| 
 | |
|         assert d.get('0321') == '0321-0321-0321-0321-0321'
 | |
|         if self.dbsetflags & db.DB_DUP:
 | |
|             assert d.get('abcd') == 'a new record'
 | |
|         else:
 | |
|             assert d.get('abcd') == 'same key'
 | |
| 
 | |
|         rec = d.get_both('0555', '0555-0555-0555-0555-0555')
 | |
|         if verbose:
 | |
|             print rec
 | |
| 
 | |
|         assert d.get_both('0555', 'bad data') == None
 | |
| 
 | |
|         # test default value
 | |
|         data = d.get('bad key', 'bad data')
 | |
|         assert data == 'bad data'
 | |
| 
 | |
|         # any object can pass through
 | |
|         data = d.get('bad key', self)
 | |
|         assert data == self
 | |
| 
 | |
|         s = d.stat()
 | |
|         assert type(s) == type({})
 | |
|         if verbose:
 | |
|             print 'd.stat() returned this dictionary:'
 | |
|             pprint(s)
 | |
| 
 | |
| 
 | |
|     #----------------------------------------
 | |
| 
 | |
|     def test02_DictionaryMethods(self):
 | |
|         d = self.d
 | |
| 
 | |
|         if verbose:
 | |
|             print '\n', '-=' * 30
 | |
|             print "Running %s.test02_DictionaryMethods..." % \
 | |
|                   self.__class__.__name__
 | |
| 
 | |
|         for key in ['0002', '0101', '0401', '0701', '0998']:
 | |
|             data = d[key]
 | |
|             assert data == self.makeData(key)
 | |
|             if verbose:
 | |
|                 print data
 | |
| 
 | |
|         assert len(d) == 1000
 | |
|         keys = d.keys()
 | |
|         assert len(keys) == 1000
 | |
|         assert type(keys) == type([])
 | |
| 
 | |
|         d['new record'] = 'a new record'
 | |
|         assert len(d) == 1001
 | |
|         keys = d.keys()
 | |
|         assert len(keys) == 1001
 | |
| 
 | |
|         d['new record'] = 'a replacement record'
 | |
|         assert len(d) == 1001
 | |
|         keys = d.keys()
 | |
|         assert len(keys) == 1001
 | |
| 
 | |
|         if verbose:
 | |
|             print "the first 10 keys are:"
 | |
|             pprint(keys[:10])
 | |
| 
 | |
|         assert d['new record'] == 'a replacement record'
 | |
| 
 | |
|         assert d.has_key('0001') == 1
 | |
|         assert d.has_key('spam') == 0
 | |
| 
 | |
|         items = d.items()
 | |
|         assert len(items) == 1001
 | |
|         assert type(items) == type([])
 | |
|         assert type(items[0]) == type(())
 | |
|         assert len(items[0]) == 2
 | |
| 
 | |
|         if verbose:
 | |
|             print "the first 10 items are:"
 | |
|             pprint(items[:10])
 | |
| 
 | |
|         values = d.values()
 | |
|         assert len(values) == 1001
 | |
|         assert type(values) == type([])
 | |
| 
 | |
|         if verbose:
 | |
|             print "the first 10 values are:"
 | |
|             pprint(values[:10])
 | |
| 
 | |
| 
 | |
| 
 | |
|     #----------------------------------------
 | |
| 
 | |
|     def test03_SimpleCursorStuff(self):
 | |
|         if verbose:
 | |
|             print '\n', '-=' * 30
 | |
|             print "Running %s.test03_SimpleCursorStuff..." % \
 | |
|                   self.__class__.__name__
 | |
| 
 | |
|         if self.env and self.dbopenflags & db.DB_AUTO_COMMIT:
 | |
|             txn = self.env.txn_begin()
 | |
|         else:
 | |
|             txn = None
 | |
|         c = self.d.cursor(txn=txn)
 | |
| 
 | |
|         rec = c.first()
 | |
|         count = 0
 | |
|         while rec is not None:
 | |
|             count = count + 1
 | |
|             if verbose and count % 100 == 0:
 | |
|                 print rec
 | |
|             rec = c.next()
 | |
| 
 | |
|         assert count == 1000
 | |
| 
 | |
| 
 | |
|         rec = c.last()
 | |
|         count = 0
 | |
|         while rec is not None:
 | |
|             count = count + 1
 | |
|             if verbose and count % 100 == 0:
 | |
|                 print rec
 | |
|             rec = c.prev()
 | |
| 
 | |
|         assert count == 1000
 | |
| 
 | |
|         rec = c.set('0505')
 | |
|         rec2 = c.current()
 | |
|         assert rec == rec2
 | |
|         assert rec[0] == '0505'
 | |
|         assert rec[1] == self.makeData('0505')
 | |
| 
 | |
|         try:
 | |
|             c.set('bad key')
 | |
|         except db.DBNotFoundError, val:
 | |
|             assert val[0] == db.DB_NOTFOUND
 | |
|             if verbose: print val
 | |
|         else:
 | |
|             self.fail("expected exception")
 | |
| 
 | |
|         rec = c.get_both('0404', self.makeData('0404'))
 | |
|         assert rec == ('0404', self.makeData('0404'))
 | |
| 
 | |
|         try:
 | |
|             c.get_both('0404', 'bad data')
 | |
|         except db.DBNotFoundError, val:
 | |
|             assert val[0] == db.DB_NOTFOUND
 | |
|             if verbose: print val
 | |
|         else:
 | |
|             self.fail("expected exception")
 | |
| 
 | |
|         if self.d.get_type() == db.DB_BTREE:
 | |
|             rec = c.set_range('011')
 | |
|             if verbose:
 | |
|                 print "searched for '011', found: ", rec
 | |
| 
 | |
|             rec = c.set_range('011',dlen=0,doff=0)
 | |
|             if verbose:
 | |
|                 print "searched (partial) for '011', found: ", rec
 | |
|             if rec[1] != '': set.fail('expected empty data portion')
 | |
| 
 | |
|         c.set('0499')
 | |
|         c.delete()
 | |
|         try:
 | |
|             rec = c.current()
 | |
|         except db.DBKeyEmptyError, val:
 | |
|             assert val[0] == db.DB_KEYEMPTY
 | |
|             if verbose: print val
 | |
|         else:
 | |
|             self.fail('exception expected')
 | |
| 
 | |
|         c.next()
 | |
|         c2 = c.dup(db.DB_POSITION)
 | |
|         assert c.current() == c2.current()
 | |
| 
 | |
|         c2.put('', 'a new value', db.DB_CURRENT)
 | |
|         assert c.current() == c2.current()
 | |
|         assert c.current()[1] == 'a new value'
 | |
| 
 | |
|         c2.put('', 'er', db.DB_CURRENT, dlen=0, doff=5)
 | |
|         assert c2.current()[1] == 'a newer value'
 | |
| 
 | |
|         c.close()
 | |
|         c2.close()
 | |
|         if txn:
 | |
|             txn.commit()
 | |
| 
 | |
|         # time to abuse the closed cursors and hope we don't crash
 | |
|         methods_to_test = {
 | |
|             'current': (),
 | |
|             'delete': (),
 | |
|             'dup': (db.DB_POSITION,),
 | |
|             'first': (),
 | |
|             'get': (0,),
 | |
|             'next': (),
 | |
|             'prev': (),
 | |
|             'last': (),
 | |
|             'put':('', 'spam', db.DB_CURRENT),
 | |
|             'set': ("0505",),
 | |
|         }
 | |
|         for method, args in methods_to_test.items():
 | |
|             try:
 | |
|                 if verbose:
 | |
|                     print "attempting to use a closed cursor's %s method" % \
 | |
|                           method
 | |
|                 # a bug may cause a NULL pointer dereference...
 | |
|                 apply(getattr(c, method), args)
 | |
|             except db.DBError, val:
 | |
|                 assert val[0] == 0
 | |
|                 if verbose: print val
 | |
|             else:
 | |
|                 self.fail("no exception raised when using a buggy cursor's"
 | |
|                           "%s method" % method)
 | |
| 
 | |
|     #----------------------------------------
 | |
| 
 | |
|     def test04_PartialGetAndPut(self):
 | |
|         d = self.d
 | |
|         if verbose:
 | |
|             print '\n', '-=' * 30
 | |
|             print "Running %s.test04_PartialGetAndPut..." % \
 | |
|                   self.__class__.__name__
 | |
| 
 | |
|         key = "partialTest"
 | |
|         data = "1" * 1000 + "2" * 1000
 | |
|         d.put(key, data)
 | |
|         assert d.get(key) == data
 | |
|         assert d.get(key, dlen=20, doff=990) == ("1" * 10) + ("2" * 10)
 | |
| 
 | |
|         d.put("partialtest2", ("1" * 30000) + "robin" )
 | |
|         assert d.get("partialtest2", dlen=5, doff=30000) == "robin"
 | |
| 
 | |
|         # There seems to be a bug in DB here...  Commented out the test for
 | |
|         # now.
 | |
|         ##assert d.get("partialtest2", dlen=5, doff=30010) == ""
 | |
| 
 | |
|         if self.dbsetflags != db.DB_DUP:
 | |
|             # Partial put with duplicate records requires a cursor
 | |
|             d.put(key, "0000", dlen=2000, doff=0)
 | |
|             assert d.get(key) == "0000"
 | |
| 
 | |
|             d.put(key, "1111", dlen=1, doff=2)
 | |
|             assert d.get(key) == "0011110"
 | |
| 
 | |
|     #----------------------------------------
 | |
| 
 | |
|     def test05_GetSize(self):
 | |
|         d = self.d
 | |
|         if verbose:
 | |
|             print '\n', '-=' * 30
 | |
|             print "Running %s.test05_GetSize..." % self.__class__.__name__
 | |
| 
 | |
|         for i in range(1, 50000, 500):
 | |
|             key = "size%s" % i
 | |
|             #print "before ", i,
 | |
|             d.put(key, "1" * i)
 | |
|             #print "after",
 | |
|             assert d.get_size(key) == i
 | |
|             #print "done"
 | |
| 
 | |
|     #----------------------------------------
 | |
| 
 | |
|     def test06_Truncate(self):
 | |
|         if db.version() < (3,3):
 | |
|             # truncate is a feature of BerkeleyDB 3.3 and above
 | |
|             return
 | |
| 
 | |
|         d = self.d
 | |
|         if verbose:
 | |
|             print '\n', '-=' * 30
 | |
|             print "Running %s.test99_Truncate..." % self.__class__.__name__
 | |
| 
 | |
|         d.put("abcde", "ABCDE");
 | |
|         num = d.truncate()
 | |
|         assert num >= 1, "truncate returned <= 0 on non-empty database"
 | |
|         num = d.truncate()
 | |
|         assert num == 0, "truncate on empty DB returned nonzero (%s)" % `num`
 | |
| 
 | |
| #----------------------------------------------------------------------
 | |
| 
 | |
| 
 | |
| class BasicBTreeTestCase(BasicTestCase):
 | |
|     dbtype = db.DB_BTREE
 | |
| 
 | |
| 
 | |
| class BasicHashTestCase(BasicTestCase):
 | |
|     dbtype = db.DB_HASH
 | |
| 
 | |
| 
 | |
| class BasicBTreeWithThreadFlagTestCase(BasicTestCase):
 | |
|     dbtype = db.DB_BTREE
 | |
|     dbopenflags = db.DB_THREAD
 | |
| 
 | |
| 
 | |
| class BasicHashWithThreadFlagTestCase(BasicTestCase):
 | |
|     dbtype = db.DB_HASH
 | |
|     dbopenflags = db.DB_THREAD
 | |
| 
 | |
| 
 | |
| class BasicBTreeWithEnvTestCase(BasicTestCase):
 | |
|     dbtype = db.DB_BTREE
 | |
|     dbopenflags = db.DB_THREAD
 | |
|     useEnv = 1
 | |
|     envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
 | |
| 
 | |
| 
 | |
| class BasicHashWithEnvTestCase(BasicTestCase):
 | |
|     dbtype = db.DB_HASH
 | |
|     dbopenflags = db.DB_THREAD
 | |
|     useEnv = 1
 | |
|     envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
 | |
| 
 | |
| 
 | |
| #----------------------------------------------------------------------
 | |
| 
 | |
| class BasicTransactionTestCase(BasicTestCase):
 | |
|     dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
 | |
|     useEnv = 1
 | |
|     envflags = (db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
 | |
|                 db.DB_INIT_TXN)
 | |
|     envsetflags = db.DB_AUTO_COMMIT
 | |
| 
 | |
| 
 | |
|     def tearDown(self):
 | |
|         self.txn.commit()
 | |
|         BasicTestCase.tearDown(self)
 | |
| 
 | |
| 
 | |
|     def populateDB(self):
 | |
|         d = self.d
 | |
|         txn = self.env.txn_begin()
 | |
|         for x in range(500):
 | |
|             key = '%04d' % (1000 - x)  # insert keys in reverse order
 | |
|             data = self.makeData(key)
 | |
|             d.put(key, data, txn)
 | |
| 
 | |
|         for x in range(500):
 | |
|             key = '%04d' % x  # and now some in forward order
 | |
|             data = self.makeData(key)
 | |
|             d.put(key, data, txn)
 | |
| 
 | |
|         txn.commit()
 | |
| 
 | |
|         num = len(d)
 | |
|         if verbose:
 | |
|             print "created %d records" % num
 | |
| 
 | |
|         self.txn = self.env.txn_begin()
 | |
| 
 | |
| 
 | |
| 
 | |
|     def test06_Transactions(self):
 | |
|         d = self.d
 | |
|         if verbose:
 | |
|             print '\n', '-=' * 30
 | |
|             print "Running %s.test06_Transactions..." % self.__class__.__name__
 | |
| 
 | |
|         assert d.get('new rec', txn=self.txn) == None
 | |
|         d.put('new rec', 'this is a new record', self.txn)
 | |
|         assert d.get('new rec', txn=self.txn) == 'this is a new record'
 | |
|         self.txn.abort()
 | |
|         assert d.get('new rec') == None
 | |
| 
 | |
|         self.txn = self.env.txn_begin()
 | |
| 
 | |
|         assert d.get('new rec', txn=self.txn) == None
 | |
|         d.put('new rec', 'this is a new record', self.txn)
 | |
|         assert d.get('new rec', txn=self.txn) == 'this is a new record'
 | |
|         self.txn.commit()
 | |
|         assert d.get('new rec') == 'this is a new record'
 | |
| 
 | |
|         self.txn = self.env.txn_begin()
 | |
|         c = d.cursor(self.txn)
 | |
|         rec = c.first()
 | |
|         count = 0
 | |
|         while rec is not None:
 | |
|             count = count + 1
 | |
|             if verbose and count % 100 == 0:
 | |
|                 print rec
 | |
|             rec = c.next()
 | |
|         assert count == 1001
 | |
| 
 | |
|         c.close()                # Cursors *MUST* be closed before commit!
 | |
|         self.txn.commit()
 | |
| 
 | |
|         # flush pending updates
 | |
|         try:
 | |
|             self.env.txn_checkpoint (0, 0, 0)
 | |
|         except db.DBIncompleteError:
 | |
|             pass
 | |
| 
 | |
|         # must have at least one log file present:
 | |
|         logs = self.env.log_archive(db.DB_ARCH_ABS | db.DB_ARCH_LOG)
 | |
|         assert logs != None
 | |
|         for log in logs:
 | |
|             if verbose:
 | |
|                 print 'log file: ' + log
 | |
| 
 | |
|         self.txn = self.env.txn_begin()
 | |
| 
 | |
|     #----------------------------------------
 | |
| 
 | |
|     def test07_TxnTruncate(self):
 | |
|         if db.version() < (3,3):
 | |
|             # truncate is a feature of BerkeleyDB 3.3 and above
 | |
|             return
 | |
| 
 | |
|         d = self.d
 | |
|         if verbose:
 | |
|             print '\n', '-=' * 30
 | |
|             print "Running %s.test07_TxnTruncate..." % self.__class__.__name__
 | |
| 
 | |
|         d.put("abcde", "ABCDE");
 | |
|         txn = self.env.txn_begin()
 | |
|         num = d.truncate(txn)
 | |
|         assert num >= 1, "truncate returned <= 0 on non-empty database"
 | |
|         num = d.truncate(txn)
 | |
|         assert num == 0, "truncate on empty DB returned nonzero (%s)" % `num`
 | |
|         txn.commit()
 | |
| 
 | |
| 
 | |
| 
 | |
| class BTreeTransactionTestCase(BasicTransactionTestCase):
 | |
|     dbtype = db.DB_BTREE
 | |
| 
 | |
| class HashTransactionTestCase(BasicTransactionTestCase):
 | |
|     dbtype = db.DB_HASH
 | |
| 
 | |
| 
 | |
| 
 | |
| #----------------------------------------------------------------------
 | |
| 
 | |
| class BTreeRecnoTestCase(BasicTestCase):
 | |
|     dbtype     = db.DB_BTREE
 | |
|     dbsetflags = db.DB_RECNUM
 | |
| 
 | |
|     def test07_RecnoInBTree(self):
 | |
|         d = self.d
 | |
|         if verbose:
 | |
|             print '\n', '-=' * 30
 | |
|             print "Running %s.test07_RecnoInBTree..." % self.__class__.__name__
 | |
| 
 | |
|         rec = d.get(200)
 | |
|         assert type(rec) == type(())
 | |
|         assert len(rec) == 2
 | |
|         if verbose:
 | |
|             print "Record #200 is ", rec
 | |
| 
 | |
|         c = d.cursor()
 | |
|         c.set('0200')
 | |
|         num = c.get_recno()
 | |
|         assert type(num) == type(1)
 | |
|         if verbose:
 | |
|             print "recno of d['0200'] is ", num
 | |
| 
 | |
|         rec = c.current()
 | |
|         assert c.set_recno(num) == rec
 | |
| 
 | |
|         c.close()
 | |
| 
 | |
| 
 | |
| 
 | |
| class BTreeRecnoWithThreadFlagTestCase(BTreeRecnoTestCase):
 | |
|     dbopenflags = db.DB_THREAD
 | |
| 
 | |
| #----------------------------------------------------------------------
 | |
| 
 | |
| class BasicDUPTestCase(BasicTestCase):
 | |
|     dbsetflags = db.DB_DUP
 | |
| 
 | |
|     def test08_DuplicateKeys(self):
 | |
|         d = self.d
 | |
|         if verbose:
 | |
|             print '\n', '-=' * 30
 | |
|             print "Running %s.test08_DuplicateKeys..." % \
 | |
|                   self.__class__.__name__
 | |
| 
 | |
|         d.put("dup0", "before")
 | |
|         for x in "The quick brown fox jumped over the lazy dog.".split():
 | |
|             d.put("dup1", x)
 | |
|         d.put("dup2", "after")
 | |
| 
 | |
|         data = d.get("dup1")
 | |
|         assert data == "The"
 | |
|         if verbose:
 | |
|             print data
 | |
| 
 | |
|         c = d.cursor()
 | |
|         rec = c.set("dup1")
 | |
|         assert rec == ('dup1', 'The')
 | |
| 
 | |
|         next = c.next()
 | |
|         assert next == ('dup1', 'quick')
 | |
| 
 | |
|         rec = c.set("dup1")
 | |
|         count = c.count()
 | |
|         assert count == 9
 | |
| 
 | |
|         next_dup = c.next_dup()
 | |
|         assert next_dup == ('dup1', 'quick')
 | |
| 
 | |
|         rec = c.set('dup1')
 | |
|         while rec is not None:
 | |
|             if verbose:
 | |
|                 print rec
 | |
|             rec = c.next_dup()
 | |
| 
 | |
|         c.set('dup1')
 | |
|         rec = c.next_nodup()
 | |
|         assert rec[0] != 'dup1'
 | |
|         if verbose:
 | |
|             print rec
 | |
| 
 | |
|         c.close()
 | |
| 
 | |
| 
 | |
| 
 | |
| class BTreeDUPTestCase(BasicDUPTestCase):
 | |
|     dbtype = db.DB_BTREE
 | |
| 
 | |
| class HashDUPTestCase(BasicDUPTestCase):
 | |
|     dbtype = db.DB_HASH
 | |
| 
 | |
| class BTreeDUPWithThreadTestCase(BasicDUPTestCase):
 | |
|     dbtype = db.DB_BTREE
 | |
|     dbopenflags = db.DB_THREAD
 | |
| 
 | |
| class HashDUPWithThreadTestCase(BasicDUPTestCase):
 | |
|     dbtype = db.DB_HASH
 | |
|     dbopenflags = db.DB_THREAD
 | |
| 
 | |
| 
 | |
| #----------------------------------------------------------------------
 | |
| 
 | |
| class BasicMultiDBTestCase(BasicTestCase):
 | |
|     dbname = 'first'
 | |
| 
 | |
|     def otherType(self):
 | |
|         if self.dbtype == db.DB_BTREE:
 | |
|             return db.DB_HASH
 | |
|         else:
 | |
|             return db.DB_BTREE
 | |
| 
 | |
|     def test09_MultiDB(self):
 | |
|         d1 = self.d
 | |
|         if verbose:
 | |
|             print '\n', '-=' * 30
 | |
|             print "Running %s.test09_MultiDB..." % self.__class__.__name__
 | |
| 
 | |
|         d2 = db.DB(self.env)
 | |
|         d2.open(self.filename, "second", self.dbtype,
 | |
|                 self.dbopenflags|db.DB_CREATE)
 | |
|         d3 = db.DB(self.env)
 | |
|         d3.open(self.filename, "third", self.otherType(),
 | |
|                 self.dbopenflags|db.DB_CREATE)
 | |
| 
 | |
|         for x in "The quick brown fox jumped over the lazy dog".split():
 | |
|             d2.put(x, self.makeData(x))
 | |
| 
 | |
|         for x in string.letters:
 | |
|             d3.put(x, x*70)
 | |
| 
 | |
|         d1.sync()
 | |
|         d2.sync()
 | |
|         d3.sync()
 | |
|         d1.close()
 | |
|         d2.close()
 | |
|         d3.close()
 | |
| 
 | |
|         self.d = d1 = d2 = d3 = None
 | |
| 
 | |
|         self.d = d1 = db.DB(self.env)
 | |
|         d1.open(self.filename, self.dbname, flags = self.dbopenflags)
 | |
|         d2 = db.DB(self.env)
 | |
|         d2.open(self.filename, "second",  flags = self.dbopenflags)
 | |
|         d3 = db.DB(self.env)
 | |
|         d3.open(self.filename, "third", flags = self.dbopenflags)
 | |
| 
 | |
|         c1 = d1.cursor()
 | |
|         c2 = d2.cursor()
 | |
|         c3 = d3.cursor()
 | |
| 
 | |
|         count = 0
 | |
|         rec = c1.first()
 | |
|         while rec is not None:
 | |
|             count = count + 1
 | |
|             if verbose and (count % 50) == 0:
 | |
|                 print rec
 | |
|             rec = c1.next()
 | |
|         assert count == 1000
 | |
| 
 | |
|         count = 0
 | |
|         rec = c2.first()
 | |
|         while rec is not None:
 | |
|             count = count + 1
 | |
|             if verbose:
 | |
|                 print rec
 | |
|             rec = c2.next()
 | |
|         assert count == 9
 | |
| 
 | |
|         count = 0
 | |
|         rec = c3.first()
 | |
|         while rec is not None:
 | |
|             count = count + 1
 | |
|             if verbose:
 | |
|                 print rec
 | |
|             rec = c3.next()
 | |
|         assert count == 52
 | |
| 
 | |
| 
 | |
|         c1.close()
 | |
|         c2.close()
 | |
|         c3.close()
 | |
| 
 | |
|         d2.close()
 | |
|         d3.close()
 | |
| 
 | |
| 
 | |
| 
 | |
| # Strange things happen if you try to use Multiple DBs per file without a
 | |
| # DBEnv with MPOOL and LOCKing...
 | |
| 
 | |
| class BTreeMultiDBTestCase(BasicMultiDBTestCase):
 | |
|     dbtype = db.DB_BTREE
 | |
|     dbopenflags = db.DB_THREAD
 | |
|     useEnv = 1
 | |
|     envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
 | |
| 
 | |
| class HashMultiDBTestCase(BasicMultiDBTestCase):
 | |
|     dbtype = db.DB_HASH
 | |
|     dbopenflags = db.DB_THREAD
 | |
|     useEnv = 1
 | |
|     envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
 | |
| 
 | |
| 
 | |
| #----------------------------------------------------------------------
 | |
| #----------------------------------------------------------------------
 | |
| 
 | |
| def test_suite():
 | |
|     suite = unittest.TestSuite()
 | |
| 
 | |
|     suite.addTest(unittest.makeSuite(VersionTestCase))
 | |
|     suite.addTest(unittest.makeSuite(BasicBTreeTestCase))
 | |
|     suite.addTest(unittest.makeSuite(BasicHashTestCase))
 | |
|     suite.addTest(unittest.makeSuite(BasicBTreeWithThreadFlagTestCase))
 | |
|     suite.addTest(unittest.makeSuite(BasicHashWithThreadFlagTestCase))
 | |
|     suite.addTest(unittest.makeSuite(BasicBTreeWithEnvTestCase))
 | |
|     suite.addTest(unittest.makeSuite(BasicHashWithEnvTestCase))
 | |
|     suite.addTest(unittest.makeSuite(BTreeTransactionTestCase))
 | |
|     suite.addTest(unittest.makeSuite(HashTransactionTestCase))
 | |
|     suite.addTest(unittest.makeSuite(BTreeRecnoTestCase))
 | |
|     suite.addTest(unittest.makeSuite(BTreeRecnoWithThreadFlagTestCase))
 | |
|     suite.addTest(unittest.makeSuite(BTreeDUPTestCase))
 | |
|     suite.addTest(unittest.makeSuite(HashDUPTestCase))
 | |
|     suite.addTest(unittest.makeSuite(BTreeDUPWithThreadTestCase))
 | |
|     suite.addTest(unittest.makeSuite(HashDUPWithThreadTestCase))
 | |
|     suite.addTest(unittest.makeSuite(BTreeMultiDBTestCase))
 | |
|     suite.addTest(unittest.makeSuite(HashMultiDBTestCase))
 | |
| 
 | |
|     return suite
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     unittest.main(defaultTest='test_suite')
 | 
