Chapter 4. Creating Workflows

4.1. Abstract Workflows (DAX)

The DAX is a description of an abstract workflow in XML format that is used as the primary input into Pegasus. The DAX schema is described in dax-3.4.xsd. The documentation of the schema and its elements can be found in dax-3.4.html.

A DAX can be created by any user with one of the DAX generating APIs, which are available for Java, Perl, and Python.

Note

We highly recommend using the DAX API.

Advanced users who can read XML schema definitions can also generate a DAX directly from a script.

The sample workflow below incorporates some of the elementary graph structures used in all abstract workflows.

  • fan-out, scatter, and diverge all describe the situation where multiple sibling nodes depend on fewer parent nodes.

    The example shows how the Job 2 and Job 3 nodes depend on the Job 1 node.

  • fan-in, gather, join, and converge describe how multiple siblings are merged into fewer dependent child nodes.

    The example shows how the Job 4 node depends on both the Job 2 and Job 3 nodes.

  • serial execution implies that nodes are dependent on one another, like pearls on a string.

  • parallel execution implies that nodes can be executed in parallel.
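
The diamond example combines these structures. As an illustrative sketch (plain Python, not a Pegasus API; the job names j1..j4 follow the figure), the dependencies can be written as an adjacency mapping from which fan-out and fan-in are easy to read off:

```python
# Parent -> children adjacency for the diamond workflow of Figure 4.1.
edges = {
    "j1": ["j2", "j3"],  # fan-out: two dependent siblings share one parent
    "j2": ["j4"],
    "j3": ["j4"],        # fan-in: j4 gathers the results of j2 and j3
    "j4": [],
}

def parents_of(node):
    """Return the set of jobs that must finish before `node` may run."""
    return {p for p, children in edges.items() if node in children}

print(sorted(edges["j1"]))       # children of j1 (fan-out)
print(sorted(parents_of("j4")))  # parents of j4 (fan-in)
```

Serial execution corresponds to a chain in this mapping (each node has exactly one child), while nodes with no path between them, such as j2 and j3, may execute in parallel.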

Figure 4.1. Sample Workflow



The example diamond workflow consists of four nodes representing jobs, which are linked by six files.

  • Required input files must be registered with the Replica Catalog in order for Pegasus to find them and integrate them into the workflow.

  • Leaf files are the products or outputs of a workflow. Output files can be collected at a location.

  • The remaining files all have lines leading to them and originating from them. These files are products of some job steps (lines leading to them) and are consumed by other job steps (lines leading out of them). Often, such files represent intermediary results that can be cleaned up.
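
These three file categories can be derived mechanically from what each job consumes and produces. A minimal sketch (plain Python; the file and job names are taken from the example workflow):

```python
# Inputs and outputs of each job in the diamond example.
consumes = {"j1": {"f.a"}, "j2": {"f.b1"}, "j3": {"f.b2"}, "j4": {"f.c1", "f.c2"}}
produces = {"j1": {"f.b1", "f.b2"}, "j2": {"f.c1"}, "j3": {"f.c2"}, "j4": {"f.d"}}

all_consumed = set().union(*consumes.values())
all_produced = set().union(*produces.values())

required_inputs = all_consumed - all_produced  # must come from the Replica Catalog
leaf_outputs = all_produced - all_consumed     # staged out as workflow products
intermediates = all_consumed & all_produced    # transient; candidates for cleanup

print(required_inputs, intermediates, leaf_outputs)
```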

There are two main ways of generating a DAX:

  1. Using a DAX generating API in Java, Perl or Python.

    Note: We recommend this option.

  2. Generating XML directly from your script.

    Note: This option should only be considered by advanced users who can also read XML schema definitions.
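
For option 2, the XML can be emitted from any scripting language. The stripped-down sketch below (plain Python with the standard xml.etree.ElementTree module, not the Pegasus API) produces a fragment of the diamond DAX shown later in this section; a real DAX must carry the full set of attributes required by the schema, such as xsi:schemaLocation:

```python
import xml.etree.ElementTree as ET

NS = "http://pegasus.isi.edu/schema/DAX"

# Root <adag> element; a complete DAX also declares xsi:schemaLocation.
adag = ET.Element("adag", {"xmlns": NS, "version": "3.6", "name": "diamond"})

# One job with an input file and a staged-out, registered output file.
job = ET.SubElement(adag, "job", {"id": "j1", "namespace": "pegasus",
                                  "name": "preprocess", "version": "4.0"})
ET.SubElement(job, "uses", {"name": "f.a", "link": "input"})
ET.SubElement(job, "uses", {"name": "f.b1", "link": "output",
                            "transfer": "true", "register": "true"})

# A dependency: child j2 runs after its parent j1.
child = ET.SubElement(adag, "child", {"ref": "j2"})
ET.SubElement(child, "parent", {"ref": "j1"})

xml_text = ET.tostring(adag, encoding="unicode")
print(xml_text)
```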

A DAX representing the example workflow can look like the following:

<?xml version="1.0" encoding="UTF-8"?>
<!-- generated on: 2016-01-21T10:36:39-08:00 -->
<!-- generated by: vahi [ ?? ] -->
<adag xmlns="http://pegasus.isi.edu/schema/DAX" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/DAX http://pegasus.isi.edu/schema/dax-3.6.xsd" version="3.6" name="diamond" index="0" count="1">

<!-- Section 1: Metadata attributes for the workflow (can be empty)  -->

   <metadata key="name">diamond</metadata>
   <metadata key="createdBy">Karan Vahi</metadata>

<!-- Section 2: Invokes - Adds notifications for a workflow (can be empty) -->

   <invoke when="start">/pegasus/libexec/notification/email -t notify@example.com</invoke>
   <invoke when="at_end">/pegasus/libexec/notification/email -t notify@example.com</invoke>

<!-- Section 3: Files - Acts as a Replica Catalog (can be empty) -->

   <file name="f.a">
      <metadata key="size">1024</metadata>
      <pfn url="file:///Volumes/Work/lfs1/work/pegasus-features/PM-902/f.a" site="local"/>
   </file>

<!-- Section 4: Executables - Acts as a Transformation Catalog (can be empty) -->

   <executable namespace="pegasus" name="preprocess" version="4.0" installed="true" arch="x86" os="linux">
      <metadata key="size">2048</metadata>
      <pfn url="file:///usr/bin/keg" site="TestCluster"/>
   </executable>
   <executable namespace="pegasus" name="findrange" version="4.0" installed="true" arch="x86" os="linux">
      <pfn url="file:///usr/bin/keg" site="TestCluster"/>
   </executable>
   <executable namespace="pegasus" name="analyze" version="4.0" installed="true" arch="x86" os="linux">
      <pfn url="file:///usr/bin/keg" site="TestCluster"/>
   </executable>

<!-- Section 5: Transformations - Aggregates executables and Files (can be empty) -->


<!-- Section 6: Jobs, DAXs or DAGs - Defines a JOB or DAX or DAG (At least 1 required) -->

   <job id="j1" namespace="pegasus" name="preprocess" version="4.0">
      <metadata key="time">60</metadata>
      <argument>-a preprocess -T 60 -i  <file name="f.a"/> -o  <file name="f.b1"/>   <file name="f.b2"/></argument>
      <uses name="f.a" link="input">
         <metadata key="size">1024</metadata>
      </uses>
      <uses name="f.b1" link="output" transfer="true" register="true"/>
      <uses name="f.b2" link="output" transfer="true" register="true"/>
      <invoke when="start">/pegasus/libexec/notification/email -t notify@example.com</invoke>
      <invoke when="at_end">/pegasus/libexec/notification/email -t notify@example.com</invoke>
   </job>
   <job id="j2" namespace="pegasus" name="findrange" version="4.0">
      <metadata key="time">60</metadata>
      <argument>-a findrange -T 60 -i  <file name="f.b1"/> -o  <file name="f.c1"/></argument>
      <uses name="f.b1" link="input"/>
      <uses name="f.c1" link="output" transfer="true" register="true"/>
      <invoke when="start">/pegasus/libexec/notification/email -t notify@example.com</invoke>
      <invoke when="at_end">/pegasus/libexec/notification/email -t notify@example.com</invoke>
   </job>
   <job id="j3" namespace="pegasus" name="findrange" version="4.0">
      <metadata key="time">60</metadata>
      <argument>-a findrange -T 60 -i  <file name="f.b2"/> -o  <file name="f.c2"/></argument>
      <uses name="f.b2" link="input"/>
      <uses name="f.c2" link="output" transfer="true" register="true"/>
      <invoke when="start">/pegasus/libexec/notification/email -t notify@example.com</invoke>
      <invoke when="at_end">/pegasus/libexec/notification/email -t notify@example.com</invoke>
   </job>
   <job id="j4" namespace="pegasus" name="analyze" version="4.0">
      <metadata key="time">60</metadata>
      <argument>-a analyze -T 60 -i  <file name="f.c1"/>   <file name="f.c2"/> -o  <file name="f.d"/></argument>
      <uses name="f.c1" link="input"/>
      <uses name="f.c2" link="input"/>
      <uses name="f.d" link="output" transfer="true" register="true"/>
      <invoke when="start">/pegasus/libexec/notification/email -t notify@example.com</invoke>
      <invoke when="at_end">/pegasus/libexec/notification/email -t notify@example.com</invoke>
   </job>

<!-- Section 7: Dependencies - Parent Child relationships (can be empty) -->

   <child ref="j2">
      <parent ref="j1"/>
   </child>
   <child ref="j3">
      <parent ref="j1"/>
   </child>
   <child ref="j4">
      <parent ref="j2"/>
      <parent ref="j3"/>
   </child>
</adag>

The example workflow representation in the form of a DAX requires external catalogs: a transformation catalog (TC) to resolve the logical job names (such as pegasus::preprocess:4.0), and a replica catalog (RC) to resolve the input file f.a. The above workflow defines the four jobs just like the example picture, along with the files that flow between the jobs. The intermediary files are neither registered nor staged out, and can be considered transient. Only the final result file f.d is staged out.