1.1.1. Pegasus.api package
1.1.1.1. Submodules
1.1.1.2. Pegasus.api.errors module
- exception DuplicateError[source]
Bases:
PegasusError
- exception NotFoundError[source]
Bases:
PegasusError
1.1.1.3. Pegasus.api.mixins module
- class EventType(value)[source]
Bases:
Enum
Event type on which a hook will be triggered
- ALL = 'all'
- END = 'end'
- ERROR = 'error'
- NEVER = 'never'
- START = 'start'
- SUCCESS = 'success'
- class HookMixin[source]
Bases:
object
Derived class can have hooks assigned to it. This currently supports shell hooks. The supported hooks are triggered when some event, specified by EventType, takes place.
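For illustration, a minimal sketch of attaching a shell hook to a job, assuming the mixin's shell-hook method is named add_shell_hook (the job name and command are hypothetical):
from Pegasus.api import EventType, Job

# any class deriving from HookMixin (e.g. Job, Workflow) can be used the same way
job = Job("preprocess")

# run a notification command once the job reaches the END event
job.add_shell_hook(EventType.END, "/bin/echo 'job finished'")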
- class MetadataMixin[source]
Bases:
object
Derived class can have metadata assigned to it as key value pairs.
- add_metadata(self, *args: Dict[str, str | int | float | bool], **kwargs)[source]
Add metadata key value pairs to this object
# Example 1
job.add_metadata({"key1": "value1"})

# Example 2
job.add_metadata(key1="value1", key2="value2")
- Parameters:
args (Dict[str, Union[str, int, float, bool]]) – dictionary of key value pairs to add as metadata
- Raises:
TypeError – each arg in args must be a dict
- Returns:
self
- class Namespace(value)[source]
Bases:
Enum
Profile Namespace values recognized by Pegasus
- CONDOR = 'condor'
- DAGMAN = 'dagman'
- ENV = 'env'
- GLOBUS = 'globus'
- PEGASUS = 'pegasus'
- SELECTOR = 'selector'
- STAT = 'stat'
- class ProfileMixin[source]
Bases:
object
- add_condor_profile(*, universe: str = None, periodic_release: str = None, periodic_remove: str = None, filesystem_domain: str = None, stream_error: bool = None, stream_output: bool = None, priority: str = None, request_cpus: str = None, request_gpus: str = None, request_memory: str = None, request_disk: str = None, requirements: str = None, should_transfer_files: str = None, when_to_transfer_output: str = None, condor_collector: str = None, grid_resource: str = None, cream_attributes: str = None)[source]
Add Condor profile(s).
Condor profiles allow you to add or overwrite instructions in the Condor submit file.
- Parameters:
universe (str, optional) – Pegasus defaults to either globus or scheduler universes. Set to standard for compute jobs that require standard universe. Set to vanilla to run natively in a condor pool, or to run on resources grabbed via condor glidein, defaults to None
periodic_release (str, optional) – is the number of times a job is released back to the queue if it goes to HOLD, e.g. due to Globus errors. Pegasus defaults to 3, defaults to None
periodic_remove (str, optional) – is the number of times a job is allowed to get into HOLD state before being removed from the queue. Pegasus defaults to 3, defaults to None
filesystem_domain (str, optional) – Useful for Condor glide-ins to pin a job to a remote site, defaults to None
stream_error (bool, optional) – boolean to turn on the streaming of the stderr of the remote job back to submit host, defaults to None
stream_output (bool, optional) – boolean to turn on the streaming of the stdout of the remote job back to submit host, defaults to None
priority (str, optional) – integer value to assign the priority of a job. Higher value means higher priority. The priorities are only applied for vanilla/standard/local universe jobs. Determines the order in which a user's own jobs are executed, defaults to None
request_cpus (str, optional) – Number of CPUs a job requires, defaults to None
request_gpus (str, optional) – Number of GPUs a job requires, defaults to None
request_memory (str, optional) – Amount of memory a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None
request_disk (str, optional) – Amount of disk a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None
requirements (str, optional) – a job requirements expression such as "(CUDACapability >= 1.2) && $(requirements:True)"
should_transfer_files (str, optional) – Used to define whether HTCondor should transfer files to and from the remote machine where the job runs. Given as "YES", "NO", or "IF_NEEDED"
when_to_transfer_output (str, optional) – Given as one of "ON_EXIT", "ON_EXIT_OR_EVICT", or "ON_SUCCESS"
condor_collector (str, optional) – Specify the condor collector to use (e.g. "ccg-testing999.isi.edu")
grid_resource (str, optional) – Specify a grid resource such as "batch pbs"
cream_attributes (str, optional) – Additional cream attributes (e.g. "key1=value1;key2=value2")
- Returns:
self
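A short illustrative sketch of setting a few of these Condor keys on a job (the job name and values are hypothetical):
from Pegasus.api import Job

job = Job("preprocess")

# request resources and stream stdout/stderr back to the submit host
job.add_condor_profile(
    universe="vanilla",
    request_cpus="4",
    request_memory="2 GB",
    stream_output=True,
    stream_error=True,
)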
- add_condor_profiles(*, universe: str = None, periodic_release: str = None, periodic_remove: str = None, filesystem_domain: str = None, stream_error: bool = None, stream_output: bool = None, priority: str = None, request_cpus: str = None, request_gpus: str = None, request_memory: str = None, request_disk: str = None, requirements: str = None, should_transfer_files: str = None, when_to_transfer_output: str = None, condor_collector: str = None, grid_resource: str = None, cream_attributes: str = None)
Add Condor profile(s).
Condor profiles allow you to add or overwrite instructions in the Condor submit file.
- Parameters:
universe (str, optional) – Pegasus defaults to either globus or scheduler universes. Set to standard for compute jobs that require standard universe. Set to vanilla to run natively in a condor pool, or to run on resources grabbed via condor glidein, defaults to None
periodic_release (str, optional) – is the number of times a job is released back to the queue if it goes to HOLD, e.g. due to Globus errors. Pegasus defaults to 3, defaults to None
periodic_remove (str, optional) – is the number of times a job is allowed to get into HOLD state before being removed from the queue. Pegasus defaults to 3, defaults to None
filesystem_domain (str, optional) – Useful for Condor glide-ins to pin a job to a remote site, defaults to None
stream_error (bool, optional) – boolean to turn on the streaming of the stderr of the remote job back to submit host, defaults to None
stream_output (bool, optional) – boolean to turn on the streaming of the stdout of the remote job back to submit host, defaults to None
priority (str, optional) – integer value to assign the priority of a job. Higher value means higher priority. The priorities are only applied for vanilla/standard/local universe jobs. Determines the order in which a user's own jobs are executed, defaults to None
request_cpus (str, optional) – Number of CPUs a job requires, defaults to None
request_gpus (str, optional) – Number of GPUs a job requires, defaults to None
request_memory (str, optional) – Amount of memory a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None
request_disk (str, optional) – Amount of disk a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None
requirements (str, optional) – a job requirements expression such as "(CUDACapability >= 1.2) && $(requirements:True)"
should_transfer_files (str, optional) – Used to define whether HTCondor should transfer files to and from the remote machine where the job runs. Given as "YES", "NO", or "IF_NEEDED"
when_to_transfer_output (str, optional) – Given as one of "ON_EXIT", "ON_EXIT_OR_EVICT", or "ON_SUCCESS"
condor_collector (str, optional) – Specify the condor collector to use (e.g. "ccg-testing999.isi.edu")
grid_resource (str, optional) – Specify a grid resource such as "batch pbs"
cream_attributes (str, optional) – Additional cream attributes (e.g. "key1=value1;key2=value2")
- Returns:
self
- add_dagman_profile(*, pre: str | Path = None, pre_arguments: str = None, post: str = None, post_arguments: str = None, retry: str = None, category: str = None, priority: str = None, abort_dag_on: str = None, max_pre: str = None, max_post: str = None, max_jobs: str = None, max_idle: str = None, post_scope: str = None)[source]
Add Dagman profile(s).
Note, to add the profile keys “post.path.[value of dagman.post]” or “[category-name].maxjobs”, use the following:
job.add_profiles(Namespace.DAGMAN, key="post.path.<value of dagman.post>", value=<value string>)
job.add_profiles(Namespace.DAGMAN, key="<category-name>.maxjobs", value=<value string>)
- Parameters:
pre (Union[str, Path], optional) – is the path to the pre-script. DAGMan executes the pre-script before it runs the job, defaults to None
pre_arguments (str, optional) – are command-line arguments for the pre-script, if any, defaults to None
post (str, optional) – is the postscript type/mode that a user wants to associate with a job (see docs for more information), defaults to None
post_arguments (str, optional) – are the command line arguments for the post script, if any, defaults to None
retry (int, optional) – is the number of times DAGMan retries the full job cycle from pre-script through post-script, if failure was detected, defaults to None
category (str, optional) – the DAGMan category the job belongs to, defaults to None
priority (int, optional) – the priority to apply to a job. DAGMan uses this to select what jobs to release when MAXJOBS is enforced for the DAG, defaults to None
abort_dag_on (str, optional) – The ABORT-DAG-ON key word provides a way to abort the entire DAG if a given node returns a specific exit code (AbortExitValue). The syntax for the value of the key is AbortExitValue [RETURN DAGReturnValue] . When a DAG aborts, by default it exits with the node return value that caused the abort. This can be changed by using the optional RETURN key word along with specifying the desired DAGReturnValue, defaults to None
max_pre (str, optional) – sets the maximum number of PRE scripts within the DAG that may be running at one time, defaults to None
max_post (str, optional) – sets the maximum number of POST scripts within the DAG that may be running at one time, defaults to None
max_jobs (str, optional) – sets the maximum number of jobs within the DAG that will be submitted to Condor at one time, defaults to None
max_idle (str, optional) – Sets the maximum number of idle jobs allowed before HTCondor DAGMan stops submitting more jobs. Once idle jobs start to run, HTCondor DAGMan will resume submitting jobs. If the option is omitted, the number of idle jobs is unlimited, defaults to None
post_scope (str, optional) – can be “all”, “none” or “essential” (see docs for more information), defaults to None
- Returns:
self
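A brief usage sketch (job name and values are hypothetical):
from Pegasus.api import Job

job = Job("preprocess")

# retry the full pre-script/job/post-script cycle up to 3 times and tag a category
job.add_dagman_profile(retry="3", priority="10", category="short-jobs")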
- add_dagman_profiles(*, pre: str | Path = None, pre_arguments: str = None, post: str = None, post_arguments: str = None, retry: str = None, category: str = None, priority: str = None, abort_dag_on: str = None, max_pre: str = None, max_post: str = None, max_jobs: str = None, max_idle: str = None, post_scope: str = None)
Add Dagman profile(s).
Note, to add the profile keys “post.path.[value of dagman.post]” or “[category-name].maxjobs”, use the following:
job.add_profiles(Namespace.DAGMAN, key="post.path.<value of dagman.post>", value=<value string>)
job.add_profiles(Namespace.DAGMAN, key="<category-name>.maxjobs", value=<value string>)
- Parameters:
pre (Union[str, Path], optional) – is the path to the pre-script. DAGMan executes the pre-script before it runs the job, defaults to None
pre_arguments (str, optional) – are command-line arguments for the pre-script, if any, defaults to None
post (str, optional) – is the postscript type/mode that a user wants to associate with a job (see docs for more information), defaults to None
post_arguments (str, optional) – are the command line arguments for the post script, if any, defaults to None
retry (int, optional) – is the number of times DAGMan retries the full job cycle from pre-script through post-script, if failure was detected, defaults to None
category (str, optional) – the DAGMan category the job belongs to, defaults to None
priority (int, optional) – the priority to apply to a job. DAGMan uses this to select what jobs to release when MAXJOBS is enforced for the DAG, defaults to None
abort_dag_on (str, optional) – The ABORT-DAG-ON key word provides a way to abort the entire DAG if a given node returns a specific exit code (AbortExitValue). The syntax for the value of the key is AbortExitValue [RETURN DAGReturnValue] . When a DAG aborts, by default it exits with the node return value that caused the abort. This can be changed by using the optional RETURN key word along with specifying the desired DAGReturnValue, defaults to None
max_pre (str, optional) – sets the maximum number of PRE scripts within the DAG that may be running at one time, defaults to None
max_post (str, optional) – sets the maximum number of POST scripts within the DAG that may be running at one time, defaults to None
max_jobs (str, optional) – sets the maximum number of jobs within the DAG that will be submitted to Condor at one time, defaults to None
max_idle (str, optional) – Sets the maximum number of idle jobs allowed before HTCondor DAGMan stops submitting more jobs. Once idle jobs start to run, HTCondor DAGMan will resume submitting jobs. If the option is omitted, the number of idle jobs is unlimited, defaults to None
post_scope (str, optional) – can be “all”, “none” or “essential” (see docs for more information), defaults to None
- Returns:
self
- add_env(key: str | None = None, value: str | int | float | bool | Path | None = None, **kw)
Add environment variable(s)
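A brief usage sketch based on the signature above (variable names and values are hypothetical):
from Pegasus.api import Job

job = Job("preprocess")

# several variables can be set at once via keyword arguments
job.add_env(JAVA_HOME="/usr/bin/java", MY_DATA_DIR="/data")

# a single variable can also be given with the key/value parameters
job.add_env(key="OMP_NUM_THREADS", value=4)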
- add_globus_profile(*, count: int = None, job_type: str = None, max_cpu_time: int = None, max_memory: str = None, max_time: int = None, max_wall_time: int = None, min_memory: int = None, project: str = None, queue: str = None)[source]
Add Globus profile(s).
The globus profile namespace encapsulates Globus resource specification language (RSL) instructions. The RSL configures settings and behavior of the remote scheduling system.
- Parameters:
count (int, optional) – the number of times an executable is started, defaults to None
job_type (str, optional) – specifies how the job manager should start the remote job. While Pegasus defaults to single, use mpi when running MPI jobs, defaults to None
max_cpu_time (int, optional) – the max CPU time in minutes for a single execution of a job, defaults to None
max_memory (str, optional) – the maximum memory in MB required for the job. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None
max_time (int, optional) – the maximum time or walltime in minutes for a single execution of a job, defaults to None
max_wall_time (int, optional) – the maximum walltime in minutes for a single execution of a job, defaults to None
min_memory (str, optional) – the minimum amount of memory required for this job. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None
project (str, optional) – associates an account with a job at the remote end, defaults to None
queue (str, optional) – the remote queue in which the job should be run. Used when remote scheduler is PBS that supports queues, defaults to None
- Returns:
self
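A brief usage sketch (values are hypothetical):
from Pegasus.api import Job

job = Job("mpi-task")

# request 64 process slots, a 120 minute walltime, and a specific queue/allocation
job.add_globus_profile(count=64, max_wall_time=120, queue="normal", project="my-allocation")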
- add_globus_profiles(*, count: int = None, job_type: str = None, max_cpu_time: int = None, max_memory: str = None, max_time: int = None, max_wall_time: int = None, min_memory: int = None, project: str = None, queue: str = None)
Add Globus profile(s).
The globus profile namespace encapsulates Globus resource specification language (RSL) instructions. The RSL configures settings and behavior of the remote scheduling system.
- Parameters:
count (int, optional) – the number of times an executable is started, defaults to None
job_type (str, optional) – specifies how the job manager should start the remote job. While Pegasus defaults to single, use mpi when running MPI jobs, defaults to None
max_cpu_time (int, optional) – the max CPU time in minutes for a single execution of a job, defaults to None
max_memory (str, optional) – the maximum memory in MB required for the job. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None
max_time (int, optional) – the maximum time or walltime in minutes for a single execution of a job, defaults to None
max_wall_time (int, optional) – the maximum walltime in minutes for a single execution of a job, defaults to None
min_memory (str, optional) – the minimum amount of memory required for this job. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None
project (str, optional) – associates an account with a job at the remote end, defaults to None
queue (str, optional) – the remote queue in which the job should be run. Used when remote scheduler is PBS that supports queues, defaults to None
- Returns:
self
- add_pegasus_profile(*, clusters_num: int = None, clusters_size: int = None, job_aggregator: int = None, job_aggregator_arguments: str = None, grid_start: int = None, grid_start_path: str = None, grid_start_arguments: str = None, grid_start_launcher: str = None, grid_start_launcher_arguments: str = None, stagein_clusters: int = None, stagein_local_clusters: int = None, stagein_remote_clusters: int = None, stageout_clusters: int = None, stageout_local_clusters: int = None, stageout_remote_clusters: int = None, group: str = None, change_dir: bool = None, create_dir: bool = None, transfer_proxy: bool = None, style: str = None, pmc_request_memory: str = None, pmc_request_cpus: int = None, pmc_priority: int = None, pmc_task_arguments: str = None, exitcode_failure_msg: str = None, exitcode_success_msg: str = None, checkpoint_time: int = None, max_walltime: int = None, glite_arguments: str = None, auxillary_local: bool = None, condor_arguments_quote: bool = None, runtime: str = None, clusters_max_runtime: int = None, cores: int = None, gpus: int = None, nodes: int = None, ppn: int = None, memory: str = None, diskspace: str = None, data_configuration=None, queue: str = None, project: str = None, boto_config: str = None, container_arguments: str = None, label: str = None, pegasus_lite_env_source: str | Path = None, SSH_PRIVATE_KEY: str = None, relative_submit_dir: str | Path = None)[source]
Add Pegasus profile(s).
- Parameters:
clusters_num (int, optional) – Determines the total number of clusters per level; jobs are evenly spread across clusters (see Pegasus Clustering Guide for more information), defaults to None
clusters_size (int, optional) – Determines the number of jobs in each cluster (see Pegasus Clustering Guide for more information), defaults to None
job_aggregator (int, optional) – Indicates the clustering executable that is used to run the clustered job on the remote site, defaults to None
job_aggregator_arguments (str, optional) – Additional arguments with which a clustering executable should be invoked, defaults to None
grid_start (int, optional) – Determines the executable for launching a job (see docs for more information), defaults to None
grid_start_path (str, optional) – Sets the path to the gridstart. This profile is best set in the Site Catalog, defaults to None
grid_start_arguments (str, optional) – Sets the arguments with which GridStart is used to launch a job on the remote site, defaults to None
grid_start_launcher (str, optional) – specifies the path to the executable to launch kickstart, defaults to None,
grid_start_launcher_arguments (str, optional) – the arguments to the launcher executable if any
stagein_clusters (int, optional) – This key determines the maximum number of stage-in jobs that can be executed locally or remotely per compute site per workflow. This is used to configure the BalancedCluster Transfer Refiner, which is the Default Refiner used in Pegasus. This profile is best set in the Site Catalog or in the Properties file, defaults to None
stagein_local_clusters (int, optional) – This key provides finer grained control in determining the number of stage-in jobs that are executed locally and are responsible for staging data to a particular remote site. This profile is best set in the Site Catalog or in the Properties file, defaults to None
stagein_remote_clusters (int, optional) – This key provides finer grained control in determining the number of stage-in jobs that are executed remotely on the remote site and are responsible for staging data to it. This profile is best set in the Site Catalog or in the Properties file, defaults to None
stageout_clusters (int, optional) – This key determines the maximum number of stage-out jobs that can be executed locally or remotely per compute site per workflow. This is used to configure the BalancedCluster Transfer Refiner, which is the Default Refiner used in Pegasus, defaults to None
stageout_local_clusters (int, optional) – This key provides finer grained control in determining the number of stage-out jobs that are executed locally and are responsible for staging data from a particular remote site. This profile is best set in the Site Catalog or in the Properties file, defaults to None
stageout_remote_clusters (int, optional) – This key provides finer grained control in determining the number of stage-out jobs that are executed remotely on the remote site and are responsible for staging data from it. This profile is best set in the Site Catalog or in the Properties file, defaults to None
group (str, optional) – Tags a job with an arbitrary group identifier. The group site selector makes use of the tag, defaults to None
change_dir (bool, optional) – If true, tells kickstart to change into the remote working directory. Kickstart itself is executed in whichever directory the remote scheduling system chose for the job, defaults to None
create_dir (bool, optional) – If true, tells kickstart to create the remote working directory before changing into the remote working directory. Kickstart itself is executed in whichever directory the remote scheduling system chose for the job, defaults to None
transfer_proxy (bool, optional) – If true, tells Pegasus to explicitly transfer the proxy for transfer jobs to the remote site. This is useful, when you want to use a full proxy at the remote end, instead of the limited proxy that is transferred by CondorG, defaults to None
style (str, optional) – Sets the condor submit file style. If set to globus, submit file generated refers to CondorG job submissions. If set to condor, submit file generated refers to direct Condor submission to the local Condor pool. It applies for glidein, where nodes from remote grid sites are glided into the local condor pool. The default style that is applied is globus, defaults to None
pmc_request_memory (str, optional) – This key is used to set the -m option for pegasus-mpi-cluster. It specifies the amount of memory in MB that a job requires. This profile is usually set in the DAX for each job. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None
pmc_request_cpus (int, optional) – This key is used to set the -c option for pegasus-mpi-cluster. It specifies the number of CPUs that a job requires. This profile is usually set in the DAX for each job, defaults to None
pmc_priority (int, optional) – This key is used to set the -p option for pegasus-mpi-cluster. It specifies the priority for a job. This profile is usually set in the DAX for each job. Negative values are allowed for priorities, defaults to None
pmc_task_arguments (str, optional) – The key is used to pass any extra arguments to the PMC task during the planning time. They are added to the very end of the argument string constructed for the task in the PMC file. Hence, allows for overriding of any argument constructed by the planner for any particular task in the PMC job, defaults to None
exitcode_failure_msg (str, optional) – The message string that pegasus-exitcode searches for in the stdout and stderr of the job to flag failures, defaults to None
exitcode_success_msg (str, optional) – The message string that pegasus-exitcode searches for in the stdout and stderr of the job to determine whether a job logged its success message or not. Note this value is used to check whether a job failed or not, i.e. if this profile is specified and pegasus-exitcode DOES NOT find the string in the job stdout or stderr, the job is flagged as failed. The complete rules for determining failure are described in the man page for pegasus-exitcode, defaults to None
checkpoint_time (int, optional) – the expected time in minutes for a job after which it should be sent a TERM signal to generate a job checkpoint file, defaults to None
max_walltime (int, optional) – the maximum walltime in minutes for a single execution of a job, defaults to None
glite_arguments (str, optional) – specifies the extra arguments that must appear in the local PBS generated script for a job, when running workflows on a local cluster with submissions through Glite. This is useful when you want to pass through special options to underlying LRMS such as PBS e.g. you can set value -l walltime=01:23:45 -l nodes=2 to specify your job’s resource requirements, defaults to None
auxillary_local (bool, optional) – indicates whether auxiliary jobs associated with a compute site X can be run on the local site. This CAN ONLY be specified as a profile in the site catalog and should be set when the compute site filesystem is accessible locally on the submit host, defaults to None
condor_arguments_quote (bool, optional) – indicates whether condor quoting rules should be applied for writing out the arguments key in the condor submit file. By default it is true unless the job is scheduled to a glite style site. The value is automatically set to false for glite style sites, as condor quoting is broken in batch_gahp, defaults to None
runtime (str, optional) – Specifies the expected runtime of a job in seconds, defaults to None
clusters_max_runtime (int, optional) – Specifies the maximum runtime of a job, defaults to None
cores (int, optional) – The total number of cores required for a job. This is also used for accounting purposes in the database while generating statistics. It corresponds to the multiplier_factor in the job_instance table, defaults to None
gpus (int, optional) – The total number of gpus required for a job, defaults to None
nodes (int, optional) – Indicates the number of nodes a job requires, defaults to None
ppn (int, optional) – Indicates the number of processors per node. This profile is best set in the Site Catalog and usually set when running workflows with MPI jobs, defaults to None
memory (str, optional) – Indicates the maximum memory a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None
diskspace (str, optional) – Indicates the maximum diskspace a job requires in MB. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None
data_configuration (str, optional) – Indicates the data configuration setup. Can be one of “sharedfs”, “condorio”, or “nonsharedfs”, defaults to None
queue (str, optional) – This specifies the queue for the job (e.g. "normal")
project (str, optional) – Causes the job time to be charged to or associated with a particular project/account. This is not used for SGE.
boto_config (str, optional) – Specifies which .boto file to use (e.g. "/home/myuser/.boto")
container_arguments (str, optional) – additional cli arguments that will be added to the docker container run or singularity exec command
label (str, optional) – associate a label to a job; used for label based clustering
pegasus_lite_env_source (Union[str, Path], optional) – specify a path on the submit host to indicate the file that needs to be sourced, defaults to None
SSH_PRIVATE_KEY (str, optional) – path to the ssh private key which will be used in this workflow (it is recommended that a special set of keys are created specifically for workflow use)
relative_submit_dir (Union[str, Path], optional) – specify the relative directory a job’s submit files are written to, defaults to None
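A brief usage sketch showing a few commonly combined keys (values are hypothetical):
from Pegasus.api import Job

job = Job("preprocess")

# per-job resource requests plus the data staging configuration
job.add_pegasus_profile(
    cores=4,
    memory="2 GB",
    runtime="1800",  # expected runtime in seconds
    data_configuration="condorio",
)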
- add_pegasus_profiles(*, clusters_num: int = None, clusters_size: int = None, job_aggregator: int = None, job_aggregator_arguments: str = None, grid_start: int = None, grid_start_path: str = None, grid_start_arguments: str = None, grid_start_launcher: str = None, grid_start_launcher_arguments: str = None, stagein_clusters: int = None, stagein_local_clusters: int = None, stagein_remote_clusters: int = None, stageout_clusters: int = None, stageout_local_clusters: int = None, stageout_remote_clusters: int = None, group: str = None, change_dir: bool = None, create_dir: bool = None, transfer_proxy: bool = None, style: str = None, pmc_request_memory: str = None, pmc_request_cpus: int = None, pmc_priority: int = None, pmc_task_arguments: str = None, exitcode_failure_msg: str = None, exitcode_success_msg: str = None, checkpoint_time: int = None, max_walltime: int = None, glite_arguments: str = None, auxillary_local: bool = None, condor_arguments_quote: bool = None, runtime: str = None, clusters_max_runtime: int = None, cores: int = None, gpus: int = None, nodes: int = None, ppn: int = None, memory: str = None, diskspace: str = None, data_configuration=None, queue: str = None, project: str = None, boto_config: str = None, container_arguments: str = None, label: str = None, pegasus_lite_env_source: str | Path = None, SSH_PRIVATE_KEY: str = None, relative_submit_dir: str | Path = None)
Add Pegasus profile(s).
- Parameters:
clusters_num (int, optional) – Determines the total number of clusters per level; jobs are evenly spread across clusters (see Pegasus Clustering Guide for more information), defaults to None
clusters_size (int, optional) – Determines the number of jobs in each cluster (see Pegasus Clustering Guide for more information), defaults to None
job_aggregator (int, optional) – Indicates the clustering executable that is used to run the clustered job on the remote site, defaults to None
job_aggregator_arguments (str, optional) – Additional arguments with which a clustering executable should be invoked, defaults to None
grid_start (int, optional) – Determines the executable for launching a job (see docs for more information), defaults to None
grid_start_path (str, optional) – Sets the path to the gridstart. This profile is best set in the Site Catalog, defaults to None
grid_start_arguments (str, optional) – Sets the arguments with which GridStart is used to launch a job on the remote site, defaults to None
grid_start_launcher (str, optional) – specifies the path to the executable to launch kickstart, defaults to None,
grid_start_launcher_arguments (str, optional) – the arguments to the launcher executable if any
stagein_clusters (int, optional) – This key determines the maximum number of stage-in jobs that can be executed locally or remotely per compute site per workflow. This is used to configure the BalancedCluster Transfer Refiner, which is the Default Refiner used in Pegasus. This profile is best set in the Site Catalog or in the Properties file, defaults to None
stagein_local_clusters (int, optional) – This key provides finer grained control in determining the number of stage-in jobs that are executed locally and are responsible for staging data to a particular remote site. This profile is best set in the Site Catalog or in the Properties file, defaults to None
stagein_remote_clusters (int, optional) – This key provides finer grained control in determining the number of stage-in jobs that are executed remotely on the remote site and are responsible for staging data to it. This profile is best set in the Site Catalog or in the Properties file, defaults to None
stageout_clusters (int, optional) – This key determines the maximum number of stage-out jobs that can be executed locally or remotely per compute site per workflow. This is used to configure the BalancedCluster Transfer Refiner, which is the Default Refiner used in Pegasus, defaults to None
stageout_local_clusters (int, optional) – This key provides finer grained control in determining the number of stage-out jobs that are executed locally and are responsible for staging data from a particular remote site. This profile is best set in the Site Catalog or in the Properties file, defaults to None
stageout_remote_clusters (int, optional) – This key provides finer grained control in determining the number of stage-out jobs that are executed remotely on the remote site and are responsible for staging data from it. This profile is best set in the Site Catalog or in the Properties file, defaults to None
group (str, optional) – Tags a job with an arbitrary group identifier. The group site selector makes use of the tag, defaults to None
change_dir (bool, optional) – If true, tells kickstart to change into the remote working directory. Kickstart itself is executed in whichever directory the remote scheduling system chose for the job, defaults to None
create_dir (bool, optional) – If true, tells kickstart to create the remote working directory before changing into the remote working directory. Kickstart itself is executed in whichever directory the remote scheduling system chose for the job, defaults to None
transfer_proxy (bool, optional) – If true, tells Pegasus to explicitly transfer the proxy for transfer jobs to the remote site. This is useful, when you want to use a full proxy at the remote end, instead of the limited proxy that is transferred by CondorG, defaults to None
style (str, optional) – Sets the condor submit file style. If set to globus, submit file generated refers to CondorG job submissions. If set to condor, submit file generated refers to direct Condor submission to the local Condor pool. It applies for glidein, where nodes from remote grid sites are glided into the local condor pool. The default style that is applied is globus, defaults to None
pmc_request_memory (str, optional) – This key is used to set the -m option for pegasus-mpi-cluster. It specifies the amount of memory in MB that a job requires. This profile is usually set in the DAX for each job. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None
pmc_request_cpus (int, optional) – This key is used to set the -c option for pegasus-mpi-cluster. It specifies the number of CPUs that a job requires. This profile is usually set in the DAX for each job, defaults to None
pmc_priority (int, optional) – This key is used to set the -p option for pegasus-mpi-cluster. It specifies the priority for a job. This profile is usually set in the DAX for each job. Negative values are allowed for priorities, defaults to None
pmc_task_arguments (str, optional) – The key is used to pass any extra arguments to the PMC task during the planning time. They are added to the very end of the argument string constructed for the task in the PMC file. Hence, allows for overriding of any argument constructed by the planner for any particular task in the PMC job, defaults to None
exitcode_failure_msg (str, optional) – The message string that pegasus-exitcode searches for in the stdout and stderr of the job to flag failures, defaults to None
exitcode_success_msg (str, optional) – The message string that pegasus-exitcode searches for in the stdout and stderr of the job to determine whether a job logged its success message or not. Note this value is used to check whether a job failed or not, i.e. if this profile is specified and pegasus-exitcode DOES NOT find the string in the job stdout or stderr, the job is flagged as failed. The complete rules for determining failure are described in the man page for pegasus-exitcode, defaults to None
checkpoint_time (int, optional) – the expected time in minutes for a job after which it should be sent a TERM signal to generate a job checkpoint file, defaults to None
max_walltime (int, optional) – the maximum walltime in minutes for a single execution of a job, defaults to None
glite_arguments (str, optional) – specifies the extra arguments that must appear in the local PBS generated script for a job, when running workflows on a local cluster with submissions through Glite. This is useful when you want to pass through special options to underlying LRMS such as PBS e.g. you can set value -l walltime=01:23:45 -l nodes=2 to specify your job’s resource requirements, defaults to None
auxillary_local (bool, optional) – indicates whether auxiliary jobs associated with a compute site X can be run on the local site. This CAN ONLY be specified as a profile in the site catalog and should be set when the compute site filesystem is accessible locally on the submit host, defaults to None
condor_arguments_quote (bool, optional) – indicates whether condor quoting rules should be applied for writing out the arguments key in the condor submit file. By default it is true unless the job is scheduled to a glite style site. The value is automatically set to false for glite style sites, as condor quoting is broken in batch_gahp, defaults to None
runtime (str, optional) – Specifies the expected runtime of a job in seconds, defaults to None
clusters_max_runtime (int, optional) – Specifies the maximum runtime of a job, defaults to None
cores (int, optional) – The total number of cores required for a job. This is also used for accounting purposes in the database while generating statistics. It corresponds to the multiplier_factor in the job_instance table, defaults to None
gpus (int, optional) – The total number of gpus required for a job, defaults to None
nodes (int, optional) – Indicates the number of nodes a job requires, defaults to None
ppn (int, optional) – Indicates the number of processors per node. This profile is best set in the Site Catalog and usually set when running workflows with MPI jobs, defaults to None
memory (str, optional) – Indicates the maximum memory a job requires. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None
diskspace (str, optional) – Indicates the maximum diskspace a job requires in MB. Given as a str formatted as ‘<int> [MB | GB | TB | PB | EB]’, defaults to None
data_configuration (str, optional) – Indicates the data configuration setup. Can be one of “sharedfs”, “condorio”, or “nonsharedfs”, defaults to None
queue (str, optional) – This specifies the queue for the job (e.g. "normal")
project (str, optional) – Causes the job time to be charged to or associated with a particular project/account. This is not used for SGE.
boto_config (str, optional) – Specifies which .boto file to use (e.g. "/home/myuser/.boto")
container_arguments (str, optional) – additional cli arguments that will be added to the docker container run or singularity exec command
label (str, optional) – associate a label to a job; used for label based clustering
pegasus_lite_env_source (Union[str, Path], optional) – specify a path on the submit host to indicate the file that needs to be sourced, defaults to None
SSH_PRIVATE_KEY (str, optional) – path to the ssh private key which will be used in this workflow (it is recommended that a special set of keys are created specifically for workflow use)
relative_submit_dir (Union[str, Path], optional) – specify the relative directory a job’s submit files are written to, defaults to None
- add_profiles(self, ns: Namespace, key: str | None = None, value: str | int | float | bool | Path | None = None, **kw)[source]
Add a profile.
If key and value are given, then **kw are ignored and {Namespace::key : value} is added. Else **kw is added. When the value of “key” is not a valid python variable name, the usage in Example #1 should be used, else follow the usage shown in Example #2.
# Example 1
job.add_profiles(Namespace.DAGMAN, key="pre.arguments", value="-i f1")

# Example 2
job.add_profiles(Namespace.ENV, JAVA_HOME="/usr/bin/java", USER="ryan")
For add_globus_profiles(), add_condor_profiles(), add_dagman_profiles(), add_selector_profiles(), and add_pegasus_profiles(), if a profile key you are trying to use is not listed as a keyword argument, use this function to add the profile.
- Raises:
TypeError – namespace must be one of Namespace
- Returns:
self
- add_selector_profile(*, execution_site: str = None, pfn: str | Path = None, grid_job_type: str = None)[source]
Add Selector(s).
The Selector namespace allows users to override the behavior of the Workflow Mapper during site selection. This gives you finer grained control over where a job executes and what executable it refers to.
- Parameters:
execution_site (str, optional) – the execution site where a job should be executed, defaults to None
pfn (Union[str, Path], optional) – the physical file name to the main executable that a job refers to. Overrides any entries specified in the transformation catalog, defaults to None
grid_job_type (str, optional) – This profile is usually used to ensure that a compute job executes on another job manager (see docs for more information), defaults to None
- Returns:
self
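A brief usage sketch (site name and path are hypothetical):
from Pegasus.api import Job

job = Job("preprocess")

# pin this job to a specific execution site and override its executable
job.add_selector_profile(execution_site="condorpool", pfn="/usr/local/bin/preprocess")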
- add_selector_profiles(*, execution_site: str = None, pfn: str | Path = None, grid_job_type: str = None)
Add Selector(s).
The Selector namespace allows users to override the behavior of the Workflow Mapper during site selection. This gives you finer grained control over where a job executes and what executable it refers to.
- Parameters:
execution_site (str, optional) – the execution site where a job should be executed, defaults to None
pfn (Union[str, Path], optional) – the physical file name to the main executable that a job refers to. Overrides any entries specified in the transformation catalog, defaults to None
grid_job_type (str, optional) – This profile is usually used to ensure that a compute job executes on another job manager (see docs for more information), defaults to None
- Returns:
self
1.1.1.4. Pegasus.api.properties module
- class Properties[source]
Bases:
object
Write Pegasus properties to a file.
# Example
props = Properties()
props["globus.maxtime"] = 900
props["globus.maxwalltime"] = 1000
props["dagman.retry"] = 4
props.write()
- add_site_profile(site, namespace, key, value)[source]
Maps a site profile to a property that can be picked up by the planner, to override site catalog entries loaded from the Site Catalog.
- Parameters:
site – the site
namespace – the namespace for which profile has to be added
key – the profile key
value – the profile value
- Returns:
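A brief usage sketch, assuming the namespace may be given as a Namespace member (the site name, key, and value are hypothetical):
from Pegasus.api import Namespace, Properties

props = Properties()

# override a site catalog entry for the "condorpool" site via properties
props.add_site_profile("condorpool", Namespace.ENV, "PEGASUS_HOME", "/usr")
props.write()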
- static ls(prop: str | None = None)[source]
List property keys. Refer to Configuration docs for additional information. If prop is given, all properties containing prop will be printed, else all properties will be printed.
# Example
>>> P.ls("request")
condor.request_cpus
condor.request_disk
condor.request_gpus
condor.request_memory
pegasus.pmc_request_cpus
pegasus.pmc_request_memory
- Parameters:
prop (Optional[str]) – properties containing “prop” will be listed in alphabetical order, defaults to None
- write(file: str | TextIO | None = None)[source]
Write these properties to a file. If file is not given, these properties are written to ./pegasus.properties
# Example 1
props.write()

# Example 2
with open("conf", "w") as f:
    props.write(f)
- Parameters:
file (Optional[Union[str, TextIO]]) – file path or file object where properties will be written to, defaults to None
- Raises:
TypeError – file must be of type str or file like object
1.1.1.5. Pegasus.api.replica_catalog module
- class File(lfn: str, size: int | None = None, for_planning: bool | None = False)[source]
Bases:
MetadataMixin
A workflow File. This class is used to represent the inputs and outputs of a Job.
# Example
input_file = File("data.txt").add_metadata(creator="ryan")
- Parameters:
lfn (str) – a unique logical filename
size (int) – size in bytes, defaults to None
for_planning (bool) – indicate that a file is to be used for planning purposes
- class ReplicaCatalog[source]
Bases:
Writable
Maintains a mapping of logical filenames to physical filenames. Any input files to the workflow are specified here so that Pegasus knows where to obtain them.
# Example if1 = File("if") if2 = File("if2") rc = ReplicaCatalog()\ .add_replica("local", if1, "/nfs/u2/ryan/data.csv")\ .add_replica("local", "if2", "/nfs/u2/ryan/data2.csv")\ .write()
- add_regex_replica(self, site: str, pattern: str, pfn: str | Path, metadata: Dict[str, int | str | float] | None = None)[source]
Add an entry to this replica catalog using a regular expression pattern. Note that regular expressions should follow Java regular expression syntax as the underlying code that handles this catalog is Java based.
# Example 1: Match f<any-character>a i.e. faa, f.a, f0a, etc.
rc.add_regex_replica("local", "f.a", "/Volumes/data/input/f.a")

# Example 2: Using groupings
rc.add_regex_replica("local", "alpha\.(csv|txt|xml)", "/Volumes/data/input/[1]/[0]")
# If the file being looked up is alpha.csv, the pfn for the file will be
# generated as /Volumes/data/input/csv/alpha.csv

# Example 3: Specifying a default location for all lfns that don't match any
# regular expressions. Note that this should be the last entry into the replica
# catalog if used.
rc.add_regex_replica("local", ".*", Path("/Volumes/data") / "input/[0]")
- Parameters:
site (str) – the site at which this replica (file) resides
pattern (str) – regular expression used to match a file
pfn (Union[str, Path]) – path to the file (may also be a pattern as shown in the example above)
metadata (Optional[Dict[str, Union[int, str, float]]]) – any metadata to be associated with the matched files, for example: {"creator": "pegasus"}, defaults to None
- Raises:
DuplicateError – Duplicate patterns with different PFNs are currently not supported
- add_replica(self, site: str, lfn: str | File, pfn: str | Path, checksum: Dict[str, str] | None = None, metadata: Dict[str, int | str | float] | None = None)[source]
Add an entry to this replica catalog.
# Example 1 f = File("in.txt").add_metadata(creator="pegasus") rc.add_replica("local", f, Path(".").resolve() / "in.txt") # Example 2: Adding metadata and a checksum rc.add_replica( "local", "in.txt", "/home/ryan/wf/in.txt", checksum={"sha256": "abc123"}, metadata={"creator": "pegasus"} ) # Example 3: Adding multiple pfns for the same lfn (metadata and checksum will be # updated for that lfn if given. rc.add_replica("local", "in.txt", Path(".").resolve() / "in.txt") rc.add_replica("condorpool", "in.txt", "/path/to/file/in.txt")
- Parameters:
site (str) – the site at which this replica (file) resides
lfn (Union[str, File]) – logical file name
pfn (Union[str, Path]) – physical file name such as Path("f.txt").resolve(), /home/ryan/file.txt, or http://pegasus.isi.edu/file.txt
checksum (Optional[Dict[str, str]], optional) – Dict containing checksums for this file. Currently only sha256 is supported. This should be entered as {"sha256": <value>}, defaults to None
metadata (Optional[Dict[str, Union[int, str, float]]], optional) – metadata key value pairs associated with this lfn such as {"created": "Thu Jun 18 22:18:36 PDT 2020", "owner": "pegasus"}, defaults to None
- Raises:
ValueError – if pfn is given as a pathlib.Path, it must be an absolute path
ValueError – an unsupported checksum type was given
1.1.1.6. Pegasus.api.site_catalog module
- class Arch(value)[source]
Bases:
Enum
Architecture types
- AARCH64 = 'aarch64'
- AMD64 = 'amd64'
- IA64 = 'ia64'
- PPC = 'ppc'
- PPC64LE = 'ppc64le'
- PPC_64 = 'ppc_64'
- SPARCV7 = 'sparcv7'
- SPARCV9 = 'sparcv9'
- X86 = 'x86'
- X86_64 = 'x86_64'
- class Directory(directory_type: _DirectoryType, path: str | Path, shared_file_system: bool = False)[source]
Bases:
object
Information about filesystems Pegasus can use for storing temporary and long-term files.
- Parameters:
directory_type (_DirectoryType) – directory type defined in DirectoryType (e.g. Directory.SHARED_SCRATCH or Directory.LOCAL_STORAGE)
path (Union[str, Path]) – directory path
shared_file_system (bool) – indicate whether the shared scratch space is accessible to the worker nodes via a shared filesystem, defaults to False
- Raises:
ValueError – directory_type must be one of DirectoryType
ValueError – path must be given as an absolute path
- LOCAL_SCRATCH = 'localScratch'
Describes the scratch file systems available locally on a compute node.
- LOCAL_STORAGE = 'localStorage'
Describes a long term storage file system. This is the directory Pegasus will stage output files to.
- SHARED_SCRATCH = 'sharedScratch'
Describes a scratch file system. Pegasus will use this to store intermediate data between jobs and other temporary files.
- SHARED_STORAGE = 'sharedStorage'
- add_file_servers(self, *file_servers: FileServer)[source]
Add one or more access methods to this directory
- Parameters:
file_server – a FileServer
- Raises:
ValueError – file_server must be of type FileServer
- Returns:
self
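For illustration, a sketch of attaching file servers to a directory (paths and URLs are hypothetical):
from Pegasus.api import Directory, FileServer, Operation

# a shared scratch directory reachable both locally and over scp
shared_scratch = Directory(Directory.SHARED_SCRATCH, "/nfs/u2/scratch")\
    .add_file_servers(
        FileServer("file:///nfs/u2/scratch", Operation.ALL),
        FileServer("scp://login.example.edu/nfs/u2/scratch", Operation.PUT),
    )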
- class FileServer(url: str, operation_type: Operation)[source]
Bases:
ProfileMixin
Describes the fileserver to access data from outside
- Parameters:
url (str) – url including protocol such as scp://obelix.isi.edu/data
operation_type (OperationType) – operation type defined in OperationType (e.g. Operation.ALL)
- Raises:
ValueError – operation_type must be one defined in OperationType
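For illustration, separate read and write access methods for the same storage location (URLs are hypothetical):
from Pegasus.api import FileServer, Operation

# outputs are written via scp, but served to consumers over http
put_server = FileServer("scp://storage.example.edu/data/outputs", Operation.PUT)
get_server = FileServer("http://storage.example.edu/data/outputs", Operation.GET)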
- class Grid(grid_type: _GridType, contact: str, scheduler_type: Scheduler, job_type: SupportedJobs | None = None)[source]
Bases:
object
Each site supports various (usually two) job managers
- Parameters:
grid_type (_GridType) – a grid type defined in Grid (e.g. Grid.SLURM)
contact (str) – endpoint such as "workflow.isi.edu"
scheduler_type (Scheduler) – a scheduler type defined in Scheduler (e.g. Scheduler.SLURM)
job_type (Optional[SupportedJobs], optional) – a job type defined in SupportedJobs (e.g. SupportedJobs.COMPUTE), defaults to None
- Raises:
- BATCH = 'batch'
- CONDOR = 'condor'
- CREAM = 'cream'
- DELTACLOUD = 'deltacloud'
- EC2 = 'ec2'
- GT5 = 'gt5'
- NORDUGRID = 'nordugrid'
- UNICORE = 'unicore'
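For illustration, a sketch of attaching a grid endpoint to a site, assuming Site.add_grids is available (the contact string is hypothetical):
from Pegasus.api import Grid, Scheduler, Site, SupportedJobs

cluster = Site("cluster")\
    .add_grids(
        # a GT5 gatekeeper that routes compute jobs to PBS
        Grid(Grid.GT5, "cluster.example.edu/jobmanager-pbs", Scheduler.PBS,
             job_type=SupportedJobs.COMPUTE)
    )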
- class OS(value)[source]
Bases:
Enum
Operating system types
- AIX = 'aix'
- LINUX = 'linux'
- MACOSX = 'macosx'
- SUNOS = 'sunos'
- WINDOWS = 'windows'
- class Operation(value)[source]
Bases:
Enum
Different types of operations supported by a file server
- ALL = 'all'
- GET = 'get'
- PUT = 'put'
- class Scheduler(value)[source]
Bases:
Enum
Different scheduler types on the Grid
- CONDOR = 'condor'
- FORK = 'fork'
- LSF = 'lsf'
- PBS = 'pbs'
- SGE = 'sge'
- SLURM = 'slurm'
- UNKNOWN = 'unknown'
- class Site(name: str, arch: Arch | None = None, os_type: OS | None = None, os_release: str | None = None, os_version: str | None = None)[source]
Bases:
ProfileMixin
A compute resource (which is often a cluster) that we intend to run the workflow upon. A site is a homogeneous part of a cluster that has at least a single GRAM gatekeeper with a jobmanager-fork and jobmanager-<scheduler> interface and at least one gridftp server along with a shared file system. The GRAM gatekeeper can be either WS GRAM or Pre-WS GRAM. A site can also be a condor pool or glidein pool with a shared file system.
# Example
site = Site(LOCAL, arch=Arch.X86_64, os_type=OS.LINUX, os_release="rhel", os_version="7")\
    .add_directories(
        Directory(Directory.SHARED_SCRATCH, shared_scratch_dir)
            .add_file_servers(FileServer("file://" + str(shared_scratch_dir), Operation.ALL)),
        Directory(Directory.LOCAL_STORAGE, local_storage_dir)
            .add_file_servers(FileServer("file://" + str(local_storage_dir), Operation.ALL))
    )
- Parameters:
name (str) – name of the site
arch (Optional[Arch]) – the site’s architecture (e.g. Arch.X86_64), defaults to None
os_type (Optional[OS], optional) – the site’s operating system (e.g. OS.LINUX), defaults to None
os_release (Optional[str]) – the release of the site’s operating system, defaults to None
os_version (Optional[str]) – the version of the site’s operating system, defaults to None
- Raises:
- class SiteCatalog[source]
Bases:
Writable
The SiteCatalog describes the compute resources, or Sites, that we intend to run the workflow upon.
# Example
sc = SiteCatalog()

WORK_DIR = Path(".").resolve()
shared_scratch_dir = WORK_DIR / RUN_ID
local_storage_dir = WORK_DIR / "outputs" / RUN_ID

local = Site("local")\
    .add_directories(
        Directory(Directory.SHARED_SCRATCH, shared_scratch_dir)
            .add_file_servers(FileServer("file://" + str(shared_scratch_dir), Operation.ALL)),
        Directory(Directory.LOCAL_STORAGE, local_storage_dir)
            .add_file_servers(FileServer("file://" + str(local_storage_dir), Operation.ALL))
    )

condorpool = Site("condorpool")\
    .add_pegasus_profile(style="condor")\
    .add_pegasus_profile(auxillary_local="true")\
    .add_condor_profile(universe="vanilla")

sc.add_sites(local, condorpool)
sc.write()
- add_sites(self, *sites: Site)[source]
Add one or more sites to this catalog
- Parameters:
site – the site to be added
- Raises:
TypeError – site must be of type Site
DuplicateError – a site with the same name already exists in this catalog
- Returns:
self
1.1.1.7. Pegasus.api.transformation_catalog module
- class Container(name: str, container_type: _ContainerType, image: str, arguments: str | None = None, mounts: List[str] | None = None, image_site: str | None = None, checksum: Dict[str, str] | None = None, metadata: Dict[str, int | str | float] | None = None, bypass_staging: bool = False)[source]
Bases:
ProfileMixin
Describes a container that can be added to the TransformationCatalog. Note that the checksum parameter refers to the checksum of the tar file of the image and not the specific version of an image (digest/content-addressable identifier in the case of Docker).
# Example 1: Docker
centos_pegasus = Container(
    "centos-pegasus",
    Container.DOCKER,
    "docker:///ryan/centos-pegasus:latest",
    arguments="--shm-size",
    mounts=["/Volumes/Work/lfs1:/shared-data/:ro"]
)

# Example 2: Singularity
fb_nlp = Container(
    "fb-nlp",
    Container.SINGULARITY,
    image="library://papajim/default/fb-nlp",
    mounts=["/data:/mnt:ro"]
)
- Parameters:
name (str) – name of this container
container_type (_ContainerType) – a container type defined in Container
image (str) – image, such as docker:///rynge/montage:latest
arguments (Optional[str]) – additional cli arguments to be added to the docker container run or singularity exec commands when starting this container
mounts (Optional[List[str]]) – list of mount strings such as ['/Volumes/Work/lfs1:/shared-data/:ro', ...]
image_site (Optional[str]) – optional site attribute to tell pegasus on which site the tar file exists, defaults to None
checksum (Optional[Dict[str, str]]) – Dict containing checksums for the tar file of this image. Currently only sha256 is supported. This should be entered as {"sha256": <value>}, defaults to None
metadata (Optional[Dict[str, Union[int, str, float]]]) – Dict containing metadata key, value pairs associated with this container, defaults to None
bypass_staging (bool, optional) – whether or not to bypass the stage in job for this container, defaults to False
- Raises:
ValueError – container_type must be one of _ContainerType (Container.DOCKER | Container.SINGULARITY | Container.SHIFTER)
- DOCKER = 'docker'
- SHIFTER = 'shifter'
- SINGULARITY = 'singularity'
- class Transformation(name: str, namespace: str | None = None, version: str | None = None, site: str | None = None, pfn: str | Path | None = None, is_stageable: bool = False, bypass_staging: bool = False, arch: Arch | None = None, os_type: OS | None = None, os_release: str | None = None, os_version: str | None = None, container: Container | str | None = None, checksum: Dict[str, str] | None = None)[source]
Bases:
ProfileMixin, HookMixin, MetadataMixin
A transformation, which can be a standalone executable, or one that requires other executables.
When a transformation resides on a single site, the syntax in Example 1 can be used, where the args site and pfn are provided to the constructor. If site and pfn are specified, then the args is_stageable, bypass_staging, arch, os_type, os_release, os_version, and container are applied to the site, else they are ignored. When a transformation resides on multiple sites, the syntax in Example 2 can be used, where multiple TransformationSite objects can be added. Note that when specifying a checksum such as {"sha256": <value>}, this only applies to stageable executables.
# Example 1: transformation that resides on a single site
preprocess = Transformation(
    "preprocess",
    namespace="pegasus",
    version="4.0",
    site="condorpool",
    pfn="/usr/bin/pegasus-keg",
    is_stageable=False,
    bypass_staging=False,
    arch=Arch.X86_64,
    os_type=OS.LINUX,
    container=centos_pegasus
)

# Example 2: transformation that resides on multiple sites
preprocess = Transformation("preprocess", namespace="pegasus", version="4.0")\
    .add_sites(
        TransformationSite(
            "condorpool",
            "/usr/bin/pegasus-keg",
            is_stageable=False,
            arch=Arch.X86_64,
            os_type=OS.LINUX,
            container="centos-pegasus"
        ),
        ...
    )
- Parameters:
name (str) – the logical name of the transformation
namespace (Optional[str]) – the namespace that this transformation belongs to, defaults to None
version (Optional[str]) – the version of this transformation (e.g. "1.1"), defaults to None
site (Optional[str]) – a Site specified in the SiteCatalog on which this transformation resides, defaults to None
pfn (Optional[Union[str, Path]]) – the physical filename of this transformation (e.g. "/usr/bin/tar"), defaults to None
is_stageable (bool) – whether or not this transformation is stageable or installed, defaults to False
bypass_staging (bool, optional) – whether or not to bypass the stage in job of this executable (note that this only works for transformations where is_stageable=True), defaults to False
arch (Optional[Arch]) – architecture that this transformation was compiled for (defined in Arch, e.g. Arch.X86_64), defaults to None
os_type (Optional[OS]) – name of os that this transformation was compiled for (defined in OS, e.g. OS.LINUX), defaults to None
os_release (Optional[str]) – release of os that this transformation was compiled for, defaults to None
os_version (Optional[str]) – version of os that this transformation was compiled for, defaults to None
container (Optional[Union[Container, str]]) – a Container or name of the container to be used for this transformation, defaults to None
checksum (Optional[Dict[str, str]]) – Dict containing checksums for this file. Currently only sha256 is supported. This should be entered as {"sha256": <value>}, defaults to None
- Raises:
TypeError – container must be of type Container or str
TypeError – arch must be one of Arch
TypeError – os_type must be one of OS
ValueError – fields: namespace, name, and version must not contain any : (colons)
ValueError – pfn must be given as an absolute path when pathlib.Path is used
ValueError – bypass_staging=True can only be used when is_stageable=True
- add_requirement(self, required_transformation: str | Transformation, namespace: str = None, version: str = None)[source]
Add a requirement to this transformation. Specify the other transformation, identified by name, namespace, and version, that this transformation depends upon. If a Transformation is passed in for required_transformation, then namespace and version are ignored.
- Parameters:
required_transformation (str or Transformation) – Transformation that this transformation requires
- Raises:
DuplicateError – this requirement already exists
ValueError – required_transformation must be of type Transformation or str
ValueError – namespace, required transformation name, and version cannot contain any : (colon) characters
TypeError – required_transformation must be one of type str or Transformation
- Returns:
self
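For illustration, a minimal sketch of declaring a dependency between two transformations; the transformation names and paths below are assumptions, not part of the Pegasus distribution:
# "fft" requires "fft_prep" to be present wherever it runs
fft_prep = Transformation("fft_prep", site="condorpool", pfn="/usr/bin/fft_prep", is_stageable=False)
fft = Transformation("fft", site="condorpool", pfn="/usr/bin/fft", is_stageable=False)

# pass the Transformation object directly...
fft.add_requirement(fft_prep)

# ...or, equivalently, refer to it by name
# fft.add_requirement("fft_prep")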
- add_sites(self, *transformation_sites: TransformationSite)[source]
Add one or more TransformationSites to this transformation.
- Parameters:
transformation_sites – the TransformationSite(s) to be added
- Raises:
TypeError – argument(s) must be of type TransformationSite
DuplicateError – a TransformationSite with the same name as the one you are attempting to add already exists
- Returns:
self
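As a hedged sketch of registering the same transformation on two different sites (the site names and paths are illustrative):
keg = Transformation("keg")\
    .add_sites(
        TransformationSite("condorpool", "/usr/bin/pegasus-keg", is_stageable=False),
        TransformationSite("cluster", "/opt/pegasus/bin/pegasus-keg", is_stageable=False)
    )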
- class TransformationCatalog[source]
Bases:
Writable
Maintains a list of Transformations and Containers.
# Example
centos_pegasus = Container(
    "centos-pegasus",
    Container.DOCKER,
    "docker:///ryan/centos-pegasus:latest",
    mounts=["/Volumes/Work/lfs1:/shared-data/:ro"]
)

preprocess = Transformation(
    "preprocess",
    site="condorpool",
    pfn="/usr/bin/pegasus-keg",
    is_stageable=False,
    arch=Arch.X86_64,
    os_type=OS.LINUX,
    container=centos_pegasus
)

findrange = Transformation(
    "findrange",
    site="condorpool",
    pfn="/usr/bin/pegasus-keg",
    is_stageable=False,
    arch=Arch.X86_64,
    os_type=OS.LINUX,
    container=centos_pegasus
)

analyze = Transformation(
    "analyze",
    site="condorpool",
    pfn="/usr/bin/pegasus-keg",
    is_stageable=False,
    arch=Arch.X86_64,
    os_type=OS.LINUX,
    container=centos_pegasus
)

tc = TransformationCatalog()\
    .add_containers(centos_pegasus)\
    .add_transformations(preprocess, findrange, analyze)\
    .write()
- add_containers(self, *containers: Container)[source]
Add one or more Containers to this catalog.
# Example
tc.add_containers(
    Container(
        "centos-pegasus",
        Container.DOCKER,
        "docker:///ryan/centos-pegasus:latest",
        mounts=["/Volumes/Work/lfs1:/shared-data/:ro"]
    )
)
- Parameters:
containers – the Container(s) to be added
- Raises:
TypeError – argument(s) must be of type Container
DuplicateError – a container with the same name already exists in this catalog
- Returns:
self
- add_transformations(self, *transformations: Transformation)[source]
Add one or more Transformations to this catalog.
# Example
tc.add_transformations(
    Transformation(
        "analyze",
        site="condorpool",
        pfn="/usr/bin/pegasus-keg",
        is_stageable=False,
        arch=Arch.X86_64,
        os_type=OS.LINUX,
        container=centos_pegasus
    )
)
- Parameters:
transformations – the Transformation(s) to be added
- Raises:
TypeError – argument(s) must be of type Transformation
DuplicateError – the given Transformation already exists in this catalog
- Returns:
self
- class TransformationSite(name: str, pfn: str | Path, is_stageable: bool = False, bypass_staging: bool = False, arch: Arch | None = None, os_type: OS | None = None, os_release: str | None = None, os_version: str | None = None, container: Container | str | None = None)[source]
Bases:
ProfileMixin
,MetadataMixin
Site specific information about a Transformation. A Transformation must contain at least one transformation site.
- Parameters:
name (str) – name of the site at which this Transformation resides
pfn (Union[str, Path]) – physical file name, an absolute path given as a str or Path
is_stageable – whether this transformation is stageable (True) or installed (False), defaults to False
bypass_staging (bool, optional) – whether or not to bypass the stage in job of this executable (note that this only works for transformations where is_stageable=True), defaults to False
arch (Optional[Arch], optional) – architecture that this Transformation was compiled for (defined in Arch), defaults to None
os_type (Optional[OS], optional) – name of the os that this Transformation was compiled for (defined in OS), defaults to None
os_release (Optional[str], optional) – release of the os that this Transformation was compiled for, defaults to None
os_version (Optional[str], optional) – version of the os that this Transformation was compiled for, defaults to None
container (Optional[Union[Container, str]], optional) – the name of the container or the Container object to use, optional
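A small illustrative sketch of constructing a TransformationSite and attaching it to a Transformation (the path and container name are assumptions):
ts = TransformationSite(
    "condorpool",
    "/usr/bin/pegasus-keg",
    is_stageable=False,
    arch=Arch.X86_64,
    os_type=OS.LINUX,
    container="centos-pegasus"
)
preprocess = Transformation("preprocess").add_sites(ts)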
1.1.1.8. Pegasus.api.workflow module
- class AbstractJob(_id: str | None = None, node_label: str | None = None)[source]
Bases:
HookMixin
,ProfileMixin
,MetadataMixin
An abstract representation of a workflow job
- Parameters:
_id (Optional[str]) – a unique id, if None is given then one will be assigned when this job is added to a Workflow, defaults to None
node_label (Optional[str]) – a short descriptive label that can be assigned to this job, defaults to None
Note: avoid using IDs such as '0000008' or '00000009' as these may end up being unquoted by PyYAML, and consequently misinterpreted as integer values when read in by other tools.
- add_args(self, *args: File | int | float | str)[source]
Add arguments to this job. Each argument will be separated by a space. Each argument must be either a File, scalar, or str.
- Parameters:
args (Union[File, int, float, str]) – arguments to pass to this job (each arg in args will be separated by a space)
- Returns:
self
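For example, a brief sketch, assuming job is an existing Job instance and the file names and flags are illustrative; each value passed becomes one space-separated argument:
if1 = File("input.txt")
of1 = File("output.txt")

job.add_args("-i", if1, "-o", of1, "-n", 5, "--verbose")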
- add_checkpoint(self, checkpoint_file: File | str, stage_out: bool = True, register_replica: bool = True)[source]
Add an output File of this job as a checkpoint file. If checkpoint_file is given as a str, a File object is created for you internally with the given value as its lfn.
- Parameters:
checkpoint_file (Union[File, str]) – the File to be added as a checkpoint file to this job
stage_out (bool, optional) – whether or not to send files back to an output directory, defaults to True
register_replica (bool, optional) – whether or not to register replica with a
ReplicaCatalog
, defaults to True
- Raises:
DuplicateError – all output files must be unique
TypeError – job inputs must be of type
File
or str
- Returns:
self
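A short sketch, assuming job is an existing Job whose executable periodically writes a checkpoint file; the file name is illustrative:
checkpoint = File("saved_state.pkl")
job.add_checkpoint(checkpoint, stage_out=True, register_replica=False)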
- add_inputs(self, *input_files: File | str, bypass: bool = False)[source]
Add one or more File objects as input to this job. If input_file is given as a str, a File object is created for you internally with the given value as its lfn.
- Parameters:
- Raises:
DuplicateError – all input files must be unique
TypeError – job inputs must be of type
File
or str
- Returns:
self
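For instance, a sketch assuming job is an existing Job; the lfns are illustrative, str values are wrapped in File objects internally, and bypass=True marks these inputs for bypass staging:
job.add_inputs(File("f.a"), "f.b", bypass=True)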
- add_outputs(self, *output_files: File | str, stage_out: bool = True, register_replica: bool = True)[source]
Add one or more File objects as outputs to this job. stage_out and register_replica will be applied to all files given. If output_file is given as a str, a File object is created for you internally with the given value as its lfn.
- Parameters:
output_files (Union[File, str]) – the File objects to be added as outputs to this job
stage_out (bool, optional) – whether or not to send files back to an output directory, defaults to True
register_replica (bool, optional) – whether or not to register replica with a
ReplicaCatalog
, defaults to True
- Raises:
DuplicateError – all output files must be unique
TypeError – job outputs must be of type
File
or str
- Returns:
self
- get_inputs()[source]
Get this job’s input
File
s- Returns:
all input files associated with this job
- Return type:
set
- get_outputs()[source]
Get this job’s output
File
objects- Returns:
all output files associated with this job
- Return type:
set
- set_stderr(self, file: str | File, stage_out: bool = True, register_replica: bool = True)[source]
Set stderr to a
File
. If file is given as a str, aFile
object is created for you internally with the given value as its lfn.- Parameters:
file (Union[str, File]) – a file that stderr will be written to
stage_out (bool, optional) – whether or not to send files back to an output directory, defaults to True
register_replica (bool, optional) – whether or not to register replica with a
ReplicaCatalog
, defaults to True
- Raises:
TypeError – file must be of type
File
or str
DuplicateError – stderr is already set or the given file has already been added as an output to this job
- Returns:
self
- set_stdin(self, file: str | File)[source]
Set stdin to a
File
. If file is given as a str, aFile
object is created for you internally with the given value as its lfn.- Parameters:
file (Union[str, File]) – a file that will be read into stdin
- Raises:
TypeError – file must be of type
File
or str
DuplicateError – stdin is already set or the given file has already been added as an input to this job
- Returns:
self
- set_stdout(self, file: str | File, stage_out: bool = True, register_replica: bool = True)[source]
Set stdout to a
File
. If file is given as a str, aFile
object is created for you internally with the given value as its lfn.- Parameters:
file (Union[str, File]) – a file that stdout will be written to
stage_out (bool, optional) – whether or not to send files back to an output directory, defaults to True
register_replica (bool, optional) – whether or not to register replica with a
ReplicaCatalog
, defaults to True
- Raises:
TypeError – file must be of type
File
or str
DuplicateError – stdout is already set or the given file has already been added as an output to this job
- Returns:
self
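A combined sketch of the three setters, assuming job is an existing Job; the file names are illustrative:
job.set_stdin("params.in")
job.set_stdout(File("run.out"), stage_out=True, register_replica=False)
job.set_stderr("run.err")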
- class Job(transformation: str | Transformation, _id: str | None = None, node_label: str | None = None, namespace: str | None = None, version: str | None = None)[source]
Bases:
AbstractJob
A typical workflow Job that executes a
Transformation
. SeeAbstractJob
for full list of available functions.
# Example
if1 = File("if1")
if2 = File("if2")

of1 = File("of1")
of2 = File("of2")

# Assuming a transformation named "analyze.py" has been added to your
# transformation catalog:
job = Job("analyze.py")\
    .add_args("-i", if1, if2, "-o", of1, of2)\
    .add_inputs(if1, if2)\
    .add_outputs(of1, of2, stage_out=True, register_replica=False)
- Parameters:
transformation (Union[str, Transformation]) – Transformation object or name of the transformation that this job uses
_id (Optional[str]) – a unique id; if none is given then one will be assigned when the job is added by a Workflow, defaults to None
node_label (Optional[str]) – a short descriptive label that can be assigned to this job, defaults to None
namespace (Optional[str]) – namespace to which the Transformation belongs, defaults to None
version (Optional[str]) – version of the given Transformation, defaults to None
- Raises:
TypeError – transformation must be one of type
Transformation
or str
- class SubWorkflow(file: str | File | Workflow, is_planned: bool = False, _id: str | None = None, node_label: str | None = None)[source]
Bases:
AbstractJob
Job that represents a subworkflow. See
AbstractJob
for full list of available functions. SubWorkflow jobs can be created using several different methods. These are outlined below.
root_wf = Workflow("root")

# "workflow.yml" must be added to the ReplicaCatalog
j1 = SubWorkflow(file="workflow.yml", is_planned=False)
root_wf.add_jobs(j1)

another_wf = Workflow("another-wf")
j2 = SubWorkflow(another_wf, _id="j2")
root_wf.add_jobs(j2)

# Upon invoking root_wf.write() or root_wf.plan(), another_wf will automatically
# be serialized to CWD / "another-wf_j2.yml" and added to an inline ReplicaCatalog;
# This means that you may define "another_wf" in a separate python script, and
# import it here where it would be used.
root_wf.write()
- Parameters:
file (Union[str, File, Workflow]) – File, the name of the workflow file as a str, or Workflow
is_planned (bool) – whether or not this subworkflow has already been planned by the Pegasus planner, defaults to False
_id (Optional[str]) – a unique id; if none is given then one will be assigned when the job is added by a Workflow, defaults to None
node_label (Optional[str]) – a short descriptive label that can be assigned to this job, defaults to None
- Raises:
TypeError – file must be of type
File
or str
- add_planner_args(self, conf: str | Path | None = None, basename: str | None = None, job_prefix: str | None = None, cluster: List[str] | None = None, sites: List[str] | None = None, output_sites: List[str] | None = None, staging_sites: Dict[str, str] | None = None, cache: List[str | Path] | None = None, input_dirs: List[str] | None = None, output_dir: str | None = None, dir: str | None = None, relative_dir: str | Path | None = None, random_dir: bool | str | Path = False, relative_submit_dir: str | Path | None = None, inherited_rc_files: List[str | Path] | None = None, cleanup: str | None = None, reuse: List[str | Path] | None = None, verbose: int = 0, quiet: int = 0, force: bool = False, force_replan: bool = False, forward: List[str] | None = None, submit: bool = False, json: bool = False, java_options: List[str] | None = None, **properties: Dict[str, str])[source]
Add pegasus-planner arguments. This function can only be used when is_planned=False is set in SubWorkflow, and it may only be invoked once.
- Parameters:
conf (Optional[Union[str, Path]]) – the path to the properties file to use for planning, defaults to None
basename (Optional[str]) – the basename prefix while constructing the per workflow files like .dag etc., defaults to None
job_prefix (Optional[str]) – the prefix to be applied while constructing job submit filenames, defaults to None
cluster (Optional[List[str]]) – comma separated list of clustering techniques to be applied to the workflow to cluster jobs into larger jobs, to avoid scheduling overheads, defaults to None
sites (Optional[List[str]]) – list of execution sites on which to map the workflow, defaults to None
output_sites (Optional[List[str]]) – the output sites where the data products during workflow execution are transferred to, defaults to None
staging_sites (Optional[Dict[str,str]]) – key, value pairs of execution site to staging site mappings such as
{"condorpool": "staging-site"}
, defaults to None
cache (Optional[List[Union[str, Path]]]) – comma separated list of replica cache files, defaults to None
input_dirs (Optional[List[Union[str, Path]]]) – comma separated list of optional input directories where the input files reside on submit host, defaults to None
output_dir (Optional[Union[str, Path]]) – an optional output directory where the output files should be transferred to on submit host, defaults to None
transformations_dir (Optional[Union[str, Path]]) – an optional directory containing executables used by the workflow, from where to construct transformation catalog entries
dir (Optional[Union[str, Path]]) – the directory where to generate the executable workflow, defaults to None
relative_dir (Optional[Union[str, Path]]) – the relative directory to the base directory where to generate the concrete workflow, defaults to None
random_dir (Union[bool, str, Path], optional) – if set to
True
, a random timestamp based name will be used for the execution directory that is created by the create dir jobs; else if a path is given as astr
orpathlib.Path
, then that will be used as the basename of the directory that is to be created, defaults to False
relative_submit_dir (Optional[Union[str, Path]]) – the relative submit directory where to generate the concrete workflow. Overrides relative_dir, defaults to None
inherited_rc_files (Optional[List[Union[str, Path]]]) – comma separated list of replica files, defaults to None
cleanup (Optional[str]) – the cleanup strategy to use. Can be a
str
none|inplace|leaf|constraint
, defaults to None (internally, pegasus-plan will use the defaultinplace
ifNone
is given)
reuse (Optional[List[Union[str,Path]]]) – list of submit directories of previous runs from which to pick up for reuse (e.g.
["/workflows/submit_dir1", "/workflows/submit_dir2"]
), defaults to None
verbose (int, optional) – verbosity, defaults to 0
quiet (int) – decreases the verbosity of messages about what is going on, defaults to 0
force (bool, optional) – skip reduction of the workflow, resulting in build style dag, defaults to False
force_replan (bool) – force replanning for sub workflows in case of failure, defaults to False
forward (Optional[List[str]]) – any options that need to be passed ahead to pegasus-run in format option[=value] (e.g.
["nogrid"]
), defaults to None
submit (bool, optional) – submit the executable workflow generated, defaults to False
java_options (Optional[List[str]]) – pass to jvm a non standard option (e.g.
["mx1024m", "ms512m"]
), defaults to None
**properties – configuration properties (e.g.
**{"pegasus.mode": "development"}
, which would be passed to pegasus-plan as-Dpegasus.mode=development
). Note that configuration properties set here take precedence over the properties file property with the same key.
- Raises:
TypeError – an invalid type was given for one or more arguments
PegasusError – SubWorkflow.add_planner_args() can only be called by SubWorkflows that have not yet been planned (i.e. SubWorkflow(‘wf_file’, is_planned=False))
PegasusError – SubWorkflow.add_planner_args() can only be invoked once
- Returns:
self
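A hedged sketch of configuring an unplanned SubWorkflow; the workflow file name, site names, and property values are assumptions:
j = SubWorkflow("inner-wf.yml", is_planned=False, _id="inner")
j.add_planner_args(
    sites=["condorpool"],
    output_sites=["local"],
    cleanup="leaf",
    verbose=3,
    **{"pegasus.mode": "development"}
)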
- class Workflow(name: str, infer_dependencies: bool = True)[source]
Bases:
Writable
,HookMixin
,ProfileMixin
,MetadataMixin
Represents multi-step computational steps as a directed acyclic graph.
# Example
import logging

from pathlib import Path

from Pegasus.api import *

logging.basicConfig(level=logging.DEBUG)

# --- Replicas -----------------------------------------------------------------
with open("f.a", "w") as f:
    f.write("This is sample input to KEG")

fa = File("f.a").add_metadata(creator="ryan")
rc = ReplicaCatalog().add_replica("local", fa, Path(".") / "f.a")

# --- Transformations ----------------------------------------------------------
preprocess = Transformation(
    "preprocess",
    site="condorpool",
    pfn="/usr/bin/pegasus-keg",
    is_stageable=False,
    arch=Arch.X86_64,
    os_type=OS.LINUX
)

findrange = Transformation(
    "findrange",
    site="condorpool",
    pfn="/usr/bin/pegasus-keg",
    is_stageable=False,
    arch=Arch.X86_64,
    os_type=OS.LINUX
)

analyze = Transformation(
    "analyze",
    site="condorpool",
    pfn="/usr/bin/pegasus-keg",
    is_stageable=False,
    arch=Arch.X86_64,
    os_type=OS.LINUX
)

tc = TransformationCatalog().add_transformations(preprocess, findrange, analyze)

# --- Workflow -----------------------------------------------------------------
'''
                    [f.b1] - (findrange) - [f.c1]
                   /                              \
[f.a] - (preprocess)                               (analyze) - [f.d]
                   \                              /
                    [f.b2] - (findrange) - [f.c2]
'''
wf = Workflow("blackdiamond")

fb1 = File("f.b1")
fb2 = File("f.b2")
job_preprocess = Job(preprocess)\
    .add_args("-a", "preprocess", "-T", "3", "-i", fa, "-o", fb1, fb2)\
    .add_inputs(fa)\
    .add_outputs(fb1, fb2)

fc1 = File("f.c1")
job_findrange_1 = Job(findrange)\
    .add_args("-a", "findrange", "-T", "3", "-i", fb1, "-o", fc1)\
    .add_inputs(fb1)\
    .add_outputs(fc1)

fc2 = File("f.c2")
job_findrange_2 = Job(findrange)\
    .add_args("-a", "findrange", "-T", "3", "-i", fb2, "-o", fc2)\
    .add_inputs(fb2)\
    .add_outputs(fc2)

fd = File("f.d")
job_analyze = Job(analyze)\
    .add_args("-a", "analyze", "-T", "3", "-i", fc1, fc2, "-o", fd)\
    .add_inputs(fc1, fc2)\
    .add_outputs(fd)

wf.add_jobs(job_preprocess, job_findrange_1, job_findrange_2, job_analyze)
wf.add_replica_catalog(rc)
wf.add_transformation_catalog(tc)

try:
    wf.plan(submit=True)\
        .wait()\
        .analyze()\
        .statistics()
except PegasusClientError as e:
    print(e.output)
- Parameters:
name (str) – name of the
Workflow
infer_dependencies (bool, optional) – whether or not to automatically compute job dependencies based on input and output files used by each job, defaults to True
- Raises:
ValueError – workflow name may not contain any / or spaces
- add_dependency(self, job: Job | SubWorkflow, *, parents: List[Job | SubWorkflow] = [], children: List[Job | SubWorkflow] = [])[source]
Add parent, child dependencies for a given job.
- Parameters:
job (AbstractJob) – the job to which parents and children will be assigned
parents (list, optional) – jobs to be added as parents to this job, defaults to []
children (list, optional) – jobs to be added as children of this job, defaults to []
- Raises:
ValueError – the given job(s) do not have ids assigned to them
DuplicateError – a dependency between two jobs already has been added
- Returns:
self
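For example, a sketch that wires up the diamond shape explicitly, assuming the jobs from the Workflow example above have already been added with add_jobs (with infer_dependencies=True this is normally unnecessary, since the jobs share files):
wf.add_dependency(job_preprocess, children=[job_findrange_1, job_findrange_2])
wf.add_dependency(job_analyze, parents=[job_findrange_1, job_findrange_2])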
- add_jobs(self, *jobs: Job | SubWorkflow)[source]
Add one or more jobs at a time to the Workflow
- Raises:
DuplicateError – a job with the same id already exists in this workflow
- Returns:
self
- add_replica_catalog(self, rc: ReplicaCatalog)[source]
Add a
ReplicaCatalog
to this workflow. The contents of the replica catalog will be inlined into the same file as this workflow when it is written.
- Parameters:
rc (ReplicaCatalog) – the
ReplicaCatalog
to be added
- Raises:
TypeError – rc must be of type
ReplicaCatalog
DuplicateError – a
ReplicaCatalog
has already been added
- Returns:
self
- add_site_catalog(self, sc: SiteCatalog)[source]
Add a
SiteCatalog
to this workflow. The contents of the site catalog will be inlined into the same file as this workflow when it is written out.
- Parameters:
sc (SiteCatalog) – the
SiteCatalog
to be added
- Raises:
TypeError – sc must be of type
SiteCatalog
DuplicateError – a
SiteCatalog
has already been added
- Returns:
self
- add_transformation_catalog(self, tc: TransformationCatalog)[source]
Add a
TransformationCatalog
to this workflow. The contents of the transformation catalog will be inlined into the same file as this workflow when it is written.
- Parameters:
tc (TransformationCatalog) – the
TransformationCatalog
to be added
- Raises:
TypeError – tc must be of type
TransformationCatalog
DuplicateError – a
TransformationCatalog
has already been added
- Returns:
self
- analyze(self, verbose: int = 0)[source]
Debug a workflow.
- Parameters:
verbose (int, optional) – verbosity, defaults to 0
- Raises:
PegasusClientError – pegasus-analyzer encountered an error
- Returns:
self
- property braindump
Once this workflow has been planned using plan, the braindump file can be accessed for information such as user, submit_dir, and root_wf_uuid. For a full list of available attributes, see Braindump.
try:
    wf.plan(submit=True)

    print(wf.braindump.user)
    print(wf.braindump.submit_hostname)
    print(wf.braindump.submit_dir)
    print(wf.braindump.root_wf_uuid)
    print(wf.braindump.wf_uuid)
except PegasusClientError as e:
    print(e.output)
- Getter:
returns a
Braindump
object corresponding to the most recent call ofplan
- Return type:
Pegasus.braindump.Braindump
- Raises:
PegasusError –
plan
must be called before accessing the braindump file
- get_job(_id: str)[source]
Retrieve the job with the given id
- Parameters:
_id (str) – id of the job to be retrieved from the Workflow
- Raises:
NotFoundError – a job with the given id does not exist in this workflow
- Returns:
the job with the given id
- Return type:
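A brief sketch; the id shown is illustrative (ids are assigned automatically when jobs are added without an explicit _id):
j = wf.get_job("ID0000001")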
- get_status(self)[source]
Returns current status information of the workflow as a dict in the following format:
{ "totals": { "unready": <int>, "ready": <int>, "pre": <int>, "queued": <int>, "post": <int>, "succeeded": <int>, "failed": <int>, "percent_done": <float>, "total": <int> }, "dags": { "root": { "unready": <int>, "ready": <int>, "pre": <int>, "queued": <int>, "post": <int>, "succeeded": <int>, "failed": <int>, "percent_done": <float>, "state": <"Running" | "Success" | "Failure">, "dagname": <str> } } }
- Keys are defined as follows
unready: Jobs blocked by dependencies
ready: Jobs ready for submission
pre: PRE-Scripts running
queued: Submitted jobs
post: POST-Scripts running
succeeded: Jobs completed with success
failed: Jobs completed with failure
percent_done: Success percentage
state: Workflow state
dagname: Name of workflow
- Returns:
current status information
- Return type:
Union[dict, None]
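For example, a sketch that polls the totals after submission (assumes the workflow was planned with submit=True):
try:
    wf.plan(submit=True)

    status = wf.get_status()
    if status is not None:
        print(status["totals"]["percent_done"])
except PegasusClientError as e:
    print(e.output)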
- graph(self, include_files: bool = True, no_simplify: bool = True, label: Literal['label', 'xform', 'id', 'xform-id', 'label-xform', 'label-id'] = 'label', output: str | None = None, remove: List[str] | None = None, width: int | None = None, height: int | None = None)[source]
Convert workflow into a graphviz dot format
- Parameters:
include_files (bool, optional) – include files as nodes, defaults to True
no_simplify (bool, optional) – when set to
False
a transitive reduction is performed to remove extra edges, defaults to True
label (Literal["label", "xform", "id", "xform-id", "label-xform", "label-id"], optional) – what attribute to use for labels, defaults to “label”
output (Optional[str], optional) – Write output to a file. If none is given dot output is written to stdout. If output is a file name with any of the following extensions: “jpg”, “jpeg”, “pdf”, “gif”, or “svg”,
dot -T<ext> -o <output>
will be invoked to draw the graph. If any other extension is given, the dot representation of the graph will be output, defaults to None
remove (Optional[List[str]], optional) – remove one or more nodes by transformation name, defaults to None
width (Optional[int], optional) – width of the digraph, defaults to None
height (Optional[int], optional) – height of the digraph, defaults to None
- Returns:
self
- Raises:
PegasusError – workflow must be written to a file (with write or plan) prior to invoking graph
ValueError – label must be one of label, xform, id, xform-id, label-xform, or label-id
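A short sketch (the output file name is illustrative); the workflow must have been written or planned first:
wf.write()
wf.graph(include_files=True, label="xform", output="graph.pdf")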
- plan(self, conf: str | Path | None = None, basename: str | None = None, job_prefix: str | None = None, cluster: List[str] | None = None, sites: List[str] | None = None, output_sites: List[str] = ['local'], staging_sites: Dict[str, str] | None = None, cache: List[str | Path] | None = None, input_dirs: List[str] | None = None, output_dir: str | None = None, dir: str | None = None, relative_dir: str | Path | None = None, random_dir: bool | str | Path = False, relative_submit_dir: str | Path | None = None, inherited_rc_files: List[str | Path] | None = None, cleanup: str = 'inplace', reuse: List[str | Path] | None = None, verbose: int = 0, quiet: int = 0, force: bool = False, force_replan: bool = False, forward: List[str] | None = None, submit: bool = False, json: bool = False, java_options: List[str] | None = None, **properties: Dict[str, str])[source]
Plan the workflow.
try:
    # configuration properties
    properties = {"pegasus.mode": "development"}

    wf.plan(verbose=3, submit=True, **properties)
except PegasusClientError as e:
    print(e.output)
- Parameters:
conf (Optional[Union[str, Path]]) – the path to the properties file to use for planning, defaults to None
basename (Optional[str]) – the basename prefix while constructing the per workflow files like .dag etc., defaults to None
job_prefix (Optional[str]) – the prefix to be applied while constructing job submit filenames, defaults to None
cluster (Optional[List[str]]) – comma separated list of clustering techniques to be applied to the workflow to cluster jobs into larger jobs, to avoid scheduling overheads, defaults to None
sites (Optional[List[str]]) – list of execution sites on which to map the workflow, defaults to None
output_sites (List[str]) – the output sites where the data products during workflow execution are transferred to, defaults to
["local"]
staging_sites (Optional[Dict[str,str]]) – key, value pairs of execution site to staging site mappings such as
{"condorpool": "staging-site"}
, defaults to None
cache (Optional[List[Union[str, Path]]]) – comma separated list of replica cache files, defaults to None
input_dirs (Optional[List[Union[str, Path]]]) – comma separated list of optional input directories where the input files reside on submit host, defaults to None
output_dir (Optional[Union[str, Path]]) – an optional output directory where the output files should be transferred to on submit host, defaults to None
transformations_dir (Optional[Union[str, Path]]) – an optional directory containing executables used by the workflow, from where to construct transformation catalog entries
dir (Optional[Union[str, Path]]) – the directory where to generate the executable workflow, defaults to None
relative_dir (Optional[Union[str, Path]]) – the relative directory to the base directory where to generate the concrete workflow, defaults to None
random_dir (Union[bool, str, Path], optional) – if set to
True
, a random timestamp based name will be used for the execution directory that is created by the create dir jobs; else if a path is given as astr
orpathlib.Path
, then that will be used as the basename of the directory that is to be created, defaults to False
relative_submit_dir (Optional[Union[str, Path]]) – the relative submit directory where to generate the concrete workflow. Overrides relative_dir, defaults to None
inherited_rc_files (Optional[List[Union[str, Path]]]) – comma separated list of replica files, defaults to None
cleanup (str, optional) – the cleanup strategy to use. Can be
none|inplace|leaf|constraint
, defaults to inplace
reuse (Optional[List[Union[str,Path]]]) – list of submit directories of previous runs from which to pick up for reuse (e.g.
["/workflows/submit_dir1", "/workflows/submit_dir2"]
), defaults to None
verbose (int, optional) – verbosity, defaults to 0
quiet (int) – decreases the verbosity of messages about what is going on, defaults to 0
force (bool, optional) – skip reduction of the workflow, resulting in build style dag, defaults to False
force_replan (bool) – force replanning for sub workflows in case of failure, defaults to False
forward (Optional[List[str]]) – any options that need to be passed ahead to pegasus-run in format option[=value] (e.g.
["nogrid"]
), defaults to None
submit (bool, optional) – submit the executable workflow generated, defaults to False
java_options (Optional[List[str]]) – pass to jvm a non standard option (e.g.
["mx1024m", "ms512m"]
), defaults to None
**properties – configuration properties (e.g.
**{"pegasus.mode": "development"}
, which would be passed to pegasus-plan as-Dpegasus.mode=development
). Note that configuration properties set here take precedence over the properties file property with the same key.
- Raises:
PegasusClientError – pegasus-plan encountered an error
- Returns:
self
- remove(self, verbose: int = 0)[source]
Removes this workflow that has been planned and submitted.
- Parameters:
verbose (int, optional) – verbosity, defaults to 0
- Raises:
PegasusClientError – pegasus-remove encountered an error
- Returns:
self
- run(self, verbose: int = 0, json: bool = False, grid: bool = False)[source]
Run the planned workflow.
- Parameters:
verbose (int, optional) – verbosity, defaults to 0
grid (bool, optional) – enable checking for grids, defaults to False
- Raises:
PegasusClientError – pegasus-run encountered an error
- Returns:
self
- property run_output
Get the json output from pegasus-run after it has been called.
try:
    wf.plan()
    wf.run()

    print(wf.run_output)
except PegasusClientError as e:
    print(e.output)
- Raises:
PegasusError – run must be called prior to accessing run_output
- Returns:
output of pegasus-run
- Return type:
dict
- statistics(self, verbose: int = 0)[source]
Generate statistics about the workflow run.
- Parameters:
verbose (int, optional) – verbosity, defaults to 0
- Raises:
PegasusClientError – pegasus-statistics encountered an error
- Returns:
self
- status(self, long: bool = False, verbose: int = 0)[source]
Monitor the workflow by querying Condor and directories.
- Parameters:
long (bool, optional) – Show all DAG states, including sub-DAGs, default only totals. defaults to False
verbose (int, optional) – verbosity, defaults to 0
- Raises:
PegasusClientError – pegasus-status encountered an error
- Returns:
self
- wait(self, delay: int = 5)[source]
Displays progress bar to stdout and blocks until the workflow either completes or fails.
- Parameters:
delay (int, optional) – refresh rate in seconds of the progress bar, defaults to 5
- Raises:
PegasusClientError – pegasus-status encountered an error
- Returns:
self
- write(self, file: str | TextIO | None = None, _format: str = 'yml')[source]
Write this workflow to a file. If no file is given, it will be written to workflow.yml
- Parameters:
file (Optional[Union[str, TextIO]]) – path or file object (opened in “w” mode) to write to, defaults to None
_format (str, optional) – serialized format of the workflow object (this should be left as its default)
- Returns:
self
1.1.1.9. Pegasus.api.writable module
- class Writable[source]
Bases:
object
Derived class can be serialized to a json or yaml file
- property path: Path
Retrieve the path to which this object has been written.
- Raises:
PegasusError – object has not yet been written to a file
- Returns:
resolved path to which this object has been written
- Return type:
Path
- write(file: str | TextIO | None = None, _format: str = 'yml')[source]
Serialize this class as either yaml or json and write to the given file. If file==None, this class will be written to a default file. The following classes have these defaults:
Class – Default Filename
SiteCatalog – sites.yml
ReplicaCatalog – replicas.yml
TransformationCatalog – transformations.yml
Workflow – workflow.yml
- Parameters:
file (Optional[Union[str, TextIO]]) – path or file object (opened in “w” mode) to write to, defaults to None
_format (str, optional) – can be either “yml”, “yaml” or “json”, defaults to “yml”
- Raises:
ValueError – _format must be one of “yml”, “yaml” or “json”
TypeError – file must be a str or file object
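For instance, a sketch of the default and explicit forms (the explicit file name is illustrative):
tc = TransformationCatalog()
tc.write()                            # written to transformations.yml by default
tc.write("tc.json", _format="json")   # explicit file name and format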