mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 13:41:24 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			142 lines
		
	
	
	
		
			5.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			142 lines
		
	
	
	
		
			5.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import unittest
 | |
| import os
 | |
| 
 | |
| from test_all import db, test_support, get_new_environment_path, get_new_database_path
 | |
| 
 | |
| 
 | |
| class DBSequenceTest(unittest.TestCase):
 | |
|     import sys
 | |
|     if sys.version_info < (2, 4) :
 | |
|         def assertTrue(self, expr, msg=None):
 | |
|             self.failUnless(expr,msg=msg)
 | |
| 
 | |
|     def setUp(self):
 | |
|         self.int_32_max = 0x100000000
 | |
|         self.homeDir = get_new_environment_path()
 | |
|         self.filename = "test"
 | |
| 
 | |
|         self.dbenv = db.DBEnv()
 | |
|         self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL, 0666)
 | |
|         self.d = db.DB(self.dbenv)
 | |
|         self.d.open(self.filename, db.DB_BTREE, db.DB_CREATE, 0666)
 | |
| 
 | |
|     def tearDown(self):
 | |
|         if hasattr(self, 'seq'):
 | |
|             self.seq.close()
 | |
|             del self.seq
 | |
|         if hasattr(self, 'd'):
 | |
|             self.d.close()
 | |
|             del self.d
 | |
|         if hasattr(self, 'dbenv'):
 | |
|             self.dbenv.close()
 | |
|             del self.dbenv
 | |
| 
 | |
|         test_support.rmtree(self.homeDir)
 | |
| 
 | |
|     def test_get(self):
 | |
|         self.seq = db.DBSequence(self.d, flags=0)
 | |
|         start_value = 10 * self.int_32_max
 | |
|         self.assertEqual(0xA00000000, start_value)
 | |
|         self.assertEquals(None, self.seq.initial_value(start_value))
 | |
|         self.assertEquals(None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE))
 | |
|         self.assertEquals(start_value, self.seq.get(5))
 | |
|         self.assertEquals(start_value + 5, self.seq.get())
 | |
| 
 | |
|     def test_remove(self):
 | |
|         self.seq = db.DBSequence(self.d, flags=0)
 | |
|         self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
 | |
|         self.assertEquals(None, self.seq.remove(txn=None, flags=0))
 | |
|         del self.seq
 | |
| 
 | |
|     def test_get_key(self):
 | |
|         self.seq = db.DBSequence(self.d, flags=0)
 | |
|         key = 'foo'
 | |
|         self.assertEquals(None, self.seq.open(key=key, txn=None, flags=db.DB_CREATE))
 | |
|         self.assertEquals(key, self.seq.get_key())
 | |
| 
 | |
|     def test_get_dbp(self):
 | |
|         self.seq = db.DBSequence(self.d, flags=0)
 | |
|         self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
 | |
|         self.assertEquals(self.d, self.seq.get_dbp())
 | |
| 
 | |
|     def test_cachesize(self):
 | |
|         self.seq = db.DBSequence(self.d, flags=0)
 | |
|         cashe_size = 10
 | |
|         self.assertEquals(None, self.seq.set_cachesize(cashe_size))
 | |
|         self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
 | |
|         self.assertEquals(cashe_size, self.seq.get_cachesize())
 | |
| 
 | |
|     def test_flags(self):
 | |
|         self.seq = db.DBSequence(self.d, flags=0)
 | |
|         flag = db.DB_SEQ_WRAP;
 | |
|         self.assertEquals(None, self.seq.set_flags(flag))
 | |
|         self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
 | |
|         self.assertEquals(flag, self.seq.get_flags() & flag)
 | |
| 
 | |
|     def test_range(self):
 | |
|         self.seq = db.DBSequence(self.d, flags=0)
 | |
|         seq_range = (10 * self.int_32_max, 11 * self.int_32_max - 1)
 | |
|         self.assertEquals(None, self.seq.set_range(seq_range))
 | |
|         self.seq.initial_value(seq_range[0])
 | |
|         self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
 | |
|         self.assertEquals(seq_range, self.seq.get_range())
 | |
| 
 | |
|     def test_stat(self):
 | |
|         self.seq = db.DBSequence(self.d, flags=0)
 | |
|         self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
 | |
|         stat = self.seq.stat()
 | |
|         for param in ('nowait', 'min', 'max', 'value', 'current',
 | |
|                       'flags', 'cache_size', 'last_value', 'wait'):
 | |
|             self.assertTrue(param in stat, "parameter %s isn't in stat info" % param)
 | |
| 
 | |
|     if db.version() >= (4,7) :
 | |
|         # This code checks a crash solved in Berkeley DB 4.7
 | |
|         def test_stat_crash(self) :
 | |
|             d=db.DB()
 | |
|             d.open(None,dbtype=db.DB_HASH,flags=db.DB_CREATE)  # In RAM
 | |
|             seq = db.DBSequence(d, flags=0)
 | |
| 
 | |
|             self.assertRaises(db.DBNotFoundError, seq.open,
 | |
|                     key='id', txn=None, flags=0)
 | |
| 
 | |
|             self.assertRaises(db.DBInvalidArgError, seq.stat)
 | |
| 
 | |
|             d.close()
 | |
| 
 | |
|     def test_64bits(self) :
 | |
|         # We don't use both extremes because they are problematic
 | |
|         value_plus=(1L<<63)-2
 | |
|         self.assertEquals(9223372036854775806L,value_plus)
 | |
|         value_minus=(-1L<<63)+1  # Two complement
 | |
|         self.assertEquals(-9223372036854775807L,value_minus)
 | |
|         self.seq = db.DBSequence(self.d, flags=0)
 | |
|         self.assertEquals(None, self.seq.initial_value(value_plus-1))
 | |
|         self.assertEquals(None, self.seq.open(key='id', txn=None,
 | |
|             flags=db.DB_CREATE))
 | |
|         self.assertEquals(value_plus-1, self.seq.get(1))
 | |
|         self.assertEquals(value_plus, self.seq.get(1))
 | |
| 
 | |
|         self.seq.remove(txn=None, flags=0)
 | |
| 
 | |
|         self.seq = db.DBSequence(self.d, flags=0)
 | |
|         self.assertEquals(None, self.seq.initial_value(value_minus))
 | |
|         self.assertEquals(None, self.seq.open(key='id', txn=None,
 | |
|             flags=db.DB_CREATE))
 | |
|         self.assertEquals(value_minus, self.seq.get(1))
 | |
|         self.assertEquals(value_minus+1, self.seq.get(1))
 | |
| 
 | |
|     def test_multiple_close(self):
 | |
|         self.seq = db.DBSequence(self.d)
 | |
|         self.seq.close()  # You can close a Sequence multiple times
 | |
|         self.seq.close()
 | |
|         self.seq.close()
 | |
| 
 | |
| def test_suite():
 | |
|     suite = unittest.TestSuite()
 | |
|     if db.version() >= (4,3):
 | |
|         suite.addTest(unittest.makeSuite(DBSequenceTest))
 | |
|     return suite
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     unittest.main(defaultTest='test_suite')
 | 
