mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 03:04:41 +00:00 
			
		
		
		
	 fc06096911
			
		
	
	
		fc06096911
		
			
		
	
	
	
	
		
			
			The directory file for a newly created database is now created immediately after opening instead of deferring this until synchronizing or closing.
		
			
				
	
	
		
			388 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			388 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Test script for the dumbdbm module
 | |
|    Original by Roger E. Masse
 | |
| """
 | |
| 
 | |
| import contextlib
 | |
| import io
 | |
| import operator
 | |
| import os
 | |
| import stat
 | |
| import unittest
 | |
| import dbm.dumb as dumbdbm
 | |
| from test import support
 | |
| from test.support import os_helper
 | |
| from functools import partial
 | |
| 
 | |
| _fname = os_helper.TESTFN
 | |
| 
 | |
| 
 | |
| def _delete_files():
 | |
|     for ext in [".dir", ".dat", ".bak"]:
 | |
|         try:
 | |
|             os.unlink(_fname + ext)
 | |
|         except OSError:
 | |
|             pass
 | |
| 
 | |
| class DumbDBMTestCase(unittest.TestCase):
 | |
|     _dict = {b'0': b'',
 | |
|              b'a': b'Python:',
 | |
|              b'b': b'Programming',
 | |
|              b'c': b'the',
 | |
|              b'd': b'way',
 | |
|              b'f': b'Guido',
 | |
|              b'g': b'intended',
 | |
|              '\u00fc'.encode('utf-8') : b'!',
 | |
|              }
 | |
| 
 | |
|     def test_dumbdbm_creation(self):
 | |
|         with contextlib.closing(dumbdbm.open(_fname, 'c')) as f:
 | |
|             self.assertEqual(list(f.keys()), [])
 | |
|             for key in self._dict:
 | |
|                 f[key] = self._dict[key]
 | |
|             self.read_helper(f)
 | |
| 
 | |
|     @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
 | |
|     @os_helper.skip_unless_working_chmod
 | |
|     def test_dumbdbm_creation_mode(self):
 | |
|         try:
 | |
|             old_umask = os.umask(0o002)
 | |
|             f = dumbdbm.open(_fname, 'c', 0o637)
 | |
|             f.close()
 | |
|         finally:
 | |
|             os.umask(old_umask)
 | |
| 
 | |
|         expected_mode = 0o635
 | |
|         if os.name != 'posix':
 | |
|             # Windows only supports setting the read-only attribute.
 | |
|             # This shouldn't fail, but doesn't work like Unix either.
 | |
|             expected_mode = 0o666
 | |
| 
 | |
|         import stat
 | |
|         st = os.stat(_fname + '.dat')
 | |
|         self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)
 | |
|         st = os.stat(_fname + '.dir')
 | |
|         self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)
 | |
| 
 | |
|     def test_close_twice(self):
 | |
|         f = dumbdbm.open(_fname)
 | |
|         f[b'a'] = b'b'
 | |
|         self.assertEqual(f[b'a'], b'b')
 | |
|         f.close()
 | |
|         f.close()
 | |
| 
 | |
|     def test_dumbdbm_modification(self):
 | |
|         self.init_db()
 | |
|         with contextlib.closing(dumbdbm.open(_fname, 'w')) as f:
 | |
|             self._dict[b'g'] = f[b'g'] = b"indented"
 | |
|             self.read_helper(f)
 | |
|             # setdefault() works as in the dict interface
 | |
|             self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo')
 | |
|             self.assertEqual(f[b'xxx'], b'foo')
 | |
| 
 | |
|     def test_dumbdbm_read(self):
 | |
|         self.init_db()
 | |
|         with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
 | |
|             self.read_helper(f)
 | |
|             with self.assertRaisesRegex(dumbdbm.error,
 | |
|                                     'The database is opened for reading only'):
 | |
|                 f[b'g'] = b'x'
 | |
|             with self.assertRaisesRegex(dumbdbm.error,
 | |
|                                     'The database is opened for reading only'):
 | |
|                 del f[b'a']
 | |
|             # get() works as in the dict interface
 | |
|             self.assertEqual(f.get(b'a'), self._dict[b'a'])
 | |
|             self.assertEqual(f.get(b'xxx', b'foo'), b'foo')
 | |
|             self.assertIsNone(f.get(b'xxx'))
 | |
|             with self.assertRaises(KeyError):
 | |
|                 f[b'xxx']
 | |
| 
 | |
|     def test_dumbdbm_keys(self):
 | |
|         self.init_db()
 | |
|         with contextlib.closing(dumbdbm.open(_fname)) as f:
 | |
|             keys = self.keys_helper(f)
 | |
| 
 | |
|     def test_write_contains(self):
 | |
|         with contextlib.closing(dumbdbm.open(_fname)) as f:
 | |
|             f[b'1'] = b'hello'
 | |
|             self.assertIn(b'1', f)
 | |
| 
 | |
|     def test_write_write_read(self):
 | |
|         # test for bug #482460
 | |
|         with contextlib.closing(dumbdbm.open(_fname)) as f:
 | |
|             f[b'1'] = b'hello'
 | |
|             f[b'1'] = b'hello2'
 | |
|         with contextlib.closing(dumbdbm.open(_fname)) as f:
 | |
|             self.assertEqual(f[b'1'], b'hello2')
 | |
| 
 | |
|     def test_str_read(self):
 | |
|         self.init_db()
 | |
|         with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
 | |
|             self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')])
 | |
| 
 | |
|     def test_str_write_contains(self):
 | |
|         self.init_db()
 | |
|         with contextlib.closing(dumbdbm.open(_fname)) as f:
 | |
|             f['\u00fc'] = b'!'
 | |
|             f['1'] = 'a'
 | |
|         with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
 | |
|             self.assertIn('\u00fc', f)
 | |
|             self.assertEqual(f['\u00fc'.encode('utf-8')],
 | |
|                              self._dict['\u00fc'.encode('utf-8')])
 | |
|             self.assertEqual(f[b'1'], b'a')
 | |
| 
 | |
|     def test_line_endings(self):
 | |
|         # test for bug #1172763: dumbdbm would die if the line endings
 | |
|         # weren't what was expected.
 | |
|         with contextlib.closing(dumbdbm.open(_fname)) as f:
 | |
|             f[b'1'] = b'hello'
 | |
|             f[b'2'] = b'hello2'
 | |
| 
 | |
|         # Mangle the file by changing the line separator to Windows or Unix
 | |
|         with io.open(_fname + '.dir', 'rb') as file:
 | |
|             data = file.read()
 | |
|         if os.linesep == '\n':
 | |
|             data = data.replace(b'\n', b'\r\n')
 | |
|         else:
 | |
|             data = data.replace(b'\r\n', b'\n')
 | |
|         with io.open(_fname + '.dir', 'wb') as file:
 | |
|             file.write(data)
 | |
| 
 | |
|         f = dumbdbm.open(_fname)
 | |
|         self.assertEqual(f[b'1'], b'hello')
 | |
|         self.assertEqual(f[b'2'], b'hello2')
 | |
| 
 | |
| 
 | |
|     def read_helper(self, f):
 | |
|         keys = self.keys_helper(f)
 | |
|         for key in self._dict:
 | |
|             self.assertEqual(self._dict[key], f[key])
 | |
| 
 | |
|     def init_db(self):
 | |
|         with contextlib.closing(dumbdbm.open(_fname, 'n')) as f:
 | |
|             for k in self._dict:
 | |
|                 f[k] = self._dict[k]
 | |
| 
 | |
|     def keys_helper(self, f):
 | |
|         keys = sorted(f.keys())
 | |
|         dkeys = sorted(self._dict.keys())
 | |
|         self.assertEqual(keys, dkeys)
 | |
|         return keys
 | |
| 
 | |
|     # Perform randomized operations.  This doesn't make assumptions about
 | |
|     # what *might* fail.
 | |
|     def test_random(self):
 | |
|         import random
 | |
|         d = {}  # mirror the database
 | |
|         for dummy in range(5):
 | |
|             with contextlib.closing(dumbdbm.open(_fname)) as f:
 | |
|                 for dummy in range(100):
 | |
|                     k = random.choice('abcdefghijklm')
 | |
|                     if random.random() < 0.2:
 | |
|                         if k in d:
 | |
|                             del d[k]
 | |
|                             del f[k]
 | |
|                     else:
 | |
|                         v = random.choice((b'a', b'b', b'c')) * random.randrange(10000)
 | |
|                         d[k] = v
 | |
|                         f[k] = v
 | |
|                         self.assertEqual(f[k], v)
 | |
| 
 | |
|             with contextlib.closing(dumbdbm.open(_fname)) as f:
 | |
|                 expected = sorted((k.encode("latin-1"), v) for k, v in d.items())
 | |
|                 got = sorted(f.items())
 | |
|                 self.assertEqual(expected, got)
 | |
| 
 | |
|     def test_context_manager(self):
 | |
|         with dumbdbm.open(_fname, 'c') as db:
 | |
|             db["dumbdbm context manager"] = "context manager"
 | |
| 
 | |
|         with dumbdbm.open(_fname, 'r') as db:
 | |
|             self.assertEqual(list(db.keys()), [b"dumbdbm context manager"])
 | |
| 
 | |
|         with self.assertRaises(dumbdbm.error):
 | |
|             db.keys()
 | |
| 
 | |
|     def test_check_closed(self):
 | |
|         f = dumbdbm.open(_fname, 'c')
 | |
|         f.close()
 | |
| 
 | |
|         for meth in (partial(operator.delitem, f),
 | |
|                      partial(operator.setitem, f, 'b'),
 | |
|                      partial(operator.getitem, f),
 | |
|                      partial(operator.contains, f)):
 | |
|             with self.assertRaises(dumbdbm.error) as cm:
 | |
|                 meth('test')
 | |
|             self.assertEqual(str(cm.exception),
 | |
|                              "DBM object has already been closed")
 | |
| 
 | |
|         for meth in (operator.methodcaller('keys'),
 | |
|                      operator.methodcaller('iterkeys'),
 | |
|                      operator.methodcaller('items'),
 | |
|                      len):
 | |
|             with self.assertRaises(dumbdbm.error) as cm:
 | |
|                 meth(f)
 | |
|             self.assertEqual(str(cm.exception),
 | |
|                              "DBM object has already been closed")
 | |
| 
 | |
|     def test_create_new(self):
 | |
|         with dumbdbm.open(_fname, 'n') as f:
 | |
|             for k in self._dict:
 | |
|                 f[k] = self._dict[k]
 | |
| 
 | |
|         with dumbdbm.open(_fname, 'n') as f:
 | |
|             self.assertEqual(f.keys(), [])
 | |
| 
 | |
|     def test_eval(self):
 | |
|         with open(_fname + '.dir', 'w', encoding="utf-8") as stream:
 | |
|             stream.write("str(print('Hacked!')), 0\n")
 | |
|         with support.captured_stdout() as stdout:
 | |
|             with self.assertRaises(ValueError):
 | |
|                 with dumbdbm.open(_fname) as f:
 | |
|                     pass
 | |
|             self.assertEqual(stdout.getvalue(), '')
 | |
| 
 | |
|     def test_missing_data(self):
 | |
|         for value in ('r', 'w'):
 | |
|             _delete_files()
 | |
|             with self.assertRaises(FileNotFoundError):
 | |
|                 dumbdbm.open(_fname, value)
 | |
|             self.assertFalse(os.path.exists(_fname + '.dat'))
 | |
|             self.assertFalse(os.path.exists(_fname + '.dir'))
 | |
|             self.assertFalse(os.path.exists(_fname + '.bak'))
 | |
| 
 | |
|         for value in ('c', 'n'):
 | |
|             _delete_files()
 | |
|             with dumbdbm.open(_fname, value) as f:
 | |
|                 self.assertTrue(os.path.exists(_fname + '.dat'))
 | |
|                 self.assertTrue(os.path.exists(_fname + '.dir'))
 | |
|                 self.assertFalse(os.path.exists(_fname + '.bak'))
 | |
|             self.assertFalse(os.path.exists(_fname + '.bak'))
 | |
| 
 | |
|         for value in ('c', 'n'):
 | |
|             _delete_files()
 | |
|             with dumbdbm.open(_fname, value) as f:
 | |
|                 f['key'] = 'value'
 | |
|                 self.assertTrue(os.path.exists(_fname + '.dat'))
 | |
|                 self.assertTrue(os.path.exists(_fname + '.dir'))
 | |
|                 self.assertFalse(os.path.exists(_fname + '.bak'))
 | |
|             self.assertTrue(os.path.exists(_fname + '.bak'))
 | |
| 
 | |
|     def test_missing_index(self):
 | |
|         with dumbdbm.open(_fname, 'n') as f:
 | |
|             pass
 | |
|         os.unlink(_fname + '.dir')
 | |
|         for value in ('r', 'w'):
 | |
|             with self.assertRaises(FileNotFoundError):
 | |
|                 dumbdbm.open(_fname, value)
 | |
|             self.assertFalse(os.path.exists(_fname + '.dir'))
 | |
|             self.assertFalse(os.path.exists(_fname + '.bak'))
 | |
| 
 | |
|         for value in ('c', 'n'):
 | |
|             with dumbdbm.open(_fname, value) as f:
 | |
|                 self.assertTrue(os.path.exists(_fname + '.dir'))
 | |
|                 self.assertFalse(os.path.exists(_fname + '.bak'))
 | |
|             self.assertFalse(os.path.exists(_fname + '.bak'))
 | |
|             os.unlink(_fname + '.dir')
 | |
| 
 | |
|         for value in ('c', 'n'):
 | |
|             with dumbdbm.open(_fname, value) as f:
 | |
|                 f['key'] = 'value'
 | |
|                 self.assertTrue(os.path.exists(_fname + '.dir'))
 | |
|                 self.assertFalse(os.path.exists(_fname + '.bak'))
 | |
|             self.assertTrue(os.path.exists(_fname + '.bak'))
 | |
|             os.unlink(_fname + '.dir')
 | |
|             os.unlink(_fname + '.bak')
 | |
| 
 | |
|     def test_sync_empty_unmodified(self):
 | |
|         with dumbdbm.open(_fname, 'n') as f:
 | |
|             pass
 | |
|         os.unlink(_fname + '.dir')
 | |
|         for value in ('c', 'n'):
 | |
|             with dumbdbm.open(_fname, value) as f:
 | |
|                 self.assertTrue(os.path.exists(_fname + '.dir'))
 | |
|                 self.assertFalse(os.path.exists(_fname + '.bak'))
 | |
|                 f.sync()
 | |
|                 self.assertTrue(os.path.exists(_fname + '.dir'))
 | |
|                 self.assertFalse(os.path.exists(_fname + '.bak'))
 | |
|                 os.unlink(_fname + '.dir')
 | |
|                 f.sync()
 | |
|                 self.assertFalse(os.path.exists(_fname + '.dir'))
 | |
|                 self.assertFalse(os.path.exists(_fname + '.bak'))
 | |
|             self.assertFalse(os.path.exists(_fname + '.dir'))
 | |
|             self.assertFalse(os.path.exists(_fname + '.bak'))
 | |
| 
 | |
|     def test_sync_nonempty_unmodified(self):
 | |
|         with dumbdbm.open(_fname, 'n') as f:
 | |
|             pass
 | |
|         os.unlink(_fname + '.dir')
 | |
|         for value in ('c', 'n'):
 | |
|             with dumbdbm.open(_fname, value) as f:
 | |
|                 f['key'] = 'value'
 | |
|                 self.assertTrue(os.path.exists(_fname + '.dir'))
 | |
|                 self.assertFalse(os.path.exists(_fname + '.bak'))
 | |
|                 f.sync()
 | |
|                 self.assertTrue(os.path.exists(_fname + '.dir'))
 | |
|                 self.assertTrue(os.path.exists(_fname + '.bak'))
 | |
|                 os.unlink(_fname + '.dir')
 | |
|                 os.unlink(_fname + '.bak')
 | |
|                 f.sync()
 | |
|                 self.assertFalse(os.path.exists(_fname + '.dir'))
 | |
|                 self.assertFalse(os.path.exists(_fname + '.bak'))
 | |
|             self.assertFalse(os.path.exists(_fname + '.dir'))
 | |
|             self.assertFalse(os.path.exists(_fname + '.bak'))
 | |
| 
 | |
|     def test_invalid_flag(self):
 | |
|         for flag in ('x', 'rf', None):
 | |
|             with self.assertRaisesRegex(ValueError,
 | |
|                                         "Flag must be one of "
 | |
|                                         "'r', 'w', 'c', or 'n'"):
 | |
|                 dumbdbm.open(_fname, flag)
 | |
| 
 | |
|     @os_helper.skip_unless_working_chmod
 | |
|     def test_readonly_files(self):
 | |
|         with os_helper.temp_dir() as dir:
 | |
|             fname = os.path.join(dir, 'db')
 | |
|             with dumbdbm.open(fname, 'n') as f:
 | |
|                 self.assertEqual(list(f.keys()), [])
 | |
|                 for key in self._dict:
 | |
|                     f[key] = self._dict[key]
 | |
|             os.chmod(fname + ".dir", stat.S_IRUSR)
 | |
|             os.chmod(fname + ".dat", stat.S_IRUSR)
 | |
|             os.chmod(dir, stat.S_IRUSR|stat.S_IXUSR)
 | |
|             with dumbdbm.open(fname, 'r') as f:
 | |
|                 self.assertEqual(sorted(f.keys()), sorted(self._dict))
 | |
|                 f.close()  # don't write
 | |
| 
 | |
|     @unittest.skipUnless(os_helper.TESTFN_NONASCII,
 | |
|                          'requires OS support of non-ASCII encodings')
 | |
|     def test_nonascii_filename(self):
 | |
|         filename = os_helper.TESTFN_NONASCII
 | |
|         for suffix in ['.dir', '.dat', '.bak']:
 | |
|             self.addCleanup(os_helper.unlink, filename + suffix)
 | |
|         with dumbdbm.open(filename, 'c') as db:
 | |
|             db[b'key'] = b'value'
 | |
|         self.assertTrue(os.path.exists(filename + '.dat'))
 | |
|         self.assertTrue(os.path.exists(filename + '.dir'))
 | |
|         with dumbdbm.open(filename, 'r') as db:
 | |
|             self.assertEqual(list(db.keys()), [b'key'])
 | |
|             self.assertTrue(b'key' in db)
 | |
|             self.assertEqual(db[b'key'], b'value')
 | |
| 
 | |
|     def test_open_with_pathlib_path(self):
 | |
|         dumbdbm.open(os_helper.FakePath(_fname), "c").close()
 | |
| 
 | |
|     def test_open_with_bytes_path(self):
 | |
|         dumbdbm.open(os.fsencode(_fname), "c").close()
 | |
| 
 | |
|     def test_open_with_pathlib_bytes_path(self):
 | |
|         dumbdbm.open(os_helper.FakePath(os.fsencode(_fname)), "c").close()
 | |
| 
 | |
|     def tearDown(self):
 | |
|         _delete_files()
 | |
| 
 | |
|     def setUp(self):
 | |
|         _delete_files()
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     unittest.main()
 |