Source code for Pegasus.api.transformation_catalog

from collections import OrderedDict, defaultdict
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Union

from .errors import DuplicateError
from .mixins import HookMixin, MetadataMixin, ProfileMixin
from .site_catalog import OS, Arch
from .writable import Writable, _filter_out_nones

from Pegasus.api._utils import _chained, _get_class_enum_member_str, _get_enum_str

PEGASUS_VERSION = "5.0.4"

__all__ = [
    "Container",
    "Transformation",
    "TransformationSite",
    "TransformationCatalog",
]


class _ContainerType(Enum):
    """Container types recognized by Pegasus"""

    DOCKER = "docker"
    SINGULARITY = "singularity"
    SHIFTER = "shifter"


[docs] class Container(ProfileMixin): """ Describes a container that can be added to the :py:class:`~Pegasus.api.transformation_catalog.TransformationCatalog` . Note that the :code:`checksum` parameter refers to the checksum of the tar file of the image and not the specific version of an image (digest/content-addressable identifer in the case of Docker). .. code-block:: python # Example 1: Docker centos_pegasus = Container( "centos-pegasus", Container.DOCKER, "docker:///ryan/centos-pegasus:latest", arguments="--shm-size", mounts=["/Volumes/Work/lfs1:/shared-data/:ro"] ) # Example 2: Singularity fb_nlp = Container( "fb-nlp", Container.SINGULARITY, image="library://papajim/default/fb-nlp", mounts=["/data:/mnt:ro"] ) """ DOCKER = _ContainerType.DOCKER SINGULARITY = _ContainerType.SINGULARITY SHIFTER = _ContainerType.SHIFTER _SUPPORTED_CHECKSUMS = {"sha256"} def __init__( self, name: str, container_type: _ContainerType, image: str, arguments: Optional[str] = None, mounts: Optional[List[str]] = None, image_site: Optional[str] = None, checksum: Optional[Dict[str, str]] = None, metadata: Optional[Dict[str, Union[int, str, float]]] = None, bypass_staging: bool = False, ): """ :param name: name of this container :type name: str :param container_type: a container type defined in :py:class:`~Pegasus.api.transformation_catalog.Container` :type container_type: _ContainerType :param image: image, such as :code:`docker:///rynge/montage:latest` :type image: str :param arguments: additional cli arguments to be added to the :code:`docker container run` or :code:`singularity exec` commands when starting this container :type arguments: Optional[str] :param mounts: list of mount strings such as :code:`['/Volumes/Work/lfs1:/shared-data/:ro', ...]` :type mounts: Optional[List[str]] :param image_site: optional site attribute to tell pegasus which site tar file exists, defaults to None :type image_site: Optional[str] :param checksum: Dict containing checksums for the tar file of this image. Currently only sha256 is supported. This should be entered as :code:`{"sha256": <value>}`, defaults to None :type checksum: Optional[Dict[str, str]] :param metadata: Dict containing metadata key, value pairs associated with this container, defaults to None :type metadata: Optional[Dict[str, Union[int, str, float]]] :param bypass_staging: whether or not to bypass the stage in job for this container, defaults to False :type bypass_staging: bool, optional :raises ValueError: container_type must be one of :py:class:`~Pegasus.api.transformation_catalog._ContainerType` (:code:`Container.DOCKER` | :code:`Container.SINGULARITY` | :code:`Container.SHIFTER`) """ self.name = name if not isinstance(container_type, _ContainerType): raise TypeError( "invalid container_type: {container_type}; container_type must be one of {enum_str}".format( container_type=container_type, enum_str=_get_class_enum_member_str(Container, _ContainerType), ) ) self.container_type = container_type.value self.image = image self.mounts = mounts self.image_site = image_site # ensure supported checksum type given if checksum and len(checksum) > 0: for checksum_type in checksum: if checksum_type.lower() not in Container._SUPPORTED_CHECKSUMS: raise ValueError( "invalid checksum: {}, supported checksum types are: {}".format( checksum_type, Container._SUPPORTED_CHECKSUMS ) ) self.checksum = checksum # TODO: remove once this is supported if metadata: raise NotImplementedError( "Metadata support for Containers is not yet supported" ) self.metadata = metadata self.profiles = defaultdict(OrderedDict) # add additional arguments if given (this is not part of the schema # and must be added to profiles) if arguments: self.add_pegasus_profile(container_arguments=arguments) self.bypass = None if bypass_staging: self.bypass = bypass_staging def __json__(self): return _filter_out_nones( OrderedDict( [ ("name", self.name), ("type", self.container_type), ("image", self.image), ("mounts", self.mounts), ("bypass", self.bypass), ("image.site", self.image_site), ("checksum", self.checksum), ("metadata", self.metadata), ( "profiles", OrderedDict(sorted(self.profiles.items(), key=lambda _: _[0])) if len(self.profiles) > 0 else None, ), ] ) )
[docs] class TransformationSite(ProfileMixin, MetadataMixin): """Site specific information about a :py:class:`~Pegasus.api.transformation_catalog.Transformation`. A :py:class:`~Pegasus.api.transformation_catalog.Transformation` must contain at least one transformation site. """ def __init__( self, name: str, pfn: Union[str, Path], is_stageable: bool = False, bypass_staging: bool = False, arch: Optional[Arch] = None, os_type: Optional[OS] = None, os_release: Optional[str] = None, os_version: Optional[str] = None, container: Optional[Union[Container, str]] = None, ): """ :param name: name of the site at which this :py:class:`~Pegasus.api.transformation_catalog.Transformation` resides :type name: str :param pfn: physical file name, an absolute path given as a str or Path :type pfn: Union[str, Path] :param is_stageable: whether or not this transformation is stageable or installed, defaults to False :type type: bool, optional :param bypass_staging: whether or not to bypass the stage in job of this executable (Note that this only works for transformations where :code:`is_stageable=True`), defaults to False :type bypass_staging: bool, optional :param arch: architecture that this :py:class:`~Pegasus.api.transformation_catalog.Transformation` was compiled for (defined in :py:class:`~Pegasus.api.site_catalog.Arch`), defaults to None :type arch: Optional[Arch], optional :param os_type: name of os that this :py:class:`~Pegasus.api.transformation_catalog.Transformation` was compiled for (defined in :py:class:`~Pegasus.api.site_catalog.OS`), defaults to None :type os_type: Optional[OS], optional :param os_release: release of os that this :py:class:`~Pegasus.api.transformation_catalog.Transformation` was compiled for, defaults to None, defaults to None :type os_release: Optional[str], optional :param os_version: version of os that this :py:class:`~Pegasus.api.transformation_catalog.Transformation` was compiled for, defaults to None, defaults to None :type os_version: Optional[str], optional :param container: specify the name of the container or Container object to use, optional :type container: Optional[Union[Container, str]], optional :raises TypeError: arch must be one of :py:class:`~Pegasus.api.site_catalog.Arch` :raises TypeError: os_type must be one of :py:class:`~Pegasus.api.site_catalog.OS` :raises ValueError: pfn must be given as an absolute path when :code:`pathlib.Path` is used :raises ValueError: :code:`bypass_staging=True` can only be used when :code:`is_stageable=True` """ self.name = name if isinstance(pfn, Path): if not pfn.is_absolute(): raise ValueError( "invalid pfn: {}, the given pfn must be an absolute path".format( str(pfn) ) ) pfn = str(pfn) self.pfn = pfn self.transformation_type = "stageable" if is_stageable else "installed" self.bypass = None if bypass_staging: if not is_stageable: raise ValueError( "bypass_staging can only be used when is_stageable is set to True" ) self.bypass = bypass_staging if arch is not None: if not isinstance(arch, Arch): raise TypeError( "invalid arch: {arch}; arch must be one of {enum_str}".format( arch=arch, enum_str=_get_enum_str(Arch) ) ) else: self.arch = arch.value else: self.arch = None if os_type is not None: if not isinstance(os_type, OS): raise TypeError( "invalid os_type: {os_type}; os_type must be one of {enum_str}".format( os_type=os_type, enum_str=_get_enum_str(OS) ) ) else: self.os_type = os_type.value else: self.os_type = None self.os_release = os_release self.os_version = os_version container_name = None if container: if isinstance(container, Container): container_name = container.name elif isinstance(container, str): container_name = container else: raise TypeError( "invalid container: {container}; container must be of type Container or str (the container name)".format( container=container ) ) self.container = container_name self.profiles = defaultdict(OrderedDict) self.metadata = OrderedDict() def __json__(self): return _filter_out_nones( OrderedDict( [ ("name", self.name), ("pfn", self.pfn), ("type", self.transformation_type), ("bypass", self.bypass), ("arch", self.arch), ("os.type", self.os_type), ("os.release", self.os_release), ("os.version", self.os_version), ("container", self.container), ( "profiles", OrderedDict(sorted(self.profiles.items(), key=lambda _: _[0])) if len(self.profiles) > 0 else None, ), ( "metadata", dict(self.metadata) if len(self.metadata) > 0 else None, ), ] ) )
[docs] class Transformation(ProfileMixin, HookMixin, MetadataMixin): """A transformation, which can be a standalone executable, or one that requires other executables. """ _SUPPORTED_CHECKSUMS = {"sha256"} def __init__( self, name: str, namespace: Optional[str] = None, version: Optional[str] = None, site: Optional[str] = None, pfn: Optional[Union[str, Path]] = None, is_stageable: bool = False, bypass_staging: bool = False, arch: Optional[Arch] = None, os_type: Optional[OS] = None, os_release: Optional[str] = None, os_version: Optional[str] = None, container: Optional[Union[Container, str]] = None, checksum: Optional[Dict[str, str]] = None, ): """ When a transformation resides on a single site, the syntax in Example 1 can be used where the args: site and pfn are provided to the constructor. If site and pfn are specified, then the args: is_stageable, bypass_staging, arch, os_type, os_release, os_version, and container, are applied to the site, else they are ignored. When a transformation resides multiple sites, the syntax in Example 2 can be used where multiple TransformationSite objects can be added. Note that when specifying a checksum such as :code:`{"sha256": <value>}` , this only applies stageable executables. .. code-block:: python # Example 1: transformation that resides on a single site preprocess = Transformation( "preprocess", namespace="pegasus", version="4.0", site="condorpool", pfn="/usr/bin/pegasus-keg", is_stageable=False, bypass_staging=False, arch=Arch.X86_64, os_type=OS.LINUX, container=centos_pegasus ) # Example 2: transformation that resides on multiple sites preprocess = Transformation("preprocess", namespace="pegasus", version="4.0")\\ .add_sites( TransformationSite( "condorpool", "/usr/bin/pegasus-keg", is_stageable=False, arch=Arch.X86_64, os_type=OS.LINUX, container="centos-pegasus" ), ... ) :param name: the logical name of the transformation :type name: str :param namespace: the namespace that this transformation belongs to, defaults to None :type namespace: Optional[str] :param version: the version of this transformation (e.g. :code:`"1.1"`), defaults to None :type version: Optional[str] :param site: a :py:class:`~Pegasus.api.site_catalog.Site` specified in the :py:class:`~Pegasus.api.site_catalog.SiteCatalog` on which this transformation resides, defaults to None :type site: Optional[str] :param pfn: the physical filename of this transformation (e.g. :code:`"/usr/bin/tar"`), defaults to None :type pfn: Optional[Union[str, Path]] :param is_stageable: whether or not this transformation is stageable or installed, defaults to False :type type: bool, optional :param bypass_staging: whether or not to bypass the stage in job of this executable (Note that this only works for transformations where :code:`is_stageable=True`), defaults to False :type bypass_staging: bool, optional :param arch: architecture that this transformation was compiled for (defined in :py:class:`~Pegasus.api.site_catalog.Arch` , e.g :code:`Arch.X86_64`), defaults to None :type arch: Optional[Arch] :param os_type: name of os that this transformation was compiled for (defined in :py:class:`~Pegasus.api.site_catalog.OS` , e.g. :code:`OS.LINUX`), defaults to None :type os_type: Optional[OS] :param os_release: release of os that this transformation was compiled for, defaults to None :type os_release: Optional[str] :param os_version: version of os that this transformation was compiled for, defaults to None :type os_version: Optional[str] :param container: a :py:class:`~Pegasus.api.transformation_catalog.Container` or name of the container to be used for this transformation, defaults to None :type container: Optional[Union[Container, str]] :param checksum: Dict containing checksums for this file. Currently only sha256 is given. This should be entered as :code:`{"sha256": <value>}`, defaults to None :type checksum: Optional[Dict[str, str]] :raises TypeError: container must be of type :py:class:`~Pegasus.api.transformation_catalog.Container` or str :raises TypeError: arch must be one of :py:class:`~Pegasus.api.site_catalog.Arch` :raises TypeError: os_type must be one of :py:class:`~Pegasus.api.site_catalog.OS` :raises ValueError: fields: namespace, name, and field must not contain any :code:`:` (colons) :raises ValueError: pfn must be given as an absolute path when :code:`pathlib.Path` is used :raises ValueError: :code:`bypass_staging=True` can only be used when :code:`is_stageable=True` """ for field, value in { "name": name, "namespace": namespace, "version": version, }.items(): if ":" in str(value): raise ValueError( "invalid {field}: {value}; {field} must not contain ':' (colon) characters".format( field=field, value=value ) ) self.name = name self.namespace = namespace self.version = version self.sites = OrderedDict() self.requires = set() self.hooks = defaultdict(list) self.profiles = defaultdict(OrderedDict) self.metadata = OrderedDict() # add site if site if given if site is not None and pfn is not None: self.add_sites( TransformationSite( site, pfn, is_stageable, bypass_staging=bypass_staging, arch=arch, os_type=os_type, os_release=os_release, os_version=os_version, container=container, ) ) # ensure supported checksum type given if checksum and len(checksum) > 0: for checksum_type in checksum: if checksum_type.lower() not in Transformation._SUPPORTED_CHECKSUMS: raise ValueError( "invalid checksum: {}, supported checksum types are: {}".format( checksum_type, Transformation._SUPPORTED_CHECKSUMS ) ) self.checksum = checksum def _get_key(self): return "{}::{}::{}".format(self.namespace, self.name, self.version)
[docs] @_chained def add_sites(self, *transformation_sites: TransformationSite): """ add_sites(self, *transformation_sites: TransformationSite) Add one or more :py:class:`~Pegasus.api.transformation_catalog.TransformationSite` s to this transformation. :param transformation_sites: the :py:class:`~Pegasus.api.transformation_catalog.TransformationSite` (s) to be added :raises TypeError: argument(s) must be of type :py:class:`~Pegasus.api.transformation_catalog.TransformationSite` :raises DuplicateError: a :py:class:`~Pegasus.api.transformation_catalog.TransformationSite` with the same name as the one you are attempting to add already exists :return: self """ for ts in transformation_sites: if not isinstance(ts, TransformationSite): raise TypeError( "invalid transformation_site: {transformation_site}; transformation_site must be of type TransformationSite".format( transformation_site=ts ) ) if ts.name in self.sites: raise DuplicateError( "transformation site: {name} has already been added to {transformation}".format( name=ts.name, transformation=self ) ) self.sites[ts.name] = ts
[docs] @_chained def add_requirement( self, required_transformation: Union[str, "Transformation"], namespace: str = None, version: str = None, ): """ add_requirement(self, required_transformation: Union[str, Transformation], namespace: str = None, version: str = None) Add a requirement to this transformation. Specify the other transformation, identified by name, namespace, and version, that this transformation depends upon. If a :py:class:`~Pegasus.api.transformation_catalog.Transformation` is passed in for :code:`required_transformation`, then namespace and version are ignored. :param required_transformation: :py:class:`~Pegasus.api.transformation_catalog.Transformation` that this transformation requires :type required_transformation: str or Transformation :raises DuplicateError: this requirement already exists :raises ValueError: :code:`required_transformation` must be of type :py:class:`~Pegasus.api.transformation_catalog.Transformation` or str :raises ValueError: namespace, required transformation name, and version cannot contain any :code:`:` (colon) characters :raises TypeError: required_transformation must be one of type str or :py:class:`~Pegasus.api.transformation_catalog.Transformation` :return: self """ key = "" if isinstance(required_transformation, Transformation): if required_transformation.namespace: key += required_transformation.namespace + "::" key += required_transformation.name if required_transformation.version: key += ":" + required_transformation.version elif isinstance(required_transformation, str): for field, value in { "namespace": namespace, "required_transformation": required_transformation, "version": version, }.items(): if ":" in str(value): raise ValueError( "invalid {field}: {value}; {field} must not contain `:` characters".format( field=field, value=value ) ) if namespace: key += namespace + "::" key += required_transformation if version: key += ":" + version else: raise TypeError( "invalid required_transformation: {required_transformation}; required_transformation must be of type Transformation or str".format( required_transformation=required_transformation ) ) if key in self.requires: raise DuplicateError( "transformation: {key} already added as a required transformation to {tr}".format( key=key, tr=self ) ) self.requires.add(key)
def __json__(self): return _filter_out_nones( OrderedDict( [ ("namespace", self.namespace), ("name", self.name), ("version", self.version), ( "requires", [req for req in self.requires] if len(self.requires) > 0 else None, ), ("sites", [site for _, site in self.sites.items()]), ( "profiles", OrderedDict(sorted(self.profiles.items(), key=lambda _: _[0])) if len(self.profiles) > 0 else None, ), ( "hooks", OrderedDict( [ (hook_name, [hook for hook in values]) for hook_name, values in self.hooks.items() ] ) if len(self.hooks) > 0 else None, ), ("metadata", self.metadata if len(self.metadata) > 0 else None), ("checksum", self.checksum), ] ) ) def __str__(self): return "<Transformation {}::{}:{}>".format( self.namespace, self.name, self.version ) def __hash__(self): return hash(self._get_key()) def __eq__(self, other): if isinstance(other, Transformation): return self._get_key() == other._get_key() raise ValueError( "Transformation cannot be compared with {}".format(type(other)) )
[docs] class TransformationCatalog(Writable): """Maintains a list a :py:class:`~Pegasus.api.transformation_catalog.Transformation` s and :py:class:~`Pegasus.api.transformation_catalog.Container` s .. code-block:: python # Example centos_pegasus = Container( "centos-pegasus", Container.DOCKER, "docker:///ryan/centos-pegasus:latest", mounts=["/Volumes/Work/lfs1:/shared-data/:ro"] ) preprocess = Transformation( "preprocess", site="condorpool", pfn="/usr/bin/pegasus-keg", is_stageable=False, arch=Arch.X86_64, os_type=OS.LINUX, container=centos_pegasus ) findrange = Transformation( "findrange", site="condorpool", pfn="/usr/bin/pegasus-keg", is_stageable=False, arch=Arch.X86_64, os_type=OS.LINUX, container=centos_pegasus ) analyze = Transformation( "analyze", site="condorpool", pfn="/usr/bin/pegasus-keg", is_stageable=False, arch=Arch.X86_64, os_type=OS.LINUX, container=centos_pegasus ) tc = TransformationCatalog()\\ .add_containers(centos_pegasus)\\ .add_transformations(preprocess, findrange, analyze)\\ .write() """ _DEFAULT_FILENAME = "transformations.yml" def __init__(self): Writable.__init__(self) self.transformations = OrderedDict() self.containers = OrderedDict()
[docs] @_chained def add_transformations(self, *transformations: Transformation): """ add_transformations(self, *transformations: Transformation) Add one or more :py:class:`~Pegasus.api.transformation_catalog.Transformation` s to this catalog .. code-block:: python # Example tc.add_transformations( Transformation( "analyze", site="condorpool", pfn="/usr/bin/pegasus-keg", is_stageable=False, arch=Arch.X86_64, os_type=OS.LINUX, container=centos_pegasus ) ) :param transformations: the :py:class:`~Pegasus.api.transformation_catalog.Transformation` (s) to be added :raises TypeError: argument(s) must be of type :py:class:`~Pegasus.api.transformation_catalog.Transformation` :raises DuplicateError: the given :py:class:`~Pegasus.api.transformation_catalog.Transformation` already exists in this catalog :return: self """ for tr in transformations: if not isinstance(tr, Transformation): raise TypeError( "invalid transformation: {tr}, transformation(s) must be of type Transformation".format( tr=tr ) ) if tr._get_key() in self.transformations: raise DuplicateError( "transformation: {key} has already been added to this TransformationCatalog".format( key=tr._get_key() ) ) self.transformations[tr._get_key()] = tr
[docs] @_chained def add_containers(self, *containers: Container): """ add_containers(self, *containers: Container) Add one or more :py:class:`~Pegasus.api.transformation_catalog.Container` s to this catalog .. code-block:: python # Example tc.add_containers( Container( "centos-pegasus", Container.DOCKER, "docker:///ryan/centos-pegasus:latest", mounts=["/Volumes/Work/lfs1:/shared-data/:ro"] ) ) :param containers: the :py:class:`~Pegasus.api.transformation_catalog.Container` to be added :raises TypeError: argument(s) must be of type :py:class:`~Pegasus.api.transformation_catalog.Container` :raises DuplicateError: a container with the same name already exists in this catalog :return: self """ for c in containers: if not isinstance(c, Container): raise TypeError( "invalid container: {container}; container must be of type Container".format( container=c ) ) if c.name in self.containers: raise DuplicateError( "container: {} has already been added to this TransformationCatalog".format( c.name ) ) self.containers[c.name] = c
def __json__(self): containers = None if len(self.containers) > 0: containers = [c for _, c in self.containers.items()] return _filter_out_nones( OrderedDict( [ ("pegasus", PEGASUS_VERSION), ("transformations", [t for _, t in self.transformations.items()]), ("containers", containers), ] ) )