import logging
import re
from configparser import DEFAULTSECT, ConfigParser
from io import StringIO
from typing import Optional, TextIO, Union
__all__ = ["Properties"]
log = logging.getLogger(__name__)
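# NOTE: a stray "[docs]" link marker (Sphinx source-view residue) was removed here; it is not module code.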
class Properties:
    """Write Pegasus properties to a file.

    Properties are kept in the default section of a :class:`ConfigParser`
    and are accessed with mapping syntax (``props[key] = value``). All values
    are stored as strings.

    .. code-block:: python

        # Example
        props = Properties()
        props["globus.maxtime"] = 900
        props["globus.maxwalltime"] = 1000
        props["dagman.retry"] = 4
        props.write()
    """

    # Property keys containing one or more "*" wildcards. Each "*" stands for
    # a non-empty, caller-chosen part of the key (see ``_check_key``).
    _pattern_props = (
        # variable property keys
        "pegasus.file.cleanup.constraint.*.maxspace",
        "pegasus.log.*",
        "pegasus.metrics.app.*",
        "pegasus.transfer.*.remote.sites",
        "pegasus.transfer.*.impl",
        "pegasus.selector.site.env.*",
        "pegasus.selector.regex.rank.*",
        "pegasus.selector.replica.*.prefer.stagein.sites",
        "pegasus.selector.replica.*.ignore.stagein.sites",
        "pegasus.catalog.replica.output.*",
        "pegasus.catalog.*.timeout",
        "pegasus.catalog.replica.db.*",
        "pegasus.catalog.site.sites.*.profiles.*.*",
        "env.*",
        "dagman.*",
        "condor.*",
        "globus.*",
    )

    # All known property keys: the fixed keys below followed by the wildcard
    # patterns above. ``ls`` prints from this tuple and ``_check_key`` uses it
    # for exact-match lookups.
    _props = (
        "pegasus.mode",
        "pegasus.home.datadir",
        "pegasus.home.sysconfdir",
        "pegasus.home.sharedstatedir",
        "pegasus.home.localstatedir",
        "pegasus.dir.submit.logs",
        "pegasus.dir.useTimestamp",
        "pegasus.dir.exec",
        "pegasus.dir.submit.mapper",
        "pegasus.dir.staging.mapper",
        "pegasus.dir.storage.mapper",
        "pegasus.dir.storage.deep",
        "pegasus.dir.create.strategy",
        "pegasus.schema.dax",
        "pegasus.schema.sc",
        "pegasus.schema.ivr",
        "pegasus.catalog.replica",
        "pegasus.catalog.replica.file",
        "pegasus.catalog.replica.chunk.size",
        "pegasus.catalog.replica.cache.asrc",
        "pegasus.catalog.replica.dax.asrc",
        "pegasus.catalog.site",
        "pegasus.catalog.site.file",
        "pegasus.catalog.transformation",
        "pegasus.catalog.transformation.file",
        "pegasus.selector.replica",
        "pegasus.selector.site",
        "pegasus.selector.site.path",
        "pegasus.selector.site.timeout",
        "pegasus.selector.site.keep.tmp",
        "pegasus.data.configuration",
        "pegasus.transfer.bypass.input.staging",
        "pegasus.transfer.arguments",
        "pegasus.transfer.threads",
        "pegasus.transfer.lite.arguments",
        "pegasus.transfer.worker.package",
        "pegasus.transfer.worker.package.autodownload",
        "pegasus.transfer.worker.package.strict",
        "pegasus.transfer.links",
        "pegasus.transfer.staging.delimiter",
        "pegasus.transfer.disable.chmod.sites",
        "pegasus.transfer.setup.source.base.url",
        "pegasus.monitord.events",
        "pegasus.catalog.workflow.url",
        "pegasus.catalog.workflow.amqp.events",
        "pegasus.catalog.workflow.amqp.url",
        "pegasus.catalog.master.url",
        "pegasus.monitord.output",
        "pegasus.dashboard.output",
        "pegasus.monitord.notifications",
        "pegasus.monitord.notifications.max",
        "pegasus.monitord.notifications.timeout",
        "pegasus.monitord.stdout.disable.parsing",
        "pegasus.monitord.encoding",
        "pegasus.monitord.arguments",
        "pegasus.clusterer.job.aggregator",
        "pegasus.clusterer.job.aggregator.arguments",
        "pegasus.clusterer.job.aggregator.seqexec.log",
        "pegasus.clusterer.job.aggregator.seqexec.firstjobfail",
        "pegasus.clusterer.allow.single",
        "pegasus.clusterer.label.key",
        "pegasus.clusterer.preference",
        "pegasus.log.manager",
        "pegasus.log.manager.formatter",
        "pegasus.log.memory.usage",
        "pegasus.metrics.app",
        "pegasus.file.cleanup.strategy",
        "pegasus.file.cleanup.impl",
        "pegasus.file.cleanup.clusters.num",
        "pegasus.file.cleanup.clusters.size",
        "pegasus.file.cleanup.scope",
        "pegasus.file.cleanup.constraint.deferstageins",
        "pegasus.file.cleanup.constraint.csv",
        "pegasus.aws.account",
        "pegasus.aws.region",
        "pegasus.aws.batch.job_definition",
        "pegasus.aws.batch.compute_environment",
        "pegasus.aws.batch.job_queue",
        "pegasus.aws.batch.s3_bucket",
        "pegasus.code.generator",
        "pegasus.condor.concurrency.limits",
        "pegasus.register",
        "pegasus.register.deep",
        "pegasus.data.reuse.scope",
        "pegasus.catalog.transformation.mapper",
        "pegasus.selector.transformation",
        "pegasus.parser.dax.preserver.linebreaks",
        "pegasus.parser.dax.data.dependencies",
        "pegasus.integrity.checking",
        "env.PEGASUS_HOME",
        "env.GLOBUS_LOCATION",
        "env.LD_LIBRARY_PATH",
        "globus.count",
        "globus.jobtype",
        "globus.maxcputime",
        "globus.maxmemory",
        "globus.maxtime",
        "globus.maxwalltime",
        "globus.minmemory",
        "globus.project",
        "globus.queue",
        "condor.universe",
        "condor.periodic_release",
        "condor.periodic_remove",
        "condor.filesystemdomain",
        "condor.stream_error",
        "condor.stream_output",
        "condor.priority",
        "condor.request_cpus",
        "condor.request_gpus",
        "condor.request_memory",
        "condor.request_disk",
        "dagman.pre",
        "dagman.pre.arguments",
        "dagman.post",
        "dagman.post.path",
        "dagman.post.arguments",
        "dagman.retry",
        "dagman.category",
        "dagman.priority",
        "dagman.abort-dag-on",
        "dagman.maxpre",
        "dagman.maxpost",
        "dagman.maxjobs",
        "dagman.maxidle",
        "dagman.post.scope",
        "pegasus.cluster.num",
        "pegasus.clusters.size",
        "pegasus.job.aggregator",
        "pegasus.gridstart",
        "pegasus.gridstart.path",
        "pegasus.gridstart.arguments",
        "pegasus.gridstart.launcher",
        "pegasus.gridstart.launcher.arguments",
        "pegasus.stagein.clusters",
        "pegasus.stagein.local.clusters",
        "pegasus.stagein.remote.clusters",
        "pegasus.stageout.clusters",
        "pegasus.stageout.local.clusters",
        "pegasus.stageout.remote.clusters",
        "pegasus.group",
        "pegasus.change.dir",
        "pegasus.create.dir",
        "pegasus.transfer.proxy",
        "pegasus.style",
        "pegasus.pmc_request_memory",
        "pegasus.pmc_request_cpus",
        "pegasus.pmc_priority",
        "pegasus.pmc_task_arguments",
        "pegasus.exitcode.failuremsg",
        "pegasus.exitcode.successmsg",
        "pegasus.checkpoint.time",
        "pegasus.maxwalltime",
        "pegasus.glite.arguments",
        "pegasus.condor.arguments.quote",
        "pegasus.runtime",
        "pegasus.clusters.maxruntime",
        "pegasus.cores",
        "pegasus.nodes",
        "pegasus.ppn",
        "pegasus.memory",
        "pegasus.diskspace",
        "selector.execution.site",
        "selector.pfn",
        "selector.grid.jobtype",
        # Database
        "pegasus.catalog.replica.db.password",
        "pegasus.catalog.replica.db.user",
        "pegasus.catalog.replica.db.url",
        "pegasus.catalog.replica.db.driver",
        *_pattern_props,
    )

    # Length of the "[DEFAULT]\n" section header that ConfigParser emits;
    # ``write`` slices this many characters off the serialized output.
    _cfg_header_len = len("[{}]\n".format(DEFAULTSECT))

    @staticmethod
    def ls(prop: Optional[str] = None) -> None:
        """List property keys. Refer to
        `Configuration docs <https://pegasus.isi.edu/documentation/configuration.php>`_
        for additional information. If :code:`prop` is given, all properties
        containing prop will be printed, else all properties will be printed.

        .. code-block:: python

            # Example
            >>> Properties.ls("request")
            condor.request_cpus
            condor.request_disk
            condor.request_gpus
            condor.request_memory
            pegasus.pmc_request_cpus
            pegasus.pmc_request_memory

        :param prop: properties containing "prop" will be listed in alphabetical order, defaults to None
        :type prop: Optional[str]
        """
        if prop:
            matches = [p for p in Properties._props if prop in p]
        else:
            # None or an empty string: list every known key
            matches = Properties._props

        print(*sorted(matches), sep="\n")

    def __init__(self):
        """Create an empty set of properties."""
        # NOTE(review): ConfigParser is created with its default
        # interpolation, which presumably gives "%" in values a special
        # meaning -- confirm before storing values that contain "%".
        self._conf = ConfigParser()

        # preserve case for keys
        self._conf.optionxform = str

        self._conf[DEFAULTSECT] = {}

    def __setitem__(self, k, v):
        """Set property :code:`k` to :code:`v`, stored as a string.

        The value is always stored; a warning is logged afterwards if
        :code:`k` is not a recognized Pegasus property key.

        :param k: property key
        :param v: property value; non-string values are converted with :code:`str`
        """
        value = self._escape(v)
        self._conf[DEFAULTSECT][k] = value

        if not self._check_key(k):
            log.warning(
                "Unrecognized property key: '%s' has been set to '%s'", k, value
            )

    def __getitem__(self, k):
        """Return the stored value of property :code:`k`."""
        return self._conf[DEFAULTSECT][k]

    def __delitem__(self, k):
        """Remove property :code:`k`.

        The result of :code:`remove_option` is discarded, so this method
        itself does not report whether the key was present.
        """
        self._conf.remove_option(DEFAULTSECT, k)

    @classmethod
    def _check_key(cls, k) -> bool:
        """Check if the key :code:`k` is a valid Pegasus property.

        A key is valid when it is listed verbatim in :code:`_props`, or when
        it matches one of the wildcard patterns in :code:`_pattern_props`
        with every :code:`*` standing for at least one character.

        :param k: property key to check
        :return: True if the key is recognized, False otherwise
        :rtype: bool
        """
        if k in cls._props:
            return True

        # DOTALL so that "*" matches any character, like a plain
        # prefix/suffix comparison would
        return any(
            re.fullmatch(cls._glob_to_regex(p), k, re.DOTALL) is not None
            for p in cls._pattern_props
        )

    @staticmethod
    def _glob_to_regex(pattern):
        """Translate a :code:`*` wildcard pattern into a regular expression.

        Literal text is escaped (so :code:`.` only matches a dot) and each
        :code:`*` becomes :code:`.+`, i.e. one or more characters.
        """
        return ".+".join(re.escape(part) for part in pattern.split("*"))

    @staticmethod
    def _escape(v):
        """Escape value :code:`v`.

        Strings are returned unchanged; any other value is converted with
        :code:`str`.
        """
        return v if isinstance(v, str) else str(v)

    @staticmethod
    def _get_site_profile_key(site, namespace, key):
        """Build the property key that overrides a site catalog profile.

        :param site: the site
        :param namespace: the profile namespace
        :param key: the profile key
        :raises ValueError: if site, namespace or key is None
        :return: :code:`pegasus.catalog.site.sites.<site>.profiles.<namespace>.<key>`
        """
        if site is None:
            raise ValueError("Site cannot be none")
        if namespace is None:
            raise ValueError("Namespace cannot be none")
        if key is None:
            raise ValueError("Key cannot be none")

        return "pegasus.catalog.site.sites.{site}.profiles.{namespace}.{key}".format(
            site=site, namespace=namespace, key=key
        )

    def add_site_profile(self, site, namespace, key, value):
        """
        Maps a site profile to a property that can be picked up by the planner, to
        override site catalog entries loaded from the Site Catalog.

        :param site: the site
        :param namespace: the namespace for which profile has to be added
        :param key: the profile key
        :param value: the profile value
        :raises ValueError: if site, namespace or key is None
        :return: None
        """
        self[self._get_site_profile_key(site, namespace, key)] = value

    def write(self, file: Optional[Union[str, TextIO]] = None):
        """Write these properties to a file. If :code:`file` is not given, these
        properties are written to :code:`./pegasus.properties`

        .. code-block:: python

            # Example 1
            props.write()

            # Example 2
            with open("conf", "w") as f:
                props.write(f)

        :param file: file path or file object where properties will be written to, defaults to None
        :type file: Optional[Union[str, TextIO]]
        :raises TypeError: file must be of type str or file like object
        """
        with StringIO() as sio:
            self._conf.write(sio)

            # write without header
            props = sio.getvalue()[Properties._cfg_header_len :]

        # default file
        if file is None:
            file = "pegasus.properties"

        if isinstance(file, str):
            with open(file, "w") as f:
                f.write(props)
        elif hasattr(file, "write"):
            # only write() is called on the object, so that is what is required
            file.write(props)
        else:
            raise TypeError(
                "invalid file: {}; file must be of type str or file like object".format(
                    file
                )
            )