##########################################################################################
# pdstemplate/pds3table.py
##########################################################################################
"""
.. _pds3table:
#####################
pdstemplate.pds3table
#####################
``pds3table`` is a plug-in module to automate the generation and validation of PDS3 labels
for ASCII tables. It works in concert with the :ref:`asciitable` module, which analyzes
the content of ASCII table files. It is used by stand-alone program ``tablelabel`` to
validate and repair existing PDS3 labels as well as to generate new labels; if
``tablelabel`` meets your needs, you can avoid any programming in Python.
To import::
import pdstemplate.pds3table
Once imported, the following pre-defined functions become available for use within a
:class:`PdsTemplate`:
* :meth:`ANALYZE_PDS3_LABEL` analyzes the content of a PDS3 label or template, gathering
information about the names and other properties of its TABLE and COLUMN objects. Once
it is called, the following functions become available.
* :meth:`~asciitable.ANALYZE_TABLE` (from :ref:`asciitable`) takes the path to an existing
ASCII table and analyzes its content, inferring details about the content and formats of
all the columns.
* :meth:`VALIDATE_PDS3_LABEL` issues a warning message for any errors found in the label
or template. Optionally, it can abort the generation of the label if it encounters an
irrecoverable incompatibility with the ASCII table.
* :meth:`LABEL_VALUE` returns correct and valid PDS3 values for many of the attributes of
PDS3 TABLE and COLUMN objects, based on its analysis of the table.
* :meth:`OLD_LABEL_VALUE` returns the current (although possibly incorrect or missing)
values for many of the same PDS3 TABLE and COLUMN attributes.
For example, consider a template that contains this content::
$ONCE(ANALYZE_TABLE(LABEL_PATH().replace('.lbl', '.tab')))
$ONCE(ANALYZE_PDS3_LABEL(TEMPLATE_PATH()))
...
OBJECT = TABLE
...
ROWS = $LABEL_VALUE('ROWS')$
COLUMNS = $LABEL_VALUE('COLUMNS')$
OBJECT = COLUMN
NAME = FILE_NAME
COLUMN_NUMBER = $LABEL_VALUE("COLUMN_NUMBER", "FILE_NAME")$
DATA_TYPE = $LABEL_VALUE("DATA_TYPE", "FILE_NAME")$
START_BYTE = $LABEL_VALUE("START_BYTE", "FILE_NAME")$
BYTES = $LABEL_VALUE("BYTES", "FILE_NAME")$
FORMAT = $LABEL_VALUE("FORMAT", "FILE_NAME")$
MINIMUM_VALUE = $LABEL_VALUE("MINIMUM_VALUE", "FILE_NAME")$
MAXIMUM_VALUE = $LABEL_VALUE("MAXIMUM_VALUE", "FILE_NAME")$
DESCRIPTION = "Name of file in the directory"
END_OBJECT = COLUMN
...
The initial calls to :meth:`~asciitable.ANALYZE_TABLE` and :meth:`ANALYZE_PDS3_LABEL` are
embedded inside a :ref:`ONCE` directive because they return no content. The first call
analyzes the content and structure of the ASCII table, and the second analyzes the
template. The subsequent calls to :meth:`LABEL_VALUE` fill in the correct values for the
specified quantities.
Optionally, you could include this as the third line in the template::
$ONCE(VALIDATE_PDS3_LABEL())
This function logs a warnings and errors for any incorrect TABLE and COLUMN values
currently in the template.
This module also provides a pre-processor, which can be used to validate or repair an
exising PDS3 label. The function :meth:`pds3_table_preprocessor`, when used as the
`preprocess` input to the :meth:`~pdstemplate.PdsTemplate` constructor, transforms an
existing PDS3 label into a new template by replacing all needed TABLE and COLUMN
attributes with calls to :meth:`LABEL_VALUE`. The effect is that when the label is
generated, it is guaranteed to contain correct information where the earlier label might
have been incorrect. In this case, your program would look something like this::
from pdstemplate import PdsTemplate
from pdstemplate.pds3table import pds3_table_preprocessor
template = PdsTemplate(label_path, crlf=True, ...
preprocess=pds3_table_preprocessor, kwargs={...})
template.write({}, label_path, ...)
The constructor invokes :meth:`pds3_table_preprocessor` to transform the label into a
template. You can use the `kwargs` input dictionary to provide inputs to the
pre-processor, such as adding a requirement that each column contain FORMAT,
COLUMN_NUMBER, MINIMUM/MAXIMUM_VALUEs, etc., and designating how warnings and errors are
to be handled.
Afterward, the call to the template's :meth:`~pdstemplate.PdsTemplate.write` method will
validate the label and/or write a new label, depending on its input parameters.
For example, suppose the label contains this::
PDS_VERSION_ID = PDS3
RECORD_TYPE = FIXED_LENGTH
RECORD_BYTES = 1089
FILE_RECORDS = 1711
^INDEX_TABLE = "COVIMS_0094_index.tab"
OBJECT = INDEX_TABLE
INTERCHANGE_FORMAT = ASCII
ROWS = 1711
COLUMNS = 61
ROW_BYTES = 1089
DESCRIPTION = "This Cassini VIMS image index ...."
OBJECT = COLUMN
NAME = FILE_NAME
DATA_TYPE = CHARACTER
START_BYTE = 2
BYTES = 25
DESCRIPTION = "Name of file in the directory"
END_OBJECT = COLUMN
...
You then execute this::
template = PdsTemplate(label_path, crlf=True,
preprocess=pds3_table_preprocessor,
kwargs={'numbers': True, 'formats': True})
After the call, you can look at the template's `content` attribute, which contains the
template's content after pre-processing. Its value is this::
$ONCE(ANALYZE_TABLE(LABEL_PATH().replace(".lbl",".tab").replace(".LBL",".TAB"), crlf=True))
$ONCE(VALIDATE_PDS3_LABEL(hide_warnings, abort_on_error))
PDS_VERSION_ID = PDS3
RECORD_TYPE = $LABEL_VALUE("RECORD_TYPE")$
RECORD_BYTES = $LABEL_VALUE("RECORD_BYTES")$
FILE_RECORDS = $LABEL_VALUE("FILE_RECORDS")$
OBJECT = INDEX_TABLE
INTERCHANGE_FORMAT = $LABEL_VALUE("INTERCHANGE_FORMAT")$
ROWS = $LABEL_VALUE("ROWS")$
COLUMNS = $LABEL_VALUE("COLUMNS")$
ROW_BYTES = $LABEL_VALUE("ROW_BYTES")$
DESCRIPTION = "This Cassini VIMS image index ...."
OBJECT = COLUMN
NAME = FILE_NAME
COLUMN_NUMBER = $LABEL_VALUE("COLUMN_NUMBER", 1)$
DATA_TYPE = $LABEL_VALUE("DATA_TYPE", 1)$
START_BYTE = $LABEL_VALUE("START_BYTE", 1)$
BYTES = $LABEL_VALUE("BYTES", 1)$
FORMAT = $QUOTE_IF(LABEL_VALUE("FORMAT", 1))$
DESCRIPTION = "Name of file in the directory"
END_OBJECT = COLUMN
...
The TABLE and COLUMN attributes defining table format and structure have been replaced by
calls to :meth:`LABEL_VALUE`, which will provide the correct value whether or not the
value in the original label was correct. Also, COLUMN_NUMBER and FORMAT have been added to
the COLUMN object because of the pre-processor inputs `numbers=True` and `formats=True`.
Another application of the preprocessor is to simplify the construction of a template for
an ASCII table. Within a template, the only required attributes of a COLUMN object are
NAME and DESCRIPTION. Optionally, you can also specify any special constants,
VALID_MINIMUM/MAXIMUM values, OFFSET and SCALING_FACTOR, and the number of ITEMS if the
COLUMN object describes more than one. All remaining information about the column, such as
DATA_TYPE, START_BYTE, BYTES, etc., will be filled in by the pre-processor. Inputs to the
preprocessor let you indicate whether to include FORMATs, COLUMN_NUMBERs, and the
MINIMUM/MAXIMUM_VALUEs attributes automatically.
"""
import re
import warnings
from filecache import FCPath
from . import PdsTemplate
from .asciitable import ANALYZE_TABLE, TABLE_VALUE, _latest_ascii_table
from .utils import get_logger, TemplateError, TemplateAbort, _check_terminators
##########################################################################################
# Pre-defined template functions
##########################################################################################
# For global access to the latest table
_LATEST_PDS3_TABLE = None
[docs]
def ANALYZE_PDS3_LABEL(labelpath, *, validate=True):
"""Analyze the current template as applied to the most recently analyzed ASCII table.
After this call, :meth:`LABEL_VALUE` can be used anywhere in the template to fill in
values derived from the table.
Parameters:
labelpath (str, Path, or FCPath):
Path to the current label.
validate (bool, optional):
If True, a warning or error message will be logged for every problem found in
the template. Otherwise, warnings will be corrected silently.
"""
global _LATEST_PDS3_TABLE
get_logger().debug('Analyzing PDS3 label', labelpath)
_LATEST_PDS3_TABLE = Pds3Table(labelpath, validate=False, analyze_only=True)
if validate:
return Pds3Table._validate_inside_template(_LATEST_PDS3_TABLE,
hide_warnings=False,
abort_on_error=False)
[docs]
def VALIDATE_PDS3_LABEL(hide_warnings=False, abort_on_error=True):
"""Log a warning for every error found when generating this PDS3 label.
:meth:`ANALYZE_PDS3_LABEL` must be called first.
Parameters:
abort_on_error (bool): If True and a validation error occurs, further evaluation
of the template will be aborted.
Returns:
int: The number of errors issued.
int: The number of warnings issued.
"""
get_logger().debug('Validating PDS3 label', _LATEST_PDS3_TABLE.labelpath)
return Pds3Table._validate_inside_template(_LATEST_PDS3_TABLE,
hide_warnings=hide_warnings,
abort_on_error=abort_on_error)
[docs]
def LABEL_VALUE(name, column=0):
"""Lookup function returning information about the PDS3 label after it has been
analyzed or pre-processed and after the ASCII table has been analyzed.
Each of the following function calls returns a valid PDS3 parameter value. Columns can
be identified by name or by number starting from 1.
* `LABEL_VALUE("PATH")`
* `LABEL_VALUE("BASENAME")`
* `LABEL_VALUE("RECORD_TYPE")`
* `LABEL_VALUE("RECORD_BYTES")`
* `LABEL_VALUE("FILE_RECORDS")`
* `LABEL_VALUE("INTERCHANGE_FORMAT")`
* `LABEL_VALUE("ROWS")`
* `LABEL_VALUE("COLUMNS")`
* `LABEL_VALUE("ROW_BYTES")`
* `LABEL_VALUE("DATA_TYPE", <column>)`
* `LABEL_VALUE("START_BYTE", <column>)`
* `LABEL_VALUE("BYTES", <column>)`
* `LABEL_VALUE("COLUMN_NUMBER", <column>)`
* `LABEL_VALUE("FORMAT", <column>)`
* `LABEL_VALUE("UNIT", <column>)`
* `LABEL_VALUE("MINIMUM_VALUE", <column>)`
* `LABEL_VALUE("MAXIMUM_VALUE", <column>)`
* `LABEL_VALUE("DERIVED_MINIMUM", <column>)`
* `LABEL_VALUE("DERIVED_MAXIMUM", <column>)`
It also provides these values derived from the existing template or label: "NAME",
"ITEMS", "SCALING_FACTOR", "OFFSET", "INVALID_CONSTANT", "MISSING_CONSTANT",
"NOT_APPLICABLE_CONSTANT", "NULL_CONSTANT", "UNKNOWN_CONSTANT", "VALID_MINIMUM", and
"VALID_MAXIMUM".
In addition, these options are supported:
* `LABEL_VALUE("TABLE_PATH")`: full path to the associated ASCII table file.
* `LABEL_VALUE("TABLE_BASENAME")`: basename of the associated ASCII table file.
* `LABEL_VALUE("FIRST", <column>)`: value from the first row of this column.
* `LABEL_VALUE("LAST", <column>)`: value from the last row of this column.
Parameters:
name (str): Name of a parameter.
column (str or int, optional): The name or COLUMN_NUMBER (starting at 1) for a
column; use 0 for general parameters.
Returns:
int, float, str, or None: The correct value for the specified parameter.
"""
if not _LATEST_PDS3_TABLE:
raise TemplateAbort('No PDS3 label has been analyzed')
if _latest_ascii_table(): # make sure we're referring to the latest AsciiTable
_LATEST_PDS3_TABLE.assign_to()
try:
return _LATEST_PDS3_TABLE.lookup(name, column)
except Exception as err:
raise TemplateError(err) from err
[docs]
def OLD_LABEL_VALUE(name, column=0):
"""Lookup function returning information about the current content of the PDS3 label,
whether or not it is correct.
Available top-level keywords are "RECORD_TYPE", "RECORD_BYTES", "FILE_RECORDS",
"INTERCHANGE_FORMAT", "ROWS", "COLUMNS", and "ROW_BYTES".
Available column-level keywords are "NAME", "COLUMN_NUMBER", "DATA_TYPE",
"START_BYTE", "BYTES", "FORMAT", "ITEMS", "ITEM_BYTES", "ITEM_OFFSET",
"SCALING_FACTOR", "OFFSET", "INVALID_CONSTANT", "MISSING_CONSTANT",
"NOT_APPLICABLE_CONSTANT", "NULL_CONSTANT", "UNKNOWN_CONSTANT", "VALID_MAXIMUM",
"VALID_MINIMUM", "MINIMUM_VALUE", "MAXIMUM_VALUE", "DERIVED_MINIMUM", and
"DERIVED_MAXIMUM".
Parameters:
name (str): Name of a parameter.
column (str or int, optional): The name or COLUMN_NUMBER (starting at 1) for a
column; use 0 for general parameters.
Returns:
int, float, str, or None: The correct value for the specified parameter.
"""
try:
return _LATEST_PDS3_TABLE.old_lookup(name, column)
except Exception as err:
raise TemplateError(err) from err
def _latest_pds3_table():
"""The most recently defined AsciiTable object. Provided for global access."""
return _LATEST_PDS3_TABLE
PdsTemplate.define_global('ANALYZE_PDS3_LABEL', ANALYZE_PDS3_LABEL)
##########################################################################################
# Preprocessor
##########################################################################################
[docs]
def pds3_table_preprocessor(labelpath, content, *, validate=True, numbers=False,
formats=False, units=False, minmax=(), derived=(), edits=[],
reals=[]):
"""A pre-processor function for use in the :meth:~pdstemplate.PdsTemplate`
constructor.
This function receives a PDS3 label or template describing an ASCII table and returns
a revised template in which the supported TABLE and COLUMN attributes have been
replaced by calls to LABEL_VALUE. This ensures that the generated label will contain a
complete and accurate set of values.
Parameters:
labelpath (str, Path, or FCPath):
The path to the PDS3 label or template file.
content (str): The full content of the template as a single string with a newline
character after each line.
validate (bool, optional):
If True, a warning will be issued for each error found when the label is
generated; otherwise, errors will be repaired silently.
numbers (bool, optional):
True to include COLUMN_NUMBER into each COLUMN object if it is not already
there.
formats (bool, optional):
True to include FORMAT into each COLUMN object if it is not already there.
units (bool, optional):
True to repair units to conform to the options in the PDS3 Data Dictionary.
minmax (str, tuple[str], or list[str], optional):
Zero or more names of columns for which to include the MINIMUM_VALUE and
MAXIMUM_VALUE. In addition or as an alternative, use "float" to include these
values for all floating-point columns and/or "int" to include these values for
all integer columns.
derived (str, tuple[str], or list[str], optional):
Zero or more names of columns for which to include the DERIVED_MINIMUM and
DERIVED_MAXIMUM. In addition or as an alternative, use "float" to include
these values for all floating-point columns.
edits (list[str]), optional):
A list of strings of the form "column:name = value", which should be used to
insert or replace values currently in the label.
reals (str, tuple[str], or list[str]), optional):
Names of columns that should be treated as ASCII_REAL even if thee column only
contains integers.
Returns:
str: The revised content for the template.
"""
logger = get_logger()
logger.debug('PDS3 table preprocessor', labelpath)
pds3_label = Pds3Table(labelpath, content, validate=validate, numbers=numbers,
formats=formats, units=units, minmax=minmax, derived=derived,
edits=edits, reals=reals)
return pds3_label.content
##########################################################################################
# Pds3Table class definition and API
##########################################################################################
[docs]
class Pds3Table():
"""Class encapsulating a label or template that describes a PDS3 label containing a
TABLE object.
"""
# These split a content string into its constituent objects
_OBJECT_TABLE_REGEX = re.compile(r'(?<![ \w])'
r'( *OBJECT *= *[^\n]*TABLE *\r?\n)(.*?\r?\n)'
r'( *END_OBJECT *= *[^\n]*TABLE *\r?\n)', re.DOTALL)
_OBJECT_COLUMN_REGEX = re.compile(r'(?<![ \w])'
r'( *OBJECT *= *COLUMN *\r?\n)(.*?\r?\n)'
r'( *END_OBJECT *= *COLUMN *\r?\n)', re.DOTALL)
[docs]
def __init__(self, labelpath, label='', *, validate=True, analyze_only=False,
crlf=None, numbers=False, formats=False, units=False, minmax=(),
derived=(), edits=[], reals=[]):
"""Constructor for a Pds3Table object. It analyzes the content of a PDS3 label or
template and saves the info for validation or possible repair.
Parameters:
labelpath (str, Path, or FCPath):
The path to a PDS3 label or template file.
label (str, optional): The full content of the template as a single string or
list of strings, one per record (including line terminators).
validate (bool, optional):
True to issue a warning for each error in the label or template when the
label is generated; False to correct errors silently.
analyze_only (bool, optional):
True to prevent the generating an alternative label for purposes of
repair. This step can be slow for large labels so it should be avoided if
it is not needed.
crlf (bool, optional):
True to raise an error if the line terminators are not <CR><LF>; False to
raise an error if the line terminator is not <LF> alone; None to accept
either line terminator.
numbers (bool, optional):
True to include COLUMN_NUMBER into each COLUMN object if it is not already
there.
formats (bool, optional):
True to include FORMAT into each COLUMN object if it is not already there.
units (bool, optional):
True to repair units to conform to the options in the PDS3 Data
Dictionary.
minmax (str, tuple[str], or list[str], optional):
Zero or more names of columns for which to include the MINIMUM_VALUE and
MAXIMUM_VALUE. In addition or as an alternative, use "float" to include
these values for all floating-point columns and/or "int" to include these
values for all integer columns.
derived (str, tuple[str], or list[str], optional):
Zero or more names of columns for which to include the DERIVED_MINIMUM and
DERIVED_MAXIMUM. In addition or as an alternative, use "float" to include
these values for all floating-point columns.
edits (str or list[str]), optional):
Expressions of the form "column:name = value", which should be used to
insert or replace values currently in the label.
reals (str, tuple[str], or list[str]), optional):
Names of columns that should be treated as ASCII_REAL even if thee column
only contains integers.
"""
global _LATEST_PDS3_TABLE
self.labelpath = FCPath(labelpath)
if not label:
label = self.labelpath.read_bytes() # binary to preserve terminators
label = label.decode('latin-1')
# Identify the line terminator and validate it
self.crlf = _check_terminators(self.labelpath, label, crlf)
self.terminator = '\r\n' if self.crlf else '\n'
# Convert to a single string
if isinstance(label, list):
label = ''.join(label)
self.label = label
self.analyze_only = analyze_only
self.numbers = numbers
self.formats = formats
self.minmax = (minmax,) if isinstance(minmax, str) else minmax
self.derived = (derived,) if isinstance(derived, str) else derived
self.reals = (reals,) if isinstance(reals, str) else reals
self.units = units
self._table_values = {} # parameter name -> value in label or None
self._column_values = [None] # list of parameter dicts, one per column
self._column_name = [None] # list of column names or str(column number)
self._column_items = [None] # list of ITEM counts, minimum 1
self._column_number = {} # column name -> column number
self._table_index = {} # column name or number -> index in the table
self._extra_items = 0 # cumulative number of ITEMS > 1 in COLUMN objects
self._quotes_missing = [None] # column name -> set of parameters missing quotes
# Defined by assign_to()
self.table = None # AsciiTable to which this label refers
self._unique_values_ = [None] # lazily evaluated set of values in each column
self._unique_valids_ = [None] # lazily evaluated set of valid valuesn
# Note that the lists above have one initial value so they can be indexed by
# column number without subtracting one.
# Pre-process the edits
edits = [edits] if isinstance(edits, str) else edits
self._edit_dict = {} # [colname][parname] -> replacement value
self._edited_values = {} # [colname][parname] -> original value or None
for edit in edits:
colname, _, tail = edit.partition(':')
colname = colname.strip()
if colname not in self._edit_dict:
self._edit_dict[colname] = {}
self._edited_values[colname] = {}
name, _, value = tail.partition('=')
self._edit_dict[colname][name.strip()] = value.strip()
# parts[0] = label records before the first table
# parts[1] = record containing "OBJECT = ...TABLE"
# parts[2] = interior of table object including all COLUMN objects
# parts[3] = record containing "END_OBJECT = ...TABLE"
# parts[4] = remainder of label
parts = Pds3Table._OBJECT_TABLE_REGEX.split(label)
if len(parts) == 1:
raise TemplateAbort('Template does not contain a PDS3 TABLE object',
self.labelpath)
if len(parts) > 5:
raise TemplateAbort('Template contains multiple PDS3 TABLE objects',
self.labelpath)
# Process the table interior
parts[2] = self._process_table_interior(parts[2])
# Process the file header
parts[0], self._table_values['RECORD_TYPE'] = self._replace_value(
parts[0], 'RECORD_TYPE',
'$LABEL_VALUE("RECORD_TYPE")$',
required=True, after='PDS_VERSION_ID')
parts[0], self._table_values['RECORD_BYTES'] = self._replace_value(
parts[0], 'RECORD_BYTES',
'$LABEL_VALUE("RECORD_BYTES")$',
required=True, after='RECORD_TYPE')
parts[0], self._table_values['FILE_RECORDS'] = self._replace_value(
parts[0], 'FILE_RECORDS',
'$LABEL_VALUE("FILE_RECORDS")$',
required=True, after='RECORD_BYTES')
# Create header to analyze the table
header = ['$ONCE(ANALYZE_TABLE(LABEL_PATH().replace(".lbl",".tab")'
'.replace(".LBL",".TAB"), crlf=True))', self.terminator]
if validate:
header += ['$ONCE(VALIDATE_PDS3_LABEL(hide_warnings, abort_on_error))',
self.terminator]
self.content = ''.join(header) + ''.join(parts)
self.table = None
# Set globals for access within the template object
_LATEST_PDS3_TABLE = self
PdsTemplate.define_global('VALIDATE_PDS3_LABEL', VALIDATE_PDS3_LABEL)
PdsTemplate.define_global('LABEL_VALUE', LABEL_VALUE)
PdsTemplate.define_global('OLD_LABEL_VALUE', OLD_LABEL_VALUE)
PdsTemplate.define_global('ANALYZE_TABLE', ANALYZE_TABLE)
PdsTemplate.define_global('TABLE_VALUE', TABLE_VALUE)
def _process_table_interior(self, label):
# parts[0] = label records before the first column
# parts[1] = record containing "OBJECT = COLUMN"
# parts[2] = interior of column object
# parts[3] = record containing "END_OBJECT = COLUMN"
# parts[4] = anything after the column object, usually empty
# parts[5-8] repeat parts[1-4] for each column
parts = Pds3Table._OBJECT_COLUMN_REGEX.split(label)
# Process each column
for k, part in enumerate(parts[2::4]):
self._column_values.append({})
parts[2 + 4*k] = self._process_column(part, k+1)
# Prepare for lazy evaluation as needed
self._unique_values_ = [None for c in self._column_values] + [None]
self._unique_valids_ = [None for c in self._column_values] + [None]
# Process the TABLE object header
head = parts[0]
head, self._table_values['INTERCHANGE_FORMAT'] = self._replace_value(
head, 'INTERCHANGE_FORMAT',
'$LABEL_VALUE("INTERCHANGE_FORMAT")$',
required=True, first=True)
head, self._table_values['ROWS'] = self._replace_value(
head, 'ROWS',
'$LABEL_VALUE("ROWS")$',
required=True, after='INTERCHANGE_FORMAT')
head, self._table_values['COLUMNS'] = self._replace_value(
head, 'COLUMNS',
'$LABEL_VALUE("COLUMNS")$',
required=True, after='ROWS')
head, self._table_values['ROW_BYTES'] = self._replace_value(
head, 'ROW_BYTES',
'$LABEL_VALUE("ROW_BYTES")$',
required=True, after='COLUMNS')
return head + ''.join(parts[1:])
def _process_column(self, label, colnum):
# Add this COLUMN object to the mapping from object to column index in the table
name = Pds3Table._get_value(label, 'NAME')
self._column_values[-1]['NAME'] = name
self._column_name.append(name or str(colnum))
self._column_number[name] = colnum
# Identify parameters with missing quotes
self._quotes_missing.append(set())
fmt = Pds3Table._get_value(label, 'FORMAT', raw=True)
if fmt is not None and '.' in fmt and not fmt.startswith('"'):
self._quotes_missing[-1].add('FORMAT')
unit = Pds3Table._get_value(label, 'UNIT', raw=True)
if (unit is not None and not unit.isidentifier() and not unit.startswith('"')
and unit != "'N/A'"):
self._quotes_missing[-1].add('UNIT')
# Edit the label if necessary
edits = self._edit_dict.get(name, {})
for parname, value in edits.items():
label, value = self._replace_value(label, parname, value, required=True,
before='DESCRIPTION')
self._edited_values[name][parname] = value
self._table_index[name] = colnum + self._extra_items - 1
self._table_index[colnum] = colnum + self._extra_items - 1
# Interpret ITEMS, ITEM_BYTES, ITEM_OFFSETS
items = Pds3Table._get_value(label, 'ITEMS')
self._column_values[-1]['ITEMS'] = items
items = items or 1 # change None to 1
self._column_items.append(items)
label, self._column_values[-1]['ITEM_BYTES'] = self._replace_value(
label, 'ITEM_BYTES',
f'$LABEL_VALUE("ITEM_BYTES", {colnum})$',
required=(items > 1), after='ITEMS')
label, self._column_values[-1]['ITEM_OFFSET'] = self._replace_value(
label, 'ITEM_OFFSET',
f'$LABEL_VALUE("ITEM_OFFSET", {colnum})$',
required=(items > 1), after='ITEM_BYTES')
# Update the offset for the next column
self._extra_items += items - 1 # accumulate the column offset
# Parameters always present: DATA_TYPE, START_BYTE, BYTES
label, data_type = self._replace_value(
label, 'DATA_TYPE',
f'$LABEL_VALUE("DATA_TYPE", {colnum})$',
required=True, after='NAME')
self._column_values[-1]['DATA_TYPE'] = data_type
label, self._column_values[-1]['START_BYTE'] = self._replace_value(
label, 'START_BYTE',
f'$LABEL_VALUE("START_BYTE", {colnum})$',
required=True, after='DATA_TYPE')
label, self._column_values[-1]['BYTES'] = self._replace_value(
label, 'BYTES',
f'$LABEL_VALUE("BYTES", {colnum})$',
required=True, after='START_BYTE')
# Optional COLUMN_NUMBER
label, self._column_values[-1]['COLUMN_NUMBER'] = self._replace_value(
label, 'COLUMN_NUMBER',
f'$LABEL_VALUE("COLUMN_NUMBER", {colnum})$',
required=self.numbers, after='NAME')
# Optional FORMAT
label, self._column_values[-1]['FORMAT'] = self._replace_value(
label, 'FORMAT',
f'$QUOTE_IF(LABEL_VALUE("FORMAT", {colnum}))$',
required=self.formats, after='BYTES')
# Optional UNIT
if self.units:
label, self._column_values[-1]['UNIT'] = self._replace_value(
label, 'UNIT',
f'$QUOTE_IF(LABEL_VALUE("UNIT", {colnum}))$',
required=False, after='FORMAT')
else:
self._column_values[-1]['UNIT'] = self._get_value(label, 'UNIT')
# Optional MINIMUM_VALUE, MAXIMUM_VALUE
required = ((name in self.minmax)
or ('float' in self.minmax and 'REAL' in data_type)
or ('int' in self.minmax and 'INT' in data_type))
label, self._column_values[-1]['MINIMUM_VALUE'] = self._replace_value(
label, 'MINIMUM_VALUE',
f'$LABEL_VALUE("MINIMUM_VALUE", {colnum})$',
required=required, before='DESCRIPTION')
label, self._column_values[-1]['MAXIMUM_VALUE'] = self._replace_value(
label, 'MAXIMUM_VALUE',
f'$LABEL_VALUE("MAXIMUM_VALUE", {colnum})$',
required=required, after='MINIMUM_VALUE')
# Optional DERIVED_MINIMUM, DERIVED_MAXIMUM
required = ((name in self.derived)
or ('float' in self.derived and 'REAL' in data_type)
or ('int' in self.derived and 'INT' in data_type))
label, self._column_values[-1]['DERIVED_MINIMUM'] = self._replace_value(
label, 'DERIVED_MINIMUM',
f'$LABEL_VALUE("DERIVED_MINIMUM", {colnum})$',
required=required, before='DESCRIPTION')
label, self._column_values[-1]['DERIVED_MAXIMUM'] = self._replace_value(
label, 'DERIVED_MAXIMUM',
f'$LABEL_VALUE("DERIVED_MAXIMUM", {colnum})$',
required=required, after='DERIVED_MINIMUM')
# Save these for later use if needed
self._column_values[-1]['INVALID_CONSTANT'] = \
Pds3Table._get_value(label, 'INVALID_CONSTANT')
self._column_values[-1]['MISSING_CONSTANT'] = \
Pds3Table._get_value(label, 'MISSING_CONSTANT')
self._column_values[-1]['NOT_APPLICABLE_CONSTANT'] = \
Pds3Table._get_value(label, 'NOT_APPLICABLE_CONSTANT')
self._column_values[-1]['NULL_CONSTANT'] = \
Pds3Table._get_value(label, 'NULL_CONSTANT')
self._column_values[-1]['UNKNOWN_CONSTANT'] = \
Pds3Table._get_value(label, 'UNKNOWN_CONSTANT')
self._column_values[-1]['VALID_MAXIMUM'] = \
Pds3Table._get_value(label, 'VALID_MAXIMUM')
self._column_values[-1]['VALID_MINIMUM'] = \
Pds3Table._get_value(label, 'VALID_MINIMUM')
self._column_values[-1]['SCALING_FACTOR'] = \
Pds3Table._get_value(label, 'SCALING_FACTOR')
self._column_values[-1]['OFFSET'] = \
Pds3Table._get_value(label, 'OFFSET')
return label
######################################################################################
# assign_to()
######################################################################################
[docs]
def assign_to(self, table=None):
"""Assign this PDS3 label to the given ASCII table.
Parameters:
table (AsciiTable, optional): Table to which this PDS3 label should apply. If
not specified, the table defined by AsciiTable._latest_ascii_table() is
used.
"""
table = table or _latest_ascii_table()
if not table:
raise TemplateAbort('No ASCII table has been analyzed for label',
self.labelpath)
if table is not self.table:
self.table = table
self._unique_values_ = [None for _ in self._column_values] + [None]
self._unique_valids_ = [None for _ in self._column_values] + [None]
PdsTemplate.define_global('TABLE_VALUE', self.table.lookup)
_TABLE_NAME_REGEX = re.compile(r'.*\^\w*TABLE *= *"?(\w+\.\w+)"? *\r?\n', re.DOTALL)
[docs]
def get_table_basename(self):
"""The table basename in the template or label, if present.
If the TABLE value in the template is a variable name or expression, an empty
string is returned instead.
"""
match = Pds3Table._TABLE_NAME_REGEX.match(self.label)
if match:
return match.group(1)
return ''
[docs]
def get_table_path(self):
"""The file path to the table described by this label.
If the TABLE value in the template is a variable name or expression, an empty
string is returned instead.
Returns:
FCPath: Path to the table file if defined in the label; otherwise, an empty
string.
"""
basename = self.get_table_basename()
if basename:
return self.labelpath.parent / basename
return ''
######################################################################################
# validate()
######################################################################################
[docs]
def validate(self, table=None):
"""Compare this object to the given AsciiTable object and issue a warning for each
erroneous value identified.
Parameters:
table (AsciiTable, optional):
The AsciiTable assigned to this label. If this is specified and is
different from the currently assigned table, it becomes the assigned
table.
Returns:
int: The number of warning messages issued.
"""
if table:
self.assign_to(table)
messages = self._validation_warnings(table)
for message in messages:
warnings.warn(message)
return len(messages)
def _validate_inside_template(self, table=None, *, hide_warnings=False,
abort_on_error=True):
"""Compare this object to the given AsciiTable object and log a warning message
for each erroneous value identified.
Parameters:
table (AsciiTable, optional):
The AsciiTable assigned to this label. If this is specified and is
different from the currently assigned table, it becomes the assigned
table.
hide_warnings (bool, options): True to log errors but not warnings.
abort_on_error (bool, optional): True to issue a TemplateAbort exception if
errors are encountered.
Returns:
int: The number of errors issued.
int: The number of warnings issued.
"""
messages = self._validation_warnings(table)
if not messages:
return (0, 0)
logger = get_logger()
errors = 0
warns = 0
for message in messages:
if message.startswith('ERROR: '):
logger.error(message[7:])
errors += 1
else:
warns += 1
if not hide_warnings:
logger.warning(message)
if errors and abort_on_error:
raise TemplateAbort('Aborted')
return (errors, warns)
def _validation_warnings(self, table=None):
"""Compare this object to the given AsciiTable object and return a list of
warnings, one for each erroneous value identified.
Parameters:
table (AsciiTable, optional): The AsciiTable to assign to this label before
validation. If not specified, the latest analyzed ASCII table is used.
Returns:
list[str]: A list of messages. Messages that begin with "ERROR: " are
irrecoverable errors; anything else is a warning about something that can
be repaired by the preprocessor.
"""
self.assign_to(table)
table = self.table
if not table:
raise TemplateAbort('No ASCII table has been analyzed for label',
self.labelpath)
messages = [] # accumulated list of warnings
# Check <CR><LF> in original file
try:
_check_terminators(self.labelpath, crlf=True)
except TemplateError as err:
messages.append(err.message)
# Required top-level attributes
for name in ['RECORD_TYPE', 'RECORD_BYTES', 'FILE_RECORDS', 'INTERCHANGE_FORMAT',
'ROWS', 'COLUMNS', 'ROW_BYTES']:
messages += self._check_value(name, required=True)
# Check each column...
label_columns = len(self._column_values) - 1
table_columns = self.lookup('COLUMNS')
for colnum in range(1, min(label_columns, table_columns)+1):
colname = self._column_name[colnum]
data_type = self.lookup('DATA_TYPE', colnum)
# Direct edits
edited_names = set(self._edited_values.get(colname, {}).keys())
for name, old_value in self._edited_values.get(colname, {}).items():
new_fmt = self._edit_dict[colname][name]
if old_value is None:
messages.append(f'{colname}:{name} was inserted: {new_fmt}')
else:
old_fmt = Pds3Table._format_for_message(old_value)
messages.append(f'{colname}:{name} was edited: '
f'{old_fmt} -> {new_fmt}')
# Required attributes
for name in ['NAME', 'DATA_TYPE', 'START_BYTE', 'BYTES']:
if name not in edited_names:
messages += self._check_value(name, colnum, required=True)
# Tests for multiple ITEMS
items = self._column_items[colnum]
for name in ['ITEM_BYTES', 'ITEM_OFFSET']:
if name not in edited_names:
messages += self._check_value(name, colnum, required=(items > 1),
forbidden=(items == 1))
indx = self._table_index[colnum]
for k in range(1, items):
if table.lookup('WIDTH', k+indx) != table.lookup('WIDTH', indx):
messages.append(f'ERROR: {colname}:{name} items have inconsistent '
'widths')
if table.lookup('QUOTES', k+indx) != self.table.lookup('QUOTES', indx):
messages.append(f'ERROR: {colname}:{name} items have inconsistent '
'quote usage')
# Optional attributes
messages += self._check_value('COLUMN_NUMBER', colnum, required=self.numbers)
if 'FORMAT' not in edited_names:
fmt_messages = self._check_value('FORMAT', colnum, required=self.formats)
if fmt_messages:
messages += fmt_messages
elif 'FORMAT' in self._quotes_missing[colnum]:
value = self.lookup('FORMAT', colnum)
messages.append(f'{colname}:FORMAT error: {value} -> "{value}"')
if 'UNIT' not in edited_names:
test = self._check_value('UNIT', colnum)
# messages += self._check_value('UNIT', colnum)
messages += test
if self.units:
old_value = self._column_values[colnum].get('UNIT', None)
if not Pds3Table._unit_is_valid(old_value):
old_fmt = Pds3Table._format_for_message(old_value)
new_value = self.lookup('UNIT', colnum)
if new_value == old_value:
messages.append(f'{colname}:UNIT error: {old_fmt} '
'is not a recognized unit')
else:
messages.append(f'{colname}:UNIT error: {old_fmt} -> '
f'"{new_value}"')
elif 'UNIT' in self._quotes_missing[colnum]:
value = self._column_values[colnum]['UNIT']
messages.append(f'{colname}:UNIT error: {value} -> "{value}"')
# Minima/maxima
required = ((colname in self.minmax)
or ('float' in self.minmax and 'REAL' in data_type)
or ('int' in self.minmax and 'INT' in data_type))
messages += self._check_value('MINIMUM_VALUE', colnum, required=required)
messages += self._check_value('MAXIMUM_VALUE', colnum, required=required)
required = ((colname in self.derived)
or ('float' in self.derived and 'REAL' in data_type)
or ('int' in self.derived and 'INT' in data_type))
messages += self._check_value('DERIVED_MINIMUM', colnum,
required=required)
messages += self._check_value('DERIVED_MAXIMUM', colnum,
required=required)
# Constants
for name in ['INVALID_CONSTANT', 'MISSING_CONSTANT',
'NOT_APPLICABLE_CONSTANT', 'NULL_CONSTANT', 'UNKNOWN_CONSTANT',
'VALID_MINIMUM', 'VALID_MAXIMUM']:
value = self.lookup(name, colnum)
if value is None:
continue
if isinstance(value, float) and 'REAL' in data_type:
continue
if isinstance(value, int) and 'INT' in data_type:
continue
if isinstance(value, str) and 'CHAR' in data_type or data_type == 'TIME':
continue
valfmt = Pds3Table._format_for_message(value)
message = (f'ERROR: {colname}:{name} value {valfmt} is incompatible with '
f'column type {data_type}')
messages.append(message)
# Check for missing or extraneous columns
for colnum in range(table_columns+1, label_columns+1):
colname = self._column_name[colnum]
messages.append(f'ERROR: Column {colname} is missing')
table_extras = table_columns - label_columns
if table_extras > 0:
messages.append(f'ERROR: Table contains {table_extras} undefined column'
+ ('s' if table_extras > 1 else ''))
# Check duplicated column names
if len(self._column_number) != label_columns:
for k, name in enumerate(self._column_name):
try:
dk = self._column_name[k+1:].index(name)
except ValueError:
pass
else:
messages.append(f'ERROR: Name {name} is duplicated at columns '
f'{k} and {k+dk+1}')
return messages
def _check_value(self, name, colnum=0, *, required=False, forbidden=False):
"""A list of warnings about anything wrong with the specified PDS3 parameter."""
# Get the old value from the template; None if absent
if colnum:
old_value = self._column_values[colnum][name]
prefix = self._column_name[colnum] + ':'
else:
old_value = self._table_values[name]
prefix = ''
# If the old value is an expression, don't warn
if isinstance(old_value, str) and '$' in old_value:
return []
if required and old_value is None:
new_value = self.lookup(name, colnum)
if new_value is None:
return [f'{prefix}{name} is missing']
new_fmt = Pds3Table._format_for_message(new_value)
return [f'{prefix}{name} is missing: {new_fmt}']
if forbidden:
if old_value is not None:
old_fmt = Pds3Table._format_for_message(old_value)
return [f'ERROR: {prefix}{name} is forbidden: ({old_fmt})']
return []
if not required and not forbidden and old_value is None:
return []
# Get the new value
new_value = self.lookup(name, colnum)
if old_value == new_value:
return []
new_fmt = Pds3Table._format_for_message(new_value) # deal with quoting mismatch
old_fmt = Pds3Table._format_for_message(old_value)
if old_fmt == new_fmt:
return []
return [f'{prefix}{name} error: {old_fmt} -> {new_fmt}']
######################################################################################
# lookup()
######################################################################################
[docs]
def lookup(self, name, column=0):
"""Lookup function returning information about the PDS3 label as it has been
applied to the current table.
Each of the following function calls returns a valid PDS3 parameter value. Columns
can be identified by name or by number starting from 1.
* `lookup("PATH")`
* `lookup("BASENAME")`
* `lookup("RECORD_TYPE")`
* `lookup("RECORD_BYTES")`
* `lookup("FILE_RECORDS")`
* `lookup("INTERCHANGE_FORMAT")`
* `lookup("ROWS")`
* `lookup("COLUMNS")`
* `lookup("ROW_BYTES")`
* `lookup("DATA_TYPE", <column>)`
* `lookup("START_BYTE", <column>)`
* `lookup("BYTES", <column>)`
* `lookup("COLUMN_NUMBER", <column>)`
* `lookup("FORMAT", <column>)`
* `lookup("UNIT", <colnum>)`
* `lookup("MINIMUM_VALUE", <column>)`
* `lookup("MAXIMUM_VALUE", <column>)`
* `lookup("DERIVED_MINIMUM", <column>)`
* `lookup("DERIVED_MAXIMUM", <column>)`
It also provides these values derived from the existing template or label: "NAME",
"ITEMS", "SCALING_FACTOR", "OFFSET", "INVALID_CONSTANT", "MISSING_CONSTANT",
"NOT_APPLICABLE_CONSTANT", "NULL_CONSTANT", "UNKNOWN_CONSTANT", "VALID_MINIMUM",
and "VALID_MAXIMUM".
In addition, these options are supported:
* `lookup("TABLE_PATH")`: full path to the associated ASCII table file.
* `lookup("TABLE_BASENAME")`: basename of the associated ASCII table file.
* `lookup("FIRST", <column>)`: value from the first row of this column.
* `lookup("LAST", <column>)`: value from the last row of this column.
Parameters:
name (str): Name of a parameter.
column (str or int, optional): The name or COLUMN_NUMBER (starting at 1) for a
column; use 0 for general parameters.
Returns:
str: The correct PDS3-formatted value for the specified parameter.
"""
if not column:
colnum = 0
colname = ''
else:
colnum = self._column_number[column] if isinstance(column, str) else column
colname = self._column_name[colnum]
if name in self._edit_dict.get(colname, {}):
return Pds3Table._eval(self._edit_dict[colname][name])
indx = self._table_index[colnum] if colnum else None
match name:
case 'PATH':
return str(self.labelpath)
case 'BASENAME':
return self.labelpath.name
case 'RECORD_TYPE':
return 'FIXED_LENGTH'
case 'INTERCHANGE_FORMAT':
return 'ASCII'
case 'COLUMN_NUMBER':
return colnum
case 'TABLE_PATH':
if self.get_table_path():
return str(self.get_table_path())
case 'TABLE_BASENAME':
if self.get_table_basename():
return self.get_table_basename()
case ('NAME' | 'ITEMS' | 'SCALING_FACTOR' | 'OFFSET' | 'INVALID_CONSTANT' |
'MISSING_CONSTANT' | 'NOT_APPLICABLE_CONSTANT' | 'NULL_CONSTANT' |
'UNKNOWN_CONSTANT' | 'VALID_MINIMUM' | 'VALID_MAXIMUM'):
return self.old_lookup(name, colnum)
if not self.table:
self.assign_to()
match name:
case 'TABLE_PATH':
return self.table.lookup('PATH')
case 'TABLE_BASENAME':
return self.table.lookup('BASENAME')
case 'RECORD_BYTES' | 'ROW_BYTES':
return self.table.lookup('ROW_BYTES')
case 'FILE_RECORDS' | 'ROWS':
return self.table.lookup('ROWS')
case 'COLUMNS':
return self._columns_carefully()
case 'DATA_TYPE':
data_type = self.table.lookup('PDS3_DATA_TYPE', indx)
if data_type == 'ASCII_INTEGER':
# Override ASCII INTEGERS if there's evidence the intent is REAL
if (colname in self.reals
or self._constant_type(colnum) == 'ASCII_REAL'):
return 'ASCII_REAL'
# Override the derived FORMAT if every value in the table is invalid
old_type = self._column_values[colnum]['DATA_TYPE']
if old_type is not None and len(self._unique_valids(colnum)) == 0:
return old_type
return data_type
case 'START_BYTE':
return (self.table.lookup('START_BYTE', indx)
+ self.table.lookup('QUOTES', indx))
case 'ITEM_BYTES':
return self.table.lookup('BYTES', indx)
case 'ITEM_OFFSET':
return self.table.lookup('WIDTH', indx) + 1
case 'BYTES':
items = self._column_items[colnum]
if items == 1:
return self.table.lookup('BYTES', indx)
else:
item_bytes = self.table.lookup('BYTES', indx)
item_offset = (self.table.lookup('START_BYTE', indx+1)
- self.table.lookup('START_BYTE', indx))
return (items-1) * item_offset + item_bytes
case 'FORMAT':
fmt = self.table.lookup('PDS3_FORMAT', indx)
# Force "I" to "F" if necessary
if fmt[0] == 'I':
# Override ASCII INTEGERS if there's evidence the intent is REAL
if (colname in self.reals
or self._constant_type(colnum) == 'ASCII_REAL'):
return 'F' + str(self.table.lookup('BYTES', indx)) + '.0'
# Override the derived FORMAT if every value in the table is invalid
old_fmt = self._column_values[colnum]['FORMAT']
if (old_fmt is not None and len(self._unique_valids(colnum)) == 0
and Pds3Table._format_is_valid(old_fmt)):
return old_fmt
return fmt
case 'UNIT':
unit = self._column_values[colnum]['UNIT']
if self.units:
return Pds3Table._get_valid_unit(unit) or unit
return unit
case ('MINIMUM_VALUE' | 'MAXIMUM_VALUE' | 'DERIVED_MINIMUM' |
'DERIVED_MAXIMUM'):
unique = self._unique_valids(colnum) or self._unique_values(colnum)
new_value = min(unique) if 'MINIMUM' in name else max(unique)
if 'DERIVED' not in name or self._equals_a_constant(colnum, new_value):
return new_value
scaling = self._column_values[colnum].get('SCALING_FACTOR', 1) or 1
offset = self._column_values[colnum].get('OFFSET', 0) or 0
if (scaling, offset) != (1, 0): # can't multiply string values!
new_value = new_value * scaling + offset
return new_value
case 'FIRST' | 'LAST':
return Pds3Table._eval(self.table.lookup(name, indx))
case _:
return self.old_lookup(name, colnum)
def _unique_values(self, colnum):
"""The set of unique values in the specified column number.
Values are cached so any value only needs to be validated once.
"""
unique = self._unique_values_[colnum] # first check the cache
if unique is None:
indx = self._table_index[colnum]
items = self._column_items[colnum]
unique = set()
for i in range(indx, indx+items):
unique |= {Pds3Table._eval(v) for v in self.table.lookup('VALUES', i)}
self._unique_values_[colnum] = unique
return unique
def _unique_valids(self, colnum):
"""The set of unique, valid values in the specified column number.
Values are cached so any value only needs to be validated once.
"""
unique = self._unique_valids_[colnum] # first check the cache
if unique is None:
column_dict = self._column_values[colnum]
constants = {
column_dict['INVALID_CONSTANT'],
column_dict['MISSING_CONSTANT'],
column_dict['NOT_APPLICABLE_CONSTANT'],
column_dict['NULL_CONSTANT'],
column_dict['UNKNOWN_CONSTANT'],
}
unique = self._unique_values(colnum) - constants
if (value := column_dict['VALID_MINIMUM']) is not None:
try:
unique = {v for v in unique if v >= value}
except TypeError: # ignore an invalid VALID_MINIMUM
pass
if (value := column_dict['VALID_MAXIMUM']) is not None:
try:
unique = {v for v in unique if v <= value}
except TypeError: # ignore an invalid VALID_MAXIMUM
pass
self._unique_valids_[colnum] = unique
return unique
def _equals_a_constant(self, colnum, value):
"""True if the given value matches one of the constants in the specified column.
"""
column_dict = self._column_values[colnum]
for name in ['INVALID_CONSTANT', 'MISSING_CONSTANT', 'NOT_APPLICABLE_CONSTANT',
'NULL_CONSTANT', 'UNKNOWN_CONSTANT']:
if value == column_dict[name]:
return False
return True
def _constant_type(self, colnum):
"""The type of the constants for this column, one of ASCII_INTEGER, ASCII_REAL, or
CHARACTER. None if there are no constants or if constants are inconsistent.
"""
types = set()
for name in ['INVALID_CONSTANT', 'MISSING_CONSTANT',
'NOT_APPLICABLE_CONSTANT', 'NULL_CONSTANT', 'UNKNOWN_CONSTANT',
'VALID_MINIMUM', 'VALID_MAXIMUM']:
value = self.lookup(name, colnum)
if value is None:
continue
if isinstance(value, float):
types.add('ASCII_REAL')
elif isinstance(value, int):
types.add('ASCII_INTEGER')
else:
types.add('CHARACTER')
if len(types) == 1:
return list(types)[0]
return None
def _columns_carefully(self):
"""Careful tally of the correct number of COLUMN objects, allowing for a mismatch
between the table and the label.
Without errors, this should work::
columns = self.table.lookup('COLUMNS') - self._extra_items
However, self._extra_items includes extra items that might be missing from the
table.
"""
table_columns = self.table.lookup('COLUMNS')
table_count = 0
for colnum in range(1, len(self._column_values)):
table_count += self._column_items[colnum]
if table_count >= table_columns: # if we reach last table column
return colnum
return colnum + table_columns - table_count # maybe table has more columns
[docs]
def old_lookup(self, name, column=0):
"""Lookup function returning information about the current content of the PDS3
label, whether or not it is correct.
Available top-level keywords are "RECORD_TYPE", "RECORD_BYTES", "FILE_RECORDS",
"INTERCHANGE_FORMAT", "ROWS", "COLUMNS", and "ROW_BYTES".
Available column-level keywords are "NAME", "COLUMN_NUMBER", "DATA_TYPE",
"START_BYTE", "BYTES", "FORMAT", "ITEMS", "ITEM_BYTES", "ITEM_OFFSET",
"SCALING_FACTOR", "OFFSET", "UNIT", "INVALID_CONSTANT", "MISSING_CONSTANT",
"NOT_APPLICABLE_CONSTANT", "NULL_CONSTANT", "UNKNOWN_CONSTANT", "VALID_MAXIMUM",
"VALID_MINIMUM", "MINIMUM_VALUE", "MAXIMUM_VALUE", "DERIVED_MINIMUM", and
"DERIVED_MAXIMUM".
Parameters:
name (str): Name of a parameter.
column (str or int, optional): The name or COLUMN_NUMBER (starting at 1) for a
column; use 0 for general parameters.
Returns:
int, float, str, or None: The current value of the specified parameter; None
if it is not found in the label.
"""
if not column:
return self._table_values[name]
else:
colnum = self._column_number[column] if isinstance(column, str) else column
return self._column_values[colnum][name]
raise KeyError(name)
# Alternative names for use inside templates
LABEL_VALUE = lookup
OLD_LABEL_VALUE = old_lookup
######################################################################################
# PDS3 label utilities
######################################################################################
def _replace_value(self, label, name, replacement, *, required=False, after=None,
before=None, first=False):
"""Replace a value in a label string with the given replacement.
Parameters:
label (str): PDS3 label substring.
name (str): PDS3 parameter name.
replacement (str): Replacement string, formatted as needed for the label.
required (bool): True if the parameter is required. If required, the name and
replacement will be inserted if not present already.
after (str, optional): If the new parameter must be inserted, it will appear
immediately after this parameter.
before (str, optional): If the new parameter must be inserted, it will apppear
immediately before this parameter.
first (str, optional): If the new parameter must be inserted, it will apppear
first in the label.
Returns:
str: The revised label string.
str or None: The prior value of the parameter before replacement, if present.
Notes:
If the replacement string contains "$", meaning that it already contains a
PdsTemplate expression, it is not replaced.
"""
# Split by the name=value substring
parts = re.split(r'(?<!\S)(' + name + r' *= *)([^\r\n]*)', label)
# If a match is found, this will be a list [before, "<name> = ", value, after]
# where `value` includes any trailing blanks and/or a comment
if len(parts) == 1: # if not found
if not required:
return (label, None)
new_label = self._insert_value(label, name, replacement, after=after,
before=before, first=first)
return (new_label, None)
# Split trailing blanks and an optional comment
subparts = parts[2].partition('/*')
value = subparts[0].rstrip()
tail = (len(subparts[0]) - len(value)) * ' ' + subparts[1] + subparts[2]
value = Pds3Table._eval(parts[2])
if not self.analyze_only:
label = ''.join(parts[:2]) + replacement + tail + ''.join(parts[3:])
return (label, value)
def _insert_value(self, label, name, value, *, after=None, before=None, first=False):
"""Insert a new name=value entry into the label string.
Parameters:
label (str): PDS3 label substring.
name (str): PDS3 parameter name.
value (str): Value string, formatted as needed for the label.
after (str, optional): Insert the new name=value entry immediately after this
parameter, if present.
before (str, optional): Insert the new name=value entry immediately before
this parameter, if present.
first (str, optional): If True, insert the new name=value entry first.
Returns:
str: The revised label.
Notes:
If neither `after` nor `before` is specified and `first` is False, the new
entry appears at the end. The order of precedence is `first`, `after`,
before`.
"""
if self.analyze_only:
return label
# Figure out the alignments and terminator for the new entry
indent = len(label) - len(label.lstrip())
equal = len(label.partition('=')[0])
terminator = '\r\n' if label.endswith('\r\n') else '\n'
# Define the full line to be inserted
new_line = indent * ' ' + name + equal * ' '
new_line = new_line[:equal] + '= ' + value + terminator
# Apply `first`
if first:
return new_line + label
# Apply `after`
if after:
parts = re.split(r'(?<!\S)(' + after + r' *=.*?\n)', label)
if len(parts) > 1:
return parts[0] + parts[1] + new_line + ''.join(parts[2:])
# Apply `before`
if before:
parts = re.split(r'(?<![^\n])( *' + before + r' *=)', label)
if len(parts) > 1:
return parts[0] + new_line + ''.join(parts[1:])
# Otherwise, insert at the end
return label + new_line
@staticmethod
def _get_value(label, name, raw=False):
"""The value of the named parameter within the label.
Parameters:
label (str): PDS3 label string.
name (str): PDS3 parameter name.
raw (bool, optional): True for a "raw" value without evaluation.
Returns:
str or None: The string value of the parameter if present; None otherwise.
"""
# Find name=value substring
matches = re.findall(r'(?<!\S)' + name + r' *= *([^\r\n]*)', label)
if not matches:
return None
value = matches[0].partition('/*')[0].rstrip()
if raw:
return value
return Pds3Table._eval(value)
_UNQUOTED_OK = re.compile(r'[A-Z][A-Z0-9_]*')
@staticmethod
def _eval(value):
"""Convert the given string value to int, float, or string.
Unnecessary quotes are stripped, but necessary quotes are retained.
"""
if value is None:
return None
if isinstance(value, str):
value = value.strip()
if value.startswith('"') and value.endswith('"'):
return value[1:-1]
try:
return int(value)
except ValueError:
pass
try:
return float(value)
except ValueError:
pass
return value
@staticmethod
def _format_for_message(value):
"""Return the value formatted for an error message or the PDS3 label."""
if isinstance(value, int):
return str(value)
if isinstance(value, float):
value = str(value)
(before, optional_e, after) = value.partition('e')
if '.' not in before:
before += '.'
before.rstrip('0')
return before + optional_e.upper() + after
if value in ('N/A', "'N/A'"):
return "'N/A'"
if value.startswith('"'):
return value
if Pds3Table._UNQUOTED_OK.fullmatch(value):
return value
return '"' + value + '"'
_VALID_AI_FORMAT_STRING = re.compile(r'([AI])(\d+)')
_VALID_EF_FORMAT_STRING = re.compile(r'([EF])(\d+)\.(\d+)')
@staticmethod
def _format_is_valid(value):
"""True if the given value is a valid PDS3 format string."""
if not isinstance(value, str):
return False
match = Pds3Table._VALID_AI_FORMAT_STRING.fullmatch(value)
if match:
return int(match.group(2)) > 0
match = Pds3Table._VALID_EF_FORMAT_STRING.fullmatch(value)
if match:
i1 = int(match.group(2))
i2 = int(match.group(3))
if match.group(1) == 'F' and i1 > i2 + 1:
return True
if match.group(1) == 'E' and i1 > i2 + 5:
return True
return False
######################################################################################
# Translator/validator for PDS3 units
######################################################################################
_VALID_UNITS = { # from pdsdd.full
'A', 'A/m', 'A/m**2', 'B', 'Bq', 'C', 'C/kg', 'C/m**2', 'C/m**3', 'F',
'F/m', 'Gy', 'Gy/s', 'H', 'H/m', 'Hz', 'J', 'J/(kg.K)', 'J/(m**2)/s',
'J/(mol.K)', 'J/K', 'J/T', 'J/kg', 'J/m**3', 'J/mol', 'K', 'MB', 'N',
'N.m', 'N/A', 'N/m', 'N/m**2', 'Pa', 'Pa.s', 'S', 'Sv', 'T', 'V',
'V/m', 'W', 'W.m**-2.sr**-1', 'W/(m.K)', 'W/m**2', 'W/sr', 'Wb',
'arcsec/pixel', 'arcsecond',
'bar', 'cd', 'cd/m**2', 'd', 'dB', 'deg', 'deg/day', 'deg/s', 'degC',
'g', 'g/cm**3', 'h', 'kHz', 'kb/s', 'kg', 'kg/m**3', 'km', 'km**-1',
'km**2', 'km/pixel', 'km/s', 'lm', 'local day/24', 'lx', 'm', 'm**-1',
'm**2', 'm**2/s', 'm**3', 'm**3/kg', 'm/pixel', 'm/s', 'm/s**2', 'mA',
'mag', 'micron', 'min', 'mm', 'mm/s', 'mol', 'mol/m**3', 'mrad', 'ms',
'n/a', 'nT', 'nm', 'none', 'ohm', 'p/line', 'pixel', 'pixel/deg',
'rad', 'rad/s', 'rad/s**2', 's', 'sr', 'uW', 'us', 'us_dollar',
#
# disallowed: b -> bit; pix -> pixel; degree -> deg
# 'b/pixel', 'b/s', 'km/pix', 'm/pix', 'pix/deg', 'pix/degree', 'pixel/degree'
#
# added manually...
'bit', 'kbit', 'bit/s', 'kbit/s', 'bit/pixel', 'kbit/pixel', 'cm', 'KB', 'KB/s',
'MB/s', 'erg/s/cm**2/micron/sr',
"'N/A'", None,
}
_COMMON_UNITS_TO_REPAIR = {
('celsius degree', 'degC'),
('kelvin', 'K'),
('degree', 'deg'),
('radian', 'rad'),
('arcsecond', 'arcsec'),
('arcsec', 'arcsecond'),
('steradian', 'sr'),
('ster', 'sr'),
('centimeter', 'cm'),
('kilometer', 'km'),
('meter', 'm'),
('micron', 'micron'), # needed for "microns" -> "micron"
('micrometer', 'micron'),
('second', 's'),
('sec', 's'),
('minute', 'min'),
('hour', 'h'),
('millisec', 'ms'),
('millisecond', 'ms'),
('pix', 'pixel'),
('bit', 'bit'), # needed for "bits" -> "bit"
('kbit', 'kbit'), # needed for "kbits" -> "kbit"
('kilobit', 'kbit'),
('kilobyte', 'KB'),
('megabyte', 'MB'),
('erg', 'erg'), # needed for "ergs" -> erg
('hz', 'Hz'),
}
_WORD_SPLITTER = re.compile(r'([a-zA-Z ]+).*?')
@staticmethod
def _get_valid_unit(unit):
"""The valid version of the given unit, or an empty string on failure."""
if unit in Pds3Table._VALID_UNITS:
return unit
unit_lc = unit.lower()
if unit_lc in Pds3Table._VALID_UNITS:
return unit_lc
# Fix exponent style, punctuation
if '^' in unit:
return Pds3Table._get_valid_unit(unit.replace('^', '**'))
if unit.startswith("'") and unit.endswith("'"):
return Pds3Table._get_valid_unit(unit[1:-1])
# Split into words (split by punctuation but not by blanks)
parts = Pds3Table._WORD_SPLITTER.split(unit)
# Convert each word to lower case if it's a unit in and of itself
parts_lc = [part.lower() for part in parts]
for k, part_lc in enumerate(parts_lc):
if part_lc in Pds3Table._VALID_UNITS:
parts[k] = part_lc
# Replace other units with common options
words = set(parts_lc[1::2])
for (before, after) in Pds3Table._COMMON_UNITS_TO_REPAIR:
for suffix in ('', 's'):
test = (before + suffix).lower()
if test in words:
k = parts_lc.index(test)
parts[k] = after
unit = ''.join(parts)
return unit if unit in Pds3Table._VALID_UNITS else ''
@staticmethod
def _unit_is_valid(unit):
"""True if the given unit is valid."""
valid_unit = Pds3Table._get_valid_unit(unit)
return bool(valid_unit != '')
##########################################################################################