10.2. Data Transfers

As part of the Workflow Mapping Process, Pegasus does data management for the executable workflow. It queries a Replica Catalog to discover the locations of the input datasets and adds data movement and registration nodes to the workflow to:

  1. stage-in input data to the staging sites (a staging site is a site associated with the compute site that is used for staging data; in the shared filesystem setup, the staging site is the same as the execution site where the jobs in the workflow are executed).

  2. stage-out output data generated by the workflow to the final storage site.

  3. stage-in intermediate data between compute sites if required.

  4. register the locations of the output data on the final storage site into the replica catalog.

The separate data movement jobs that are added to the executable workflow are responsible for staging data to a workflow specific directory accessible to the staging server on a staging site associated with the compute sites. Depending on the data staging configuration, the staging site for a compute site may be the compute site itself. In the default case, the staging server is usually on the head node of the compute site and has access to the shared filesystem between the worker nodes and the head node. Pegasus adds a directory creation job to the executable workflow that creates the workflow specific directory on the staging server.

In addition to data, Pegasus transfers user executables to the compute sites if the executables are not installed on the remote sites beforehand. This chapter gives an overview of how transfers of data and executables are managed in Pegasus.

10.2.1. Data Staging Configuration

Pegasus can broadly be set up to run workflows in the following configurations:

  • Shared File System

    This setup applies when the head node and the worker nodes of a cluster share a filesystem. Compute jobs in the workflow run in a directory on the shared filesystem.

  • Non Shared Filesystem

    This setup applies when the head node and the worker nodes of a cluster don't share a filesystem. Compute jobs in the workflow run in a local directory on the worker node.

  • Condor Pool Without a Shared Filesystem

    This setup applies to a Condor pool where the worker nodes making up the pool don't share a filesystem. All data IO is achieved using Condor File IO. This is a special case of the non-shared filesystem setup, where Condor File IO is used to transfer input and output data instead of pegasus-transfer.

For the purposes of data configuration, the various sites and directories involved are defined below.

  1. Submit Host

    The host from where the workflows are submitted. This is where Pegasus and Condor DAGMan are installed. This is referred to as the "local" site in the site catalog.

  2. Compute Site

    The site where the jobs mentioned in the DAX are executed. There needs to be an entry in the Site Catalog for every compute site. The compute site is passed to pegasus-plan using the --sites option (see the example invocation after this list).

  3. Staging Site

    A site to which the separate transfer jobs in the executable workflow (jobs with stage_in, stage_out and stage_inter prefixes that Pegasus adds using the transfer refiners) stage the input data, and from which the output data is transferred to the final output site. In the shared filesystem setup, the staging site is the same as the compute site where the jobs execute; in other configurations it can be a separate site passed to pegasus-plan using the --staging-site option.

  4. Output Site

    The output site is the final storage site where the users want the output data from jobs to go. The output site is passed to pegasus-plan using the --output option. The stage-out jobs in the workflow stage the data from the staging site to the final storage site.

  5. Input Site

    The site where the input data is stored. The locations of the input data are catalogued in the Replica Catalog, and the "site" attribute of the locations gives us the site handle for the input site.

  6. Workflow Execution Directory

    This is the directory created by the create-dir jobs in the executable workflow on the Staging Site. There is one such directory per workflow per staging site. In the shared filesystem setup, the staging site is the same as the compute site.

  7. Worker Node Directory

    This is the directory created on the worker nodes per job usually by the job wrapper that launches the job.
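
For illustration, the sketch below shows how the compute, staging, and output sites typically appear on a pegasus-plan command line. The site handles (condorpool, local), the file names, and the submit directory are placeholders, and the exact set of options may vary with the Pegasus version in use.

# hypothetical invocation; site handles and file names are placeholders
pegasus-plan --conf pegasus.properties \
             --dax workflow.dax \
             --sites condorpool \
             --staging-site condorpool \
             --output local \
             --dir submit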

10.2.1.1. Shared File System

By default, Pegasus is set up to run workflows in the shared filesystem setup, where the worker nodes and the head node of a cluster share a filesystem.

Figure 10.1. Shared File System Setup


The data flow is as follows in this case

  1. Stagein Job executes (either on Submit Host or Head Node) to stage in input data from Input Sites (1..n) to a workflow specific execution directory on the shared filesystem.

  2. Compute Job starts on a worker node in the workflow execution directory. It accesses the input data using Posix IO.

  3. Compute Job executes on the worker node and writes out output data to the workflow execution directory using Posix IO.

  4. Stageout Job executes (either on Submit Host or Head Node) to stage out output data from the workflow specific execution directory to a directory on the final output site.

Tip

Set pegasus.data.configuration to sharedfs to run in this configuration.
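
For example, in the properties file (shown in the same whitespace-separated style used by the other property examples in this chapter):

# run compute jobs in a workflow specific directory on the shared filesystem
pegasus.data.configuration sharedfs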

10.2.1.2. Non Shared Filesystem

In this setup, Pegasus runs workflows on the local filesystems of worker nodes, with the worker nodes not sharing a filesystem. The data transfers happen between the worker node and a staging / data coordination site. The staging site server can be a file server on the head node of a cluster, or it can be on a separate machine.

Setup

  • compute site and staging site are different

  • head node and worker nodes of compute site don't share a filesystem

  • Input Data is staged from remote sites.

  • Remote output site, i.e. a site other than the compute site. This can be the submit host.

Figure 10.2. Non Shared Filesystem Setup


The data flow is as follows in this case

  1. Stagein Job executes (either on Submit Host or on staging site) to stage in input data from Input Sites (1..n) to a workflow specific execution directory on the staging site.

  2. Compute Job starts on a worker node in a local execution directory. It uses pegasus-transfer to pull the input data from the staging site to the local directory on the worker node.

  3. The compute job executes on the worker node.

  4. The compute job writes out output data to the local directory on the worker node using Posix IO.

  5. Output data is pushed out to the staging site from the worker node using pegasus-transfer.

  6. Stageout Job executes (either on Submit Host or staging site) to stage out output data from the workflow specific execution directory to a directory on the final output site.

In this case, the compute jobs are wrapped as PegasusLite instances.

This mode is especially useful for running in cloud environments where you don't want to set up a shared filesystem between the worker nodes. Running in that mode is explained in detail here.

Tip

Set pegasus.data.configuration to nonsharedfs to run in this configuration. The staging site can be specified using the --staging-site option to pegasus-plan.

In this setup, Pegasus always stages the input files through the staging site, i.e. the stage-in job stages in data from the input site to the staging site. The PegasusLite jobs that start up on the worker nodes then pull the input data from the staging site for each job. In some cases, it might be useful to set up the PegasusLite jobs to pull input data directly from the input site without going through the staging server. This is based on the assumption that the worker nodes can access the input site. Starting with the 4.3 release, users can enable this. However, you should be aware that access to the input site is no longer throttled (as it is in the case of stage-in jobs). If a large number of compute jobs start at the same time in a workflow, the input server will see a connection from each job.

Tip

Set pegasus.transfer.bypass.input.staging to true to enable the bypass of staging of input files via the staging server.
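
Putting the two tips together, a minimal properties sketch for this mode could look like the following; the bypass line is optional and only makes sense if the worker nodes can reach the input site directly.

# run compute jobs on the local filesystems of the worker nodes
pegasus.data.configuration nonsharedfs

# optional: let PegasusLite jobs pull inputs directly from the input site
pegasus.transfer.bypass.input.staging true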

10.2.1.3. Condor Pool Without a Shared Filesystem

This setup applies to a Condor pool where the worker nodes making up the pool don't share a filesystem. All data IO is achieved using Condor File IO. This is a special case of the non-shared filesystem setup, where Condor File IO is used to transfer input and output data instead of pegasus-transfer.

Setup

  • Submit Host and staging site are the same

  • head node and worker nodes of compute site don't share a filesystem

  • Input Data is staged from remote sites.

  • Remote output site, i.e. a site other than the compute site. This can be the submit host.

Figure 10.3. Condor Pool Without a Shared Filesystem


The data flow is as follows in this case

  1. Stagein Job executes on the submit host to stage in input data from Input Sites (1..n) to a workflow specific execution directory on the submit host.

  2. Compute Job starts on a worker node in a local execution directory. Before the compute job starts, Condor transfers the input data for the job from the workflow execution directory on the submit host to the local execution directory on the worker node.

  3. The compute job executes on the worker node.

  4. The compute job writes out output data to the local directory on the worker node using Posix IO.

  5. When the compute job finishes, Condor transfers the output data for the job from the local execution directory on the worker node to the workflow execution directory on the submit host.

  6. Stageout Job executes (either on Submit Host or staging site) to stage out output data from the workflow specific execution directory to a directory on the final output site.

In this case, the compute jobs are wrapped as PegasusLite instances.

This mode is especially useful for running in cloud environments where you don't want to set up a shared filesystem between the worker nodes. Running in that mode is explained in detail here.

Tip

Set pegasus.data.configuration to condorio to run in this configuration. In this mode, the staging site is automatically set to the site local.

In this setup, Pegasus always stages the input files through the submit host, i.e. the stage-in job stages in data from the input site to the submit host (local site). The input data is then transferred to remote worker nodes from the submit host using Condor file transfers. In the case where the input data is locally accessible at the submit host, i.e. the input site and the submit host are the same, it is possible to bypass the creation of separate stage-in jobs that copy the data to the workflow specific directory on the submit host. Instead, Condor file transfers can be set up to transfer the input files directly from the locally accessible input locations (file URLs with the "site" attribute set to local) specified in the replica catalog. Starting with the 4.3 release, users can enable this.

Tip

Set pegasus.transfer.bypass.input.staging to true to bypass the creation of separate stage-in jobs.
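
As a sketch, the corresponding properties for this mode are shown below; the bypass line is optional and only applies when the input data is already accessible locally on the submit host.

# use Condor File IO; the staging site is automatically set to local
pegasus.data.configuration condorio

# optional: skip separate stage-in jobs for inputs local to the submit host
pegasus.transfer.bypass.input.staging true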

10.2.2. Local versus Remote Transfers

As far as possible, Pegasus will ensure that the transfer jobs added to the executable workflow are executed on the submit host. By default, Pegasus will schedule a transfer to be executed on the remote staging site only if there is no way to execute it on the submit host. Some scenarios where transfer jobs are executed on remote sites are as follows:

  • the file server specified for the staging site/compute site can only be accessed from within that site (for example, a file:// URL on the compute site). In that case, Pegasus will schedule all the stage-in data movement jobs on the compute site to stage in the input data for the workflow.

  • a user has symlinking turned on. In that case, the transfer jobs that symlink against the input data on the compute site will be executed remotely (on the compute site).

In certain execution environments, such as a local campus cluster where the compute site and the local site share a filesystem (i.e. the compute site has file servers specified for the staging/compute site, and the scratch and storage directories mentioned for the compute site are locally mounted on the submit host), it is beneficial to have the remote transfer jobs run locally and hence bypass going through the local scheduler queue. In that case, users can set a boolean profile auxillary.local in the pegasus namespace in the site catalog to true for the compute/staging site.
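
As an illustration, such a profile can be attached to the compute/staging site entry in the site catalog; the site handle below is a placeholder.

<site handle="campus-cluster" arch="x86_64" os="LINUX">
    <!-- file servers and directories omitted -->
    <profile namespace="pegasus" key="auxillary.local">true</profile>
</site>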

Users can specify the property pegasus.transfer.*.remote.sites to change the default behaviour of Pegasus and force particular types of transfer jobs to run remotely on the specified sites. The value of the property is a comma separated list of compute sites for which you want the transfer jobs to run remotely; an example is shown after the table below.

The table below illustrates all the possible variations of the property.

Table 10.1. Property Variations for pegasus.transfer.*.remote.sites

Property Name Applies to
pegasus.transfer.stagein.remote.sites the stage in transfer jobs
pegasus.transfer.stageout.remote.sites the stage out transfer jobs
pegasus.transfer.inter.remote.sites the inter site transfer jobs
pegasus.transfer.*.remote.sites all types of transfer jobs
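
For example, to force the stage-in transfer jobs for the compute site isi_viz to run remotely on that site (the site handle is illustrative):

pegasus.transfer.stagein.remote.sites isi_viz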

The prefix of the transfer job name indicates whether the transfer job is executed locally (on the submit host) or remotely (on the compute site). For example, stage_in_local_ in the transfer job name stage_in_local_isi_viz_0 indicates that the job is a stage-in transfer job that is executed locally and is used to transfer input data to the compute site isi_viz. The prefix naming scheme for the transfer jobs is [stage_in|stage_out|inter]_[local|remote]_ .

10.2.3. Controlling Transfer Parallelism

When it comes to data transfers, Pegasus ships with a default configuration that tries to strike a balance between performance and aggressiveness. We obviously want data transfers to be as quick as possible, but we also do not want our transfers to overwhelm data services and systems.

Starting with the 4.8.0 release, the default configuration of Pegasus adds transfer jobs and cleanup jobs based on the number of jobs at a particular level of the workflow. For example, for every 10 compute jobs on a level of a workflow, one data transfer job (stage-in and stage-out) is created. The default configuration also sets how many threads such a pegasus-transfer job can spawn. Cleanup jobs are similarly constructed, with an internal ratio of 5.

Information on how to control the number of stagein and stageout jobs can be found in the Data Movement Nodes section.

How to control the number of threads pegasus-transfer can use depends on whether you want to control standard transfer jobs or PegasusLite transfers. For the former, see the pegasus.transfer.threads property, and for the latter the pegasus.transfer.lite.threads property.
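
As a sketch, the following properties cap the thread counts for both kinds of transfers; the values are illustrative.

# threads used by pegasus-transfer in regular transfer jobs
pegasus.transfer.threads 4

# threads used by transfers running inside PegasusLite jobs
pegasus.transfer.lite.threads 2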

10.2.4. Symlinking Against Input Data

If input data for a job already exists on a compute site, then it is possible for Pegasus to symlink against that data. In this case, the remote stage in transfer jobs that Pegasus adds to the executable workflow will symlink instead of doing a copy of the data.

Pegasus determines whether a file is on the same site as the compute site, by inspecting the "site" attribute associated with the URL in the Replica Catalog. If the "site" attribute of an input file location matches the compute site where the job is scheduled, then that particular input file is a candidate for symlinking.

For Pegasus to symlink against existing input data on a compute site, the following must be true:

  1. Property pegasus.transfer.links is set to true

  2. The input file location in the Replica Catalog has the "site" attribute matching the compute site.

Tip

To confirm whether a particular input file is symlinked instead of being copied, look for the destination URL for that file in the stage_in_remote*.in file. The destination URL will start with symlink:// .

In the symlinking case, Pegasus strips the URL prefix from the source URL and replaces it with a file URL.

For example, if a user has the following URL catalogued in the Replica Catalog for an input file f.input

f.input   gsiftp://server.isi.edu/shared/storage/input/data/f.input site="isi"

and the compute job that requires this file executes on a compute site named isi, then, if symlinking is turned on, the data stage-in job (stage_in_remote_viz_0) will have the following source and destination specified for the file:

#viz viz
file:///shared/storage/input/data/f.input  symlink://shared-scratch/workflow-exec-dir/f.input

10.2.5. Addition of Separate Data Movement Nodes to Executable Workflow

Pegasus relies on a Transfer Refiner that comes up with the strategy for how many data movement nodes are added to the executable workflow. All the compute jobs scheduled to a site share the same workflow specific directory. The transfer refiners ensure that only one copy of the input data is transferred to the workflow execution directory. This is to prevent data clobbering. Data clobbering can occur when compute jobs of a workflow share some input files and have different stage-in transfer jobs associated with them that stage the shared files to the same destination workflow execution directory.

Pegasus supports three different transfer refiners that dictate how the stage-in and stage-out jobs are added for the workflow. The default Transfer Refiner used in Pegasus is the BalancedCluster refiner. Starting with the 4.8.0 release, the default configuration of Pegasus adds transfer jobs and cleanup jobs based on the number of jobs at a particular level of the workflow. For example, for every 10 compute jobs on a level of a workflow, one data transfer job (stage-in and stage-out) is created.

The transfer refiners also allow the user to specify how many local|remote stagein|stageout jobs are created per execution site.

The behavior of the refiners (BalancedCluster and Cluster) is controlled by specifying certain Pegasus profiles:

  1. either with the execution sites in the site catalog

  2. OR globally in the properties file

Table 10.2. Pegasus Profile Keys For the Cluster Transfer Refiner

Profile Key Description
stagein.clusters This key determines the maximum number of stage-in jobs that can be executed locally or remotely per compute site per workflow.
stagein.local.clusters This key provides finer grained control in determining the number of stage-in jobs that are executed locally and are responsible for staging data to a particular remote site.
stagein.remote.clusters This key provides finer grained control in determining the number of stage-in jobs that are executed remotely on the remote site and are responsible for staging data to it.
stageout.clusters This key determines the maximum number of stage-out jobs that can be executed locally or remotely per compute site per workflow.
stageout.local.clusters This key provides finer grained control in determining the number of stage-out jobs that are executed locally and are responsible for staging data from a particular remote site.
stageout.remote.clusters This key provides finer grained control in determining the number of stage-out jobs that are executed remotely on the remote site and are responsible for staging data from it.

Tip

Which transfer refiner to use is controlled by the property pegasus.transfer.refiner.
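
As an illustration, the refiner can be selected in the properties file, and the cluster profile keys from the table above can be attached to an execution site in the site catalog; the site handle and values below are placeholders.

# pick the Cluster refiner instead of the default BalancedCluster
pegasus.transfer.refiner Cluster

<site handle="isi_viz" arch="x86_64" os="LINUX">
    <!-- at most 4 stage-in and 2 stage-out jobs for this site per workflow -->
    <profile namespace="pegasus" key="stagein.clusters">4</profile>
    <profile namespace="pegasus" key="stageout.clusters">2</profile>
</site>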

10.2.5.1. BalancedCluster

This transfer refiner was introduced in Pegasus 4.4.0 and is the default one used in Pegasus. It does a round-robin distribution of the files amongst the stagein and stageout jobs per level of the workflow. The figure below illustrates the behavior of this transfer refiner.

Figure 10.4. BalancedCluster Transfer Refiner : Input Data To Workflow Specific Directory on Shared File System


10.2.5.2. Cluster

This transfer refiner is similar to BalancedCluster but differs in how the files are distributed across the stagein and stageout jobs per level of the workflow. In this refiner, all the input files for a job get associated with a single transfer job. As illustrated in the figure below, each compute job usually gets associated with one stagein transfer job. In contrast, with the BalancedCluster refiner a compute job may be associated with multiple data stagein jobs.

Figure 10.5. Cluster Transfer Refiner : Input Data To Workflow Specific Directory on Shared File System


10.2.5.3. Basic

Pegasus also supports a Basic Transfer Refiner that adds one stagein and one stageout job per compute job of the workflow. This is not recommended for large workflows, as the number of data transfer nodes in the worst case is 2n, where n is the number of compute jobs in the workflow.

10.2.6. Executable Used for Transfer and Cleanup Jobs

Pegasus uses a Python script called pegasus-transfer as the executable in the transfer jobs to transfer the data. pegasus-transfer looks at the source and destination URLs and automatically figures out which underlying client to use. pegasus-transfer is distributed with Pegasus and can be found at $PEGASUS_HOME/bin/pegasus-transfer.

Currently, pegasus-transfer interfaces with the following transfer clients:

Table 10.3. Transfer Clients interfaced to by pegasus-transfer

Transfer Client Used For
gfal-copy staging files to and from GridFTP servers
globus-url-copy staging files to and from GridFTP servers, only if gfal is not detected in the path
gfal-copy staging files to and from SRM or XRootD servers
wget staging files from an HTTP server
cp copying files from a POSIX filesystem
ln symlinking against input files
pegasus-s3 staging files to and from S3 buckets in Amazon Web Services
gsutil staging files to and from Google Storage buckets
scp staging files using scp
gsiscp staging files using gsiscp and X509
iget staging files to and from iRODS servers

For remote sites, Pegasus constructs the default path to pegasus-transfer on the basis of the PEGASUS_HOME env profile specified in the site catalog. To specify a different path to the pegasus-transfer client, users can add an entry into the transformation catalog with the fully qualified logical name pegasus::pegasus-transfer.

10.2.6.1. Preference of GFAL over GUC

JGlobus is no longer actively supported and is not in compliance with RFC 2818. As a result, cleanup jobs using the pegasus-gridftp client would fail against servers supporting the strict mode. We have removed the pegasus-gridftp client and now use the gfal clients, as globus-url-copy does not support removes. If gfal is not available, globus-url-copy is used for cleanup by writing out zero-byte files instead of removing them.

If you want to force globus-url-copy to be preferred over GFAL, set the PEGASUS_FORCE_GUC=1 environment variable in the site catalog for the sites on which you want the preference to be enforced. Please note that we expect globus-url-copy support to be completely removed in future releases of Pegasus due to the end of life of the Globus Toolkit (see announcement).
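
In the site catalog, this can be expressed as an env profile on the relevant site; the site handle below is a placeholder.

<site handle="isi_viz" arch="x86_64" os="LINUX">
    <profile namespace="env" key="PEGASUS_FORCE_GUC">1</profile>
</site>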

10.2.7. Staging of Executables

Users can get Pegasus to stage the user executables (executables that the jobs in the DAX refer to) as part of the transfer jobs to the workflow specific execution directory on the compute site. The URL locations of the executables need to be specified in the transformation catalog as the PFN, and the type of the executable needs to be set to STAGEABLE.

The location of a transformation can be specified either in

  • DAX in the executable section. More details here .

  • Transformation Catalog. More details here .

A particular transformation catalog entry of type STAGEABLE is compatible with a compute site only if all the System Information attributes associated with the entry match the System Information attributes for the compute site in the Site Catalog. The following attributes make up the System Information attributes (an illustrative entry is shown after the list):

  1. arch

  2. os

  3. osrelease

  4. osversion
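
As an illustration, a STAGEABLE entry in the text transformation catalog format used later in this chapter might look like the following; the transformation name, site handle and URL are placeholders.

# hypothetical stageable executable matching an x86_64 Linux compute site
tr example::myapp {
  site local {
    pfn "http://example.isi.edu/executables/myapp"
    arch "x86_64"
    os "LINUX"
    type "STAGEABLE"
  }
}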

10.2.7.1. Transformation Mappers

Pegasus has a notion of transformation mappers that determine what type of executables are picked up when a job is executed on a remote compute site. For the transfer of executables, Pegasus constructs a soft state map that resides on top of the transformation catalog and helps in determining the locations from where an executable can be staged to the remote site.

Users can specify the following property to pick a specific transformation mapper:

pegasus.catalog.transformation.mapper 

Currently, the following transformation mappers are supported.

Table 10.4. Transformation Mappers Supported in Pegasus

Transformation Mapper Description
Installed This mapper only relies on transformation catalog entries that are of type INSTALLED to construct the soft state map. This results in Pegasus never doing any transfer of executables as part of the workflow. It always prefers the installed executables at the remote sites.
Staged This mapper only relies on matching transformation catalog entries that are of type STAGEABLE to construct the soft state map. This results in the executable workflow referring only to the staged executables, irrespective of whether the executables are already installed at the remote end.
All This mapper relies on all matching transformation catalog entries of type STAGEABLE or INSTALLED for a particular transformation as valid sources for the transfer of executables. This is the most general mode, and results in constructing the map as the cartesian product of the matches.
Submit This mapper only uses matching transformation catalog entries that are of type STAGEABLE and reside at the submit host (site local) while constructing the soft state map. This is especially helpful when the user wants to use the latest compute code for computations on the grid and that code resides on the submit host.
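
For example, to force the executable workflow to refer only to staged executables, the property can be set to one of the mapper names from the table above:

pegasus.catalog.transformation.mapper Staged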

10.2.8. Staging of Worker Package

The worker package contains runtime tools such as pegasus-kickstart and pegasus-transfer, and is required to be available for most jobs.

How the package is made available to the jobs depends on multiple factors. For example, a pre-installed Pegasus can be used if the location is set using the environment profile PEGASUS_HOME for the site in the Site Catalog.

If Pegasus is not already available on the execution site, the worker package can be staged by setting the following property:

pegasus.transfer.worker.package          true 

Note that how the package is transferred and accessed differs based on the configured data management mode:

  • sharedfs mode: the package is staged in to the shared filesystem once, and reused for all the jobs

  • nonsharedfs or condorio mode: each job carries a worker package. This is obviously less efficient, but the size of the worker package is kept small to minimize the impact of these extra transfers.

Which worker package is used is determined in the following order:

  • There is an entry for pegasus::worker executable in the transformation catalog. Information on how to construct that entry is provided below.

  • The planner at runtime creates a worker package out of the binary installation, and puts it in the submit directory. This worker package is used if the OS and architecture of the created worker package match the remote site, or there is an exact match with (osrelease and osversion) if specified by the user in the site catalog for the remote site.

  • The worker package compatible with the remote site is available as a binary from the Pegasus download site.

  • At runtime, in the nonsharedfs or condorio modes, extra checks are made to make sure the worker package matches the Pegasus version and the OS and architecture. The reason is that these workflows might be running in a heterogeneous environment, and thus there is no way to know before the job starts what worker package is required. If the runtime check fails, a worker package matching the Pegasus version, OS and architecture will be downloaded from the Pegasus download site. This behavior can be controlled with the pegasus.transfer.worker.package.autodownload and pegasus.transfer.worker.package.strict properties.

If you want to specify a particular worker package to use, you can specify the transformation pegasus::worker in the transformation catalog with:

  • type set to STAGEABLE

  • System Information attributes of the transformation catalog entry match the System Information attributes of the compute site.

  • the PFN specified should be a remote URL that can be pulled to the compute site.

# example of specifying a worker package in the transformation catalog
tr pegasus::worker {
  site corbusier {
    pfn "https://download.pegasus.isi.edu/pegasus/4.8.0dev/pegasus-worker-4.8.0dev-x86_64_macos_10.tar.gz"
    arch "x86_64"
    os "MACOSX"
    type "STAGEABLE"
  }
}

10.2.9. Staging of Application Containers

Pegasus treats containers like other files in terms of data management. The container to be used for a job is tracked as an input dependency that needs to be staged if it is not already there. Similar to executables, you specify the location of your container image in the Transformation Catalog. You can specify the source URLs for containers as follows.

  1. URL to a container hosted on a central hub repository

    An example of a Docker hub URL is docker:///rynge/montage:latest, while an example for Singularity is shub://pegasus-isi/fedora-montage.

  2. URL to a container image file on a file server.

    • Docker - Docker supports loading of containers from a tar file. Hence, container images can only be specified as tar files, and the extension of the filename is not important.

    • Singularity - Singularity supports container images in various forms and relies on the extension of the filename to determine what format the file is in. Pegasus supports the following extensions for Singularity container images:

      • .img

      • .tar

      • .tar.gz

      • .tar.bz2

      • .cpio

      • .cpio.gz

      Singularity will fail to run the container if you don't specify the right extension when specifying the source URL for the image.

In both cases, Pegasus will place the container image on the staging site used for the workflow, as part of the data stage-in nodes, using pegasus-transfer. When pulling in an image from a container hub repository, pegasus-transfer will export the container as a tar file in the case of Docker, and as an .img file in the case of Singularity.

10.2.9.1. Symlinking

Since Pegasus by default only mounts the job directory determined by PegasusLite into the application container, symlinking of input data sets works only if, in the container definition in the transformation catalog, the user defines the directories containing the input data to be mounted in the container using the mount keyword. We recommend keeping the source and destination directories the same, i.e. the host path is mounted at the same location in the container. This is because the data transfers for the job happen in PegasusLite before the container is invoked. While we do support different source and destination directories in the mount format, this creates broken symlinks on the worker node that only resolve correctly inside the container.

For example, in the container description below, we have input datasets accessible under /lizard on the compute nodes, and we mount them read-only into the container at /lizard:

cont centos-base{
     type "singularity"

     # URL to image in a docker hub or a url to an existing
     # singularity image file
     image "gsiftp://bamboo.isi.edu/lfs1/bamboo-tests/data/centos7.img"

     # optional site attribute to tell pegasus which site tar file
     # exists. useful for handling file URL's correctly
     image_site "local"

     # mount point in the container
     mount "/lizard:/lizard:ro"
  
     # specify env profile via env option to docker run
     profile env "JAVA_HOME" "/opt/java/1.6"	    
}

To enable symlinking for containers set the following properties

# Tells Pegasus to try and create symlinks for input files
pegasus.transfer.links true

# Tells Pegasus to bypass the staging site (creation of stage-in jobs) as
# data is available directly on compute nodes
pegasus.transfer.bypass.input.staging true

If you don't set pegasus.transfer.bypass.input.staging, then you can still have symlinking if:

  1. your staging site is same as your compute site

  2. the scratch directory specified in the site catalog is visible to the worker nodes

  3. you mount the scratch directory in the container definition, NOT the original source directory.

Enabling symlinking of containers is useful when running large workflows on a single cluster. Pegasus can pull the image from the container repository once and place it on the shared filesystem, from where it can then be symlinked when the PegasusLite jobs start on the worker nodes of that cluster. In order to do this, you need to be running in the nonsharedfs data configuration mode with the staging site set to be the same as the compute site.

10.2.10. Staging of Job Checkpoint Files

Pegasus has support for transferring job checkpoint files back to the staging site when a job exceeds its advertised running time. In order to use this feature, you need to

  1. Associate a job checkpoint file (that the job creates) with the job in the DAX. A checkpoint file is specified by setting the link attribute to checkpoint for the uses tag.

  2. Associate a Pegasus profile key named checkpoint.time with the job. This is the time in minutes after which the job is sent the TERM signal by pegasus-kickstart, telling it to create the checkpoint file.

  3. Associate a Pegasus profile key named maxwalltime with the job. This specifies the max runtime in minutes before the job will be killed by the local resource manager (such as PBS) deployed on the site. Usually, this value should be associated with the execution site in the site catalog.

The Pegasus planner uses the above mentioned profile keys to set up pegasus-kickstart such that the job is sent a TERM signal when the checkpoint time of the job is reached. A KILL signal is sent at (checkpoint.time + (maxwalltime - checkpoint.time)/2) minutes. This ensures that there is enough time for PegasusLite to transfer the checkpoint file before the job is killed by the underlying scheduler.
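
As a worked example, with checkpoint.time set to 60 and maxwalltime set to 120, pegasus-kickstart sends the TERM signal at 60 minutes and the KILL signal at 60 + (120 - 60)/2 = 90 minutes, leaving roughly 30 minutes to stage out the checkpoint file. A sketch of the corresponding DAX job description follows; the job name, file name and values are placeholders, and the exact element syntax depends on the DAX schema version in use.

<job id="ID0000001" name="myjob">
  <!-- checkpoint file written periodically by the job -->
  <uses name="f.checkpoint" link="checkpoint"/>
  <!-- send TERM after 60 minutes so the job can write its checkpoint -->
  <profile namespace="pegasus" key="checkpoint.time">60</profile>
  <!-- the local resource manager kills the job after 120 minutes;
       this key is usually set on the execution site in the site catalog instead -->
  <profile namespace="pegasus" key="maxwalltime">120</profile>
</job>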

10.2.11. Using Amazon S3 as a Staging Site

Pegasus can be configured to use Amazon S3 as a staging site. In this mode, Pegasus transfers workflow inputs from the input site to S3. When a job runs, the inputs for that job are fetched from S3 to the worker node, the job is executed, then the output files are transferred from the worker node back to S3. When the jobs are complete, Pegasus transfers the output data from S3 to the output site.

In order to use S3, it is necessary to create a config file for the S3 transfer client, pegasus-s3. See the man page for details on how to create the config file. You also need to specify S3 as a staging site.

Next, you need to modify your site catalog to specify the location of your s3cfg file. See the section on credential staging.

The following site catalog shows how to specify the location of the s3cfg file on the local site and how to specify an Amazon S3 staging site:

<sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog
             http://pegasus.isi.edu/schema/sc-3.0.xsd" version="3.0">
    <site handle="local" arch="x86_64" os="LINUX">
        <head-fs>
            <scratch>
                <shared>
                    <file-server protocol="file" url="file://" mount-point="/tmp/wf/work"/>
                    <internal-mount-point mount-point="/tmp/wf/work"/>
                </shared>
            </scratch>
            <storage>
                <shared>
                    <file-server protocol="file" url="file://" mount-point="/tmp/wf/storage"/>
                    <internal-mount-point mount-point="/tmp/wf/storage"/>
                </shared>
            </storage>
        </head-fs>
        <profile namespace="env" key="S3CFG">/home/username/.s3cfg</profile>
    </site>
    <site handle="s3" arch="x86_64" os="LINUX">
        <head-fs>
            <scratch>
                <shared>
                    <!-- wf-scratch is the name of the S3 bucket that will be used -->
                    <file-server protocol="s3" url="s3://user@amazon" mount-point="/wf-scratch"/>
                    <internal-mount-point mount-point="/wf-scratch"/>
                </shared>
            </scratch>
        </head-fs>
    </site>
    <site handle="condorpool" arch="x86_64" os="LINUX">
        <head-fs>
            <scratch/>
            <storage/>
        </head-fs>
        <profile namespace="pegasus" key="style">condor</profile>
        <profile namespace="condor" key="universe">vanilla</profile>
        <profile namespace="condor" key="requirements">(Target.Arch == "X86_64")</profile>
    </site>
</sitecatalog>

10.2.12. OSG Stash / stashcp data access

Open Science Grid provides a data service called Stash, and the command line tool stashcp for interacting with Stash data. An example of how to set up the site catalog and URLs can be found in the OSG User Support Pegasus tutorial.

10.2.13. iRODS data access

iRODS can be used as an input data location, a storage site for intermediate data during workflow execution, or a location for final output data. Pegasus uses a URL notation to identify iRODS files. Example:

irods://some-host.org/path/to/file.txt

The path to the file is relative to the internal iRODS location. In the example above, the path used to refer to the file in iRODS is path/to/file.txt (no leading /).
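
As an illustration, an input file at this iRODS location could be catalogued in the Replica Catalog in the same style as the earlier examples; the logical file name and site handle are placeholders.

f.input   irods://some-host.org/path/to/file.txt site="irods"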

See the section on credential staging for information on how to set up an irodsEnv file to be used by Pegasus.

10.2.14. GridFTP over SSH (sshftp)

Instead of using X.509 based security, newer versions of Globus GridFTP can be configured to set up transfers over SSH. See the Globus Documentation for details on installing and setting up this feature.

Pegasus requires the ability to specify which SSH key to be used at runtime, and thus a small modification is necessary to the default Globus configuration. On the hosts where Pegasus initiates transfers (which depends on the data configuration of the workflow), please replace gridftp-ssh, usually located under /usr/share/globus/gridftp-ssh, with:

#!/bin/bash

url_string=$1
remote_host=$2
port=$3
user=$4

port_str=""
if  [ "X" = "X$port" ]; then
    port_str=""
else
    port_str=" -p $port "
fi

if  [ "X" != "X$user" ]; then
    remote_host="$user@$remote_host"
fi

remote_default1=.globus/sshftp
remote_default2=/etc/grid-security/sshftp
remote_fail="echo -e 500 Server is not configured for SSHFTP connections.\\\r\\\n"
remote_program=$GLOBUS_REMOTE_SSHFTP
if  [ "X" = "X$remote_program" ]; then
    remote_program="(( test -f $remote_default1 && $remote_default1 ) || ( test -f $remote_default2 && $remote_default2 ) || $remote_fail )"
fi

if [ "X" != "X$GLOBUS_SSHFTP_PRINT_ON_CONNECT" ]; then
    echo "Connecting to $1 ..." >/dev/tty
fi

# for pegasus-transfer
extra_opts=" -o StrictHostKeyChecking=no"
if [ "x$SSH_PRIVATE_KEY" != "x" ]; then
    extra_opts="$extra_opts -i $SSH_PRIVATE_KEY"
fi

exec /usr/bin/ssh $extra_opts $port_str $remote_host $remote_program

Once configured, you should be able to use URLs such as sshftp://username@host/foo/bar.txt in your workflows.

10.2.15. Globus Online

Globus Online is a transfer service with features such as policy based connection management and automatic failure detection and recovery. Pegasus has limited support for Globus Online transfers.

If you want to use Globus Online in your workflow, all data has to be accessible via a Globus Online endpoint. You cannot mix Globus Online endpoints with other protocols. For most users, this means they will have to create an endpoint for their submit host and probably modify both the replica catalog and the DAX generator so that all URLs in the workflow refer to Globus Online endpoints.

There are two levels of credentials required. One is for the workflow to use the Globus Online API, which is handled by OAuth tokens provided by the Globus Auth service. The second level is for the endpoints, which the user will have to manage via the Globus Online web interface. The required steps are:

  1. Using pegasus-globus-online-init, provide authorization to Pegasus and retrieve your transfer access tokens. By default, Pegasus acquires temporary tokens that expire within a few days. Using the --permanent option you can request refreshable tokens that last indefinitely (or until access is revoked).

  2. In the Globus Online web interface, under Endpoints, find the endpoints you need for the workflow, and activate them. Note that you should activate them for the whole duration of the workflow or you will have to regularly log in and re-activate the endpoints during workflow execution.

URLs for Globus Online endpoint data follows the following scheme: go://[endpoint]/[path]. For example, for a user with the Globus Online private endpoint bob#researchdata and a file /home/bsmith/experiment/1.dat, the URL would be: go://bob#researchdata/home/bsmith/experiment/1.dat