mirror of
				https://github.com/python/cpython.git
				synced 2025-10-24 18:33:49 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			214 lines
		
	
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			214 lines
		
	
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import csv
 | |
| 
 | |
| from . import NOT_SET, strutil, fsutil
 | |
| 
 | |
| 
 | |
# Placeholder that fix_row() substitutes for any value matching an "empty" marker.
EMPTY = '-'
# Placeholder that fix_row() substitutes for any value matching an "unknown" marker.
UNKNOWN = '???'
 | |
| 
 | |
| 
 | |
def parse_markers(markers, default=None):
    """Normalize a markers spec into a collection of markers.

    * NOT_SET -> *default*
    * any other falsy value -> None
    * anything that is not exactly a str -> returned unchanged
    * a str made of one repeated character (e.g. '-' or '???') -> a
      one-item list holding that whole string
    * any other str -> a list with one marker per character
    """
    if markers is NOT_SET:
        return default
    if not markers:
        return None
    # Exact type check (not isinstance): only plain str objects are parsed.
    if type(markers) is not str:
        return markers
    repeated = markers[0] * len(markers)
    return [markers] if markers == repeated else list(markers)
 | |
| 
 | |
| 
 | |
def fix_row(row, **markers):
    """Return *row* as a tuple with marker values normalized.

    Falsy values become None.  After that, a value found among the
    "empty" markers is replaced with EMPTY; otherwise a value found
    among the "unknown" markers is replaced with UNKNOWN.

    The markers come from the keyword arguments ``empty`` (default
    ``('-',)``) and ``unknown`` (default ``('???',)``), each passed
    through parse_markers().  Other keyword arguments are ignored.

    Raises NotImplementedError if *row* is a str.
    """
    if isinstance(row, str):
        raise NotImplementedError(row)
    empty = parse_markers(markers.pop('empty', ('-',)))
    unknown = parse_markers(markers.pop('unknown', ('???',)))
    # Decide once which substitutions are active.
    check_empty = bool(empty)
    check_unknown = bool(unknown)

    def normalize(val):
        if not val:
            val = None
        # "empty" takes precedence over "unknown".
        if check_empty and val in empty:
            return EMPTY
        if check_unknown and val in unknown:
            return UNKNOWN
        return val

    return tuple(normalize(val) for val in row)
 | |
| 
 | |
| 
 | |
| def _fix_read_default(row):
 | |
|     for value in row:
 | |
|         yield value.strip()
 | |
| 
 | |
| 
 | |
| def _fix_write_default(row, empty=''):
 | |
|     for value in row:
 | |
|         yield empty if value is None else str(value)
 | |
| 
 | |
| 
 | |
| def _normalize_fix_read(fix):
 | |
|     if fix is None:
 | |
|         fix = ''
 | |
|     if callable(fix):
 | |
|         def fix_row(row):
 | |
|             values = fix(row)
 | |
|             return _fix_read_default(values)
 | |
|     elif isinstance(fix, str):
 | |
|         def fix_row(row):
 | |
|             values = _fix_read_default(row)
 | |
|             return (None if v == fix else v
 | |
|                     for v in values)
 | |
|     else:
 | |
|         raise NotImplementedError(fix)
 | |
|     return fix_row
 | |
| 
 | |
| 
 | |
| def _normalize_fix_write(fix, empty=''):
 | |
|     if fix is None:
 | |
|         fix = empty
 | |
|     if callable(fix):
 | |
|         def fix_row(row):
 | |
|             values = fix(row)
 | |
|             return _fix_write_default(values, empty)
 | |
|     elif isinstance(fix, str):
 | |
|         def fix_row(row):
 | |
|             return _fix_write_default(row, fix)
 | |
|     else:
 | |
|         raise NotImplementedError(fix)
 | |
|     return fix_row
 | |
| 
 | |
| 
 | |
def read_table(infile, header, *,
               sep='\t',
               fix=None,
               _open=open,
               _get_reader=csv.reader,
               ):
    """Yield each row of the given delimiter-separated (e.g. tab) file.

    *infile* is either a filename (opened here with ``newline=''``) or
    an iterable of lines.  The lines are first filtered through
    strutil._iter_significant_lines().

    *header* is the expected header, either as a single string or as
    an iterable of column names (joined with the delimiter).  The first
    significant line, stripped, must match it exactly.

    *sep* is the delimiter; a falsy value means tab.

    *fix* controls per-row normalization (see _normalize_fix_read()).

    Each row is yielded as a tuple.

    Raises ValueError if the header does not match (a missing header
    is reported as ``''``).
    """
    # Resolve the delimiter once so that the header check and the
    # reader always agree, even if a falsy "sep" was passed in.
    delimiter = sep or '\t'

    if isinstance(infile, str):
        with _open(infile, newline='') as opened:
            yield from read_table(
                opened,
                header,
                sep=sep,
                fix=fix,
                _open=_open,
                _get_reader=_get_reader,
            )
            return
    lines = strutil._iter_significant_lines(infile)

    # Validate the header.
    if not isinstance(header, str):
        header = delimiter.join(header)
    try:
        actualheader = next(lines).strip()
    except StopIteration:
        # No significant lines at all.
        actualheader = ''
    if actualheader != header:
        raise ValueError(f'bad header {actualheader!r}')

    fix_row = _normalize_fix_read(fix)
    for row in _get_reader(lines, delimiter=delimiter):
        yield tuple(fix_row(row))
 | |
| 
 | |
| 
 | |
def write_table(outfile, header, rows, *,
                sep='\t',
                fix=None,
                backup=True,
                _open=open,
                _get_writer=csv.writer,
                ):
    """Write each of the rows to the given delimiter-separated (e.g. tab) file.

    *outfile* is either a filename (opened here for writing with
    ``newline=''``) or an already-open writable file.

    *header* is written as the first row; a string is split on the
    delimiter, anything else is written as given.

    *sep* is the delimiter; a falsy value means tab.

    *fix* controls per-row normalization (see _normalize_fix_write()).

    If *backup* is truthy then fsutil.create_backup(outfile, backup)
    is called once, before the file is opened for writing.
    """
    if backup:
        fsutil.create_backup(outfile, backup)
    if isinstance(outfile, str):
        with _open(outfile, 'w', newline='') as opened:
            # The backup (if requested) was already made above.  Do not
            # request it again: by now the file has been opened with 'w'
            # and a second backup attempt would be operating on the
            # truncated file.
            return write_table(
                opened,
                header,
                rows,
                sep=sep,
                fix=fix,
                backup=False,
                _open=_open,
                _get_writer=_get_writer,
            )

    delimiter = sep or '\t'
    if isinstance(header, str):
        header = header.split(delimiter)
    fix_row = _normalize_fix_write(fix)
    writer = _get_writer(outfile, delimiter=delimiter)
    writer.writerow(header)
    for row in rows:
        writer.writerow(tuple(fix_row(row)))
 | |
| 
 | |
| 
 | |
def parse_table(entries, sep, header=None, rawsep=None, *,
                default=NOT_SET,
                strict=True,
                ):
    """Yield (row, filename) for each line found via strutil.parse_entries().

    *header* and *sep* are first normalized with
    _normalize_table_file_props(); a missing separator afterwards
    raises ValueError.

    For lines that come with a filename and when a header is known,
    the first line seen after the filename changes must be the header
    (it is skipped); otherwise NotImplementedError is raised.

    For lines without a filename, *rawsep* (if given) is used to split
    the line when *sep* does not occur in it.

    In strict mode every row must have the same number of columns,
    taken from the header or else from the first parsed row; see
    _parse_row() for how *default* is used to fill in short rows.
    """
    header, sep = _normalize_table_file_props(header, sep)
    if not sep:
        raise ValueError('missing "sep"')

    ncols = len(header.split(sep)) if header and strict else None
    current = None
    for line, filename in strutil.parse_entries(entries, ignoresep=sep):
        rowsep = sep
        if filename:
            if header and current != filename:
                current = filename
                if line.strip() != header:
                    # We expected the header.
                    raise NotImplementedError((header, line))
                # The first line is the header, so skip it.
                continue
        elif rawsep and sep not in line:
            rowsep = rawsep

        row = _parse_row(line, rowsep, ncols, default)
        if strict and not ncols:
            ncols = len(row)
        yield row, filename
 | |
| 
 | |
| 
 | |
def parse_row(line, sep, *, ncols=None, default=NOT_SET):
    """Split *line* on *sep* into a tuple of stripped values.

    This is the public wrapper around _parse_row(); it additionally
    raises ValueError when *sep* is falsy.
    """
    if sep:
        return _parse_row(line, sep, ncols, default)
    raise ValueError('missing "sep"')
 | |
| 
 | |
| 
 | |
| def _parse_row(line, sep, ncols, default):
 | |
|     row = tuple(v.strip() for v in line.split(sep))
 | |
|     if (ncols or 0) > 0:
 | |
|         diff = ncols - len(row)
 | |
|         if diff:
 | |
|             if default is NOT_SET or diff < 0:
 | |
|                 raise Exception(f'bad row (expected {ncols} columns, got {row!r})')
 | |
|             row += (default,) * diff
 | |
|     return row
 | |
| 
 | |
| 
 | |
| def _normalize_table_file_props(header, sep):
 | |
|     if not header:
 | |
|         return None, sep
 | |
| 
 | |
|     if not isinstance(header, str):
 | |
|         if not sep:
 | |
|             raise NotImplementedError(header)
 | |
|         header = sep.join(header)
 | |
|     elif not sep:
 | |
|         for sep in ('\t', ',', ' '):
 | |
|             if sep in header:
 | |
|                 break
 | |
|         else:
 | |
|             sep = None
 | |
|     return header, sep
 | 
