mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 21:51:50 +00:00 
			
		
		
		
	 c5a11fabdb
			
		
	
	
		c5a11fabdb
		
	
	
	
	
		
			
			This patch publishes the work done until now for Python 3.0 compatibility. Still a lot to be done. When possible, we use 3.0 features in Python 2.6, easing development and testing, and exposing internal changes to a wider audience, for better test coverage. Some mode details: http://www.jcea.es/programacion/pybsddb.htm#bsddb3-4.7.2
		
			
				
	
	
		
			481 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			481 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """TestCases for multi-threaded access to a DB.
 | |
| """
 | |
| 
 | |
| import os
 | |
| import sys
 | |
| import time
 | |
| import errno
 | |
| from random import random
 | |
| 
 | |
| DASH = '-'
 | |
| 
 | |
| try:
 | |
|     WindowsError
 | |
| except NameError:
 | |
|     class WindowsError(Exception):
 | |
|         pass
 | |
| 
 | |
| import unittest
 | |
| from test_all import verbose, have_threads, get_new_environment_path, get_new_database_path
 | |
| 
 | |
| if have_threads :
 | |
|     from threading import Thread, currentThread
 | |
| 
 | |
| 
 | |
| try:
 | |
|     # For Pythons w/distutils pybsddb
 | |
|     from bsddb3 import db, dbutils
 | |
| except ImportError:
 | |
|     # For Python 2.3
 | |
|     from bsddb import db, dbutils
 | |
| 
 | |
| try:
 | |
|     from bsddb3 import test_support
 | |
| except ImportError:
 | |
|     from test import test_support
 | |
| 
 | |
| 
 | |
| #----------------------------------------------------------------------
 | |
| 
 | |
| class BaseThreadedTestCase(unittest.TestCase):
 | |
|     dbtype       = db.DB_UNKNOWN  # must be set in derived class
 | |
|     dbopenflags  = 0
 | |
|     dbsetflags   = 0
 | |
|     envflags     = 0
 | |
| 
 | |
|     import sys
 | |
|     if sys.version_info[:3] < (2, 4, 0):
 | |
|         def assertTrue(self, expr, msg=None):
 | |
|             self.failUnless(expr,msg=msg)
 | |
| 
 | |
|     def setUp(self):
 | |
|         if verbose:
 | |
|             dbutils._deadlock_VerboseFile = sys.stdout
 | |
| 
 | |
|         self.homeDir = get_new_environment_path()
 | |
|         self.env = db.DBEnv()
 | |
|         self.setEnvOpts()
 | |
|         self.env.open(self.homeDir, self.envflags | db.DB_CREATE)
 | |
| 
 | |
|         self.filename = self.__class__.__name__ + '.db'
 | |
|         self.d = db.DB(self.env)
 | |
|         if self.dbsetflags:
 | |
|             self.d.set_flags(self.dbsetflags)
 | |
|         self.d.open(self.filename, self.dbtype, self.dbopenflags|db.DB_CREATE)
 | |
| 
 | |
|     def tearDown(self):
 | |
|         self.d.close()
 | |
|         self.env.close()
 | |
|         test_support.rmtree(self.homeDir)
 | |
| 
 | |
|     def setEnvOpts(self):
 | |
|         pass
 | |
| 
 | |
|     def makeData(self, key):
 | |
|         return DASH.join([key] * 5)
 | |
| 
 | |
| 
 | |
| #----------------------------------------------------------------------
 | |
| 
 | |
| 
 | |
| class ConcurrentDataStoreBase(BaseThreadedTestCase):
 | |
|     dbopenflags = db.DB_THREAD
 | |
|     envflags    = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL
 | |
|     readers     = 0 # derived class should set
 | |
|     writers     = 0
 | |
|     records     = 1000
 | |
| 
 | |
|     def test01_1WriterMultiReaders(self):
 | |
|         if verbose:
 | |
|             print '\n', '-=' * 30
 | |
|             print "Running %s.test01_1WriterMultiReaders..." % \
 | |
|                   self.__class__.__name__
 | |
| 
 | |
|         keys=range(self.records)
 | |
|         import random
 | |
|         random.shuffle(keys)
 | |
|         records_per_writer=self.records//self.writers
 | |
|         readers_per_writer=self.readers//self.writers
 | |
|         self.assertEqual(self.records,self.writers*records_per_writer)
 | |
|         self.assertEqual(self.readers,self.writers*readers_per_writer)
 | |
|         self.assertTrue((records_per_writer%readers_per_writer)==0)
 | |
|         readers = []
 | |
| 
 | |
|         for x in xrange(self.readers):
 | |
|             rt = Thread(target = self.readerThread,
 | |
|                         args = (self.d, x),
 | |
|                         name = 'reader %d' % x,
 | |
|                         )#verbose = verbose)
 | |
|             rt.setDaemon(True)
 | |
|             readers.append(rt)
 | |
| 
 | |
|         writers=[]
 | |
|         for x in xrange(self.writers):
 | |
|             a=keys[records_per_writer*x:records_per_writer*(x+1)]
 | |
|             a.sort()  # Generate conflicts
 | |
|             b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
 | |
|             wt = Thread(target = self.writerThread,
 | |
|                         args = (self.d, a, b),
 | |
|                         name = 'writer %d' % x,
 | |
|                         )#verbose = verbose)
 | |
|             writers.append(wt)
 | |
| 
 | |
|         for t in writers:
 | |
|             t.setDaemon(True)
 | |
|             t.start()
 | |
| 
 | |
|         for t in writers:
 | |
|             t.join()
 | |
|         for t in readers:
 | |
|             t.join()
 | |
| 
 | |
|     def writerThread(self, d, keys, readers):
 | |
|         name = currentThread().getName()
 | |
|         if verbose:
 | |
|             print "%s: creating records %d - %d" % (name, start, stop)
 | |
| 
 | |
|         count=len(keys)//len(readers)
 | |
|         count2=count
 | |
|         for x in keys :
 | |
|             key = '%04d' % x
 | |
|             dbutils.DeadlockWrap(d.put, key, self.makeData(key),
 | |
|                                  max_retries=12)
 | |
|             if verbose and x % 100 == 0:
 | |
|                 print "%s: records %d - %d finished" % (name, start, x)
 | |
| 
 | |
|             count2-=1
 | |
|             if not count2 :
 | |
|                 readers.pop().start()
 | |
|                 count2=count
 | |
| 
 | |
|         if verbose:
 | |
|             print "%s: finished creating records" % name
 | |
| 
 | |
|         if verbose:
 | |
|             print "%s: thread finished" % name
 | |
| 
 | |
|     def readerThread(self, d, readerNum):
 | |
|         name = currentThread().getName()
 | |
| 
 | |
|         for i in xrange(5) :
 | |
|             c = d.cursor()
 | |
|             count = 0
 | |
|             rec = c.first()
 | |
|             while rec:
 | |
|                 count += 1
 | |
|                 key, data = rec
 | |
|                 self.assertEqual(self.makeData(key), data)
 | |
|                 rec = c.next()
 | |
|             if verbose:
 | |
|                 print "%s: found %d records" % (name, count)
 | |
|             c.close()
 | |
| 
 | |
|         if verbose:
 | |
|             print "%s: thread finished" % name
 | |
| 
 | |
| 
 | |
| class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
 | |
|     dbtype  = db.DB_BTREE
 | |
|     writers = 2
 | |
|     readers = 10
 | |
|     records = 1000
 | |
| 
 | |
| 
 | |
| class HashConcurrentDataStore(ConcurrentDataStoreBase):
 | |
|     dbtype  = db.DB_HASH
 | |
|     writers = 2
 | |
|     readers = 10
 | |
|     records = 1000
 | |
| 
 | |
| 
 | |
| #----------------------------------------------------------------------
 | |
| 
 | |
| class SimpleThreadedBase(BaseThreadedTestCase):
 | |
|     dbopenflags = db.DB_THREAD
 | |
|     envflags    = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
 | |
|     readers = 10
 | |
|     writers = 2
 | |
|     records = 1000
 | |
| 
 | |
|     def setEnvOpts(self):
 | |
|         self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
 | |
| 
 | |
|     def test02_SimpleLocks(self):
 | |
|         if verbose:
 | |
|             print '\n', '-=' * 30
 | |
|             print "Running %s.test02_SimpleLocks..." % self.__class__.__name__
 | |
| 
 | |
| 
 | |
|         keys=range(self.records)
 | |
|         import random
 | |
|         random.shuffle(keys)
 | |
|         records_per_writer=self.records//self.writers
 | |
|         readers_per_writer=self.readers//self.writers
 | |
|         self.assertEqual(self.records,self.writers*records_per_writer)
 | |
|         self.assertEqual(self.readers,self.writers*readers_per_writer)
 | |
|         self.assertTrue((records_per_writer%readers_per_writer)==0)
 | |
| 
 | |
|         readers = []
 | |
|         for x in xrange(self.readers):
 | |
|             rt = Thread(target = self.readerThread,
 | |
|                         args = (self.d, x),
 | |
|                         name = 'reader %d' % x,
 | |
|                         )#verbose = verbose)
 | |
|             rt.setDaemon(True)
 | |
|             readers.append(rt)
 | |
| 
 | |
|         writers = []
 | |
|         for x in xrange(self.writers):
 | |
|             a=keys[records_per_writer*x:records_per_writer*(x+1)]
 | |
|             a.sort()  # Generate conflicts
 | |
|             b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
 | |
|             wt = Thread(target = self.writerThread,
 | |
|                         args = (self.d, a, b),
 | |
|                         name = 'writer %d' % x,
 | |
|                         )#verbose = verbose)
 | |
|             writers.append(wt)
 | |
| 
 | |
|         for t in writers:
 | |
|             t.setDaemon(True)
 | |
|             t.start()
 | |
| 
 | |
|         for t in writers:
 | |
|             t.join()
 | |
|         for t in readers:
 | |
|             t.join()
 | |
| 
 | |
|     def writerThread(self, d, keys, readers):
 | |
|         name = currentThread().getName()
 | |
|         if verbose:
 | |
|             print "%s: creating records %d - %d" % (name, start, stop)
 | |
| 
 | |
|         count=len(keys)//len(readers)
 | |
|         count2=count
 | |
|         for x in keys :
 | |
|             key = '%04d' % x
 | |
|             dbutils.DeadlockWrap(d.put, key, self.makeData(key),
 | |
|                                  max_retries=12)
 | |
| 
 | |
|             if verbose and x % 100 == 0:
 | |
|                 print "%s: records %d - %d finished" % (name, start, x)
 | |
| 
 | |
|             count2-=1
 | |
|             if not count2 :
 | |
|                 readers.pop().start()
 | |
|                 count2=count
 | |
| 
 | |
|         if verbose:
 | |
|             print "%s: thread finished" % name
 | |
| 
 | |
|     def readerThread(self, d, readerNum):
 | |
|         name = currentThread().getName()
 | |
| 
 | |
|         c = d.cursor()
 | |
|         count = 0
 | |
|         rec = dbutils.DeadlockWrap(c.first, max_retries=10)
 | |
|         while rec:
 | |
|             count += 1
 | |
|             key, data = rec
 | |
|             self.assertEqual(self.makeData(key), data)
 | |
|             rec = dbutils.DeadlockWrap(c.next, max_retries=10)
 | |
|         if verbose:
 | |
|             print "%s: found %d records" % (name, count)
 | |
|         c.close()
 | |
| 
 | |
|         if verbose:
 | |
|             print "%s: thread finished" % name
 | |
| 
 | |
| 
 | |
| class BTreeSimpleThreaded(SimpleThreadedBase):
 | |
|     dbtype = db.DB_BTREE
 | |
| 
 | |
| 
 | |
| class HashSimpleThreaded(SimpleThreadedBase):
 | |
|     dbtype = db.DB_HASH
 | |
| 
 | |
| 
 | |
| #----------------------------------------------------------------------
 | |
| 
 | |
| 
 | |
| class ThreadedTransactionsBase(BaseThreadedTestCase):
 | |
|     dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
 | |
|     envflags    = (db.DB_THREAD |
 | |
|                    db.DB_INIT_MPOOL |
 | |
|                    db.DB_INIT_LOCK |
 | |
|                    db.DB_INIT_LOG |
 | |
|                    db.DB_INIT_TXN
 | |
|                    )
 | |
|     readers = 0
 | |
|     writers = 0
 | |
|     records = 2000
 | |
|     txnFlag = 0
 | |
| 
 | |
|     def setEnvOpts(self):
 | |
|         #self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
 | |
|         pass
 | |
| 
 | |
|     def test03_ThreadedTransactions(self):
 | |
|         if verbose:
 | |
|             print '\n', '-=' * 30
 | |
|             print "Running %s.test03_ThreadedTransactions..." % \
 | |
|                   self.__class__.__name__
 | |
| 
 | |
|         keys=range(self.records)
 | |
|         import random
 | |
|         random.shuffle(keys)
 | |
|         records_per_writer=self.records//self.writers
 | |
|         readers_per_writer=self.readers//self.writers
 | |
|         self.assertEqual(self.records,self.writers*records_per_writer)
 | |
|         self.assertEqual(self.readers,self.writers*readers_per_writer)
 | |
|         self.assertTrue((records_per_writer%readers_per_writer)==0)
 | |
| 
 | |
|         readers=[]
 | |
|         for x in xrange(self.readers):
 | |
|             rt = Thread(target = self.readerThread,
 | |
|                         args = (self.d, x),
 | |
|                         name = 'reader %d' % x,
 | |
|                         )#verbose = verbose)
 | |
|             rt.setDaemon(True)
 | |
|             readers.append(rt)
 | |
| 
 | |
|         writers = []
 | |
|         for x in xrange(self.writers):
 | |
|             a=keys[records_per_writer*x:records_per_writer*(x+1)]
 | |
|             b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
 | |
|             wt = Thread(target = self.writerThread,
 | |
|                         args = (self.d, a, b),
 | |
|                         name = 'writer %d' % x,
 | |
|                         )#verbose = verbose)
 | |
|             writers.append(wt)
 | |
| 
 | |
|         dt = Thread(target = self.deadlockThread)
 | |
|         dt.setDaemon(True)
 | |
|         dt.start()
 | |
| 
 | |
|         for t in writers:
 | |
|             t.setDaemon(True)
 | |
|             t.start()
 | |
| 
 | |
|         for t in writers:
 | |
|             t.join()
 | |
|         for t in readers:
 | |
|             t.join()
 | |
| 
 | |
|         self.doLockDetect = False
 | |
|         dt.join()
 | |
| 
 | |
|     def writerThread(self, d, keys, readers):
 | |
|         name = currentThread().getName()
 | |
|         count=len(keys)//len(readers)
 | |
|         while len(keys):
 | |
|             try:
 | |
|                 txn = self.env.txn_begin(None, self.txnFlag)
 | |
|                 keys2=keys[:count]
 | |
|                 for x in keys2 :
 | |
|                     key = '%04d' % x
 | |
|                     d.put(key, self.makeData(key), txn)
 | |
|                     if verbose and x % 100 == 0:
 | |
|                         print "%s: records %d - %d finished" % (name, start, x)
 | |
|                 txn.commit()
 | |
|                 keys=keys[count:]
 | |
|                 readers.pop().start()
 | |
|             except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
 | |
|                 if verbose:
 | |
|                     print "%s: Aborting transaction (%s)" % (name, val[1])
 | |
|                 txn.abort()
 | |
| 
 | |
|         if verbose:
 | |
|             print "%s: thread finished" % name
 | |
| 
 | |
|     def readerThread(self, d, readerNum):
 | |
|         name = currentThread().getName()
 | |
| 
 | |
|         finished = False
 | |
|         while not finished:
 | |
|             try:
 | |
|                 txn = self.env.txn_begin(None, self.txnFlag)
 | |
|                 c = d.cursor(txn)
 | |
|                 count = 0
 | |
|                 rec = c.first()
 | |
|                 while rec:
 | |
|                     count += 1
 | |
|                     key, data = rec
 | |
|                     self.assertEqual(self.makeData(key), data)
 | |
|                     rec = c.next()
 | |
|                 if verbose: print "%s: found %d records" % (name, count)
 | |
|                 c.close()
 | |
|                 txn.commit()
 | |
|                 finished = True
 | |
|             except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
 | |
|                 if verbose:
 | |
|                     print "%s: Aborting transaction (%s)" % (name, val[1])
 | |
|                 c.close()
 | |
|                 txn.abort()
 | |
| 
 | |
|         if verbose:
 | |
|             print "%s: thread finished" % name
 | |
| 
 | |
|     def deadlockThread(self):
 | |
|         self.doLockDetect = True
 | |
|         while self.doLockDetect:
 | |
|             time.sleep(0.05)
 | |
|             try:
 | |
|                 aborted = self.env.lock_detect(
 | |
|                     db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT)
 | |
|                 if verbose and aborted:
 | |
|                     print "deadlock: Aborted %d deadlocked transaction(s)" \
 | |
|                           % aborted
 | |
|             except db.DBError:
 | |
|                 pass
 | |
| 
 | |
| 
 | |
| class BTreeThreadedTransactions(ThreadedTransactionsBase):
 | |
|     dbtype = db.DB_BTREE
 | |
|     writers = 2
 | |
|     readers = 10
 | |
|     records = 1000
 | |
| 
 | |
| class HashThreadedTransactions(ThreadedTransactionsBase):
 | |
|     dbtype = db.DB_HASH
 | |
|     writers = 2
 | |
|     readers = 10
 | |
|     records = 1000
 | |
| 
 | |
| class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
 | |
|     dbtype = db.DB_BTREE
 | |
|     writers = 2
 | |
|     readers = 10
 | |
|     records = 1000
 | |
|     txnFlag = db.DB_TXN_NOWAIT
 | |
| 
 | |
| class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
 | |
|     dbtype = db.DB_HASH
 | |
|     writers = 2
 | |
|     readers = 10
 | |
|     records = 1000
 | |
|     txnFlag = db.DB_TXN_NOWAIT
 | |
| 
 | |
| 
 | |
| #----------------------------------------------------------------------
 | |
| 
 | |
| def test_suite():
 | |
|     suite = unittest.TestSuite()
 | |
| 
 | |
|     if have_threads:
 | |
|         suite.addTest(unittest.makeSuite(BTreeConcurrentDataStore))
 | |
|         suite.addTest(unittest.makeSuite(HashConcurrentDataStore))
 | |
|         suite.addTest(unittest.makeSuite(BTreeSimpleThreaded))
 | |
|         suite.addTest(unittest.makeSuite(HashSimpleThreaded))
 | |
|         suite.addTest(unittest.makeSuite(BTreeThreadedTransactions))
 | |
|         suite.addTest(unittest.makeSuite(HashThreadedTransactions))
 | |
|         suite.addTest(unittest.makeSuite(BTreeThreadedNoWaitTransactions))
 | |
|         suite.addTest(unittest.makeSuite(HashThreadedNoWaitTransactions))
 | |
| 
 | |
|     else:
 | |
|         print "Threads not available, skipping thread tests."
 | |
| 
 | |
|     return suite
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     unittest.main(defaultTest='test_suite')
 |