"""sqlite3 CLI tests.""" import sqlite3 import sys import textwrap import unittest import unittest.mock import os from sqlite3.__main__ import main as cli from test.support.import_helper import import_module from test.support.os_helper import TESTFN, unlink from test.support.pty_helper import run_pty from test.support import ( captured_stdout, captured_stderr, captured_stdin, force_not_colorized_test_class, requires_subprocess, verbose, ) @force_not_colorized_test_class class CommandLineInterface(unittest.TestCase): def _do_test(self, *args, expect_success=True): with ( captured_stdout() as out, captured_stderr() as err, self.assertRaises(SystemExit) as cm ): cli(args) return out.getvalue(), err.getvalue(), cm.exception.code def expect_success(self, *args): out, err, code = self._do_test(*args) self.assertEqual(code, 0, "\n".join([f"Unexpected failure: {args=}", out, err])) self.assertEqual(err, "") return out def expect_failure(self, *args): out, err, code = self._do_test(*args, expect_success=False) self.assertNotEqual(code, 0, "\n".join([f"Unexpected failure: {args=}", out, err])) self.assertEqual(out, "") return err def test_cli_help(self): out = self.expect_success("-h") self.assertIn("usage: ", out) self.assertIn(" [-h] [-v] [filename] [sql]", out) self.assertIn("Python sqlite3 CLI", out) def test_cli_version(self): out = self.expect_success("-v") self.assertIn(sqlite3.sqlite_version, out) def test_cli_execute_sql(self): out = self.expect_success(":memory:", "select 1") self.assertIn("(1,)", out) def test_cli_execute_too_much_sql(self): stderr = self.expect_failure(":memory:", "select 1; select 2") err = "ProgrammingError: You can only execute one statement at a time" self.assertIn(err, stderr) def test_cli_execute_incomplete_sql(self): stderr = self.expect_failure(":memory:", "sel") self.assertIn("OperationalError (SQLITE_ERROR)", stderr) def test_cli_on_disk_db(self): self.addCleanup(unlink, TESTFN) out = self.expect_success(TESTFN, "create table t(t)") self.assertEqual(out, "") out = self.expect_success(TESTFN, "select count(t) from t") self.assertIn("(0,)", out) @force_not_colorized_test_class class InteractiveSession(unittest.TestCase): MEMORY_DB_MSG = "Connected to a transient in-memory database" PS1 = "sqlite> " PS2 = "... " def run_cli(self, *args, commands=()): with ( captured_stdin() as stdin, captured_stdout() as stdout, captured_stderr() as stderr, self.assertRaises(SystemExit) as cm ): for cmd in commands: stdin.write(cmd + "\n") stdin.seek(0) cli(args) out = stdout.getvalue() err = stderr.getvalue() self.assertEqual(cm.exception.code, 0, f"Unexpected failure: {args=}\n{out}\n{err}") return out, err def test_interact(self): out, err = self.run_cli() self.assertIn(self.MEMORY_DB_MSG, err) self.assertIn(self.MEMORY_DB_MSG, err) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 1) self.assertEqual(out.count(self.PS2), 0) def test_interact_quit(self): out, err = self.run_cli(commands=(".quit",)) self.assertIn(self.MEMORY_DB_MSG, err) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 1) self.assertEqual(out.count(self.PS2), 0) def test_interact_version(self): out, err = self.run_cli(commands=(".version",)) self.assertIn(self.MEMORY_DB_MSG, err) self.assertIn(sqlite3.sqlite_version + "\n", out) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 2) self.assertEqual(out.count(self.PS2), 0) self.assertIn(sqlite3.sqlite_version, out) def test_interact_empty_source(self): out, err = self.run_cli(commands=("", " ")) self.assertIn(self.MEMORY_DB_MSG, err) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 3) self.assertEqual(out.count(self.PS2), 0) def test_interact_dot_commands_unknown(self): out, err = self.run_cli(commands=(".unknown_command", )) self.assertIn(self.MEMORY_DB_MSG, err) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 2) self.assertEqual(out.count(self.PS2), 0) self.assertIn('Error: unknown command: "', err) # test "unknown_command" is pointed out in the error message self.assertIn("unknown_command", err) def test_interact_dot_commands_empty(self): out, err = self.run_cli(commands=(".")) self.assertIn(self.MEMORY_DB_MSG, err) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 2) self.assertEqual(out.count(self.PS2), 0) def test_interact_dot_commands_with_whitespaces(self): out, err = self.run_cli(commands=(".version ", ". version")) self.assertIn(self.MEMORY_DB_MSG, err) self.assertEqual(out.count(sqlite3.sqlite_version + "\n"), 2) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 3) self.assertEqual(out.count(self.PS2), 0) def test_interact_valid_sql(self): out, err = self.run_cli(commands=("SELECT 1;",)) self.assertIn(self.MEMORY_DB_MSG, err) self.assertIn("(1,)\n", out) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 2) self.assertEqual(out.count(self.PS2), 0) def test_interact_incomplete_multiline_sql(self): out, err = self.run_cli(commands=("SELECT 1",)) self.assertIn(self.MEMORY_DB_MSG, err) self.assertEndsWith(out, self.PS2) self.assertEqual(out.count(self.PS1), 1) self.assertEqual(out.count(self.PS2), 1) def test_interact_valid_multiline_sql(self): out, err = self.run_cli(commands=("SELECT 1\n;",)) self.assertIn(self.MEMORY_DB_MSG, err) self.assertIn(self.PS2, out) self.assertIn("(1,)\n", out) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 2) self.assertEqual(out.count(self.PS2), 1) def test_interact_invalid_sql(self): out, err = self.run_cli(commands=("sel;",)) self.assertIn(self.MEMORY_DB_MSG, err) self.assertIn("OperationalError (SQLITE_ERROR)", err) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 2) self.assertEqual(out.count(self.PS2), 0) def test_interact_on_disk_file(self): self.addCleanup(unlink, TESTFN) out, err = self.run_cli(TESTFN, commands=("CREATE TABLE t(t);",)) self.assertIn(TESTFN, err) self.assertEndsWith(out, self.PS1) out, _ = self.run_cli(TESTFN, commands=("SELECT count(t) FROM t;",)) self.assertIn("(0,)\n", out) def test_color(self): with unittest.mock.patch("_colorize.can_colorize", return_value=True): out, err = self.run_cli(commands="TEXT\n") self.assertIn("\x1b[1;35msqlite> \x1b[0m", out) self.assertIn("\x1b[1;35m ... \x1b[0m\x1b", out) out, err = self.run_cli(commands=("sel;",)) self.assertIn('\x1b[1;35mOperationalError (SQLITE_ERROR)\x1b[0m: ' '\x1b[35mnear "sel": syntax error\x1b[0m', err) @requires_subprocess() @force_not_colorized_test_class class Completion(unittest.TestCase): PS1 = "sqlite> " @classmethod def setUpClass(cls): readline = import_module("readline") if readline.backend == "editline": raise unittest.SkipTest("libedit readline is not supported") def write_input(self, input_, env=None): script = textwrap.dedent(""" import readline from sqlite3.__main__ import main # Configure readline to ...: # - hide control sequences surrounding each candidate # - hide "Display all xxx possibilities? (y or n)" # - show candidates one per line readline.parse_and_bind("set colored-completion-prefix off") readline.parse_and_bind("set completion-query-items 0") readline.parse_and_bind("set page-completions off") readline.parse_and_bind("set completion-display-width 0") main() """) return run_pty(script, input_, env) def test_complete_sql_keywords(self): _sqlite3 = import_module("_sqlite3") if not hasattr(_sqlite3, "SQLITE_KEYWORDS"): raise unittest.SkipTest("unable to determine SQLite keywords") # List candidates starting with 'S', there should be multiple matches. input_ = b"S\t\tEL\t 1;\n.quit\n" output = self.write_input(input_) self.assertIn(b"SELECT", output) self.assertIn(b"SET", output) self.assertIn(b"SAVEPOINT", output) self.assertIn(b"(1,)", output) # Keywords are completed in upper case for even lower case user input. input_ = b"sel\t\t 1;\n.quit\n" output = self.write_input(input_) self.assertIn(b"SELECT", output) self.assertIn(b"(1,)", output) # .commands are completed without changing case input_ = b".ver\t\n.quit\n" output = self.write_input(input_) self.assertIn(b".version", output) def test_complete_table_indexes_triggers_views(self): input_ = textwrap.dedent("""\ CREATE TABLE _Table (id); CREATE INDEX _Index ON _table (id); CREATE TRIGGER _Trigger BEFORE INSERT ON _Table BEGIN SELECT 1; END; CREATE VIEW _View AS SELECT 1; CREATE TEMP TABLE _Temp_table (id); CREATE INDEX temp._Temp_index ON _Temp_table (id); CREATE TEMP TRIGGER _Temp_trigger BEFORE INSERT ON _Table BEGIN SELECT 1; END; CREATE TEMP VIEW _Temp_view AS SELECT 1; ATTACH ':memory:' AS attached; CREATE TABLE attached._Attached_table (id); CREATE INDEX attached._Attached_index ON _Attached_table (id); CREATE TRIGGER attached._Attached_trigger BEFORE INSERT ON _Attached_table BEGIN SELECT 1; END; CREATE VIEW attached._Attached_view AS SELECT 1; SELECT id FROM _\t\tta\t; .quit\n""").encode() output = self.write_input(input_) lines = output.decode().splitlines() indices = [i for i, line in enumerate(lines) if line.startswith(self.PS1)] start, end = indices[-3], indices[-2] candidates = [l.strip() for l in lines[start+1:end]] self.assertEqual(candidates, [ "_Attached_index", "_Attached_table", "_Attached_trigger", "_Attached_view", "_Index", "_Table", "_Temp_index", "_Temp_table", "_Temp_trigger", "_Temp_view", "_Trigger", "_View", ], ) start, end = indices[-2], indices[-1] # direct match with '_Table' completed, no candidates displayed candidates = [l.strip() for l in lines[start+1:end]] self.assertEqual(len(candidates), 0) @unittest.skipIf(sqlite3.sqlite_version_info < (3, 16, 0), "PRAGMA table-valued function is not available until " "SQLite 3.16.0") def test_complete_columns(self): input_ = textwrap.dedent("""\ CREATE TABLE _table (_col_table); CREATE TEMP TABLE _temp_table (_col_temp); ATTACH ':memory:' AS attached; CREATE TABLE attached._attached_table (_col_attached); SELECT _col_\t\tta\tFROM _table; .quit\n""").encode() output = self.write_input(input_) lines = output.decode().splitlines() indices = [ i for i, line in enumerate(lines) if line.startswith(self.PS1) ] start, end = indices[-3], indices[-2] candidates = [l.strip() for l in lines[start+1:end]] self.assertEqual( candidates, ["_col_attached", "_col_table", "_col_temp"] ) @unittest.skipIf(sqlite3.sqlite_version_info < (3, 30, 0), "PRAGMA function_list is not available until " "SQLite 3.30.0") def test_complete_functions(self): input_ = b"SELECT AV\t1);\n.quit\n" output = self.write_input(input_) self.assertIn(b"AVG(1);", output) self.assertIn(b"(1.0,)", output) # Functions are completed in upper case for even lower case user input. input_ = b"SELECT av\t1);\n.quit\n" output = self.write_input(input_) self.assertIn(b"AVG(1);", output) self.assertIn(b"(1.0,)", output) def test_complete_schemata(self): input_ = textwrap.dedent("""\ ATTACH ':memory:' AS MixedCase; -- Test '_' is escaped in Like pattern filtering ATTACH ':memory:' AS _underscore; -- Let database_list pragma have a 'temp' schema entry CREATE TEMP TABLE _table (id); SELECT * FROM \t\tmIX\t.sqlite_master; SELECT * FROM _und\t.sqlite_master; .quit\n""").encode() output = self.write_input(input_) lines = output.decode().splitlines() indices = [ i for i, line in enumerate(lines) if line.startswith(self.PS1) ] start, end = indices[-4], indices[-3] candidates = [l.strip() for l in lines[start+1:end]] self.assertIn("MixedCase", candidates) self.assertIn("_underscore", candidates) self.assertIn("main", candidates) self.assertIn("temp", candidates) @unittest.skipIf(sys.platform.startswith("freebsd"), "Two actual tabs are inserted when there are no matching" " completions in the pseudo-terminal opened by run_pty()" " on FreeBSD") def test_complete_no_match(self): input_ = b"xyzzy\t\t\b\b\b\b\b\b\b.quit\n" # Set NO_COLOR to disable coloring for self.PS1. output = self.write_input(input_, env={**os.environ, "NO_COLOR": "1"}) lines = output.decode().splitlines() indices = ( i for i, line in enumerate(lines, 1) if line.startswith(f"{self.PS1}xyzzy") ) line_num = next(indices, -1) self.assertNotEqual(line_num, -1) # Completions occupy lines, assert no extra lines when there is nothing # to complete. self.assertEqual(line_num, len(lines)) def test_complete_no_input(self): script = textwrap.dedent(""" import readline from sqlite3.__main__ import main # Configure readline to ...: # - hide control sequences surrounding each candidate # - hide "Display all xxx possibilities? (y or n)" # - hide "--More--" # - show candidates one per line readline.parse_and_bind("set colored-completion-prefix off") readline.parse_and_bind("set colored-stats off") readline.parse_and_bind("set completion-query-items 0") readline.parse_and_bind("set page-completions off") readline.parse_and_bind("set completion-display-width 0") readline.parse_and_bind("set show-all-if-ambiguous off") readline.parse_and_bind("set show-all-if-unmodified off") main() """) input_ = b"\t\t.quit\n" output = run_pty(script, input_, env={**os.environ, "NO_COLOR": "1"}) try: lines = output.decode().splitlines() indices = [ i for i, line in enumerate(lines) if line.startswith(self.PS1) ] self.assertEqual(len(indices), 2) start, end = indices candidates = [l.strip() for l in lines[start+1:end]] self.assertEqual(candidates, sorted(candidates)) except: if verbose: print(' PTY output: '.center(30, '-')) print(output.decode(errors='replace')) print(' end PTY output '.center(30, '-')) raise if __name__ == "__main__": unittest.main()