bpo-40823: Use loadTestsFromTestCase() iso. makeSuite() in sqlite3 tests (GH-20538)

This commit is contained in:
Erlend Egeberg Aasland 2021-01-07 01:05:07 +01:00 committed by GitHub
parent 1ab045933b
commit 849e339a92
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 345 additions and 320 deletions

View file

@ -162,7 +162,7 @@ def test_database_source_name(self):
def suite(): def suite():
return unittest.makeSuite(BackupTests) return unittest.TestLoader().loadTestsFromTestCase(BackupTests)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View file

@ -29,61 +29,61 @@
class ModuleTests(unittest.TestCase): class ModuleTests(unittest.TestCase):
def CheckAPILevel(self): def test_api_level(self):
self.assertEqual(sqlite.apilevel, "2.0", self.assertEqual(sqlite.apilevel, "2.0",
"apilevel is %s, should be 2.0" % sqlite.apilevel) "apilevel is %s, should be 2.0" % sqlite.apilevel)
def CheckThreadSafety(self): def test_thread_safety(self):
self.assertEqual(sqlite.threadsafety, 1, self.assertEqual(sqlite.threadsafety, 1,
"threadsafety is %d, should be 1" % sqlite.threadsafety) "threadsafety is %d, should be 1" % sqlite.threadsafety)
def CheckParamStyle(self): def test_param_style(self):
self.assertEqual(sqlite.paramstyle, "qmark", self.assertEqual(sqlite.paramstyle, "qmark",
"paramstyle is '%s', should be 'qmark'" % "paramstyle is '%s', should be 'qmark'" %
sqlite.paramstyle) sqlite.paramstyle)
def CheckWarning(self): def test_warning(self):
self.assertTrue(issubclass(sqlite.Warning, Exception), self.assertTrue(issubclass(sqlite.Warning, Exception),
"Warning is not a subclass of Exception") "Warning is not a subclass of Exception")
def CheckError(self): def test_error(self):
self.assertTrue(issubclass(sqlite.Error, Exception), self.assertTrue(issubclass(sqlite.Error, Exception),
"Error is not a subclass of Exception") "Error is not a subclass of Exception")
def CheckInterfaceError(self): def test_interface_error(self):
self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error), self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error),
"InterfaceError is not a subclass of Error") "InterfaceError is not a subclass of Error")
def CheckDatabaseError(self): def test_database_error(self):
self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error), self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error),
"DatabaseError is not a subclass of Error") "DatabaseError is not a subclass of Error")
def CheckDataError(self): def test_data_error(self):
self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError), self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError),
"DataError is not a subclass of DatabaseError") "DataError is not a subclass of DatabaseError")
def CheckOperationalError(self): def test_operational_error(self):
self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError), self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError),
"OperationalError is not a subclass of DatabaseError") "OperationalError is not a subclass of DatabaseError")
def CheckIntegrityError(self): def test_integrity_error(self):
self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError), self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),
"IntegrityError is not a subclass of DatabaseError") "IntegrityError is not a subclass of DatabaseError")
def CheckInternalError(self): def test_internal_error(self):
self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError), self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError),
"InternalError is not a subclass of DatabaseError") "InternalError is not a subclass of DatabaseError")
def CheckProgrammingError(self): def test_programming_error(self):
self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError), self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),
"ProgrammingError is not a subclass of DatabaseError") "ProgrammingError is not a subclass of DatabaseError")
def CheckNotSupportedError(self): def test_not_supported_error(self):
self.assertTrue(issubclass(sqlite.NotSupportedError, self.assertTrue(issubclass(sqlite.NotSupportedError,
sqlite.DatabaseError), sqlite.DatabaseError),
"NotSupportedError is not a subclass of DatabaseError") "NotSupportedError is not a subclass of DatabaseError")
def CheckSharedCacheDeprecated(self): def test_shared_cache_deprecated(self):
for enable in (True, False): for enable in (True, False):
with self.assertWarns(DeprecationWarning) as cm: with self.assertWarns(DeprecationWarning) as cm:
sqlite.enable_shared_cache(enable) sqlite.enable_shared_cache(enable)
@ -101,38 +101,38 @@ def setUp(self):
def tearDown(self): def tearDown(self):
self.cx.close() self.cx.close()
def CheckCommit(self): def test_commit(self):
self.cx.commit() self.cx.commit()
def CheckCommitAfterNoChanges(self): def test_commit_after_no_changes(self):
""" """
A commit should also work when no changes were made to the database. A commit should also work when no changes were made to the database.
""" """
self.cx.commit() self.cx.commit()
self.cx.commit() self.cx.commit()
def CheckRollback(self): def test_rollback(self):
self.cx.rollback() self.cx.rollback()
def CheckRollbackAfterNoChanges(self): def test_rollback_after_no_changes(self):
""" """
A rollback should also work when no changes were made to the database. A rollback should also work when no changes were made to the database.
""" """
self.cx.rollback() self.cx.rollback()
self.cx.rollback() self.cx.rollback()
def CheckCursor(self): def test_cursor(self):
cu = self.cx.cursor() cu = self.cx.cursor()
def CheckFailedOpen(self): def test_failed_open(self):
YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db" YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
with self.assertRaises(sqlite.OperationalError): with self.assertRaises(sqlite.OperationalError):
con = sqlite.connect(YOU_CANNOT_OPEN_THIS) con = sqlite.connect(YOU_CANNOT_OPEN_THIS)
def CheckClose(self): def test_close(self):
self.cx.close() self.cx.close()
def CheckExceptions(self): def test_exceptions(self):
# Optional DB-API extension. # Optional DB-API extension.
self.assertEqual(self.cx.Warning, sqlite.Warning) self.assertEqual(self.cx.Warning, sqlite.Warning)
self.assertEqual(self.cx.Error, sqlite.Error) self.assertEqual(self.cx.Error, sqlite.Error)
@ -145,7 +145,7 @@ def CheckExceptions(self):
self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError) self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError) self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)
def CheckInTransaction(self): def test_in_transaction(self):
# Can't use db from setUp because we want to test initial state. # Can't use db from setUp because we want to test initial state.
cx = sqlite.connect(":memory:") cx = sqlite.connect(":memory:")
cu = cx.cursor() cu = cx.cursor()
@ -163,11 +163,11 @@ def CheckInTransaction(self):
row = cu.fetchone() row = cu.fetchone()
self.assertEqual(cx.in_transaction, False) self.assertEqual(cx.in_transaction, False)
def CheckInTransactionRO(self): def test_in_transaction_ro(self):
with self.assertRaises(AttributeError): with self.assertRaises(AttributeError):
self.cx.in_transaction = True self.cx.in_transaction = True
def CheckOpenWithPathLikeObject(self): def test_open_with_path_like_object(self):
""" Checks that we can successfully connect to a database using an object that """ Checks that we can successfully connect to a database using an object that
is PathLike, i.e. has __fspath__(). """ is PathLike, i.e. has __fspath__(). """
self.addCleanup(unlink, TESTFN) self.addCleanup(unlink, TESTFN)
@ -178,7 +178,7 @@ def __fspath__(self):
with sqlite.connect(path) as cx: with sqlite.connect(path) as cx:
cx.execute('create table test(id integer)') cx.execute('create table test(id integer)')
def CheckOpenUri(self): def test_open_uri(self):
self.addCleanup(unlink, TESTFN) self.addCleanup(unlink, TESTFN)
with sqlite.connect(TESTFN) as cx: with sqlite.connect(TESTFN) as cx:
cx.execute('create table test(id integer)') cx.execute('create table test(id integer)')
@ -203,21 +203,21 @@ def tearDown(self):
self.cu.close() self.cu.close()
self.cx.close() self.cx.close()
def CheckExecuteNoArgs(self): def test_execute_no_args(self):
self.cu.execute("delete from test") self.cu.execute("delete from test")
def CheckExecuteIllegalSql(self): def test_execute_illegal_sql(self):
with self.assertRaises(sqlite.OperationalError): with self.assertRaises(sqlite.OperationalError):
self.cu.execute("select asdf") self.cu.execute("select asdf")
def CheckExecuteTooMuchSql(self): def test_execute_too_much_sql(self):
with self.assertRaises(sqlite.Warning): with self.assertRaises(sqlite.Warning):
self.cu.execute("select 5+4; select 4+5") self.cu.execute("select 5+4; select 4+5")
def CheckExecuteTooMuchSql2(self): def test_execute_too_much_sql2(self):
self.cu.execute("select 5+4; -- foo bar") self.cu.execute("select 5+4; -- foo bar")
def CheckExecuteTooMuchSql3(self): def test_execute_too_much_sql3(self):
self.cu.execute(""" self.cu.execute("""
select 5+4; select 5+4;
@ -226,53 +226,53 @@ def CheckExecuteTooMuchSql3(self):
*/ */
""") """)
def CheckExecuteWrongSqlArg(self): def test_execute_wrong_sql_arg(self):
with self.assertRaises(TypeError): with self.assertRaises(TypeError):
self.cu.execute(42) self.cu.execute(42)
def CheckExecuteArgInt(self): def test_execute_arg_int(self):
self.cu.execute("insert into test(id) values (?)", (42,)) self.cu.execute("insert into test(id) values (?)", (42,))
def CheckExecuteArgFloat(self): def test_execute_arg_float(self):
self.cu.execute("insert into test(income) values (?)", (2500.32,)) self.cu.execute("insert into test(income) values (?)", (2500.32,))
def CheckExecuteArgString(self): def test_execute_arg_string(self):
self.cu.execute("insert into test(name) values (?)", ("Hugo",)) self.cu.execute("insert into test(name) values (?)", ("Hugo",))
def CheckExecuteArgStringWithZeroByte(self): def test_execute_arg_string_with_zero_byte(self):
self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",)) self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))
self.cu.execute("select name from test where id=?", (self.cu.lastrowid,)) self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
row = self.cu.fetchone() row = self.cu.fetchone()
self.assertEqual(row[0], "Hu\x00go") self.assertEqual(row[0], "Hu\x00go")
def CheckExecuteNonIterable(self): def test_execute_non_iterable(self):
with self.assertRaises(ValueError) as cm: with self.assertRaises(ValueError) as cm:
self.cu.execute("insert into test(id) values (?)", 42) self.cu.execute("insert into test(id) values (?)", 42)
self.assertEqual(str(cm.exception), 'parameters are of unsupported type') self.assertEqual(str(cm.exception), 'parameters are of unsupported type')
def CheckExecuteWrongNoOfArgs1(self): def test_execute_wrong_no_of_args1(self):
# too many parameters # too many parameters
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
self.cu.execute("insert into test(id) values (?)", (17, "Egon")) self.cu.execute("insert into test(id) values (?)", (17, "Egon"))
def CheckExecuteWrongNoOfArgs2(self): def test_execute_wrong_no_of_args2(self):
# too little parameters # too little parameters
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
self.cu.execute("insert into test(id) values (?)") self.cu.execute("insert into test(id) values (?)")
def CheckExecuteWrongNoOfArgs3(self): def test_execute_wrong_no_of_args3(self):
# no parameters, parameters are needed # no parameters, parameters are needed
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
self.cu.execute("insert into test(id) values (?)") self.cu.execute("insert into test(id) values (?)")
def CheckExecuteParamList(self): def test_execute_param_list(self):
self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("insert into test(name) values ('foo')")
self.cu.execute("select name from test where name=?", ["foo"]) self.cu.execute("select name from test where name=?", ["foo"])
row = self.cu.fetchone() row = self.cu.fetchone()
self.assertEqual(row[0], "foo") self.assertEqual(row[0], "foo")
def CheckExecuteParamSequence(self): def test_execute_param_sequence(self):
class L: class L:
def __len__(self): def __len__(self):
return 1 return 1
@ -285,7 +285,7 @@ def __getitem__(self, x):
row = self.cu.fetchone() row = self.cu.fetchone()
self.assertEqual(row[0], "foo") self.assertEqual(row[0], "foo")
def CheckExecuteParamSequenceBadLen(self): def test_execute_param_sequence_bad_len(self):
# Issue41662: Error in __len__() was overridden with ProgrammingError. # Issue41662: Error in __len__() was overridden with ProgrammingError.
class L: class L:
def __len__(self): def __len__(self):
@ -297,13 +297,13 @@ def __getitem__(slf, x):
with self.assertRaises(ZeroDivisionError): with self.assertRaises(ZeroDivisionError):
self.cu.execute("select name from test where name=?", L()) self.cu.execute("select name from test where name=?", L())
def CheckExecuteDictMapping(self): def test_execute_dict_mapping(self):
self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("insert into test(name) values ('foo')")
self.cu.execute("select name from test where name=:name", {"name": "foo"}) self.cu.execute("select name from test where name=:name", {"name": "foo"})
row = self.cu.fetchone() row = self.cu.fetchone()
self.assertEqual(row[0], "foo") self.assertEqual(row[0], "foo")
def CheckExecuteDictMapping_Mapping(self): def test_execute_dict_mapping_mapping(self):
class D(dict): class D(dict):
def __missing__(self, key): def __missing__(self, key):
return "foo" return "foo"
@ -313,32 +313,32 @@ def __missing__(self, key):
row = self.cu.fetchone() row = self.cu.fetchone()
self.assertEqual(row[0], "foo") self.assertEqual(row[0], "foo")
def CheckExecuteDictMappingTooLittleArgs(self): def test_execute_dict_mapping_too_little_args(self):
self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("insert into test(name) values ('foo')")
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"}) self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})
def CheckExecuteDictMappingNoArgs(self): def test_execute_dict_mapping_no_args(self):
self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("insert into test(name) values ('foo')")
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
self.cu.execute("select name from test where name=:name") self.cu.execute("select name from test where name=:name")
def CheckExecuteDictMappingUnnamed(self): def test_execute_dict_mapping_unnamed(self):
self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("insert into test(name) values ('foo')")
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
self.cu.execute("select name from test where name=?", {"name": "foo"}) self.cu.execute("select name from test where name=?", {"name": "foo"})
def CheckClose(self): def test_close(self):
self.cu.close() self.cu.close()
def CheckRowcountExecute(self): def test_rowcount_execute(self):
self.cu.execute("delete from test") self.cu.execute("delete from test")
self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("insert into test(name) values ('foo')")
self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("insert into test(name) values ('foo')")
self.cu.execute("update test set name='bar'") self.cu.execute("update test set name='bar'")
self.assertEqual(self.cu.rowcount, 2) self.assertEqual(self.cu.rowcount, 2)
def CheckRowcountSelect(self): def test_rowcount_select(self):
""" """
pysqlite does not know the rowcount of SELECT statements, because we pysqlite does not know the rowcount of SELECT statements, because we
don't fetch all rows after executing the select statement. The rowcount don't fetch all rows after executing the select statement. The rowcount
@ -347,12 +347,12 @@ def CheckRowcountSelect(self):
self.cu.execute("select 5 union select 6") self.cu.execute("select 5 union select 6")
self.assertEqual(self.cu.rowcount, -1) self.assertEqual(self.cu.rowcount, -1)
def CheckRowcountExecutemany(self): def test_rowcount_executemany(self):
self.cu.execute("delete from test") self.cu.execute("delete from test")
self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)]) self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
self.assertEqual(self.cu.rowcount, 3) self.assertEqual(self.cu.rowcount, 3)
def CheckTotalChanges(self): def test_total_changes(self):
self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("insert into test(name) values ('foo')")
self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("insert into test(name) values ('foo')")
self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value') self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value')
@ -361,10 +361,10 @@ def CheckTotalChanges(self):
# Sequences are required by the DB-API, iterators # Sequences are required by the DB-API, iterators
# enhancements in pysqlite. # enhancements in pysqlite.
def CheckExecuteManySequence(self): def test_execute_many_sequence(self):
self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)]) self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])
def CheckExecuteManyIterator(self): def test_execute_many_iterator(self):
class MyIter: class MyIter:
def __init__(self): def __init__(self):
self.value = 5 self.value = 5
@ -378,26 +378,26 @@ def __next__(self):
self.cu.executemany("insert into test(income) values (?)", MyIter()) self.cu.executemany("insert into test(income) values (?)", MyIter())
def CheckExecuteManyGenerator(self): def test_execute_many_generator(self):
def mygen(): def mygen():
for i in range(5): for i in range(5):
yield (i,) yield (i,)
self.cu.executemany("insert into test(income) values (?)", mygen()) self.cu.executemany("insert into test(income) values (?)", mygen())
def CheckExecuteManyWrongSqlArg(self): def test_execute_many_wrong_sql_arg(self):
with self.assertRaises(TypeError): with self.assertRaises(TypeError):
self.cu.executemany(42, [(3,)]) self.cu.executemany(42, [(3,)])
def CheckExecuteManySelect(self): def test_execute_many_select(self):
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
self.cu.executemany("select ?", [(3,)]) self.cu.executemany("select ?", [(3,)])
def CheckExecuteManyNotIterable(self): def test_execute_many_not_iterable(self):
with self.assertRaises(TypeError): with self.assertRaises(TypeError):
self.cu.executemany("insert into test(income) values (?)", 42) self.cu.executemany("insert into test(income) values (?)", 42)
def CheckFetchIter(self): def test_fetch_iter(self):
# Optional DB-API extension. # Optional DB-API extension.
self.cu.execute("delete from test") self.cu.execute("delete from test")
self.cu.execute("insert into test(id) values (?)", (5,)) self.cu.execute("insert into test(id) values (?)", (5,))
@ -409,19 +409,19 @@ def CheckFetchIter(self):
self.assertEqual(lst[0], 5) self.assertEqual(lst[0], 5)
self.assertEqual(lst[1], 6) self.assertEqual(lst[1], 6)
def CheckFetchone(self): def test_fetchone(self):
self.cu.execute("select name from test") self.cu.execute("select name from test")
row = self.cu.fetchone() row = self.cu.fetchone()
self.assertEqual(row[0], "foo") self.assertEqual(row[0], "foo")
row = self.cu.fetchone() row = self.cu.fetchone()
self.assertEqual(row, None) self.assertEqual(row, None)
def CheckFetchoneNoStatement(self): def test_fetchone_no_statement(self):
cur = self.cx.cursor() cur = self.cx.cursor()
row = cur.fetchone() row = cur.fetchone()
self.assertEqual(row, None) self.assertEqual(row, None)
def CheckArraySize(self): def test_array_size(self):
# must default ot 1 # must default ot 1
self.assertEqual(self.cu.arraysize, 1) self.assertEqual(self.cu.arraysize, 1)
@ -438,51 +438,51 @@ def CheckArraySize(self):
self.assertEqual(len(res), 2) self.assertEqual(len(res), 2)
def CheckFetchmany(self): def test_fetchmany(self):
self.cu.execute("select name from test") self.cu.execute("select name from test")
res = self.cu.fetchmany(100) res = self.cu.fetchmany(100)
self.assertEqual(len(res), 1) self.assertEqual(len(res), 1)
res = self.cu.fetchmany(100) res = self.cu.fetchmany(100)
self.assertEqual(res, []) self.assertEqual(res, [])
def CheckFetchmanyKwArg(self): def test_fetchmany_kw_arg(self):
"""Checks if fetchmany works with keyword arguments""" """Checks if fetchmany works with keyword arguments"""
self.cu.execute("select name from test") self.cu.execute("select name from test")
res = self.cu.fetchmany(size=100) res = self.cu.fetchmany(size=100)
self.assertEqual(len(res), 1) self.assertEqual(len(res), 1)
def CheckFetchall(self): def test_fetchall(self):
self.cu.execute("select name from test") self.cu.execute("select name from test")
res = self.cu.fetchall() res = self.cu.fetchall()
self.assertEqual(len(res), 1) self.assertEqual(len(res), 1)
res = self.cu.fetchall() res = self.cu.fetchall()
self.assertEqual(res, []) self.assertEqual(res, [])
def CheckSetinputsizes(self): def test_setinputsizes(self):
self.cu.setinputsizes([3, 4, 5]) self.cu.setinputsizes([3, 4, 5])
def CheckSetoutputsize(self): def test_setoutputsize(self):
self.cu.setoutputsize(5, 0) self.cu.setoutputsize(5, 0)
def CheckSetoutputsizeNoColumn(self): def test_setoutputsize_no_column(self):
self.cu.setoutputsize(42) self.cu.setoutputsize(42)
def CheckCursorConnection(self): def test_cursor_connection(self):
# Optional DB-API extension. # Optional DB-API extension.
self.assertEqual(self.cu.connection, self.cx) self.assertEqual(self.cu.connection, self.cx)
def CheckWrongCursorCallable(self): def test_wrong_cursor_callable(self):
with self.assertRaises(TypeError): with self.assertRaises(TypeError):
def f(): pass def f(): pass
cur = self.cx.cursor(f) cur = self.cx.cursor(f)
def CheckCursorWrongClass(self): def test_cursor_wrong_class(self):
class Foo: pass class Foo: pass
foo = Foo() foo = Foo()
with self.assertRaises(TypeError): with self.assertRaises(TypeError):
cur = sqlite.Cursor(foo) cur = sqlite.Cursor(foo)
def CheckLastRowIDOnReplace(self): def test_last_row_id_on_replace(self):
""" """
INSERT OR REPLACE and REPLACE INTO should produce the same behavior. INSERT OR REPLACE and REPLACE INTO should produce the same behavior.
""" """
@ -492,7 +492,7 @@ def CheckLastRowIDOnReplace(self):
self.cu.execute(sql.format(statement), (1, 'foo')) self.cu.execute(sql.format(statement), (1, 'foo'))
self.assertEqual(self.cu.lastrowid, 1) self.assertEqual(self.cu.lastrowid, 1)
def CheckLastRowIDOnIgnore(self): def test_last_row_id_on_ignore(self):
self.cu.execute( self.cu.execute(
"insert or ignore into test(unique_test) values (?)", "insert or ignore into test(unique_test) values (?)",
('test',)) ('test',))
@ -502,7 +502,7 @@ def CheckLastRowIDOnIgnore(self):
('test',)) ('test',))
self.assertEqual(self.cu.lastrowid, 2) self.assertEqual(self.cu.lastrowid, 2)
def CheckLastRowIDInsertOR(self): def test_last_row_id_insert_o_r(self):
results = [] results = []
for statement in ('FAIL', 'ABORT', 'ROLLBACK'): for statement in ('FAIL', 'ABORT', 'ROLLBACK'):
sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)' sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)'
@ -530,7 +530,7 @@ def tearDown(self):
self.cur.close() self.cur.close()
self.con.close() self.con.close()
def CheckConCursor(self): def test_con_cursor(self):
def run(con, errors): def run(con, errors):
try: try:
cur = con.cursor() cur = con.cursor()
@ -548,7 +548,7 @@ def run(con, errors):
if len(errors) > 0: if len(errors) > 0:
self.fail("\n".join(errors)) self.fail("\n".join(errors))
def CheckConCommit(self): def test_con_commit(self):
def run(con, errors): def run(con, errors):
try: try:
con.commit() con.commit()
@ -566,7 +566,7 @@ def run(con, errors):
if len(errors) > 0: if len(errors) > 0:
self.fail("\n".join(errors)) self.fail("\n".join(errors))
def CheckConRollback(self): def test_con_rollback(self):
def run(con, errors): def run(con, errors):
try: try:
con.rollback() con.rollback()
@ -584,7 +584,7 @@ def run(con, errors):
if len(errors) > 0: if len(errors) > 0:
self.fail("\n".join(errors)) self.fail("\n".join(errors))
def CheckConClose(self): def test_con_close(self):
def run(con, errors): def run(con, errors):
try: try:
con.close() con.close()
@ -602,7 +602,7 @@ def run(con, errors):
if len(errors) > 0: if len(errors) > 0:
self.fail("\n".join(errors)) self.fail("\n".join(errors))
def CheckCurImplicitBegin(self): def test_cur_implicit_begin(self):
def run(cur, errors): def run(cur, errors):
try: try:
cur.execute("insert into test(name) values ('a')") cur.execute("insert into test(name) values ('a')")
@ -620,7 +620,7 @@ def run(cur, errors):
if len(errors) > 0: if len(errors) > 0:
self.fail("\n".join(errors)) self.fail("\n".join(errors))
def CheckCurClose(self): def test_cur_close(self):
def run(cur, errors): def run(cur, errors):
try: try:
cur.close() cur.close()
@ -638,7 +638,7 @@ def run(cur, errors):
if len(errors) > 0: if len(errors) > 0:
self.fail("\n".join(errors)) self.fail("\n".join(errors))
def CheckCurExecute(self): def test_cur_execute(self):
def run(cur, errors): def run(cur, errors):
try: try:
cur.execute("select name from test") cur.execute("select name from test")
@ -657,7 +657,7 @@ def run(cur, errors):
if len(errors) > 0: if len(errors) > 0:
self.fail("\n".join(errors)) self.fail("\n".join(errors))
def CheckCurIterNext(self): def test_cur_iter_next(self):
def run(cur, errors): def run(cur, errors):
try: try:
row = cur.fetchone() row = cur.fetchone()
@ -678,29 +678,29 @@ def run(cur, errors):
self.fail("\n".join(errors)) self.fail("\n".join(errors))
class ConstructorTests(unittest.TestCase): class ConstructorTests(unittest.TestCase):
def CheckDate(self): def test_date(self):
d = sqlite.Date(2004, 10, 28) d = sqlite.Date(2004, 10, 28)
def CheckTime(self): def test_time(self):
t = sqlite.Time(12, 39, 35) t = sqlite.Time(12, 39, 35)
def CheckTimestamp(self): def test_timestamp(self):
ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35) ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35)
def CheckDateFromTicks(self): def test_date_from_ticks(self):
d = sqlite.DateFromTicks(42) d = sqlite.DateFromTicks(42)
def CheckTimeFromTicks(self): def test_time_from_ticks(self):
t = sqlite.TimeFromTicks(42) t = sqlite.TimeFromTicks(42)
def CheckTimestampFromTicks(self): def test_timestamp_from_ticks(self):
ts = sqlite.TimestampFromTicks(42) ts = sqlite.TimestampFromTicks(42)
def CheckBinary(self): def test_binary(self):
b = sqlite.Binary(b"\0'") b = sqlite.Binary(b"\0'")
class ExtensionTests(unittest.TestCase): class ExtensionTests(unittest.TestCase):
def CheckScriptStringSql(self): def test_script_string_sql(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
cur = con.cursor() cur = con.cursor()
cur.executescript(""" cur.executescript("""
@ -713,31 +713,31 @@ def CheckScriptStringSql(self):
res = cur.fetchone()[0] res = cur.fetchone()[0]
self.assertEqual(res, 5) self.assertEqual(res, 5)
def CheckScriptSyntaxError(self): def test_script_syntax_error(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
cur = con.cursor() cur = con.cursor()
with self.assertRaises(sqlite.OperationalError): with self.assertRaises(sqlite.OperationalError):
cur.executescript("create table test(x); asdf; create table test2(x)") cur.executescript("create table test(x); asdf; create table test2(x)")
def CheckScriptErrorNormal(self): def test_script_error_normal(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
cur = con.cursor() cur = con.cursor()
with self.assertRaises(sqlite.OperationalError): with self.assertRaises(sqlite.OperationalError):
cur.executescript("create table test(sadfsadfdsa); select foo from hurz;") cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")
def CheckCursorExecutescriptAsBytes(self): def test_cursor_executescript_as_bytes(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
cur = con.cursor() cur = con.cursor()
with self.assertRaises(ValueError) as cm: with self.assertRaises(ValueError) as cm:
cur.executescript(b"create table test(foo); insert into test(foo) values (5);") cur.executescript(b"create table test(foo); insert into test(foo) values (5);")
self.assertEqual(str(cm.exception), 'script argument must be unicode.') self.assertEqual(str(cm.exception), 'script argument must be unicode.')
def CheckConnectionExecute(self): def test_connection_execute(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
result = con.execute("select 5").fetchone()[0] result = con.execute("select 5").fetchone()[0]
self.assertEqual(result, 5, "Basic test of Connection.execute") self.assertEqual(result, 5, "Basic test of Connection.execute")
def CheckConnectionExecutemany(self): def test_connection_executemany(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
con.execute("create table test(foo)") con.execute("create table test(foo)")
con.executemany("insert into test(foo) values (?)", [(3,), (4,)]) con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
@ -745,46 +745,46 @@ def CheckConnectionExecutemany(self):
self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany") self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany") self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")
def CheckConnectionExecutescript(self): def test_connection_executescript(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
con.executescript("create table test(foo); insert into test(foo) values (5);") con.executescript("create table test(foo); insert into test(foo) values (5);")
result = con.execute("select foo from test").fetchone()[0] result = con.execute("select foo from test").fetchone()[0]
self.assertEqual(result, 5, "Basic test of Connection.executescript") self.assertEqual(result, 5, "Basic test of Connection.executescript")
class ClosedConTests(unittest.TestCase): class ClosedConTests(unittest.TestCase):
def CheckClosedConCursor(self): def test_closed_con_cursor(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
con.close() con.close()
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
cur = con.cursor() cur = con.cursor()
def CheckClosedConCommit(self): def test_closed_con_commit(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
con.close() con.close()
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
con.commit() con.commit()
def CheckClosedConRollback(self): def test_closed_con_rollback(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
con.close() con.close()
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
con.rollback() con.rollback()
def CheckClosedCurExecute(self): def test_closed_cur_execute(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
cur = con.cursor() cur = con.cursor()
con.close() con.close()
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
cur.execute("select 4") cur.execute("select 4")
def CheckClosedCreateFunction(self): def test_closed_create_function(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
con.close() con.close()
def f(x): return 17 def f(x): return 17
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
con.create_function("foo", 1, f) con.create_function("foo", 1, f)
def CheckClosedCreateAggregate(self): def test_closed_create_aggregate(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
con.close() con.close()
class Agg: class Agg:
@ -797,7 +797,7 @@ def finalize(self):
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
con.create_aggregate("foo", 1, Agg) con.create_aggregate("foo", 1, Agg)
def CheckClosedSetAuthorizer(self): def test_closed_set_authorizer(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
con.close() con.close()
def authorizer(*args): def authorizer(*args):
@ -805,21 +805,21 @@ def authorizer(*args):
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
con.set_authorizer(authorizer) con.set_authorizer(authorizer)
def CheckClosedSetProgressCallback(self): def test_closed_set_progress_callback(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
con.close() con.close()
def progress(): pass def progress(): pass
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
con.set_progress_handler(progress, 100) con.set_progress_handler(progress, 100)
def CheckClosedCall(self): def test_closed_call(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
con.close() con.close()
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
con() con()
class ClosedCurTests(unittest.TestCase): class ClosedCurTests(unittest.TestCase):
def CheckClosed(self): def test_closed(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
cur = con.cursor() cur = con.cursor()
cur.close() cur.close()
@ -857,7 +857,7 @@ def tearDown(self):
self.cu.close() self.cu.close()
self.cx.close() self.cx.close()
def CheckOnConflictRollbackWithExplicitTransaction(self): def test_on_conflict_rollback_with_explicit_transaction(self):
self.cx.isolation_level = None # autocommit mode self.cx.isolation_level = None # autocommit mode
self.cu = self.cx.cursor() self.cu = self.cx.cursor()
# Start an explicit transaction. # Start an explicit transaction.
@ -872,7 +872,7 @@ def CheckOnConflictRollbackWithExplicitTransaction(self):
# Transaction should have rolled back and nothing should be in table. # Transaction should have rolled back and nothing should be in table.
self.assertEqual(self.cu.fetchall(), []) self.assertEqual(self.cu.fetchall(), [])
def CheckOnConflictAbortRaisesWithExplicitTransactions(self): def test_on_conflict_abort_raises_with_explicit_transactions(self):
# Abort cancels the current sql statement but doesn't change anything # Abort cancels the current sql statement but doesn't change anything
# about the current transaction. # about the current transaction.
self.cx.isolation_level = None # autocommit mode self.cx.isolation_level = None # autocommit mode
@ -888,7 +888,7 @@ def CheckOnConflictAbortRaisesWithExplicitTransactions(self):
# Expect the first two inserts to work, third to do nothing. # Expect the first two inserts to work, third to do nothing.
self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)])
def CheckOnConflictRollbackWithoutTransaction(self): def test_on_conflict_rollback_without_transaction(self):
# Start of implicit transaction # Start of implicit transaction
self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
@ -898,7 +898,7 @@ def CheckOnConflictRollbackWithoutTransaction(self):
# Implicit transaction is rolled back on error. # Implicit transaction is rolled back on error.
self.assertEqual(self.cu.fetchall(), []) self.assertEqual(self.cu.fetchall(), [])
def CheckOnConflictAbortRaisesWithoutTransactions(self): def test_on_conflict_abort_raises_without_transactions(self):
# Abort cancels the current sql statement but doesn't change anything # Abort cancels the current sql statement but doesn't change anything
# about the current transaction. # about the current transaction.
self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
@ -909,20 +909,20 @@ def CheckOnConflictAbortRaisesWithoutTransactions(self):
self.cu.execute("SELECT name, unique_name FROM test") self.cu.execute("SELECT name, unique_name FROM test")
self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)])
def CheckOnConflictFail(self): def test_on_conflict_fail(self):
self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')")
with self.assertRaises(sqlite.IntegrityError): with self.assertRaises(sqlite.IntegrityError):
self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')")
self.assertEqual(self.cu.fetchall(), []) self.assertEqual(self.cu.fetchall(), [])
def CheckOnConflictIgnore(self): def test_on_conflict_ignore(self):
self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')")
# Nothing should happen. # Nothing should happen.
self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')")
self.cu.execute("SELECT unique_name FROM test") self.cu.execute("SELECT unique_name FROM test")
self.assertEqual(self.cu.fetchall(), [('foo',)]) self.assertEqual(self.cu.fetchall(), [('foo',)])
def CheckOnConflictReplace(self): def test_on_conflict_replace(self):
self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')") self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')")
# There shouldn't be an IntegrityError exception. # There shouldn't be an IntegrityError exception.
self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')") self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')")
@ -931,20 +931,20 @@ def CheckOnConflictReplace(self):
def suite(): def suite():
module_suite = unittest.makeSuite(ModuleTests, "Check") tests = [
connection_suite = unittest.makeSuite(ConnectionTests, "Check") ClosedConTests,
cursor_suite = unittest.makeSuite(CursorTests, "Check") ClosedCurTests,
thread_suite = unittest.makeSuite(ThreadTests, "Check") ConnectionTests,
constructor_suite = unittest.makeSuite(ConstructorTests, "Check") ConstructorTests,
ext_suite = unittest.makeSuite(ExtensionTests, "Check") CursorTests,
closed_con_suite = unittest.makeSuite(ClosedConTests, "Check") ExtensionTests,
closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check") ModuleTests,
on_conflict_suite = unittest.makeSuite(SqliteOnConflictTests, "Check") SqliteOnConflictTests,
return unittest.TestSuite(( ThreadTests,
module_suite, connection_suite, cursor_suite, thread_suite, ]
constructor_suite, ext_suite, closed_con_suite, closed_cur_suite, return unittest.TestSuite(
on_conflict_suite, [unittest.TestLoader().loadTestsFromTestCase(t) for t in tests]
)) )
def test(): def test():
runner = unittest.TextTestRunner() runner = unittest.TextTestRunner()

View file

@ -11,7 +11,7 @@ def setUp(self):
def tearDown(self): def tearDown(self):
self.cx.close() self.cx.close()
def CheckTableDump(self): def test_table_dump(self):
expected_sqls = [ expected_sqls = [
"""CREATE TABLE "index"("index" blob);""" """CREATE TABLE "index"("index" blob);"""
, ,
@ -49,7 +49,7 @@ def CheckTableDump(self):
[self.assertEqual(expected_sqls[i], actual_sqls[i]) [self.assertEqual(expected_sqls[i], actual_sqls[i])
for i in range(len(expected_sqls))] for i in range(len(expected_sqls))]
def CheckUnorderableRow(self): def test_unorderable_row(self):
# iterdump() should be able to cope with unorderable row types (issue #15545) # iterdump() should be able to cope with unorderable row types (issue #15545)
class UnorderableRow: class UnorderableRow:
def __init__(self, cursor, row): def __init__(self, cursor, row):
@ -71,7 +71,12 @@ def __getitem__(self, index):
self.assertEqual(expected, got) self.assertEqual(expected, got)
def suite(): def suite():
return unittest.TestSuite(unittest.makeSuite(DumpTests, "Check")) tests = [
DumpTests,
]
return unittest.TestSuite(
[unittest.TestLoader().loadTestsFromTestCase(t) for t in tests]
)
def test(): def test():
runner = unittest.TextTestRunner() runner = unittest.TextTestRunner()

View file

@ -47,7 +47,7 @@ def setUp(self):
def tearDown(self): def tearDown(self):
self.con.close() self.con.close()
def CheckIsInstance(self): def test_is_instance(self):
self.assertIsInstance(self.con, MyConnection) self.assertIsInstance(self.con, MyConnection)
class CursorFactoryTests(unittest.TestCase): class CursorFactoryTests(unittest.TestCase):
@ -57,7 +57,7 @@ def setUp(self):
def tearDown(self): def tearDown(self):
self.con.close() self.con.close()
def CheckIsInstance(self): def test_is_instance(self):
cur = self.con.cursor() cur = self.con.cursor()
self.assertIsInstance(cur, sqlite.Cursor) self.assertIsInstance(cur, sqlite.Cursor)
cur = self.con.cursor(MyCursor) cur = self.con.cursor(MyCursor)
@ -65,7 +65,7 @@ def CheckIsInstance(self):
cur = self.con.cursor(factory=lambda con: MyCursor(con)) cur = self.con.cursor(factory=lambda con: MyCursor(con))
self.assertIsInstance(cur, MyCursor) self.assertIsInstance(cur, MyCursor)
def CheckInvalidFactory(self): def test_invalid_factory(self):
# not a callable at all # not a callable at all
self.assertRaises(TypeError, self.con.cursor, None) self.assertRaises(TypeError, self.con.cursor, None)
# invalid callable with not exact one argument # invalid callable with not exact one argument
@ -77,7 +77,7 @@ class RowFactoryTestsBackwardsCompat(unittest.TestCase):
def setUp(self): def setUp(self):
self.con = sqlite.connect(":memory:") self.con = sqlite.connect(":memory:")
def CheckIsProducedByFactory(self): def test_is_produced_by_factory(self):
cur = self.con.cursor(factory=MyCursor) cur = self.con.cursor(factory=MyCursor)
cur.execute("select 4+5 as foo") cur.execute("select 4+5 as foo")
row = cur.fetchone() row = cur.fetchone()
@ -91,12 +91,12 @@ class RowFactoryTests(unittest.TestCase):
def setUp(self): def setUp(self):
self.con = sqlite.connect(":memory:") self.con = sqlite.connect(":memory:")
def CheckCustomFactory(self): def test_custom_factory(self):
self.con.row_factory = lambda cur, row: list(row) self.con.row_factory = lambda cur, row: list(row)
row = self.con.execute("select 1, 2").fetchone() row = self.con.execute("select 1, 2").fetchone()
self.assertIsInstance(row, list) self.assertIsInstance(row, list)
def CheckSqliteRowIndex(self): def test_sqlite_row_index(self):
self.con.row_factory = sqlite.Row self.con.row_factory = sqlite.Row
row = self.con.execute("select 1 as a_1, 2 as b").fetchone() row = self.con.execute("select 1 as a_1, 2 as b").fetchone()
self.assertIsInstance(row, sqlite.Row) self.assertIsInstance(row, sqlite.Row)
@ -125,7 +125,7 @@ def CheckSqliteRowIndex(self):
with self.assertRaises(IndexError): with self.assertRaises(IndexError):
row[2**1000] row[2**1000]
def CheckSqliteRowIndexUnicode(self): def test_sqlite_row_index_unicode(self):
self.con.row_factory = sqlite.Row self.con.row_factory = sqlite.Row
row = self.con.execute("select 1 as \xff").fetchone() row = self.con.execute("select 1 as \xff").fetchone()
self.assertEqual(row["\xff"], 1) self.assertEqual(row["\xff"], 1)
@ -134,7 +134,7 @@ def CheckSqliteRowIndexUnicode(self):
with self.assertRaises(IndexError): with self.assertRaises(IndexError):
row['\xdf'] row['\xdf']
def CheckSqliteRowSlice(self): def test_sqlite_row_slice(self):
# A sqlite.Row can be sliced like a list. # A sqlite.Row can be sliced like a list.
self.con.row_factory = sqlite.Row self.con.row_factory = sqlite.Row
row = self.con.execute("select 1, 2, 3, 4").fetchone() row = self.con.execute("select 1, 2, 3, 4").fetchone()
@ -152,21 +152,21 @@ def CheckSqliteRowSlice(self):
self.assertEqual(row[0:4:2], (1, 3)) self.assertEqual(row[0:4:2], (1, 3))
self.assertEqual(row[3:0:-2], (4, 2)) self.assertEqual(row[3:0:-2], (4, 2))
def CheckSqliteRowIter(self): def test_sqlite_row_iter(self):
"""Checks if the row object is iterable""" """Checks if the row object is iterable"""
self.con.row_factory = sqlite.Row self.con.row_factory = sqlite.Row
row = self.con.execute("select 1 as a, 2 as b").fetchone() row = self.con.execute("select 1 as a, 2 as b").fetchone()
for col in row: for col in row:
pass pass
def CheckSqliteRowAsTuple(self): def test_sqlite_row_as_tuple(self):
"""Checks if the row object can be converted to a tuple""" """Checks if the row object can be converted to a tuple"""
self.con.row_factory = sqlite.Row self.con.row_factory = sqlite.Row
row = self.con.execute("select 1 as a, 2 as b").fetchone() row = self.con.execute("select 1 as a, 2 as b").fetchone()
t = tuple(row) t = tuple(row)
self.assertEqual(t, (row['a'], row['b'])) self.assertEqual(t, (row['a'], row['b']))
def CheckSqliteRowAsDict(self): def test_sqlite_row_as_dict(self):
"""Checks if the row object can be correctly converted to a dictionary""" """Checks if the row object can be correctly converted to a dictionary"""
self.con.row_factory = sqlite.Row self.con.row_factory = sqlite.Row
row = self.con.execute("select 1 as a, 2 as b").fetchone() row = self.con.execute("select 1 as a, 2 as b").fetchone()
@ -174,7 +174,7 @@ def CheckSqliteRowAsDict(self):
self.assertEqual(d["a"], row["a"]) self.assertEqual(d["a"], row["a"])
self.assertEqual(d["b"], row["b"]) self.assertEqual(d["b"], row["b"])
def CheckSqliteRowHashCmp(self): def test_sqlite_row_hash_cmp(self):
"""Checks if the row object compares and hashes correctly""" """Checks if the row object compares and hashes correctly"""
self.con.row_factory = sqlite.Row self.con.row_factory = sqlite.Row
row_1 = self.con.execute("select 1 as a, 2 as b").fetchone() row_1 = self.con.execute("select 1 as a, 2 as b").fetchone()
@ -208,7 +208,7 @@ def CheckSqliteRowHashCmp(self):
self.assertEqual(hash(row_1), hash(row_2)) self.assertEqual(hash(row_1), hash(row_2))
def CheckSqliteRowAsSequence(self): def test_sqlite_row_as_sequence(self):
""" Checks if the row object can act like a sequence """ """ Checks if the row object can act like a sequence """
self.con.row_factory = sqlite.Row self.con.row_factory = sqlite.Row
row = self.con.execute("select 1 as a, 2 as b").fetchone() row = self.con.execute("select 1 as a, 2 as b").fetchone()
@ -217,7 +217,7 @@ def CheckSqliteRowAsSequence(self):
self.assertEqual(list(reversed(row)), list(reversed(as_tuple))) self.assertEqual(list(reversed(row)), list(reversed(as_tuple)))
self.assertIsInstance(row, Sequence) self.assertIsInstance(row, Sequence)
def CheckFakeCursorClass(self): def test_fake_cursor_class(self):
# Issue #24257: Incorrect use of PyObject_IsInstance() caused # Issue #24257: Incorrect use of PyObject_IsInstance() caused
# segmentation fault. # segmentation fault.
# Issue #27861: Also applies for cursor factory. # Issue #27861: Also applies for cursor factory.
@ -234,26 +234,26 @@ class TextFactoryTests(unittest.TestCase):
def setUp(self): def setUp(self):
self.con = sqlite.connect(":memory:") self.con = sqlite.connect(":memory:")
def CheckUnicode(self): def test_unicode(self):
austria = "Österreich" austria = "Österreich"
row = self.con.execute("select ?", (austria,)).fetchone() row = self.con.execute("select ?", (austria,)).fetchone()
self.assertEqual(type(row[0]), str, "type of row[0] must be unicode") self.assertEqual(type(row[0]), str, "type of row[0] must be unicode")
def CheckString(self): def test_string(self):
self.con.text_factory = bytes self.con.text_factory = bytes
austria = "Österreich" austria = "Österreich"
row = self.con.execute("select ?", (austria,)).fetchone() row = self.con.execute("select ?", (austria,)).fetchone()
self.assertEqual(type(row[0]), bytes, "type of row[0] must be bytes") self.assertEqual(type(row[0]), bytes, "type of row[0] must be bytes")
self.assertEqual(row[0], austria.encode("utf-8"), "column must equal original data in UTF-8") self.assertEqual(row[0], austria.encode("utf-8"), "column must equal original data in UTF-8")
def CheckCustom(self): def test_custom(self):
self.con.text_factory = lambda x: str(x, "utf-8", "ignore") self.con.text_factory = lambda x: str(x, "utf-8", "ignore")
austria = "Österreich" austria = "Österreich"
row = self.con.execute("select ?", (austria,)).fetchone() row = self.con.execute("select ?", (austria,)).fetchone()
self.assertEqual(type(row[0]), str, "type of row[0] must be unicode") self.assertEqual(type(row[0]), str, "type of row[0] must be unicode")
self.assertTrue(row[0].endswith("reich"), "column must contain original data") self.assertTrue(row[0].endswith("reich"), "column must contain original data")
def CheckOptimizedUnicode(self): def test_optimized_unicode(self):
# OptimizedUnicode is deprecated as of Python 3.10 # OptimizedUnicode is deprecated as of Python 3.10
with self.assertWarns(DeprecationWarning) as cm: with self.assertWarns(DeprecationWarning) as cm:
self.con.text_factory = sqlite.OptimizedUnicode self.con.text_factory = sqlite.OptimizedUnicode
@ -274,25 +274,25 @@ def setUp(self):
self.con.execute("create table test (value text)") self.con.execute("create table test (value text)")
self.con.execute("insert into test (value) values (?)", ("a\x00b",)) self.con.execute("insert into test (value) values (?)", ("a\x00b",))
def CheckString(self): def test_string(self):
# text_factory defaults to str # text_factory defaults to str
row = self.con.execute("select value from test").fetchone() row = self.con.execute("select value from test").fetchone()
self.assertIs(type(row[0]), str) self.assertIs(type(row[0]), str)
self.assertEqual(row[0], "a\x00b") self.assertEqual(row[0], "a\x00b")
def CheckBytes(self): def test_bytes(self):
self.con.text_factory = bytes self.con.text_factory = bytes
row = self.con.execute("select value from test").fetchone() row = self.con.execute("select value from test").fetchone()
self.assertIs(type(row[0]), bytes) self.assertIs(type(row[0]), bytes)
self.assertEqual(row[0], b"a\x00b") self.assertEqual(row[0], b"a\x00b")
def CheckBytearray(self): def test_bytearray(self):
self.con.text_factory = bytearray self.con.text_factory = bytearray
row = self.con.execute("select value from test").fetchone() row = self.con.execute("select value from test").fetchone()
self.assertIs(type(row[0]), bytearray) self.assertIs(type(row[0]), bytearray)
self.assertEqual(row[0], b"a\x00b") self.assertEqual(row[0], b"a\x00b")
def CheckCustom(self): def test_custom(self):
# A custom factory should receive a bytes argument # A custom factory should receive a bytes argument
self.con.text_factory = lambda x: x self.con.text_factory = lambda x: x
row = self.con.execute("select value from test").fetchone() row = self.con.execute("select value from test").fetchone()
@ -303,13 +303,17 @@ def tearDown(self):
self.con.close() self.con.close()
def suite(): def suite():
connection_suite = unittest.makeSuite(ConnectionFactoryTests, "Check") tests = [
cursor_suite = unittest.makeSuite(CursorFactoryTests, "Check") ConnectionFactoryTests,
row_suite_compat = unittest.makeSuite(RowFactoryTestsBackwardsCompat, "Check") CursorFactoryTests,
row_suite = unittest.makeSuite(RowFactoryTests, "Check") RowFactoryTests,
text_suite = unittest.makeSuite(TextFactoryTests, "Check") RowFactoryTestsBackwardsCompat,
text_zero_bytes_suite = unittest.makeSuite(TextFactoryTestsWithEmbeddedZeroBytes, "Check") TextFactoryTests,
return unittest.TestSuite((connection_suite, cursor_suite, row_suite_compat, row_suite, text_suite, text_zero_bytes_suite)) TextFactoryTestsWithEmbeddedZeroBytes,
]
return unittest.TestSuite(
[unittest.TestLoader().loadTestsFromTestCase(t) for t in tests]
)
def test(): def test():
runner = unittest.TextTestRunner() runner = unittest.TextTestRunner()

View file

@ -28,23 +28,23 @@
class CollationTests(unittest.TestCase): class CollationTests(unittest.TestCase):
def CheckCreateCollationNotString(self): def test_create_collation_not_string(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
with self.assertRaises(TypeError): with self.assertRaises(TypeError):
con.create_collation(None, lambda x, y: (x > y) - (x < y)) con.create_collation(None, lambda x, y: (x > y) - (x < y))
def CheckCreateCollationNotCallable(self): def test_create_collation_not_callable(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
with self.assertRaises(TypeError) as cm: with self.assertRaises(TypeError) as cm:
con.create_collation("X", 42) con.create_collation("X", 42)
self.assertEqual(str(cm.exception), 'parameter must be callable') self.assertEqual(str(cm.exception), 'parameter must be callable')
def CheckCreateCollationNotAscii(self): def test_create_collation_not_ascii(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
con.create_collation("collä", lambda x, y: (x > y) - (x < y)) con.create_collation("collä", lambda x, y: (x > y) - (x < y))
def CheckCreateCollationBadUpper(self): def test_create_collation_bad_upper(self):
class BadUpperStr(str): class BadUpperStr(str):
def upper(self): def upper(self):
return None return None
@ -61,7 +61,7 @@ def upper(self):
self.assertEqual(result[0][0], 'b') self.assertEqual(result[0][0], 'b')
self.assertEqual(result[1][0], 'a') self.assertEqual(result[1][0], 'a')
def CheckCollationIsUsed(self): def test_collation_is_used(self):
def mycoll(x, y): def mycoll(x, y):
# reverse order # reverse order
return -((x > y) - (x < y)) return -((x > y) - (x < y))
@ -86,7 +86,7 @@ def mycoll(x, y):
result = con.execute(sql).fetchall() result = con.execute(sql).fetchall()
self.assertEqual(str(cm.exception), 'no such collation sequence: mycoll') self.assertEqual(str(cm.exception), 'no such collation sequence: mycoll')
def CheckCollationReturnsLargeInteger(self): def test_collation_returns_large_integer(self):
def mycoll(x, y): def mycoll(x, y):
# reverse order # reverse order
return -((x > y) - (x < y)) * 2**32 return -((x > y) - (x < y)) * 2**32
@ -105,7 +105,7 @@ def mycoll(x, y):
self.assertEqual(result, [('c',), ('b',), ('a',)], self.assertEqual(result, [('c',), ('b',), ('a',)],
msg="the expected order was not returned") msg="the expected order was not returned")
def CheckCollationRegisterTwice(self): def test_collation_register_twice(self):
""" """
Register two different collation functions under the same name. Register two different collation functions under the same name.
Verify that the last one is actually used. Verify that the last one is actually used.
@ -119,7 +119,7 @@ def CheckCollationRegisterTwice(self):
self.assertEqual(result[0][0], 'b') self.assertEqual(result[0][0], 'b')
self.assertEqual(result[1][0], 'a') self.assertEqual(result[1][0], 'a')
def CheckDeregisterCollation(self): def test_deregister_collation(self):
""" """
Register a collation, then deregister it. Make sure an error is raised if we try Register a collation, then deregister it. Make sure an error is raised if we try
to use it. to use it.
@ -132,7 +132,7 @@ def CheckDeregisterCollation(self):
self.assertEqual(str(cm.exception), 'no such collation sequence: mycoll') self.assertEqual(str(cm.exception), 'no such collation sequence: mycoll')
class ProgressTests(unittest.TestCase): class ProgressTests(unittest.TestCase):
def CheckProgressHandlerUsed(self): def test_progress_handler_used(self):
""" """
Test that the progress handler is invoked once it is set. Test that the progress handler is invoked once it is set.
""" """
@ -148,7 +148,7 @@ def progress():
self.assertTrue(progress_calls) self.assertTrue(progress_calls)
def CheckOpcodeCount(self): def test_opcode_count(self):
""" """
Test that the opcode argument is respected. Test that the opcode argument is respected.
""" """
@ -171,7 +171,7 @@ def progress():
second_count = len(progress_calls) second_count = len(progress_calls)
self.assertGreaterEqual(first_count, second_count) self.assertGreaterEqual(first_count, second_count)
def CheckCancelOperation(self): def test_cancel_operation(self):
""" """
Test that returning a non-zero value stops the operation in progress. Test that returning a non-zero value stops the operation in progress.
""" """
@ -185,7 +185,7 @@ def progress():
curs.execute, curs.execute,
"create table bar (a, b)") "create table bar (a, b)")
def CheckClearHandler(self): def test_clear_handler(self):
""" """
Test that setting the progress handler to None clears the previously set handler. Test that setting the progress handler to None clears the previously set handler.
""" """
@ -201,7 +201,7 @@ def progress():
self.assertEqual(action, 0, "progress handler was not cleared") self.assertEqual(action, 0, "progress handler was not cleared")
class TraceCallbackTests(unittest.TestCase): class TraceCallbackTests(unittest.TestCase):
def CheckTraceCallbackUsed(self): def test_trace_callback_used(self):
""" """
Test that the trace callback is invoked once it is set. Test that the trace callback is invoked once it is set.
""" """
@ -214,7 +214,7 @@ def trace(statement):
self.assertTrue(traced_statements) self.assertTrue(traced_statements)
self.assertTrue(any("create table foo" in stmt for stmt in traced_statements)) self.assertTrue(any("create table foo" in stmt for stmt in traced_statements))
def CheckClearTraceCallback(self): def test_clear_trace_callback(self):
""" """
Test that setting the trace callback to None clears the previously set callback. Test that setting the trace callback to None clears the previously set callback.
""" """
@ -227,7 +227,7 @@ def trace(statement):
con.execute("create table foo(a, b)") con.execute("create table foo(a, b)")
self.assertFalse(traced_statements, "trace callback was not cleared") self.assertFalse(traced_statements, "trace callback was not cleared")
def CheckUnicodeContent(self): def test_unicode_content(self):
""" """
Test that the statement can contain unicode literals. Test that the statement can contain unicode literals.
""" """
@ -244,7 +244,7 @@ def trace(statement):
"Unicode data %s garbled in trace callback: %s" "Unicode data %s garbled in trace callback: %s"
% (ascii(unicode_value), ', '.join(map(ascii, traced_statements)))) % (ascii(unicode_value), ', '.join(map(ascii, traced_statements))))
def CheckTraceCallbackContent(self): def test_trace_callback_content(self):
# set_trace_callback() shouldn't produce duplicate content (bpo-26187) # set_trace_callback() shouldn't produce duplicate content (bpo-26187)
traced_statements = [] traced_statements = []
def trace(statement): def trace(statement):
@ -264,10 +264,14 @@ def trace(statement):
def suite(): def suite():
collation_suite = unittest.makeSuite(CollationTests, "Check") tests = [
progress_suite = unittest.makeSuite(ProgressTests, "Check") CollationTests,
trace_suite = unittest.makeSuite(TraceCallbackTests, "Check") ProgressTests,
return unittest.TestSuite((collation_suite, progress_suite, trace_suite)) TraceCallbackTests,
]
return unittest.TestSuite(
[unittest.TestLoader().loadTestsFromTestCase(t) for t in tests]
)
def test(): def test():
runner = unittest.TextTestRunner() runner = unittest.TextTestRunner()

View file

@ -35,12 +35,12 @@ def setUp(self):
def tearDown(self): def tearDown(self):
self.con.close() self.con.close()
def CheckPragmaUserVersion(self): def test_pragma_user_version(self):
# This used to crash pysqlite because this pragma command returns NULL for the column name # This used to crash pysqlite because this pragma command returns NULL for the column name
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("pragma user_version") cur.execute("pragma user_version")
def CheckPragmaSchemaVersion(self): def test_pragma_schema_version(self):
# This still crashed pysqlite <= 2.2.1 # This still crashed pysqlite <= 2.2.1
con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
try: try:
@ -50,7 +50,7 @@ def CheckPragmaSchemaVersion(self):
cur.close() cur.close()
con.close() con.close()
def CheckStatementReset(self): def test_statement_reset(self):
# pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
# reset before a rollback, but only those that are still in the # reset before a rollback, but only those that are still in the
# statement cache. The others are not accessible from the connection object. # statement cache. The others are not accessible from the connection object.
@ -65,7 +65,7 @@ def CheckStatementReset(self):
con.rollback() con.rollback()
def CheckColumnNameWithSpaces(self): def test_column_name_with_spaces(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute('select 1 as "foo bar [datetime]"') cur.execute('select 1 as "foo bar [datetime]"')
self.assertEqual(cur.description[0][0], "foo bar [datetime]") self.assertEqual(cur.description[0][0], "foo bar [datetime]")
@ -73,7 +73,7 @@ def CheckColumnNameWithSpaces(self):
cur.execute('select 1 as "foo baz"') cur.execute('select 1 as "foo baz"')
self.assertEqual(cur.description[0][0], "foo baz") self.assertEqual(cur.description[0][0], "foo baz")
def CheckStatementFinalizationOnCloseDb(self): def test_statement_finalization_on_close_db(self):
# pysqlite versions <= 2.3.3 only finalized statements in the statement # pysqlite versions <= 2.3.3 only finalized statements in the statement
# cache when closing the database. statements that were still # cache when closing the database. statements that were still
# referenced in cursors weren't closed and could provoke " # referenced in cursors weren't closed and could provoke "
@ -87,7 +87,7 @@ def CheckStatementFinalizationOnCloseDb(self):
cur.execute("select 1 x union select " + str(i)) cur.execute("select 1 x union select " + str(i))
con.close() con.close()
def CheckOnConflictRollback(self): def test_on_conflict_rollback(self):
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
con.execute("create table foo(x, unique(x) on conflict rollback)") con.execute("create table foo(x, unique(x) on conflict rollback)")
con.execute("insert into foo(x) values (1)") con.execute("insert into foo(x) values (1)")
@ -101,7 +101,7 @@ def CheckOnConflictRollback(self):
except sqlite.OperationalError: except sqlite.OperationalError:
self.fail("pysqlite knew nothing about the implicit ROLLBACK") self.fail("pysqlite knew nothing about the implicit ROLLBACK")
def CheckWorkaroundForBuggySqliteTransferBindings(self): def test_workaround_for_buggy_sqlite_transfer_bindings(self):
""" """
pysqlite would crash with older SQLite versions unless pysqlite would crash with older SQLite versions unless
a workaround is implemented. a workaround is implemented.
@ -110,14 +110,14 @@ def CheckWorkaroundForBuggySqliteTransferBindings(self):
self.con.execute("drop table foo") self.con.execute("drop table foo")
self.con.execute("create table foo(bar)") self.con.execute("create table foo(bar)")
def CheckEmptyStatement(self): def test_empty_statement(self):
""" """
pysqlite used to segfault with SQLite versions 3.5.x. These return NULL pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
for "no-operation" statements for "no-operation" statements
""" """
self.con.execute("") self.con.execute("")
def CheckTypeMapUsage(self): def test_type_map_usage(self):
""" """
pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
a statement. This test exhibits the problem. a statement. This test exhibits the problem.
@ -132,7 +132,7 @@ def CheckTypeMapUsage(self):
con.execute("insert into foo(bar) values (5)") con.execute("insert into foo(bar) values (5)")
con.execute(SELECT) con.execute(SELECT)
def CheckBindMutatingList(self): def test_bind_mutating_list(self):
# Issue41662: Crash when mutate a list of parameters during iteration. # Issue41662: Crash when mutate a list of parameters during iteration.
class X: class X:
def __conform__(self, protocol): def __conform__(self, protocol):
@ -145,7 +145,7 @@ def __conform__(self, protocol):
with self.assertRaises(IndexError): with self.assertRaises(IndexError):
con.execute("insert into foo(bar, baz) values (?, ?)", parameters) con.execute("insert into foo(bar, baz) values (?, ?)", parameters)
def CheckErrorMsgDecodeError(self): def test_error_msg_decode_error(self):
# When porting the module to Python 3.0, the error message about # When porting the module to Python 3.0, the error message about
# decoding errors disappeared. This verifies they're back again. # decoding errors disappeared. This verifies they're back again.
with self.assertRaises(sqlite.OperationalError) as cm: with self.assertRaises(sqlite.OperationalError) as cm:
@ -154,13 +154,13 @@ def CheckErrorMsgDecodeError(self):
msg = "Could not decode to UTF-8 column 'colname' with text 'xxx" msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
self.assertIn(msg, str(cm.exception)) self.assertIn(msg, str(cm.exception))
def CheckRegisterAdapter(self): def test_register_adapter(self):
""" """
See issue 3312. See issue 3312.
""" """
self.assertRaises(TypeError, sqlite.register_adapter, {}, None) self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
def CheckSetIsolationLevel(self): def test_set_isolation_level(self):
# See issue 27881. # See issue 27881.
class CustomStr(str): class CustomStr(str):
def upper(self): def upper(self):
@ -190,7 +190,7 @@ def __del__(self):
con.isolation_level = value con.isolation_level = value
self.assertEqual(con.isolation_level, "DEFERRED") self.assertEqual(con.isolation_level, "DEFERRED")
def CheckCursorConstructorCallCheck(self): def test_cursor_constructor_call_check(self):
""" """
Verifies that cursor methods check whether base class __init__ was Verifies that cursor methods check whether base class __init__ was
called. called.
@ -207,14 +207,14 @@ def __init__(self, con):
r'^Base Cursor\.__init__ not called\.$'): r'^Base Cursor\.__init__ not called\.$'):
cur.close() cur.close()
def CheckStrSubclass(self): def test_str_subclass(self):
""" """
The Python 3.0 port of the module didn't cope with values of subclasses of str. The Python 3.0 port of the module didn't cope with values of subclasses of str.
""" """
class MyStr(str): pass class MyStr(str): pass
self.con.execute("select ?", (MyStr("abc"),)) self.con.execute("select ?", (MyStr("abc"),))
def CheckConnectionConstructorCallCheck(self): def test_connection_constructor_call_check(self):
""" """
Verifies that connection methods check whether base class __init__ was Verifies that connection methods check whether base class __init__ was
called. called.
@ -227,7 +227,7 @@ def __init__(self, name):
with self.assertRaises(sqlite.ProgrammingError): with self.assertRaises(sqlite.ProgrammingError):
cur = con.cursor() cur = con.cursor()
def CheckCursorRegistration(self): def test_cursor_registration(self):
""" """
Verifies that subclassed cursor classes are correctly registered with Verifies that subclassed cursor classes are correctly registered with
the connection object, too. (fetch-across-rollback problem) the connection object, too. (fetch-across-rollback problem)
@ -249,7 +249,7 @@ def __init__(self, con):
with self.assertRaises(sqlite.InterfaceError): with self.assertRaises(sqlite.InterfaceError):
cur.fetchall() cur.fetchall()
def CheckAutoCommit(self): def test_auto_commit(self):
""" """
Verifies that creating a connection in autocommit mode works. Verifies that creating a connection in autocommit mode works.
2.5.3 introduced a regression so that these could no longer 2.5.3 introduced a regression so that these could no longer
@ -257,7 +257,7 @@ def CheckAutoCommit(self):
""" """
con = sqlite.connect(":memory:", isolation_level=None) con = sqlite.connect(":memory:", isolation_level=None)
def CheckPragmaAutocommit(self): def test_pragma_autocommit(self):
""" """
Verifies that running a PRAGMA statement that does an autocommit does Verifies that running a PRAGMA statement that does an autocommit does
work. This did not work in 2.5.3/2.5.4. work. This did not work in 2.5.3/2.5.4.
@ -269,21 +269,21 @@ def CheckPragmaAutocommit(self):
cur.execute("pragma page_size") cur.execute("pragma page_size")
row = cur.fetchone() row = cur.fetchone()
def CheckConnectionCall(self): def test_connection_call(self):
""" """
Call a connection with a non-string SQL request: check error handling Call a connection with a non-string SQL request: check error handling
of the statement constructor. of the statement constructor.
""" """
self.assertRaises(TypeError, self.con, 1) self.assertRaises(TypeError, self.con, 1)
def CheckCollation(self): def test_collation(self):
def collation_cb(a, b): def collation_cb(a, b):
return 1 return 1
self.assertRaises(sqlite.ProgrammingError, self.con.create_collation, self.assertRaises(sqlite.ProgrammingError, self.con.create_collation,
# Lone surrogate cannot be encoded to the default encoding (utf8) # Lone surrogate cannot be encoded to the default encoding (utf8)
"\uDC80", collation_cb) "\uDC80", collation_cb)
def CheckRecursiveCursorUse(self): def test_recursive_cursor_use(self):
""" """
http://bugs.python.org/issue10811 http://bugs.python.org/issue10811
@ -304,7 +304,7 @@ def foo():
cur.executemany("insert into b (baz) values (?)", cur.executemany("insert into b (baz) values (?)",
((i,) for i in foo())) ((i,) for i in foo()))
def CheckConvertTimestampMicrosecondPadding(self): def test_convert_timestamp_microsecond_padding(self):
""" """
http://bugs.python.org/issue14720 http://bugs.python.org/issue14720
@ -330,13 +330,13 @@ def CheckConvertTimestampMicrosecondPadding(self):
datetime.datetime(2012, 4, 4, 15, 6, 0, 123456), datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
]) ])
def CheckInvalidIsolationLevelType(self): def test_invalid_isolation_level_type(self):
# isolation level is a string, not an integer # isolation level is a string, not an integer
self.assertRaises(TypeError, self.assertRaises(TypeError,
sqlite.connect, ":memory:", isolation_level=123) sqlite.connect, ":memory:", isolation_level=123)
def CheckNullCharacter(self): def test_null_character(self):
# Issue #21147 # Issue #21147
con = sqlite.connect(":memory:") con = sqlite.connect(":memory:")
self.assertRaises(ValueError, con, "\0select 1") self.assertRaises(ValueError, con, "\0select 1")
@ -345,7 +345,7 @@ def CheckNullCharacter(self):
self.assertRaises(ValueError, cur.execute, " \0select 2") self.assertRaises(ValueError, cur.execute, " \0select 2")
self.assertRaises(ValueError, cur.execute, "select 2\0") self.assertRaises(ValueError, cur.execute, "select 2\0")
def CheckCommitCursorReset(self): def test_commit_cursor_reset(self):
""" """
Connection.commit() did reset cursors, which made sqlite3 Connection.commit() did reset cursors, which made sqlite3
to return rows multiple times when fetched from cursors to return rows multiple times when fetched from cursors
@ -376,7 +376,7 @@ def CheckCommitCursorReset(self):
counter += 1 counter += 1
self.assertEqual(counter, 3, "should have returned exactly three rows") self.assertEqual(counter, 3, "should have returned exactly three rows")
def CheckBpo31770(self): def test_bpo31770(self):
""" """
The interpreter shouldn't crash in case Cursor.__init__() is called The interpreter shouldn't crash in case Cursor.__init__() is called
more than once. more than once.
@ -392,11 +392,11 @@ def callback(*args):
del ref del ref
support.gc_collect() support.gc_collect()
def CheckDelIsolation_levelSegfault(self): def test_del_isolation_level_segfault(self):
with self.assertRaises(AttributeError): with self.assertRaises(AttributeError):
del self.con.isolation_level del self.con.isolation_level
def CheckBpo37347(self): def test_bpo37347(self):
class Printer: class Printer:
def log(self, *args): def log(self, *args):
return sqlite.SQLITE_OK return sqlite.SQLITE_OK
@ -413,10 +413,12 @@ def log(self, *args):
def suite(): def suite():
regression_suite = unittest.makeSuite(RegressionTests, "Check") tests = [
return unittest.TestSuite(( RegressionTests
regression_suite, ]
)) return unittest.TestSuite(
[unittest.TestLoader().loadTestsFromTestCase(t) for t in tests]
)
def test(): def test():
runner = unittest.TextTestRunner() runner = unittest.TextTestRunner()

View file

@ -52,7 +52,7 @@ def tearDown(self):
except OSError: except OSError:
pass pass
def CheckDMLDoesNotAutoCommitBefore(self): def test_dml_does_not_auto_commit_before(self):
self.cur1.execute("create table test(i)") self.cur1.execute("create table test(i)")
self.cur1.execute("insert into test(i) values (5)") self.cur1.execute("insert into test(i) values (5)")
self.cur1.execute("create table test2(j)") self.cur1.execute("create table test2(j)")
@ -60,14 +60,14 @@ def CheckDMLDoesNotAutoCommitBefore(self):
res = self.cur2.fetchall() res = self.cur2.fetchall()
self.assertEqual(len(res), 0) self.assertEqual(len(res), 0)
def CheckInsertStartsTransaction(self): def test_insert_starts_transaction(self):
self.cur1.execute("create table test(i)") self.cur1.execute("create table test(i)")
self.cur1.execute("insert into test(i) values (5)") self.cur1.execute("insert into test(i) values (5)")
self.cur2.execute("select i from test") self.cur2.execute("select i from test")
res = self.cur2.fetchall() res = self.cur2.fetchall()
self.assertEqual(len(res), 0) self.assertEqual(len(res), 0)
def CheckUpdateStartsTransaction(self): def test_update_starts_transaction(self):
self.cur1.execute("create table test(i)") self.cur1.execute("create table test(i)")
self.cur1.execute("insert into test(i) values (5)") self.cur1.execute("insert into test(i) values (5)")
self.con1.commit() self.con1.commit()
@ -76,7 +76,7 @@ def CheckUpdateStartsTransaction(self):
res = self.cur2.fetchone()[0] res = self.cur2.fetchone()[0]
self.assertEqual(res, 5) self.assertEqual(res, 5)
def CheckDeleteStartsTransaction(self): def test_delete_starts_transaction(self):
self.cur1.execute("create table test(i)") self.cur1.execute("create table test(i)")
self.cur1.execute("insert into test(i) values (5)") self.cur1.execute("insert into test(i) values (5)")
self.con1.commit() self.con1.commit()
@ -85,7 +85,7 @@ def CheckDeleteStartsTransaction(self):
res = self.cur2.fetchall() res = self.cur2.fetchall()
self.assertEqual(len(res), 1) self.assertEqual(len(res), 1)
def CheckReplaceStartsTransaction(self): def test_replace_starts_transaction(self):
self.cur1.execute("create table test(i)") self.cur1.execute("create table test(i)")
self.cur1.execute("insert into test(i) values (5)") self.cur1.execute("insert into test(i) values (5)")
self.con1.commit() self.con1.commit()
@ -95,7 +95,7 @@ def CheckReplaceStartsTransaction(self):
self.assertEqual(len(res), 1) self.assertEqual(len(res), 1)
self.assertEqual(res[0][0], 5) self.assertEqual(res[0][0], 5)
def CheckToggleAutoCommit(self): def test_toggle_auto_commit(self):
self.cur1.execute("create table test(i)") self.cur1.execute("create table test(i)")
self.cur1.execute("insert into test(i) values (5)") self.cur1.execute("insert into test(i) values (5)")
self.con1.isolation_level = None self.con1.isolation_level = None
@ -111,13 +111,13 @@ def CheckToggleAutoCommit(self):
res = self.cur2.fetchall() res = self.cur2.fetchall()
self.assertEqual(len(res), 1) self.assertEqual(len(res), 1)
def CheckRaiseTimeout(self): def test_raise_timeout(self):
self.cur1.execute("create table test(i)") self.cur1.execute("create table test(i)")
self.cur1.execute("insert into test(i) values (5)") self.cur1.execute("insert into test(i) values (5)")
with self.assertRaises(sqlite.OperationalError): with self.assertRaises(sqlite.OperationalError):
self.cur2.execute("insert into test(i) values (5)") self.cur2.execute("insert into test(i) values (5)")
def CheckLocking(self): def test_locking(self):
""" """
This tests the improved concurrency with pysqlite 2.3.4. You needed This tests the improved concurrency with pysqlite 2.3.4. You needed
to roll back con2 before you could commit con1. to roll back con2 before you could commit con1.
@ -129,7 +129,7 @@ def CheckLocking(self):
# NO self.con2.rollback() HERE!!! # NO self.con2.rollback() HERE!!!
self.con1.commit() self.con1.commit()
def CheckRollbackCursorConsistency(self): def test_rollback_cursor_consistency(self):
""" """
Checks if cursors on the connection are set into a "reset" state Checks if cursors on the connection are set into a "reset" state
when a rollback is done on the connection. when a rollback is done on the connection.
@ -149,12 +149,12 @@ def setUp(self):
self.con = sqlite.connect(":memory:") self.con = sqlite.connect(":memory:")
self.cur = self.con.cursor() self.cur = self.con.cursor()
def CheckDropTable(self): def test_drop_table(self):
self.cur.execute("create table test(i)") self.cur.execute("create table test(i)")
self.cur.execute("insert into test(i) values (5)") self.cur.execute("insert into test(i) values (5)")
self.cur.execute("drop table test") self.cur.execute("drop table test")
def CheckPragma(self): def test_pragma(self):
self.cur.execute("create table test(i)") self.cur.execute("create table test(i)")
self.cur.execute("insert into test(i) values (5)") self.cur.execute("insert into test(i) values (5)")
self.cur.execute("pragma count_changes=1") self.cur.execute("pragma count_changes=1")
@ -167,7 +167,7 @@ class TransactionalDDL(unittest.TestCase):
def setUp(self): def setUp(self):
self.con = sqlite.connect(":memory:") self.con = sqlite.connect(":memory:")
def CheckDdlDoesNotAutostartTransaction(self): def test_ddl_does_not_autostart_transaction(self):
# For backwards compatibility reasons, DDL statements should not # For backwards compatibility reasons, DDL statements should not
# implicitly start a transaction. # implicitly start a transaction.
self.con.execute("create table test(i)") self.con.execute("create table test(i)")
@ -175,7 +175,7 @@ def CheckDdlDoesNotAutostartTransaction(self):
result = self.con.execute("select * from test").fetchall() result = self.con.execute("select * from test").fetchall()
self.assertEqual(result, []) self.assertEqual(result, [])
def CheckImmediateTransactionalDDL(self): def test_immediate_transactional_ddl(self):
# You can achieve transactional DDL by issuing a BEGIN # You can achieve transactional DDL by issuing a BEGIN
# statement manually. # statement manually.
self.con.execute("begin immediate") self.con.execute("begin immediate")
@ -184,7 +184,7 @@ def CheckImmediateTransactionalDDL(self):
with self.assertRaises(sqlite.OperationalError): with self.assertRaises(sqlite.OperationalError):
self.con.execute("select * from test") self.con.execute("select * from test")
def CheckTransactionalDDL(self): def test_transactional_ddl(self):
# You can achieve transactional DDL by issuing a BEGIN # You can achieve transactional DDL by issuing a BEGIN
# statement manually. # statement manually.
self.con.execute("begin") self.con.execute("begin")
@ -197,10 +197,14 @@ def tearDown(self):
self.con.close() self.con.close()
def suite(): def suite():
default_suite = unittest.makeSuite(TransactionTests, "Check") tests = [
special_command_suite = unittest.makeSuite(SpecialCommandTests, "Check") SpecialCommandTests,
ddl_suite = unittest.makeSuite(TransactionalDDL, "Check") TransactionTests,
return unittest.TestSuite((default_suite, special_command_suite, ddl_suite)) TransactionalDDL,
]
return unittest.TestSuite(
[unittest.TestLoader().loadTestsFromTestCase(t) for t in tests]
)
def test(): def test():
runner = unittest.TextTestRunner() runner = unittest.TextTestRunner()

View file

@ -40,33 +40,33 @@ def tearDown(self):
self.cur.close() self.cur.close()
self.con.close() self.con.close()
def CheckString(self): def test_string(self):
self.cur.execute("insert into test(s) values (?)", ("Österreich",)) self.cur.execute("insert into test(s) values (?)", ("Österreich",))
self.cur.execute("select s from test") self.cur.execute("select s from test")
row = self.cur.fetchone() row = self.cur.fetchone()
self.assertEqual(row[0], "Österreich") self.assertEqual(row[0], "Österreich")
def CheckSmallInt(self): def test_small_int(self):
self.cur.execute("insert into test(i) values (?)", (42,)) self.cur.execute("insert into test(i) values (?)", (42,))
self.cur.execute("select i from test") self.cur.execute("select i from test")
row = self.cur.fetchone() row = self.cur.fetchone()
self.assertEqual(row[0], 42) self.assertEqual(row[0], 42)
def CheckLargeInt(self): def test_large_int(self):
num = 2**40 num = 2**40
self.cur.execute("insert into test(i) values (?)", (num,)) self.cur.execute("insert into test(i) values (?)", (num,))
self.cur.execute("select i from test") self.cur.execute("select i from test")
row = self.cur.fetchone() row = self.cur.fetchone()
self.assertEqual(row[0], num) self.assertEqual(row[0], num)
def CheckFloat(self): def test_float(self):
val = 3.14 val = 3.14
self.cur.execute("insert into test(f) values (?)", (val,)) self.cur.execute("insert into test(f) values (?)", (val,))
self.cur.execute("select f from test") self.cur.execute("select f from test")
row = self.cur.fetchone() row = self.cur.fetchone()
self.assertEqual(row[0], val) self.assertEqual(row[0], val)
def CheckBlob(self): def test_blob(self):
sample = b"Guglhupf" sample = b"Guglhupf"
val = memoryview(sample) val = memoryview(sample)
self.cur.execute("insert into test(b) values (?)", (val,)) self.cur.execute("insert into test(b) values (?)", (val,))
@ -74,7 +74,7 @@ def CheckBlob(self):
row = self.cur.fetchone() row = self.cur.fetchone()
self.assertEqual(row[0], sample) self.assertEqual(row[0], sample)
def CheckUnicodeExecute(self): def test_unicode_execute(self):
self.cur.execute("select 'Österreich'") self.cur.execute("select 'Österreich'")
row = self.cur.fetchone() row = self.cur.fetchone()
self.assertEqual(row[0], "Österreich") self.assertEqual(row[0], "Österreich")
@ -133,21 +133,21 @@ def tearDown(self):
self.cur.close() self.cur.close()
self.con.close() self.con.close()
def CheckString(self): def test_string(self):
# default # default
self.cur.execute("insert into test(s) values (?)", ("foo",)) self.cur.execute("insert into test(s) values (?)", ("foo",))
self.cur.execute('select s as "s [WRONG]" from test') self.cur.execute('select s as "s [WRONG]" from test')
row = self.cur.fetchone() row = self.cur.fetchone()
self.assertEqual(row[0], "foo") self.assertEqual(row[0], "foo")
def CheckSmallInt(self): def test_small_int(self):
# default # default
self.cur.execute("insert into test(i) values (?)", (42,)) self.cur.execute("insert into test(i) values (?)", (42,))
self.cur.execute("select i from test") self.cur.execute("select i from test")
row = self.cur.fetchone() row = self.cur.fetchone()
self.assertEqual(row[0], 42) self.assertEqual(row[0], 42)
def CheckLargeInt(self): def test_large_int(self):
# default # default
num = 2**40 num = 2**40
self.cur.execute("insert into test(i) values (?)", (num,)) self.cur.execute("insert into test(i) values (?)", (num,))
@ -155,7 +155,7 @@ def CheckLargeInt(self):
row = self.cur.fetchone() row = self.cur.fetchone()
self.assertEqual(row[0], num) self.assertEqual(row[0], num)
def CheckFloat(self): def test_float(self):
# custom # custom
val = 3.14 val = 3.14
self.cur.execute("insert into test(f) values (?)", (val,)) self.cur.execute("insert into test(f) values (?)", (val,))
@ -163,7 +163,7 @@ def CheckFloat(self):
row = self.cur.fetchone() row = self.cur.fetchone()
self.assertEqual(row[0], 47.2) self.assertEqual(row[0], 47.2)
def CheckBool(self): def test_bool(self):
# custom # custom
self.cur.execute("insert into test(b) values (?)", (False,)) self.cur.execute("insert into test(b) values (?)", (False,))
self.cur.execute("select b from test") self.cur.execute("select b from test")
@ -176,7 +176,7 @@ def CheckBool(self):
row = self.cur.fetchone() row = self.cur.fetchone()
self.assertIs(row[0], True) self.assertIs(row[0], True)
def CheckUnicode(self): def test_unicode(self):
# default # default
val = "\xd6sterreich" val = "\xd6sterreich"
self.cur.execute("insert into test(u) values (?)", (val,)) self.cur.execute("insert into test(u) values (?)", (val,))
@ -184,14 +184,14 @@ def CheckUnicode(self):
row = self.cur.fetchone() row = self.cur.fetchone()
self.assertEqual(row[0], val) self.assertEqual(row[0], val)
def CheckFoo(self): def test_foo(self):
val = DeclTypesTests.Foo("bla") val = DeclTypesTests.Foo("bla")
self.cur.execute("insert into test(foo) values (?)", (val,)) self.cur.execute("insert into test(foo) values (?)", (val,))
self.cur.execute("select foo from test") self.cur.execute("select foo from test")
row = self.cur.fetchone() row = self.cur.fetchone()
self.assertEqual(row[0], val) self.assertEqual(row[0], val)
def CheckErrorInConform(self): def test_error_in_conform(self):
val = DeclTypesTests.BadConform(TypeError) val = DeclTypesTests.BadConform(TypeError)
with self.assertRaises(sqlite.InterfaceError): with self.assertRaises(sqlite.InterfaceError):
self.cur.execute("insert into test(bad) values (?)", (val,)) self.cur.execute("insert into test(bad) values (?)", (val,))
@ -204,19 +204,19 @@ def CheckErrorInConform(self):
with self.assertRaises(KeyboardInterrupt): with self.assertRaises(KeyboardInterrupt):
self.cur.execute("insert into test(bad) values (:val)", {"val": val}) self.cur.execute("insert into test(bad) values (:val)", {"val": val})
def CheckUnsupportedSeq(self): def test_unsupported_seq(self):
class Bar: pass class Bar: pass
val = Bar() val = Bar()
with self.assertRaises(sqlite.InterfaceError): with self.assertRaises(sqlite.InterfaceError):
self.cur.execute("insert into test(f) values (?)", (val,)) self.cur.execute("insert into test(f) values (?)", (val,))
def CheckUnsupportedDict(self): def test_unsupported_dict(self):
class Bar: pass class Bar: pass
val = Bar() val = Bar()
with self.assertRaises(sqlite.InterfaceError): with self.assertRaises(sqlite.InterfaceError):
self.cur.execute("insert into test(f) values (:val)", {"val": val}) self.cur.execute("insert into test(f) values (:val)", {"val": val})
def CheckBlob(self): def test_blob(self):
# default # default
sample = b"Guglhupf" sample = b"Guglhupf"
val = memoryview(sample) val = memoryview(sample)
@ -225,13 +225,13 @@ def CheckBlob(self):
row = self.cur.fetchone() row = self.cur.fetchone()
self.assertEqual(row[0], sample) self.assertEqual(row[0], sample)
def CheckNumber1(self): def test_number1(self):
self.cur.execute("insert into test(n1) values (5)") self.cur.execute("insert into test(n1) values (5)")
value = self.cur.execute("select n1 from test").fetchone()[0] value = self.cur.execute("select n1 from test").fetchone()[0]
# if the converter is not used, it's an int instead of a float # if the converter is not used, it's an int instead of a float
self.assertEqual(type(value), float) self.assertEqual(type(value), float)
def CheckNumber2(self): def test_number2(self):
"""Checks whether converter names are cut off at '(' characters""" """Checks whether converter names are cut off at '(' characters"""
self.cur.execute("insert into test(n2) values (5)") self.cur.execute("insert into test(n2) values (5)")
value = self.cur.execute("select n2 from test").fetchone()[0] value = self.cur.execute("select n2 from test").fetchone()[0]
@ -257,7 +257,7 @@ def tearDown(self):
self.cur.close() self.cur.close()
self.con.close() self.con.close()
def CheckDeclTypeNotUsed(self): def test_decl_type_not_used(self):
""" """
Assures that the declared type is not used when PARSE_DECLTYPES Assures that the declared type is not used when PARSE_DECLTYPES
is not set. is not set.
@ -267,13 +267,13 @@ def CheckDeclTypeNotUsed(self):
val = self.cur.fetchone()[0] val = self.cur.fetchone()[0]
self.assertEqual(val, "xxx") self.assertEqual(val, "xxx")
def CheckNone(self): def test_none(self):
self.cur.execute("insert into test(x) values (?)", (None,)) self.cur.execute("insert into test(x) values (?)", (None,))
self.cur.execute("select x from test") self.cur.execute("select x from test")
val = self.cur.fetchone()[0] val = self.cur.fetchone()[0]
self.assertEqual(val, None) self.assertEqual(val, None)
def CheckColName(self): def test_col_name(self):
self.cur.execute("insert into test(x) values (?)", ("xxx",)) self.cur.execute("insert into test(x) values (?)", ("xxx",))
self.cur.execute('select x as "x y [bar]" from test') self.cur.execute('select x as "x y [bar]" from test')
val = self.cur.fetchone()[0] val = self.cur.fetchone()[0]
@ -283,12 +283,12 @@ def CheckColName(self):
# '[' (and the preceeding space) should be stripped. # '[' (and the preceeding space) should be stripped.
self.assertEqual(self.cur.description[0][0], "x y") self.assertEqual(self.cur.description[0][0], "x y")
def CheckCaseInConverterName(self): def test_case_in_converter_name(self):
self.cur.execute("select 'other' as \"x [b1b1]\"") self.cur.execute("select 'other' as \"x [b1b1]\"")
val = self.cur.fetchone()[0] val = self.cur.fetchone()[0]
self.assertEqual(val, "MARKER") self.assertEqual(val, "MARKER")
def CheckCursorDescriptionNoRow(self): def test_cursor_description_no_row(self):
""" """
cursor.description should at least provide the column name(s), even if cursor.description should at least provide the column name(s), even if
no row returned. no row returned.
@ -296,7 +296,7 @@ def CheckCursorDescriptionNoRow(self):
self.cur.execute("select * from test where 0 = 1") self.cur.execute("select * from test where 0 = 1")
self.assertEqual(self.cur.description[0][0], "x") self.assertEqual(self.cur.description[0][0], "x")
def CheckCursorDescriptionInsert(self): def test_cursor_description_insert(self):
self.cur.execute("insert into test values (1)") self.cur.execute("insert into test values (1)")
self.assertIsNone(self.cur.description) self.assertIsNone(self.cur.description)
@ -313,19 +313,19 @@ def tearDown(self):
self.cur.close() self.cur.close()
self.con.close() self.con.close()
def CheckCursorDescriptionCTESimple(self): def test_cursor_description_cte_simple(self):
self.cur.execute("with one as (select 1) select * from one") self.cur.execute("with one as (select 1) select * from one")
self.assertIsNotNone(self.cur.description) self.assertIsNotNone(self.cur.description)
self.assertEqual(self.cur.description[0][0], "1") self.assertEqual(self.cur.description[0][0], "1")
def CheckCursorDescriptionCTESMultipleColumns(self): def test_cursor_description_cte_multiple_columns(self):
self.cur.execute("insert into test values(1)") self.cur.execute("insert into test values(1)")
self.cur.execute("insert into test values(2)") self.cur.execute("insert into test values(2)")
self.cur.execute("with testCTE as (select * from test) select * from testCTE") self.cur.execute("with testCTE as (select * from test) select * from testCTE")
self.assertIsNotNone(self.cur.description) self.assertIsNotNone(self.cur.description)
self.assertEqual(self.cur.description[0][0], "x") self.assertEqual(self.cur.description[0][0], "x")
def CheckCursorDescriptionCTE(self): def test_cursor_description_cte(self):
self.cur.execute("insert into test values (1)") self.cur.execute("insert into test values (1)")
self.cur.execute("with bar as (select * from test) select * from test where x = 1") self.cur.execute("with bar as (select * from test) select * from test where x = 1")
self.assertIsNotNone(self.cur.description) self.assertIsNotNone(self.cur.description)
@ -354,7 +354,7 @@ def tearDown(self):
self.cur.close() self.cur.close()
self.con.close() self.con.close()
def CheckCasterIsUsed(self): def test_caster_is_used(self):
self.cur.execute("select ?", (4,)) self.cur.execute("select ?", (4,))
val = self.cur.fetchone()[0] val = self.cur.fetchone()[0]
self.assertEqual(type(val), float) self.assertEqual(type(val), float)
@ -372,7 +372,7 @@ def setUp(self):
def tearDown(self): def tearDown(self):
self.con.close() self.con.close()
def CheckBinaryInputForConverter(self): def test_binary_input_for_converter(self):
testdata = b"abcdefg" * 10 testdata = b"abcdefg" * 10
result = self.con.execute('select ? as "x [bin]"', (memoryview(zlib.compress(testdata)),)).fetchone()[0] result = self.con.execute('select ? as "x [bin]"', (memoryview(zlib.compress(testdata)),)).fetchone()[0]
self.assertEqual(testdata, result) self.assertEqual(testdata, result)
@ -387,21 +387,21 @@ def tearDown(self):
self.cur.close() self.cur.close()
self.con.close() self.con.close()
def CheckSqliteDate(self): def test_sqlite_date(self):
d = sqlite.Date(2004, 2, 14) d = sqlite.Date(2004, 2, 14)
self.cur.execute("insert into test(d) values (?)", (d,)) self.cur.execute("insert into test(d) values (?)", (d,))
self.cur.execute("select d from test") self.cur.execute("select d from test")
d2 = self.cur.fetchone()[0] d2 = self.cur.fetchone()[0]
self.assertEqual(d, d2) self.assertEqual(d, d2)
def CheckSqliteTimestamp(self): def test_sqlite_timestamp(self):
ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0) ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0)
self.cur.execute("insert into test(ts) values (?)", (ts,)) self.cur.execute("insert into test(ts) values (?)", (ts,))
self.cur.execute("select ts from test") self.cur.execute("select ts from test")
ts2 = self.cur.fetchone()[0] ts2 = self.cur.fetchone()[0]
self.assertEqual(ts, ts2) self.assertEqual(ts, ts2)
def CheckSqlTimestamp(self): def test_sql_timestamp(self):
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
self.cur.execute("insert into test(ts) values (current_timestamp)") self.cur.execute("insert into test(ts) values (current_timestamp)")
self.cur.execute("select ts from test") self.cur.execute("select ts from test")
@ -409,14 +409,14 @@ def CheckSqlTimestamp(self):
self.assertEqual(type(ts), datetime.datetime) self.assertEqual(type(ts), datetime.datetime)
self.assertEqual(ts.year, now.year) self.assertEqual(ts.year, now.year)
def CheckDateTimeSubSeconds(self): def test_date_time_sub_seconds(self):
ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000) ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000)
self.cur.execute("insert into test(ts) values (?)", (ts,)) self.cur.execute("insert into test(ts) values (?)", (ts,))
self.cur.execute("select ts from test") self.cur.execute("select ts from test")
ts2 = self.cur.fetchone()[0] ts2 = self.cur.fetchone()[0]
self.assertEqual(ts, ts2) self.assertEqual(ts, ts2)
def CheckDateTimeSubSecondsFloatingPoint(self): def test_date_time_sub_seconds_floating_point(self):
ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 510241) ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 510241)
self.cur.execute("insert into test(ts) values (?)", (ts,)) self.cur.execute("insert into test(ts) values (?)", (ts,))
self.cur.execute("select ts from test") self.cur.execute("select ts from test")
@ -424,14 +424,18 @@ def CheckDateTimeSubSecondsFloatingPoint(self):
self.assertEqual(ts, ts2) self.assertEqual(ts, ts2)
def suite(): def suite():
sqlite_type_suite = unittest.makeSuite(SqliteTypeTests, "Check") tests = [
decltypes_type_suite = unittest.makeSuite(DeclTypesTests, "Check") BinaryConverterTests,
colnames_type_suite = unittest.makeSuite(ColNamesTests, "Check") ColNamesTests,
adaptation_suite = unittest.makeSuite(ObjectAdaptationTests, "Check") CommonTableExpressionTests,
bin_suite = unittest.makeSuite(BinaryConverterTests, "Check") DateTimeTests,
date_suite = unittest.makeSuite(DateTimeTests, "Check") DeclTypesTests,
cte_suite = unittest.makeSuite(CommonTableExpressionTests, "Check") ObjectAdaptationTests,
return unittest.TestSuite((sqlite_type_suite, decltypes_type_suite, colnames_type_suite, adaptation_suite, bin_suite, date_suite, cte_suite)) SqliteTypeTests,
]
return unittest.TestSuite(
[unittest.TestLoader().loadTestsFromTestCase(t) for t in tests]
)
def test(): def test():
runner = unittest.TextTestRunner() runner = unittest.TextTestRunner()

View file

@ -162,11 +162,11 @@ def setUp(self):
def tearDown(self): def tearDown(self):
self.con.close() self.con.close()
def CheckFuncErrorOnCreate(self): def test_func_error_on_create(self):
with self.assertRaises(sqlite.OperationalError): with self.assertRaises(sqlite.OperationalError):
self.con.create_function("bla", -100, lambda x: 2*x) self.con.create_function("bla", -100, lambda x: 2*x)
def CheckFuncRefCount(self): def test_func_ref_count(self):
def getfunc(): def getfunc():
def f(): def f():
return 1 return 1
@ -178,28 +178,28 @@ def f():
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select reftest()") cur.execute("select reftest()")
def CheckFuncReturnText(self): def test_func_return_text(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select returntext()") cur.execute("select returntext()")
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(type(val), str) self.assertEqual(type(val), str)
self.assertEqual(val, "foo") self.assertEqual(val, "foo")
def CheckFuncReturnUnicode(self): def test_func_return_unicode(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select returnunicode()") cur.execute("select returnunicode()")
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(type(val), str) self.assertEqual(type(val), str)
self.assertEqual(val, "bar") self.assertEqual(val, "bar")
def CheckFuncReturnInt(self): def test_func_return_int(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select returnint()") cur.execute("select returnint()")
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(type(val), int) self.assertEqual(type(val), int)
self.assertEqual(val, 42) self.assertEqual(val, 42)
def CheckFuncReturnFloat(self): def test_func_return_float(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select returnfloat()") cur.execute("select returnfloat()")
val = cur.fetchone()[0] val = cur.fetchone()[0]
@ -207,70 +207,70 @@ def CheckFuncReturnFloat(self):
if val < 3.139 or val > 3.141: if val < 3.139 or val > 3.141:
self.fail("wrong value") self.fail("wrong value")
def CheckFuncReturnNull(self): def test_func_return_null(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select returnnull()") cur.execute("select returnnull()")
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(type(val), type(None)) self.assertEqual(type(val), type(None))
self.assertEqual(val, None) self.assertEqual(val, None)
def CheckFuncReturnBlob(self): def test_func_return_blob(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select returnblob()") cur.execute("select returnblob()")
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(type(val), bytes) self.assertEqual(type(val), bytes)
self.assertEqual(val, b"blob") self.assertEqual(val, b"blob")
def CheckFuncReturnLongLong(self): def test_func_return_long_long(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select returnlonglong()") cur.execute("select returnlonglong()")
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(val, 1<<31) self.assertEqual(val, 1<<31)
def CheckFuncException(self): def test_func_exception(self):
cur = self.con.cursor() cur = self.con.cursor()
with self.assertRaises(sqlite.OperationalError) as cm: with self.assertRaises(sqlite.OperationalError) as cm:
cur.execute("select raiseexception()") cur.execute("select raiseexception()")
cur.fetchone() cur.fetchone()
self.assertEqual(str(cm.exception), 'user-defined function raised exception') self.assertEqual(str(cm.exception), 'user-defined function raised exception')
def CheckParamString(self): def test_param_string(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select isstring(?)", ("foo",)) cur.execute("select isstring(?)", ("foo",))
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(val, 1) self.assertEqual(val, 1)
def CheckParamInt(self): def test_param_int(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select isint(?)", (42,)) cur.execute("select isint(?)", (42,))
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(val, 1) self.assertEqual(val, 1)
def CheckParamFloat(self): def test_param_float(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select isfloat(?)", (3.14,)) cur.execute("select isfloat(?)", (3.14,))
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(val, 1) self.assertEqual(val, 1)
def CheckParamNone(self): def test_param_none(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select isnone(?)", (None,)) cur.execute("select isnone(?)", (None,))
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(val, 1) self.assertEqual(val, 1)
def CheckParamBlob(self): def test_param_blob(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select isblob(?)", (memoryview(b"blob"),)) cur.execute("select isblob(?)", (memoryview(b"blob"),))
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(val, 1) self.assertEqual(val, 1)
def CheckParamLongLong(self): def test_param_long_long(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select islonglong(?)", (1<<42,)) cur.execute("select islonglong(?)", (1<<42,))
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(val, 1) self.assertEqual(val, 1)
def CheckAnyArguments(self): def test_any_arguments(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select spam(?, ?)", (1, 2)) cur.execute("select spam(?, ?)", (1, 2))
val = cur.fetchone()[0] val = cur.fetchone()[0]
@ -284,7 +284,7 @@ def CheckAnyArguments(self):
# deterministic functions were permitted in WHERE clauses of partial # deterministic functions were permitted in WHERE clauses of partial
# indices, which allows testing based on syntax, iso. the query optimizer. # indices, which allows testing based on syntax, iso. the query optimizer.
@unittest.skipIf(sqlite.sqlite_version_info < (3, 8, 3), "Requires SQLite 3.8.3 or higher") @unittest.skipIf(sqlite.sqlite_version_info < (3, 8, 3), "Requires SQLite 3.8.3 or higher")
def CheckFuncNonDeterministic(self): def test_func_non_deterministic(self):
mock = unittest.mock.Mock(return_value=None) mock = unittest.mock.Mock(return_value=None)
self.con.create_function("nondeterministic", 0, mock, deterministic=False) self.con.create_function("nondeterministic", 0, mock, deterministic=False)
if sqlite.sqlite_version_info < (3, 15, 0): if sqlite.sqlite_version_info < (3, 15, 0):
@ -295,7 +295,7 @@ def CheckFuncNonDeterministic(self):
self.con.execute("create index t on test(t) where nondeterministic() is not null") self.con.execute("create index t on test(t) where nondeterministic() is not null")
@unittest.skipIf(sqlite.sqlite_version_info < (3, 8, 3), "Requires SQLite 3.8.3 or higher") @unittest.skipIf(sqlite.sqlite_version_info < (3, 8, 3), "Requires SQLite 3.8.3 or higher")
def CheckFuncDeterministic(self): def test_func_deterministic(self):
mock = unittest.mock.Mock(return_value=None) mock = unittest.mock.Mock(return_value=None)
self.con.create_function("deterministic", 0, mock, deterministic=True) self.con.create_function("deterministic", 0, mock, deterministic=True)
if sqlite.sqlite_version_info < (3, 15, 0): if sqlite.sqlite_version_info < (3, 15, 0):
@ -308,11 +308,11 @@ def CheckFuncDeterministic(self):
self.fail("Unexpected failure while creating partial index") self.fail("Unexpected failure while creating partial index")
@unittest.skipIf(sqlite.sqlite_version_info >= (3, 8, 3), "SQLite < 3.8.3 needed") @unittest.skipIf(sqlite.sqlite_version_info >= (3, 8, 3), "SQLite < 3.8.3 needed")
def CheckFuncDeterministicNotSupported(self): def test_func_deterministic_not_supported(self):
with self.assertRaises(sqlite.NotSupportedError): with self.assertRaises(sqlite.NotSupportedError):
self.con.create_function("deterministic", 0, int, deterministic=True) self.con.create_function("deterministic", 0, int, deterministic=True)
def CheckFuncDeterministicKeywordOnly(self): def test_func_deterministic_keyword_only(self):
with self.assertRaises(TypeError): with self.assertRaises(TypeError):
self.con.create_function("deterministic", 0, int, True) self.con.create_function("deterministic", 0, int, True)
@ -347,81 +347,81 @@ def tearDown(self):
#self.con.close() #self.con.close()
pass pass
def CheckAggrErrorOnCreate(self): def test_aggr_error_on_create(self):
with self.assertRaises(sqlite.OperationalError): with self.assertRaises(sqlite.OperationalError):
self.con.create_function("bla", -100, AggrSum) self.con.create_function("bla", -100, AggrSum)
def CheckAggrNoStep(self): def test_aggr_no_step(self):
cur = self.con.cursor() cur = self.con.cursor()
with self.assertRaises(AttributeError) as cm: with self.assertRaises(AttributeError) as cm:
cur.execute("select nostep(t) from test") cur.execute("select nostep(t) from test")
self.assertEqual(str(cm.exception), "'AggrNoStep' object has no attribute 'step'") self.assertEqual(str(cm.exception), "'AggrNoStep' object has no attribute 'step'")
def CheckAggrNoFinalize(self): def test_aggr_no_finalize(self):
cur = self.con.cursor() cur = self.con.cursor()
with self.assertRaises(sqlite.OperationalError) as cm: with self.assertRaises(sqlite.OperationalError) as cm:
cur.execute("select nofinalize(t) from test") cur.execute("select nofinalize(t) from test")
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(str(cm.exception), "user-defined aggregate's 'finalize' method raised error") self.assertEqual(str(cm.exception), "user-defined aggregate's 'finalize' method raised error")
def CheckAggrExceptionInInit(self): def test_aggr_exception_in_init(self):
cur = self.con.cursor() cur = self.con.cursor()
with self.assertRaises(sqlite.OperationalError) as cm: with self.assertRaises(sqlite.OperationalError) as cm:
cur.execute("select excInit(t) from test") cur.execute("select excInit(t) from test")
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(str(cm.exception), "user-defined aggregate's '__init__' method raised error") self.assertEqual(str(cm.exception), "user-defined aggregate's '__init__' method raised error")
def CheckAggrExceptionInStep(self): def test_aggr_exception_in_step(self):
cur = self.con.cursor() cur = self.con.cursor()
with self.assertRaises(sqlite.OperationalError) as cm: with self.assertRaises(sqlite.OperationalError) as cm:
cur.execute("select excStep(t) from test") cur.execute("select excStep(t) from test")
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(str(cm.exception), "user-defined aggregate's 'step' method raised error") self.assertEqual(str(cm.exception), "user-defined aggregate's 'step' method raised error")
def CheckAggrExceptionInFinalize(self): def test_aggr_exception_in_finalize(self):
cur = self.con.cursor() cur = self.con.cursor()
with self.assertRaises(sqlite.OperationalError) as cm: with self.assertRaises(sqlite.OperationalError) as cm:
cur.execute("select excFinalize(t) from test") cur.execute("select excFinalize(t) from test")
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(str(cm.exception), "user-defined aggregate's 'finalize' method raised error") self.assertEqual(str(cm.exception), "user-defined aggregate's 'finalize' method raised error")
def CheckAggrCheckParamStr(self): def test_aggr_check_param_str(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select checkType('str', ?)", ("foo",)) cur.execute("select checkType('str', ?)", ("foo",))
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(val, 1) self.assertEqual(val, 1)
def CheckAggrCheckParamInt(self): def test_aggr_check_param_int(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select checkType('int', ?)", (42,)) cur.execute("select checkType('int', ?)", (42,))
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(val, 1) self.assertEqual(val, 1)
def CheckAggrCheckParamsInt(self): def test_aggr_check_params_int(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select checkTypes('int', ?, ?)", (42, 24)) cur.execute("select checkTypes('int', ?, ?)", (42, 24))
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(val, 2) self.assertEqual(val, 2)
def CheckAggrCheckParamFloat(self): def test_aggr_check_param_float(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select checkType('float', ?)", (3.14,)) cur.execute("select checkType('float', ?)", (3.14,))
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(val, 1) self.assertEqual(val, 1)
def CheckAggrCheckParamNone(self): def test_aggr_check_param_none(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select checkType('None', ?)", (None,)) cur.execute("select checkType('None', ?)", (None,))
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(val, 1) self.assertEqual(val, 1)
def CheckAggrCheckParamBlob(self): def test_aggr_check_param_blob(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("select checkType('blob', ?)", (memoryview(b"blob"),)) cur.execute("select checkType('blob', ?)", (memoryview(b"blob"),))
val = cur.fetchone()[0] val = cur.fetchone()[0]
self.assertEqual(val, 1) self.assertEqual(val, 1)
def CheckAggrCheckAggrSum(self): def test_aggr_check_aggr_sum(self):
cur = self.con.cursor() cur = self.con.cursor()
cur.execute("delete from test") cur.execute("delete from test")
cur.executemany("insert into test(i) values (?)", [(10,), (20,), (30,)]) cur.executemany("insert into test(i) values (?)", [(10,), (20,), (30,)])
@ -494,17 +494,17 @@ def authorizer_cb(action, arg1, arg2, dbname, source):
def suite(): def suite():
function_suite = unittest.makeSuite(FunctionTests, "Check") tests = [
aggregate_suite = unittest.makeSuite(AggregateTests, "Check") AggregateTests,
authorizer_suite = unittest.makeSuite(AuthorizerTests) AuthorizerIllegalTypeTests,
return unittest.TestSuite(( AuthorizerLargeIntegerTests,
function_suite, AuthorizerRaiseExceptionTests,
aggregate_suite, AuthorizerTests,
authorizer_suite, FunctionTests,
unittest.makeSuite(AuthorizerRaiseExceptionTests), ]
unittest.makeSuite(AuthorizerIllegalTypeTests), return unittest.TestSuite(
unittest.makeSuite(AuthorizerLargeIntegerTests), [unittest.TestLoader().loadTestsFromTestCase(t) for t in tests]
)) )
def test(): def test():
runner = unittest.TextTestRunner() runner = unittest.TextTestRunner()

View file

@ -0,0 +1,2 @@
Use :meth:`unittest.TestLoader().loadTestsFromTestCase` instead of
:meth:`unittest.makeSuite` in :mod:`sqlite3` tests. Patch by Erlend E. Aasland.