mirror of
https://github.com/python/cpython.git
synced 2025-11-09 10:01:42 +00:00
run the test simultaneously. The simplest thing I found that worked on both Windows and Unix was to use the PID. It's unique so should be sufficient. This should prevent many of the spurious failures of the automated tests since they run as different users. Also cleanup the directory consistenly in the tearDown methods. It would be nice if someone ensured that the directories are always created with a consistent name.
384 lines
14 KiB
Python
384 lines
14 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
#-----------------------------------------------------------------------
|
|
# A test suite for the table interface built on bsddb.db
|
|
#-----------------------------------------------------------------------
|
|
#
|
|
# Copyright (C) 2000, 2001 by Autonomous Zone Industries
|
|
# Copyright (C) 2002 Gregory P. Smith
|
|
#
|
|
# March 20, 2000
|
|
#
|
|
# License: This is free software. You may use this software for any
|
|
# purpose including modification/redistribution, so long as
|
|
# this header remains intact and that you do not claim any
|
|
# rights of ownership or authorship of this software. This
|
|
# software has been tested, but no warranty is expressed or
|
|
# implied.
|
|
#
|
|
# -- Gregory P. Smith <greg@krypto.org>
|
|
#
|
|
# $Id$
|
|
|
|
import os, re
|
|
import tempfile
|
|
try:
|
|
import cPickle
|
|
pickle = cPickle
|
|
except ImportError:
|
|
import pickle
|
|
import tempfile
|
|
|
|
import unittest
|
|
from test_all import verbose
|
|
|
|
try:
|
|
# For Pythons w/distutils pybsddb
|
|
from bsddb3 import db, dbtables
|
|
except ImportError:
|
|
# For Python 2.3
|
|
from bsddb import db, dbtables
|
|
|
|
|
|
|
|
#----------------------------------------------------------------------
|
|
|
|
class TableDBTestCase(unittest.TestCase):
|
|
db_home = 'db_home'
|
|
db_name = 'test-table.db'
|
|
|
|
def setUp(self):
|
|
homeDir = tempfile.mkdtemp()
|
|
self.testHomeDir = homeDir
|
|
try: os.mkdir(homeDir)
|
|
except os.error: pass
|
|
self.tdb = dbtables.bsdTableDB(
|
|
filename='tabletest.db', dbhome=homeDir, create=1)
|
|
|
|
def tearDown(self):
|
|
self.tdb.close()
|
|
from test import test_support
|
|
test_support.rmtree(self.testHomeDir)
|
|
|
|
def test01(self):
|
|
tabname = "test01"
|
|
colname = 'cool numbers'
|
|
try:
|
|
self.tdb.Drop(tabname)
|
|
except dbtables.TableDBError:
|
|
pass
|
|
self.tdb.CreateTable(tabname, [colname])
|
|
self.tdb.Insert(tabname, {colname: pickle.dumps(3.14159, 1)})
|
|
|
|
if verbose:
|
|
self.tdb._db_print()
|
|
|
|
values = self.tdb.Select(
|
|
tabname, [colname], conditions={colname: None})
|
|
|
|
colval = pickle.loads(values[0][colname])
|
|
self.assert_(colval > 3.141)
|
|
self.assert_(colval < 3.142)
|
|
|
|
|
|
def test02(self):
|
|
tabname = "test02"
|
|
col0 = 'coolness factor'
|
|
col1 = 'but can it fly?'
|
|
col2 = 'Species'
|
|
testinfo = [
|
|
{col0: pickle.dumps(8, 1), col1: 'no', col2: 'Penguin'},
|
|
{col0: pickle.dumps(-1, 1), col1: 'no', col2: 'Turkey'},
|
|
{col0: pickle.dumps(9, 1), col1: 'yes', col2: 'SR-71A Blackbird'}
|
|
]
|
|
|
|
try:
|
|
self.tdb.Drop(tabname)
|
|
except dbtables.TableDBError:
|
|
pass
|
|
self.tdb.CreateTable(tabname, [col0, col1, col2])
|
|
for row in testinfo :
|
|
self.tdb.Insert(tabname, row)
|
|
|
|
values = self.tdb.Select(tabname, [col2],
|
|
conditions={col0: lambda x: pickle.loads(x) >= 8})
|
|
|
|
self.assertEqual(len(values), 2)
|
|
if values[0]['Species'] == 'Penguin' :
|
|
self.assertEqual(values[1]['Species'], 'SR-71A Blackbird')
|
|
elif values[0]['Species'] == 'SR-71A Blackbird' :
|
|
self.assertEqual(values[1]['Species'], 'Penguin')
|
|
else :
|
|
if verbose:
|
|
print "values= %r" % (values,)
|
|
raise RuntimeError("Wrong values returned!")
|
|
|
|
def test03(self):
|
|
tabname = "test03"
|
|
try:
|
|
self.tdb.Drop(tabname)
|
|
except dbtables.TableDBError:
|
|
pass
|
|
if verbose:
|
|
print '...before CreateTable...'
|
|
self.tdb._db_print()
|
|
self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])
|
|
if verbose:
|
|
print '...after CreateTable...'
|
|
self.tdb._db_print()
|
|
self.tdb.Drop(tabname)
|
|
if verbose:
|
|
print '...after Drop...'
|
|
self.tdb._db_print()
|
|
self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])
|
|
|
|
try:
|
|
self.tdb.Insert(tabname,
|
|
{'a': "",
|
|
'e': pickle.dumps([{4:5, 6:7}, 'foo'], 1),
|
|
'f': "Zero"})
|
|
self.fail('Expected an exception')
|
|
except dbtables.TableDBError:
|
|
pass
|
|
|
|
try:
|
|
self.tdb.Select(tabname, [], conditions={'foo': '123'})
|
|
self.fail('Expected an exception')
|
|
except dbtables.TableDBError:
|
|
pass
|
|
|
|
self.tdb.Insert(tabname,
|
|
{'a': '42',
|
|
'b': "bad",
|
|
'c': "meep",
|
|
'e': 'Fuzzy wuzzy was a bear'})
|
|
self.tdb.Insert(tabname,
|
|
{'a': '581750',
|
|
'b': "good",
|
|
'd': "bla",
|
|
'c': "black",
|
|
'e': 'fuzzy was here'})
|
|
self.tdb.Insert(tabname,
|
|
{'a': '800000',
|
|
'b': "good",
|
|
'd': "bla",
|
|
'c': "black",
|
|
'e': 'Fuzzy wuzzy is a bear'})
|
|
|
|
if verbose:
|
|
self.tdb._db_print()
|
|
|
|
# this should return two rows
|
|
values = self.tdb.Select(tabname, ['b', 'a', 'd'],
|
|
conditions={'e': re.compile('wuzzy').search,
|
|
'a': re.compile('^[0-9]+$').match})
|
|
self.assertEqual(len(values), 2)
|
|
|
|
# now lets delete one of them and try again
|
|
self.tdb.Delete(tabname, conditions={'b': dbtables.ExactCond('good')})
|
|
values = self.tdb.Select(
|
|
tabname, ['a', 'd', 'b'],
|
|
conditions={'e': dbtables.PrefixCond('Fuzzy')})
|
|
self.assertEqual(len(values), 1)
|
|
self.assertEqual(values[0]['d'], None)
|
|
|
|
values = self.tdb.Select(tabname, ['b'],
|
|
conditions={'c': lambda c: c == 'meep'})
|
|
self.assertEqual(len(values), 1)
|
|
self.assertEqual(values[0]['b'], "bad")
|
|
|
|
|
|
def test04_MultiCondSelect(self):
|
|
tabname = "test04_MultiCondSelect"
|
|
try:
|
|
self.tdb.Drop(tabname)
|
|
except dbtables.TableDBError:
|
|
pass
|
|
self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])
|
|
|
|
try:
|
|
self.tdb.Insert(tabname,
|
|
{'a': "",
|
|
'e': pickle.dumps([{4:5, 6:7}, 'foo'], 1),
|
|
'f': "Zero"})
|
|
self.fail('Expected an exception')
|
|
except dbtables.TableDBError:
|
|
pass
|
|
|
|
self.tdb.Insert(tabname, {'a': "A", 'b': "B", 'c': "C", 'd': "D",
|
|
'e': "E"})
|
|
self.tdb.Insert(tabname, {'a': "-A", 'b': "-B", 'c': "-C", 'd': "-D",
|
|
'e': "-E"})
|
|
self.tdb.Insert(tabname, {'a': "A-", 'b': "B-", 'c': "C-", 'd': "D-",
|
|
'e': "E-"})
|
|
|
|
if verbose:
|
|
self.tdb._db_print()
|
|
|
|
# This select should return 0 rows. it is designed to test
|
|
# the bug identified and fixed in sourceforge bug # 590449
|
|
# (Big Thanks to "Rob Tillotson (n9mtb)" for tracking this down
|
|
# and supplying a fix!! This one caused many headaches to say
|
|
# the least...)
|
|
values = self.tdb.Select(tabname, ['b', 'a', 'd'],
|
|
conditions={'e': dbtables.ExactCond('E'),
|
|
'a': dbtables.ExactCond('A'),
|
|
'd': dbtables.PrefixCond('-')
|
|
} )
|
|
self.assertEqual(len(values), 0, values)
|
|
|
|
|
|
def test_CreateOrExtend(self):
|
|
tabname = "test_CreateOrExtend"
|
|
|
|
self.tdb.CreateOrExtendTable(
|
|
tabname, ['name', 'taste', 'filling', 'alcohol content', 'price'])
|
|
try:
|
|
self.tdb.Insert(tabname,
|
|
{'taste': 'crap',
|
|
'filling': 'no',
|
|
'is it Guinness?': 'no'})
|
|
self.fail("Insert should've failed due to bad column name")
|
|
except:
|
|
pass
|
|
self.tdb.CreateOrExtendTable(tabname,
|
|
['name', 'taste', 'is it Guinness?'])
|
|
|
|
# these should both succeed as the table should contain the union of both sets of columns.
|
|
self.tdb.Insert(tabname, {'taste': 'crap', 'filling': 'no',
|
|
'is it Guinness?': 'no'})
|
|
self.tdb.Insert(tabname, {'taste': 'great', 'filling': 'yes',
|
|
'is it Guinness?': 'yes',
|
|
'name': 'Guinness'})
|
|
|
|
|
|
def test_CondObjs(self):
|
|
tabname = "test_CondObjs"
|
|
|
|
self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e', 'p'])
|
|
|
|
self.tdb.Insert(tabname, {'a': "the letter A",
|
|
'b': "the letter B",
|
|
'c': "is for cookie"})
|
|
self.tdb.Insert(tabname, {'a': "is for aardvark",
|
|
'e': "the letter E",
|
|
'c': "is for cookie",
|
|
'd': "is for dog"})
|
|
self.tdb.Insert(tabname, {'a': "the letter A",
|
|
'e': "the letter E",
|
|
'c': "is for cookie",
|
|
'p': "is for Python"})
|
|
|
|
values = self.tdb.Select(
|
|
tabname, ['p', 'e'],
|
|
conditions={'e': dbtables.PrefixCond('the l')})
|
|
self.assertEqual(len(values), 2, values)
|
|
self.assertEqual(values[0]['e'], values[1]['e'], values)
|
|
self.assertNotEqual(values[0]['p'], values[1]['p'], values)
|
|
|
|
values = self.tdb.Select(
|
|
tabname, ['d', 'a'],
|
|
conditions={'a': dbtables.LikeCond('%aardvark%')})
|
|
self.assertEqual(len(values), 1, values)
|
|
self.assertEqual(values[0]['d'], "is for dog", values)
|
|
self.assertEqual(values[0]['a'], "is for aardvark", values)
|
|
|
|
values = self.tdb.Select(tabname, None,
|
|
{'b': dbtables.Cond(),
|
|
'e':dbtables.LikeCond('%letter%'),
|
|
'a':dbtables.PrefixCond('is'),
|
|
'd':dbtables.ExactCond('is for dog'),
|
|
'c':dbtables.PrefixCond('is for'),
|
|
'p':lambda s: not s})
|
|
self.assertEqual(len(values), 1, values)
|
|
self.assertEqual(values[0]['d'], "is for dog", values)
|
|
self.assertEqual(values[0]['a'], "is for aardvark", values)
|
|
|
|
def test_Delete(self):
|
|
tabname = "test_Delete"
|
|
self.tdb.CreateTable(tabname, ['x', 'y', 'z'])
|
|
|
|
# prior to 2001-05-09 there was a bug where Delete() would
|
|
# fail if it encountered any rows that did not have values in
|
|
# every column.
|
|
# Hunted and Squashed by <Donwulff> (Jukka Santala - donwulff@nic.fi)
|
|
self.tdb.Insert(tabname, {'x': 'X1', 'y':'Y1'})
|
|
self.tdb.Insert(tabname, {'x': 'X2', 'y':'Y2', 'z': 'Z2'})
|
|
|
|
self.tdb.Delete(tabname, conditions={'x': dbtables.PrefixCond('X')})
|
|
values = self.tdb.Select(tabname, ['y'],
|
|
conditions={'x': dbtables.PrefixCond('X')})
|
|
self.assertEqual(len(values), 0)
|
|
|
|
def test_Modify(self):
|
|
tabname = "test_Modify"
|
|
self.tdb.CreateTable(tabname, ['Name', 'Type', 'Access'])
|
|
|
|
self.tdb.Insert(tabname, {'Name': 'Index to MP3 files.doc',
|
|
'Type': 'Word', 'Access': '8'})
|
|
self.tdb.Insert(tabname, {'Name': 'Nifty.MP3', 'Access': '1'})
|
|
self.tdb.Insert(tabname, {'Type': 'Unknown', 'Access': '0'})
|
|
|
|
def set_type(type):
|
|
if type == None:
|
|
return 'MP3'
|
|
return type
|
|
|
|
def increment_access(count):
|
|
return str(int(count)+1)
|
|
|
|
def remove_value(value):
|
|
return None
|
|
|
|
self.tdb.Modify(tabname,
|
|
conditions={'Access': dbtables.ExactCond('0')},
|
|
mappings={'Access': remove_value})
|
|
self.tdb.Modify(tabname,
|
|
conditions={'Name': dbtables.LikeCond('%MP3%')},
|
|
mappings={'Type': set_type})
|
|
self.tdb.Modify(tabname,
|
|
conditions={'Name': dbtables.LikeCond('%')},
|
|
mappings={'Access': increment_access})
|
|
|
|
try:
|
|
self.tdb.Modify(tabname,
|
|
conditions={'Name': dbtables.LikeCond('%')},
|
|
mappings={'Access': 'What is your quest?'})
|
|
except TypeError:
|
|
# success, the string value in mappings isn't callable
|
|
pass
|
|
else:
|
|
raise RuntimeError, "why was TypeError not raised for bad callable?"
|
|
|
|
# Delete key in select conditions
|
|
values = self.tdb.Select(
|
|
tabname, None,
|
|
conditions={'Type': dbtables.ExactCond('Unknown')})
|
|
self.assertEqual(len(values), 1, values)
|
|
self.assertEqual(values[0]['Name'], None, values)
|
|
self.assertEqual(values[0]['Access'], None, values)
|
|
|
|
# Modify value by select conditions
|
|
values = self.tdb.Select(
|
|
tabname, None,
|
|
conditions={'Name': dbtables.ExactCond('Nifty.MP3')})
|
|
self.assertEqual(len(values), 1, values)
|
|
self.assertEqual(values[0]['Type'], "MP3", values)
|
|
self.assertEqual(values[0]['Access'], "2", values)
|
|
|
|
# Make sure change applied only to select conditions
|
|
values = self.tdb.Select(
|
|
tabname, None, conditions={'Name': dbtables.LikeCond('%doc%')})
|
|
self.assertEqual(len(values), 1, values)
|
|
self.assertEqual(values[0]['Type'], "Word", values)
|
|
self.assertEqual(values[0]['Access'], "9", values)
|
|
|
|
|
|
def test_suite():
|
|
suite = unittest.TestSuite()
|
|
suite.addTest(unittest.makeSuite(TableDBTestCase))
|
|
return suite
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main(defaultTest='test_suite')
|