mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 13:41:24 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			431 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			431 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from collections import namedtuple
 | |
| import csv
 | |
| import re
 | |
| import textwrap
 | |
| 
 | |
| from . import NOT_SET, strutil, fsutil
 | |
| 
 | |
| 
 | |
# The canonical markers that fix_row() substitutes for values matching
# one of the caller's "empty" / "unknown" markers.
EMPTY, UNKNOWN = '-', '???'
 | |
| 
 | |
| 
 | |
def parse_markers(markers, default=None):
    """Normalize a "markers" argument.

    NOT_SET yields *default* and any other false value yields None.
    A value that is not exactly a str is returned unchanged.  A string
    made of a single repeated character (e.g. '???') is treated as one
    marker; any other string is split into one marker per character.
    """
    if markers is NOT_SET:
        return default
    if not markers:
        return None
    if type(markers) is not str:
        return markers
    # A non-empty string consists of one repeated character exactly when
    # it has a single distinct character.
    is_single_marker = len(set(markers)) == 1
    return [markers] if is_single_marker else list(markers)
 | |
| 
 | |
| 
 | |
def fix_row(row, **markers):
    """Return the row as a tuple with its marker values normalized.

    False values become None.  A value found among the "empty" markers
    (default: '-') becomes EMPTY and one found among the "unknown"
    markers (default: '???') becomes UNKNOWN; "empty" is checked first.
    Both marker sets are passed through parse_markers().  A str row
    is not supported (NotImplementedError).
    """
    if isinstance(row, str):
        raise NotImplementedError(row)
    empty = parse_markers(markers.pop('empty', ('-',)))
    unknown = parse_markers(markers.pop('unknown', ('???',)))

    def normalize(value):
        value = value if value else None
        if empty and value in empty:
            return EMPTY
        if unknown and value in unknown:
            return UNKNOWN
        return value

    return tuple(normalize(value) for value in row)
 | |
| 
 | |
| 
 | |
| def _fix_read_default(row):
 | |
|     for value in row:
 | |
|         yield value.strip()
 | |
| 
 | |
| 
 | |
| def _fix_write_default(row, empty=''):
 | |
|     for value in row:
 | |
|         yield empty if value is None else str(value)
 | |
| 
 | |
| 
 | |
| def _normalize_fix_read(fix):
 | |
|     if fix is None:
 | |
|         fix = ''
 | |
|     if callable(fix):
 | |
|         def fix_row(row):
 | |
|             values = fix(row)
 | |
|             return _fix_read_default(values)
 | |
|     elif isinstance(fix, str):
 | |
|         def fix_row(row):
 | |
|             values = _fix_read_default(row)
 | |
|             return (None if v == fix else v
 | |
|                     for v in values)
 | |
|     else:
 | |
|         raise NotImplementedError(fix)
 | |
|     return fix_row
 | |
| 
 | |
| 
 | |
| def _normalize_fix_write(fix, empty=''):
 | |
|     if fix is None:
 | |
|         fix = empty
 | |
|     if callable(fix):
 | |
|         def fix_row(row):
 | |
|             values = fix(row)
 | |
|             return _fix_write_default(values, empty)
 | |
|     elif isinstance(fix, str):
 | |
|         def fix_row(row):
 | |
|             return _fix_write_default(row, fix)
 | |
|     else:
 | |
|         raise NotImplementedError(fix)
 | |
|     return fix_row
 | |
| 
 | |
| 
 | |
def read_table(infile, header, *,
               sep='\t',
               fix=None,
               _open=open,
               _get_reader=csv.reader,
               ):
    """Yield each row of the given separator-delimited (e.g. tab) file.

    "infile" may be a filename (opened here with newline='', as the csv
    module requires) or an already-open iterable of lines.

    "header" is the expected first significant line, either as a string
    or as a sequence of column names that get joined with "sep".
    A ValueError is raised if the actual header does not match
    (a file without any significant line has the header '').

    "sep" is the column delimiter; a false value means tab.

    "fix" controls per-row cleanup (see _normalize_fix_read()).

    Each row is yielded as a tuple.
    """
    if isinstance(infile, str):
        with _open(infile, newline='') as infile:
            yield from read_table(
                infile,
                header,
                sep=sep,
                fix=fix,
                _open=_open,
                _get_reader=_get_reader,
            )
            return

    # Resolve the delimiter once so that the header check and the reader
    # agree on it (the reader alone used to fall back to tab).
    sep = sep or '\t'
    # NOTE(review): presumably this skips blank/comment lines; see strutil.
    lines = strutil._iter_significant_lines(infile)

    # Validate the header.
    if not isinstance(header, str):
        header = sep.join(header)
    actualheader = next(lines, '').strip()
    if actualheader != header:
        raise ValueError(f'bad header {actualheader!r}')

    fix_row = _normalize_fix_read(fix)
    for row in _get_reader(lines, delimiter=sep):
        yield tuple(fix_row(row))
 | |
| 
 | |
| 
 | |
def write_table(outfile, header, rows, *,
                sep='\t',
                fix=None,
                backup=True,
                _open=open,
                _get_writer=csv.writer,
                ):
    """Write each of the rows to the given separator-delimited (e.g. tab) file.

    "outfile" may be a filename (opened here for writing with
    newline='', as the csv module requires) or an already-open file.

    "header" is written as the first row.  It may be a string, which is
    split on "sep", or a sequence of column names.

    "sep" is the column delimiter; a false value means tab.

    "fix" controls how each row is rendered (see _normalize_fix_write()).

    If "backup" is true then it is passed, along with "outfile", to
    fsutil.create_backup() before anything is written.
    """
    if backup:
        fsutil.create_backup(outfile, backup)
    if isinstance(outfile, str):
        with _open(outfile, 'w', newline='') as outfile:
            return write_table(
                outfile,
                header,
                rows,
                sep=sep,
                fix=fix,
                # The backup was already requested above, before the file
                # was opened (and truncated) for writing.  Asking again
                # here would run create_backup() against the now-empty
                # file.
                backup=False,
                _open=_open,
                _get_writer=_get_writer,
            )

    # Resolve the delimiter once so the header and the rows agree on it.
    sep = sep or '\t'
    if isinstance(header, str):
        header = header.split(sep)
    fix_row = _normalize_fix_write(fix)
    writer = _get_writer(outfile, delimiter=sep)
    writer.writerow(header)
    for row in rows:
        writer.writerow(
            tuple(fix_row(row))
        )
 | |
| 
 | |
| 
 | |
def parse_table(entries, sep, header=None, rawsep=None, *,
                default=NOT_SET,
                strict=True,
                ):
    """Yield (row, filename) for each line produced from the entries.

    The lines come from strutil.parse_entries(), which pairs each line
    with a filename (false when the line did not come from a file).

    If a header is given then the first line of each new file must be
    that header; it is skipped, and anything else there raises
    NotImplementedError.  A line that did not come from a file and does
    not contain "sep" is split on "rawsep" instead, if one was given.

    With "strict", every row must have the same number of columns as the
    header (or as the first row when there is no header); see
    _parse_row() for how "default" fills in missing columns.
    """
    header, sep = _normalize_table_file_props(header, sep)
    if not sep:
        raise ValueError('missing "sep"')

    ncols = len(header.split(sep)) if (header and strict) else None
    cur_file = None
    for line, filename in strutil.parse_entries(entries, ignoresep=sep):
        if filename:
            rowsep = sep
            if header and cur_file != filename:
                cur_file = filename
                if line.strip() != header:
                    # We expected the header.
                    raise NotImplementedError((header, line))
                # Skip the first line since it's the header.
                continue
        elif rawsep and sep not in line:
            rowsep = rawsep
        else:
            rowsep = sep

        row = _parse_row(line, rowsep, ncols, default)
        if strict and not ncols:
            ncols = len(row)
        yield row, filename
 | |
| 
 | |
| 
 | |
def parse_row(line, sep, *, ncols=None, default=NOT_SET):
    """Split the line on "sep" into a tuple of stripped values.

    A false "sep" raises ValueError.  See _parse_row() for the meaning
    of "ncols" and "default".
    """
    if sep:
        return _parse_row(line, sep, ncols, default)
    raise ValueError('missing "sep"')
 | |
| 
 | |
| 
 | |
| def _parse_row(line, sep, ncols, default):
 | |
|     row = tuple(v.strip() for v in line.split(sep))
 | |
|     if (ncols or 0) > 0:
 | |
|         diff = ncols - len(row)
 | |
|         if diff:
 | |
|             if default is NOT_SET or diff < 0:
 | |
|                 raise Exception(f'bad row (expected {ncols} columns, got {row!r})')
 | |
|             row += (default,) * diff
 | |
|     return row
 | |
| 
 | |
| 
 | |
| def _normalize_table_file_props(header, sep):
 | |
|     if not header:
 | |
|         return None, sep
 | |
| 
 | |
|     if not isinstance(header, str):
 | |
|         if not sep:
 | |
|             raise NotImplementedError(header)
 | |
|         header = sep.join(header)
 | |
|     elif not sep:
 | |
|         for sep in ('\t', ',', ' '):
 | |
|             if sep in header:
 | |
|                 break
 | |
|         else:
 | |
|             sep = None
 | |
|     return header, sep
 | |
| 
 | |
| 
 | |
| ##################################
 | |
| # stdout tables
 | |
| 
 | |
# The column width that _resolve_width() falls back to when neither the
# column spec nor the caller's default provides one.
WIDTH = 20
 | |
| 
 | |
| 
 | |
def resolve_columns(specs):
    """Return the list of ColumnSpec for the given column specs.

    "specs" may be a string of specs separated by commas and/or
    whitespace, or an iterable of raw specs.  Each one is converted
    with ColumnSpec.from_raw().
    """
    if isinstance(specs, str):
        specs = specs.replace(',', ' ').split()
    return [ColumnSpec.from_raw(raw) for raw in specs]
 | |
| 
 | |
| 
 | |
def build_table(specs, *, sep=' ', defaultwidth=None):
    """Return (header, divider, row format) for the given column specs.

    The specs are resolved with resolve_columns() and the three strings
    are then produced by _build_table().
    """
    return _build_table(
        resolve_columns(specs),
        sep=sep,
        defaultwidth=defaultwidth,
    )
 | |
| 
 | |
| 
 | |
class ColumnSpec(namedtuple('ColumnSpec', 'field label fmt')):
    """The description of a single column in a stdout table.

    field - the replacement-field name used for the column in the row
            format string (see _build_table())
    label - the text shown in the table header (the field, by default)
    fmt   - the column's format: an optional alignment ('<', '^', '>')
            followed by a width, or None/'' if there is none

    The string form of a spec is one of:

      [<label>]<field>
      [<label>]<field>:<align>[<width>]
      [<label>]<field>[:<width>][:<fmt>]

    where the "[<label>]" part (with literal brackets) is optional.
    """

    REGEX = re.compile(textwrap.dedent(r'''
        ^
        (?:
            \[
            (
                (?: [^\s\]] [^\]]* )?
                [^\s\]]
            )  # <label>
            ]
        )?
        ( [-\w]+ )  # <field>
        (?:
            (?:
                :
                ( [<^>] )  # <align>
                ( \d+ )?  # <width1>
            )
            |
            (?:
                (?:
                    :
                    ( \d+ )  # <width2>
                )?
                (?:
                    :
                    ( .*? )  # <fmt>
                )?
            )
        )?
        $
    '''), re.VERBOSE)

    @classmethod
    def from_raw(cls, raw):
        """Return the ColumnSpec for a raw spec.

        "raw" may be a ColumnSpec (returned as-is), a spec string, or
        a sequence of 2-4 items (see _normalize()).  ValueError is
        raised for a missing or unsupported spec.
        """
        if not raw:
            raise ValueError('missing column spec')
        elif isinstance(raw, cls):
            return raw

        if isinstance(raw, str):
            parsed = cls._parse(raw)
        else:
            parsed = cls._normalize(raw)
        # Check for "no match" before unpacking; otherwise an unsupported
        # spec blows up with an unhelpful TypeError.
        if parsed is None:
            raise ValueError(f'unsupported column spec {raw!r}')
        *values, _ = parsed
        return cls(*values)

    @classmethod
    def parse(cls, specstr):
        """Return the ColumnSpec for the spec string, or None if no match."""
        parsed = cls._parse(specstr)
        if not parsed:
            return None
        *values, _ = parsed
        return cls(*values)

    @classmethod
    def _parse(cls, specstr):
        """Return (field, label, fmt, width) for the spec string.

        None is returned if the string does not match REGEX.
        NotImplementedError is raised for a format that _parse_fmt()
        does not understand or that disagrees with an explicit width.
        """
        m = cls.REGEX.match(specstr)
        if not m:
            return None
        (label, field,
         align, width1,
         width2, fmt,
         ) = m.groups()
        if not label:
            label = field
        if fmt:
            assert not align and not width1, (specstr,)
            _parsed = _parse_fmt(fmt)
            if not _parsed:
                raise NotImplementedError
            # Always bind the width so the return below cannot hit
            # an unbound local when there is no explicit <width2>.
            width, _ = _parsed
            if width2 and width != int(width2):
                raise NotImplementedError(specstr)
        elif width2:
            fmt = width2
            width = int(width2)
        elif align:
            # Without an explicit width the column is as wide as its label.
            width = int(width1) if width1 else len(label)
            fmt = f'{align}{width}'
        else:
            width = None
        return field, label, fmt, width

    @classmethod
    def _normalize(cls, spec):
        """Return (field, label, fmt, width) for a non-string raw spec.

        The supported shapes are:

          (label, field, width, fmt)
          (label, field, fmt)
          (field, fmt) or (label, field)

        The items are combined into a spec string which is then handed
        to _parse().  Any other length raises NotImplementedError.
        """
        if len(spec) == 1:
            raise NotImplementedError(spec)

        if len(spec) == 4:
            label, field, width, fmt = spec
            if width:
                if not fmt:
                    fmt = str(width)
                else:
                    # An unparsable fmt is left for _parse() to reject.
                    parsed = _parse_fmt(fmt)
                    if parsed and parsed[0] != width:
                        raise ValueError(f'width mismatch in {spec}')
        elif len(spec) == 3:
            label, field, fmt = spec
            if not field:
                label, field = None, label
            elif not isinstance(field, str) or not field.isidentifier():
                # XXX This doesn't seem right...
                # The second item is treated as part of the format and
                # the first item as the field.
                fmt = f'{field}:{fmt}' if fmt else field
                label, field = None, label
        elif len(spec) == 2:
            label = None
            field, fmt = spec
            if not field:
                field, fmt = fmt, None
            elif fmt and (not field.isidentifier() or fmt.isidentifier()):
                # The pair is actually (label, field).  The second item
                # was consumed as the field so there is no format left.
                label, field, fmt = field, fmt, None
        else:
            raise NotImplementedError

        fmt = f':{fmt}' if fmt else ''
        if label:
            return cls._parse(f'[{label}]{field}{fmt}')
        else:
            return cls._parse(f'{field}{fmt}')

    @property
    def width(self):
        """The width encoded in the format, or None if there isn't one."""
        if not self.fmt:
            return None
        parsed = _parse_fmt(self.fmt)
        if not parsed:
            return None
        width, _ = parsed
        return width

    def resolve_width(self, default=None):
        """Return the width to use for the column (see _resolve_width())."""
        return _resolve_width(self.width, self.fmt, self.label, default)
 | |
| 
 | |
| 
 | |
| def _parse_fmt(fmt):
 | |
|     if fmt.startswith(tuple('<^>')):
 | |
|         align = fmt[0]
 | |
|         width = fmt[1:]
 | |
|         if width.isdigit():
 | |
|             return int(width), align
 | |
|     elif fmt.isdigit():
 | |
|         return int(fmt), '<'
 | |
|     return None
 | |
| 
 | |
| 
 | |
| def _resolve_width(width, fmt, label, default):
 | |
|     if width:
 | |
|         if not isinstance(width, int):
 | |
|             raise NotImplementedError
 | |
|         return width
 | |
|     elif fmt:
 | |
|         parsed = _parse_fmt(fmt)
 | |
|         if parsed:
 | |
|             width, _ = parsed
 | |
|             if width:
 | |
|                 return width
 | |
| 
 | |
|     if not default:
 | |
|         return WIDTH
 | |
|     elif hasattr(default, 'get'):
 | |
|         defaults = default
 | |
|         default = defaults.get(None) or WIDTH
 | |
|         return defaults.get(label) or default
 | |
|     else:
 | |
|         return default or WIDTH
 | |
| 
 | |
| 
 | |
| def _build_table(columns, *, sep=' ', defaultwidth=None):
 | |
|     header = []
 | |
|     div = []
 | |
|     rowfmt = []
 | |
|     for spec in columns:
 | |
|         width = spec.resolve_width(defaultwidth)
 | |
|         colfmt = spec.fmt
 | |
|         colfmt = f':{spec.fmt}' if spec.fmt else f':{width}'
 | |
| 
 | |
|         header.append(f' {{:^{width}}} '.format(spec.label))
 | |
|         div.append('-' * (width + 2))
 | |
|         rowfmt.append(f' {{{spec.field}{colfmt}}} ')
 | |
|     return (
 | |
|         sep.join(header),
 | |
|         sep.join(div),
 | |
|         sep.join(rowfmt),
 | |
|     )
 | 
