Source code for Pegasus.api.replica_catalog

from collections import OrderedDict
from pathlib import Path
from typing import Dict, Optional, Set, Union

from ._utils import _chained
from .errors import DuplicateError
from .mixins import MetadataMixin
from .writable import Writable, _filter_out_nones

PEGASUS_VERSION = "5.0.4"

__all__ = ["File", "ReplicaCatalog"]


class _PFN:
    """A physical file name comprising site and path"""

    def __init__(self, site, pfn):
        self.site = site
        self.pfn = pfn

    def __eq__(self, other):
        if isinstance(other, _PFN):
            return self.site == other.site and self.pfn == other.pfn
        return False

    def __hash__(self):
        return hash((self.site, self.pfn))

    def __repr__(self):
        return "<_PFN site: {}, pfn: {}>".format(self.site, self.pfn)

    def __json__(self):
        return OrderedDict([("site", self.site), ("pfn", self.pfn)])


[docs] class File(MetadataMixin): """ A workflow File. This class is used to represent the inputs and outputs of a :py:class:`~Pegasus.api.workflow.Job`. .. code-block:: python # Example input_file = File("data.txt").add_metadata(creator="ryan") """ def __init__( self, lfn: str, size: Optional[int] = None, for_planning: Optional[bool] = False ): """ :param lfn: a unique logical filename :type lfn: str :param size: size in bytes, defaults to None :type size: int :param for_planning: indicate that a file is to be used for planning purposes :type for_planning: bool """ if not isinstance(lfn, str): raise TypeError( "invalid lfn: {lfn}; lfn must be of type str".format(lfn=lfn) ) self.metadata = OrderedDict() self.lfn = lfn self.size = size if size: self.metadata["size"] = size if for_planning: self.for_planning = for_planning else: self.for_planning = None def __str__(self): return self.lfn def __hash__(self): return hash(self.lfn) def __eq__(self, other): if isinstance(other, File): return self.lfn == other.lfn return False def __repr__(self): return "<{} {}>".format(self.__class__.__name__, self.lfn) def __json__(self): return _filter_out_nones( OrderedDict( [ ("lfn", self.lfn), ("metadata", self.metadata if len(self.metadata) > 0 else None), ("size", self.size), ("forPlanning", self.for_planning), ] ) )
class _ReplicaCatalogEntry: def __init__( self, lfn: str, pfns: Set[_PFN], checksum: Optional[Dict[str, str]] = None, metadata: Optional[Dict[str, Union[int, str, float]]] = None, regex: bool = False, ): self.lfn = lfn self.pfns = pfns self.checksum = checksum or OrderedDict() self.metadata = metadata or OrderedDict() self.regex = regex def __json__(self): return _filter_out_nones( OrderedDict( [ ("lfn", self.lfn), ("pfns", [pfn for pfn in self.pfns]), ("checksum", self.checksum if len(self.checksum) > 0 else None), ("metadata", self.metadata if len(self.metadata) > 0 else None), ("regex", self.regex if self.regex else None), ] ) )
[docs] class ReplicaCatalog(Writable): """Maintains a mapping of logical filenames to physical filenames. Any input files to the workflow are specified here so that Pegasus knows where to obtain them. .. code-block:: python # Example if1 = File("if") if2 = File("if2") rc = ReplicaCatalog()\\ .add_replica("local", if1, "/nfs/u2/ryan/data.csv")\\ .add_replica("local", "if2", "/nfs/u2/ryan/data2.csv")\\ .write() """ _DEFAULT_FILENAME = "replicas.yml" _SUPPORTED_CHECKSUMS = {"sha256"} def __init__(self): Writable.__init__(self) # Using key = (<lfn or pattern>, <is_regex>) to preserve insertion # order of entries while distinguishing between regex and # non regex entries self.entries = OrderedDict()
[docs] @_chained def add_regex_replica( self, site: str, pattern: str, pfn: Union[str, Path], metadata: Optional[Dict[str, Union[int, str, float]]] = None, ): r""" add_regex_replica(self, site: str, pattern: str, pfn: Union[str, Path], metadata: Optional[Dict[str, Union[int, str, float]]] = None) Add an entry to this replica catalog using a regular expression pattern. Note that regular expressions should follow Java regular expression syntax as the underlying code that handles this catalog is Java based. .. code-block:: python # Example 1: Match f<any-character>a i.e. faa, f.a, f0a, etc. rc.add_regex_replica("local", "f.a", "/Volumes/data/input/f.a") # Example 2: Using groupings rc.add_regex_replica("local", "alpha\.(csv|txt|xml)", "/Volumes/data/input/[1]/[0]") # If the file being looked up is alpha.csv, the pfn for the file will be # generated as /Volumes/data/input/csv/alpha.csv # Example 3: Specifying a default location for all lfns that don't match any # regular expressions. Note that this should be the last entry into the replica # catalog if used. rc.add_regex_replica("local", ".*", Path("/Volumes/data") / "input/[0]") :param site: the site at which this replica (file) resides :type site: str :param pattern: regular expression used to match a file :type pattern: str :param pfn: path to the file (may also be a pattern as shown in the example above) :type pfn: Union[str, Path] :param metadata: any metadata to be associated with the matched files, for example: :code:`{"creator": "pegasus"}`, defaults to None :type metadata: Optional[Dict[str, Union[int, str, float]]] :raises DuplicateError: Duplicate patterns with different PFNs are currently not supported """ metadata = metadata or OrderedDict() # restricting pattern to single pfn (may be relaxed in future release) if (pattern, True) in self.entries: raise DuplicateError( "Pattern: {} already exists in this replica catalog".format(pattern) ) # handle Path obj if given for pfn if isinstance(pfn, Path): if not pfn.is_absolute(): raise ValueError( "Invalid pfn: {}, the given pfn must be an absolute path".format( pfn ) ) pfn = str(pfn) self.entries[(pattern, True)] = _ReplicaCatalogEntry( lfn=pattern, pfns={_PFN(site, pfn)}, metadata=metadata, regex=True )
[docs] @_chained def add_replica( self, site: str, lfn: Union[str, File], pfn: Union[str, Path], checksum: Optional[Dict[str, str]] = None, metadata: Optional[Dict[str, Union[int, str, float]]] = None, ): """ add_replica(self, site: str, lfn: Union[str, File], pfn: Union[str, Path], checksum: Optional[Dict[str, str]] = None, metadata: Optiona[Dict[str, Union[int, str, float]]] = None) Add an entry to this replica catalog. .. code-block:: python # Example 1 f = File("in.txt").add_metadata(creator="pegasus") rc.add_replica("local", f, Path(".").resolve() / "in.txt") # Example 2: Adding metadata and a checksum rc.add_replica( "local", "in.txt", "/home/ryan/wf/in.txt", checksum={"sha256": "abc123"}, metadata={"creator": "pegasus"} ) # Example 3: Adding multiple pfns for the same lfn (metadata and checksum will be # updated for that lfn if given. rc.add_replica("local", "in.txt", Path(".").resolve() / "in.txt") rc.add_replica("condorpool", "in.txt", "/path/to/file/in.txt") :param site: the site at which this replica (file) resides :type site: str :param lfn: logical file name :type lfn: Union[str, File] :param pfn: physical file name such as :code:`Path("f.txt").resolve()`, :code:`/home/ryan/file.txt`, or :code:`http://pegasus.isi.edu/file.txt` :type pfn: Union[str, Path] :param checksum: Dict containing checksums for this file. Currently only sha256 is given. This should be entered as :code:`{"sha256": <value>}`, defaults to None :type checksum: Optional[Dict[str, str]], optional :param metadata: metadata key value pairs associated with this lfn such as :code:`{"created": "Thu Jun 18 22:18:36 PDT 2020", "owner": "pegasus"}`, defaults to None :type metadata: Optional[Dict[str, Union[int, str, float]]], optional :raises ValueError: if pfn is given as a :code:`pathlib.Path`, it must be an absolute path :raises ValueError: an unsupported checksum type was given """ # handle Path obj if given for pfn if isinstance(pfn, Path): if not pfn.is_absolute(): raise ValueError( "Invalid pfn: {}, the given path must be an absolute path".format( str(pfn) ) ) pfn = str(pfn) if isinstance(pfn, File): raise TypeError( "Invalid pfn: {}, the given pfn must be a str or pathlib.Path".format( pfn ) ) metadata = metadata or OrderedDict() checksum = checksum or OrderedDict() # File might contain metadata that should be included if isinstance(lfn, File): if lfn.metadata: metadata.update(lfn.metadata) lfn = lfn.lfn # ensure supported checksum type given if len(checksum) > 0: for checksum_type in checksum: if checksum_type.lower() not in ReplicaCatalog._SUPPORTED_CHECKSUMS: raise ValueError( "Invalid checksum: {}, supported checksum types are: {}".format( checksum_type, ReplicaCatalog._SUPPORTED_CHECKSUMS ) ) # if an entry with the given lfn already exists, update it # else create and add a new one if (lfn, False) in self.entries: self.entries[(lfn, False)].pfns.add(_PFN(site, pfn)) self.entries[(lfn, False)].checksum.update(checksum) self.entries[(lfn, False)].metadata.update(metadata) else: self.entries[(lfn, False)] = _ReplicaCatalogEntry( lfn, {_PFN(site, pfn)}, checksum=checksum, metadata=metadata, regex=False, )
def __json__(self): return OrderedDict( [ ("pegasus", PEGASUS_VERSION), ("replicas", [v for _, v in self.entries.items()]), ] )