mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 13:41:24 +00:00 
			
		
		
		
	 e6623e7083
			
		
	
	
		e6623e7083
		
			
		
	
	
	
	
		
			
			Co-authored-by: Ezio Melotti <ezio.melotti@gmail.com> Co-authored-by: CAM Gerlach <CAM.Gerlach@Gerlach.CAM>
		
			
				
	
	
		
			120 lines
		
	
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			120 lines
		
	
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """A simple SQLite CLI for the sqlite3 module.
 | |
| 
 | |
| Apart from using 'argparse' for the command-line interface,
 | |
| this module implements the REPL as a thin wrapper around
 | |
| the InteractiveConsole class from the 'code' stdlib module.
 | |
| """
 | |
| import sqlite3
 | |
| import sys
 | |
| 
 | |
| from argparse import ArgumentParser
 | |
| from code import InteractiveConsole
 | |
| from textwrap import dedent
 | |
| 
 | |
| 
 | |
def execute(c, sql, suppress_errors=True):
    """Run one SQL string and print each resulting row to stdout.

    Shared by the interactive REPL and by one-shot execution from the CLI.

    'c' is the cursor or connection whose execute() method is called.
    'sql' is the SQL text passed to it.
    'suppress_errors' controls what happens after an sqlite3.Error has been
    reported on stderr: if true, return normally; if false, terminate via
    sys.exit(1).
    """
    try:
        rows = c.execute(sql)
        for record in rows:
            print(record)
    except sqlite3.Error as exc:
        kind = type(exc).__name__
        # Use the short form when the exception carries no
        # 'sqlite_errorname' attribute.
        if hasattr(exc, "sqlite_errorname"):
            report = f"{kind} ({exc.sqlite_errorname}): {exc}"
        else:
            report = f"{kind}: {exc}"
        print(report, file=sys.stderr)
        if suppress_errors:
            return
        sys.exit(1)
 | |
| 
 | |
| 
 | |
| class SqliteInteractiveConsole(InteractiveConsole):
 | |
|     """A simple SQLite REPL."""
 | |
| 
 | |
|     def __init__(self, connection):
 | |
|         super().__init__()
 | |
|         self._con = connection
 | |
|         self._cur = connection.cursor()
 | |
| 
 | |
|     def runsource(self, source, filename="<input>", symbol="single"):
 | |
|         """Override runsource, the core of the InteractiveConsole REPL.
 | |
| 
 | |
|         Return True if more input is needed; buffering is done automatically.
 | |
|         Return False is input is a complete statement ready for execution.
 | |
|         """
 | |
|         match source:
 | |
|             case ".version":
 | |
|                 print(f"{sqlite3.sqlite_version}")
 | |
|             case ".help":
 | |
|                 print("Enter SQL code and press enter.")
 | |
|             case ".quit":
 | |
|                 sys.exit(0)
 | |
|             case _:
 | |
|                 if not sqlite3.complete_statement(source):
 | |
|                     return True
 | |
|                 execute(self._cur, source)
 | |
|         return False
 | |
| 
 | |
| 
 | |
def main(argv=None):
    """Entry point for the sqlite3 command-line interface.

    'argv' is an optional list of argument strings to parse instead of the
    process command line; when it is None, argparse reads sys.argv.

    With an SQL argument, the statement is executed once against the chosen
    database and the process exits with status 1 if it fails.  Without one,
    an interactive REPL is started.  The connection is always closed before
    returning.
    """
    parser = ArgumentParser(
        description="Python sqlite3 CLI",
        prog="python -m sqlite3",
    )
    parser.add_argument(
        "filename", type=str, default=":memory:", nargs="?",
        help=(
            "SQLite database to open (defaults to ':memory:'). "
            "A new database is created if the file does not previously exist."
        ),
    )
    parser.add_argument(
        "sql", type=str, nargs="?",
        help=(
            "An SQL query to execute. "
            "Any returned rows are printed to stdout."
        ),
    )
    parser.add_argument(
        "-v", "--version", action="version",
        version=f"SQLite version {sqlite3.sqlite_version}",
        help="Print underlying SQLite library version",
    )
    args = parser.parse_args(argv)

    if args.filename == ":memory:":
        db_name = "a transient in-memory database"
    else:
        db_name = repr(args.filename)

    # Prepare REPL banner and prompts.
    banner = dedent(f"""
        sqlite3 shell, running on SQLite version {sqlite3.sqlite_version}
        Connected to {db_name}

        Each command will be run using execute() on the cursor.
        Type ".help" for more information; type ".quit" or CTRL-D to quit.
    """).strip()
    sys.ps1 = "sqlite> "
    sys.ps2 = "    ... "

    # isolation_level=None is passed so the sqlite3 module does not open
    # transactions implicitly.
    con = sqlite3.connect(args.filename, isolation_level=None)
    try:
        if args.sql:
            # SQL statement provided on the command-line; execute it directly.
            execute(con, args.sql, suppress_errors=False)
        else:
            # No SQL provided; start the REPL.
            console = SqliteInteractiveConsole(con)
            console.interact(banner, exitmsg="")
    finally:
        con.close()
 | |
| 
 | |
| 
 | |
# Run the CLI only when this file is executed as a script or via
# "python -m", not when it is merely imported.
if __name__ == "__main__":
    main()
 |