# Source code for pdstemplate.asciitable

##########################################################################################
# pdstemplate/asciitable.py
##########################################################################################
"""
.. _asciitable:

######################
pdstemplate.asciitable
######################

``asciitable`` is a plug-in module to assist with the labeling of ASCII tables in PDS3 and
PDS4. It supports the :ref:`pds3table` module and the ``tablelabel`` tool, and will also
be used by a future ``pds4table`` tool. To import::

    import pdstemplate.asciitable

This import creates two new pds-defined functions, which can be accessed within any
template.

* :meth:`ANALYZE_TABLE` takes the path to an existing ASCII table and analyzes its
  content, inferring details about the content and formats of all the columns.
* :meth:`TABLE_VALUE` returns information about the content of the table for use within
  the label to be generated.

For example, consider a template that contains this content::

    $ONCE(ANALYZE_TABLE(LABEL_PATH().replace('.lbl', '.tab')))
    ...
    OBJECT              = TABLE
      ...
      ROWS              = $TABLE_VALUE('ROWS')$
      COLUMNS           = $TABLE_VALUE('COLUMNS')$

      OBJECT            = COLUMN
        NAME            = FILE_NAME
        DATA_TYPE       = $TABLE_VALUE("PDS3_DATA_TYPE", 1)$
        START_BYTE      = $TABLE_VALUE("START_BYTE", 1)$
        BYTES           = $TABLE_VALUE("BYTES", 1)$
        FORMAT          = $TABLE_VALUE("PDS3_FORMAT", 1)$
        MINIMUM_VALUE   = $TABLE_VALUE("MINIMUM", 1)$
        MAXIMUM_VALUE   = $TABLE_VALUE("MAXIMUM", 1)$
        DESCRIPTION     = "Name of file in the directory"
      END_OBJECT        = COLUMN
    ...


The initial call to :meth:`ANALYZE_TABLE` is embedded inside a :ref:`ONCE` directive
because it returns no content. However, it reads the table file and assembles a database
of what it has found. The subsequent calls to it can be used for multiple labels and each
label will always contain the correct numbers of ROWS and COLUMNS. :meth:`TABLE_VALUE` can
also retrieve information about the content and format of each of the table's columns.
"""

import re

from filecache import FCPath

from . import PdsTemplate
from .utils import get_logger, TemplateError, TemplateAbort, _check_terminators

##########################################################################################
# Pre-defined template functions
##########################################################################################

# For global access to the latest table
_LATEST_ASCII_TABLE = None


def ANALYZE_TABLE(filepath, *, separator=',', crlf=None, escape=''):
    """Analyze a table file and make it the default for later :meth:`TABLE_VALUE` calls.

    The previously analyzed table, if any, is forgotten before the new analysis starts.
    Any exception raised while constructing the :class:`AsciiTable` is handed to the
    logger's ``exception`` method; in that case no default table is recorded here.

    Parameters:
        filepath (str, Path, or FCPath): The path to an ASCII table file.
        separator (str, optional): The column separator character, typically a comma.
            Other options are semicolon, tab, and vertical bar ("|").
        crlf (bool, optional): True to raise an error if the line terminators are not
            <CR><LF>; False to raise an error if the line terminator is not <LF> alone;
            None to accept either line terminator.
        escape (str, optional): The character to appear before a quote ('"') if the
            quote is to be taken as a literal part of the string. Options are '"' for a
            doubled quote and '\\' for a backslash. If not specified, quote characters
            inside quoted strings are disallowed.
    """

    global _LATEST_ASCII_TABLE

    # Drop the old table up front so a failed analysis cannot leave stale results behind
    _LATEST_ASCII_TABLE = None

    log = get_logger()
    log.debug('Analyzing ASCII table', filepath)

    options = dict(separator=separator, crlf=crlf, escape=escape)
    try:
        table = AsciiTable(filepath, **options)
    except Exception as err:
        log.exception(err)
    else:
        _LATEST_ASCII_TABLE = table

def TABLE_VALUE(name, column=0):
    """Return one piece of information about the most recently analyzed table.

    The table is the one stored by the latest successful call to :meth:`ANALYZE_TABLE`.
    These are all the options; a column is indicated by an integer starting from zero:

    * `TABLE_VALUE("PATH")` = full path to the table file.
    * `TABLE_VALUE("BASENAME")` = basename of the table file.
    * `TABLE_VALUE("ROWS")` = number of rows.
    * `TABLE_VALUE("ROW_BYTES")` = bytes per row.
    * `TABLE_VALUE("COLUMNS")` = number of columns.
    * `TABLE_VALUE("TERMINATORS")` = length of terminator: 1 for <LF>, 2 for <CR><LF>.
    * `TABLE_VALUE("WIDTH", <column>)` = width of the column in bytes.
    * `TABLE_VALUE("PDS3_FORMAT", <column>)` = a string containing the format for PDS3,
      e.g., "I7", "A23", or "F12.4".
    * `TABLE_VALUE("PDS4_FORMAT", <column>)` = a string containing the format for PDS4,
      e.g., "%7d", "%23s", or "%12.4f".
    * `TABLE_VALUE("PDS3_DATA_TYPE", <column>)` = PDS3 data type, one of "CHARACTER",
      "ASCII_REAL", "ASCII_INTEGER", or "TIME".
    * `TABLE_VALUE("PDS4_DATA_TYPE", <column>)` = PDS4 data type, e.g.,
      "ASCII_Text_Preserved", "ASCII_Real", or "ASCII_Date_YMD".
    * `TABLE_VALUE("QUOTES", <column>)` = number of quotes before field value, 0 or 1.
    * `TABLE_VALUE("START_BYTE", <column>)` = start byte of column, starting from 1.
    * `TABLE_VALUE("BYTES", <column>)` = number of bytes in column, excluding quotes.
    * `TABLE_VALUE("VALUES", <column>)` = a list of all the values found in the column.
    * `TABLE_VALUE("MINIMUM", <column>)` = the minimum value in the column.
    * `TABLE_VALUE("MAXIMUM", <column>)` = the maximum value in the column.
    * `TABLE_VALUE("FIRST", <column>)` = the first value in the column.
    * `TABLE_VALUE("LAST", <column>)` = the last value in the column.

    Parameters:
        name (str): Name of a parameter.
        column (int, optional): The index of the column, starting from zero.

    Returns:
        str, int, float, or bool: The value of the specified parameter as inferred from
        the ASCII table.

    Raises:
        TemplateAbort: If no ASCII Table was successfully analyzed.
        TemplateError: A wrapper for any other exception.
    """

    table = _LATEST_ASCII_TABLE
    if not table:
        raise TemplateAbort('No ASCII table has been analyzed')

    # Any failure inside the lookup is re-raised as a TemplateError, keeping the cause
    try:
        value = table.lookup(name, column)
    except Exception as err:
        raise TemplateError(err) from err

    return value

def _latest_ascii_table():
    """Return the module's current default AsciiTable, or None if there is none."""

    return _LATEST_ASCII_TABLE


def _reset_ascii_table():
    """Clear the module's default AsciiTable by setting it to None; a debugging aid."""

    global _LATEST_ASCII_TABLE
    _LATEST_ASCII_TABLE = None


# Register the analysis function under its template-visible name
PdsTemplate.define_global('ANALYZE_TABLE', ANALYZE_TABLE)

##########################################################################################
# AsciiTable class definition and API
##########################################################################################

[docs] class AsciiTable(): # This will match any valid fields between un-quoted commas _COMMA_REGEX = rb'([^",]*| *"[^"]*" *)(?:,|$)' _COLUMN_REGEX = { b',' : re.compile(_COMMA_REGEX), b'|' : re.compile(_COMMA_REGEX.replace(b',', rb'\|')), b';' : re.compile(_COMMA_REGEX.replace(b',', b';')), b'\t': re.compile(_COMMA_REGEX.replace(b',', b'\t')), }
[docs] def __init__(self, filepath, content=[], *, separator=',', crlf=None, escape=''): """Constructor for an AsciiTable. Parameters: filepath (str, Path, or FCPath): The path to an ASCII table file. content (bytes or list[bytes], optional): The table file content as a byte string or sequence of byte strings. If this input is empty, the file will be read; otherwise, this content is used without reading the file. Line terminators must be included. separator (str, optional): The column separator character, typically a comma. Other options are semicolon, tab, and vertical bar ("|"). crlf (bool, optional): True to raise an error if the line terminators are not <CR><LF>; False to raise an error if the line terminator is not <LF> alone; None to accept either line terminator. escape (str, optional): The character to appear before a quote ('"') if the quote is to be taken as a literal part of the string. Options are '"' for a doubled quote and '\\' for a backslash. If not specified, quote characters inside quoted strings are disallowed. 
""" global _LATEST_ASCII_TABLE self.filepath = FCPath(filepath) if separator not in ',;|\t': raise ValueError('Disallowed separator: ' + repr(separator)) self.separator = separator.encode('latin-1') if escape not in ('"', '\\', ''): raise ValueError('Disallowed escape character: ' + repr(escape)) self.escape = escape.encode('latin-1') # Read the file if necessary if not content: content = self.filepath.read_bytes() if not content: raise TemplateAbort('Table file is empty', self.filepath) # Identify the line terminator and validate try: self.crlf = _check_terminators(filepath, content, crlf=crlf) except TemplateError as err: raise TemplateAbort(err.message, self.filepath) self._terminators = 2 if self.crlf else 1 terminator = b'\r\n' if self.crlf else b'\n' # Convert content to a list of byte strings if isinstance(content, list): records = content else: records = [rec + terminator for rec in content.split(terminator)[:-1]] # Intialize internals self._row_bytes = 0 self._rows = 0 self._formats = [] # column -> tuple (letter, offset, length[, precision]) self._start_bytes = [] # column -> first byte of column in row, starting with 1 self._widths = [] # column -> width in bytes including surrounding quote self._bvalues = [] # column -> list of byte strings from column self._values_ = [] # column -> list of values, using lazy evaluation # Interpret the table shape self._row_bytes = len(records[0]) self._rows = len(records) # Interpret the columns in each row regex = AsciiTable._COLUMN_REGEX[self.separator] for recno, record in enumerate(records): # Replace literal quotes with nulls for now if self.escape: original_length = len(record) record = record.replace(self.escape + b'"', b'\x00') changed = len(record) != original_length else: changed = False # This pattern matches any valid field delimited by commas outside quotes parts = regex.split(record[:-self._terminators]) # If the record was valid, every even-numbered item will be blank and also # the second-to last item 
if not (all(p == b'' for p in parts[::2]) and parts[-2] == b''): raise TemplateAbort(f'Invalid use of quotes in record {recno+1}') columns = parts[1:-2:2] # Restore escaped quotes if changed: columns = [c.replace(b'\x00', self.escape + b'"') for c in columns] if not self._bvalues: self._bvalues = [[] for _ in columns] self._values_ = [[] for _ in columns] if len(self._bvalues) != len(columns): raise TemplateAbort('Inconsistent column count') for k, value in enumerate(columns): self._bvalues[k].append(value) # Check each column start_byte = 1 for colno, column in enumerate(self._bvalues): # Save widths width = len(column[0]) self._widths.append(width) # Get the start bytes ignoring quote offsets self._start_bytes.append(start_byte) start_byte += width + 1 # Check that all widths are consistent for recno, value in enumerate(column): if len(value) != width: raise TemplateAbort(f'Inconsistent width in record {recno+1}, ' f'column {colno+1}') # Infer the common format within this column self._formats.append(self._column_format(column, colno)) # Provide global access _LATEST_ASCII_TABLE = self PdsTemplate.define_global('TABLE_VALUE', self.lookup)
def _column_format(self, column, colno): """Derived the format for the entire column, handling possible mixed formats. Parameters: column (list[bytes]): Content of column as a list of byte strings. colno (int): Index of the column starting from 0. Returns: tuple: `(type, offset, length[, precision])` where: * `type` (str): "I" for int, "E" for exponential notation with uppercase "E", "e" for exponential notation with lowercase "e", "F" for float, "A" for string, "D" for date, or "T" for date-time. * `offset` (int): 1 if the first character is a quote; 0 otherwise. * `length` (int): characters used (excluding quotes if quoted). * `precision` (int or str, optional): For E and F types, this is the longest numeric precision. For D and T types, this is the most specific PDS4 date or date-time type. """ def pds4_date_time(formats): types = {fmt[0] for fmt in formats} type_ = 'ASCII_Date' if 'T' in types: type_ += '_Time' if all(fmt[3].startswith('YD') for fmt in formats): type_ += '_DOY' elif all(fmt[3].startswith('YMD') for fmt in formats): type_ += '_YMD' if all(fmt[3].endswith('Z') for fmt in formats): type_ += '_UTC' return type_ # Assemble the set of formats found formats = set() for value in column: formats.add(self._cell_format(value)) # If they're all the same, we're done if len(formats) == 1: fmt = list(formats)[0] if fmt[0] in 'DT': fmt = fmt[:3] + (pds4_date_time(formats),) return fmt # If there's a variation in offsets, any quotes will be part of the string offsets = {fmt[1] for fmt in formats} if len(offsets) == 1: offset = list(offsets)[0] else: offset = 0 # Get a string representation of all the cell types types = list({fmt[0] for fmt in formats}) types.sort() types = ''.join(types) length = max(fmt[2] for fmt in formats) # use longest length # Handle "E" and "F", giving preference to "F", using longest precision if types in {'E', 'F', 'EF', 'EI', 'FI', 'EFI'}: letter = 'F' if 'F' in types else 'E' prec = max(fmt[3] for fmt in formats if fmt[0] == letter) 
return (letter, 0, length, prec) # Handle "D" and/or "T", possibly combined with "A" if types in {'D', 'T', 'DT'}: return ('T' if 'T' in types else 'D', offset, length, pds4_date_time(formats)) # Handle "A" combined with "D" and/or "T" if types in {'AD', 'AT', 'ADT'}: subset = {fmt for fmt in formats if fmt[0] != 'A'} return ('T' if 'T' in types else 'D', offset, length, pds4_date_time(subset)) # Same format but different lengths if len(types) == 1: return (types[0], offset, length) raise TemplateAbort(f'Illegal mixture of types in column {colno+1} at ' f'start byte {self._start_bytes[-1]}', self.filepath) # Regular expressions for numeric cell values _INTEGER = re.compile(rb' *[+-]?\d+') _EFLOAT = re.compile(rb' *[+-]?(\d*)\.?(\d*)([eE])[+-]?\d{1,3}') _FFLOAT = re.compile(rb' *[+-]?\d*\.(\d*)') _DATE = re.compile(rb' *\d\d\d\d-(\d\d-\d\d|\d\d\d)(T\d\d:\d\d:\d\d(?:|\.\d*)Z?)? *') def _cell_format(self, value): """Returns cell format information for a single table cell value. Returns: tuple: `(type, offset, length[, precision])` where: * `type` (str): "I" for int, "E" for exponential notation with uppercase "E", "e" for exponential notation with lowercase "e", "F" for float, "A" for string, "D" for date, or "T" for date-time. * `offset` (int): 1 if the first character is a quote; 0 otherwise. * `length` (int): characters used (excluding quotes if quoted). * `precision`: For E and F types, this is the numeric precision. For D and T types, this is a string that begins with "YMD" for dates in "yyyy-mm-dd" or "YD" for dates in "yyyy-ddd" format; for T formats, "T" is appended, followed by "Z" if the time ends in "Z". 
""" stripped = value.rstrip() # strip trailing blankcs # Date checker, which might be inside quotes def _date_fmt(string, offset): if match := AsciiTable._DATE.fullmatch(string): prec = 'YD' if len(match.group(1)) == 3 else 'YMD' if match.group(2): prec += 'T' if match.group(2).endswith(b'Z'): prec += 'Z' return ('T' if 'T' in prec else 'D', offset, len(string), prec) return None # Integer if AsciiTable._INTEGER.fullmatch(stripped): return ('I', 0, len(stripped)) # Float if match := AsciiTable._EFLOAT.fullmatch(stripped): prec = len(match.group(1)) + len(match.group(2)) - 1 return (match.group(3).decode('latin-1').upper(), 0, len(stripped), prec) if match := AsciiTable._FFLOAT.fullmatch(stripped): prec = len(match.group(1)) return ('F', 0, len(stripped), prec) # Date fmt = _date_fmt(stripped, 0) if fmt is not None: return fmt # Quoted string case if value.startswith(b'"') and value.endswith(b'"'): string = value[1:-1] # Could still be a date fmt = _date_fmt(string, 1) if fmt is not None: return fmt # Otherwise, it's a quoted string return ('A', 1, len(string)) # Anything else is an un-quoted, full-length string return ('A', 0, len(value)) ###################################################################################### # Lookup function ###################################################################################### _PDS3_DATA_TYPES = { 'A': 'CHARACTER', 'D': 'DATE', 'E': 'ASCII_REAL', 'F': 'ASCII_REAL', 'I': 'ASCII_INTEGER', 'T': 'TIME', } _PDS4_DATA_TYPES = { 'A': 'ASCII_Text_Preserved', 'D': 'ASCII_Date', 'E': 'ASCII_Real', 'F': 'ASCII_Real', 'I': 'ASCII_Integer', 'T': 'ASCII_Date_Time' }
[docs] def lookup(self, name, column=0): """Lookup function for information about this AsciiTable. These are all the options; a column is indicated by an integer starting from zero: * `lookup("PATH")` = full path to the table file. * `lookup("BASENAME")` = basename of the table file. * `lookup("ROWS")` = number of rows. * `lookup("ROW_BYTES")` = bytes per row. * `lookup("COLUMNS")` = number of columns. * `lookup["TERMINATORS"]` = length of terminator: 1 for <LF>, 2 for <CR><LF>. * `lookup("WIDTH", <column>)` = width of the column in bytes. * `lookup("PDS3_FORMAT", <column>)` = a string containing the format for PDS3, e.g.,"I7", "A23", or "F12.4". * `lookup("PDS4_FORMAT", <column>)` = a string containing the format for PDS4, e.g., "%7d", "%23s", or "%12.4f". * `lookup("PDS3_DATA_TYPE", <column>)` = PDS3 data type, one of "CHARACTER", "ASCII_REAL", "ASCII_INTEGER", or "TIME". * `lookup("PDS4_DATA_TYPE", <column>)` = PDS3 data type, e.g., "ASCII_Text_Preserved", "ASCII_Real", or "ASCII_Date_YMD". * `lookup("QUOTES", <column>)` = number of quotes before field value, 0 or 1. * `lookup("START_BYTE", <column>)` = start byte of column, starting from 1. * `lookup("BYTES", <column>)` = number of bytes in column, excluding quotes. * `lookup("VALUES", <column>)` = a list of all the values found in the column. * `lookup("MINIMUM", <column>)` = the minimum value in the column. * `lookup("MAXIMUM", <column>)` = the maximum value in the column. * `lookup("FIRST", <column>)` = the first value in the column. * `lookup("LAST", <column>)` = the last value in the column. Parameters: name (str): Name of a parameter. column (int, optional): The index of the column, starting from zero. Returns: (str, int, float, or bool): The value of the specified parameter as inferred from the table. 
""" match name: case 'PATH': return str(self.filepath) case 'BASENAME': return self.filepath.name case 'ROWS': return self._rows case 'ROW_BYTES': return self._row_bytes case 'COLUMNS': return len(self._bvalues) case 'TERMINATORS': return self._terminators case 'WIDTH': return self._widths[column] case 'PDS3_FORMAT': fmt = self._formats[column] if fmt[0] in 'eEF': return f'{fmt[0]}{fmt[2]}.{fmt[3]}' elif fmt[0] == 'I': return f'I{fmt[2]}' else: return f'A{fmt[2]}' case 'PDS4_FORMAT': fmt = self._formats[column] if fmt[0] in 'eE': return f'%{fmt[2]}.{fmt[3]}{fmt[0]}' elif fmt[0] == 'F': return f'%{fmt[2]}.{fmt[3]}f' elif fmt[0] == 'I': return f'%{fmt[2]}d' else: return f'%{fmt[2]}s' case 'PDS3_DATA_TYPE': return AsciiTable._PDS3_DATA_TYPES[self._formats[column][0]] case 'PDS4_DATA_TYPE': type_ = self._formats[column][0] if type_ in 'DT': return self._formats[column][3] else: return AsciiTable._PDS4_DATA_TYPES[type_] case 'QUOTES': return self._formats[column][1] case 'START_BYTE': return self._start_bytes[column] case 'BYTES': return self._widths[column] - 2 * self._formats[column][1] case 'VALUES': return self._values(column) case 'MINIMUM': return min(self._values(column)) case 'MAXIMUM': return max(self._values(column)) case 'FIRST': return self._values(column)[0] case 'LAST': return self._values(column)[-1] raise KeyError(name)
def _values(self, column): """All the values in a column using lazy evaluation.""" if not self._values_[column]: fmt = self._formats[column] if fmt[0] in 'IEeF' or fmt[1] == 1: self._values_[column] = [self._eval(bvalue) for bvalue in self._bvalues[column]] else: self._values_[column] = [bvalue.decode('utf-8') for bvalue in self._bvalues[column]] return self._values_[column] def _eval(self, bvalue): """Convert the given bytes value to int, float, or un-quoted string.""" stripped = bvalue.strip() if stripped.startswith(b'"') and stripped.endswith(b'"') and len(bvalue) > 1: if self.escape: original_length = len(stripped) - 2 stripped = stripped[1:-1].replace(self.escape + b'"', b'\x00') changed = len(stripped) != original_length if changed: stripped = stripped.replace(b'\x00', b'"') return stripped.decode('utf-8') else: return stripped.strip()[1:-1].decode('utf-8') try: return int(bvalue) except ValueError: pass try: return float(bvalue) except ValueError: # pragma: no cover pass return bvalue.decode('utf-8') # pragma: no cover # Alternative name for the lookup function, primarily for when used in templates. TABLE_VALUE = lookup
##########################################################################################