| 
									
										
										
										
											2022-08-12 01:05:12 +02:00
										 |  |  | """A simple SQLite CLI for the sqlite3 module.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | Apart from using 'argparse' for the command-line interface, | 
					
						
							|  |  |  | this module implements the REPL as a thin wrapper around | 
					
						
							|  |  |  | the InteractiveConsole class from the 'code' stdlib module. | 
					
						
							|  |  |  | """
 | 
					
						
							| 
									
										
										
										
											2022-08-01 12:25:16 +02:00
										 |  |  | import sqlite3 | 
					
						
							|  |  |  | import sys | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from argparse import ArgumentParser | 
					
						
							|  |  |  | from code import InteractiveConsole | 
					
						
							|  |  |  | from textwrap import dedent | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def execute(c, sql, suppress_errors=True): | 
					
						
							| 
									
										
										
										
											2022-08-12 01:05:12 +02:00
										 |  |  |     """Helper that wraps execution of SQL code.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     This is used both by the REPL and by direct execution from the CLI. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     'c' may be a cursor or a connection. | 
					
						
							|  |  |  |     'sql' is the SQL string to execute. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-08-01 12:25:16 +02:00
										 |  |  |     try: | 
					
						
							|  |  |  |         for row in c.execute(sql): | 
					
						
							|  |  |  |             print(row) | 
					
						
							|  |  |  |     except sqlite3.Error as e: | 
					
						
							|  |  |  |         tp = type(e).__name__ | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             print(f"{tp} ({e.sqlite_errorname}): {e}", file=sys.stderr) | 
					
						
							|  |  |  |         except AttributeError: | 
					
						
							|  |  |  |             print(f"{tp}: {e}", file=sys.stderr) | 
					
						
							|  |  |  |         if not suppress_errors: | 
					
						
							|  |  |  |             sys.exit(1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class SqliteInteractiveConsole(InteractiveConsole): | 
					
						
							| 
									
										
										
										
											2022-08-12 01:05:12 +02:00
										 |  |  |     """A simple SQLite REPL.""" | 
					
						
							| 
									
										
										
										
											2022-08-01 12:25:16 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, connection): | 
					
						
							|  |  |  |         super().__init__() | 
					
						
							|  |  |  |         self._con = connection | 
					
						
							|  |  |  |         self._cur = connection.cursor() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def runsource(self, source, filename="<input>", symbol="single"): | 
					
						
							| 
									
										
										
										
											2022-08-12 01:05:12 +02:00
										 |  |  |         """Override runsource, the core of the InteractiveConsole REPL.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Return True if more input is needed; buffering is done automatically. | 
					
						
							|  |  |  |         Return False is input is a complete statement ready for execution. | 
					
						
							|  |  |  |         """
 | 
					
						
							| 
									
										
										
										
											2022-08-01 12:25:16 +02:00
										 |  |  |         match source: | 
					
						
							|  |  |  |             case ".version": | 
					
						
							|  |  |  |                 print(f"{sqlite3.sqlite_version}") | 
					
						
							|  |  |  |             case ".help": | 
					
						
							|  |  |  |                 print("Enter SQL code and press enter.") | 
					
						
							|  |  |  |             case ".quit": | 
					
						
							|  |  |  |                 sys.exit(0) | 
					
						
							|  |  |  |             case _: | 
					
						
							|  |  |  |                 if not sqlite3.complete_statement(source): | 
					
						
							|  |  |  |                     return True | 
					
						
							|  |  |  |                 execute(self._cur, source) | 
					
						
							|  |  |  |         return False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def main(): | 
					
						
							|  |  |  |     parser = ArgumentParser( | 
					
						
							|  |  |  |         description="Python sqlite3 CLI", | 
					
						
							|  |  |  |         prog="python -m sqlite3", | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     parser.add_argument( | 
					
						
							|  |  |  |         "filename", type=str, default=":memory:", nargs="?", | 
					
						
							|  |  |  |         help=( | 
					
						
							|  |  |  |             "SQLite database to open (defaults to ':memory:'). " | 
					
						
							|  |  |  |             "A new database is created if the file does not previously exist." | 
					
						
							|  |  |  |         ), | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     parser.add_argument( | 
					
						
							|  |  |  |         "sql", type=str, nargs="?", | 
					
						
							|  |  |  |         help=( | 
					
						
							|  |  |  |             "An SQL query to execute. " | 
					
						
							|  |  |  |             "Any returned rows are printed to stdout." | 
					
						
							|  |  |  |         ), | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     parser.add_argument( | 
					
						
							|  |  |  |         "-v", "--version", action="version", | 
					
						
							|  |  |  |         version=f"SQLite version {sqlite3.sqlite_version}", | 
					
						
							|  |  |  |         help="Print underlying SQLite library version", | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     args = parser.parse_args() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if args.filename == ":memory:": | 
					
						
							|  |  |  |         db_name = "a transient in-memory database" | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         db_name = repr(args.filename) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-08-12 01:05:12 +02:00
										 |  |  |     # Prepare REPL banner and prompts. | 
					
						
							| 
									
										
										
										
											2023-04-27 23:22:26 +02:00
										 |  |  |     if sys.platform == "win32" and "idlelib.run" not in sys.modules: | 
					
						
							|  |  |  |         eofkey = "CTRL-Z" | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         eofkey = "CTRL-D" | 
					
						
							| 
									
										
										
										
											2022-08-01 12:25:16 +02:00
										 |  |  |     banner = dedent(f"""
 | 
					
						
							|  |  |  |         sqlite3 shell, running on SQLite version {sqlite3.sqlite_version} | 
					
						
							|  |  |  |         Connected to {db_name} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Each command will be run using execute() on the cursor. | 
					
						
							| 
									
										
										
										
											2023-04-27 21:23:10 +02:00
										 |  |  |         Type ".help" for more information; type ".quit" or {eofkey} to quit. | 
					
						
							| 
									
										
										
										
											2022-08-01 12:25:16 +02:00
										 |  |  |     """).strip()
 | 
					
						
							|  |  |  |     sys.ps1 = "sqlite> " | 
					
						
							|  |  |  |     sys.ps2 = "    ... " | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     con = sqlite3.connect(args.filename, isolation_level=None) | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         if args.sql: | 
					
						
							| 
									
										
										
										
											2022-08-12 01:05:12 +02:00
										 |  |  |             # SQL statement provided on the command-line; execute it directly. | 
					
						
							| 
									
										
										
										
											2022-08-01 12:25:16 +02:00
										 |  |  |             execute(con, args.sql, suppress_errors=False) | 
					
						
							|  |  |  |         else: | 
					
						
							| 
									
										
										
										
											2022-08-12 01:05:12 +02:00
										 |  |  |             # No SQL provided; start the REPL. | 
					
						
							| 
									
										
										
										
											2022-08-01 12:25:16 +02:00
										 |  |  |             console = SqliteInteractiveConsole(con) | 
					
						
							|  |  |  |             console.interact(banner, exitmsg="") | 
					
						
							|  |  |  |     finally: | 
					
						
							|  |  |  |         con.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | main() |