mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 13:41:24 +00:00 
			
		
		
		
	gh-77617: Add sqlite3 command-line interface (#95026)
Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
This commit is contained in:
		
							parent
							
								
									1e6b63542e
								
							
						
					
					
						commit
						bc7c7cd18a
					
				
					 5 changed files with 281 additions and 0 deletions
				
			
		|  | @ -1442,6 +1442,26 @@ and you can let the ``sqlite3`` module convert SQLite types to | ||||||
| Python types via :ref:`converters <sqlite3-converters>`. | Python types via :ref:`converters <sqlite3-converters>`. | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | .. _sqlite3-cli: | ||||||
|  | 
 | ||||||
|  | Command-line interface | ||||||
|  | ^^^^^^^^^^^^^^^^^^^^^^ | ||||||
|  | 
 | ||||||
|  | The ``sqlite3`` module can be invoked as a script | ||||||
|  | in order to provide a simple SQLite shell. | ||||||
|  | Type ``.quit`` or CTRL-D to exit the shell. | ||||||
|  | 
 | ||||||
|  | .. program:: python -m sqlite3 [-h] [-v] [filename] [sql] | ||||||
|  | 
 | ||||||
|  | .. option:: -h, --help | ||||||
|  |     Print CLI help. | ||||||
|  | 
 | ||||||
|  | .. option:: -v, --version | ||||||
|  |     Print underlying SQLite library version. | ||||||
|  | 
 | ||||||
|  | .. versionadded:: 3.12 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| .. _sqlite3-howtos: | .. _sqlite3-howtos: | ||||||
| 
 | 
 | ||||||
| How-to guides | How-to guides | ||||||
|  |  | ||||||
|  | @ -112,6 +112,13 @@ os | ||||||
|   (Contributed by Kumar Aditya in :gh:`93312`.) |   (Contributed by Kumar Aditya in :gh:`93312`.) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | sqlite3 | ||||||
|  | ------- | ||||||
|  | 
 | ||||||
|  | * Add a :ref:`command-line interface <sqlite3-cli>`. | ||||||
|  |   (Contributed by Erlend E. Aasland in :gh:`77617`.) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| Optimizations | Optimizations | ||||||
| ============= | ============= | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
							
								
								
									
										97
									
								
								Lib/sqlite3/__main__.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										97
									
								
								Lib/sqlite3/__main__.py
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,97 @@ | ||||||
|  | import sqlite3 | ||||||
|  | import sys | ||||||
|  | 
 | ||||||
|  | from argparse import ArgumentParser | ||||||
|  | from code import InteractiveConsole | ||||||
|  | from textwrap import dedent | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def execute(c, sql, suppress_errors=True): | ||||||
|  |     try: | ||||||
|  |         for row in c.execute(sql): | ||||||
|  |             print(row) | ||||||
|  |     except sqlite3.Error as e: | ||||||
|  |         tp = type(e).__name__ | ||||||
|  |         try: | ||||||
|  |             print(f"{tp} ({e.sqlite_errorname}): {e}", file=sys.stderr) | ||||||
|  |         except AttributeError: | ||||||
|  |             print(f"{tp}: {e}", file=sys.stderr) | ||||||
|  |         if not suppress_errors: | ||||||
|  |             sys.exit(1) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class SqliteInteractiveConsole(InteractiveConsole): | ||||||
|  | 
 | ||||||
|  |     def __init__(self, connection): | ||||||
|  |         super().__init__() | ||||||
|  |         self._con = connection | ||||||
|  |         self._cur = connection.cursor() | ||||||
|  | 
 | ||||||
|  |     def runsource(self, source, filename="<input>", symbol="single"): | ||||||
|  |         match source: | ||||||
|  |             case ".version": | ||||||
|  |                 print(f"{sqlite3.sqlite_version}") | ||||||
|  |             case ".help": | ||||||
|  |                 print("Enter SQL code and press enter.") | ||||||
|  |             case ".quit": | ||||||
|  |                 sys.exit(0) | ||||||
|  |             case _: | ||||||
|  |                 if not sqlite3.complete_statement(source): | ||||||
|  |                     return True | ||||||
|  |                 execute(self._cur, source) | ||||||
|  |         return False | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def main(): | ||||||
|  |     parser = ArgumentParser( | ||||||
|  |         description="Python sqlite3 CLI", | ||||||
|  |         prog="python -m sqlite3", | ||||||
|  |     ) | ||||||
|  |     parser.add_argument( | ||||||
|  |         "filename", type=str, default=":memory:", nargs="?", | ||||||
|  |         help=( | ||||||
|  |             "SQLite database to open (defaults to ':memory:'). " | ||||||
|  |             "A new database is created if the file does not previously exist." | ||||||
|  |         ), | ||||||
|  |     ) | ||||||
|  |     parser.add_argument( | ||||||
|  |         "sql", type=str, nargs="?", | ||||||
|  |         help=( | ||||||
|  |             "An SQL query to execute. " | ||||||
|  |             "Any returned rows are printed to stdout." | ||||||
|  |         ), | ||||||
|  |     ) | ||||||
|  |     parser.add_argument( | ||||||
|  |         "-v", "--version", action="version", | ||||||
|  |         version=f"SQLite version {sqlite3.sqlite_version}", | ||||||
|  |         help="Print underlying SQLite library version", | ||||||
|  |     ) | ||||||
|  |     args = parser.parse_args() | ||||||
|  | 
 | ||||||
|  |     if args.filename == ":memory:": | ||||||
|  |         db_name = "a transient in-memory database" | ||||||
|  |     else: | ||||||
|  |         db_name = repr(args.filename) | ||||||
|  | 
 | ||||||
|  |     banner = dedent(f""" | ||||||
|  |         sqlite3 shell, running on SQLite version {sqlite3.sqlite_version} | ||||||
|  |         Connected to {db_name} | ||||||
|  | 
 | ||||||
|  |         Each command will be run using execute() on the cursor. | ||||||
|  |         Type ".help" for more information; type ".quit" or CTRL-D to quit. | ||||||
|  |     """).strip() | ||||||
|  |     sys.ps1 = "sqlite> " | ||||||
|  |     sys.ps2 = "    ... " | ||||||
|  | 
 | ||||||
|  |     con = sqlite3.connect(args.filename, isolation_level=None) | ||||||
|  |     try: | ||||||
|  |         if args.sql: | ||||||
|  |             execute(con, args.sql, suppress_errors=False) | ||||||
|  |         else: | ||||||
|  |             console = SqliteInteractiveConsole(con) | ||||||
|  |             console.interact(banner, exitmsg="") | ||||||
|  |     finally: | ||||||
|  |         con.close() | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | main() | ||||||
							
								
								
									
										155
									
								
								Lib/test/test_sqlite3/test_cli.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										155
									
								
								Lib/test/test_sqlite3/test_cli.py
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,155 @@ | ||||||
|  | """sqlite3 CLI tests.""" | ||||||
|  | 
 | ||||||
|  | import sqlite3 as sqlite | ||||||
|  | import subprocess | ||||||
|  | import sys | ||||||
|  | import unittest | ||||||
|  | 
 | ||||||
|  | from test.support import SHORT_TIMEOUT, requires_subprocess | ||||||
|  | from test.support.os_helper import TESTFN, unlink | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @requires_subprocess() | ||||||
|  | class CommandLineInterface(unittest.TestCase): | ||||||
|  | 
 | ||||||
|  |     def _do_test(self, *args, expect_success=True): | ||||||
|  |         with subprocess.Popen( | ||||||
|  |             [sys.executable, "-Xutf8", "-m", "sqlite3", *args], | ||||||
|  |             encoding="utf-8", | ||||||
|  |             bufsize=0, | ||||||
|  |             stdout=subprocess.PIPE, | ||||||
|  |             stderr=subprocess.PIPE, | ||||||
|  |         ) as proc: | ||||||
|  |             proc.wait() | ||||||
|  |             if expect_success == bool(proc.returncode): | ||||||
|  |                 self.fail("".join(proc.stderr)) | ||||||
|  |             stdout = proc.stdout.read() | ||||||
|  |             stderr = proc.stderr.read() | ||||||
|  |             if expect_success: | ||||||
|  |                 self.assertEqual(stderr, "") | ||||||
|  |             else: | ||||||
|  |                 self.assertEqual(stdout, "") | ||||||
|  |             return stdout, stderr | ||||||
|  | 
 | ||||||
|  |     def expect_success(self, *args): | ||||||
|  |         out, _ = self._do_test(*args) | ||||||
|  |         return out | ||||||
|  | 
 | ||||||
|  |     def expect_failure(self, *args): | ||||||
|  |         _, err = self._do_test(*args, expect_success=False) | ||||||
|  |         return err | ||||||
|  | 
 | ||||||
|  |     def test_cli_help(self): | ||||||
|  |         out = self.expect_success("-h") | ||||||
|  |         self.assertIn("usage: python -m sqlite3", out) | ||||||
|  | 
 | ||||||
|  |     def test_cli_version(self): | ||||||
|  |         out = self.expect_success("-v") | ||||||
|  |         self.assertIn(sqlite.sqlite_version, out) | ||||||
|  | 
 | ||||||
|  |     def test_cli_execute_sql(self): | ||||||
|  |         out = self.expect_success(":memory:", "select 1") | ||||||
|  |         self.assertIn("(1,)", out) | ||||||
|  | 
 | ||||||
|  |     def test_cli_execute_too_much_sql(self): | ||||||
|  |         stderr = self.expect_failure(":memory:", "select 1; select 2") | ||||||
|  |         err = "ProgrammingError: You can only execute one statement at a time" | ||||||
|  |         self.assertIn(err, stderr) | ||||||
|  | 
 | ||||||
|  |     def test_cli_execute_incomplete_sql(self): | ||||||
|  |         stderr = self.expect_failure(":memory:", "sel") | ||||||
|  |         self.assertIn("OperationalError (SQLITE_ERROR)", stderr) | ||||||
|  | 
 | ||||||
|  |     def test_cli_on_disk_db(self): | ||||||
|  |         self.addCleanup(unlink, TESTFN) | ||||||
|  |         out = self.expect_success(TESTFN, "create table t(t)") | ||||||
|  |         self.assertEqual(out, "") | ||||||
|  |         out = self.expect_success(TESTFN, "select count(t) from t") | ||||||
|  |         self.assertIn("(0,)", out) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @requires_subprocess() | ||||||
|  | class InteractiveSession(unittest.TestCase): | ||||||
|  |     TIMEOUT = SHORT_TIMEOUT / 10. | ||||||
|  |     MEMORY_DB_MSG = "Connected to a transient in-memory database" | ||||||
|  |     PS1 = "sqlite> " | ||||||
|  |     PS2 = "... " | ||||||
|  | 
 | ||||||
|  |     def start_cli(self, *args): | ||||||
|  |         return subprocess.Popen( | ||||||
|  |             [sys.executable, "-Xutf8", "-m", "sqlite3", *args], | ||||||
|  |             encoding="utf-8", | ||||||
|  |             bufsize=0, | ||||||
|  |             stdin=subprocess.PIPE, | ||||||
|  |             # Note: the banner is printed to stderr, the prompt to stdout. | ||||||
|  |             stdout=subprocess.PIPE, | ||||||
|  |             stderr=subprocess.PIPE, | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |     def expect_success(self, proc): | ||||||
|  |         proc.wait() | ||||||
|  |         if proc.returncode: | ||||||
|  |             self.fail("".join(proc.stderr)) | ||||||
|  | 
 | ||||||
|  |     def test_interact(self): | ||||||
|  |         with self.start_cli() as proc: | ||||||
|  |             out, err = proc.communicate(timeout=self.TIMEOUT) | ||||||
|  |             self.assertIn(self.MEMORY_DB_MSG, err) | ||||||
|  |             self.assertIn(self.PS1, out) | ||||||
|  |             self.expect_success(proc) | ||||||
|  | 
 | ||||||
|  |     def test_interact_quit(self): | ||||||
|  |         with self.start_cli() as proc: | ||||||
|  |             out, err = proc.communicate(input=".quit", timeout=self.TIMEOUT) | ||||||
|  |             self.assertIn(self.MEMORY_DB_MSG, err) | ||||||
|  |             self.assertIn(self.PS1, out) | ||||||
|  |             self.expect_success(proc) | ||||||
|  | 
 | ||||||
|  |     def test_interact_version(self): | ||||||
|  |         with self.start_cli() as proc: | ||||||
|  |             out, err = proc.communicate(input=".version", timeout=self.TIMEOUT) | ||||||
|  |             self.assertIn(self.MEMORY_DB_MSG, err) | ||||||
|  |             self.assertIn(sqlite.sqlite_version, out) | ||||||
|  |             self.expect_success(proc) | ||||||
|  | 
 | ||||||
|  |     def test_interact_valid_sql(self): | ||||||
|  |         with self.start_cli() as proc: | ||||||
|  |             out, err = proc.communicate(input="select 1;", | ||||||
|  |                                         timeout=self.TIMEOUT) | ||||||
|  |             self.assertIn(self.MEMORY_DB_MSG, err) | ||||||
|  |             self.assertIn("(1,)", out) | ||||||
|  |             self.expect_success(proc) | ||||||
|  | 
 | ||||||
|  |     def test_interact_valid_multiline_sql(self): | ||||||
|  |         with self.start_cli() as proc: | ||||||
|  |             out, err = proc.communicate(input="select 1\n;", | ||||||
|  |                                         timeout=self.TIMEOUT) | ||||||
|  |             self.assertIn(self.MEMORY_DB_MSG, err) | ||||||
|  |             self.assertIn(self.PS2, out) | ||||||
|  |             self.assertIn("(1,)", out) | ||||||
|  |             self.expect_success(proc) | ||||||
|  | 
 | ||||||
|  |     def test_interact_invalid_sql(self): | ||||||
|  |         with self.start_cli() as proc: | ||||||
|  |             out, err = proc.communicate(input="sel;", timeout=self.TIMEOUT) | ||||||
|  |             self.assertIn(self.MEMORY_DB_MSG, err) | ||||||
|  |             self.assertIn("OperationalError (SQLITE_ERROR)", err) | ||||||
|  |             self.expect_success(proc) | ||||||
|  | 
 | ||||||
|  |     def test_interact_on_disk_file(self): | ||||||
|  |         self.addCleanup(unlink, TESTFN) | ||||||
|  |         with self.start_cli(TESTFN) as proc: | ||||||
|  |             out, err = proc.communicate(input="create table t(t);", | ||||||
|  |                                         timeout=self.TIMEOUT) | ||||||
|  |             self.assertIn(TESTFN, err) | ||||||
|  |             self.assertIn(self.PS1, out) | ||||||
|  |             self.expect_success(proc) | ||||||
|  |         with self.start_cli(TESTFN, "select count(t) from t") as proc: | ||||||
|  |             out = proc.stdout.read() | ||||||
|  |             err = proc.stderr.read() | ||||||
|  |             self.assertIn("(0,)", out) | ||||||
|  |             self.expect_success(proc) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | if __name__ == "__main__": | ||||||
|  |     unittest.main() | ||||||
|  | @ -0,0 +1,2 @@ | ||||||
|  | Add :mod:`sqlite3` :ref:`command-line interface <sqlite3-cli>`. | ||||||
|  | Patch by Erlend Aasland. | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Erlend Egeberg Aasland
						Erlend Egeberg Aasland