1.1.1. Pegasus.api package

1.1.1.1. Submodules

1.1.1.2. Pegasus.api.errors module

exception DuplicateError[source]

Bases: Pegasus.api.errors.PegasusError

exception NotFoundError[source]

Bases: Pegasus.api.errors.PegasusError

exception PegasusError[source]

Bases: Exception

1.1.1.3. Pegasus.api.mixins module

class EventType(value)[source]

Bases: enum.Enum

Event type on which a hook will be triggered

ALL = 'all'
END = 'end'
ERROR = 'error'
NEVER = 'never'
START = 'start'
SUCCESS = 'success'
class HookMixin[source]

Bases: object

Derived class can have hooks assigned to it. This currently supports shell hooks. The supported hooks are triggered when some event, specified by EventType, takes place.

add_shell_hook(self, event_type: EventType, cmd: str)[source]

Add a shell hook. The given command will be executed by the shell when the specified EventType takes place.

# Example
wf.add_shell_hook(EventType.START, "echo 'hello'")
Parameters
  • event_type (EventType) – an event type defined in EventType

  • cmd (str) – shell command

Raises

ValueError – event_type must be one of EventType

Returns

self

class MetadataMixin[source]

Bases: object

Derived class can have metadata assigned to it as key value pairs.

add_metadata(self, *args: Dict[str, Union[str, int, float, bool]], **kwargs)[source]

Add metadata key value pairs to this object

# Example 1
job.add_metadata({"key1": "value1"})

# Example 2
job.add_metadata(key1="value1", key2="value2")
Parameters

args (Dict[str, Union[str, int, float, bool]]) – dictionary of key value pair to add as metadata

Raises

TypeError – each arg in args must be a dict

Returns

self

class Namespace(value)[source]

Bases: enum.Enum

Profile Namespace values recognized by Pegasus

CONDOR = 'condor'
DAGMAN = 'dagman'
ENV = 'env'
GLOBUS = 'globus'
PEGASUS = 'pegasus'
SELECTOR = 'selector'
STAT = 'stat'
class ProfileMixin[source]

Bases: object

add_condor_profile(*, universe: str = None, periodic_release: str = None, periodic_remove: str = None, filesystem_domain: str = None, stream_error: bool = None, stream_output: bool = None, priority: str = None, request_cpus: str = None, request_gpus: str = None, request_memory: str = None, request_disk: str = None, requirements: str = None, should_transfer_files: str = None, when_to_transfer_output: str = None, condor_collector: str = None, grid_resource: str = None, cream_attributes: str = None)[source]

Add Condor profile(s).

The Condor profiles allow you to add or overwrite instructions in the Condor submit file.

Parameters
  • universe (str, optional) – Pegasus defaults to either globus or scheduler universes. Set to standard for compute jobs that require standard universe. Set to vanilla to run natively in a condor pool, or to run on resources grabbed via condor glidein, defaults to None

  • periodic_release (str, optional) – is the number of times job is released back to the queue if it goes to HOLD, e.g. due to Globus errors. Pegasus defaults to 3, defaults to None

  • periodic_remove (str, optional) – is the number of times a job is allowed to get into HOLD state before being removed from the queue. Pegasus defaults to 3, defaults to None

  • filesystem_domain (str, optional) – Useful for Condor glide-ins to pin a job to a remote site, defaults to None

  • stream_error (bool, optional) – boolean to turn on the streaming of the stderr of the remote job back to submit host, defaults to None

  • stream_output (bool, optional) – boolean to turn on the streaming of the stdout of the remote job back to submit host, defaults to None

  • priority (str, optional) – integer value to assign the priority of a job. Higher value means higher priority. The priorities are only applied for vanilla / standard / local universe jobs. Determines the order in which a user's own jobs are executed, defaults to None

  • request_cpus (str, optional) – Number of CPUs a job requires, defaults to None

  • request_gpus (str, optional) – Number of GPUs a job requires, defaults to None

  • request_memory (str, optional) – Amount of memory a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • request_disk (str, optional) – Amount of disk a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • requirements (str, optional) – a job requirements expression such as "(CUDACapability >= 1.2) && $(requirements:True)"

  • should_transfer_files (str, optional) – Used to define whether HTCondor should transfer files to and from the remote machine where the job runs. Given as "YES", "NO", or "IF_NEEDED", defaults to None

  • when_to_transfer_output (str, optional) – Given as a one of "ON_EXIT", "ON_EXIT_OR_EVICT", or "ON_SUCCESS"

  • condor_collector (str, optional) – Specify the condor collector to use (e.g "ccg-testing999.isi.edu")

  • grid_resource (str, optional) – Specify a grid resource such as "batch pbs"

  • cream_attributes (str, optional) – Additional cream attributes (e.g. "key1=value1;key2=value2")

Returns

self

add_dagman_profile(*, pre: Union[str, pathlib.Path] = None, pre_arguments: str = None, post: str = None, post_arguments: str = None, retry: str = None, category: str = None, priority: str = None, abort_dag_on: str = None, max_pre: str = None, max_post: str = None, max_jobs: str = None, max_idle: str = None, post_scope: str = None)[source]

Add Dagman profile(s).

Note: to add the profile keys “post.path.[value of dagman.post]” or “[category-name].maxjobs”, use the following:

job.add_profiles(Namespace.DAGMAN, key="post.path.<value of dagman.post>", value=<value string>)
job.add_profiles(Namespace.DAGMAN, key="<category-name>.maxjobs", value=<value string>)
Parameters
  • pre (Union[str, Path], optional) – is the path to the pre-script. DAGMan executes the pre-script before it runs the job, defaults to None

  • pre_arguments (str, optional) – are command-line arguments for the pre-script, if any, defaults to None

  • post (str, optional) – is the postscript type/mode that a user wants to associate with a job (see docs for more information), defaults to None

  • post_arguments (str, optional) – are the command line arguments for the post script, if any, defaults to None

  • retry (int, optional) – is the number of times DAGMan retries the full job cycle from pre-script through post-script, if failure was detected, defaults to None

  • category (str, optional) – the DAGMan category the job belongs to, defaults to None

  • priority (int, optional) – the priority to apply to a job. DAGMan uses this to select what jobs to release when MAXJOBS is enforced for the DAG, defaults to None

  • abort_dag_on (str, optional) – The ABORT-DAG-ON key word provides a way to abort the entire DAG if a given node returns a specific exit code (AbortExitValue). The syntax for the value of the key is AbortExitValue [RETURN DAGReturnValue] . When a DAG aborts, by default it exits with the node return value that caused the abort. This can be changed by using the optional RETURN key word along with specifying the desired DAGReturnValue, defaults to None

  • max_pre (str, optional) – sets the maximum number of PRE scripts within the DAG that may be running at one time, defaults to None

  • max_post (str, optional) – sets the maximum number of POST scripts within the DAG that may be running at one time, defaults to None

  • max_jobs (str, optional) – sets the maximum number of jobs within the DAG that will be submitted to Condor at one time, defaults to None

  • max_idle (str, optional) – Sets the maximum number of idle jobs allowed before HTCondor DAGMan stops submitting more jobs. Once idle jobs start to run, HTCondor DAGMan will resume submitting jobs. If the option is omitted, the number of idle jobs is unlimited, defaults to None

  • post_scope (str, optional) – can be “all”, “none” or “essential” (see docs for more information), defaults to None

Returns

self

add_env(key: Optional[str] = None, value: Optional[Union[str, int, float, bool, pathlib.Path]] = None, **kw)

Add environment variable(s)

add_globus_profile(*, count: int = None, job_type: str = None, max_cpu_time: int = None, max_memory: str = None, max_time: int = None, max_wall_time: int = None, min_memory: int = None, project: str = None, queue: str = None)[source]

Add Globus profile(s).

The globus profile namespace encapsulates Globus resource specification language (RSL) instructions. The RSL configures settings and behavior of the remote scheduling system.

Parameters
  • count (int, optional) – the number of times an executable is started, defaults to None

  • job_type (str, optional) – specifies how the job manager should start the remote job. While Pegasus defaults to single, use mpi when running MPI jobs, defaults to None

  • max_cpu_time (int, optional) – the max CPU time in minutes for a single execution of a job, defaults to None

  • max_memory (str, optional) – the maximum memory in MB required for the job. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • max_time (int, optional) – the maximum time or walltime in minutes for a single execution of a job, defaults to None

  • max_wall_time (int, optional) – the maximum walltime in minutes for a single execution of a job, defaults to None

  • min_memory (str, optional) – the minimum amount of memory required for this job. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • project (str, optional) – associates an account with a job at the remote end, defaults to None

  • queue (str, optional) – the remote queue in which the job should be run. Used when remote scheduler is PBS that supports queues, defaults to None

Returns

self

add_pegasus_profile(*, clusters_num: int = None, clusters_size: int = None, job_aggregator: int = None, grid_start: int = None, grid_start_path: str = None, grid_start_arguments: str = None, grid_start_launcher: str = None, grid_start_launcher_arguments: str = None, stagein_clusters: int = None, stagein_local_clusters: int = None, stagein_remote_clusters: int = None, stageout_clusters: int = None, stageout_local_clusters: int = None, stageout_remote_clusters: int = None, group: str = None, change_dir: bool = None, create_dir: bool = None, transfer_proxy: bool = None, style: str = None, pmc_request_memory: str = None, pmc_request_cpus: int = None, pmc_priority: int = None, pmc_task_arguments: str = None, exitcode_failure_msg: str = None, exitcode_success_msg: str = None, checkpoint_time: int = None, max_walltime: int = None, glite_arguments: str = None, auxillary_local: bool = None, condor_arguments_quote: bool = None, runtime: str = None, clusters_max_runtime: int = None, cores: int = None, gpus: int = None, nodes: int = None, ppn: int = None, memory: str = None, diskspace: str = None, data_configuration=None, queue: str = None, project: str = None, boto_config: str = None, container_arguments: str = None, label: str = None, pegasus_lite_env_source: Union[str, pathlib.Path] = None)[source]

Add Pegasus profile(s).

Parameters
  • clusters_num (int, optional) – Determines the total number of clusters per level, jobs are evenly spread across clusters (see Pegasus Clustering Guide for more information), defaults to None

  • clusters_size (int, optional) – Determines the number of jobs in each cluster (see Pegasus Clustering Guide for more information), defaults to None

  • job_aggregator (int, optional) – Indicates the clustering executable that is used to run the clustered job on the remote site, defaults to None

  • grid_start (int, optional) – Determines the executable for launching a job (see docs for more information), defaults to None

  • grid_start_path (str, optional) – Sets the path to the gridstart. This profile is best set in the Site Catalog, defaults to None

  • grid_start_arguments (str, optional) – Sets the arguments with which GridStart is used to launch a job on the remote site, defaults to None

  • grid_start_launcher (str, optional) – specifies the path to the executable to launch kickstart, defaults to None

  • grid_start_launcher_arguments (str, optional) – the arguments to the launcher executable if any

  • stagein_clusters (int, optional) – This key determines the maximum number of stage-in jobs that can be executed locally or remotely per compute site per workflow. This is used to configure the BalancedCluster Transfer Refiner, which is the Default Refiner used in Pegasus. This profile is best set in the Site Catalog or in the Properties file, defaults to None

  • stagein_local_clusters (int, optional) – This key provides finer grained control in determining the number of stage-in jobs that are executed locally and are responsible for staging data to a particular remote site. This profile is best set in the Site Catalog or in the Properties file, defaults to None

  • stagein_remote_clusters (int, optional) – This key provides finer grained control in determining the number of stage-in jobs that are executed remotely on the remote site and are responsible for staging data to it. This profile is best set in the Site Catalog or in the Properties file, defaults to None

  • stageout_clusters (int, optional) – This key determines the maximum number of stage-out jobs that can be executed locally or remotely per compute site per workflow. This is used to configure the BalancedCluster Transfer Refiner, which is the Default Refiner used in Pegasus, defaults to None

  • stageout_local_clusters (int, optional) – This key provides finer grained control in determining the number of stage-out jobs that are executed locally and are responsible for staging data from a particular remote site. This profile is best set in the Site Catalog or in the Properties file, defaults to None

  • stageout_remote_clusters (int, optional) – This key provides finer grained control in determining the number of stage-out jobs that are executed remotely on the remote site and are responsible for staging data from it. This profile is best set in the Site Catalog or in the Properties file, defaults to None

  • group (str, optional) – Tags a job with an arbitrary group identifier. The group site selector makes use of the tag, defaults to None

  • change_dir (bool, optional) – If true, tells kickstart to change into the remote working directory. Kickstart itself is executed in whichever directory the remote scheduling system chose for the job, defaults to None

  • create_dir (bool, optional) – If true, tells kickstart to create the remote working directory before changing into it. Kickstart itself is executed in whichever directory the remote scheduling system chose for the job, defaults to None

  • transfer_proxy (bool, optional) – If true, tells Pegasus to explicitly transfer the proxy for transfer jobs to the remote site. This is useful, when you want to use a full proxy at the remote end, instead of the limited proxy that is transferred by CondorG, defaults to None

  • style (str, optional) – Sets the condor submit file style. If set to globus, submit file generated refers to CondorG job submissions. If set to condor, submit file generated refers to direct Condor submission to the local Condor pool. It applies for glidein, where nodes from remote grid sites are glided into the local condor pool. The default style that is applied is globus, defaults to None

  • pmc_request_memory (str, optional) – This key is used to set the -m option for pegasus-mpi-cluster. It specifies the amount of memory in MB that a job requires. This profile is usually set in the DAX for each job. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • pmc_request_cpus (int, optional) – This key is used to set the -c option for pegasus-mpi-cluster. It specifies the number of CPUs that a job requires. This profile is usually set in the DAX for each job, defaults to None

  • pmc_priority (int, optional) – This key is used to set the -p option for pegasus-mpi-cluster. It specifies the priority for a job. This profile is usually set in the DAX for each job. Negative values are allowed for priorities, defaults to None

  • pmc_task_arguments (str, optional) – The key is used to pass any extra arguments to the PMC task during the planning time. They are added to the very end of the argument string constructed for the task in the PMC file. Hence, allows for overriding of any argument constructed by the planner for any particular task in the PMC job, defaults to None

  • exitcode_failure_msg (str, optional) – The message string that pegasus-exitcode searches for in the stdout and stderr of the job to flag failures, defaults to None

  • exitcode_success_msg (str, optional) – The message string that pegasus-exitcode searches for in the stdout and stderr of the job to determine whether a job logged its success message or not. Note this value is used to check whether a job failed or not, i.e. if this profile is specified and pegasus-exitcode DOES NOT find the string in the job stdout or stderr, the job is flagged as failed. The complete rules for determining failure are described in the man page for pegasus-exitcode, defaults to None

  • checkpoint_time (int, optional) – the expected time in minutes for a job after which it should be sent a TERM signal to generate a job checkpoint file, defaults to None

  • max_walltime (int, optional) – the maximum walltime in minutes for a single execution of a job, defaults to None

  • glite_arguments (str, optional) – specifies the extra arguments that must appear in the local PBS generated script for a job, when running workflows on a local cluster with submissions through Glite. This is useful when you want to pass through special options to underlying LRMS such as PBS e.g. you can set value -l walltime=01:23:45 -l nodes=2 to specify your job’s resource requirements, defaults to None

  • auxillary_local (bool, optional) – indicates whether auxiliary jobs associated with a compute site X can be run on the local site. This CAN ONLY be specified as a profile in the site catalog and should be set when the compute site filesystem is accessible locally on the submit host, defaults to None

  • condor_arguments_quote (bool, optional) – indicates whether condor quoting rules should be applied for writing out the arguments key in the condor submit file. By default it is true unless the job is scheduled to a glite style site. The value is automatically set to false for glite style sites, as condor quoting is broken in batch_gahp, defaults to None

  • runtime (str, optional) – Specifies the expected runtime of a job in seconds, defaults to None

  • clusters_max_runtime (int, optional) – Specifies the maximum runtime of a job, defaults to None

  • cores (int, optional) – The total number of cores required for a job. This is also used for accounting purposes in the database while generating statistics. It corresponds to the multiplier_factor in the job_instance table, defaults to None

  • gpus (int, optional) – The total number of gpus required for a job, defaults to None

  • nodes (int, optional) – Indicates the number of nodes a job requires, defaults to None

  • ppn (int, optional) – Indicates the number of processors per node. This profile is best set in the Site Catalog and usually set when running workflows with MPI jobs, defaults to None

  • memory (str, optional) – Indicates the maximum memory a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • diskspace (str, optional) – Indicates the maximum diskspace a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • data_configuration (str, optional) – Indicates the data configuration setup. Can be one of “sharedfs”, “condorio”, or “nonsharedfs”, defaults to None

  • queue (str, optional) – This specifies the queue for the job. (e.g. "normal")

  • project (str, optional) – Causes the job time to be charged to or associated with a particular project/account. This is not used for SGE.

  • boto_config (str, optional) – Specified which .boto file to use (e.g. "/home/myuser/.boto")

  • container_arguments (str, optional) – additional CLI arguments that will be added to the docker container run or singularity exec command

  • label (str, optional) – associate a label to a job; used for label based clustering

  • pegasus_lite_env_source (Union[str, Path], optional) – specify a path on the submit host to indicate the file that needs to be sourced, defaults to None

add_profiles(self, ns: Namespace, key: Optional[str] = None, value: Optional[Union[str, int, float, bool, Path]] = None, **kw)[source]

Add a profile.

If key and value are given, then **kw is ignored and {Namespace::key : value} is added; otherwise **kw is added. When the value of “key” is not a valid Python variable name, follow the usage shown in Example 1; otherwise follow the usage shown in Example 2.

# Example 1
job.add_profiles(Namespace.DAGMAN, key="pre.arguments", value="-i f1")

# Example 2
job.add_profiles(Namespace.ENV, JAVA_HOME="/usr/bin/java", USER="ryan")

For add_globus_profile(), add_condor_profile(), add_dagman_profile(), add_selector_profile(), and add_pegasus_profile(), if a profile key that you are trying to use is not listed as a key word argument, use this function to add the profile.

Raises

TypeError – namespace must be one of Namespace

Returns

self

add_selector_profile(*, execution_site: str = None, pfn: Union[str, pathlib.Path] = None, grid_job_type: str = None)[source]

Add Selector profile(s).

The Selector namespace allows users to override the behavior of the Workflow Mapper during site selection. This gives you finer grained control over where a job executes and what executable it refers to.

Parameters
  • execution_site (str, optional) – the execution site where a job should be executed, defaults to None

  • pfn (Union[str, Path], optional) – the physical file name to the main executable that a job refers to. Overrides any entries specified in the transformation catalog, defaults to None

  • grid_job_type (str, optional) – This profile is usually used to ensure that a compute job executes on another job manager (see docs for more information), defaults to None

Returns

self

to_mb(value: str) → int[source]

Convert the given value to MB

Parameters

value (str) – str formatted as '<int> [MB | GB | TB | PB | EB]'

Raises

ValueError – invalid format

Returns

value in MB

Return type

int
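
The conversion performed by to_mb can be sketched as follows. This is an illustrative re-implementation, assuming 1024-based multipliers; `to_mb_sketch` and `_UNITS_IN_MB` are names invented here, and the real function lives in Pegasus.api.mixins:

```python
import re

# Assumed 1024-based multipliers relative to MB.
_UNITS_IN_MB = {"MB": 1, "GB": 1024, "TB": 1024**2, "PB": 1024**3, "EB": 1024**4}

def to_mb_sketch(value: str) -> int:
    """Parse '<int> [MB | GB | TB | PB | EB]' and return megabytes."""
    match = re.fullmatch(r"\s*(\d+)\s*(MB|GB|TB|PB|EB)\s*", value)
    if match is None:
        raise ValueError("invalid format: {!r}".format(value))
    return int(match.group(1)) * _UNITS_IN_MB[match.group(2)]

print(to_mb_sketch("2 GB"))  # 2048
```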

1.1.1.4. Pegasus.api.properties module

class Properties[source]

Bases: object

Write Pegasus properties to a file.

# Example
props = Properties()
props["globus.maxtime"] = 900
props["globus.maxwalltime"] = 1000
props["dagman.retry"] = 4

props.write()
static ls(prop: Optional[str] = None)[source]

List property keys. Refer to the Configuration docs for additional information. If prop is given, only properties beginning with prop are printed; otherwise all properties are printed.

# Example
>>> Properties.ls("pegasus.pmc")
pegasus.pmc_priority
pegasus.pmc_request_cpus
pegasus.pmc_request_memory
pegasus.pmc_task_arguments
Parameters

prop (Optional[str]) – properties beginning with “prop” will be listed in alphabetical order, defaults to None

write(file: Optional[Union[str, TextIO]] = None)[source]

Write these properties to a file. If file is not given, these properties are written to ./pegasus.properties

# Example 1
props.write()

# Example 2
with open("conf", "w") as f:
    props.write(f)
Parameters

file (Optional[Union[str, TextIO]]) – file path or file object where properties will be written to, defaults to None

Raises

TypeError – file must be of type str or file like object
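
The output is Java-style properties, one key = value pair per line. A minimal sketch of the shape of the file write() produces (the property values are taken from the example above; the exact formatting Pegasus emits may differ):

```python
import io

# Serialize properties as "key = value" lines, the general shape of a
# pegasus.properties file (exact formatting may differ).
props = {
    "globus.maxtime": 900,
    "globus.maxwalltime": 1000,
    "dagman.retry": 4,
}
buf = io.StringIO()
for key, value in props.items():
    buf.write("{} = {}\n".format(key, value))
print(buf.getvalue())
```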

1.1.1.5. Pegasus.api.replica_catalog module

class File(lfn: str, size: Optional[int] = None)[source]

Bases: Pegasus.api.mixins.MetadataMixin

A workflow File. This class is used to represent the inputs and outputs of a Job.

# Example
input_file = File("data.txt").add_metadata(creator="ryan")
Parameters
  • lfn (str) – a unique logical filename

  • size (int) – size in bytes, defaults to None

class ReplicaCatalog[source]

Bases: Pegasus.api.writable.Writable

Maintains a mapping of logical filenames to physical filenames. Any input files to the workflow are specified here so that Pegasus knows where to obtain them.

# Example
if1 = File("if")
if2 = File("if2")

rc = ReplicaCatalog()\
    .add_replica("local", if1, "/nfs/u2/ryan/data.csv")\
    .add_replica("local", "if2", "/nfs/u2/ryan/data2.csv")\
    .write()
add_regex_replica(self, site: str, pattern: str, pfn: Union[str, Path], metadata: Optional[Dict[str, Union[int, str, float]]] = None)[source]

Add an entry to this replica catalog using a regular expression pattern. Note that regular expressions should follow Java regular expression syntax as the underlying code that handles this catalog is Java based.

# Example 1: Match f<any-character>a i.e. faa, f.a, f0a, etc.
rc.add_regex_replica("local", "f.a", "/Volumes/data/input/f.a")

# Example 2: Using groupings
rc.add_regex_replica("local", "alpha\.(csv|txt|xml)", "/Volumes/data/input/[1]/[0]")

# If the file being looked up is alpha.csv, the pfn for the file will be
# generated as /Volumes/data/input/csv/alpha.csv

# Example 3: Specifying a default location for all lfns that don't match any
# regular expressions. Note that this should be the last entry into the replica
# catalog if used.

rc.add_regex_replica("local", ".*", Path("/Volumes/data") / "input/[0]")
Parameters
  • site (str) – the site at which this replica (file) resides

  • pattern (str) – regular expression used to match a file

  • pfn (Union[str, Path]) – path to the file (may also be a pattern as shown in the example above)

  • metadata (Optional[Dict[str, Union[int, str, float]]]) – any metadata to be associated with the matched files, for example: {"creator": "pegasus"}, defaults to None

Raises

DuplicateError – Duplicate patterns with different PFNs are currently not supported
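
The [0]/[1] substitution from Example 2 can be reproduced with Python's re module. This is only a sketch of the lookup semantics (`resolve` is a name invented here); Pegasus performs the real matching in Java, whose regex syntax differs slightly from Python's:

```python
import re

# Pattern and PFN template from Example 2 above; [0] stands for the full
# match and [1] for the first capture group.
pattern = r"alpha\.(csv|txt|xml)"
pfn_template = "/Volumes/data/input/[1]/[0]"

def resolve(lfn: str) -> str:
    match = re.fullmatch(pattern, lfn)
    if match is None:
        raise ValueError("no replica entry matches {!r}".format(lfn))
    return pfn_template.replace("[1]", match.group(1)).replace("[0]", match.group(0))

print(resolve("alpha.csv"))  # /Volumes/data/input/csv/alpha.csv
```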

add_replica(self, site: str, lfn: Union[str, File], pfn: Union[str, Path], checksum: Optional[Dict[str, str]] = None, metadata: Optional[Dict[str, Union[int, str, float]]] = None)[source]

Add an entry to this replica catalog.

# Example 1
f = File("in.txt").add_metadata(creator="pegasus")
rc.add_replica("local", f, Path(".").resolve() / "in.txt")

# Example 2: Adding metadata and a checksum
rc.add_replica(
    "local",
    "in.txt",
    "/home/ryan/wf/in.txt",
    checksum={"sha256": "abc123"},
    metadata={"creator": "pegasus"}
)

# Example 3: Adding multiple pfns for the same lfn (metadata and checksum will be
# updated for that lfn if given)
rc.add_replica("local", "in.txt", Path(".").resolve() / "in.txt")
rc.add_replica("condorpool", "in.txt", "/path/to/file/in.txt")
Parameters
  • site (str) – the site at which this replica (file) resides

  • lfn (Union[str, File]) – logical file name

  • pfn (Union[str, Path]) – physical file name such as Path("f.txt").resolve(), /home/ryan/file.txt, or http://pegasus.isi.edu/file.txt

  • checksum (Optional[Dict[str, str]], optional) – Dict containing checksums for this file. Currently only sha256 is supported. This should be entered as {"sha256": <value>}, defaults to None

  • metadata (Optional[Dict[str, Union[int, str, float]]], optional) – metadata key value pairs associated with this lfn such as {"created": "Thu Jun 18 22:18:36 PDT 2020", "owner": "pegasus"}, defaults to None

Raises
  • ValueError – if pfn is given as a pathlib.Path, it must be an absolute path

  • ValueError – an unsupported checksum type was given

1.1.1.6. Pegasus.api.site_catalog module

class Arch(value)[source]

Bases: enum.Enum

Architecture types

AMD64 = 'amd64'
IA64 = 'ia64'
PPC = 'ppc'
PPC64LE = 'ppc64le'
PPC_64 = 'ppc_64'
SPARCV7 = 'sparcv7'
SPARCV9 = 'sparcv9'
X86 = 'x86'
X86_64 = 'x86_64'
class Directory(directory_type: Pegasus.api.site_catalog._DirectoryType, path: Union[str, pathlib.Path])[source]

Bases: object

Information about filesystems Pegasus can use for storing temporary and long-term files.

Parameters
  • directory_type (_DirectoryType) – directory type defined in DirectoryType (e.g. Directory.SHARED_SCRATCH or Directory.LOCAL_STORAGE)

  • path (Union[str, Path]) – directory path

Raises
  • ValueError – directory_type must be one of DirectoryType

  • ValueError – path must be given as an absolute path

LOCAL_SCRATCH = 'localScratch'

Describes the scratch file systems available locally on a compute node.

LOCAL_STORAGE = 'localStorage'

Describes a long term storage file system. This is the directory Pegasus will stage output files to.

SHARED_SCRATCH = 'sharedScratch'

Describes a scratch file system. Pegasus will use this to store intermediate data between jobs and other temporary files.

SHARED_STORAGE = 'sharedStorage'
add_file_servers(self, *file_servers: FileServer)[source]

Add one or more access methods to this directory

Parameters

file_servers (FileServer) – one or more FileServer objects

Raises

ValueError – file_server must be of type FileServer

Returns

self

class FileServer(url: str, operation_type: Pegasus.api.site_catalog.Operation)[source]

Bases: Pegasus.api.mixins.ProfileMixin

Describes a file server used to access data from outside the site

Parameters
  • url (str) – url including protocol such as scp://obelix.isi.edu/data

  • operation_type (Operation) – operation type defined in Operation (e.g. Operation.ALL)

Raises

ValueError – operation_type must be one defined in Operation

class Grid(grid_type: Pegasus.api.site_catalog._GridType, contact: str, scheduler_type: Pegasus.api.site_catalog.Scheduler, job_type: Optional[Pegasus.api.site_catalog.SupportedJobs] = None)[source]

Bases: object

Each site supports various (usually two) job managers

Parameters
  • grid_type (_GridType) – a grid type defined in Grid (e.g. Grid.SLURM)

  • contact (str) – endpoint such as "workflow.isi.edu"

  • scheduler_type (Scheduler) – a scheduler type defined in Scheduler (e.g. Scheduler.SLURM)

  • job_type (Optional[SupportedJobs], optional) – a job type defined in SupportedJobs (e.g. SupportedJobs.COMPUTE), defaults to None

Raises
  • ValueError – grid_type must be one defined in Grid (e.g. Grid.PBS)

  • ValueError – scheduler_type must be one defined in Scheduler (e.g. Scheduler.PBS)

  • ValueError – job_type must be one defined in SupportedJobs (e.g. SupportedJobs.COMPUTE)

BATCH = 'batch'
CONDOR = 'condor'
CREAM = 'cream'
DELTACLOUD = 'deltacloud'
EC2 = 'ec2'
GT5 = 'gt5'
NORDUGRID = 'nordugrid'
UNICORE = 'unicore'
class OS(value)[source]

Bases: enum.Enum

Operating system types

AIX = 'aix'
LINUX = 'linux'
MACOSX = 'macosx'
SUNOS = 'sunos'
WINDOWS = 'windows'
class Operation(value)[source]

Bases: enum.Enum

Different types of operations supported by a file server

ALL = 'all'
GET = 'get'
PUT = 'put'
class Scheduler(value)[source]

Bases: enum.Enum

Different scheduler types on the Grid

CONDOR = 'condor'
FORK = 'fork'
LSF = 'lsf'
PBS = 'pbs'
SGE = 'sge'
SLURM = 'slurm'
UNKNOWN = 'unknown'
class Site(name: str, arch: Optional[Pegasus.api.site_catalog.Arch] = None, os_type: Optional[Pegasus.api.site_catalog.OS] = None, os_release: Optional[str] = None, os_version: Optional[str] = None)[source]

Bases: Pegasus.api.mixins.ProfileMixin

A compute resource (which is often a cluster) that we intend to run the workflow upon. A site is a homogeneous part of a cluster that has at least a single GRAM gatekeeper with a jobmanager-fork and jobmanager-<scheduler> interface and at least one gridftp server along with a shared file system. The GRAM gatekeeper can be either WS GRAM or Pre-WS GRAM. A site can also be a condor pool or glidein pool with a shared file system.

# Example
site = Site("local", arch=Arch.X86_64, os_type=OS.LINUX, os_release="rhel", os_version="7")\
    .add_directories(
        Directory(Directory.SHARED_SCRATCH, shared_scratch_dir)
            .add_file_servers(FileServer("file://" + str(shared_scratch_dir), Operation.ALL)),

        Directory(Directory.LOCAL_STORAGE, local_storage_dir)
            .add_file_servers(FileServer("file://" + str(local_storage_dir), Operation.ALL))
    )
Parameters
  • name (str) – name of the site

  • arch (Optional[Arch]) – the site’s architecture (e.g. Arch.X86_64), defaults to None

  • os_type (Optional[OS], optional) – the site’s operating system (e.g. OS.LINUX), defaults to None

  • os_release (Optional[str]) – the release of the site’s operating system, defaults to None

  • os_version (Optional[str]) – the version of the site’s operating system, defaults to None

Raises
  • ValueError – arch must be one of Arch

  • ValueError – os_type must be one of OS

add_directories(self, *directories: Directory)[source]

Add one or more Directory to this Site

Parameters

directory – the Directory to be added

Raises

TypeError – directory must be of type Directory

Returns

self

add_grids(self, *grids: Grid)[source]

Add one or more Grid to this Site

Parameters

grid – the Grid to be added

Raises

TypeError – grid must be of type Grid

Returns

self

class SiteCatalog[source]

Bases: Pegasus.api.writable.Writable

The SiteCatalog describes the compute resources, or Site s that we intend to run the workflow upon.

sc = SiteCatalog()

WORK_DIR = Path(".").resolve()

shared_scratch_dir = WORK_DIR / RUN_ID
local_storage_dir = WORK_DIR / "outputs" / RUN_ID

local = Site("local")\
                .add_directories(
                    Directory(Directory.SHARED_SCRATCH, shared_scratch_dir)
                        .add_file_servers(FileServer("file://" + str(shared_scratch_dir), Operation.ALL)),

                    Directory(Directory.LOCAL_STORAGE, local_storage_dir)
                        .add_file_servers(FileServer("file://" + str(local_storage_dir), Operation.ALL))
                )

condorpool = Site("condorpool")\
                .add_pegasus_profile(style="condor")\
                .add_pegasus_profile(auxillary_local="true")\
                .add_condor_profile(universe="vanilla")

sc.add_sites(local, condorpool)

sc.write()
add_sites(self, *sites: Site)[source]

Add one or more sites to this catalog

Parameters

site – the site to be added

Raises
  • TypeError – site must be of type Site

  • DuplicateError – a site with the same name already exists in this catalog

Returns

self

class SupportedJobs(value)[source]

Bases: enum.Enum

Types of jobs in the executable workflow this grid supports

AUXILLARY = 'auxillary'
CLEANUP = 'cleanup'
COMPUTE = 'compute'
REGISTER = 'register'
TRANSFER = 'transfer'

1.1.1.7. Pegasus.api.transformation_catalog module

class Container(name: str, container_type: Pegasus.api.transformation_catalog._ContainerType, image: str, arguments: Optional[str] = None, mounts: Optional[List[str]] = None, image_site: Optional[str] = None, checksum: Optional[Dict[str, str]] = None, metadata: Optional[Dict[str, Union[int, str, float]]] = None, bypass_staging: bool = False)[source]

Bases: Pegasus.api.mixins.ProfileMixin

Describes a container that can be added to the TransformationCatalog . Note that the checksum parameter refers to the checksum of the tar file of the image and not the specific version of an image (digest/content-addressable identifier in the case of Docker).

# Example 1: Docker
centos_pegasus = Container(
                    "centos-pegasus",
                    Container.DOCKER,
                    "docker:///ryan/centos-pegasus:latest",
                    arguments="--shm-size",
                    mounts=["/Volumes/Work/lfs1:/shared-data/:ro"]
                )

# Example 2: Singularity
fb_nlp = Container(
            "fb-nlp",
            Container.SINGULARITY,
            image="library://papajim/default/fb-nlp",
            mounts=["/data:/mnt:ro"]
        )
Parameters
  • name (str) – name of this container

  • container_type (_ContainerType) – a container type defined in Container

  • image (str) – image, such as docker:///rynge/montage:latest

  • arguments (Optional[str]) – additional CLI arguments to be added to the docker container run or singularity exec commands when starting this container, defaults to None

  • mounts (Optional[List[str]]) – list of mount strings such as ['/Volumes/Work/lfs1:/shared-data/:ro', ...]

  • image_site (Optional[str]) – optional site attribute to tell Pegasus on which site the tar file of this image exists, defaults to None

  • checksum (Optional[Dict[str, str]]) – Dict containing checksums for the tar file of this image. Currently only sha256 is supported. This should be entered as {"sha256": <value>}, defaults to None

  • metadata (Optional[Dict[str, Union[int, str, float]]]) – Dict containing metadata key, value pairs associated with this container, defaults to None

  • bypass_staging (bool, optional) – whether or not to bypass the stage in job for this container, defaults to False

Raises

ValueError – container_type must be one of _ContainerType (Container.DOCKER | Container.SINGULARITY | Container.SHIFTER)

DOCKER = 'docker'
SHIFTER = 'shifter'
SINGULARITY = 'singularity'
class Transformation(name: str, namespace: Optional[str] = None, version: Optional[str] = None, site: Optional[str] = None, pfn: Optional[Union[str, pathlib.Path]] = None, is_stageable: bool = False, bypass_staging: bool = False, arch: Optional[Pegasus.api.site_catalog.Arch] = None, os_type: Optional[Pegasus.api.site_catalog.OS] = None, os_release: Optional[str] = None, os_version: Optional[str] = None, container: Optional[Union[Pegasus.api.transformation_catalog.Container, str]] = None, checksum: Optional[Dict[str, str]] = None)[source]

Bases: Pegasus.api.mixins.ProfileMixin, Pegasus.api.mixins.HookMixin, Pegasus.api.mixins.MetadataMixin

A transformation, which can be a standalone executable, or one that requires other executables.

When a transformation resides on a single site, the syntax in Example 1 can be used, where the args site and pfn are provided to the constructor. If site and pfn are specified, then the args is_stageable, bypass_staging, arch, os_type, os_release, os_version, and container are applied to that site, else they are ignored. When a transformation resides on multiple sites, the syntax in Example 2 can be used, where multiple TransformationSite objects are added. Note that a checksum such as {"sha256": <value>} applies only to stageable executables.

# Example 1: transformation that resides on a single site
preprocess = Transformation(
        "preprocess",
        namespace="pegasus",
        version="4.0",
        site="condorpool",
        pfn="/usr/bin/pegasus-keg",
        is_stageable=False,
        bypass_staging=False,
        arch=Arch.X86_64,
        os_type=OS.LINUX,
        container=centos_pegasus
    )

# Example 2: transformation that resides on multiple sites
preprocess = Transformation("preprocess", namespace="pegasus", version="4.0")\
                .add_sites(
                    TransformationSite(
                        "condorpool",
                        "/usr/bin/pegasus-keg",
                        is_stageable=False,
                        arch=Arch.X86_64,
                        os_type=OS.LINUX,
                        container="centos-pegasus"
                    ),
                    ...
                )
Parameters
  • name (str) – the logical name of the transformation

  • namespace (Optional[str]) – the namespace that this transformation belongs to, defaults to None

  • version (Optional[str]) – the version of this transformation (e.g. "1.1"), defaults to None

  • site (Optional[str]) – a Site specified in the SiteCatalog on which this transformation resides, defaults to None

  • pfn (Optional[Union[str, Path]]) – the physical filename of this transformation (e.g. "/usr/bin/tar"), defaults to None

  • is_stageable (bool, optional) – whether this transformation is stageable (True) or installed (False), defaults to False

  • bypass_staging (bool, optional) – whether or not to bypass the stage in job of this executable (Note that this only works for transformations where is_stageable=False), defaults to False

  • arch (Optional[Arch]) – architecture that this transformation was compiled for (defined in Arch , e.g Arch.X86_64), defaults to None

  • os_type (Optional[OS]) – name of os that this transformation was compiled for (defined in OS , e.g. OS.LINUX), defaults to None

  • os_release (Optional[str]) – release of os that this transformation was compiled for, defaults to None

  • os_version (Optional[str]) – version of os that this transformation was compiled for, defaults to None

  • container (Optional[Union[Container, str]]) – a Container or name of the container to be used for this transformation, defaults to None

  • checksum (Optional[Dict[str, str]]) – Dict containing checksums for this file. Currently only sha256 is supported. This should be entered as {"sha256": <value>}, defaults to None

Raises
  • TypeError – container must be of type Container or str

  • TypeError – arch must be one of Arch

  • TypeError – os_type must be one of OS

  • ValueError – the fields namespace, name, and version must not contain any : (colons)

  • ValueError – pfn must be given as an absolute path when pathlib.Path is used

  • ValueError – bypass_staging=True can only be used when is_stageable=True

add_requirement(self, required_transformation: Union[str, Transformation], namespace: str = None, version: str = None)[source]

Add a requirement to this transformation. Specify the other transformation, identified by name, namespace, and version, that this transformation depends upon. If a Transformation is passed in for required_transformation, then namespace and version are ignored.

Parameters

required_transformation (str or Transformation) – Transformation that this transformation requires

Raises
  • DuplicateError – this requirement already exists

  • ValueError – required_transformation must be of type Transformation or str

  • ValueError – namespace, required transformation name, and version cannot contain any : (colon) characters

  • TypeError – required_transformation must be one of type str or Transformation

Returns

self

add_sites(self, *transformation_sites: TransformationSite)[source]

Add one or more TransformationSite s to this transformation.

Parameters

transformation_sites – the TransformationSite (s) to be added

Raises
  • TypeError – transformation_sites must be of type TransformationSite

  • DuplicateError – a TransformationSite with the same name already exists in this transformation

Returns

self

class TransformationCatalog[source]

Bases: Pegasus.api.writable.Writable

Maintains a list of Transformation s and Container s

# Example
centos_pegasus = Container(
        "centos-pegasus",
        Container.DOCKER,
        "docker:///ryan/centos-pegasus:latest",
        mounts=["/Volumes/Work/lfs1:/shared-data/:ro"]
    )

preprocess = Transformation(
                "preprocess",
                site="condorpool",
                pfn="/usr/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX,
                container=centos_pegasus
            )

findrange = Transformation(
                "findrange",
                site="condorpool",
                pfn="/usr/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX,
                container=centos_pegasus
            )

analyze = Transformation(
                "analyze",
                site="condorpool",
                pfn="/usr/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX,
                container=centos_pegasus
            )

tc = TransformationCatalog()\
    .add_containers(centos_pegasus)\
    .add_transformations(preprocess, findrange, analyze)\
    .write()
add_containers(self, *containers: Container)[source]

Add one or more Container s to this catalog

# Example
tc.add_containers(
    Container(
        "centos-pegasus",
        Container.DOCKER,
        "docker:///ryan/centos-pegasus:latest",
        mounts=["/Volumes/Work/lfs1:/shared-data/:ro"]
    )
)
Parameters

containers – the Container to be added

Raises
  • TypeError – argument(s) must be of type Container

  • DuplicateError – a container with the same name already exists in this catalog

Returns

self

add_transformations(self, *transformations: Transformation)[source]

Add one or more Transformation s to this catalog

# Example
tc.add_transformations(
    Transformation(
            "analyze",
            site="condorpool",
            pfn="/usr/bin/pegasus-keg",
            is_stageable=False,
            arch=Arch.X86_64,
            os_type=OS.LINUX,
            container=centos_pegasus
        )
)
Parameters

transformations – the Transformation (s) to be added

Raises
  • TypeError – argument(s) must be of type Transformation

  • DuplicateError – a transformation with the same name, namespace, and version already exists in this catalog

Returns

self

class TransformationSite(name: str, pfn: Union[str, pathlib.Path], is_stageable: bool = False, bypass_staging: bool = False, arch: Optional[Pegasus.api.site_catalog.Arch] = None, os_type: Optional[Pegasus.api.site_catalog.OS] = None, os_release: Optional[str] = None, os_version: Optional[str] = None, container: Optional[Union[Pegasus.api.transformation_catalog.Container, str]] = None)[source]

Bases: Pegasus.api.mixins.ProfileMixin, Pegasus.api.mixins.MetadataMixin

Site specific information about a Transformation. A Transformation must contain at least one transformation site.

Parameters
  • name (str) – name of the site at which this Transformation resides

  • pfn (Union[str, Path]) – physical file name, an absolute path given as a str or Path

  • is_stageable (bool, optional) – whether this transformation is stageable (True) or installed (False), defaults to False

  • bypass_staging (bool, optional) – whether or not to bypass the stage in job of this executable (Note that this only works for transformations where is_stageable=False), defaults to False

  • arch (Optional[Arch], optional) – architecture that this Transformation was compiled for (defined in Arch), defaults to None

  • os_type (Optional[OS], optional) – name of os that this Transformation was compiled for (defined in OS), defaults to None

  • os_release (Optional[str], optional) – release of os that this Transformation was compiled for, defaults to None

  • os_version (Optional[str], optional) – version of os that this Transformation was compiled for, defaults to None

  • container (Optional[Union[Container, str]], optional) – the name of the container or the Container object to use, defaults to None

Raises
  • TypeError – arch must be one of Arch

  • TypeError – os_type must be one of OS

  • ValueError – pfn must be given as an absolute path when pathlib.Path is used

  • ValueError – bypass_staging=True can only be used when is_stageable=True

1.1.1.8. Pegasus.api.workflow module

class AbstractJob(_id: Optional[str] = None, node_label: Optional[str] = None)[source]

Bases: Pegasus.api.mixins.HookMixin, Pegasus.api.mixins.ProfileMixin, Pegasus.api.mixins.MetadataMixin

An abstract representation of a workflow job

Parameters
  • _id (Optional[str]) – a unique id, if None is given then one will be assigned when this job is added to a Workflow, defaults to None

  • node_label (Optional[str]) – a short descriptive label that can be assigned to this job, defaults to None

Note: avoid using IDs such as '0000008' or '00000009' as these may end up being unquoted by PyYAML, and consequently misinterpreted as integer values when read in by other tools.

add_args(self, *args: Union[File, int, float, str])[source]

Add arguments to this job. Each argument will be separated by a space. Each argument must be either a File, scalar, or str.

Parameters

args (Union[File, int, float, str]) – arguments to pass to this job (each arg in args will be separated by a space)

Returns

self

add_checkpoint(self, checkpoint_file: File, stage_out: bool = True, register_replica: bool = True)[source]

Add an output File of this job as a checkpoint file

Parameters
  • checkpoint_file (File) – the File to be added as a checkpoint file to this job

  • stage_out (bool, optional) – whether or not to send files back to an output directory, defaults to True

  • register_replica (bool, optional) – whether or not to register replica with a ReplicaCatalog, defaults to True

Raises
  • DuplicateError – all output files must be unique

  • TypeError – a job output must be of type File

Returns

self

add_inputs(self, *input_files: File, bypass: bool = False)[source]

Add one or more File objects as input to this job

Parameters
  • input_files – the File objects to be added as inputs to this job

  • bypass (bool, optional) – whether or not to bypass the staging site when this file is fetched by the job, defaults to False

Raises
  • DuplicateError – all input files must be unique

  • TypeError – job inputs must be of type File

Returns

self

add_outputs(self, *output_files: File, stage_out: bool = True, register_replica: bool = True)[source]

Add one or more File objects as outputs to this job. stage_out and register_replica will be applied to all files given.

Parameters
  • output_files – the File objects to be added as outputs to this job

  • stage_out (bool, optional) – whether or not to send files back to an output directory, defaults to True

  • register_replica (bool, optional) – whether or not to register replica with a ReplicaCatalog, defaults to True

Raises
  • DuplicateError – all output files must be unique

  • TypeError – a job output must be of type File

Returns

self

get_inputs()[source]

Get this job’s input File s

Returns

all input files associated with this job

Return type

set

get_outputs()[source]

Get this job’s output File objects

Returns

all output files associated with this job

Return type

set

get_stderr()[source]

Get the File being used for stderr

Returns

the stderr file

Return type

File

get_stdin()[source]

Get the File being used for stdin

Returns

the stdin file

Return type

File

get_stdout()[source]

Get the File being used for stdout

Returns

the stdout file

Return type

File

set_stderr(self, file: Union[str, File], stage_out: bool = True, register_replica: bool = True)[source]

Set stderr to a File . If file is given as a str, a File object is created for you internally with the given value as its lfn.

Parameters
  • file (Union[str, File]) – a file that stderr will be written to

  • stage_out (bool, optional) – whether or not to send files back to an output directory, defaults to True

  • register_replica (bool, optional) – whether or not to register replica with a ReplicaCatalog, defaults to True

Raises
  • TypeError – file must be of type File or str

  • DuplicateError – stderr is already set or the given file has already been added as an output to this job

Returns

self

set_stdin(self, file: Union[str, File])[source]

Set stdin to a File . If file is given as a str, a File object is created for you internally with the given value as its lfn.

Parameters

file (Union[str, File]) – a file that will be read into stdin

Raises
  • TypeError – file must be of type File or str

  • DuplicateError – stdin is already set or the given file has already been added as an input to this job

Returns

self

set_stdout(self, file: Union[str, File], stage_out: bool = True, register_replica: bool = True)[source]

Set stdout to a File . If file is given as a str, a File object is created for you internally with the given value as its lfn.

Parameters
  • file (Union[str, File]) – a file that stdout will be written to

  • stage_out (bool, optional) – whether or not to send files back to an output directory, defaults to True

  • register_replica (bool, optional) – whether or not to register replica with a ReplicaCatalog, defaults to True

Raises
  • TypeError – file must be of type File or str

  • DuplicateError – stdout is already set or the given file has already been added as an output to this job

Returns

self

class Job(transformation: Union[str, Pegasus.api.transformation_catalog.Transformation], _id: Optional[str] = None, node_label: Optional[str] = None, namespace: Optional[str] = None, version: Optional[str] = None)[source]

Bases: Pegasus.api.workflow.AbstractJob

A typical workflow Job that executes a Transformation. See AbstractJob for full list of available functions.

# Example
if1 = File("if1")
if2 = File("if2")

of1 = File("of1")
of2 = File("of2")

# Assuming a transformation named "analyze.py" has been added to your
# transformation catalog:
job = Job("analyze.py")\
        .add_args("-i", if1, if2, "-o", of1, of2)\
        .add_inputs(if1, if2)\
        .add_outputs(of1, of2, stage_out=True, register_replica=False)
Parameters
  • transformation (Union[str, Transformation]) – Transformation object or name of the transformation that this job uses

  • _id (Optional[str]) – a unique id; if none is given then one will be assigned when the job is added by a Workflow, defaults to None

  • node_label (Optional[str]) – a brief job description, defaults to None

  • namespace (Optional[str]) – namespace to which the Transformation belongs, defaults to None

  • version (Optional[str]) – version of the given Transformation, defaults to None

Raises

TypeError – transformation must be one of type Transformation or str

class SubWorkflow(file: Union[str, Pegasus.api.replica_catalog.File], is_planned: bool, _id: Optional[str] = None, node_label: Optional[str] = None)[source]

Bases: Pegasus.api.workflow.AbstractJob

Job that represents a subworkflow. See AbstractJob for full list of available functions.

Parameters
  • file (Union[str, File]) – File object or name of the workflow file that will be used for this job

  • is_planned (bool) – whether or not this subworkflow has already been planned by the Pegasus planner

  • _id (Optional[str]) – a unique id; if none is given then one will be assigned when the job is added by a Workflow, defaults to None

  • node_label (Optional[str]) – a brief job description, defaults to None

Raises

TypeError – file must be of type File or str

class Workflow(name: str, infer_dependencies: bool = True)[source]

Bases: Pegasus.api.writable.Writable, Pegasus.api.mixins.HookMixin, Pegasus.api.mixins.ProfileMixin, Pegasus.api.mixins.MetadataMixin

Represents a multi-step computational process as a directed acyclic graph (DAG).

# Example
import logging

from pathlib import Path

from Pegasus.api import *

logging.basicConfig(level=logging.DEBUG)

# --- Replicas -----------------------------------------------------------------
with open("f.a", "w") as f:
    f.write("This is sample input to KEG")

fa = File("f.a").add_metadata(creator="ryan")
rc = ReplicaCatalog().add_replica("local", fa, Path(".") / "f.a")

# --- Transformations ----------------------------------------------------------
preprocess = Transformation(
                "preprocess",
                site="condorpool",
                pfn="/usr/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX
            )

findrange = Transformation(
                "findrange",
                site="condorpool",
                pfn="/usr/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX
            )

analyze = Transformation(
                "analyze",
                site="condorpool",
                pfn="/usr/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX
            )

tc = TransformationCatalog().add_transformations(preprocess, findrange, analyze)

# --- Workflow -----------------------------------------------------------------
'''
                    [f.b1] - (findrange) - [f.c1] 
                    /                             \
[f.a] - (preprocess)                               (analyze) - [f.d]
                    \                             /
                    [f.b2] - (findrange) - [f.c2]

'''
wf = Workflow("blackdiamond")

fb1 = File("f.b1")
fb2 = File("f.b2")
job_preprocess = Job(preprocess)\
                    .add_args("-a", "preprocess", "-T", "3", "-i", fa, "-o", fb1, fb2)\
                    .add_inputs(fa)\
                    .add_outputs(fb1, fb2)

fc1 = File("f.c1")
job_findrange_1 = Job(findrange)\
                    .add_args("-a", "findrange", "-T", "3", "-i", fb1, "-o", fc1)\
                    .add_inputs(fb1)\
                    .add_outputs(fc1)

fc2 = File("f.c2")
job_findrange_2 = Job(findrange)\
                    .add_args("-a", "findrange", "-T", "3", "-i", fb2, "-o", fc2)\
                    .add_inputs(fb2)\
                    .add_outputs(fc2)

fd = File("f.d")
job_analyze = Job(analyze)\
                .add_args("-a", "analyze", "-T", "3", "-i", fc1, fc2, "-o", fd)\
                .add_inputs(fc1, fc2)\
                .add_outputs(fd)

wf.add_jobs(job_preprocess, job_findrange_1, job_findrange_2, job_analyze)
wf.add_replica_catalog(rc)
wf.add_transformation_catalog(tc)

try:
    wf.plan(submit=True)\
        .wait()\
        .analyze()\
        .statistics()
except PegasusClientError as e:
    print(e.output)
Parameters
  • name (str) – name of the Workflow

  • infer_dependencies (bool, optional) – whether or not to automatically compute job dependencies based on input and output files used by each job, defaults to True

Raises

ValueError – workflow name may not contain any / or spaces

add_dependency(self, job: Union[Job, SubWorkflow], *, parents: List[Union[Job, SubWorkflow]] = [], children: List[Union[Job, SubWorkflow]] = [])[source]

Add parent, child dependencies for a given job.

Parameters
  • job (AbstractJob) – the job to which parents and children will be assigned

  • parents (list, optional) – jobs to be added as parents to this job, defaults to []

  • children (list, optional) – jobs to be added as children of this job, defaults to []

Raises
  • ValueError – the given job(s) do not have ids assigned to them

  • DuplicateError – a dependency between two jobs already has been added

Returns

self

add_jobs(self, *jobs: Union[Job, SubWorkflow])[source]

Add one or more jobs at a time to the Workflow

Raises

DuplicateError – a job with the same id already exists in this workflow

Returns

self

add_replica_catalog(self, rc: ReplicaCatalog)[source]

Add a ReplicaCatalog to this workflow. The contents of the replica catalog will be inlined into the same file as this workflow when it is written.

Parameters

rc (ReplicaCatalog) – the ReplicaCatalog to be added

Raises
  • TypeError – rc must be of type ReplicaCatalog

  • DuplicateError – a ReplicaCatalog has already been added to this workflow

Returns

self

add_site_catalog(self, sc: SiteCatalog)[source]

Add a SiteCatalog to this workflow. The contents of the site catalog will be inlined into the same file as this workflow when it is written out.

Parameters

sc (SiteCatalog) – the SiteCatalog to be added

Raises
  • TypeError – sc must be of type SiteCatalog

  • DuplicateError – a SiteCatalog has already been added to this workflow

Returns

self

add_transformation_catalog(self, tc: TransformationCatalog)[source]

Add a TransformationCatalog to this workflow. The contents of the transformation catalog will be inlined into the same file as this workflow when it is written.

Parameters

tc (TransformationCatalog) – the TransformationCatalog to be added

Raises
  • TypeError – tc must be of type TransformationCatalog

  • DuplicateError – a TransformationCatalog has already been added to this workflow

Returns

self

analyze(self, verbose: int = 0)[source]

Debug a workflow.

Parameters

verbose (int, optional) – verbosity, defaults to 0

Raises

PegasusClientError – pegasus-analyzer encountered an error

Returns

self

property braindump

Once this workflow has been planned using plan, the braindump file can be accessed for information such as user, submit_dir, and root_wf_uuid. For a full list of available attributes, see Braindump.

try:
    wf.plan(submit=True)

    print(wf.braindump.user)
    print(wf.braindump.submit_hostname)
    print(wf.braindump.submit_dir)
    print(wf.braindump.root_wf_uuid)
    print(wf.braindump.wf_uuid)
except PegasusClientError as e:
    print(e.output)
Getter

returns a Braindump object corresponding to the most recent call of plan

Return type

Pegasus.braindump.Braindump

Raises

PegasusError – plan must be called before accessing the braindump file

get_job(_id: str)[source]

Retrieve the job with the given id

Parameters

_id (str) – id of the job to be retrieved from the Workflow

Raises

NotFoundError – a job with the given id does not exist in this workflow

Returns

the job with the given id

Return type

Job

graph(self, include_files: bool = True, no_simplify: bool = True, label: Literal['label', 'xform', 'id', 'xform-id', 'label-xform', 'label-id'] = 'label', output: Optional[str] = None, remove: Optional[List[str]] = None, width: Optional[int] = None, height: Optional[int] = None)[source]

Convert workflow into a graphviz dot format

Parameters
  • include_files (bool, optional) – include files as nodes, defaults to True

  • no_simplify (bool, optional) – when set to False a transitive reduction is performed to remove extra edges, defaults to True

  • label (Literal["label", "xform", "id", "xform-id", "label-xform", "label-id"], optional) – what attribute to use for labels, defaults to “label”

  • output (Optional[str], optional) – write output to a file; if none is given output is written to stdout, defaults to None

  • remove (Optional[List[str]], optional) – remove one or more nodes by transformation name, defaults to None

  • width (Optional[int], optional) – width of the digraph, defaults to None

  • height (Optional[int], optional) – height of the digraph, defaults to None

Returns

self

Raises
  • PegasusError – workflow must be written to a file prior to invoking graph with write or plan

  • ValueError – label must be one of label, xform, id, xform-id, label-xform, or label-id

plan(self, conf: Optional[str] = None, basename: Optional[str] = None, job_prefix: Optional[str] = None, cluster: Optional[List[str]] = None, sites: Optional[List[str]] = None, output_sites: List[str] = ['local'], staging_sites: Optional[Dict[str, str]] = None, cache: Optional[List[Union[str, Path]]] = None, input_dirs: Optional[List[str]] = None, output_dir: Optional[str] = None, dir: Optional[str] = None, relative_dir: Optional[str] = None, random_dir: Union[bool, str, Path] = False, relative_submit_dir: Optional[str] = None, inherited_rc_files: Optional[List[Union[str, Path]]] = None, cleanup: str = 'inplace', reuse: Optional[List[Union[str, Path]]] = None, verbose: int = 0, quiet: int = 0, force: bool = False, force_replan: bool = False, forward: Optional[List[str]] = None, submit: bool = False, json: bool = False, java_options: Optional[List[str]] = None, **kwargs)[source]

Plan the workflow.

try:
    wf.plan(verbose=3, submit=True)
except PegasusClientError as e:
    print(e.output)
Parameters
  • conf (Optional[str]) – the path to the properties file to use for planning, defaults to None

  • basename (Optional[str]) – the basename prefix while constructing the per workflow files like .dag etc., defaults to None

  • job_prefix (Optional[str]) – the prefix to be applied while constructing job submit filenames, defaults to None

  • cluster (Optional[List[str]]) – comma separated list of clustering techniques to be applied to the workflow to cluster jobs into larger jobs and avoid scheduling overheads, defaults to None

  • sites (Optional[List[str]]) – list of execution sites on which to map the workflow, defaults to None

  • output_sites (List[str]) – the output sites where the data products during workflow execution are transferred to, defaults to ["local"]

  • staging_sites (Optional[Dict[str,str]]) – key, value pairs of execution site to staging site mappings such as {"condorpool": "staging-site"}, defaults to None

  • cache (Optional[List[Union[str, Path]]]) – comma separated list of replica cache files, defaults to None

  • input_dirs (Optional[List[Union[str, Path]]]) – comma separated list of optional input directories where the input files reside on submit host, defaults to None

  • output_dir (Optional[Union[str, Path]]) – an optional output directory where the output files should be transferred to on submit host, defaults to None

  • dir (Optional[Union[str, Path]]) – the directory where to generate the executable workflow, defaults to None

  • relative_dir (Optional[str]) – the relative directory to the base directory where to generate the concrete workflow, defaults to None

  • random_dir (Union[bool, str, Path], optional) – if set to True, a random timestamp based name will be used for the execution directory that is created by the create dir jobs; else if a path is given as a str or pathlib.Path, then that will be used as the basename of the directory that is to be created, defaults to False

  • relative_submit_dir (Optional[str]) – the relative submit directory where to generate the concrete workflow. Overrides relative_dir, defaults to None

  • inherited_rc_files (Optional[List[Union[str, Path]]]) – comma separated list of replica files, defaults to None

  • cleanup (str, optional) – the cleanup strategy to use. Can be none|inplace|leaf|constraint, defaults to inplace

  • reuse (Optional[List[Union[str,Path]]]) – list of submit directories of previous runs from which to pick up for reuse (e.g. ["/workflows/submit_dir1", "/workflows/submit_dir2"]), defaults to None

  • verbose (int, optional) – verbosity, defaults to 0

  • quiet (int) – decreases the verbosity of messages about what is going on, defaults to 0

  • force (bool, optional) – skip reduction of the workflow, resulting in build style dag, defaults to False

  • force_replan (bool) – force replanning for sub workflows in case of failure, defaults to False

  • forward (Optional[List[str]]) – any options that need to be passed ahead to pegasus-run in format option[=value] (e.g. ["nogrid"]), defaults to None

  • submit (bool, optional) – submit the executable workflow generated, defaults to False

  • java_options (Optional[List[str]]) – pass to jvm a non standard option (e.g. ["mx1024m", "ms512m"]), defaults to None

Raises

PegasusClientError – pegasus-plan encountered an error

Returns

self

remove(self, verbose: int = 0)[source]

Removes this workflow that has been planned and submitted.

Parameters

verbose (int, optional) – verbosity, defaults to 0

Raises

PegasusClientError – pegasus-remove encountered an error

Returns

self

run(self, verbose: int = 0, json: bool = False, grid: bool = False)[source]

Run the planned workflow.

Parameters
  • verbose (int, optional) – verbosity, defaults to 0

  • grid (bool, optional) – enable checking for grids, defaults to False

Raises

PegasusClientError – pegasus-run encountered an error

Returns

self

property run_output

Get the json output from pegasus-run after it has been called.

try:
    wf.plan()
    wf.run()

    print(wf.run_output)
except PegasusClientError as e:
    print(e.output)
Raises

PegasusError – run must be called prior to accessing run_output

Returns

output of pegasus-run

Return type

dict

statistics(self, verbose: int = 0)[source]

Generate statistics about the workflow run.

Parameters

verbose (int, optional) – verbosity, defaults to 0

Raises

PegasusClientError – pegasus-statistics encountered an error

Returns

self

status(self, long: bool = False, verbose: int = 0)[source]

Monitor the workflow by querying Condor and directories.

Parameters
  • long (bool, optional) – show all DAG states, including sub-DAGs; by default only totals are shown, defaults to False

  • verbose (int, optional) – verbosity, defaults to 0

Raises

PegasusClientError – pegasus-status encountered an error

Returns

self
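
# Example (a sketch assuming a planned and submitted Workflow ``wf``,
# in the style of the examples above)
try:
    wf.status(long=True)
except PegasusClientError as e:
    print(e.output)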

wait(self, delay: int = 5)[source]

Displays a progress bar to stdout and blocks until the workflow either completes or fails.

Parameters

delay (int, optional) – refresh rate in seconds of the progress bar, defaults to 5

Raises

PegasusClientError – pegasus-status encountered an error

Returns

self
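
# Example (a sketch of the typical plan, submit, and wait sequence,
# assuming an already constructed Workflow ``wf``; the delay value is
# illustrative)
try:
    wf.plan(submit=True)
    wf.wait(delay=10)
except PegasusClientError as e:
    print(e.output)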

write(self, file: Optional[Union[str, TextIO]] = None, _format: str = 'yml')[source]

Write this workflow to a file. If no file is given, it will be written to workflow.yml

Parameters
  • file (Optional[Union[str, TextIO]]) – path or file object (opened in “w” mode) to write to, defaults to None

  • _format (str, optional) – serialized format of the workflow object (this should be left as its default)

Raises

PegasusError – SiteCatalog and TransformationCatalog must be written as separate files for hierarchical workflows.

Returns

self
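
# Example (a sketch assuming an already constructed Workflow ``wf``;
# the filename is illustrative)
wf.write(file="my-workflow.yml")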

1.1.1.9. Pegasus.api.writable module

class Writable[source]

Bases: object

Derived class can be serialized to a json or yaml file

write(file: Optional[Union[str, TextIO]] = None, _format: str = 'yml')[source]

Serialize this class as either yaml or json and write to the given file. If file is None, this class will be written to a default file. The following classes have these defaults:

Default Files

Class                    Default Filename
SiteCatalog              sites.yml
ReplicaCatalog           replicas.yml
TransformationCatalog    transformations.yml
Workflow                 workflow.yml

Parameters
  • file (Optional[Union[str, TextIO]]) – path or file object (opened in “w” mode) to write to, defaults to None

  • _format (str, optional) – can be either “yml”, “yaml” or “json”, defaults to “yml”

Raises
  • ValueError – _format must be one of “yml”, “yaml” or “json”

  • TypeError – file must be a str or file object
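
# Example (a sketch assuming an already constructed ReplicaCatalog
# ``rc``; the first call writes to the default file replicas.yml, the
# second serializes as json to an explicitly named file)
rc.write()
rc.write(file="replicas.json", _format="json")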

1.1.1.10. Module contents