Package edu.isi.pegasus.planner.dax
Class ADAG
java.lang.Object
edu.isi.pegasus.planner.dax.ADAG
This class provides the Java API to create Abstract Workflow files. Starting with the 5.0 release, this class writes out a YAML-formatted abstract workflow file by default. The generated abstract workflow covers a subset of the Pegasus Workflow YAML schema available at http://pegasus.isi.edu/schema/wf-5.0.yml. In particular, it lacks options to create a Site Catalog and to associate Containers with user executables. If you need this information, we recommend using the Python Pegasus workflow API. The Abstract Workflow consists of the following parts. All of them are optional except job|dax|dag, which requires at least one entry.
- file: Used as the "In Abstract Workflow" Replica Catalog (Optional)
- executable: Used as the "In Abstract Workflow" Transformation Catalog (Optional)
- transformation: Used to describe compound executables, i.e. executables that depend on other executables (Optional)
- job|dax|dag: Used to describe a single job, a sub-DAX or a sub-DAG. At least 1 required.
- child: The dependency section, describing dependencies between job|dax|dag elements (Optional)
The following sample Java code illustrates how to use the Java Abstract Workflow API:
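import edu.isi.pegasus.planner.dax.*;

// java.io.File is fully qualified because File below means edu.isi.pegasus.planner.dax.File.
// getCanonicalPath() throws java.io.IOException, so the enclosing method must declare or catch it.
java.io.File cwdFile = new java.io.File(".");
String cwd = cwdFile.getCanonicalPath();
String pegasusHome = "/usr";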
String site = "TestCluster";
ADAG dax = new ADAG("diamond");
dax.addNotification(Invoke.WHEN.start,"/pegasus/libexec/notification/email -t notify@example.com");
dax.addNotification(Invoke.WHEN.at_end,"/pegasus/libexec/notification/email -t notify@example.com");
dax.addMetaData("name", "diamond");
dax.addMetaData("createdBy", "Karan Vahi");
File fa = new File("f.a");
fa.addPhysicalFile("file://" + cwd + "/f.a", "local");
fa.addMetaData( "size", "1024" );
dax.addFile(fa);
File fb1 = new File("f.b1");
File fb2 = new File("f.b2");
File fc1 = new File("f.c1");
File fc2 = new File("f.c2");
File fd = new File("f.d");
fd.setRegister(true);
Executable preprocess = new Executable("pegasus", "preprocess", "4.0");
preprocess.setArchitecture(Executable.ARCH.X86).setOS(Executable.OS.LINUX);
preprocess.setInstalled(true);
preprocess.addMetaData( "size", "2048" );
preprocess.addPhysicalFile("file://" + pegasusHome + "/bin/keg", site);
Executable findrange = new Executable("pegasus", "findrange", "4.0");
findrange.setArchitecture(Executable.ARCH.X86).setOS(Executable.OS.LINUX);
findrange.setInstalled(true);
findrange.addPhysicalFile("file://" + pegasusHome + "/bin/keg", site);
Executable analyze = new Executable("pegasus", "analyze", "4.0");
analyze.setArchitecture(Executable.ARCH.X86).setOS(Executable.OS.LINUX);
analyze.setInstalled(true);
analyze.addPhysicalFile("file://" + pegasusHome + "/bin/keg", site);
dax.addExecutable(preprocess).addExecutable(findrange).addExecutable(analyze);
// Add a preprocess job
Job j1 = new Job("j1", "pegasus", "preprocess", "4.0");
j1.addArgument("-a preprocess -T 60 -i ").addArgument(fa);
j1.addArgument("-o ").addArgument(fb1);
j1.addArgument(" ").addArgument(fb2);
j1.addMetadata( "time", "60" );
j1.uses(fa, File.LINK.INPUT);
j1.uses(fb1, File.LINK.OUTPUT);
j1.uses(fb2, File.LINK.OUTPUT);
j1.addNotification(Invoke.WHEN.start,"/pegasus/libexec/notification/email -t notify@example.com");
j1.addNotification(Invoke.WHEN.at_end,"/pegasus/libexec/notification/email -t notify@example.com");
dax.addJob(j1);
// Add left Findrange job
Job j2 = new Job("j2", "pegasus", "findrange", "4.0");
j2.addArgument("-a findrange -T 60 -i ").addArgument(fb1);
j2.addArgument("-o ").addArgument(fc1);
j2.addMetadata( "time", "60" );
j2.uses(fb1, File.LINK.INPUT);
j2.uses(fc1, File.LINK.OUTPUT);
j2.addNotification(Invoke.WHEN.start,"/pegasus/libexec/notification/email -t notify@example.com");
j2.addNotification(Invoke.WHEN.at_end,"/pegasus/libexec/notification/email -t notify@example.com");
dax.addJob(j2);
// Add right Findrange job
Job j3 = new Job("j3", "pegasus", "findrange", "4.0");
j3.addArgument("-a findrange -T 60 -i ").addArgument(fb2);
j3.addArgument("-o ").addArgument(fc2);
j3.addMetadata( "time", "60" );
j3.uses(fb2, File.LINK.INPUT);
j3.uses(fc2, File.LINK.OUTPUT);
j3.addNotification(Invoke.WHEN.start,"/pegasus/libexec/notification/email -t notify@example.com");
j3.addNotification(Invoke.WHEN.at_end,"/pegasus/libexec/notification/email -t notify@example.com");
dax.addJob(j3);
// Add analyze job
Job j4 = new Job("j4", "pegasus", "analyze", "4.0");
j4.addArgument("-a analyze -T 60 -i ").addArgument(fc1);
j4.addArgument(" ").addArgument(fc2);
j4.addArgument("-o ").addArgument(fd);
j4.addMetadata( "time", "60" );
j4.uses(fc1, File.LINK.INPUT);
j4.uses(fc2, File.LINK.INPUT);
j4.uses(fd, File.LINK.OUTPUT);
j4.addNotification(Invoke.WHEN.start,"/pegasus/libexec/notification/email -t notify@example.com");
j4.addNotification(Invoke.WHEN.at_end,"/pegasus/libexec/notification/email -t notify@example.com");
dax.addJob(j4);
dax.addDependency("j1", "j2");
dax.addDependency("j1", "j3");
dax.addDependency("j2", "j4");
dax.addDependency("j3", "j4");
dax.writeToSTDOUT();
- Version: $Revision$
- Author: Gaurang Mehta (gmehta at isi dot edu), Karan Vahi, Ryan Tanaka
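The four addDependency calls at the end of the sample record, for each child, the jobs it waits on. As a rough, Pegasus-independent illustration of what that dependency section means, the sketch below stores the same diamond as a child-to-parents map (the layout described for the mDependencies field) and derives one valid execution order with Kahn's topological sort. DiamondOrder and its methods are hypothetical names; the Pegasus planner's own scheduling logic is not shown here.

```java
import java.util.*;

// Hypothetical model of the diamond's dependency section; not part of the Pegasus API.
class DiamondOrder {
    // child id -> ids of its parents, mirroring the layout described for mDependencies
    private final Map<String, List<String>> parentsOf = new LinkedHashMap<>();

    void addJob(String id) {
        parentsOf.putIfAbsent(id, new ArrayList<>());
    }

    void addDependency(String parent, String child) {
        addJob(parent);
        addJob(child);
        parentsOf.get(child).add(parent);
    }

    // Kahn's algorithm: repeatedly emit jobs whose parents have all been emitted.
    List<String> topologicalOrder() {
        Map<String, Integer> pending = new LinkedHashMap<>();      // unfinished parent count
        Map<String, List<String>> childrenOf = new HashMap<>();
        for (Map.Entry<String, List<String>> e : parentsOf.entrySet()) {
            pending.put(e.getKey(), e.getValue().size());
            for (String p : e.getValue()) {
                childrenOf.computeIfAbsent(p, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        pending.forEach((id, n) -> { if (n == 0) ready.add(id); });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String id = ready.poll();
            order.add(id);
            for (String c : childrenOf.getOrDefault(id, List.of())) {
                if (pending.merge(c, -1, Integer::sum) == 0) ready.add(c);
            }
        }
        if (order.size() != parentsOf.size()) {
            throw new IllegalStateException("dependency cycle detected");
        }
        return order;
    }

    public static void main(String[] args) {
        DiamondOrder d = new DiamondOrder();
        d.addDependency("j1", "j2");
        d.addDependency("j1", "j3");
        d.addDependency("j2", "j4");
        d.addDependency("j3", "j4");
        System.out.println(d.topologicalOrder()); // [j1, j2, j3, j4]
    }
}
```

In the diamond, j2 and j3 depend only on j1, so either may run first; the sketch simply emits them in insertion order.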
Nested Class Summary
Nested Classes
- static enum ADAG.FORMAT: Enum to indicate how the generated DAX file should be formatted.
- (package private) static class: Custom serializer for the YAML representation of ADAG.
Field Summary
Fields
- static ADAG.FORMAT DEFAULT_FORMAT: The default format to use for writing out the Abstract Workflow.
- private int mCount: The count of the number of DAX objects: N.
- mDependencies: Map of dependencies between Job, DAX and DAG objects.
- private Set<Executable> mExecutables: The list of Executable objects.
- mFiles: The list of edu.isi.pegasus.planner.dax.File objects.
- private int mIndex: The index of the DAX object.
- mInvokes: List of notification objects.
- private Map<String,AbstractJob> mJobs: The list of Job, DAX and DAG objects.
- mLDAGs
- mLDAXs
- mLJobs
- private edu.isi.pegasus.common.logging.LogManager mLogger
- mMetaDataAttributes: The metadata attributes associated with the whole workflow.
- private String mName: The name/label of the DAX.
- private Set<Transformation> mTransformations: The list of Transformation objects.
- private edu.isi.pegasus.common.util.XMLWriter mWriter: Handle to the XML writer.
- static final String PEGASUS_VENDOR_EXTENSION_KEY
- static final String PEGASUS_YAML_ABSTRACT_WF_VERSION
- static final String SCHEMA_LOCATION: The "not-so-official" location URL of the DAX schema definition.
- static final String SCHEMA_NAMESPACE: The "official" namespace URI of the site catalog schema.
- static final String SCHEMA_NAMESPACE_XSI: XSI schema namespace.
- static final String SCHEMA_VERSION: The version to report.
- private static final String WF_API_KEY: The type of Abstract Workflow API generated.
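Constructor Summary
Constructors
- ADAG(name): The simple constructor for the DAX object.
- ADAG(name, index, count): DAX constructor.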
Method Summary
- private ADAG addAbstractJob(AbstractJob ajob): Add an AbstractJob to the DAX.
- private ADAG addAbstractJobs(List<AbstractJob> ajobs): Add AbstractJobs to the DAX.
- ADAG addDAG(dag): Add a DAG job to the DAX.
- ADAG addDAGs(dags): Add multiple DAG jobs to the DAX.
- ADAG addDAX(dax): Add a DAX job to the DAX.
- ADAG addDAXs(daxs): Add multiple DAX jobs to the DAX.
- ADAG addDependency(AbstractJob parent, AbstractJob child): Add a parent-child dependency between two jobs, DAXes or DAGs.
- ADAG addDependency(AbstractJob parent, AbstractJob child, String label): Add a parent-child dependency with a dependency label.
- ADAG addDependency(String parent, String child): Add a parent-child dependency between two jobs, DAXes or DAGs.
- ADAG addDependency(String parent, String child, String label): Add a parent-child dependency with a dependency label.
- ADAG addExecutable(Executable executable): Add an Executable to the DAX.
- ADAG addExecutables(List<Executable> executables): Add multiple Executable objects to the DAX.
- ADAG addFile(file): Add an RC File object to the top of the DAX.
- ADAG addFiles(files): Add Files to the RC section at the top of the DAX.
- ADAG addInvoke(invoke): Add a notification for this workflow.
- ADAG addInvoke(Invoke.WHEN when, String what): Add a notification for this workflow.
- ADAG addInvokes(List<Invoke> invokes): Add a list of notifications for this workflow.
- ADAG addJob(job): Add a Job to the DAX.
- ADAG addJobs(jobs): Add multiple Jobs to the DAX.
- ADAG addMetaData(String key, String value): Add metadata to the workflow.
- ADAG addNotification(Invoke invoke): Add a notification for this workflow.
- ADAG addNotification(Invoke.WHEN when, String what): Add a notification for this workflow.
- ADAG addNotifications(List<Invoke> invokes): Add a list of notifications for this workflow.
- ADAG addTransformation(Transformation transformation): Add a Transformation to the DAX.
- ADAG addTransformations(List<Transformation> transformations): Add multiple Transformations to the DAX.
- private boolean containsAbstractJob(ajob): Check whether an abstract job exists in the DAX.
- private boolean containsAbstractJobId(String ajobid): Check whether a job id exists in the DAX.
- boolean containsDAG(DAG dag): Check whether a DAG job exists in the DAX.
- boolean containsDAGId(String dagid): Check whether a DAG job id exists in the DAX.
- boolean containsDAX(DAX dax): Check whether a DAX job exists in the DAX.
- boolean containsDAXId(String daxid): Check whether a DAX job id exists in the DAX.
- boolean containsExecutable(Executable executable): Check whether a given executable exists in the DAX-based Transformation Catalog.
- boolean containsJob(Job job): Check whether a job exists in the DAX.
- boolean containsJobId(String jobid): Check whether a job id exists in the DAX.
- boolean containsTransformation(Transformation transformation): Check whether a given Transformation exists in the DAX-based Transformation Catalog.
- private static ADAG Diamond()
- private AbstractJob getAbstractJob(String ajobid): Return the abstract job with id ajobid if present, otherwise null.
- int getCount(): Return the total count of the DAX collection.
- getDAG(dagid): Return the DAG object with id dagid if present, otherwise null.
- getDAGs(): Get a list of all the DAG jobs.
- getDAX(daxid): Return the DAX object with id daxid if present, otherwise null.
- getDAXs(): Get a list of all the DAX jobs.
- getEdges(): Return a Set of all the Edge objects for the DAX; empty if there are no dependencies.
- getEdges(child): Return the set of Edge objects for a child job/DAX/DAG id.
- getExecutables(): Return the set of Executable objects stored as part of the in-DAX Transformation Catalog.
- getFiles(): Return the list of File objects defined as the in-DAX Replica Catalog.
- int getIndex()
- getInvoke(): Return the list of Invoke objects associated with the workflow.
- getJob(jobid): Return the Job object with id jobid if present, otherwise null.
- getJobs(): Get a list of all the jobs.
- getMetaData(String key): Return the metadata value for a key if it exists, else null.
- getName(): Return the name/label of the DAX.
- getNotification(): Return the list of Invoke objects associated with the workflow.
- getPegasusVendorExtensions(): Return the Pegasus vendor extensions encoded in each workflow.
- getTransformations(): Return the set of Transformation objects (compound executables) stored in the DAX-based Transformation Catalog.
- static void main(args): Create an example DIAMOND DAX.
- private static ADAG Performance
- private static ADAG SingleJob
- void toXML(edu.isi.pegasus.common.util.XMLWriter writer): Generate a DAX representation.
- toYAML(): Generate the YAML representation of a workflow.
- void writeTo(Writer writer, ADAG.FORMAT format): Generate a DAX file from this object.
- void writeToFile(String daxfile): Generate a DAX file from this object in YAML format.
- void writeToFile(String daxfile, ADAG.FORMAT format): Generate a DAX file from this object.
- void writeToSTDOUT(): Convenience function to write the generated DAX to stdout in YAML format.
- void writeToSTDOUT(ADAG.FORMAT format): Convenience function to write the generated DAX to stdout.
- void writeToWriter(Writer writer, boolean close): Deprecated.
Field Details

SCHEMA_NAMESPACE
The "official" namespace URI of the site catalog schema.

SCHEMA_NAMESPACE_XSI
XSI schema namespace.

SCHEMA_LOCATION
The "not-so-official" location URL of the DAX schema definition.

SCHEMA_VERSION
The version to report.

PEGASUS_VENDOR_EXTENSION_KEY

PEGASUS_YAML_ABSTRACT_WF_VERSION

DEFAULT_FORMAT
The default format to use for writing out the Abstract Workflow.

WF_API_KEY
The type of Abstract Workflow API generated.

mName
The name/label of the DAX.

mIndex
private int mIndex
The index of the DAX object: I out of N.

mCount
private int mCount
The count of the number of DAX objects: N.

mJobs
The list of Job, DAX and DAG objects.

mLJobs

mLDAGs

mLDAXs

mTransformations
The list of Transformation objects.

mExecutables
The list of Executable objects.

mFiles
The list of edu.isi.pegasus.planner.dax.File objects.

mDependencies
Map of dependencies between Job, DAX and DAG objects. The map key is a string holding the child element reference; the value is a list of parent objects.

mInvokes
List of notification objects.

mMetaDataAttributes
The metadata attributes associated with the whole workflow.

mWriter
private edu.isi.pegasus.common.util.XMLWriter mWriter
Handle to the XML writer.

mLogger
private edu.isi.pegasus.common.logging.LogManager mLogger

Constructor Details

ADAG
The simple constructor for the DAX object.
- Parameters: name - DAX label

ADAG
DAX constructor.
- Parameters: name - DAX label; index - index of this DAX out of N DAXes; count - number of DAXes in a group

Method Details

getPegasusVendorExtensions
Returns the Pegasus vendor extensions that are encoded in each workflow.
- Returns: Map

getName
Returns the name/label of the DAX.
- Returns: the name

getIndex
public int getIndex()

getCount
public int getCount()
Returns the total count of the DAX collection (legacy).
- Returns: int

addInvoke
Add a notification for this workflow.
- Parameters: when - when to do the notification; what - executable and arguments to invoke
- Returns: ADAG

addNotification
Add a notification for this workflow.
- Parameters: when - when to do the notification; what - executable and arguments to invoke
- Returns: ADAG

addInvoke
Add a notification for this workflow.
- Parameters: invoke - the Invoke object
- Returns: ADAG

addNotification
Add a notification for this workflow.
- Parameters: invoke - the Invoke object
- Returns: ADAG

addInvokes
Add a list of notifications for this workflow.
- Parameters: invokes - List of Invoke objects
- Returns: ADAG

addNotifications
Add a list of notifications for this workflow.
- Parameters: invokes - List of Invoke objects
- Returns: ADAG

getInvoke
Returns the list of Invoke objects associated with the workflow.
- Returns: list of notifications

getNotification
Returns the list of Invoke objects associated with the workflow. Same as getInvoke().
- Returns: list of notifications
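The addInvoke and addNotification families above share the same descriptions, and every adder returns ADAG. The sketch below shows that shape with plain collections: one list backs both names, and returning the container allows call chaining. NotifySketch, When and Invoke are hypothetical stand-ins, and only the two trigger values used in the sample code (start, at_end) are modeled; the real Invoke.WHEN may define more.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, Pegasus-independent sketch of the notification methods' shape.
// NotifySketch, When and Invoke are illustrative names, not the real classes.
class NotifySketch {
    // Only the two trigger values used in the sample code are modeled here.
    enum When { start, at_end }

    // One notification: when to fire and the command line to run.
    record Invoke(When when, String what) {}

    private final List<Invoke> invokes = new ArrayList<>();

    NotifySketch addInvoke(Invoke invoke) {
        invokes.add(invoke);
        return this; // returning the container enables call chaining
    }

    NotifySketch addInvoke(When when, String what) {
        return addInvoke(new Invoke(when, what));
    }

    // Alias: delegates to addInvoke so both names update the same list.
    NotifySketch addNotification(When when, String what) {
        return addInvoke(when, what);
    }

    List<Invoke> getInvoke() {
        return Collections.unmodifiableList(invokes);
    }

    // Alias of getInvoke(), matching "Same as getInvoke()".
    List<Invoke> getNotification() {
        return getInvoke();
    }
}
```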
addMetaData
Adds metadata to the workflow.
- Parameters: key - key name for the metadata; value - the value
- Returns: ADAG

getMetaData
Returns the metadata associated with a key if it exists, else null.
- Parameters: key - the key
- Returns: String
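The metadata contract above (add a key/value pair, read it back, get null for an unknown key) maps naturally onto a Java Map. A minimal sketch under that reading; MetadataSketch is a hypothetical name, and the overwrite-on-repeat behavior is an assumption this page does not state.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of workflow-level metadata storage; MetadataSketch is not a Pegasus class.
class MetadataSketch {
    // Insertion-ordered, so iteration follows the order in which keys were added.
    private final Map<String, String> attributes = new LinkedHashMap<>();

    MetadataSketch addMetaData(String key, String value) {
        // Assumption: adding an existing key replaces the earlier value.
        attributes.put(key, value);
        return this;
    }

    // Returns the value for key, or null when the key was never added.
    String getMetaData(String key) {
        return attributes.get(key);
    }
}
```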
addFile
Adds an RC File object to the top of the DAX.
- Parameters: file - File object to be added to the RC section
- Returns: ADAG

addFiles
Adds Files to the RC section at the top of the DAX.
- Parameters: files - List of File objects to be added to the RC section
- Returns: ADAG

getFiles
Returns the list of File objects defined as the in-DAX Replica Catalog.
- Returns: List of Files
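The file section acts as a replica catalog embedded in the workflow: each logical file name (f.a in the sample) maps to one or more physical URLs tagged with a site (file://<cwd>/f.a on site local). The sketch below models that mapping with plain collections. ReplicaSketch, Pfn and lookup are hypothetical names, and how the planner actually chooses among replicas is not described on this page.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of an in-workflow replica catalog; not the Pegasus File class.
class ReplicaSketch {
    // A physical location (URL) of a file at a named site.
    record Pfn(String url, String site) {}

    // logical file name -> known physical replicas, in the order they were added
    private final Map<String, List<Pfn>> replicas = new LinkedHashMap<>();

    ReplicaSketch addPhysicalFile(String lfn, String url, String site) {
        replicas.computeIfAbsent(lfn, k -> new ArrayList<>()).add(new Pfn(url, site));
        return this;
    }

    // First replica of lfn registered for site, or null if there is none.
    Pfn lookup(String lfn, String site) {
        for (Pfn p : replicas.getOrDefault(lfn, List.of())) {
            if (p.site().equals(site)) {
                return p;
            }
        }
        return null;
    }
}
```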
addExecutable
Adds an Executable to the DAX.
- Parameters: executable - Executable to be added
- Returns: ADAG

addExecutables
Adds multiple Executable objects to the DAX.
- Parameters: executables - List of Executable objects to be added
- Returns: ADAG

getExecutables
Returns the set of Executable objects stored as part of the in-DAX Transformation Catalog.
- Returns: Set of executables stored in the TC

containsExecutable
Checks whether a given executable exists in the DAX-based Transformation Catalog.
- Parameters: executable - the executable
- Returns: boolean

addTransformation
Adds a Transformation to the DAX.
- Parameters: transformation - Transformation object to be added
- Returns: ADAG

addTransformations
Adds multiple Transformations to the DAX.
- Parameters: transformations - List of Transformation objects
- Returns: ADAG

containsTransformation
Checks whether a given Transformation exists in the DAX-based Transformation Catalog.
- Parameters: transformation - the Transformation
- Returns: boolean

getTransformations
Returns the set of Transformation objects (compound executables) stored in the DAX-based Transformation Catalog.
- Returns: Set of Transformations stored in the DAX
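getExecutables() and getTransformations() return sets, so a containsExecutable-style check comes down to how the stored objects define equality. The sketch below shows the general Java behavior with a record keyed on the (namespace, name, version) triple passed to new Executable(...) in the sample. CatalogSketch and ExecKey are hypothetical, and whether the real Executable.equals also compares architecture, OS or physical files is not stated on this page, so treat the key choice as an assumption.

```java
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch: a Set-backed catalog whose contains() check relies on equals/hashCode.
// ExecKey is an illustrative stand-in, not the Pegasus Executable class.
class CatalogSketch {
    // A record gets value-based equals/hashCode over all of its components.
    record ExecKey(String namespace, String name, String version) {
        ExecKey {
            Objects.requireNonNull(name, "name");
        }
    }

    private final Set<ExecKey> executables = new LinkedHashSet<>();

    CatalogSketch addExecutable(ExecKey e) {
        executables.add(e); // an equal entry already present is silently kept once
        return this;
    }

    boolean containsExecutable(ExecKey e) {
        return executables.contains(e);
    }

    int size() {
        return executables.size();
    }
}
```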
addAbstractJob
Adds an AbstractJob to the DAX.
- Parameters: ajob - the AbstractJob
- Returns: ADAG

addAbstractJobs
Adds AbstractJobs to the DAX.
- Parameters: ajobs - List of AbstractJob objects
- Returns: ADAG

getAbstractJob
Returns the abstract job with id ajobid if present, otherwise null.
- Parameters: ajobid - abstract job id
- Returns: AbstractJob

containsAbstractJob
Checks whether an abstract job exists in the DAX.
- Parameters: ajob - the abstract job
- Returns: boolean

containsAbstractJobId
Checks whether a job id exists in the DAX.
- Parameters: ajobid - the job id
- Returns: boolean

addJob
Adds a Job to the DAX.
- Parameters: job - the job
- Returns: ADAG

addJobs
Adds multiple Jobs to the DAX.
- Parameters: jobs - List of jobs to add
- Returns: ADAG

containsJob
Checks whether a job exists in the DAX.
- Parameters: job - the job to check
- Returns: boolean

containsJobId
Checks whether a job id exists in the DAX.
- Parameters: jobid - the job id
- Returns: boolean

getJob
Returns the Job object with id jobid if present, otherwise null.
- Parameters: jobid - the job id
- Returns: the Job

getJobs
Gets a list of all the jobs.
- Returns: List of Job

getDAXs
Gets a list of all the DAX jobs.
- Returns: List of DAX jobs

getDAX
Returns the DAX object with id daxid if present, otherwise null.
- Parameters: daxid - the DAX id
- Returns: DAX

getDAGs
Gets a list of all the DAG jobs.
- Returns: List of DAG objects

getDAG
Returns the DAG object with id dagid if present, otherwise null.
- Parameters: dagid - the DAG id
- Returns: the DAG

addDAG
Adds a DAG job to the DAX.
- Parameters: dag - the DAG to be added
- Returns: ADAG

addDAGs
Adds multiple DAG jobs to the DAX.
- Parameters: dags - List of DAG jobs to be added
- Returns: ADAG

containsDAG
Checks whether a DAG job exists in the DAX.
- Parameters: dag - the DAG
- Returns: boolean

containsDAGId
Checks whether a DAG job id exists in the DAX.
- Parameters: dagid - the DAG id
- Returns: boolean

addDAX
Adds a DAX job to the DAX.
- Parameters: dax - DAX to be added
- Returns: ADAG

addDAXs
Adds multiple DAX jobs to the DAX.
- Parameters: daxs - List of DAX jobs to be added
- Returns: ADAG

containsDAX
Checks whether a DAX job exists in the DAX.
- Parameters: dax - the DAX
- Returns: boolean

containsDAXId
Checks whether a DAX job id exists in the DAX.
- Parameters: daxid - the DAX id
- Returns: boolean
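ADAG keeps jobs, sub-DAXes and sub-DAGs in one id-keyed map (mJobs, a Map<String,AbstractJob>), and the getJob/getDAX/getDAG and contains*Id methods look entries up by id. The hypothetical sketch below reproduces that layout with a generic typed lookup. JobStoreSketch and its node classes are illustrative, and two behaviors are assumptions not stated on this page: a typed getter returns null when the id belongs to a different kind of node, and re-adding an id replaces the earlier entry.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of one id-keyed map holding jobs, sub-DAXes and sub-DAGs,
// with typed lookups layered on top. None of these types are the Pegasus classes.
class JobStoreSketch {
    abstract static class Node {
        final String id;
        Node(String id) { this.id = id; }
    }
    static final class JobNode extends Node { JobNode(String id) { super(id); } }
    static final class DaxNode extends Node { DaxNode(String id) { super(id); } }
    static final class DagNode extends Node { DagNode(String id) { super(id); } }

    // id -> node of any kind, in insertion order
    private final Map<String, Node> nodes = new LinkedHashMap<>();

    JobStoreSketch add(Node n) {
        nodes.put(n.id, n); // assumption: a repeated id replaces the earlier node
        return this;
    }

    boolean containsId(String id) {
        return nodes.containsKey(id);
    }

    // Returns the node only if it exists and has the requested kind, else null.
    <T extends Node> T get(String id, Class<T> kind) {
        Node n = nodes.get(id);
        return kind.isInstance(n) ? kind.cast(n) : null;
    }

    // All nodes of one kind, in insertion order.
    <T extends Node> List<T> all(Class<T> kind) {
        List<T> out = new ArrayList<>();
        for (Node n : nodes.values()) {
            if (kind.isInstance(n)) {
                out.add(kind.cast(n));
            }
        }
        return out;
    }
}
```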
addDependency
Adds a parent-child dependency between two jobs, DAXes or DAGs.
- Parameters: parent - String job/DAX/DAG id; child - String job/DAX/DAG id
- Returns: ADAG

addDependency
Adds a parent-child dependency between two jobs, DAXes or DAGs.
- Parameters: parent - Job|DAX|DAG object; child - Job|DAX|DAG object
- Returns: ADAG

addDependency
Adds a parent-child dependency with a dependency label.
- Parameters: parent - String job/DAX/DAG id; child - String job/DAX/DAG id; label - String dependency label
- Returns: ADAG

getEdges
Returns the set of Edge objects for a child job/DAX/DAG id. Returns an empty set if the child has no parents, and null if the child is not a valid job/DAX/DAG id.
- Parameters: child - the child
- Returns: Set of Edges

getEdges
Returns a Set of all the Edge objects for the DAX. Returns an empty set if there are no dependencies.
- Returns: set of all edges

addDependency
Adds a parent-child dependency with a dependency label.
- Parameters: parent - Job|DAX|DAG object; child - Job|DAX|DAG object; label - String label for annotation
- Returns: ADAG
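The two getEdges variants have distinct contracts: for a single child, null means "unknown id" while an empty set means "known, but no parents"; the no-argument form returns every edge, or an empty set. A hypothetical sketch that honors those contracts, storing edges keyed by child as described for mDependencies. EdgeIndex and Edge are illustrative names, not the Pegasus Edge class, and this sketch does not validate ids passed to addDependency.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of edge bookkeeping that mirrors the documented getEdges contracts.
// EdgeIndex and Edge are illustrative names, not the Pegasus classes.
class EdgeIndex {
    // Directed edge parent -> child; label is null for an unlabeled dependency.
    record Edge(String parent, String child, String label) {}

    private final Set<String> ids = new HashSet<>();
    // child id -> edges from its parents (child-keyed, as described for mDependencies)
    private final Map<String, Set<Edge>> edgesByChild = new LinkedHashMap<>();

    EdgeIndex addJob(String id) {
        ids.add(id);
        return this;
    }

    EdgeIndex addDependency(String parent, String child) {
        return addDependency(parent, child, null);
    }

    EdgeIndex addDependency(String parent, String child, String label) {
        edgesByChild.computeIfAbsent(child, k -> new LinkedHashSet<>())
                    .add(new Edge(parent, child, label));
        return this;
    }

    // null if child is not a known id; an empty set if it has no parents
    Set<Edge> getEdges(String child) {
        if (!ids.contains(child)) {
            return null;
        }
        return Collections.unmodifiableSet(edgesByChild.getOrDefault(child, Set.of()));
    }

    // every edge in the workflow; empty when there are no dependencies
    Set<Edge> getEdges() {
        Set<Edge> all = new LinkedHashSet<>();
        edgesByChild.values().forEach(all::addAll);
        return all;
    }
}
```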
writeToFile
Generates a DAX file from this object in YAML format.
- Parameters: daxfile - the file to write the DAX to

writeToFile
Generates a DAX file from this object.
- Parameters: daxfile - the file to write the DAX to; format - how the file should be formatted

writeTo
Generates a DAX file from this object.
- Parameters: writer - the writer to the DAX file; format - how the file should be formatted

writeToSTDOUT
public void writeToSTDOUT()
Convenience function to write the generated DAX to stdout in YAML format.

writeToSTDOUT
Convenience function to write the generated DAX to stdout.
- Parameters: format - how the output should be formatted

writeToWriter
Deprecated. Generates a DAX representation and pipes it into the Writer.
- Parameters: writer - a Writer object; close - whether the writer should be closed on return
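The close parameter of writeToWriter decides who owns the Writer after the call. A common Java pattern for such a flag is to flush unconditionally and close only on request, so caller-owned streams (for example a wrapper around System.out, as a stdout convenience method would need) stay usable. The sketch below illustrates that pattern; WriterSketch is hypothetical, and the document string stands in for whatever the real serializer produces.

```java
import java.io.IOException;
import java.io.Writer;

// Hypothetical sketch of a close-flag writer helper; WriterSketch is not a Pegasus class.
class WriterSketch {
    // Writes the rendered document and flushes it; closes the writer only when close is true,
    // leaving caller-owned writers open otherwise.
    static void writeToWriter(Writer writer, boolean close, String document) throws IOException {
        try {
            writer.write(document);
            writer.flush();
        } finally {
            if (close) {
                writer.close();
            }
        }
    }
}
```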
toYAML
Generates the YAML representation of a workflow.
- Returns: the YAML representation of this ADAG as a String

toXML
public void toXML(edu.isi.pegasus.common.util.XMLWriter writer)
Generates a DAX representation.
- Parameters: writer - the XML writer

main
Creates an example DIAMOND DAX.
- Parameters: args - main arguments

Performance

Diamond

SingleJob