1.1.1. Pegasus.api package

1.1.1.1. Submodules

1.1.1.2. Pegasus.api.errors module

exception DuplicateError[source]

Bases: PegasusError

exception NotFoundError[source]

Bases: PegasusError

exception PegasusError[source]

Bases: Exception
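
A minimal sketch of how these exceptions surface (illustrative; it relies on SiteCatalog.add_sites, described later in this reference, which raises DuplicateError when a site with the same name already exists):

# Example
sc = SiteCatalog()
sc.add_sites(Site("local"))
try:
    sc.add_sites(Site("local"))  # a site named "local" already exists
except DuplicateError:
    pass  # handle or report the duplicate entry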

1.1.1.3. Pegasus.api.mixins module

class EventType(value)[source]

Bases: Enum

Event type on which a hook will be triggered

ALL = 'all'
END = 'end'
ERROR = 'error'
NEVER = 'never'
START = 'start'
SUCCESS = 'success'
class HookMixin[source]

Bases: object

A derived class can have hooks assigned to it. This currently supports shell hooks. The supported hooks are triggered when some event, specified by EventType, takes place.

add_shell_hook(self, event_type: EventType, cmd: str)[source]

Add a shell hook. The given command will be executed by the shell when the specified EventType takes place.

# Example
wf.add_shell_hook(EventType.START, "echo 'hello'")
Parameters:
  • event_type (EventType) – an event type defined in EventType

  • cmd (str) – shell command

Raises:

ValueError – event_type must be one of EventType

Returns:

self

class MetadataMixin[source]

Bases: object

A derived class can have metadata assigned to it as key-value pairs.

add_metadata(self, *args: Dict[str, str | int | float | bool], **kwargs)[source]

Add metadata key value pairs to this object

# Example 1
job.add_metadata({"key1": "value1"})

# Example 2
job.add_metadata(key1="value1", key2="value2")
Parameters:

args (Dict[str, Union[str, int, float, bool]]) – dictionary of key-value pairs to add as metadata

Raises:

TypeError – each arg in args must be a dict

Returns:

self

class Namespace(value)[source]

Bases: Enum

Profile Namespace values recognized by Pegasus

CONDOR = 'condor'
DAGMAN = 'dagman'
ENV = 'env'
GLOBUS = 'globus'
PEGASUS = 'pegasus'
SELECTOR = 'selector'
STAT = 'stat'
class ProfileMixin[source]

Bases: object

add_condor_profile(*, universe: str = None, periodic_release: str = None, periodic_remove: str = None, filesystem_domain: str = None, stream_error: bool = None, stream_output: bool = None, priority: str = None, request_cpus: str = None, request_gpus: str = None, request_memory: str = None, request_disk: str = None, requirements: str = None, should_transfer_files: str = None, when_to_transfer_output: str = None, condor_collector: str = None, grid_resource: str = None, cream_attributes: str = None)[source]

Add Condor profile(s).

The Condor profiles allow you to add or overwrite instructions in the Condor submit file.
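
A minimal usage sketch (illustrative values; job here stands for any object that derives ProfileMixin, such as a Job):

# Example
job.add_condor_profile(
    universe="vanilla",
    request_cpus="4",
    request_memory="2 GB",
    priority="5"
)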

Parameters:
  • universe (str, optional) – Pegasus defaults to either globus or scheduler universes. Set to standard for compute jobs that require standard universe. Set to vanilla to run natively in a condor pool, or to run on resources grabbed via condor glidein, defaults to None

  • periodic_release (str, optional) – is the number of times job is released back to the queue if it goes to HOLD, e.g. due to Globus errors. Pegasus defaults to 3, defaults to None

  • periodic_remove (str, optional) – is the number of times a job is allowed to get into HOLD state before being removed from the queue. Pegasus defaults to 3, defaults to None

  • filesystem_domain (str, optional) – Useful for Condor glide-ins to pin a job to a remote site, defaults to None

  • stream_error (bool, optional) – boolean to turn on the streaming of the stderr of the remote job back to submit host, defaults to None

  • stream_output (bool, optional) – boolean to turn on the streaming of the stdout of the remote job back to submit host, defaults to None

  • priority (str, optional) – integer value to assign the priority of a job. Higher value means higher priority. The priorities are only applied for vanilla / standard / local universe jobs and determine the order in which a user’s own jobs are executed, defaults to None

  • request_cpus (str, optional) – Number of CPUs a job requires, defaults to None

  • request_gpus (str, optional) – Number of GPUs a job requires, defaults to None

  • request_memory (str, optional) – Amount of memory a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • request_disk (str, optional) – Amount of disk a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • requirements (str, optional) – a job requirements expression such as "(CUDACapability >= 1.2) && $(requirements:True)"

  • should_transfer_files (str, optional) – Used to define if HTCondor should transfer files to and from the remote machine where the job runs. Given as "YES", "NO", or "IF_NEEDED", defaults to None

  • when_to_transfer_output (str, optional) – Given as one of "ON_EXIT", "ON_EXIT_OR_EVICT", or "ON_SUCCESS", defaults to None

  • condor_collector (str, optional) – Specify the condor collector to use (e.g. "ccg-testing999.isi.edu"), defaults to None

  • grid_resource (str, optional) – Specify a grid resource such as "batch pbs", defaults to None

  • cream_attributes (str, optional) – Additional cream attributes (e.g. "key1=value1;key2=value2")

Returns:

self

add_condor_profiles(*, universe: str = None, periodic_release: str = None, periodic_remove: str = None, filesystem_domain: str = None, stream_error: bool = None, stream_output: bool = None, priority: str = None, request_cpus: str = None, request_gpus: str = None, request_memory: str = None, request_disk: str = None, requirements: str = None, should_transfer_files: str = None, when_to_transfer_output: str = None, condor_collector: str = None, grid_resource: str = None, cream_attributes: str = None)

Add Condor profile(s).

The Condor profiles allow you to add or overwrite instructions in the Condor submit file.

Parameters:
  • universe (str, optional) – Pegasus defaults to either globus or scheduler universes. Set to standard for compute jobs that require standard universe. Set to vanilla to run natively in a condor pool, or to run on resources grabbed via condor glidein, defaults to None

  • periodic_release (str, optional) – is the number of times job is released back to the queue if it goes to HOLD, e.g. due to Globus errors. Pegasus defaults to 3, defaults to None

  • periodic_remove (str, optional) – is the number of times a job is allowed to get into HOLD state before being removed from the queue. Pegasus defaults to 3, defaults to None

  • filesystem_domain (str, optional) – Useful for Condor glide-ins to pin a job to a remote site, defaults to None

  • stream_error (bool, optional) – boolean to turn on the streaming of the stderr of the remote job back to submit host, defaults to None

  • stream_output (bool, optional) – boolean to turn on the streaming of the stdout of the remote job back to submit host, defaults to None

  • priority (str, optional) – integer value to assign the priority of a job. Higher value means higher priority. The priorities are only applied for vanilla / standard / local universe jobs and determine the order in which a user’s own jobs are executed, defaults to None

  • request_cpus (str, optional) – Number of CPUs a job requires, defaults to None

  • request_gpus (str, optional) – Number of GPUs a job requires, defaults to None

  • request_memory (str, optional) – Amount of memory a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • request_disk (str, optional) – Amount of disk a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • requirements (str, optional) – a job requirements expression such as "(CUDACapability >= 1.2) && $(requirements:True)"

  • should_transfer_files (str, optional) – Used to define if HTCondor should transfer files to and from the remote machine where the job runs. Given as "YES", "NO", or "IF_NEEDED", defaults to None

  • when_to_transfer_output (str, optional) – Given as one of "ON_EXIT", "ON_EXIT_OR_EVICT", or "ON_SUCCESS", defaults to None

  • condor_collector (str, optional) – Specify the condor collector to use (e.g. "ccg-testing999.isi.edu"), defaults to None

  • grid_resource (str, optional) – Specify a grid resource such as "batch pbs", defaults to None

  • cream_attributes (str, optional) – Additional cream attributes (e.g. "key1=value1;key2=value2")

Returns:

self

add_dagman_profile(*, pre: str | Path = None, pre_arguments: str = None, post: str = None, post_arguments: str = None, retry: str = None, category: str = None, priority: str = None, abort_dag_on: str = None, max_pre: str = None, max_post: str = None, max_jobs: str = None, max_idle: str = None, post_scope: str = None)[source]

Add Dagman profile(s).

Note: to add the profile keys “post.path.[value of dagman.post]” or “[category-name].maxjobs”, use the following:

job.add_profiles(Namespace.DAGMAN, key="post.path.<value of dagman.post>", value=<value string>)
job.add_profiles(Namespace.DAGMAN, key="<category-name>.maxjobs", value=<value string>)
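
A minimal usage sketch (illustrative values; job here stands for any object that derives ProfileMixin):

# Example
job.add_dagman_profile(retry="3", max_jobs="100", priority="10")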
Parameters:
  • pre (Union[str, Path], optional) – is the path to the pre-script. DAGMan executes the pre-script before it runs the job, defaults to None

  • pre_arguments (str, optional) – are command-line arguments for the pre-script, if any, defaults to None

  • post (str, optional) – is the postscript type/mode that a user wants to associate with a job (see docs for more information), defaults to None

  • post_arguments (str, optional) – are the command line arguments for the post script, if any, defaults to None

  • retry (int, optional) – is the number of times DAGMan retries the full job cycle from pre-script through post-script, if failure was detected, defaults to None

  • category (str, optional) – the DAGMan category the job belongs to, defaults to None

  • priority (int, optional) – the priority to apply to a job. DAGMan uses this to select what jobs to release when MAXJOBS is enforced for the DAG, defaults to None

  • abort_dag_on (str, optional) – The ABORT-DAG-ON keyword provides a way to abort the entire DAG if a given node returns a specific exit code (AbortExitValue). The syntax for the value of the key is AbortExitValue [RETURN DAGReturnValue]. When a DAG aborts, by default it exits with the node return value that caused the abort. This can be changed by using the optional RETURN keyword along with specifying the desired DAGReturnValue, defaults to None

  • max_pre (str, optional) – sets the maximum number of PRE scripts within the DAG that may be running at one time, defaults to None

  • max_post (str, optional) – sets the maximum number of POST scripts within the DAG that may be running at one time, defaults to None

  • max_jobs (str, optional) – sets the maximum number of jobs within the DAG that will be submitted to Condor at one time, defaults to None

  • max_idle (str, optional) – Sets the maximum number of idle jobs allowed before HTCondor DAGMan stops submitting more jobs. Once idle jobs start to run, HTCondor DAGMan will resume submitting jobs. If the option is omitted, the number of idle jobs is unlimited, defaults to None

  • post_scope (str, optional) – can be “all”, “none” or “essential” (see docs for more information), defaults to None

Returns:

self

add_dagman_profiles(*, pre: str | Path = None, pre_arguments: str = None, post: str = None, post_arguments: str = None, retry: str = None, category: str = None, priority: str = None, abort_dag_on: str = None, max_pre: str = None, max_post: str = None, max_jobs: str = None, max_idle: str = None, post_scope: str = None)

Add Dagman profile(s).

Note: to add the profile keys “post.path.[value of dagman.post]” or “[category-name].maxjobs”, use the following:

job.add_profiles(Namespace.DAGMAN, key="post.path.<value of dagman.post>", value=<value string>)
job.add_profiles(Namespace.DAGMAN, key="<category-name>.maxjobs", value=<value string>)
Parameters:
  • pre (Union[str, Path], optional) – is the path to the pre-script. DAGMan executes the pre-script before it runs the job, defaults to None

  • pre_arguments (str, optional) – are command-line arguments for the pre-script, if any, defaults to None

  • post (str, optional) – is the postscript type/mode that a user wants to associate with a job (see docs for more information), defaults to None

  • post_arguments (str, optional) – are the command line arguments for the post script, if any, defaults to None

  • retry (int, optional) – is the number of times DAGMan retries the full job cycle from pre-script through post-script, if failure was detected, defaults to None

  • category (str, optional) – the DAGMan category the job belongs to, defaults to None

  • priority (int, optional) – the priority to apply to a job. DAGMan uses this to select what jobs to release when MAXJOBS is enforced for the DAG, defaults to None

  • abort_dag_on (str, optional) – The ABORT-DAG-ON keyword provides a way to abort the entire DAG if a given node returns a specific exit code (AbortExitValue). The syntax for the value of the key is AbortExitValue [RETURN DAGReturnValue]. When a DAG aborts, by default it exits with the node return value that caused the abort. This can be changed by using the optional RETURN keyword along with specifying the desired DAGReturnValue, defaults to None

  • max_pre (str, optional) – sets the maximum number of PRE scripts within the DAG that may be running at one time, defaults to None

  • max_post (str, optional) – sets the maximum number of POST scripts within the DAG that may be running at one time, defaults to None

  • max_jobs (str, optional) – sets the maximum number of jobs within the DAG that will be submitted to Condor at one time, defaults to None

  • max_idle (str, optional) – Sets the maximum number of idle jobs allowed before HTCondor DAGMan stops submitting more jobs. Once idle jobs start to run, HTCondor DAGMan will resume submitting jobs. If the option is omitted, the number of idle jobs is unlimited, defaults to None

  • post_scope (str, optional) – can be “all”, “none” or “essential” (see docs for more information), defaults to None

Returns:

self

add_env(key: str | None = None, value: str | int | float | bool | Path | None = None, **kw)

Add environment variable(s)
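
A minimal usage sketch (illustrative variable names; job here stands for any object that derives ProfileMixin):

# Example
job.add_env(JAVA_HOME="/usr/bin/java")
job.add_env(key="DYLD_LIBRARY_PATH", value="/usr/local/lib")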

add_globus_profile(*, count: int = None, job_type: str = None, max_cpu_time: int = None, max_memory: str = None, max_time: int = None, max_wall_time: int = None, min_memory: int = None, project: str = None, queue: str = None)[source]

Add Globus profile(s).

The globus profile namespace encapsulates Globus resource specification language (RSL) instructions. The RSL configures settings and behavior of the remote scheduling system.
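
A minimal usage sketch (illustrative values; job here stands for any object that derives ProfileMixin):

# Example
job.add_globus_profile(max_wall_time=120, queue="normal", project="my-allocation")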

Parameters:
  • count (int, optional) – the number of times an executable is started, defaults to None

  • job_type (str, optional) – specifies how the job manager should start the remote job. While Pegasus defaults to single, use mpi when running MPI jobs, defaults to None

  • max_cpu_time (int, optional) – the max CPU time in minutes for a single execution of a job, defaults to None

  • max_memory (str, optional) – the maximum memory in MB required for the job. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • max_time (int, optional) – the maximum time or walltime in minutes for a single execution of a job, defaults to None

  • max_wall_time (int, optional) – the maximum walltime in minutes for a single execution of a job, defaults to None

  • min_memory (str, optional) – the minimum amount of memory required for this job. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • project (str, optional) – associates an account with a job at the remote end, defaults to None

  • queue (str, optional) – the remote queue in which the job should be run. Used when remote scheduler is PBS that supports queues, defaults to None

Returns:

self

add_globus_profiles(*, count: int = None, job_type: str = None, max_cpu_time: int = None, max_memory: str = None, max_time: int = None, max_wall_time: int = None, min_memory: int = None, project: str = None, queue: str = None)

Add Globus profile(s).

The globus profile namespace encapsulates Globus resource specification language (RSL) instructions. The RSL configures settings and behavior of the remote scheduling system.

Parameters:
  • count (int, optional) – the number of times an executable is started, defaults to None

  • job_type (str, optional) – specifies how the job manager should start the remote job. While Pegasus defaults to single, use mpi when running MPI jobs, defaults to None

  • max_cpu_time (int, optional) – the max CPU time in minutes for a single execution of a job, defaults to None

  • max_memory (str, optional) – the maximum memory in MB required for the job. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • max_time (int, optional) – the maximum time or walltime in minutes for a single execution of a job, defaults to None

  • max_wall_time (int, optional) – the maximum walltime in minutes for a single execution of a job, defaults to None

  • min_memory (str, optional) – the minimum amount of memory required for this job. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • project (str, optional) – associates an account with a job at the remote end, defaults to None

  • queue (str, optional) – the remote queue in which the job should be run. Used when remote scheduler is PBS that supports queues, defaults to None

Returns:

self

add_pegasus_profile(*, clusters_num: int = None, clusters_size: int = None, job_aggregator: int = None, job_aggregator_arguments: str = None, grid_start: int = None, grid_start_path: str = None, grid_start_arguments: str = None, grid_start_launcher: str = None, grid_start_launcher_arguments: str = None, stagein_clusters: int = None, stagein_local_clusters: int = None, stagein_remote_clusters: int = None, stageout_clusters: int = None, stageout_local_clusters: int = None, stageout_remote_clusters: int = None, group: str = None, change_dir: bool = None, create_dir: bool = None, transfer_proxy: bool = None, style: str = None, pmc_request_memory: str = None, pmc_request_cpus: int = None, pmc_priority: int = None, pmc_task_arguments: str = None, exitcode_failure_msg: str = None, exitcode_success_msg: str = None, checkpoint_time: int = None, max_walltime: int = None, glite_arguments: str = None, auxillary_local: bool = None, condor_arguments_quote: bool = None, runtime: str = None, clusters_max_runtime: int = None, cores: int = None, gpus: int = None, nodes: int = None, ppn: int = None, memory: str = None, diskspace: str = None, data_configuration=None, queue: str = None, project: str = None, boto_config: str = None, container_arguments: str = None, label: str = None, pegasus_lite_env_source: str | Path = None, SSH_PRIVATE_KEY: str = None, relative_submit_dir: str | Path = None)[source]

Add Pegasus profile(s).
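
A minimal usage sketch (illustrative values; job here stands for any object that derives ProfileMixin):

# Example
job.add_pegasus_profile(cores=4, memory="2 GB", runtime="1800")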

Parameters:
  • clusters_num (int, optional) – Determines the total number of clusters per level, jobs are evenly spread across clusters (see Pegasus Clustering Guide for more information), defaults to None

  • clusters_size (int, optional) – Determines the number of jobs in each cluster (see Pegasus Clustering Guide for more information), defaults to None

  • job_aggregator (int, optional) – Indicates the clustering executable that is used to run the clustered job on the remote site, defaults to None

  • job_aggregator_arguments (str, optional) – Additional arguments with which a clustering executable should be invoked, defaults to None

  • grid_start (int, optional) – Determines the executable for launching a job (see docs for more information), defaults to None

  • grid_start_path (str, optional) – Sets the path to gridstart. This profile is best set in the Site Catalog, defaults to None

  • grid_start_arguments (str, optional) – Sets the arguments with which GridStart is used to launch a job on the remote site, defaults to None

  • grid_start_launcher (str, optional) – specifies the path to the executable to launch kickstart, defaults to None

  • grid_start_launcher_arguments (str, optional) – the arguments to the launcher executable if any

  • stagein_clusters (int, optional) – This key determines the maximum number of stage-in jobs that can be executed locally or remotely per compute site per workflow. This is used to configure the BalancedCluster Transfer Refiner, which is the Default Refiner used in Pegasus. This profile is best set in the Site Catalog or in the Properties file, defaults to None

  • stagein_local_clusters (int, optional) – This key provides finer grained control in determining the number of stage-in jobs that are executed locally and are responsible for staging data to a particular remote site. This profile is best set in the Site Catalog or in the Properties file, defaults to None

  • stagein_remote_clusters (int, optional) – This key provides finer grained control in determining the number of stage-in jobs that are executed remotely on the remote site and are responsible for staging data to it. This profile is best set in the Site Catalog or in the Properties file, defaults to None

  • stageout_clusters (int, optional) – This key determines the maximum number of stage-out jobs that can be executed locally or remotely per compute site per workflow. This is used to configure the BalancedCluster Transfer Refiner, which is the Default Refiner used in Pegasus, defaults to None

  • stageout_local_clusters (int, optional) – This key provides finer grained control in determining the number of stage-out jobs that are executed locally and are responsible for staging data from a particular remote site. This profile is best set in the Site Catalog or in the Properties file, defaults to None

  • stageout_remote_clusters (int, optional) – This key provides finer grained control in determining the number of stage-out jobs that are executed remotely on the remote site and are responsible for staging data from it. This profile is best set in the Site Catalog or in the Properties file, defaults to None

  • group (str, optional) – Tags a job with an arbitrary group identifier. The group site selector makes use of the tag, defaults to None

  • change_dir (bool, optional) – If true, tells kickstart to change into the remote working directory. Kickstart itself is executed in whichever directory the remote scheduling system chose for the job, defaults to None

  • create_dir (bool, optional) – If true, tells kickstart to create the remote working directory before changing into it. Kickstart itself is executed in whichever directory the remote scheduling system chose for the job, defaults to None

  • transfer_proxy (bool, optional) – If true, tells Pegasus to explicitly transfer the proxy for transfer jobs to the remote site. This is useful, when you want to use a full proxy at the remote end, instead of the limited proxy that is transferred by CondorG, defaults to None

  • style (str, optional) – Sets the condor submit file style. If set to globus, the generated submit file refers to CondorG job submissions. If set to condor, the generated submit file refers to direct Condor submission to the local Condor pool. It applies for glidein, where nodes from remote grid sites are glided into the local condor pool. The default style that is applied is globus, defaults to None

  • pmc_request_memory (str, optional) – This key is used to set the -m option for pegasus-mpi-cluster. It specifies the amount of memory in MB that a job requires. This profile is usually set in the DAX for each job. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • pmc_request_cpus (int, optional) – This key is used to set the -c option for pegasus-mpi-cluster. It specifies the number of CPUs that a job requires. This profile is usually set in the DAX for each job, defaults to None

  • pmc_priority (int, optional) – This key is used to set the -p option for pegasus-mpi-cluster. It specifies the priority for a job. This profile is usually set in the DAX for each job. Negative values are allowed for priorities, defaults to None

  • pmc_task_arguments (str, optional) – The key is used to pass any extra arguments to the PMC task during the planning time. They are added to the very end of the argument string constructed for the task in the PMC file. Hence, allows for overriding of any argument constructed by the planner for any particular task in the PMC job, defaults to None

  • exitcode_failure_msg (str, optional) – The message string that pegasus-exitcode searches for in the stdout and stderr of the job to flag failures, defaults to None

  • exitcode_success_msg (str, optional) – The message string that pegasus-exitcode searches for in the stdout and stderr of the job to determine whether a job logged its success message or not. Note this value is used to check whether a job failed or not, i.e. if this profile is specified and pegasus-exitcode DOES NOT find the string in the job stdout or stderr, the job is flagged as failed. The complete rules for determining failure are described in the man page for pegasus-exitcode, defaults to None

  • checkpoint_time (int, optional) – the expected time in minutes for a job after which it should be sent a TERM signal to generate a job checkpoint file, defaults to None

  • max_walltime (int, optional) – the maximum walltime in minutes for a single execution of a job, defaults to None

  • glite_arguments (str, optional) – specifies the extra arguments that must appear in the local PBS generated script for a job, when running workflows on a local cluster with submissions through Glite. This is useful when you want to pass through special options to underlying LRMS such as PBS e.g. you can set value -l walltime=01:23:45 -l nodes=2 to specify your job’s resource requirements, defaults to None

  • auxillary_local (bool, optional) – indicates whether auxiliary jobs associated with a compute site can be run on the local site. This CAN ONLY be specified as a profile in the site catalog and should be set when the compute site filesystem is accessible locally on the submit host, defaults to None

  • condor_arguments_quote (bool, optional) – indicates whether condor quoting rules should be applied for writing out the arguments key in the condor submit file. By default it is true unless the job is scheduled to a glite style site. The value is automatically set to false for glite style sites, as condor quoting is broken in batch_gahp, defaults to None

  • runtime (str, optional) – Specifies the expected runtime of a job in seconds, defaults to None

  • clusters_max_runtime (int, optional) – Specifies the maximum runtime of a job, defaults to None

  • cores (int, optional) – The total number of cores required for a job. This is also used for accounting purposes in the database while generating statistics. It corresponds to the multiplier_factor in the job_instance table, defaults to None

  • gpus (int, optional) – The total number of gpus required for a job, defaults to None

  • nodes (int, optional) – Indicates the number of nodes a job requires, defaults to None

  • ppn (int, optional) – Indicates the number of processors per node. This profile is best set in the Site Catalog and usually set when running workflows with MPI jobs, defaults to None

  • memory (str, optional) – Indicates the maximum memory a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • diskspace (str, optional) – Indicates the maximum diskspace a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • data_configuration (str, optional) – Indicates the data configuration setup. Can be one of “sharedfs”, “condorio”, or “nonsharedfs”, defaults to None

  • queue (str, optional) – This specifies the queue for the job. (e.g. "normal")

  • project (str, optional) – Causes the job time to be charged to or associated with a particular project/account. This is not used for SGE.

  • boto_config (str, optional) – Specifies which .boto file to use (e.g. "/home/myuser/.boto")

  • container_arguments (str, optional) – additional cli arguments that will be added to the docker container run or singularity exec command

  • label (str, optional) – associate a label to a job; used for label based clustering

  • pegasus_lite_env_source (Union[str, Path], optional) – specify a path on the submit host to indicate the file that needs to be sourced, defaults to None

  • SSH_PRIVATE_KEY (str, optional) – path to the ssh private key which will be used in this workflow (it is recommended that a special set of keys are created specifically for workflow use)

  • relative_submit_dir (Union[str, Path], optional) – specify the relative directory a job’s submit files are written to, defaults to None

add_pegasus_profiles(*, clusters_num: int = None, clusters_size: int = None, job_aggregator: int = None, job_aggregator_arguments: str = None, grid_start: int = None, grid_start_path: str = None, grid_start_arguments: str = None, grid_start_launcher: str = None, grid_start_launcher_arguments: str = None, stagein_clusters: int = None, stagein_local_clusters: int = None, stagein_remote_clusters: int = None, stageout_clusters: int = None, stageout_local_clusters: int = None, stageout_remote_clusters: int = None, group: str = None, change_dir: bool = None, create_dir: bool = None, transfer_proxy: bool = None, style: str = None, pmc_request_memory: str = None, pmc_request_cpus: int = None, pmc_priority: int = None, pmc_task_arguments: str = None, exitcode_failure_msg: str = None, exitcode_success_msg: str = None, checkpoint_time: int = None, max_walltime: int = None, glite_arguments: str = None, auxillary_local: bool = None, condor_arguments_quote: bool = None, runtime: str = None, clusters_max_runtime: int = None, cores: int = None, gpus: int = None, nodes: int = None, ppn: int = None, memory: str = None, diskspace: str = None, data_configuration=None, queue: str = None, project: str = None, boto_config: str = None, container_arguments: str = None, label: str = None, pegasus_lite_env_source: str | Path = None, SSH_PRIVATE_KEY: str = None, relative_submit_dir: str | Path = None)

Add Pegasus profile(s).

Parameters:
  • clusters_num (int, optional) – Determines the total number of clusters per level, jobs are evenly spread across clusters (see Pegasus Clustering Guide for more information), defaults to None

  • clusters_size (int, optional) – Determines the number of jobs in each cluster (see Pegasus Clustering Guide for more information), defaults to None

  • job_aggregator (int, optional) – Indicates the clustering executable that is used to run the clustered job on the remote site, defaults to None

  • job_aggregator_arguments (str, optional) – Additional arguments with which a clustering executable should be invoked, defaults to None

  • grid_start (int, optional) – Determines the executable for launching a job (see docs for more information), defaults to None

  • grid_start_path (str, optional) – Sets the path to gridstart. This profile is best set in the Site Catalog, defaults to None

  • grid_start_arguments (str, optional) – Sets the arguments with which GridStart is used to launch a job on the remote site, defaults to None

  • grid_start_launcher (str, optional) – specifies the path to the executable to launch kickstart, defaults to None

  • grid_start_launcher_arguments (str, optional) – the arguments to the launcher executable if any

  • stagein_clusters (int, optional) – This key determines the maximum number of stage-in jobs that can be executed locally or remotely per compute site per workflow. This is used to configure the BalancedCluster Transfer Refiner, which is the Default Refiner used in Pegasus. This profile is best set in the Site Catalog or in the Properties file, defaults to None

  • stagein_local_clusters (int, optional) – This key provides finer grained control in determining the number of stage-in jobs that are executed locally and are responsible for staging data to a particular remote site. This profile is best set in the Site Catalog or in the Properties file, defaults to None

  • stagein_remote_clusters (int, optional) – This key provides finer grained control in determining the number of stage-in jobs that are executed remotely on the remote site and are responsible for staging data to it. This profile is best set in the Site Catalog or in the Properties file, defaults to None

  • stageout_clusters (int, optional) – This key determines the maximum number of stage-out jobs that can be executed locally or remotely per compute site per workflow. This is used to configure the BalancedCluster Transfer Refiner, which is the Default Refiner used in Pegasus, defaults to None

  • stageout_local_clusters (int, optional) – This key provides finer grained control in determining the number of stage-out jobs that are executed locally and are responsible for staging data from a particular remote site. This profile is best set in the Site Catalog or in the Properties file, defaults to None

  • stageout_remote_clusters (int, optional) – This key provides finer grained control in determining the number of stage-out jobs that are executed remotely on the remote site and are responsible for staging data from it. This profile is best set in the Site Catalog or in the Properties file, defaults to None

  • group (str, optional) – Tags a job with an arbitrary group identifier. The group site selector makes use of the tag, defaults to None

  • change_dir (bool, optional) – If true, tells kickstart to change into the remote working directory. Kickstart itself is executed in whichever directory the remote scheduling system chose for the job, defaults to None

  • create_dir (bool, optional) – If true, tells kickstart to create the remote working directory before changing into it. Kickstart itself is executed in whichever directory the remote scheduling system chose for the job, defaults to None

  • transfer_proxy (bool, optional) – If true, tells Pegasus to explicitly transfer the proxy for transfer jobs to the remote site. This is useful, when you want to use a full proxy at the remote end, instead of the limited proxy that is transferred by CondorG, defaults to None

  • style (str, optional) – Sets the condor submit file style. If set to globus, the generated submit file refers to CondorG job submissions. If set to condor, the generated submit file refers to direct Condor submission to the local Condor pool. It applies for glidein, where nodes from remote grid sites are glided into the local condor pool. The default style that is applied is globus, defaults to None

  • pmc_request_memory (str, optional) – This key is used to set the -m option for pegasus-mpi-cluster. It specifies the amount of memory in MB that a job requires. This profile is usually set in the DAX for each job. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • pmc_request_cpus (int, optional) – This key is used to set the -c option for pegasus-mpi-cluster. It specifies the number of CPUs that a job requires. This profile is usually set in the DAX for each job, defaults to None

  • pmc_priority (int, optional) – This key is used to set the -p option for pegasus-mpi-cluster. It specifies the priority for a job. This profile is usually set in the DAX for each job. Negative values are allowed for priorities, defaults to None

  • pmc_task_arguments (str, optional) – The key is used to pass any extra arguments to the PMC task during the planning time. They are added to the very end of the argument string constructed for the task in the PMC file. Hence, allows for overriding of any argument constructed by the planner for any particular task in the PMC job, defaults to None

  • exitcode_failure_msg (str, optional) – The message string that pegasus-exitcode searches for in the stdout and stderr of the job to flag failures, defaults to None

  • exitcode_success_msg (str, optional) – The message string that pegasus-exitcode searches for in the stdout and stderr of the job to determine whether a job logged its success message or not. Note this value is used to check whether a job failed or not, i.e. if this profile is specified and pegasus-exitcode DOES NOT find the string in the job stdout or stderr, the job is flagged as failed. The complete rules for determining failure are described in the man page for pegasus-exitcode, defaults to None

  • checkpoint_time (int, optional) – the expected time in minutes for a job after which it should be sent a TERM signal to generate a job checkpoint file, defaults to None

  • max_walltime (int, optional) – the maximum walltime in minutes for a single execution of a job, defaults to None

  • glite_arguments (str, optional) – specifies the extra arguments that must appear in the local PBS generated script for a job, when running workflows on a local cluster with submissions through Glite. This is useful when you want to pass through special options to underlying LRMS such as PBS e.g. you can set value -l walltime=01:23:45 -l nodes=2 to specify your job’s resource requirements, defaults to None

  • auxillary_local (bool, optional) – indicates whether auxiliary jobs associated with a compute site can be run on the local site. This CAN ONLY be specified as a profile in the site catalog and should be set when the compute site filesystem is accessible locally on the submit host, defaults to None

  • condor_arguments_quote (bool, optional) – indicates whether condor quoting rules should be applied for writing out the arguments key in the condor submit file. By default it is true unless the job is scheduled to a glite style site. The value is automatically set to false for glite style sites, as condor quoting is broken in batch_gahp, defaults to None

  • runtime (str, optional) – Specifies the expected runtime of a job in seconds, defaults to None

  • clusters_max_runtime (int, optional) – Specifies the maximum runtime of a job, defaults to None

  • cores (int, optional) – The total number of cores required for a job. This is also used for accounting purposes in the database while generating statistics. It corresponds to the multiplier_factor in the job_instance table, defaults to None

  • gpus (int, optional) – The total number of gpus required for a job, defaults to None

  • nodes (int, optional) – Indicates the number of nodes a job requires, defaults to None

  • ppn (int, optional) – Indicates the number of processors per node. This profile is best set in the Site Catalog and usually set when running workflows with MPI jobs, defaults to None

  • memory (str, optional) – Indicates the maximum memory a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • diskspace (str, optional) – Indicates the maximum diskspace a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None

  • data_configuration (str, optional) – Indicates the data configuration setup. Can be one of “sharedfs”, “condorio”, or “nonsharedfs”, defaults to None

  • queue (str, optional) – This specifies the queue for the job. (e.g. "normal")

  • project (str, optional) – Causes the job time to be charged to or associated with a particular project/account. This is not used for SGE.

  • boto_config (str, optional) – Specifies which .boto file to use (e.g. "/home/myuser/.boto")

  • container_arguments (str, optional) – additional cli arguments that will be added to the docker container run or singularity exec command

  • label (str, optional) – associate a label to a job; used for label based clustering

  • pegasus_lite_env_source (Union[str, Path], optional) – specify a path on the submit host to indicate the file that needs to be sourced, defaults to None

  • SSH_PRIVATE_KEY (str, optional) – path to the ssh private key which will be used in this workflow (it is recommended that a special set of keys are created specifically for workflow use)

  • relative_submit_dir (Union[str, Path], optional) – specify the relative directory a job’s submit files are written to, defaults to None

add_profiles(self, ns: Namespace, key: str | None = None, value: str | int | float | bool | Path | None = None, **kw)[source]

Add a profile.

If key and value are given, then **kw is ignored and {Namespace::key : value} is added; otherwise **kw is added. When the profile key is not a valid Python variable name, use the form shown in Example 1; otherwise the form shown in Example 2 may be used.

# Example 1
job.add_profiles(Namespace.DAGMAN, key="pre.arguments", value="-i f1")

# Example 2
job.add_profiles(Namespace.ENV, JAVA_HOME="/usr/bin/java", USER="ryan")

For add_globus_profiles(), add_condor_profiles(), add_dagman_profiles(), add_selector_profiles(), and add_pegasus_profiles(), if a profile key you are trying to use is not listed as a keyword argument, use this function to add the profile.

Raises:

TypeError – namespace must be one of Namespace

Returns:

self

add_selector_profile(*, execution_site: str = None, pfn: str | Path = None, grid_job_type: str = None)[source]

Add Selector(s).

The Selector namespace allows users to override the behavior of the Workflow Mapper during site selection. This gives you finer-grained control over where a job executes and what executable it refers to.
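
A minimal usage sketch (illustrative values; the pfn shown is an assumption):

# Example
job.add_selector_profile(execution_site="condorpool", pfn="/usr/local/bin/my-exe")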

Parameters:
  • execution_site (str, optional) – the execution site where a job should be executed, defaults to None

  • pfn (Union[str, Path], optional) – the physical file name to the main executable that a job refers to. Overrides any entries specified in the transformation catalog, defaults to None

  • grid_job_type (str, optional) – This profile is usually used to ensure that a compute job executes on another job manager (see docs for more information), defaults to None

Returns:

self

add_selector_profiles(*, execution_site: str = None, pfn: str | Path = None, grid_job_type: str = None)

Add Selector(s).

The Selector namespace allows users to override the behavior of the Workflow Mapper during site selection. This gives you finer-grained control over where a job executes and what executable it refers to.

Parameters:
  • execution_site (str, optional) – the execution site where a job should be executed, defaults to None

  • pfn (Union[str, Path], optional) – the physical file name to the main executable that a job refers to. Overrides any entries specified in the transformation catalog, defaults to None

  • grid_job_type (str, optional) – This profile is usually used to ensure that a compute job executes on another job manager (see docs for more information), defaults to None

Returns:

self

to_kb(value: str) → int[source]

Convert the given value to KB

Parameters:

value (str) – str formatted as '<int> [MB | GB | TB | PB | EB | ZB | YB]'

Raises:

ValueError – invalid format

Returns:

value in KB

Return type:

int

to_mb(value: str) → int[source]

Convert the given value to MB
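
A minimal usage sketch (illustrative; whether the conversion uses factors of 1000 or 1024 is an implementation detail not documented here):

# Example
size_in_mb = to_mb("2 GB")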

Parameters:

value (str) – str formatted as '<int> [MB | GB | TB | PB | EB | ZB | YB]'

Raises:

ValueError – invalid format

Returns:

value in MB

Return type:

int

1.1.1.4. Pegasus.api.properties module

class Properties[source]

Bases: object

Write Pegasus properties to a file.

# Example
props = Properties()
props["globus.maxtime"] = 900
props["globus.maxwalltime"] = 1000
props["dagman.retry"] = 4

props.write()
add_site_profile(site, namespace, key, value)[source]

Maps a site profile to a property that can be picked up by the planner, overriding the corresponding entry loaded from the Site Catalog.
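
A minimal usage sketch (illustrative values; props is assumed to be a Properties instance, and the namespace is assumed to be accepted as its string form):

# Example
props.add_site_profile("condorpool", "env", "PEGASUS_HOME", "/usr/local/pegasus")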

Parameters:
  • site – the site

  • namespace – the namespace for which profile has to be added

  • key – the profile key

  • value – the profile value

Returns:

static ls(prop: str | None = None)[source]

List property keys. Refer to Configuration docs for additional information. If prop is given, all properties containing prop will be printed, else all properties will be printed.

# Example
>>> P.ls("request")
condor.request_cpus
condor.request_disk
condor.request_gpus
condor.request_memory
pegasus.pmc_request_cpus
pegasus.pmc_request_memory
Parameters:

prop (Optional[str]) – properties containing “prop” will be listed in alphabetical order, defaults to None

write(file: str | TextIO | None = None)[source]

Write these properties to a file. If file is not given, these properties are written to ./pegasus.properties

# Example 1
props.write()

# Example 2
with open("conf", "w") as f:
    props.write(f)
Parameters:

file (Optional[Union[str, TextIO]]) – file path or file object where properties will be written to, defaults to None

Raises:

TypeError – file must be of type str or file like object

1.1.1.5. Pegasus.api.replica_catalog module

class File(lfn: str, size: int | None = None, for_planning: bool | None = False)[source]

Bases: MetadataMixin

A workflow File. This class is used to represent the inputs and outputs of a Job.

# Example
input_file = File("data.txt").add_metadata(creator="ryan")
Parameters:
  • lfn (str) – a unique logical filename

  • size (int) – size in bytes, defaults to None

  • for_planning (bool) – indicate that a file is to be used for planning purposes

class ReplicaCatalog[source]

Bases: Writable

Maintains a mapping of logical filenames to physical filenames. Any input files to the workflow are specified here so that Pegasus knows where to obtain them.

# Example
if1 = File("if")
if2 = File("if2")

rc = ReplicaCatalog()\
    .add_replica("local", if1, "/nfs/u2/ryan/data.csv")\
    .add_replica("local", "if2", "/nfs/u2/ryan/data2.csv")\
    .write()
add_regex_replica(self, site: str, pattern: str, pfn: str | Path, metadata: Dict[str, int | str | float] | None = None)[source]

Add an entry to this replica catalog using a regular expression pattern. Note that regular expressions should follow Java regular expression syntax as the underlying code that handles this catalog is Java based.

# Example 1: Match f<any-character>a i.e. faa, f.a, f0a, etc.
rc.add_regex_replica("local", "f.a", "/Volumes/data/input/f.a")

# Example 2: Using groupings
rc.add_regex_replica("local", "alpha\.(csv|txt|xml)", "/Volumes/data/input/[1]/[0]")

# If the file being looked up is alpha.csv, the pfn for the file will be
# generated as /Volumes/data/input/csv/alpha.csv

# Example 3: Specifying a default location for all lfns that don't match any
# regular expressions. Note that this should be the last entry into the replica
# catalog if used.

rc.add_regex_replica("local", ".*", Path("/Volumes/data") / "input/[0]")
Parameters:
  • site (str) – the site at which this replica (file) resides

  • pattern (str) – regular expression used to match a file

  • pfn (Union[str, Path]) – path to the file (may also be a pattern as shown in the example above)

  • metadata (Optional[Dict[str, Union[int, str, float]]]) – any metadata to be associated with the matched files, for example: {"creator": "pegasus"}, defaults to None

Raises:

DuplicateError – Duplicate patterns with different PFNs are currently not supported

add_replica(self, site: str, lfn: str | File, pfn: str | Path, checksum: Dict[str, str] | None = None, metadata: Dict[str, int | str | float] | None = None)[source]

Add an entry to this replica catalog.

# Example 1
f = File("in.txt").add_metadata(creator="pegasus")
rc.add_replica("local", f, Path(".").resolve() / "in.txt")

# Example 2: Adding metadata and a checksum
rc.add_replica(
    "local",
    "in.txt",
    "/home/ryan/wf/in.txt",
    checksum={"sha256": "abc123"},
    metadata={"creator": "pegasus"}
)

# Example 3: Adding multiple pfns for the same lfn (metadata and checksum will be
# updated for that lfn if given)
rc.add_replica("local", "in.txt", Path(".").resolve() / "in.txt")
rc.add_replica("condorpool", "in.txt", "/path/to/file/in.txt")
Parameters:
  • site (str) – the site at which this replica (file) resides

  • lfn (Union[str, File]) – logical file name

  • pfn (Union[str, Path]) – physical file name such as Path("f.txt").resolve(), /home/ryan/file.txt, or http://pegasus.isi.edu/file.txt

  • checksum (Optional[Dict[str, str]], optional) – Dict containing checksums for this file. Currently only sha256 is supported. This should be entered as {"sha256": <value>}, defaults to None

  • metadata (Optional[Dict[str, Union[int, str, float]]], optional) – metadata key value pairs associated with this lfn such as {"created": "Thu Jun 18 22:18:36 PDT 2020", "owner": "pegasus"}, defaults to None

Raises:
  • ValueError – if pfn is given as a pathlib.Path, it must be an absolute path

  • ValueError – an unsupported checksum type was given

1.1.1.6. Pegasus.api.site_catalog module

class Arch(value)[source]

Bases: Enum

Architecture types

AARCH64 = 'aarch64'
AMD64 = 'amd64'
IA64 = 'ia64'
PPC = 'ppc'
PPC64LE = 'ppc64le'
PPC_64 = 'ppc_64'
SPARCV7 = 'sparcv7'
SPARCV9 = 'sparcv9'
X86 = 'x86'
X86_64 = 'x86_64'
class Directory(directory_type: _DirectoryType, path: str | Path, shared_file_system: bool = False)[source]

Bases: object

Information about filesystems Pegasus can use for storing temporary and long-term files.

Parameters:
  • directory_type (_DirectoryType) – directory type defined in DirectoryType (e.g. Directory.SHARED_SCRATCH or Directory.LOCAL_STORAGE)

  • path (Union[str, Path]) – directory path

  • shared_file_system (bool) – indicate whether the shared scratch space is accessible to the worker nodes via a shared filesystem, defaults to False

Raises:
  • ValueError – directory_type must be one of DirectoryType

  • ValueError – path must be given as an absolute path

LOCAL_SCRATCH = 'localScratch'

Describes the scratch file systems available locally on a compute node.

LOCAL_STORAGE = 'localStorage'

Describes a long term storage file system. This is the directory Pegasus will stage output files to.

SHARED_SCRATCH = 'sharedScratch'

Describes a scratch file system. Pegasus will use this to store intermediate data between jobs and other temporary files.

SHARED_STORAGE = 'sharedStorage'
add_file_servers(self, *file_servers: FileServer)[source]

Add one or more access methods to this directory
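
A minimal usage sketch (illustrative path):

# Example
shared_scratch = Directory(Directory.SHARED_SCRATCH, "/scratch")\
    .add_file_servers(FileServer("file:///scratch", Operation.ALL))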

Parameters:

file_server – a FileServer

Raises:

ValueError – file_server must be of type FileServer

Returns:

self

class FileServer(url: str, operation_type: Operation)[source]

Bases: ProfileMixin

Describes the file server used to access data from outside.

Parameters:
  • url (str) – url including protocol such as scp://obelix.isi.edu/data

  • operation_type (Operation) – operation type defined in Operation (e.g. Operation.ALL)

Raises:

ValueError – operation_type must be one defined in Operation

class Grid(grid_type: _GridType, contact: str, scheduler_type: Scheduler, job_type: SupportedJobs | None = None)[source]

Bases: object

Each site supports various (usually two) job managers
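
A minimal construction sketch (illustrative; the endpoint, scheduler, and job type shown are assumptions):

# Example
grid = Grid(
    Grid.BATCH,
    "workflow.isi.edu",
    Scheduler.SLURM,
    job_type=SupportedJobs.COMPUTE
)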

Parameters:
  • grid_type (_GridType) – a grid type defined in Grid (e.g. Grid.SLURM)

  • contact (str) – endpoint such as "workflow.isi.edu"

  • scheduler_type (Scheduler) – a scheduler type defined in Scheduler (e.g. Scheduler.SLURM)

  • job_type (Optional[SupportedJobs], optional) – a job type defined in SupportedJobs (e.g. SupportedJobs.COMPUTE), defaults to None

Raises:
  • ValueError – grid_type must be one defined in Grid (e.g. Grid.PBS)

  • ValueError – scheduler_type must be one defined in Scheduler (e.g. Scheduler.PBS)

  • ValueError – job_type must be one defined in SupportedJobs (e.g. SupportedJobs.COMPUTE)

BATCH = 'batch'
CONDOR = 'condor'
CREAM = 'cream'
DELTACLOUD = 'deltacloud'
EC2 = 'ec2'
GT5 = 'gt5'
NORDUGRID = 'nordugrid'
UNICORE = 'unicore'
class OS(value)[source]

Bases: Enum

Operating system types

AIX = 'aix'
LINUX = 'linux'
MACOSX = 'macosx'
SUNOS = 'sunos'
WINDOWS = 'windows'
class Operation(value)[source]

Bases: Enum

Different types of operations supported by a file server

ALL = 'all'
GET = 'get'
PUT = 'put'
class Scheduler(value)[source]

Bases: Enum

Different scheduler types on the Grid

CONDOR = 'condor'
FORK = 'fork'
LSF = 'lsf'
PBS = 'pbs'
SGE = 'sge'
SLURM = 'slurm'
UNKNOWN = 'unknown'
class Site(name: str, arch: Arch | None = None, os_type: OS | None = None, os_release: str | None = None, os_version: str | None = None)[source]

Bases: ProfileMixin

A compute resource (which is often a cluster) that we intend to run the workflow upon. A site is a homogeneous part of a cluster that has at least a single GRAM gatekeeper with a jobmanager-fork and jobmanager-<scheduler> interface and at least one gridftp server along with a shared file system. The GRAM gatekeeper can be either WS GRAM or Pre-WS GRAM. A site can also be a condor pool or glidein pool with a shared file system.

# Example
site = Site(LOCAL, arch=Arch.X86_64, os_type=OS.LINUX, os_release="rhel", os_version="7")\
    .add_directories(
        Directory(Directory.SHARED_SCRATCH, shared_scratch_dir)
            .add_file_servers(FileServer("file://" + shared_scratch_dir, Operation.ALL)),

        Directory(Directory.LOCAL_STORAGE, local_storage_dir)
            .add_file_servers(FileServer("file://" + local_storage_dir, Operation.ALL))
    )
Parameters:
  • name (str) – name of the site

  • arch (Optional[Arch]) – the site’s architecture (e.g. Arch.X86_64), defaults to None

  • os_type (Optional[OS], optional) – the site’s operating system (e.g. OS.LINUX), defaults to None

  • os_release (Optional[str]) – the release of the site’s operating system, defaults to None

  • os_version (Optional[str]) – the version of the site’s operating system, defaults to None

Raises:
  • ValueError – arch must be one of Arch

  • ValueError – os_type must be one of OS

add_directories(self, *directories: Directory)[source]

Add one or more Directory to this Site

Parameters:

directory – the Directory to be added

Raises:

TypeError – directory must be of type Directory

Returns:

self

add_grids(self, *grids: Grid)[source]

Add one or more Grid to this Site

Parameters:

grid – the Grid to be added

Raises:

TypeError – grid must be of type Grid

Returns:

self

class SiteCatalog[source]

Bases: Writable

The SiteCatalog describes the compute resources, or Sites, that we intend to run the workflow upon.

sc = SiteCatalog()

WORK_DIR = Path(".").resolve()

shared_scratch_dir = WORK_DIR / RUN_ID
local_storage_dir = WORK_DIR / "outputs" / RUN_ID

local = Site("local")\
                .add_directories(
                    Directory(Directory.SHARED_SCRATCH, shared_scratch_dir)
                        .add_file_servers(FileServer("file://" + shared_scratch_dir, Operation.ALL)),

                    Directory(Directory.LOCAL_STORAGE, local_storage_dir)
                        .add_file_servers(FileServer("file://" + local_storage_dir, Operation.ALL))
                )

condorpool = Site("condorpool")\
                .add_pegasus_profile(style="condor")\
                .add_pegasus_profile(auxillary_local="true")\
                .add_condor_profile(universe="vanilla")

sc.add_sites(local, condorpool)

sc.write()
add_sites(self, *sites: Site)[source]

Add one or more sites to this catalog

Parameters:

site – the site to be added

Raises:
  • TypeError – site must be of type Site

  • DuplicateError – a site with the same name already exists in this catalog

Returns:

self

class SupportedJobs(value)[source]

Bases: Enum

Types of jobs in the executable workflow this grid supports

AUXILLARY = 'auxillary'
CLEANUP = 'cleanup'
COMPUTE = 'compute'
REGISTER = 'register'
TRANSFER = 'transfer'

1.1.1.7. Pegasus.api.transformation_catalog module

class Container(name: str, container_type: _ContainerType, image: str, arguments: str | None = None, mounts: List[str] | None = None, image_site: str | None = None, checksum: Dict[str, str] | None = None, metadata: Dict[str, int | str | float] | None = None, bypass_staging: bool = False)[source]

Bases: ProfileMixin

Describes a container that can be added to the TransformationCatalog. Note that the checksum parameter refers to the checksum of the tar file of the image and not the specific version of an image (digest/content-addressable identifier in the case of Docker).

# Example 1: Docker
centos_pegasus = Container(
                    "centos-pegasus",
                    Container.DOCKER,
                    "docker:///ryan/centos-pegasus:latest",
                    arguments="--shm-size",
                    mounts=["/Volumes/Work/lfs1:/shared-data/:ro"]
                )

# Example 2: Singularity
fb_nlp = Container(
            "fb-nlp",
            Container.SINGULARITY,
            image="library://papajim/default/fb-nlp",
            mounts=["/data:/mnt:ro"]
        )
Parameters:
  • name (str) – name of this container

  • container_type (_ContainerType) – a container type defined in Container

  • image (str) – image, such as docker:///rynge/montage:latest

  • arguments (Optional[str]) – additional cli arguments to be added to the docker container run or singularity exec commands when starting this container

  • mounts (Optional[List[str]]) – list of mount strings such as ['/Volumes/Work/lfs1:/shared-data/:ro', ...]

  • image_site (Optional[str]) – optional site attribute to tell Pegasus on which site the tar file of this image exists, defaults to None

  • checksum (Optional[Dict[str, str]]) – Dict containing checksums for the tar file of this image. Currently only sha256 is supported. This should be entered as {"sha256": <value>}, defaults to None

  • metadata (Optional[Dict[str, Union[int, str, float]]]) – Dict containing metadata key, value pairs associated with this container, defaults to None

  • bypass_staging (bool, optional) – whether or not to bypass the stage in job for this container, defaults to False

Raises:

ValueError – container_type must be one of _ContainerType (Container.DOCKER | Container.SINGULARITY | Container.SHIFTER)

DOCKER = 'docker'
SHIFTER = 'shifter'
SINGULARITY = 'singularity'
class Transformation(name: str, namespace: str | None = None, version: str | None = None, site: str | None = None, pfn: str | Path | None = None, is_stageable: bool = False, bypass_staging: bool = False, arch: Arch | None = None, os_type: OS | None = None, os_release: str | None = None, os_version: str | None = None, container: Container | str | None = None, checksum: Dict[str, str] | None = None)[source]

Bases: ProfileMixin, HookMixin, MetadataMixin

A transformation, which can be a standalone executable, or one that requires other executables.

When a transformation resides on a single site, the syntax in Example 1 can be used, where the args site and pfn are provided to the constructor. If site and pfn are specified, then the args is_stageable, bypass_staging, arch, os_type, os_release, os_version, and container are applied to that site, else they are ignored. When a transformation resides on multiple sites, the syntax in Example 2 can be used, where multiple TransformationSite objects are added. Note that when specifying a checksum such as {"sha256": <value>}, this only applies to stageable executables.

# Example 1: transformation that resides on a single site
preprocess = Transformation(
        "preprocess",
        namespace="pegasus",
        version="4.0",
        site="condorpool",
        pfn="/usr/bin/pegasus-keg",
        is_stageable=False,
        bypass_staging=False,
        arch=Arch.X86_64,
        os_type=OS.LINUX,
        container=centos_pegasus
    )

# Example 2: transformation that resides on multiple sites
preprocess = Transformation("preprocess", namespace="pegasus", version="4.0")\
                .add_sites(
                    TransformationSite(
                        "condorpool",
                        "/usr/bin/pegasus-keg",
                        is_stageable=False,
                        arch=Arch.X86_64,
                        os_type=OS.LINUX,
                        container="centos-pegasus"
                    ),
                    ...
                )
Parameters:
  • name (str) – the logical name of the transformation

  • namespace (Optional[str]) – the namespace that this transformation belongs to, defaults to None

  • version (Optional[str]) – the version of this transformation (e.g. "1.1"), defaults to None

  • site (Optional[str]) – a Site specified in the SiteCatalog on which this transformation resides, defaults to None

  • pfn (Optional[Union[str, Path]]) – the physical filename of this transformation (e.g. "/usr/bin/tar"), defaults to None

  • is_stageable – whether or not this transformation is stageable or installed, defaults to False

  • bypass_staging (bool, optional) – whether or not to bypass the stage in job of this executable (Note that this only works for transformations where is_stageable=True), defaults to False

  • arch (Optional[Arch]) – architecture that this transformation was compiled for (defined in Arch , e.g Arch.X86_64), defaults to None

  • os_type (Optional[OS]) – name of os that this transformation was compiled for (defined in OS , e.g. OS.LINUX), defaults to None

  • os_release (Optional[str]) – release of os that this transformation was compiled for, defaults to None

  • os_version (Optional[str]) – version of os that this transformation was compiled for, defaults to None

  • container (Optional[Union[Container, str]]) – a Container or name of the container to be used for this transformation, defaults to None

  • checksum (Optional[Dict[str, str]]) – Dict containing checksums for this file. Currently only sha256 is supported. This should be entered as {"sha256": <value>}, defaults to None

Raises:
  • TypeError – container must be of type Container or str

  • TypeError – arch must be one of Arch

  • TypeError – os_type must be one of OS

  • ValueError – fields: namespace, name, and version must not contain any : (colon) characters

  • ValueError – pfn must be given as an absolute path when pathlib.Path is used

  • ValueError – bypass_staging=True can only be used when is_stageable=True

add_requirement(self, required_transformation: str | Transformation, namespace: str = None, version: str = None)[source]

Add a requirement to this transformation. Specify the other transformation, identified by name, namespace, and version, that this transformation depends upon. If a Transformation is passed in for required_transformation, then namespace and version are ignored.

Parameters:

required_transformation (str or Transformation) – Transformation that this transformation requires

Raises:
  • DuplicateError – this requirement already exists

  • ValueError – required_transformation must be of type Transformation or str

  • ValueError – namespace, required transformation name, and version cannot contain any : (colon) characters

  • TypeError – required_transformation must be one of type str or Transformation

Returns:

self
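
The sketch below shows both calling styles. It assumes the preprocess transformation from Example 1 above; the fit transformation, its pfn, and the "findrange" requirement named here are illustrative.

# Example (sketch)
fit = Transformation(
        "fit",
        site="condorpool",
        pfn="/usr/bin/fit",
        is_stageable=False
    )

# pass a Transformation object...
fit.add_requirement(preprocess)

# ...or identify the required transformation by name, namespace, and version
fit.add_requirement("findrange", namespace="pegasus", version="4.0")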

add_sites(self, *transformation_sites: TransformationSite)[source]

Add one or more TransformationSites to this transformation.

Parameters:

transformation_sites – the TransformationSite (s) to be added

Raises:
Returns:

self

class TransformationCatalog[source]

Bases: Writable

Maintains a list of Transformations and Containers

# Example
centos_pegasus = Container(
        "centos-pegasus",
        Container.DOCKER,
        "docker:///ryan/centos-pegasus:latest",
        mounts=["/Volumes/Work/lfs1:/shared-data/:ro"]
    )

preprocess = Transformation(
                "preprocess",
                site="condorpool",
                pfn="/usr/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX,
                container=centos_pegasus
            )

findrange = Transformation(
                "findrange",
                site="condorpool",
                pfn="/usr/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX,
                container=centos_pegasus
            )

analyze = Transformation(
                "analyze",
                site="condorpool",
                pfn="/usr/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX,
                container=centos_pegasus
            )

tc = TransformationCatalog()\
    .add_containers(centos_pegasus)\
    .add_transformations(preprocess, findrange, analyze)\
    .write()
add_containers(self, *containers: Container)[source]

Add one or more Containers to this catalog

# Example
tc.add_containers(
    Container(
        "centos-pegasus",
        Container.DOCKER,
        "docker:///ryan/centos-pegasus:latest",
        mounts=["/Volumes/Work/lfs1:/shared-data/:ro"]
    )
)
Parameters:

containers – the Container object(s) to be added

Raises:
  • TypeError – argument(s) must be of type Container

  • DuplicateError – a container with the same name already exists in this catalog

Returns:

self

add_transformations(self, *transformations: Transformation)[source]

Add one or more Transformations to this catalog

# Example
tc.add_transformations(
    Transformation(
            "analyze",
            site="condorpool",
            pfn="/usr/bin/pegasus-keg",
            is_stageable=False,
            arch=Arch.X86_64,
            os_type=OS.LINUX,
            container=centos_pegasus
        )
)
Parameters:

transformations – the Transformation (s) to be added

Raises:
Returns:

self

class TransformationSite(name: str, pfn: str | Path, is_stageable: bool = False, bypass_staging: bool = False, arch: Arch | None = None, os_type: OS | None = None, os_release: str | None = None, os_version: str | None = None, container: Container | str | None = None)[source]

Bases: ProfileMixin, MetadataMixin

Site specific information about a Transformation. A Transformation must contain at least one transformation site.

Parameters:
  • name (str) – name of the site at which this Transformation resides

  • pfn (Union[str, Path]) – physical file name, an absolute path given as a str or Path

  • is_stageable – whether or not this transformation is stageable or installed, defaults to False

  • bypass_staging (bool, optional) – whether or not to bypass the stage in job of this executable (Note that this only works for transformations where is_stageable=True), defaults to False

  • arch (Optional[Arch], optional) – architecture that this Transformation was compiled for (defined in Arch), defaults to None

  • os_type (Optional[OS], optional) – name of os that this Transformation was compiled for (defined in OS), defaults to None

  • os_release (Optional[str], optional) – release of os that this Transformation was compiled for, defaults to None

  • os_version (Optional[str], optional) – version of os that this Transformation was compiled for, defaults to None

  • container (Optional[Union[Container, str]], optional) – specify the name of the container or Container object to use, optional

Raises:
  • TypeError – arch must be one of Arch

  • TypeError – os_type must be one of OS

  • ValueError – pfn must be given as an absolute path when pathlib.Path is used

  • ValueError – bypass_staging=True can only be used when is_stageable=True

1.1.1.8. Pegasus.api.workflow module

class AbstractJob(_id: str | None = None, node_label: str | None = None)[source]

Bases: HookMixin, ProfileMixin, MetadataMixin

An abstract representation of a workflow job

Parameters:
  • _id (Optional[str]) – a unique id, if None is given then one will be assigned when this job is added to a Workflow, defaults to None

  • node_label (Optional[str]) – a short descriptive label that can be assigned to this job, defaults to None

Note: avoid using IDs such as '0000008' or '00000009' as these may end up being unquoted by PyYaml, and consequently misinterpreted as integer values when read in by other tools.

add_args(self, *args: File | int | float | str)[source]

Add arguments to this job. Each argument will be separated by a space. Each argument must be either a File, scalar, or str.

Parameters:

args (Union[File, int, float, str]) – arguments to pass to this job (each arg in args will be separated by a space)

Returns:

self
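
A minimal sketch showing that plain strings, numbers, and File objects may be mixed; the transformation name, flags, and file name are illustrative:

# Example (sketch)
input_file = File("data.csv")
job = Job("process")\
        .add_args("--threads", 4, "--input", input_file, "--verbose")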

add_checkpoint(self, checkpoint_file: File | str, stage_out: bool = True, register_replica: bool = True)[source]

Add an output File of this job as a checkpoint file. If checkpoint_file is given as a str, a File object is created for you internally with the given value as its lfn.

Parameters:
  • checkpoint_file (Union[File, str]) – the File to be added as a checkpoint file to this job

  • stage_out (bool, optional) – whether or not to send files back to an output directory, defaults to True

  • register_replica (bool, optional) – whether or not to register replica with a ReplicaCatalog, defaults to True

Raises:
  • DuplicateError – all output files must be unique

  • TypeError – job inputs must be of type File or str

Returns:

self
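
A minimal sketch, assuming the executable behind this job periodically writes its state to a file named saved_state (a hypothetical name):

# Example (sketch)
job.add_checkpoint(File("saved_state"), stage_out=True, register_replica=False)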

add_inputs(self, *input_files: File | str, bypass: bool = False)[source]

Add one or more File objects as input to this job. If input_file is given as a str, a File object is created for you internally with the given value as its lfn.

Parameters:
  • input_files (Union[File, str]) – the File objects to be added as inputs to this job

  • bypass_staging (bool, optional) – whether or not to bypass the staging site when this file is fetched by the job, defaults to False

Raises:
  • DuplicateError – all input files must be unique

  • TypeError – job inputs must be of type File or str

Returns:

self

add_outputs(self, *output_files: File | str, stage_out: bool = True, register_replica: bool = True)[source]

Add one or more File objects as outputs to this job. stage_out and register_replica will be applied to all files given. If output_file is given as a str, a File object is created for you internally with the given value as its lfn.

Parameters:
  • output_files (Union[File, str]) – the File objects to be added as outputs to this job

  • stage_out (bool, optional) – whether or not to send files back to an output directory, defaults to True

  • register_replica (bool, optional) – whether or not to register replica with a ReplicaCatalog, defaults to True

Raises:
  • DuplicateError – all output files must be unique

  • TypeError – job outputs must be of type File or str

Returns:

self

get_inputs()[source]

Get this job’s input File objects

Returns:

all input files associated with this job

Return type:

set

get_outputs()[source]

Get this job’s output File objects

Returns:

all output files associated with this job

Return type:

set

get_stderr()[source]

Get the File being used for stderr

Returns:

the stderr file

Return type:

File

get_stdin()[source]

Get the File being used for stdin

Returns:

the stdin file

Return type:

File

get_stdout()[source]

Get the File being used for stdout

Returns:

the stdout file

Return type:

File

set_stderr(self, file: str | File, stage_out: bool = True, register_replica: bool = True)[source]

Set stderr to a File . If file is given as a str, a File object is created for you internally with the given value as its lfn.

Parameters:
  • file (Union[str, File]) – a file that stderr will be written to

  • stage_out (bool, optional) – whether or not to send files back to an output directory, defaults to True

  • register_replica (bool, optional) – whether or not to register replica with a ReplicaCatalog, defaults to True

Raises:
  • TypeError – file must be of type File or str

  • DuplicateError – stderr is already set or the given file has already been added as an output to this job

Returns:

self

set_stdin(self, file: str | File)[source]

Set stdin to a File . If file is given as a str, a File object is created for you internally with the given value as its lfn.

Parameters:

file (Union[str, File]) – a file that will be read into stdin

Raises:
  • TypeError – file must be of type File or str

  • DuplicateError – stdin is already set or the given file has already been added as an input to this job

Returns:

self

set_stdout(self, file: str | File, stage_out: bool = True, register_replica: bool = True)[source]

Set stdout to a File . If file is given as a str, a File object is created for you internally with the given value as its lfn.

Parameters:
  • file (Union[str, File]) – a file that stdout will be written to

  • stage_out (bool, optional) – whether or not to send files back to an output directory, defaults to True

  • register_replica (bool, optional) – whether or not to register replica with a ReplicaCatalog, defaults to True

Raises:
  • TypeError – file must be of type File or str

  • DuplicateError – stdout is already set or the given file has already been added as an output to this job

Returns:

self
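
Since set_stdin, set_stdout, and set_stderr each return self, the three streams can be configured in one chain; the file names below are illustrative:

# Example (sketch)
job.set_stdin("query.txt")\
    .set_stdout("results.txt", stage_out=True, register_replica=False)\
    .set_stderr("job.err")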

class Job(transformation: str | Transformation, _id: str | None = None, node_label: str | None = None, namespace: str | None = None, version: str | None = None)[source]

Bases: AbstractJob

A typical workflow Job that executes a Transformation. See AbstractJob for full list of available functions.

# Example
if1 = File("if1")
if2 = File("if2")

of1 = File("of1")
of2 = File("of2")

# Assuming a transformation named "analyze.py" has been added to your
# transformation catalog:
job = Job("analyze.py")\
        .add_args("-i", if1, if2, "-o", of1, of2)\
        .add_inputs(if1, if2)\
        .add_outputs(of1, of2, stage_out=True, register_replica=False)
Parameters:
  • transformation (Union[str, Transformation]) – Transformation object or name of the transformation that this job uses

  • _id (Optional[str]) – a unique id; if none is given then one will be assigned when the job is added by a Workflow, defaults to None

  • node_label (Optional[str]) – a short descriptive label that can be assigned to this job, defaults to None

  • namespace (Optional[str]) – namespace to which the Transformation belongs, defaults to None

  • version (Optional[str]) – version of the given Transformation, defaults to None

Raises:

TypeError – transformation must be one of type Transformation or str

class SubWorkflow(file: str | File | Workflow, is_planned: bool = False, _id: str | None = None, node_label: str | None = None)[source]

Bases: AbstractJob

Job that represents a subworkflow. See AbstractJob for full list of available functions. SubWorkflow jobs can be created using several different methods. These are outlined below.

root_wf = Workflow("root")
# "workflow.yml" must be added to the ReplicaCatalog
j1 = SubWorkflow(file="workflow.yml", is_planned=False)
root_wf.add_jobs(j1)

another_wf = Workflow("another-wf")
j2 = SubWorkflow(another_wf, _id="j2")
root_wf.add_jobs(j2)

# Upon invoking root_wf.write() or root_wf.plan(), another_wf will automatically
# be serialized to CWD / "another-wf_j2.yml" and added to an inline ReplicaCatalog;
# This means that you may define "another_wf" in a separate python script, and
# import it here where it would be used.
root_wf.write()
Parameters:
  • file (Union[str, File, Workflow]) – File, the name of the workflow file as a str, or Workflow

  • is_planned (bool) – whether or not this subworkflow has already been planned by the Pegasus planner, defaults to False

  • _id (Optional[str]) – a unique id; if none is given then one will be assigned when the job is added by a Workflow, defaults to None

  • node_label (Optional[str]) – a short descriptive label that can be assigned to this job, defaults to None

Raises:

TypeError – file must be of type File or str

add_planner_args(self, conf: str | Path | None = None, basename: str | None = None, job_prefix: str | None = None, cluster: List[str] | None = None, sites: List[str] | None = None, output_sites: List[str] | None = None, staging_sites: Dict[str, str] | None = None, cache: List[str | Path] | None = None, input_dirs: List[str] | None = None, output_dir: str | None = None, dir: str | None = None, relative_dir: str | Path | None = None, random_dir: bool | str | Path = False, relative_submit_dir: str | Path | None = None, inherited_rc_files: List[str | Path] | None = None, cleanup: str | None = None, reuse: List[str | Path] | None = None, verbose: int = 0, quiet: int = 0, force: bool = False, force_replan: bool = False, forward: List[str] | None = None, submit: bool = False, json: bool = False, java_options: List[str] | None = None, **properties: Dict[str, str])[source]

Add pegasus-planner arguments. This function can only be used when is_planned=False is set in SubWorkflow and may only be invoked once.

Parameters:
  • conf (Optional[Union[str, Path]]) – the path to the properties file to use for planning, defaults to None

  • basename (Optional[str]) – the basename prefix while constructing the per workflow files like .dag etc., defaults to None

  • job_prefix (Optional[str]) – the prefix to be applied while constructing job submit filenames, defaults to None

  • cluster (Optional[List[str]]) – comma separated list of clustering techniques to be applied to the workflow to cluster jobs into larger jobs and avoid scheduling overheads, defaults to None

  • sites (Optional[List[str]]) – list of execution sites on which to map the workflow, defaults to None

  • output_sites (Optional[List[str]]) – the output sites where the data products during workflow execution are transferred to, defaults to None

  • staging_sites (Optional[Dict[str,str]]) – key, value pairs of execution site to staging site mappings such as {"condorpool": "staging-site"}, defaults to None

  • cache (Optional[List[Union[str, Path]]]) – comma separated list of replica cache files, defaults to None

  • input_dirs (Optional[List[Union[str, Path]]]) – comma separated list of optional input directories where the input files reside on submit host, defaults to None

  • output_dir (Optional[Union[str, Path]]) – an optional output directory where the output files should be transferred to on submit host, defaults to None

  • transformations_dir (Optional[Union[str, Path]]) – an optional directory containing executables used by the workflow, from where to construct transformation catalog entries

  • dir (Optional[Union[str, Path]]) – the directory where to generate the executable workflow, defaults to None

  • relative_dir (Optional[Union[str, Path]]) – the relative directory to the base directory where to generate the concrete workflow, defaults to None

  • random_dir (Union[bool, str, Path], optional) – if set to True, a random timestamp based name will be used for the execution directory that is created by the create dir jobs; else if a path is given as a str or pathlib.Path, then that will be used as the basename of the directory that is to be created, defaults to False

  • relative_submit_dir (Optional[Union[str, Path]]) – the relative submit directory where to generate the concrete workflow. Overrides relative_dir, defaults to None

  • inherited_rc_files (Optional[List[Union[str, Path]]]) – comma separated list of replica files, defaults to None

  • cleanup (Optional[str]) – the cleanup strategy to use. Can be a str none|inplace|leaf|constraint, defaults to None (internally, pegasus-plan will use the default inplace if None is given)

  • reuse (Optional[List[Union[str,Path]]]) – list of submit directories of previous runs from which to pick up for reuse (e.g. ["/workflows/submit_dir1", "/workflows/submit_dir2"]), defaults to None

  • verbose (int, optional) – verbosity, defaults to 0

  • quiet (int) – decreases the verbosity of messages about what is going on, defaults to 0

  • force (bool, optional) – skip reduction of the workflow, resulting in build style dag, defaults to False

  • force_replan (bool) – force replanning for sub workflows in case of failure, defaults to False

  • forward (Optional[List[str]]) – any options that need to be passed ahead to pegasus-run in format option[=value] (e.g. ["nogrid"]), defaults to None

  • submit (bool, optional) – submit the executable workflow generated, defaults to False

  • java_options (Optional[List[str]]) – pass to jvm a non standard option (e.g. ["mx1024m", "ms512m"]), defaults to None

  • **properties – configuration properties (e.g. **{"pegasus.mode": "development"}, which would be passed to pegasus-plan as -Dpegasus.mode=development). Note that configuration properties set here take precedence over the properties file property with the same key.

Raises:
  • TypeError – an invalid type was given for one or more arguments

  • PegasusError – SubWorkflow.add_planner_args() can only be called by SubWorkflows that have not yet been planned (i.e. SubWorkflow(‘wf_file’, is_planned=False))

  • PegasusError – SubWorkflow.add_planner_args() can only be invoked once

Returns:

self
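
A minimal sketch of configuring planner arguments on an unplanned SubWorkflow, using only options documented above; the workflow file name and the property value are illustrative:

# Example (sketch)
j = SubWorkflow("inner-workflow.yml", is_planned=False)\
        .add_planner_args(
            sites=["condorpool"],
            output_sites=["local"],
            cleanup="leaf",
            verbose=3,
            **{"pegasus.mode": "development"}
        )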

class Workflow(name: str, infer_dependencies: bool = True)[source]

Bases: Writable, HookMixin, ProfileMixin, MetadataMixin

Represents a multi-step computation as a directed acyclic graph of jobs.

# Example
import logging

from pathlib import Path

from Pegasus.api import *

logging.basicConfig(level=logging.DEBUG)

# --- Replicas -----------------------------------------------------------------
with open("f.a", "w") as f:
    f.write("This is sample input to KEG")

fa = File("f.a").add_metadata(creator="ryan")
rc = ReplicaCatalog().add_replica("local", fa, Path(".").resolve() / "f.a")

# --- Transformations ----------------------------------------------------------
preprocess = Transformation(
                "preprocess",
                site="condorpool",
                pfn="/usr/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX
            )

findrange = Transformation(
                "findrange",
                site="condorpool",
                pfn="/usr/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX
            )

analyze = Transformation(
                "analyze",
                site="condorpool",
                pfn="/usr/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX
            )

tc = TransformationCatalog().add_transformations(preprocess, findrange, analyze)

# --- Workflow -----------------------------------------------------------------
'''
                    [f.b1] - (findrange) - [f.c1]
                    /                             \
[f.a] - (preprocess)                               (analyze) - [f.d]
                    \                             /
                    [f.b2] - (findrange) - [f.c2]

'''
wf = Workflow("blackdiamond")

fb1 = File("f.b1")
fb2 = File("f.b2")
job_preprocess = Job(preprocess)\
                    .add_args("-a", "preprocess", "-T", "3", "-i", fa, "-o", fb1, fb2)\
                    .add_inputs(fa)\
                    .add_outputs(fb1, fb2)

fc1 = File("f.c1")
job_findrange_1 = Job(findrange)\
                    .add_args("-a", "findrange", "-T", "3", "-i", fb1, "-o", fc1)\
                    .add_inputs(fb1)\
                    .add_outputs(fc1)

fc2 = File("f.c2")
job_findrange_2 = Job(findrange)\
                    .add_args("-a", "findrange", "-T", "3", "-i", fb2, "-o", fc2)\
                    .add_inputs(fb2)\
                    .add_outputs(fc2)

fd = File("f.d")
job_analyze = Job(analyze)\
                .add_args("-a", "analyze", "-T", "3", "-i", fc1, fc2, "-o", fd)\
                .add_inputs(fc1, fc2)\
                .add_outputs(fd)

wf.add_jobs(job_preprocess, job_findrange_1, job_findrange_2, job_analyze)
wf.add_replica_catalog(rc)
wf.add_transformation_catalog(tc)

try:
    wf.plan(submit=True)\
        .wait()\
        .analyze()\
        .statistics()
except PegasusClientError as e:
    print(e.output)
Parameters:
  • name (str) – name of the Workflow

  • infer_dependencies (bool, optional) – whether or not to automatically compute job dependencies based on input and output files used by each job, defaults to True

Raises:

ValueError – workflow name may not contain any / or spaces

add_dependency(self, job: Job | SubWorkflow, *, parents: List[Job | SubWorkflow] = [], children: List[Job | SubWorkflow] = [])[source]

Add parent, child dependencies for a given job.

Parameters:
  • job (AbstractJob) – the job to which parents and children will be assigned

  • parents (list, optional) – jobs to be added as parents to this job, defaults to []

  • children (list, optional) – jobs to be added as children of this job, defaults to []

Raises:
  • ValueError – the given job(s) do not have ids assigned to them

  • DuplicateError – a dependency between two jobs already has been added

Returns:

self
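
For example, an explicit ordering can be declared between jobs from the Workflow example above; this is mainly useful when infer_dependencies=False or when two jobs share no files. The jobs must already have been added to the workflow so that ids are assigned:

# Example (sketch)
wf.add_dependency(job_findrange_1, parents=[job_preprocess], children=[job_analyze])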

add_jobs(self, *jobs: Job | SubWorkflow)[source]

Add one or more jobs at a time to the Workflow

Raises:

DuplicateError – a job with the same id already exists in this workflow

Returns:

self

add_replica_catalog(self, rc: ReplicaCatalog)[source]

Add a ReplicaCatalog to this workflow. The contents of the replica catalog will be inlined into the same file as this workflow when it is written.

Parameters:

rc (ReplicaCatalog) – the ReplicaCatalog to be added

Raises:
Returns:

self

add_site_catalog(self, sc: SiteCatalog)[source]

Add a SiteCatalog to this workflow. The contents of the site catalog will be inlined into the same file as this workflow when it is written.

Parameters:

sc (SiteCatalog) – the SiteCatalog to be added

Raises:
Returns:

self

add_transformation_catalog(self, tc: TransformationCatalog)[source]

Add a TransformationCatalog to this workflow. The contents of the transformation catalog will be inlined into the same file as this workflow when it is written.

Parameters:

tc (TransformationCatalog) – the TransformationCatalog to be added

Raises:
Returns:

self

analyze(self, verbose: int = 0)[source]

Debug a workflow.

Parameters:

verbose (int, optional) – verbosity, defaults to 0

Raises:

PegasusClientError – pegasus-analyzer encountered an error

Returns:

self

property braindump

Once this workflow has been planned using plan, the braindump file can be accessed for information such as user, submit_dir, and root_wf_uuid. For a full list of available attributes, see Braindump.

try:
    wf.plan(submit=True)

    print(wf.braindump.user)
    print(wf.braindump.submit_hostname)
    print(wf.braindump.submit_dir)
    print(wf.braindump.root_wf_uuid)
    print(wf.braindump.wf_uuid)
except PegasusClientError as e:
    print(e.output)
Getter:

returns a Braindump object corresponding to the most recent call of plan

Return type:

Pegasus.braindump.Braindump

Raises:

PegasusError – plan must be called before accessing the braindump file

get_job(_id: str)[source]

Retrieve the job with the given id

Parameters:

_id (str) – id of the job to be retrieved from the Workflow

Raises:

NotFoundError – a job with the given id does not exist in this workflow

Returns:

the job with the given id

Return type:

Job
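
A minimal sketch, assigning an explicit id at construction time so the job can be looked up later:

# Example (sketch)
j = Job("preprocess", _id="ID0000001")
wf.add_jobs(j)

retrieved = wf.get_job("ID0000001")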

get_status(self)[source]

Returns current status information of the workflow as a dict in the following format:

{
    "totals": {
        "unready": <int>,
        "ready": <int>,
        "pre": <int>,
        "queued": <int>,
        "post": <int>,
        "succeeded": <int>,
        "failed": <int>,
        "percent_done": <float>,
        "total": <int>
    },
    "dags": {
        "root": {
            "unready": <int>,
            "ready": <int>,
            "pre": <int>,
            "queued": <int>,
            "post": <int>,
            "succeeded": <int>,
            "failed": <int>,
            "percent_done": <float>,
            "state": <"Running" | "Success" | "Failure">,
            "dagname": <str>
        }
    }
}
Keys are defined as follows
  • unready: Jobs blocked by dependencies

  • ready: Jobs ready for submission

  • pre: PRE-Scripts running

  • queued: Submitted jobs

  • post: POST-Scripts running

  • succeeded: Jobs that completed successfully

  • failed: Jobs that completed with failure

  • percent_done: Success percentage

  • state: Workflow state

  • dagname: Name of workflow

Returns:

current status information

Return type:

Union[dict, None]
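
A sketch of reading the documented totals after the workflow has been planned and submitted; note that None may be returned if no status information is available yet:

# Example (sketch)
status = wf.get_status()
if status is not None:
    totals = status["totals"]
    print("{}% done, {} failed".format(totals["percent_done"], totals["failed"]))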

graph(self, include_files: bool = True, no_simplify: bool = True, label: Literal['label', 'xform', 'id', 'xform-id', 'label-xform', 'label-id'] = 'label', output: str | None = None, remove: List[str] | None = None, width: int | None = None, height: int | None = None)[source]

Convert workflow into a graphviz dot format

Parameters:
  • include_files (bool, optional) – include files as nodes, defaults to True

  • no_simplify (bool, optional) – when set to False a transitive reduction is performed to remove extra edges, defaults to True

  • label (Literal["label", "xform", "id", "xform-id", "label-xform", "label-id"], optional) – what attribute to use for labels, defaults to “label”

  • output (Optional[str], optional) – Write output to a file. If none is given dot output is written to stdout. If output is a file name with any of the following extensions: “jpg”, “jpeg”, “pdf”, “gif”, or “svg”, dot -T<ext> -o <output> will be invoked to draw the graph. If any other extension is given, the dot representation of the graph will be output. defaults to None

  • remove (Optional[List[str]], optional) – remove one or more nodes by transformation name, defaults to None

  • width (Optional[int], optional) – width of the digraph, defaults to None

  • height (Optional[int], optional) – height of the digraph, defaults to None

Returns:

self

Raises:
  • PegasusError – workflow must be written to a file prior to invoking graph with write or plan

  • ValueError – label must be one of label, xform, id, xform-id, label-xform, or label-id
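
A minimal sketch; the workflow must be written (or planned) first, and the output file name is illustrative:

# Example (sketch)
wf.write()
wf.graph(include_files=True, label="xform", output="wf-graph.pdf")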

plan(self, conf: str | Path | None = None, basename: str | None = None, job_prefix: str | None = None, cluster: List[str] | None = None, sites: List[str] | None = None, output_sites: List[str] = ['local'], staging_sites: Dict[str, str] | None = None, cache: List[str | Path] | None = None, input_dirs: List[str] | None = None, output_dir: str | None = None, dir: str | None = None, relative_dir: str | Path | None = None, random_dir: bool | str | Path = False, relative_submit_dir: str | Path | None = None, inherited_rc_files: List[str | Path] | None = None, cleanup: str = 'inplace', reuse: List[str | Path] | None = None, verbose: int = 0, quiet: int = 0, force: bool = False, force_replan: bool = False, forward: List[str] | None = None, submit: bool = False, json: bool = False, java_options: List[str] | None = None, **properties: Dict[str, str])[source]

Plan the workflow.

try:
    # configuration properties
    properties = {"pegasus.mode": "development}

    wf.plan(verbose=3, submit=True, **properties)
except PegasusClientError as e:
    print(e.output)
Parameters:
  • conf (Optional[Union[str, Path]]) – the path to the properties file to use for planning, defaults to None

  • basename (Optional[str]) – the basename prefix while constructing the per workflow files like .dag etc., defaults to None

  • job_prefix (Optional[str]) – the prefix to be applied while constructing job submit filenames, defaults to None

  • cluster (Optional[List[str]]) – comma separated list of clustering techniques to be applied to the workflow to cluster jobs into larger jobs and avoid scheduling overheads, defaults to None

  • sites (Optional[List[str]]) – list of execution sites on which to map the workflow, defaults to None

  • output_sites (List[str]) – the output sites where the data products during workflow execution are transferred to, defaults to ["local"]

  • staging_sites (Optional[Dict[str,str]]) – key, value pairs of execution site to staging site mappings such as {"condorpool": "staging-site"}, defaults to None

  • cache (Optional[List[Union[str, Path]]]) – comma separated list of replica cache files, defaults to None

  • input_dirs (Optional[List[Union[str, Path]]]) – comma separated list of optional input directories where the input files reside on submit host, defaults to None

  • output_dir (Optional[Union[str, Path]]) – an optional output directory where the output files should be transferred to on submit host, defaults to None

  • transformations_dir (Optional[Union[str, Path]]) – an optional directory containing executables used by the workflow, from where to construct transformation catalog entries

  • dir (Optional[Union[str, Path]]) – the directory where to generate the executable workflow, defaults to None

  • relative_dir (Optional[Union[str, Path]]) – the relative directory to the base directory where to generate the concrete workflow, defaults to None

  • random_dir (Union[bool, str, Path], optional) – if set to True, a random timestamp based name will be used for the execution directory that is created by the create dir jobs; else if a path is given as a str or pathlib.Path, then that will be used as the basename of the directory that is to be created, defaults to False

  • relative_submit_dir (Optional[Union[str, Path]]) – the relative submit directory where to generate the concrete workflow. Overrides relative_dir, defaults to None

  • inherited_rc_files (Optional[List[Union[str, Path]]]) – comma separated list of replica files, defaults to None

  • cleanup (str, optional) – the cleanup strategy to use. Can be none|inplace|leaf|constraint, defaults to inplace

  • reuse (Optional[List[Union[str,Path]]]) – list of submit directories of previous runs from which to pick up for reuse (e.g. ["/workflows/submit_dir1", "/workflows/submit_dir2"]), defaults to None

  • verbose (int, optional) – verbosity, defaults to 0

  • quiet (int) – decreases the verbosity of messages about what is going on, defaults to 0

  • force (bool, optional) – skip reduction of the workflow, resulting in build style dag, defaults to False

  • force_replan (bool) – force replanning for sub workflows in case of failure, defaults to False

  • forward (Optional[List[str]]) – any options that need to be passed ahead to pegasus-run in format option[=value] (e.g. ["nogrid"]), defaults to None

  • submit (bool, optional) – submit the executable workflow generated, defaults to False

  • java_options (Optional[List[str]]) – pass to jvm a non standard option (e.g. ["mx1024m", "ms512m"]), defaults to None

  • **properties – configuration properties (e.g. **{"pegasus.mode": "development"}, which would be passed to pegasus-plan as -Dpegasus.mode=development). Note that configuration properties set here take precedence over the properties file property with the same key.

Raises:

PegasusClientError – pegasus-plan encountered an error

Returns:

self

remove(self, verbose: int = 0)[source]

Remove this workflow, which must already have been planned and submitted.

Parameters:

verbose (int, optional) – verbosity, defaults to 0

Raises:

PegasusClientError – pegasus-remove encountered an error

Returns:

self

run(self, verbose: int = 0, json: bool = False, grid: bool = False)[source]

Run the planned workflow.

Parameters:
  • verbose (int, optional) – verbosity, defaults to 0

  • grid (bool, optional) – enable checking for grids, defaults to False

Raises:

PegasusClientError – pegasus-run encountered an error

Returns:

self

property run_output

Get the json output from pegasus-run after it has been called.

try:
    wf.plan()
    wf.run()

    print(wf.run_output)
except PegasusClientError as e:
    print(e.output)
Raises:

PegasusError – run must be called prior to accessing run_output

Returns:

output of pegasus-run

Return type:

dict

statistics(self, verbose: int = 0)[source]

Generate statistics about the workflow run.

Parameters:

verbose (int, optional) – verbosity, defaults to 0

Raises:

PegasusClientError – pegasus-statistics encountered an error

Returns:

self

status(self, long: bool = False, verbose: int = 0)[source]

Monitor the workflow by querying Condor and directories.

Parameters:
  • long (bool, optional) – show all DAG states, including sub-DAGs; by default only totals are shown. Defaults to False

  • verbose (int, optional) – verbosity, defaults to 0

Raises:

PegasusClientError – pegasus-status encountered an error

Returns:

self

wait(self, delay: int = 5)[source]

Displays a progress bar on stdout and blocks until the workflow either completes or fails.

Parameters:

delay (int, optional) – refresh rate in seconds of the progress bar, defaults to 5

Raises:

PegasusClientError – pegasus-status encountered an error

Returns:

self

write(self, file: str | TextIO | None = None, _format: str = 'yml')[source]

Write this workflow to a file. If no file is given, it will be written to workflow.yml

Parameters:
  • file (Optional[Union[str, TextIO]]) – path or file object (opened in “w” mode) to write to, defaults to None

  • _format (str, optional) – serialized format of the workflow object (this should be left as its default)

Returns:

self

1.1.1.9. Pegasus.api.writable module

class Writable[source]

Bases: object

Derived class can be serialized to a json or yaml file

property path: Path

Retrieve the path to which this object has been written.

Raises:

PegasusError – object has not yet been written to a file

Returns:

resolved path to which this object has been written

Return type:

Path

write(file: str | TextIO | None = None, _format: str = 'yml')[source]

Serialize this class as either yaml or json and write to the given file. If file==None, this class will be written to a default file. The following classes have these defaults:

Default Files

Class                      Default Filename
SiteCatalog                sites.yml
ReplicaCatalog             replicas.yml
TransformationCatalog      transformations.yml
Workflow                   workflow.yml

Parameters:
  • file (Optional[Union[str, TextIO]]) – path or file object (opened in “w” mode) to write to, defaults to None

  • _format (str, optional) – can be either “yml”, “yaml” or “json”, defaults to “yml”

Raises:
  • ValueError – _format must be one of “yml”, “yaml” or “json”

  • TypeError – file must be a str or file object
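
For example, a TransformationCatalog written without arguments goes to transformations.yml in the current directory, after which path reports where it was written; the json file name below is illustrative:

# Example (sketch)
tc = TransformationCatalog()
tc.write()                      # written to ./transformations.yml
print(tc.path)                  # resolved path of the file just written

tc.write("transformations.json", _format="json")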

1.1.1.10. Module contents