15.3. The Pegasus DAX and Jupyter Python APIs

To use Pegasus from a Jupyter notebook, first import the Pegasus Jupyter Python API. Importing the instance module automatically loads the Pegasus DAX3 API and the catalog APIs.

from Pegasus.jupyter.instance import *

By default, the API creates a folder in the user's $HOME directory based on the workflow name. To store the workflow files elsewhere, set the path explicitly:

workflow_dir = '/home/pegasus/wf-split-tutorial'

15.3.1. Creating an Abstract Workflow

Creating a workflow within Jupyter follows the same steps used to generate a DAX with the DAX Generator API.
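The Instance call in Section 15.3.3 expects a dax object, which this page does not define. The following is a minimal sketch of how one could be built with the DAX3 API for the split and wc transformations registered below. The function name build_split_dax, the split arguments, the part.* and count.txt.* file names, and the choice of four chunks are assumptions made for illustration, not part of the original text.

```python
def build_split_dax():
    """Sketch: build a split -> wc abstract workflow with the DAX3 API."""
    # Imported inside the function so the cell can be defined anywhere;
    # in a notebook, Pegasus.jupyter.instance already loads these names.
    from Pegasus.DAX3 import ADAG, File, Job, Link

    dax = ADAG('split')
    webpage = File('pegasus.html')

    # The split job cuts the input page into chunks (arguments are illustrative).
    split = Job('split')
    split.addArguments('-l', '100', '-a', '1', webpage, 'part.')
    split.uses(webpage, link=Link.INPUT, transfer=False, register=False)
    dax.addJob(split)

    # One wc job per chunk; four chunks is an assumption of this sketch.
    for suffix in 'abcd':
        part = File('part.%s' % suffix)
        split.uses(part, link=Link.OUTPUT, transfer=False, register=False)

        count = File('count.txt.%s' % suffix)
        wc = Job('wc')
        wc.addArguments('-l', part)
        wc.setStdout(count)
        wc.uses(part, link=Link.INPUT, transfer=False, register=False)
        wc.uses(count, link=Link.OUTPUT, transfer=True, register=True)
        dax.addJob(wc)

        # Each wc job runs only after split has produced its chunk.
        dax.depends(parent=split, child=wc)

    return dax
```

In a notebook, calling dax = build_split_dax() would then provide the object handed to Instance.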

15.3.2. Creating the Catalogs

The Replica Catalog (RC) tells Pegasus where to find each of the input files for the workflow. We provide a Python API for creating the RC programmatically. Detailed information on how the RC works and its semantics can be found here, and the auto-generated Python documentation for this API can be found here.

rc = ReplicaCatalog(workflow_dir)
rc.add('pegasus.html', 'file:///home/pegasus/pegasus.html', site='local')
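The physical file name passed to rc.add is a file:// URL built from an absolute path. As a small convenience, the URL can be derived from the path instead of being typed by hand. The helper below is a sketch using only the Python standard library; file_url is a hypothetical name, and percent-encoding special characters is a choice of this sketch rather than a documented Pegasus requirement.

```python
import posixpath
from urllib.parse import quote


def file_url(path):
    """Return a file:// URL for an absolute POSIX path (hypothetical helper)."""
    if not posixpath.isabs(path):
        raise ValueError('expected an absolute path, got %r' % path)
    # quote() leaves '/' untouched and percent-encodes characters such as spaces.
    return 'file://' + quote(path)


pfn = file_url('/home/pegasus/pegasus.html')
# The result could then be registered as in the example above:
# rc.add('pegasus.html', pfn, site='local')
```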

The Transformation Catalog (TC) describes all of the executables (called "transformations") used by the workflow. The Jupyter Python API also provides methods to manage this catalog. A detailed description of the TC properties can be found here, and the auto-generated Python documentation for this API can be found here.

e_split = Executable('split', arch=Arch.X86_64, os=OSType.LINUX, installed=True)
e_split.addPFN(PFN('file:///usr/bin/split', 'condorpool'))

e_wc = Executable('wc', arch=Arch.X86_64, os=OSType.LINUX, installed=True)
e_wc.addPFN(PFN('file:///usr/bin/wc', 'condorpool'))

tc = TransformationCatalog(workflow_dir)
tc.add(e_split)
tc.add(e_wc)

The Site Catalog (SC) describes the sites where the workflow jobs are to be executed. A detailed description of the SC properties and handlers can be found here, and the auto-generated Python documentation for this API can be found here.

sc = SitesCatalog(workflow_dir)
sc.add_site('condorpool', arch=Arch.X86_64, os=OSType.LINUX)
sc.add_site_profile('condorpool', namespace=Namespace.PEGASUS, key='style', value='condor')
sc.add_site_profile('condorpool', namespace=Namespace.CONDOR, key='universe', value='vanilla')

15.3.3. Workflow Execution

Workflow execution and management are performed using an Instance object. An instance receives a DAX object (created with the DAX Generator API) and the catalog objects (replica, transformation, and site). A path to the workflow directory can also be provided:

instance = Instance(dax, replica_catalog=rc, transformation_catalog=tc, sites_catalog=sc, workflow_dir=workflow_dir)

An instance object represents a workflow run, from which the workflow execution can be launched, monitored, and managed. The run method starts the workflow execution.

instance.run(site='condorpool')

After the workflow has been submitted, you can monitor it using the status() method. This method takes two arguments:

  1. loop: whether the status command should be invoked once or continuously until the workflow is completed or a failure is detected.

  2. delay: the interval, in seconds, at which the status is refreshed. The default value is 10 seconds.

instance.status(loop=True, delay=5)
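To make the meaning of loop and delay concrete, the sketch below mimics the polling behavior described above in plain Python. It is not the Pegasus implementation: poll_status, the get_state callable, and the state names 'Running', 'Success', and 'Failure' are all hypothetical and exist only for this illustration.

```python
import time


def poll_status(get_state, loop=False, delay=10, done_states=('Success', 'Failure')):
    """Query a state once, or repeatedly until a terminal state is seen."""
    seen = []
    while True:
        state = get_state()
        seen.append(state)
        # With loop=False the status is checked exactly once.
        if not loop or state in done_states:
            return seen
        # Wait `delay` seconds before refreshing the status.
        time.sleep(delay)


# Simulated workflow that finishes on the third check (hypothetical states).
states = iter(['Running', 'Running', 'Success'])
history = poll_status(lambda: next(states), loop=True, delay=0)
```

With loop=True the function keeps checking until a terminal state appears; with loop=False it returns after a single check, whatever the state.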