##########################################################################################
# pdstemplate/__init__.py
##########################################################################################
"""PDS Ring-Moon Systems Node, SETI Institute
``pdstemplate`` is a Python module that defines the :class:`PdsTemplate` class, which is
used to generate PDS labels based on template files. Both PDS3 and PDS4 (xml) labels are
supported. Although specifically designed to facilitate data deliveries by PDS data
providers, the template system is generic and can be used to generate files from templates
for other purposes.
###############
Getting Started
###############
The general procedure is as follows:
1. Create a template object by calling the :meth:`PdsTemplate` constructor to read a
template file::
from pdstemplate import PdsTemplate
template = PdsTemplate(template_file_path)
2. Create a dictionary that contains the parameter values to use inside the label.
3. Construct the label using method :meth:`~PdsTemplate.write` as follows::
template.write(dictionary, label_file)
This will create a new label of the given name, using the values in the given dictionary.
Once the template has been constructed, steps 2 and 3 can be repeated any number of times.
Alternatively, you can obtain the content of a label without writing it to a file using
method :meth:`~PdsTemplate.generate`.
``pdstemplate`` employs the RMS Node's `rms-filecache
<https://pypi.org/project/rms-filecache>`_ module and its `FCPath
<https://rms-filecache.readthedocs.io/en/latest/module.html#filecache.file_cache_path.FCPath>`_
class to support the handling of files at a website or in the cloud. You can refer to a
remote file by URL and the :class:`PdsTemplate` will treat it as if it were a local file.
See `filecache's documentation
<https://rms-filecache.readthedocs.io/en/latest/index.html>`_ for further details.
###############
Template Syntax
###############
A template file will look generally like a label file, except for certain embedded
expressions that will be replaced when the template's :meth:`~PdsTemplate.write` or
:meth:`~PdsTemplate.generate` method is called.
*************
Substitutions
*************
In general, everything between dollar signs "$" in the template is interpreted as a
Python expression to be evaluated. The result of this expression then replaces it
inside the label. For example, if ``dictionary['INSTRUMENT_ID'] == 'ISSWA'``, then::
<instrument_id>$INSTRUMENT_ID$</instrument_id>
in the template will become::
<instrument_id>ISSWA</instrument_id>
in the label. The expression between "$" in the template can include indexes, function
calls, or just about any other Python expression. As another example, using the same
dictionary above::
<camera_fov>$"Narrow" if INSTRUMENT_ID == "ISSNA" else "Wide"$</camera_fov>
in the template will become this in the label::
<camera_fov>Wide</camera_fov>
An expression in the template of the form ``$name=expression$``, where the `name` is a
valid Python variable name, will also also have the side-effect of defining this
variable so that it can be re-used later in the template. For example, if this appears
as an expression::
$cruise_or_saturn=('cruise' if START_TIME < 2004 else 'saturn')$
then later in the template, one can write::
<lid_reference>
urn:nasa:pds:cassini_iss_$cruise_or_saturn$:data_raw:cum-index
</lid_reference>
To embed a literal "$" inside a label, enter "$$" into the template.
*******
Headers
*******
Headers provide even more sophisticaed control over the content of a label. A header
appears alone on a line of the template and begins with "$" as the first non-blank
character. It determines whether or how subsequent text of the template will appear in the
file, from here up to the next header line.
===============
FOR and END_FOR
===============
You can include one or more repetitions of the same text using ``FOR`` and ``END_FOR``.
The format is::
$FOR(expression)
<template text>
$END_FOR
where `expression` evaluates to a Python iterable. Within the `template text`, these new
variable names are assigned:
- `VALUE` = the current value of the iterator;
- `INDEX` = the index of this iteration, starting from zero;
- `LENGTH` = the total number of iterations.
For example, if::
dictionary["targets"] = ["Jupiter", "Io", "Europa"]
dictionary["naif_ids"] = [599, 501, 502]
then::
$FOR(targets)
<target_name>$VALUE (naif_ids[INDEX])$</target_name>
$END_FOR
in the template will become this in the label::
<target_name>Jupiter (599)</target_name>
<target_name>Io (501)</target_name>
<target_name>Europa (502)</target_name>
Instead of using the names `VALUE`, `INDEX`, and `LENGTH`, you can customize the variable
names by listing up to three comma-separated names and an equal sign "=" before the
iterable expression. For example, this will produce the same results as the example
above::
$FOR(name, k=targets)
<target_name>$name (naif_ids[k])$</target_name>
$END_FOR
=============================
IF, ELSE_IF, ELSE, and END_IF
=============================
You can use ``IF``, ``ELSE_IF``, ``ELSE``, and ``END_IF`` to select among alternative
blocks of text in the template:
- ``IF(expression)``: Evaluate `expression` and include the next lines of the template if
it is logically True (e.g., boolean True, a nonzero number, a non-empty list or string,
etc.).
- ``ELSE_IF(expression)``: Include the next lines of the template if `expression` is
logically True and every previous expression was logically false.
- ``ELSE``: Include the next lines of the template only if all prior expressions were
logically False.
- ``END_IF``: This marks the end of the set of if/else alternatives.
As with other substitutions, you can define a new variable of a specified name by using
`name=expression` inside the parentheses of ``IF()`` and ``ELSE_IF()``.
Note that headers can be nested arbitrarily inside the template.
.. _ONCE:
====
ONCE
====
``ONCE`` is a header that simply includes the content that follows it one time. However,
it is useful for its side-effect, which is that ``ONCE(expression)`` allows the embedded
`expression` to be evaluated without writing new text into the label. You can use this
capability to define variables internally without affecting the content of the label
produced. For example::
$ONCE(date = big_dictionary["key"]["date"])
will assign the value of the variable named `date` for subsequent use within the template.
=======
INCLUDE
=======
This header will read the content of another file and insert its content into the template
here::
$INCLUDE(filename)
Using the environment variable ``PDSTEMPLATE_INCLUDES``, you can define one or more
directories that will be searched for a file to be included. If multiple directories are
to be searched, they should be separated by colons. You can also specify one or more
directories to search in the :meth:`PdsTemplate` constructor using the `includes` input
parameter.
Include files are handled somewhat differently from other headers. When ``INCLUDE``
references a file as a literal string rather than as an expression to evaluate, it is
processed at the time that the :class:`PdsTemplate` is constructed. However, if the
filename is given as an expression, it is not evaluated until :meth:`~PdsTemplate.write``
or :meth:`~PdsTemplate.generate`` is called for each label.
=================
NOTE and END_NOTE
=================
You can use ``NOTE`` and ``END_NOTE`` to embed any arbitrary comment block into the
template. Any text between these headers does not appear in the label::
$NOTE
Here is an extended comment about the templae
$END_NOTE
You can also use ``$NOTE:`` for an in-line comment. This text, and any blanks before it,
are not included in the label::
<filter>$FILTER$</filter> $NOTE: This is where we identify the filter
*********************
Pre-defined Functions
*********************
The following pre-defined functions can be used inside any expression in the template.
- :meth:`~.PdsTemplate.BASENAME`:
The basename of a filepath, with leading directory path removed.
- :meth:`~.PdsTemplate.BOOL`
Return one of two strings based on a boolean input.
- :meth:`~.PdsTemplate.COUNTER`
The current value of a counter.
- :meth:`~.PdsTemplate.CURRENT_TIME`
The current date or time in the local time zone as a string of the form "yyyy-mm-dd" or
"yyyy-mm-ddThh:mm:sss".
- :meth:`~.PdsTemplate.CURRENT_ZULU`
The current UTC date or time as a string of the form "yyyy-mm-dd" or
"yyyy-mm-ddThh:mm:sssZ".
- :meth:`~.PdsTemplate.DATETIME`
Convert a time to an ISO time string with the date expressed as "yyyy-mm-dd".
- :meth:`~.PdsTemplate.DATETIME_DOY`
Convert a time to an ISO time string with the date expressed as "yyyy-ddd".
- :meth:`~.PdsTemplate.DAYSECS`
Convert a time to the number of elapsed seconds since the most recent midnight.
- :meth:`~.PdsTemplate.FILE_BYTES`
The size in bytes of a specified file.
- :meth:`~.PdsTemplate.FILE_MD5`
The MD5 checksum of a specified file.
- :meth:`~.PdsTemplate.FILE_RECORDS`
The number of records in a specified file.
- :meth:`~.PdsTemplate.FILE_TIME`
The modification time in the local time zone of the specified file.
- :meth:`~.PdsTemplate.FILE_ZULU`
The UTC modification time of a specified by file.
- :meth:`~.PdsTemplate.GETENV`
The value of any environment variable.
- :meth:`~.PdsTemplate.LABEL_PATH`
The file path of the label being written.
- :meth:`~.PdsTemplate.LOG`
Write a message to the current log.
- :meth:`~.PdsTemplate.NOESCAPE`
If the template is XML, evaluated expressions are "escaped" to ensure that they are
suitable for embedding in a PDS4 label. For example, ">" inside a string will be
replaced by ">". This function prevents text from being escaped in the label,
allowing it to contain literal XML.
- :meth:`~.PdsTemplate.QUOTE_IF`
Quote the given text if it requires quotes within a PDS3 label.
- :meth:`~.PdsTemplate.RAISE`
Raise an exception with a given class and text message.
- :meth:`~.PdsTemplate.RECORD_BYTES`
The maximum number of bytes in any record of a specified file, including line
terminators.
- :meth:`~.PdsTemplate.REPLACE_NA`
Return either a given string or an indication that it is "not applicable".
- :meth:`~.PdsTemplate.REPLACE_UNK`
Return either a given string or an indication that it is "unknown".
- :meth:`~.PdsTemplate.TEMPLATE_PATH`
The directory path to the template file.
- :meth:`~.PdsTemplate.VERSION_ID`
Version ID of this module using two digits, e.g., "v1.0".
- :meth:`~.PdsTemplate.WRAP`
Wrap the given text to a specified indentation and width.
These functions can also be used directly by the programmer within a template; they are
static functions of class :class:`PdsTemplate`.
##############################
Logging and Exception Handling
##############################
``pdstemplate`` employs the RMS Node's `rms-pdslogger
<https://pypi.org/project/rms-pdslogger>`_ module to handle logging. By default, the
logger is a `PdsLogger
<https://rms-pdslogger.readthedocs.io/en/latest/module.html#pdslogger.PdsLogger>`_ object,
although any ``logging.Logger`` object will work. See `pdslogger's documentation
<https://rms-pdslogger.readthedocs.io>`_ for further details.
You can override the default Logger using static method :meth:`~utils.set_logger`. You can
also set the logging level ("info", "warning", "error", etc.) using
:meth:`~utils.set_log_level` and can select among many log formatting options using
:meth:`~set_log_format`. Use :meth:`~utils.get_logger` to obtain the current Logger.
By default, exceptions during a call to :meth:`~PdsTemplate.write` or
:meth:`~PdsTemplate.generate` are handled as follows:
1. They are written to the log.
2. The expression that triggered the exception is replaced by the error text in the label,
surrounded by "[[[" and "]]]" to make it easier to find.
3. The attributes ``fatal_count``, ``error_count``, and ``warning_count`` of the
:class:`PdsTemplate` contain the number of messages logged by each category.
4. The exception is otherwise suppressed.
This behavior can be modified using ``raise_exceptions=True`` in the call to
:meth:`~PdsTemplate.write` or :meth:`~PdsTemplate.generate`; in this case, the exception
will be raised, label generation will stop, and the label will not be written.
##############
Pre-processors
##############
A pre-processor is a function that takes the text of a template file as input and returns
a new template as output. As described above, ``INCLUDE`` headers that contain an explicit
file name (rather than an expression to be evaluated) are handled by a pre-processor.
You may define your own functions to pre-process the content of a template. They must have
this call signature::
func(path: str | Path | FCPath, content: str, *args, **kwargs) -> str
where
* `path` is the path to the template file (used here just for error logging).
* `content` is the content of a template represented by a single string with <LF> line
terminators.
* `*args` is for any additional positional arguments to `func`.
* `**kwargs` is for any additional keyword arguments to `func`.
When you invoke the :meth:`PdsTemplate` constructor, one of the optional inputs is
`preprocess`, which takes either a single function or a list of functions to apply after
the INCLUDE pre-processor. For the first of these, the `args` and `kwargs` inputs can be
provided as additional inputs to the constructor. Subsequent pre-processors cannot take
additional arguments; define them using lambda notation instead.
Note that a :class:`PdsTemplate` object has an attribute `content`, which contains the
full content of the template after all pre-processing has been performed. You can examine
this attribute to see the final result of all processing. Note also that when line numbers
appear in an error message, they refer to the line number of the template after
pre-processing, not before.
"""
import datetime
import hashlib
import numbers
import os
import re
import string
import textwrap
import time
from collections import deque
from filecache import FCPath
import julian
import pdslogger
try:
from ._version import __version__
except ImportError: # pragma: no cover
__version__ = 'Version unspecified'
from .utils import TemplateError, TemplateAbort # noqa: F401
# Unused here but included to support "from pdstemplate import TemplateError", etc.
from .utils import _RaisedException, _NOESCAPE_FLAG
from .utils import set_logger, get_logger, set_log_level, set_log_format
from ._pdsblock import _PdsBlock, _PdsIncludeBlock
[docs]
class PdsTemplate:
"""Class to generate PDS labels based on a template.
See https://rms-pdstemplate.readthedocs.io/en/latest/module.html for details.
"""
# We need to handle certain attributes as class variables because we need to support
# the various default functions such as LABEL_PATH(), etc., and these function execute
# within a template without any associated context. Therefore, the PdsTemplate module
# cannot operate in a multi-threaded environment!
_CURRENT_TEMPLATE = None
_CURRENT_LABEL_PATH = ''
_CURRENT_GLOBAL_DICT = {}
_GETENV_INCLUDE_DIRS = None
[docs]
def __init__(self, template, content='', *, xml=None, crlf=None, upper_e=False,
includes=[], preprocess=None, args=(), kwargs={}, postprocess=None):
"""Construct a PdsTemplate object from the contents of a template file.
Parameters:
template (str, Path, or FCPath):
Path of the input template file.
content (str or list[str], optional):
Alternative source of the template content rather than reading it from a
file.
xml (bool, optional):
Use True to indicate that the template is in xml format; False otherwise.
If not specified, an attempt is made to detect the format from the
template.
upper_e (bool, optional):
True to force the "E" in the exponents of floating-point numbers to be
upper case.
crlf (bool, optional):
True to indicate that the line termination should be <CR><LF>; False for
<LF> only. If not specified, the line termination is inferred from the
template.
includes (str, Path, FCPath, or list):
One or more directory paths where template include files can be found. The
directory containing `template` is always searched first. Note that
include paths can also be specified using the environment variable
PDSTEMPLATE_INCLUDES, which should contain one or more directory paths
separated by colons. Any directories specified here are searched before
those defined by PDSTEMPLATE_INCLUDES.
preprocess (function or list[function], optional):
An optional function or list of functions that transform the content of a
template into a new template. The call signature is::
func(path, content, *args, **kwargs) -> str
where `path` is the path of the template file (used here just for error
reporting), `content` is the template's content, provided as a single
string with <LF> line terminators, and `args` and `kwargs` are additional
inputs. Note that any pre-processor after the first cannot have `args` or
`kwargs` as inputs; use lambda notation if later functions require inputs.
args (tuple or list):
Any arguments to be passed to the first preprocess function after the
template's content.
kwargs (dict):
Any keywords=value arguments to be passed to the first preprocess
function.
postprocess (function, optional):
An optional function to apply after label content has been generated. This
could be used to transform the content or to apply further validation. The
call signature is::
func(content: str) -> str
For example, use `postprocess=ps3_syntax_checker` to ensure a generated
label strictly conforms to the PDS3 standard.
"""
self.template_path = FCPath(template)
PdsTemplate._CURRENT_TEMPLATE = self
PdsTemplate._CURRENT_LABEL_PATH = ''
PdsTemplate._CURRENT_GLOBAL_DICT = {}
includes = includes if isinstance(includes, (list, tuple)) else [includes]
self._includes = [FCPath(dir) for dir in includes]
self.upper_e = bool(upper_e)
self.postprocess = postprocess
logger = get_logger()
logger.info('New PdsTemplate', self.template_path)
try:
# Read the template if necessary; use binary to preserve line terminators
if not content:
logger.debug('Reading template', self.template_path)
content = self.template_path.read_bytes().decode('utf-8')
# Check the line terminators
if crlf is None:
if isinstance(content, list):
crlf = content[0].endswith('\r\n')
else:
crlf = content.endswith('\r\n')
logger.debug(f'Inferred terminator is {"<CR><LF>" if crlf else "<LF>"}')
self.crlf = crlf
self.terminator = '\r\n' if self.crlf else '\n'
# Convert to a single string with <LF> line terminators
if not isinstance(content, list):
content = content.split('\n')
if not content[-1]: # strip extraneous empty string at end
content = content[:-1]
content = [c.rstrip('\r\n') for c in content] + ['']
content = '\n'.join(content)
# Preprocess the explicit $INCLUDES
content = self._preprocess_includes(content)
# Apply any additional preprocessor
if preprocess:
if not isinstance(preprocess, list):
preprocess = [preprocess]
for k, func in enumerate(preprocess):
logger.info('Preprocessing with ' + func.__name__)
if k == 0:
content = func(self.template_path, content, *args, **kwargs)
else:
content = func(self.template_path, content)
self.content = content
# If the template has been pre-processed, line numbers in the error messages
# will no longer be correct (because they are the line numbers _after_
# pre-processing. Inside _pdsblock.py, this flag tells the logger to print the
# actual content of the line causing the error, for simpler diagnosis of the
# problem. DISABLED for now.
# self._include_more_error_info = (content != before)
self._include_more_error_info = False
# Detect XML if not specified
if xml is None:
self.xml = self._detect_xml(content)
else:
self.xml = xml
# Compile into a deque of _PdsBlock objects
self._blocks = _PdsBlock.process_headers(content, self)
except Exception as err:
logger.exception(err, self.template_path)
raise
# For managing errors and warnings raised during generate()
self.fatal_count = 0
self.error_count = 0
self.warning_count = 0
def _include_dirs(self):
"""Ordered list of all include directories to search."""
if PdsTemplate._GETENV_INCLUDE_DIRS is None:
value = os.getenv('PDSTEMPLATE_INCLUDES')
dirs = value.split(':') if value else []
PdsTemplate._GETENV_INCLUDE_DIRS = [FCPath(d) for d in dirs]
return ([self.template_path.parent] + self._includes
+ PdsTemplate._GETENV_INCLUDE_DIRS)
_INCLUDE_REGEX = re.compile(r'(?<![^\n]) *\$INCLUDE\( *(\'[^\']+\'|"[^"]+") *\) *\n')
def _preprocess_includes(self, content):
"""Pre-process the template content for $INCLUDE directives with explicit paths.
Paths containing expressions of any sort are left alone.
"""
# Split based on $INCLUDE headers. The entire template is split into substrings:
# - Even indices contain text between the $INCLUDES
# - Odd indices contain the file name surrounded by quotes
parts = PdsTemplate._INCLUDE_REGEX.split(content)
for k, part in enumerate(parts):
if k % 2 == 1:
part = _PdsIncludeBlock.get_content(part[1:-1], self._include_dirs())
part = self._preprocess_includes(part) # process recursively
parts[k] = part
return ''.join(parts)
@staticmethod
def _detect_xml(content):
"""Determine whether the given content is xml."""
first_line = content.partition('\n')[0]
if '<?xml' in first_line:
return True
count = len(first_line.split('<'))
if count > 1 and count == len(first_line.split('>')):
return True
return False
[docs]
def generate(self, dictionary, label_path='', *, raise_exceptions=False,
hide_warnings=False, abort_on_error=False):
"""Generate the content of one label based on the template and dictionary.
Parameters:
dictionary (dict):
The dictionary of parameters to replace in the template.
label_path (str, Path, or FCPath, optional):
The output label file path. Although a file is not written, this path is
used in error messages.
raise_exceptions (bool, optional):
True to raise any exceptions encountered; False to log them and embed the
error message into the label surrounded by "[[[" and "]]]".
hide_warnings (bool, optional):
True to hide warning messages.
abort_on_error (bool, optional):
True to abort the generation process if a validation error is encountered.
If `raise_exceptions` is True, an exception will be raised; otherwise, the
error will logged and an empty string will be returned.
Returns:
str: The generated content.
"""
label_path = str(label_path) if label_path else ''
# Initialize
PdsTemplate._CURRENT_TEMPLATE = self
PdsTemplate._CURRENT_LABEL_PATH = label_path
# Add predefined functions to the dictionary
global_dict = dictionary.copy()
for name, value in PdsTemplate._PREDEFINED_FUNCTIONS.items():
if name not in global_dict:
global_dict[name] = value
global_dict['hide_warnings'] = bool(hide_warnings)
global_dict['abort_on_error'] = bool(abort_on_error)
PdsTemplate._CURRENT_GLOBAL_DICT = global_dict
state = _LabelState(self, global_dict, label_path,
raise_exceptions=raise_exceptions)
# Generate the label content recursively
results = deque()
logger = get_logger()
logger.open('Generating label', label_path)
try:
for block in self._blocks:
results += block.execute(state)
content = ''.join(results)
if self.postprocess: # postprocess if necessary
content = self.postprocess(content)
except TemplateAbort as err:
logger.fatal('**** ' + err.message, label_path)
except Exception as err:
logger.exception(err, label_path)
raise err
finally:
(fatals, errors, warns, total) = logger.close()
content = ''.join(results)
self.fatal_count = fatals
self.error_count = errors
self.warning_count = warns
# Update the terminator if necessary
if self.terminator != '\n':
content = content.replace('\n', self.terminator)
# Reset global symbols
PdsTemplate._CURRENT_LABEL_PATH = ''
PdsTemplate._CURRENT_GLOBAL_DICT = {}
return content
[docs]
def write(self, dictionary, label_path, *, mode='save', backup=False,
raise_exceptions=False, handler=None):
"""Write one label based on the template, dictionary, and output filename.
Parameters:
dictionary (dict):
The dictionary of parameters to replace in the template.
label_path (str, Path, or FCPath, optional):
The output label file path.
mode (str, optional):
"save" to save the new label content regardless of any warnings or errors;
"repair" to save the new label if warnings occurred but no errors;
"validate" to log errors and warnings but never save the new label file.
backup (bool, optional):
If True and an existing file of the same name as label_path already
exists, that file is renamed with a suffix indicating its original
modification date. The format is "_yyyy-mm-ddThh-mm-ss" and it appears
after the file stem, before the extension.
raise_exceptions (bool, optional):
True to raise any exceptions encountered; False to log them and embed the
error message into the label surrounded by "[[[" and "]]]".
handler (str, Path, FCPath, or logger.Handler, optional):
Define a handler to use exclusively during the generation of this label.
If it is a string, Path, or FCPath, a logger.FileHandler using this path
is constructed. However, if the string begins with "." is instead
interpreted as the file extension to use, where the path up to the
extension is defined by `label_path`; for example, if handler=".log", then
when writing "path/to/123.lbl", the log created will be "path/to/123.log".
This log is automatically closed once the label is written.
Returns:
int: Number of errors issued.
int: Number of warnings issued.
"""
if mode not in {'save', 'repair', 'validate'}:
raise ValueError('invalid mode value: ' + repr(mode))
label_path = FCPath(label_path)
logger = get_logger()
if handler:
# Convert any string to an FCPath
if isinstance(handler, str):
if handler.startswith('.'):
handler = label_path.with_suffix(handler)
else:
handler = FCPath(handler)
# Convert any FCPath to a FileHandler
if isinstance(handler, FCPath):
handler = pdslogger.file_handler(handler)
logger.add_handler(handler)
try:
content = self.generate(dictionary, label_path,
raise_exceptions=raise_exceptions,
hide_warnings=(mode == 'save'),
abort_on_error=(mode != 'save'))
fatals = self.fatal_count
errors = self.error_count
warns = self.warning_count
if fatals and not errors:
errors = fatals
# Validation case
if mode == 'validate':
if errors:
plural = 's' if errors > 1 else ''
logger.error(f'Validation failed with {errors} error{plural}',
label_path, force=True)
elif warns:
plural = 's' if warns > 1 else ''
logger.warning(f'Validation failed with {warns} warning{plural}',
label_path, force=True)
else:
logger.info('Validation successful', label_path, force=True)
# Repair case
elif mode == 'repair':
if errors:
plural = 's' if errors > 1 else ''
logger.warning(f'Repair failed with {errors} error{plural}',
label_path)
elif label_path.exists():
old_content = label_path.read_bytes().decode('utf-8')
if old_content == content:
logger.info('Repair unnecessary; content is unchanged',
label_path)
else:
mode = 'save' # re-save the file
else:
plural = 's' if warns > 1 else ''
logger.info(f'Repairing {warns} warning{plural}', label_path,
force=True)
mode = 'save' # proceed with saving the file
# Otherwise, save
if mode != 'save':
return (errors, warns)
# Don't save a file after a fatal error
if fatals:
logger.error('File save aborted due to prior errors')
return (errors, warns)
# Backup existing label if necessary
exists = label_path.exists()
if exists and backup:
timestamp = os.path.getmtime(label_path.get_local_path())
date = datetime.datetime.fromtimestamp(timestamp) # wrong if remote
datestr = date.isoformat(timespec='seconds').replace(':', '-')
backup_path = label_path.parent / (label_path.stem + '_' + datestr +
label_path.suffix)
label_path.rename(backup_path)
logger.info('Existing label renamed to', backup_path)
exists = False
# Write label
if content and not content.endswith(self.terminator):
content += self.terminator
label_path.write_bytes(content.encode('utf-8'))
# Log event
if exists:
logger.info('Label re-written', label_path)
else:
logger.info('Label written', label_path)
finally:
logger.remove_handler(handler) # OK if handler is None
return (errors, warns)
[docs]
@staticmethod
def log(level, message, filepath='', *, force=False):
"""Send a message to the current logger.
This allows external modules to issue warnings and other messages.
Parameters:
level (int or str): Level of the message: 'info', 'error', 'warn', etc.
message (str): Text of the warning message.
filepath (str, Path, or FCPath, optional): File path to include in the
message.
force (bool, optional): True to force the logging of the message regardless
of the level.
"""
get_logger().log(level, message, filepath, force=force)
[docs]
@staticmethod
def define_global(name, value):
"""Define a new global symbol.
This allows external modules to define new symbols during template generation.
Parameters:
name (str): Name of global symbol as it will appear inside the template.
value (any): Value of the symbol.
"""
# Add the new value to the permanent set (even if it's not really a function)
PdsTemplate._PREDEFINED_FUNCTIONS[name] = value
# If generate() is currently active, add it to the active dictionary too
if PdsTemplate._CURRENT_LABEL_PATH: # hard to get here # pragma: no cover
PdsTemplate._CURRENT_GLOBAL_DICT[name] = value
######################################################################################
# Utility functions
######################################################################################
[docs]
@staticmethod
def BASENAME(filepath):
"""The basename of `filepath`, with the leading directory path removed.
Parameters:
filepath (Path | FCPath | str): The filepath.
Returns:
str: The basename of the filepath (the final filename).
"""
return FCPath(filepath).name
[docs]
@staticmethod
def BOOL(value, true='true', false='false'):
"""Return `true` if `value` evaluates to Boolean True; otherwise, return `false`.
Parameters:
value (truthy): The expression to evaluate for truthy-ness.
true (str, optional): The value to return for a True expression.
false (str, optional): The value to return for a False expression.
Returns:
str: "true" or "false", or the given values in the `true` and/or `false`
parameters.
"""
return (true if value else false)
_counters = {}
[docs]
@staticmethod
def COUNTER(name, reset=False):
"""The value of a counter identified by `name`, starting at 1.
Parameters:
name (str): The name of the counter. If the counter has not been used
before, it will start with a value of 1.
reset (bool, optional): If True, reset the counter to a value of zero
and return the value 0. The next time this counter is referenced,
it will have the value 1.
Returns:
int: The value of the counter.
"""
if name not in PdsTemplate._counters.keys():
PdsTemplate._counters[name] = 0
PdsTemplate._counters[name] += 1
if reset:
PdsTemplate._counters[name] = 0
return PdsTemplate._counters[name]
[docs]
@staticmethod
def CURRENT_TIME(date_only=False):
"""The current date/time in the local time zone.
Parameters:
date_only (bool, optional): Return only the date without the time.
Returns:
str: The current date/time in the local time zone as a formatted string of
the form "yyyy-mm-ddThh:mm:sss" if `date_only=False` or "yyyy-mm-dd" if
`date_only=True`.
"""
if date_only:
return datetime.datetime.now().isoformat()[:10]
return datetime.datetime.now().isoformat()[:19]
[docs]
@staticmethod
def CURRENT_ZULU(date_only=False):
"""The current UTC date/time.
Parameters:
date_only (bool, optional): Return only the date without the time.
Returns:
str: The current date/time in UTC as a formatted string of the form
"yyyy-mm-ddThh:mm:sssZ" if `date_only=False` or "yyyy-mm-dd" if
`date_only=True`.
"""
if date_only:
return time.strftime('%Y-%m-%d', time.gmtime())
return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
@staticmethod
def _DATETIME(value, offset=0, digits=None, date_type='YMD'):
"""Convert the given date/time string or time in TDB seconds to a year-month-day
format with a trailing "Z". The date can be in any format parsable by the Julian
module. An optional offset in seconds is applied. If the value is "UNK", then
"UNK" is returned.
"""
if isinstance(value, numbers.Real):
if digits is None:
digits = 3
tai = julian.tai_from_tdb(value)
# Convert to ISO format or return seconds
if date_type in ('YMDT', 'YDT'):
return julian.format_tai(tai + offset, order=date_type, sep='T',
digits=digits, suffix='Z')
else:
(day, sec) = julian.day_sec_from_tai(tai + offset)
return sec
if value.strip() == 'UNK':
return 'UNK'
# Convert to day and seconds
(day, sec) = julian.day_sec_from_string(value, timesys=True)[:2]
# Retain the number of digits precision in the source, if appropriate
if digits is None and offset % 1 == 0:
parts = re.split(r'\d\d:\d\d:\d\d', value)
if len(parts) == 2 and parts[1].startswith('.'):
digits = len(re.match(r'(\.\d*)', parts[1]).group(1)) - 1
# Apply offset if necessary
if offset:
tai = julian.tai_from_day_sec(day, sec)
(day, sec) = julian.day_sec_from_tai(tai + offset)
# Interpret the number of digits if still unknown
if digits is None:
if sec % 1 == 0.:
digits = -1 # no fractional part, no decimal point
else:
digits = 3
elif digits == 0:
digits = -1 # suppress decimal point
# Convert to ISO format or return seconds
if date_type in ('YMDT', 'YDT'):
return julian.format_day_sec(day, sec, order=date_type, sep='T',
digits=digits, suffix='Z')
else:
return sec
[docs]
@staticmethod
def DATETIME(time, offset=0, digits=None):
"""Convert `time` to an ISO date of the form "yyyy-mm-ddThh:mm:ss[.fff]Z".
Parameters:
time (str or float): The time as an arbitrary date/time string or TDB seconds.
If `time` is "UNK", then "UNK" is returned.
offset (float, optional): The offset, in seconds, to add to the time.
digits (int, optional): The number of digits after the decimal point in the
seconds field to return. If not specified, the appropriate number of
digits for the time is used.
Returns:
str: The time in the format "yyyy-mm-ddThh:mm:ss[.fff]Z".
"""
return PdsTemplate._DATETIME(time, offset, digits, date_type='YMDT')
[docs]
@staticmethod
def DATETIME_DOY(time, offset=0, digits=None):
"""Convert `time` to an ISO date of the form "yyyy-dddThh:mm:ss[.fff]Z".
Parameters:
time (str or float): The time as an arbitrary date/time string or TDB seconds.
If `time` is "UNK", then "UNK" is returned.
offset (float, optional): The offset, in seconds, to add to the time.
digits (int, optional): The number of digits after the decimal point in the
seconds field to return. If not specified, the appropriate number of
digits for the time is used.
Returns:
str: The time in the format "yyyy-dddThh:mm:ss[.fff]Z".
"""
return PdsTemplate._DATETIME(time, offset, digits, date_type='YDT')
[docs]
@staticmethod
def DAYSECS(time):
"""The number of elapsed seconds since the most recent midnight.
Parameters:
time (str or float): The time as an arbitrary date/time string or TDB seconds.
If `time` is "UNK", then "UNK" is returned.
Returns:
float: The number of elapsed seconds since the most recent midnight.
"""
if isinstance(time, numbers.Real):
return PdsTemplate._DATETIME(time, 0, None, date_type='SEC')
try:
return julian.sec_from_string(time)
except Exception:
return PdsTemplate._DATETIME(time, 0, None, date_type='SEC')
[docs]
@staticmethod
def FILE_BYTES(filepath):
"""The size in bytes of the file specified by `filepath`.
Parameters:
filepath (Path | FCPath | str): The filepath.
Returns:
int: The size in bytes of the file.
"""
local_path = FCPath(filepath).retrieve()
return os.path.getsize(local_path)
# From http://stackoverflow.com/questions/3431825/-
[docs]
@staticmethod
def FILE_MD5(filepath):
"""The MD5 checksum of the file specified by `filepath`.
Parameters:
filepath (Path | FCPath | str): The filepath.
Returns:
str: The MD5 checksum of the file.
"""
blocksize = 65536
with FCPath(filepath).open('rb') as f:
hasher = hashlib.md5()
buf = f.read(blocksize)
while len(buf) > 0:
hasher.update(buf)
buf = f.read(blocksize)
return hasher.hexdigest()
[docs]
@staticmethod
def FILE_RECORDS(filepath):
"""The number of records in the the file specified by `filepath`.
Parameters:
filepath (Path | FCPath | str): The filepath.
Returns:
int: The number of records in the file if it is ASCII;
0 if the file is binary.
"""
# We intentionally open this in non-binary mode so we don't have to contend with
# line terminator issues.
printable = string.printable.encode('latin8')
with FCPath(filepath).open('rb') as f:
count = 0
asciis = 0
non_asciis = 0
for line in f:
for c in line:
if c in printable:
asciis += 1
else:
non_asciis += 1
count += 1
if non_asciis > 0.05 * asciis:
return 0
return count
[docs]
@staticmethod
def FILE_TIME(filepath):
"""The modification time in the local time zone of a file.
Parameters:
filepath (Path | FCPath | str): The filepath.
Returns:
str: The modification time in the local time zone of the file specified by
`filepath` in the form "yyyy-mm-ddThh:mm:ss".
"""
timestamp = FCPath(filepath).modification_time()
return datetime.datetime.fromtimestamp(timestamp).isoformat()[:19]
[docs]
@staticmethod
def FILE_ZULU(filepath):
"""The UTC modification time of a file.
Parameters:
filepath (Path | FCPath | str): The filepath.
Returns:
str: The UTC modification time of the file specified by `filepath` in the
form "yyyy-mm-ddThh:mm:ssZ".
"""
timestamp = FCPath(filepath).modification_time()
try:
utc_dt = datetime.datetime.fromtimestamp(timestamp, datetime.UTC)
except AttributeError: # pragma: no cover
# Python < 3.11
utc_dt = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
return utc_dt.strftime('%Y-%m-%dT%H:%M:%SZ')
[docs]
@staticmethod
def GETENV(name, default=''):
"""The value of the specified environment variable.
Parameters:
name (str): Name of the environment variable.
default (str): Value to return if the environment variable is undefined.
Returns:
str: The value of the variable or else the default.
"""
return os.getenv(name, default=default)
[docs]
@staticmethod
def LABEL_PATH():
"""The path to the current label file being generated.
Returns:
str: Path string to the label file.
"""
return str(PdsTemplate._CURRENT_LABEL_PATH)
[docs]
@staticmethod
def LOG(level, message, filepath='', *, force=False):
"""Send a message to the logger; nothing is returned.
This allows a template to issue warnings and other messages.
Parameters:
level (int or str): Level of the message: 'info', 'error', 'warn', etc.
message (str): Text of the warning message.
filepath (str, Path, or FCPath, optional): File path to include in the
message.
force (bool, optional): True to force the logging of the message regardless
of the level.
"""
get_logger().log(level, message, filepath, force=force)
[docs]
@staticmethod
def NOESCAPE(text):
"""Prevent the given text from being escaped in the XML.
If the template is XML, evaluated expressions are "escaped" to ensure that they
are suitable for embedding in a PDS label. For example, ">" inside a string will
be replaced by ">". This function prevents `text` from being escaped in the
label, allowing it to contain literal XML.
Parameters:
text (str): The text that should not be escaped.
Returns:
str: The text marked so that it won't be escaped.
"""
return _NOESCAPE_FLAG + text
[docs]
@staticmethod
def QUOTE_IF(text):
"""Place the given text in quotes if it is not a valid upper-case identifier.
An empty string and a string starting with a digit is also quoted. A string that
is already enclosed in quotes or apostrophes is not quoted (but quote balancing is
not checked). Other values are returned unchanged.
Parameters:
text (str): Text to possibly quote.
Returns:
str: The text with quotes if necessary.
"""
if text == text.upper() and text.isidentifier():
return text
if text.startswith('"') and text.endswith('"'):
return text
if text.startswith("'") and text.endswith("'"):
return text
if text == 'N/A':
return "'N/A'"
return '"' + text + '"'
[docs]
@staticmethod
def RAISE(exception, message):
"""Raise an exception with the given class `exception` and the `message`.
Parameters:
exception (type): The class of the exception to raise, e.g., ValueError.
message (str): The message to include in the exception.
Raises:
Exception: The specified exception.
"""
raise _RaisedException(exception, message) # wrapper used to handle formatting
[docs]
@staticmethod
def RECORD_BYTES(filepath):
"""The maximum number of bytes in any record of the specified file, including line
terminators.
Parameters:
filepath (Path | FCPath | str): The filepath.
"""
# We intentionally open this in binary mode so we don't have to contend with
# line terminator issues.
max_bytes = 0
with FCPath(filepath).open('rb') as f:
for line in f:
max_bytes = max(max_bytes, len(line))
return max_bytes
[docs]
@staticmethod
def REPLACE_NA(value, na_value, flag='N/A'):
"""Return `na_value` if `value` equals "N/A"; otherwise, return `value`.
Parameters:
value (str or int or float or bool): The input value.
flag (str or int or float or bool, optional): The value that means N/A.
Defaults to the string "N/A".
Returns:
str or int or float or bool: The original value if it is not equal to
`flag`, otherwise `na_value`.
"""
if isinstance(value, str):
value = value.strip()
if value == flag:
return na_value
else:
return value
[docs]
@staticmethod
def REPLACE_UNK(value, unk_value):
"""Return `unk_value` if `value` equals "UNK"; otherwise, return `value`.
Parameters:
value (str or int or float or bool): The input value.
Returns:
str or int or float or bool: The original value if it is not equal to
"UNK", otherwise `unk_value`.
"""
return PdsTemplate.REPLACE_NA(value, unk_value, flag='UNK')
[docs]
@staticmethod
def TEMPLATE_PATH():
"""The path to this template file.
Returns:
str: Path string to this template file.
"""
return str(PdsTemplate._CURRENT_TEMPLATE.template_path)
[docs]
@staticmethod
def VERSION_ID():
"""The PdsTemplate version ID using two digits, e.g., "v1.0".
Returns:
str: The version ID.
"""
parts = __version__.split('.')
if len(parts) >= 2: # pragma: no cover
return '.'.join(parts[:2])
return '0.0' # version unspecified # pragma: no cover
[docs]
@staticmethod
def WRAP(left, right, text, preserve_single_newlines=True):
"""Format `text` to fit between the `left` and `right` column numbers.
The first line is not indented, so the text will begin in the column where "$WRAP"
first appears in the template.
Parameters:
left (int): The starting column number, numbered from 0.
right (int): the ending column number, numbered from 0.
text (str): The text to wrap.
preserve_single_newlines (bool, optional): If True, single newlines
are preserved. If False, single newlines are just considered to be
wrapped text and do not cause a break in the flow.
Returns:
str: The wrapped text.
"""
if not preserve_single_newlines:
# Remove any newlines between otherwise good text - we do this twice
# because sub is non-overlapping and single-character lines won't
# get treated properly
# Remove any single newlines at the beginning or end of the string
# Remove any pair of newlines after otherwise good text
# Remove any leading or trailing spaces
text = re.sub(r'([^\n])\n([^\n])', r'\1 \2', text)
text = re.sub(r'([^\n])\n([^\n])', r'\1 \2', text)
text = re.sub(r'([^\n])\n$', r'\1', text)
text = re.sub(r'^\n([^\n])', r'\1', text)
text = re.sub(r'([^\n])\n\n', r'\1\n', text)
text = text.strip(' ')
old_lines = text.splitlines()
indent = left * ' '
new_lines = []
for line in old_lines:
if line:
new_lines += textwrap.wrap(line,
width=right,
initial_indent=indent,
subsequent_indent=indent,
break_long_words=False,
break_on_hyphens=False)
else:
new_lines.append('')
# strip the first left indent; this should be where "$WRAP" appears in the
# template.
new_lines[0] = new_lines[0][left:]
return '\n'.join(new_lines)
PdsTemplate._PREDEFINED_FUNCTIONS = {}
PdsTemplate._PREDEFINED_FUNCTIONS['BASENAME' ] = PdsTemplate.BASENAME
PdsTemplate._PREDEFINED_FUNCTIONS['BOOL' ] = PdsTemplate.BOOL
PdsTemplate._PREDEFINED_FUNCTIONS['COUNTER' ] = PdsTemplate.COUNTER
PdsTemplate._PREDEFINED_FUNCTIONS['CURRENT_TIME' ] = PdsTemplate.CURRENT_TIME
PdsTemplate._PREDEFINED_FUNCTIONS['CURRENT_ZULU' ] = PdsTemplate.CURRENT_ZULU
PdsTemplate._PREDEFINED_FUNCTIONS['DATETIME' ] = PdsTemplate.DATETIME
PdsTemplate._PREDEFINED_FUNCTIONS['DATETIME_DOY' ] = PdsTemplate.DATETIME_DOY
PdsTemplate._PREDEFINED_FUNCTIONS['DAYSECS' ] = PdsTemplate.DAYSECS
PdsTemplate._PREDEFINED_FUNCTIONS['FILE_BYTES' ] = PdsTemplate.FILE_BYTES
PdsTemplate._PREDEFINED_FUNCTIONS['FILE_MD5' ] = PdsTemplate.FILE_MD5
PdsTemplate._PREDEFINED_FUNCTIONS['FILE_RECORDS' ] = PdsTemplate.FILE_RECORDS
PdsTemplate._PREDEFINED_FUNCTIONS['FILE_TIME' ] = PdsTemplate.FILE_TIME
PdsTemplate._PREDEFINED_FUNCTIONS['FILE_ZULU' ] = PdsTemplate.FILE_ZULU
PdsTemplate._PREDEFINED_FUNCTIONS['GETENV' ] = PdsTemplate.GETENV
PdsTemplate._PREDEFINED_FUNCTIONS['LABEL_PATH' ] = PdsTemplate.LABEL_PATH
PdsTemplate._PREDEFINED_FUNCTIONS['LOG' ] = PdsTemplate.LOG
PdsTemplate._PREDEFINED_FUNCTIONS['NOESCAPE' ] = PdsTemplate.NOESCAPE
PdsTemplate._PREDEFINED_FUNCTIONS['QUOTE_IF' ] = PdsTemplate.QUOTE_IF
PdsTemplate._PREDEFINED_FUNCTIONS['RAISE' ] = PdsTemplate.RAISE
PdsTemplate._PREDEFINED_FUNCTIONS['RECORD_BYTES' ] = PdsTemplate.RECORD_BYTES
PdsTemplate._PREDEFINED_FUNCTIONS['REPLACE_NA' ] = PdsTemplate.REPLACE_NA
PdsTemplate._PREDEFINED_FUNCTIONS['REPLACE_UNK' ] = PdsTemplate.REPLACE_UNK
PdsTemplate._PREDEFINED_FUNCTIONS['TEMPLATE_PATH'] = PdsTemplate.TEMPLATE_PATH
PdsTemplate._PREDEFINED_FUNCTIONS['VERSION_ID' ] = PdsTemplate.VERSION_ID
PdsTemplate._PREDEFINED_FUNCTIONS['WRAP' ] = PdsTemplate.WRAP
##########################################################################################
# LabelStatus class
##########################################################################################
class _LabelState(object):
"""Internal class to carry status information about where we are in the template and
the label generation.
Parameters:
template (PdsTemplate): The template being processed into a label file.
dictionary (dict): The dictionary of values used for substitutions.
label_path (str, path, or FCPath): The path to the file being generated.
terminator (str, optional):
The line terminator, either "\\n" or "\\r\\n". The default is to retain the
line terminator used in the template.
raise_exceptions (bool, optional):
True to raise any exceptions encountered; False to log them and embed the
error messages into the label, marked by "[[[" and "]]]".
"""
def __init__(self, template, dictionary, label_path='', *, terminator=None,
raise_exceptions=False):
self.template = template
self.label_path = label_path
self.terminator = terminator
self.raise_exceptions = raise_exceptions
self.local_dicts = [{}]
# Merge the predefined functions into a copy of the global dictionary
self.global_dict = dictionary.copy()
for key, func in PdsTemplate._PREDEFINED_FUNCTIONS.items():
if key not in self.global_dict:
self.global_dict[key] = func
def define_global(self, name, value):
"""Add this definition to this state's global dictionary."""
self.global_dict[name] = value
##########################################################################################
# Allow access of key functions a static methods of PdsTemplate
##########################################################################################
PdsTemplate.set_logger = set_logger
PdsTemplate.get_logger = get_logger
PdsTemplate.set_log_level = set_log_level
PdsTemplate.set_log_format = set_log_format
##########################################################################################