mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 07:31:38 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			431 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			431 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from collections import namedtuple
 | 
						|
import csv
 | 
						|
import re
 | 
						|
import textwrap
 | 
						|
 | 
						|
from . import NOT_SET, strutil, fsutil
 | 
						|
 | 
						|
 | 
						|
# The marker that fix_row() puts in place of values found among its
# "empty" markers.
EMPTY = '-'
 | 
						|
# The marker that fix_row() puts in place of values found among its
# "unknown" markers.
UNKNOWN = '???'
 | 
						|
 | 
						|
 | 
						|
def parse_markers(markers, default=None):
    """Normalize a "markers" argument into a collection of marker values.

    NOT_SET yields *default* and any other falsy value yields None.
    A value whose type is not exactly str is returned untouched.  A str
    made up of a single repeated character is kept whole, as a one-item
    list; any other str is split into a list of its characters.
    """
    if markers is NOT_SET:
        return default
    if not markers:
        return None
    if type(markers) is not str:
        # Not a plain str, so there is nothing to split up.
        return markers
    repeated = markers[0] * len(markers)
    return [markers] if markers == repeated else list(markers)
 | 
						|
 | 
						|
 | 
						|
def fix_row(row, **markers):
    """Return the row as a tuple with special values normalized.

    Falsy values become None.  A value found among the "empty" markers
    (default: '-') becomes EMPTY and one found among the "unknown"
    markers (default: '???') becomes UNKNOWN, with "empty" checked
    first.  Both keyword arguments go through parse_markers().

    Raises NotImplementedError if the row is a str.
    """
    if isinstance(row, str):
        raise NotImplementedError(row)
    empty = parse_markers(markers.pop('empty', ('-',)))
    unknown = parse_markers(markers.pop('unknown', ('???',)))
    check_empty = bool(empty)
    check_unknown = bool(unknown)

    def normalize(val):
        if not val:
            val = None
        if check_empty and val in empty:
            return EMPTY
        if check_unknown and val in unknown:
            return UNKNOWN
        return val

    return tuple(normalize(val) for val in row)
 | 
						|
 | 
						|
 | 
						|
def _fix_read_default(row):
 | 
						|
    for value in row:
 | 
						|
        yield value.strip()
 | 
						|
 | 
						|
 | 
						|
def _fix_write_default(row, empty=''):
 | 
						|
    for value in row:
 | 
						|
        yield empty if value is None else str(value)
 | 
						|
 | 
						|
 | 
						|
def _normalize_fix_read(fix):
 | 
						|
    if fix is None:
 | 
						|
        fix = ''
 | 
						|
    if callable(fix):
 | 
						|
        def fix_row(row):
 | 
						|
            values = fix(row)
 | 
						|
            return _fix_read_default(values)
 | 
						|
    elif isinstance(fix, str):
 | 
						|
        def fix_row(row):
 | 
						|
            values = _fix_read_default(row)
 | 
						|
            return (None if v == fix else v
 | 
						|
                    for v in values)
 | 
						|
    else:
 | 
						|
        raise NotImplementedError(fix)
 | 
						|
    return fix_row
 | 
						|
 | 
						|
 | 
						|
def _normalize_fix_write(fix, empty=''):
 | 
						|
    if fix is None:
 | 
						|
        fix = empty
 | 
						|
    if callable(fix):
 | 
						|
        def fix_row(row):
 | 
						|
            values = fix(row)
 | 
						|
            return _fix_write_default(values, empty)
 | 
						|
    elif isinstance(fix, str):
 | 
						|
        def fix_row(row):
 | 
						|
            return _fix_write_default(row, fix)
 | 
						|
    else:
 | 
						|
        raise NotImplementedError(fix)
 | 
						|
    return fix_row
 | 
						|
 | 
						|
 | 
						|
def read_table(infile, header, *,
               sep='\t',
               fix=None,
               _open=open,
               _get_reader=csv.reader,
               ):
    """Yield each row (a tuple) of the given *sep*-separated file.

    *infile* is either a filename, which is opened here, or an iterable
    of lines.  *header* is the expected header, either as a single str
    or as a sequence of column names; the first significant line of the
    file must match it.  A falsy *sep* means "tab".  *fix* controls how
    each row is cleaned up (see _normalize_fix_read()).

    Raises ValueError if the header does not match; a file without any
    significant lines is treated as having an empty header.
    """
    if isinstance(infile, str):
        with _open(infile, newline='') as infile:
            yield from read_table(
                infile,
                header,
                sep=sep,
                fix=fix,
                _open=_open,
                _get_reader=_get_reader,
            )
            return
    # Resolve the separator once so that the expected header is built
    # with the same delimiter the reader uses for the rows.
    sep = sep or '\t'
    lines = strutil._iter_significant_lines(infile)

    # Validate the header.
    if not isinstance(header, str):
        header = sep.join(header)
    try:
        actualheader = next(lines).strip()
    except StopIteration:
        actualheader = ''
    if actualheader != header:
        raise ValueError(f'bad header {actualheader!r}')

    fix_row = _normalize_fix_read(fix)
    for row in _get_reader(lines, delimiter=sep):
        yield tuple(fix_row(row))
 | 
						|
 | 
						|
 | 
						|
def write_table(outfile, header, rows, *,
                sep='\t',
                fix=None,
                backup=True,
                _open=open,
                _get_writer=csv.writer,
                ):
    """Write the header and each of the rows to the *sep*-separated file.

    *outfile* is either a filename, which is opened here for writing, or
    an already open file.  *header* is either a single str, which is
    split on the separator, or a sequence of column names.  A falsy
    *sep* means "tab".  *fix* controls how each row is prepared (see
    _normalize_fix_write()).  If *backup* is true then it is passed
    along to fsutil.create_backup() before anything is written.
    """
    if backup:
        fsutil.create_backup(outfile, backup)
    if isinstance(outfile, str):
        with _open(outfile, 'w', newline='') as outfile:
            return write_table(
                outfile,
                header,
                rows,
                sep=sep,
                fix=fix,
                # The backup was already requested above, before the
                # file was opened (and truncated) for writing.  Asking
                # again would target the now-empty file.
                backup=False,
                _open=_open,
                _get_writer=_get_writer,
            )

    sep = sep or '\t'
    if isinstance(header, str):
        header = header.split(sep)
    fix_row = _normalize_fix_write(fix)
    writer = _get_writer(outfile, delimiter=sep)
    writer.writerow(header)
    for row in rows:
        writer.writerow(tuple(fix_row(row)))
 | 
						|
 | 
						|
 | 
						|
def parse_table(entries, sep, header=None, rawsep=None, *,
                default=NOT_SET,
                strict=True,
                ):
    """Yield (row, filename) for each line found in the given entries.

    The lines come from strutil.parse_entries().  For lines that come
    with a filename, the first line seen for each new filename must be
    the header (when one is given) and is skipped.  Lines without a
    filename are split on *rawsep* instead of *sep* when *rawsep* is
    given and *sep* does not appear in the line.

    With *strict*, every row must have the same number of columns as
    the header or, without a header, as the first row.  The check (and
    any padding with *default*) is done by _parse_row().

    Raises ValueError if there is no separator and NotImplementedError
    if an expected header line is something else.  Since this is a
    generator, nothing is raised until it is iterated.
    """
    header, sep = _normalize_table_file_props(header, sep)
    if not sep:
        raise ValueError('missing "sep"')

    ncols = len(header.split(sep)) if header and strict else None
    cur_file = None
    for line, filename in strutil.parse_entries(entries, ignoresep=sep):
        if filename and header and cur_file != filename:
            cur_file = filename
            # The first line of each file must be the header; skip it.
            if line.strip() != header:
                # We expected the header.
                raise NotImplementedError((header, line))
            continue

        use_raw = not filename and rawsep and sep not in line
        row = _parse_row(line, rawsep if use_raw else sep, ncols, default)
        if strict and not ncols:
            ncols = len(row)
        yield row, filename
 | 
						|
 | 
						|
 | 
						|
def parse_row(line, sep, *, ncols=None, default=NOT_SET):
    """Return the line's values as a tuple (see _parse_row()).

    Raises ValueError if *sep* is missing.
    """
    if sep:
        return _parse_row(line, sep, ncols, default)
    raise ValueError('missing "sep"')
 | 
						|
 | 
						|
 | 
						|
def _parse_row(line, sep, ncols, default):
 | 
						|
    row = tuple(v.strip() for v in line.split(sep))
 | 
						|
    if (ncols or 0) > 0:
 | 
						|
        diff = ncols - len(row)
 | 
						|
        if diff:
 | 
						|
            if default is NOT_SET or diff < 0:
 | 
						|
                raise Exception(f'bad row (expected {ncols} columns, got {row!r})')
 | 
						|
            row += (default,) * diff
 | 
						|
    return row
 | 
						|
 | 
						|
 | 
						|
def _normalize_table_file_props(header, sep):
 | 
						|
    if not header:
 | 
						|
        return None, sep
 | 
						|
 | 
						|
    if not isinstance(header, str):
 | 
						|
        if not sep:
 | 
						|
            raise NotImplementedError(header)
 | 
						|
        header = sep.join(header)
 | 
						|
    elif not sep:
 | 
						|
        for sep in ('\t', ',', ' '):
 | 
						|
            if sep in header:
 | 
						|
                break
 | 
						|
        else:
 | 
						|
            sep = None
 | 
						|
    return header, sep
 | 
						|
 | 
						|
 | 
						|
##################################
 | 
						|
# stdout tables
 | 
						|
 | 
						|
# The column width that _resolve_width() falls back to when neither the
# column nor the caller provides one.
WIDTH = 20
 | 
						|
 | 
						|
 | 
						|
def resolve_columns(specs):
    """Return the list of ColumnSpec objects for the given column specs.

    *specs* is either an iterable of raw specs or a single str holding
    specs separated by commas and/or whitespace.
    """
    if isinstance(specs, str):
        specs = specs.replace(',', ' ').split()
    return [ColumnSpec.from_raw(raw) for raw in specs]
 | 
						|
 | 
						|
 | 
						|
def build_table(specs, *, sep=' ', defaultwidth=None):
    """Return the (header, divider, row format) strings for the columns.

    The specs are resolved with resolve_columns() and the resulting
    columns are handed to _build_table().
    """
    return _build_table(
        resolve_columns(specs),
        sep=sep,
        defaultwidth=defaultwidth,
    )
 | 
						|
 | 
						|
 | 
						|
class ColumnSpec(namedtuple('ColumnSpec', 'field label fmt')):
    """The field name, header label and format of one table column.

    The str form of a spec is "[<label>]<field>", optionally followed
    by either ":<align>[<width>]" (where <align> is one of "<", "^" and
    ">") or "[:<width>][:<fmt>]".  See REGEX for the details.
    """

    REGEX = re.compile(textwrap.dedent(r'''
        ^
        (?:
            \[
            (
                (?: [^\s\]] [^\]]* )?
                [^\s\]]
            )  # <label>
            ]
        )?
        ( [-\w]+ )  # <field>
        (?:
            (?:
                :
                ( [<^>] )  # <align>
                ( \d+ )?  # <width1>
            )
            |
            (?:
                (?:
                    :
                    ( \d+ )  # <width2>
                )?
                (?:
                    :
                    ( .*? )  # <fmt>
                )?
            )
        )?
        $
    '''), re.VERBOSE)

    @classmethod
    def from_raw(cls, raw):
        """Return the ColumnSpec for the given raw spec.

        *raw* may be a ColumnSpec (returned as-is), a spec str, or a
        sequence of spec parts (see _normalize()).

        Raises ValueError if the spec is missing or is not supported.
        """
        if not raw:
            raise ValueError('missing column spec')
        elif isinstance(raw, cls):
            return raw

        if isinstance(raw, str):
            parsed = cls._parse(raw)
        else:
            parsed = cls._normalize(raw)
        # _parse() returns None for a str that does not match, so the
        # check has to happen before the result is unpacked.
        if parsed is None:
            raise ValueError(f'unsupported column spec {raw!r}')
        *values, _ = parsed
        return cls(*values)

    @classmethod
    def parse(cls, specstr):
        """Return the ColumnSpec for the spec str (None if no match)."""
        parsed = cls._parse(specstr)
        if not parsed:
            return None
        *values, _ = parsed
        return cls(*values)

    @classmethod
    def _parse(cls, specstr):
        """Return (field, label, fmt, width) for the given spec str.

        None is returned if the str does not match REGEX.  The label
        defaults to the field name.  With only an alignment given, the
        width defaults to the length of the label.

        Raises NotImplementedError if an explicit <fmt> is not
        understood by _parse_fmt() or disagrees with an explicit width.
        """
        m = cls.REGEX.match(specstr)
        if not m:
            return None
        (label, field,
         align, width1,
         width2, fmt,
         ) = m.groups()
        if not label:
            label = field
        if fmt:
            assert not align and not width1, (specstr,)
            _parsed = _parse_fmt(fmt)
            if not _parsed:
                raise NotImplementedError(specstr)
            # Bind the width even when there is no <width2> to compare
            # against, so that the return below always has a value.
            width, _ = _parsed
            if width2 and width != int(width2):
                raise NotImplementedError(specstr)
        elif width2:
            fmt = width2
            width = int(width2)
        else:
            assert not fmt, (fmt, specstr)
            if align:
                width = int(width1) if width1 else len(label)
                fmt = f'{align}{width}'
            else:
                width = None
        return field, label, fmt, width

    @classmethod
    def _normalize(cls, spec):
        """Return the parsed form of a spec given as a sequence of parts.

        The supported shapes are:

          (label, field, width, fmt)
          (label, field, fmt)
          (field, fmt) or (label, field)

        The parts are combined into a spec str, which is then handed to
        _parse().  That means the result is whatever _parse() returns,
        including None.

        Raises NotImplementedError for any other number of parts and
        ValueError if the width does not match the one in the format.
        """
        if len(spec) == 4:
            label, field, width, fmt = spec
            if width:
                if not fmt:
                    fmt = str(width)
                elif _parse_fmt(fmt)[0] != width:
                    raise ValueError(f'width mismatch in {spec}')
        elif len(spec) == 3:
            label, field, fmt = spec
            if not field:
                # Only the first item was given, so it is the field.
                label, field = None, label
            elif not isinstance(field, str) or not field.isidentifier():
                # XXX This doesn't seem right...
                # The middle item does not look like a field name, so
                # it gets folded into the format instead.
                fmt = f'{field}:{fmt}' if fmt else field
                label, field = None, label
        elif len(spec) == 2:
            label = None
            field, fmt = spec
            if not field:
                field, fmt = fmt, None
            elif not field.isidentifier() or fmt.isidentifier():
                # The pair is actually (label, field), so there is no
                # format left over.
                label, field, fmt = field, fmt, None
        else:
            # This includes single-item specs, which are not supported.
            raise NotImplementedError(spec)

        fmt = f':{fmt}' if fmt else ''
        if label:
            return cls._parse(f'[{label}]{field}{fmt}')
        else:
            return cls._parse(f'{field}{fmt}')

    @property
    def width(self):
        """The width found in the format (None if there isn't one)."""
        if not self.fmt:
            return None
        parsed = _parse_fmt(self.fmt)
        if not parsed:
            return None
        width, _ = parsed
        return width

    def resolve_width(self, default=None):
        """Return the width to use for the column (see _resolve_width())."""
        return _resolve_width(self.width, self.fmt, self.label, default)
 | 
						|
 | 
						|
 | 
						|
def _parse_fmt(fmt):
 | 
						|
    if fmt.startswith(tuple('<^>')):
 | 
						|
        align = fmt[0]
 | 
						|
        width = fmt[1:]
 | 
						|
        if width.isdigit():
 | 
						|
            return int(width), align
 | 
						|
    elif fmt.isdigit():
 | 
						|
        return int(fmt), '<'
 | 
						|
    return None
 | 
						|
 | 
						|
 | 
						|
def _resolve_width(width, fmt, label, default):
 | 
						|
    if width:
 | 
						|
        if not isinstance(width, int):
 | 
						|
            raise NotImplementedError
 | 
						|
        return width
 | 
						|
    elif fmt:
 | 
						|
        parsed = _parse_fmt(fmt)
 | 
						|
        if parsed:
 | 
						|
            width, _ = parsed
 | 
						|
            if width:
 | 
						|
                return width
 | 
						|
 | 
						|
    if not default:
 | 
						|
        return WIDTH
 | 
						|
    elif hasattr(default, 'get'):
 | 
						|
        defaults = default
 | 
						|
        default = defaults.get(None) or WIDTH
 | 
						|
        return defaults.get(label) or default
 | 
						|
    else:
 | 
						|
        return default or WIDTH
 | 
						|
 | 
						|
 | 
						|
def _build_table(columns, *, sep=' ', defaultwidth=None):
 | 
						|
    header = []
 | 
						|
    div = []
 | 
						|
    rowfmt = []
 | 
						|
    for spec in columns:
 | 
						|
        width = spec.resolve_width(defaultwidth)
 | 
						|
        colfmt = spec.fmt
 | 
						|
        colfmt = f':{spec.fmt}' if spec.fmt else f':{width}'
 | 
						|
 | 
						|
        header.append(f' {{:^{width}}} '.format(spec.label))
 | 
						|
        div.append('-' * (width + 2))
 | 
						|
        rowfmt.append(f' {{{spec.field}{colfmt}}} ')
 | 
						|
    return (
 | 
						|
        sep.join(header),
 | 
						|
        sep.join(div),
 | 
						|
        sep.join(rowfmt),
 | 
						|
    )
 |