mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 23:21:29 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			138 lines
		
	
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			138 lines
		
	
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Miscellaneous bsddb module test cases
 | 
						|
"""
 | 
						|
 | 
						|
import os
 | 
						|
import unittest
 | 
						|
 | 
						|
try:
 | 
						|
    # For Pythons w/distutils pybsddb
 | 
						|
    from bsddb3 import db, dbshelve, hashopen
 | 
						|
except ImportError:
 | 
						|
    # For Python 2.3
 | 
						|
    from bsddb import db, dbshelve, hashopen
 | 
						|
 | 
						|
from test_all import get_new_environment_path, get_new_database_path
 | 
						|
 | 
						|
try:
 | 
						|
    from bsddb3 import test_support
 | 
						|
except ImportError:
 | 
						|
    from test import test_support
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
class MiscTestCase(unittest.TestCase):
 | 
						|
    def setUp(self):
 | 
						|
        self.filename = self.__class__.__name__ + '.db'
 | 
						|
        self.homeDir = get_new_environment_path()
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        test_support.unlink(self.filename)
 | 
						|
        test_support.rmtree(self.homeDir)
 | 
						|
 | 
						|
    def test01_badpointer(self):
 | 
						|
        dbs = dbshelve.open(self.filename)
 | 
						|
        dbs.close()
 | 
						|
        self.assertRaises(db.DBError, dbs.get, "foo")
 | 
						|
 | 
						|
    def test02_db_home(self):
 | 
						|
        env = db.DBEnv()
 | 
						|
        # check for crash fixed when db_home is used before open()
 | 
						|
        self.assert_(env.db_home is None)
 | 
						|
        env.open(self.homeDir, db.DB_CREATE)
 | 
						|
        self.assertEqual(self.homeDir, env.db_home)
 | 
						|
 | 
						|
    def test03_repr_closed_db(self):
 | 
						|
        db = hashopen(self.filename)
 | 
						|
        db.close()
 | 
						|
        rp = repr(db)
 | 
						|
        self.assertEquals(rp, "{}")
 | 
						|
 | 
						|
    def test04_repr_db(self) :
 | 
						|
        db = hashopen(self.filename)
 | 
						|
        d = {}
 | 
						|
        for i in xrange(100) :
 | 
						|
            db[repr(i)] = repr(100*i)
 | 
						|
            d[repr(i)] = repr(100*i)
 | 
						|
        db.close()
 | 
						|
        db = hashopen(self.filename)
 | 
						|
        rp = repr(db)
 | 
						|
        self.assertEquals(rp, repr(d))
 | 
						|
        db.close()
 | 
						|
 | 
						|
    # http://sourceforge.net/tracker/index.php?func=detail&aid=1708868&group_id=13900&atid=313900
 | 
						|
    #
 | 
						|
    # See the bug report for details.
 | 
						|
    #
 | 
						|
    # The problem was that make_key_dbt() was not allocating a copy of
 | 
						|
    # string keys but FREE_DBT() was always being told to free it when the
 | 
						|
    # database was opened with DB_THREAD.
 | 
						|
    def test05_double_free_make_key_dbt(self):
 | 
						|
        try:
 | 
						|
            db1 = db.DB()
 | 
						|
            db1.open(self.filename, None, db.DB_BTREE,
 | 
						|
                     db.DB_CREATE | db.DB_THREAD)
 | 
						|
 | 
						|
            curs = db1.cursor()
 | 
						|
            t = curs.get("/foo", db.DB_SET)
 | 
						|
            # double free happened during exit from DBC_get
 | 
						|
        finally:
 | 
						|
            db1.close()
 | 
						|
            test_support.unlink(self.filename)
 | 
						|
 | 
						|
    def test06_key_with_null_bytes(self):
 | 
						|
        try:
 | 
						|
            db1 = db.DB()
 | 
						|
            db1.open(self.filename, None, db.DB_HASH, db.DB_CREATE)
 | 
						|
            db1['a'] = 'eh?'
 | 
						|
            db1['a\x00'] = 'eh zed.'
 | 
						|
            db1['a\x00a'] = 'eh zed eh?'
 | 
						|
            db1['aaa'] = 'eh eh eh!'
 | 
						|
            keys = db1.keys()
 | 
						|
            keys.sort()
 | 
						|
            self.assertEqual(['a', 'a\x00', 'a\x00a', 'aaa'], keys)
 | 
						|
            self.assertEqual(db1['a'], 'eh?')
 | 
						|
            self.assertEqual(db1['a\x00'], 'eh zed.')
 | 
						|
            self.assertEqual(db1['a\x00a'], 'eh zed eh?')
 | 
						|
            self.assertEqual(db1['aaa'], 'eh eh eh!')
 | 
						|
        finally:
 | 
						|
            db1.close()
 | 
						|
            test_support.unlink(self.filename)
 | 
						|
 | 
						|
    def test07_DB_set_flags_persists(self):
 | 
						|
        if db.version() < (4,2):
 | 
						|
            # The get_flags API required for this to work is only available
 | 
						|
            # in Berkeley DB >= 4.2
 | 
						|
            return
 | 
						|
        try:
 | 
						|
            db1 = db.DB()
 | 
						|
            db1.set_flags(db.DB_DUPSORT)
 | 
						|
            db1.open(self.filename, db.DB_HASH, db.DB_CREATE)
 | 
						|
            db1['a'] = 'eh'
 | 
						|
            db1['a'] = 'A'
 | 
						|
            self.assertEqual([('a', 'A')], db1.items())
 | 
						|
            db1.put('a', 'Aa')
 | 
						|
            self.assertEqual([('a', 'A'), ('a', 'Aa')], db1.items())
 | 
						|
            db1.close()
 | 
						|
            db1 = db.DB()
 | 
						|
            # no set_flags call, we're testing that it reads and obeys
 | 
						|
            # the flags on open.
 | 
						|
            db1.open(self.filename, db.DB_HASH)
 | 
						|
            self.assertEqual([('a', 'A'), ('a', 'Aa')], db1.items())
 | 
						|
            # if it read the flags right this will replace all values
 | 
						|
            # for key 'a' instead of adding a new one.  (as a dict should)
 | 
						|
            db1['a'] = 'new A'
 | 
						|
            self.assertEqual([('a', 'new A')], db1.items())
 | 
						|
        finally:
 | 
						|
            db1.close()
 | 
						|
            test_support.unlink(self.filename)
 | 
						|
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
 | 
						|
def test_suite():
 | 
						|
    return unittest.makeSuite(MiscTestCase)
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    unittest.main(defaultTest='test_suite')
 |