mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 07:31:38 +00:00 
			
		
		
		
	run the test simultaneously. The simplest thing I found that worked on both Windows and Unix was to use the PID. It's unique so should be sufficient. This should prevent many of the spurious failures of the automated tests since they run as different users. Also cleanup the directory consistenly in the tearDown methods. It would be nice if someone ensured that the directories are always created with a consistent name.
		
			
				
	
	
		
			107 lines
		
	
	
	
		
			3.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			107 lines
		
	
	
	
		
			3.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import unittest
 | 
						|
import os
 | 
						|
import tempfile
 | 
						|
 | 
						|
try:
 | 
						|
    # For Pythons w/distutils pybsddb
 | 
						|
    from bsddb3 import db
 | 
						|
except ImportError:
 | 
						|
    from bsddb import db
 | 
						|
 | 
						|
 | 
						|
class DBSequenceTest(unittest.TestCase):
 | 
						|
    def setUp(self):
 | 
						|
        self.int_32_max = 0x100000000
 | 
						|
        self.homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid())
 | 
						|
        try:
 | 
						|
            os.mkdir(self.homeDir)
 | 
						|
        except os.error:
 | 
						|
            pass
 | 
						|
        tempfile.tempdir = self.homeDir
 | 
						|
        self.filename = os.path.split(tempfile.mktemp())[1]
 | 
						|
        tempfile.tempdir = None
 | 
						|
 | 
						|
        self.dbenv = db.DBEnv()
 | 
						|
        self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL, 0666)
 | 
						|
        self.d = db.DB(self.dbenv)
 | 
						|
        self.d.open(self.filename, db.DB_BTREE, db.DB_CREATE, 0666)
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        if hasattr(self, 'seq'):
 | 
						|
            self.seq.close()
 | 
						|
            del self.seq
 | 
						|
        if hasattr(self, 'd'):
 | 
						|
            self.d.close()
 | 
						|
            del self.d
 | 
						|
        if hasattr(self, 'dbenv'):
 | 
						|
            self.dbenv.close()
 | 
						|
            del self.dbenv
 | 
						|
 | 
						|
        from test import test_support
 | 
						|
        test_support.rmtree(self.homeDir)
 | 
						|
 | 
						|
    def test_get(self):
 | 
						|
        self.seq = db.DBSequence(self.d, flags=0)
 | 
						|
        start_value = 10 * self.int_32_max
 | 
						|
        self.assertEqual(0xA00000000, start_value)
 | 
						|
        self.assertEquals(None, self.seq.init_value(start_value))
 | 
						|
        self.assertEquals(None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE))
 | 
						|
        self.assertEquals(start_value, self.seq.get(5))
 | 
						|
        self.assertEquals(start_value + 5, self.seq.get())
 | 
						|
 | 
						|
    def test_remove(self):
 | 
						|
        self.seq = db.DBSequence(self.d, flags=0)
 | 
						|
        self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
 | 
						|
        self.assertEquals(None, self.seq.remove(txn=None, flags=0))
 | 
						|
        del self.seq
 | 
						|
 | 
						|
    def test_get_key(self):
 | 
						|
        self.seq = db.DBSequence(self.d, flags=0)
 | 
						|
        key = 'foo'
 | 
						|
        self.assertEquals(None, self.seq.open(key=key, txn=None, flags=db.DB_CREATE))
 | 
						|
        self.assertEquals(key, self.seq.get_key())
 | 
						|
 | 
						|
    def test_get_dbp(self):
 | 
						|
        self.seq = db.DBSequence(self.d, flags=0)
 | 
						|
        self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
 | 
						|
        self.assertEquals(self.d, self.seq.get_dbp())
 | 
						|
 | 
						|
    def test_cachesize(self):
 | 
						|
        self.seq = db.DBSequence(self.d, flags=0)
 | 
						|
        cashe_size = 10
 | 
						|
        self.assertEquals(None, self.seq.set_cachesize(cashe_size))
 | 
						|
        self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
 | 
						|
        self.assertEquals(cashe_size, self.seq.get_cachesize())
 | 
						|
 | 
						|
    def test_flags(self):
 | 
						|
        self.seq = db.DBSequence(self.d, flags=0)
 | 
						|
        flag = db.DB_SEQ_WRAP;
 | 
						|
        self.assertEquals(None, self.seq.set_flags(flag))
 | 
						|
        self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
 | 
						|
        self.assertEquals(flag, self.seq.get_flags() & flag)
 | 
						|
 | 
						|
    def test_range(self):
 | 
						|
        self.seq = db.DBSequence(self.d, flags=0)
 | 
						|
        seq_range = (10 * self.int_32_max, 11 * self.int_32_max - 1)
 | 
						|
        self.assertEquals(None, self.seq.set_range(seq_range))
 | 
						|
        self.seq.init_value(seq_range[0])
 | 
						|
        self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
 | 
						|
        self.assertEquals(seq_range, self.seq.get_range())
 | 
						|
 | 
						|
    def test_stat(self):
 | 
						|
        self.seq = db.DBSequence(self.d, flags=0)
 | 
						|
        self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
 | 
						|
        stat = self.seq.stat()
 | 
						|
        for param in ('nowait', 'min', 'max', 'value', 'current',
 | 
						|
                      'flags', 'cache_size', 'last_value', 'wait'):
 | 
						|
            self.assertTrue(param in stat, "parameter %s isn't in stat info" % param)
 | 
						|
 | 
						|
def test_suite():
 | 
						|
    suite = unittest.TestSuite()
 | 
						|
    if db.version() >= (4,3):
 | 
						|
        suite.addTest(unittest.makeSuite(DBSequenceTest))
 | 
						|
    return suite
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    unittest.main(defaultTest='test_suite')
 |