from collections import OrderedDict, defaultdict
from enum import Enum
from pathlib import Path
from typing import Optional, Union
from Pegasus.api._utils import _chained, _get_class_enum_member_str, _get_enum_str
from Pegasus.api.errors import DuplicateError
from Pegasus.api.mixins import ProfileMixin
from Pegasus.api.writable import Writable, _filter_out_nones
PEGASUS_VERSION = "5.0.4"
__all__ = [
"Arch",
"OS",
"Grid",
"SupportedJobs",
"Operation",
"Scheduler",
"FileServer",
"Directory",
"Site",
"SiteCatalog",
]
class Arch(Enum):
"""Architecture types"""
X86 = "x86"
X86_64 = "x86_64"
PPC = "ppc"
PPC_64 = "ppc_64"
PPC64LE = "ppc64le"
IA64 = "ia64"
SPARCV7 = "sparcv7"
SPARCV9 = "sparcv9"
AMD64 = "amd64"
AARCH64 = "aarch64"
class OS(Enum):
"""Operating system types"""
LINUX = "linux"
SUNOS = "sunos"
AIX = "aix"
MACOSX = "macosx"
WINDOWS = "windows"
class Operation(Enum):
"""Different types of operations supported by a file server"""
ALL = "all"
PUT = "put"
GET = "get"
class _DirectoryType(Enum):
"""Different types of directories supported for a site"""
    #: Describes a scratch file system. Pegasus will use this to store
    #: intermediate data between jobs and other temporary files.
SHARED_SCRATCH = "sharedScratch"
# TODO: where is this documented? the others were in user guide
SHARED_STORAGE = "sharedStorage"
#: Describes the scratch file systems available locally on a compute node.
LOCAL_SCRATCH = "localScratch"
#: Describes a long term storage file system. This is the directory
#: Pegasus will stage output files to.
LOCAL_STORAGE = "localStorage"
class _GridType(Enum):
"""Different grid types that can be supported by Pegasus. Mirror the Condor
grid types. https://htcondor.readthedocs.io/en/latest/grid-computing/grid-universe.html
"""
GT5 = "gt5"
CONDOR = "condor"
CREAM = "cream"
BATCH = "batch"
NORDUGRID = "nordugrid"
UNICORE = "unicore"
EC2 = "ec2"
DELTACLOUD = "deltacloud"
class Scheduler(Enum):
"""Different scheduler types on the Grid"""
FORK = "fork"
PBS = "pbs"
LSF = "lsf"
CONDOR = "condor"
SGE = "sge"
SLURM = "slurm"
UNKNOWN = "unknown"
class FileServer(ProfileMixin):
"""Describes the fileserver to access data from outside"""
def __init__(self, url: str, operation_type: Operation):
"""
:param url: url including protocol such as :code:`scp://obelix.isi.edu/data`
:type url: str
        :param operation_type: operation type defined in :py:class:`~Pegasus.api.site_catalog.Operation` (e.g. :code:`Operation.ALL`)
        :type operation_type: Operation
        :raises TypeError: operation_type must be one defined in :py:class:`~Pegasus.api.site_catalog.Operation`
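
        For example, to describe a file server that supports both get and put
        operations at a given URL (the host and path below are illustrative):

        .. code-block:: python

            fs = FileServer("scp://obelix.isi.edu/data", Operation.ALL)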
"""
self.url = url
if not isinstance(operation_type, Operation):
raise TypeError(
"invalid operation_type: {operation_type}; operation_type must be one of {enum_str}".format(
operation_type=operation_type, enum_str=_get_enum_str(Operation)
)
)
self.operation_type = operation_type.value
self.profiles = defaultdict(OrderedDict)
def __json__(self):
return _filter_out_nones(
OrderedDict(
[
("url", self.url),
("operation", self.operation_type),
(
"profiles",
OrderedDict(sorted(self.profiles.items(), key=lambda _: _[0]))
if len(self.profiles) > 0
else None,
),
]
)
)
class Directory:
"""Information about filesystems Pegasus can use for storing temporary and long-term
files.
"""
    #: Describes a scratch file system. Pegasus will use this to store
#: intermediate data between jobs and other temporary files.
SHARED_SCRATCH = _DirectoryType.SHARED_SCRATCH
# TODO: where is this documented? the others were in user guide
SHARED_STORAGE = _DirectoryType.SHARED_STORAGE
#: Describes the scratch file systems available locally on a compute node.
LOCAL_SCRATCH = _DirectoryType.LOCAL_SCRATCH
#: Describes a long term storage file system. This is the directory
#: Pegasus will stage output files to.
LOCAL_STORAGE = _DirectoryType.LOCAL_STORAGE
# the site catalog schema lists freeSize and totalSize as an attribute
# however this appears to not be used; removing it as a parameter
# def __init__(self, directory_type, path, free_size=None, total_size=None):
def __init__(
self,
directory_type: _DirectoryType,
path: Union[str, Path],
shared_file_system: bool = False,
):
"""
:param directory_type: directory type defined in :py:class:`~Pegasus.api.site_catalog.DirectoryType` (e.g. :code:`Directory.SHARED_SCRATCH` or :code:`Directory.LOCAL_STORAGE`)
:type directory_type: _DirectoryType
:param path: directory path
:type path: Union[str, Path]
:param shared_file_system: indicate whether the shared scratch space is accessible to the worker nodes via a shared filesystem, defaults to False
:type shared_file_system: bool
        :raises TypeError: directory_type must be one of the :py:class:`~Pegasus.api.site_catalog.Directory` constants (e.g. :code:`Directory.SHARED_SCRATCH`)
:raises ValueError: path must be given as an absolute path
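
        For example (the path below is illustrative):

        .. code-block:: python

            scratch = Directory(Directory.SHARED_SCRATCH, "/lustre/scratch")\\
                .add_file_servers(FileServer("file:///lustre/scratch", Operation.ALL))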
"""
if not isinstance(directory_type, _DirectoryType):
raise TypeError(
"invalid directory_type: {directory_type}; directory type must be one of {cls_enum_str}".format(
directory_type=directory_type,
cls_enum_str=_get_class_enum_member_str(Directory, _DirectoryType),
)
)
self.directory_type = directory_type.value
# ensure given path is absolute
if isinstance(path, str):
path = Path(path)
if not path.is_absolute():
raise ValueError(
"invalid path: {}, path must be given as an absolute path".format(path)
)
self.path = str(path)
# self.free_size = free_size
# self.total_size = total_size
self.shared_file_system = shared_file_system
self.file_servers = list()
@_chained
def add_file_servers(self, *file_servers: FileServer):
"""
add_file_servers(self, *file_servers: FileServer)
Add one or more access methods to this directory
        :param file_servers: one or more :py:class:`~Pegasus.api.site_catalog.FileServer` objects to be added
        :raises TypeError: each file_server must be of type :py:class:`~Pegasus.api.site_catalog.FileServer`
:return: self
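
        For example, a directory may expose different servers for reads and
        writes (the URLs and path below are illustrative):

        .. code-block:: python

            shared_scratch = Directory(Directory.SHARED_SCRATCH, "/data/scratch")\\
                .add_file_servers(
                    FileServer("http://headnode.example.edu/data/scratch", Operation.GET),
                    FileServer("scp://headnode.example.edu/data/scratch", Operation.PUT)
                )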
"""
for fs in file_servers:
if not isinstance(fs, FileServer):
raise TypeError(
"invalid file_server: {file_server}; file_server must be of type FileServer".format(
file_server=fs
)
)
self.file_servers.append(fs)
def __json__(self):
return _filter_out_nones(
OrderedDict(
[
("type", self.directory_type),
("path", self.path),
("sharedFileSystem", self.shared_file_system),
("fileServers", [fs for fs in self.file_servers]),
("freeSize", None),
("totalSize", None),
]
)
)
class SupportedJobs(Enum):
"""Types of jobs in the executable workflow this grid supports"""
COMPUTE = "compute"
AUXILLARY = "auxillary"
TRANSFER = "transfer"
REGISTER = "register"
CLEANUP = "cleanup"
class Grid:
"""Each site supports various (usually two) job managers"""
GT5 = _GridType.GT5
CONDOR = _GridType.CONDOR
CREAM = _GridType.CREAM
BATCH = _GridType.BATCH
NORDUGRID = _GridType.NORDUGRID
UNICORE = _GridType.UNICORE
EC2 = _GridType.EC2
DELTACLOUD = _GridType.DELTACLOUD
def __init__(
self,
grid_type: _GridType,
contact: str,
scheduler_type: Scheduler,
job_type: Optional[SupportedJobs] = None,
):
"""
        :param grid_type: a grid type defined in :py:class:`~Pegasus.api.site_catalog.Grid` (e.g. :code:`Grid.BATCH`)
:type grid_type: _GridType
:param contact: endpoint such as :code:`"workflow.isi.edu"`
:type contact: str
:param scheduler_type: a scheduler type defined in :py:class:`~Pegasus.api.site_catalog.Scheduler` (e.g. :code:`Scheduler.SLURM`)
:type scheduler_type: Scheduler
:param job_type: a job type defined in :py:class:`~Pegasus.api.site_catalog.SupportedJobs` (e.g. :code:`SupportedJobs.COMPUTE`), defaults to None
:type job_type: Optional[SupportedJobs], optional
        :raises TypeError: grid_type must be one defined in :py:class:`~Pegasus.api.site_catalog.Grid` (e.g. :code:`Grid.BATCH`)
        :raises TypeError: scheduler_type must be one defined in :py:class:`~Pegasus.api.site_catalog.Scheduler` (e.g. :code:`Scheduler.PBS`)
        :raises TypeError: job_type must be one defined in :py:class:`~Pegasus.api.site_catalog.SupportedJobs` (e.g. :code:`SupportedJobs.COMPUTE`)
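
        For example (using the contact string from above):

        .. code-block:: python

            grid = Grid(
                Grid.BATCH,
                "workflow.isi.edu",
                Scheduler.SLURM,
                job_type=SupportedJobs.COMPUTE,
            )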
"""
if not isinstance(grid_type, _GridType):
raise TypeError(
"invalid grid_type: {grid_type}; grid_type must be one of {cls_enum_str}".format(
grid_type=grid_type,
cls_enum_str=_get_class_enum_member_str(Grid, _GridType),
)
)
self.grid_type = grid_type.value
self.contact = contact
if not isinstance(scheduler_type, Scheduler):
raise TypeError(
"invalid scheduler_type: {scheduler_type}; scheduler_type must be one of {enum_str}".format(
scheduler_type=scheduler_type, enum_str=_get_enum_str(Scheduler)
)
)
self.scheduler_type = scheduler_type.value
self.job_type = job_type
if job_type is not None:
if not isinstance(job_type, SupportedJobs):
raise TypeError(
"invalid job_type: {job_type}; job_type must be one of {enum_str}".format(
job_type=job_type, enum_str=_get_enum_str(SupportedJobs)
)
)
else:
self.job_type = job_type.value
def __json__(self):
return _filter_out_nones(
OrderedDict(
[
("type", self.grid_type),
("contact", self.contact),
("scheduler", self.scheduler_type),
("jobtype", self.job_type),
]
)
)
class Site(ProfileMixin):
"""A compute resource (which is often a cluster) that we intend to run
the workflow upon. A site is a homogeneous part of a cluster that has at
least a single GRAM gatekeeper with a jobmanager-fork and jobmanager-<scheduler>
interface and at least one gridftp server along with a shared file system.
The GRAM gatekeeper can be either WS GRAM or Pre-WS GRAM. A site can also
be a condor pool or glidein pool with a shared file system.
.. code-block:: python
# Example
        site = Site("local", arch=Arch.X86_64, os_type=OS.LINUX, os_release="rhel", os_version="7")\\
.add_directories(
Directory(Directory.SHARED_SCRATCH, shared_scratch_dir)
.add_file_servers(FileServer("file://" + shared_scratch_dir, Operation.ALL)),
Directory(Directory.LOCAL_STORAGE, local_storage_dir)
.add_file_servers(FileServer("file://" + local_storage_dir, Operation.ALL))
)
"""
def __init__(
self,
name: str,
arch: Optional[Arch] = None,
os_type: Optional[OS] = None,
os_release: Optional[str] = None,
os_version: Optional[str] = None,
):
"""
:param name: name of the site
:type name: str
:param arch: the site's architecture (e.g. :code:`Arch.X86_64`), defaults to None
:type arch: Optional[Arch]
:param os_type: the site's operating system (e.g. :code:`OS.LINUX`), defaults to None
:type os_type: Optional[OS], optional
:param os_release: the release of the site's operating system, defaults to None
:type os_release: Optional[str]
:param os_version: the version of the site's operating system, defaults to None
:type os_version: Optional[str]
        :raises TypeError: arch must be one of :py:class:`~Pegasus.api.site_catalog.Arch`
        :raises TypeError: os_type must be one of :py:class:`~Pegasus.api.site_catalog.OS`
"""
self.name = name
self.directories = list()
self.grids = list()
if arch is not None:
if not isinstance(arch, Arch):
raise TypeError(
"invalid arch: {arch}; arch must be one of {enum_str}".format(
arch=arch, enum_str=_get_enum_str(Arch)
)
)
else:
self.arch = arch.value
else:
self.arch = arch
if os_type is not None:
if not isinstance(os_type, OS):
raise TypeError(
"invalid os_type: {os_type}; os_type must be one of {enum_str}".format(
os_type=os_type, enum_str=_get_enum_str(OS)
)
)
else:
self.os_type = os_type.value
else:
self.os_type = os_type
self.os_release = os_release
self.os_version = os_version
self.profiles = defaultdict(OrderedDict)
@_chained
def add_directories(self, *directories: Directory):
"""
add_directories(self, *directories: Directory)
Add one or more :py:class:`~Pegasus.api.site_catalog.Directory` to this :py:class:`~Pegasus.api.site_catalog.Site`
        :param directories: one or more :py:class:`~Pegasus.api.site_catalog.Directory` objects to be added
:raises TypeError: directory must be of type :py:class:`~Pegasus.api.site_catalog.Directory`
:return: self
"""
for d in directories:
if not isinstance(d, Directory):
raise TypeError(
"invalid directory: {directory}; directory is not of type Directory".format(
directory=d
)
)
self.directories.append(d)
@_chained
def add_grids(self, *grids: Grid):
"""
add_grids(self, *grids: Grid)
Add one or more :py:class:`~Pegasus.api.site_catalog.Grid` to this :py:class:`~Pegasus.api.site_catalog.Site`
        :param grids: one or more :py:class:`~Pegasus.api.site_catalog.Grid` objects to be added
:raises TypeError: grid must be of type :py:class:`~Pegasus.api.site_catalog.Grid`
:return: self
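
        For example, a site will often define one grid endpoint for auxiliary
        jobs and one for compute jobs (the site name and contact string below
        are illustrative):

        .. code-block:: python

            site = Site("cluster")\\
                .add_grids(
                    Grid(Grid.BATCH, "headnode.example.edu", Scheduler.SLURM, job_type=SupportedJobs.AUXILLARY),
                    Grid(Grid.BATCH, "headnode.example.edu", Scheduler.SLURM, job_type=SupportedJobs.COMPUTE)
                )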
"""
for g in grids:
if not isinstance(g, Grid):
raise TypeError(
"invalid grid: {grid}; grid must be of type Grid".format(grid=g)
)
self.grids.append(g)
def __json__(self):
return _filter_out_nones(
OrderedDict(
[
("name", self.name),
("arch", self.arch),
("os.type", self.os_type),
("os.release", self.os_release),
("os.version", self.os_version),
("directories", [d for d in self.directories]),
("grids", [g for g in self.grids] if len(self.grids) > 0 else None),
(
"profiles",
OrderedDict(sorted(self.profiles.items(), key=lambda _: _[0]))
if len(self.profiles) > 0
else None,
),
]
)
)
class SiteCatalog(Writable):
"""The SiteCatalog describes the compute resources, or :py:class:`~Pegasus.api.site_catalog.Site` s
that we intend to run the workflow upon.
.. code-block:: python
sc = SiteCatalog()
WORK_DIR = Path(".").resolve()
        shared_scratch_dir = str(WORK_DIR / RUN_ID)
        local_storage_dir = str(WORK_DIR / "outputs" / RUN_ID)
local = Site("local")\\
.add_directories(
Directory(Directory.SHARED_SCRATCH, shared_scratch_dir)
.add_file_servers(FileServer("file://" + shared_scratch_dir, Operation.ALL)),
Directory(Directory.LOCAL_STORAGE, local_storage_dir)
.add_file_servers(FileServer("file://" + local_storage_dir, Operation.ALL))
)
condorpool = Site("condorpool")\\
.add_pegasus_profile(style="condor")\\
.add_pegasus_profile(auxillary_local="true")\\
.add_condor_profile(universe="vanilla")
sc.add_sites(local, condorpool)
sc.write()
"""
_DEFAULT_FILENAME = "sites.yml"
def __init__(self):
Writable.__init__(self)
self.sites = OrderedDict()
@_chained
def add_sites(self, *sites: Site):
"""
add_sites(self, *sites: Site)
Add one or more sites to this catalog
        :param sites: one or more sites to be added
:raises TypeError: site must be of type :py:class:`~Pegasus.api.site_catalog.Site`
:raises DuplicateError: a site with the same name already exists in this catalog
:return: self
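
        For example (site names taken from the catalog example above):

        .. code-block:: python

            sc = SiteCatalog().add_sites(
                Site("local"),
                Site("condorpool", arch=Arch.X86_64, os_type=OS.LINUX),
            )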
"""
for s in sites:
if not isinstance(s, Site):
raise TypeError(
"invalid site: {site}; site must be of type Site".format(site=s)
)
if s.name in self.sites:
raise DuplicateError(
"site with name: {} already exists in this SiteCatalog".format(
s.name
)
)
self.sites[s.name] = s
def __json__(self):
return OrderedDict(
[
("pegasus", PEGASUS_VERSION),
("sites", [site for _, site in self.sites.items()]),
]
)