public class ADAG extends Object
This class provides the Java API to create Abstract Workflow files. Starting 5.0 Release, this class by default writes out YAML formatted abstract workflow file. The information included in the generated abstract workflow is a subset of the Pegasus Workflow YAML schema that is available at http://pegasus.isi.edu/schema/wf-5.0.yml In particular, it is missing options to create a Site Catalog, and associate Containers with user executables. If you need this information, we recommend you use the python Pegasus workflow API. The Abstract Workflow consists of 6 parts the first 4 are optional and the last is optional.
Here is sample java code that illustrates how to use the Java Abstract Workflow API
java.io.File cwdFile = new java.io.File ("."); String cwd = cwdFile.getCanonicalPath(); String pegasusHome = "/usr"; String site = "TestCluster"; ADAG dax = new ADAG("diamond"); dax.addNotification(Invoke.WHEN.start,"/pegasus/libexec/notification/email -t notify@example.com"); dax.addNotification(Invoke.WHEN.at_end,"/pegasus/libexec/notification/email -t notify@example.com"); dax.addMetadata( "name", "diamond"); dax.addMetadata( "createdBy", "Karan Vahi"); File fa = new File("f.a"); fa.addPhysicalFile("file://" + cwd + "/f.a", "local"); fa.addMetaData( "size", "1024" ); dax.addFile(fa); File fb1 = new File("f.b1"); File fb2 = new File("f.b2"); File fc1 = new File("f.c1"); File fc2 = new File("f.c2"); File fd = new File("f.d"); fd.setRegister(true); Executable preprocess = new Executable("pegasus", "preprocess", "4.0"); preprocess.setArchitecture(Executable.ARCH.X86).setOS(Executable.OS.LINUX); preprocess.setInstalled(true); preprocess.addMetaData( "size", "2048" ); preprocess.addPhysicalFile("file://" + pegasus_location + "/bin/keg", site_handle); Executable findrange = new Executable("pegasus", "findrange", "4.0"); findrange.setArchitecture(Executable.ARCH.X86).setOS(Executable.OS.LINUX); findrange.setInstalled(true); findrange.addPhysicalFile("file://" + pegasus_location + "/bin/keg", site_handle); Executable analyze = new Executable("pegasus", "analyze", "4.0"); analyze.setArchitecture(Executable.ARCH.X86).setOS(Executable.OS.LINUX); analyze.setInstalled(true); analyze.addPhysicalFile("file://" + pegasus_location + "/bin/keg", site_handle); dax.addExecutable(preprocess).addExecutable(findrange).addExecutable(analyze); // Add a preprocess job Job j1 = new Job("j1", "pegasus", "preprocess", "4.0"); j1.addArgument("-a preprocess -T 60 -i ").addArgument(fa); j1.addArgument("-o ").addArgument(fb1); j1.addArgument(" ").addArgument(fb2); j1.addMetadata( "time", "60" ); j1.uses(fa, File.LINK.INPUT); j1.uses(fb1, File.LINK.OUTPUT); j1.uses(fb2, File.LINK.OUTPUT); j1.addNotification(Invoke.WHEN.start,"/pegasus/libexec/notification/email -t notify@example.com"); j1.addNotification(Invoke.WHEN.at_end,"/pegasus/libexec/notification/email -t notify@example.com"); dax.addJob(j1); // Add left Findrange job Job j2 = new Job("j2", "pegasus", "findrange", "4.0"); j2.addArgument("-a findrange -T 60 -i ").addArgument(fb1); j2.addArgument("-o ").addArgument(fc1); j2.addMetadata( "time", "60" ); j2.uses(fb1, File.LINK.INPUT); j2.uses(fc1, File.LINK.OUTPUT); j2.addNotification(Invoke.WHEN.start,"/pegasus/libexec/notification/email -t notify@example.com"); j2.addNotification(Invoke.WHEN.at_end,"/pegasus/libexec/notification/email -t notify@example.com"); dax.addJob(j2); // Add right Findrange job Job j3 = new Job("j3", "pegasus", "findrange", "4.0"); j3.addArgument("-a findrange -T 60 -i ").addArgument(fb2); j3.addArgument("-o ").addArgument(fc2); j3.addMetadata( "time", "60" ); j3.uses(fb2, File.LINK.INPUT); j3.uses(fc2, File.LINK.OUTPUT); j3.addNotification(Invoke.WHEN.start,"/pegasus/libexec/notification/email -t notify@example.com"); j3.addNotification(Invoke.WHEN.at_end,"/pegasus/libexec/notification/email -t notify@example.com"); dax.addJob(j3); // Add analyze job Job j4 = new Job("j4", "pegasus", "analyze", "4.0"); j4.addArgument("-a analyze -T 60 -i ").addArgument(fc1); j4.addArgument(" ").addArgument(fc2); j4.addArgument("-o ").addArgument(fd); j4.addMetadata( "time", "60" ); j4.uses(fc1, File.LINK.INPUT); j4.uses(fc2, File.LINK.INPUT); j4.uses(fd, File.LINK.OUTPUT); j4.addNotification(Invoke.WHEN.start,"/pegasus/libexec/notification/email -t notify@example.com"); j4.addNotification(Invoke.WHEN.at_end,"/pegasus/libexec/notification/email -t notify@example.com"); dax.addJob(j4); dax.addDependency("j1", "j2"); dax.addDependency("j1", "j3"); dax.addDependency("j2", "j4"); dax.addDependency("j3", "j4"); dax.writeToSTDOUT();
Modifier and Type | Class and Description |
---|---|
static class |
ADAG.FORMAT
Enum to indicate how the generated dax file should be formatted
|
(package private) static class |
ADAG.JsonSerializer
Custom serializer for YAML representation of ADAG
|
Modifier and Type | Field and Description |
---|---|
static ADAG.FORMAT |
DEFAULT_FORMAT
The default format to use for writing out the Abstract Workflow.
|
private int |
mCount
The Count of the number of dax objects : N
|
private Map<String,Set<Edge>> |
mDependencies
Map of Dependencies between Job,DAX,DAG objects.
|
private Set<Executable> |
mExecutables
The list of Executable objects
|
private List<File> |
mFiles
The list of edu.isi.pegasus.planner.dax.File objects
|
private int |
mIndex
The Index of the dax object.
|
private List<Invoke> |
mInvokes
List of Notification objects
|
private Map<String,AbstractJob> |
mJobs
The List of Job,DAX and DAG objects
|
private List<DAG> |
mLDAGs |
private List<DAX> |
mLDAXs |
private List<Job> |
mLJobs |
private edu.isi.pegasus.common.logging.LogManager |
mLogger |
private Set<MetaData> |
mMetaDataAttributes
The metadata attributes associated with the whole workflow.
|
private String |
mName
The Name / Label of the DAX
|
private Set<Transformation> |
mTransformations
The List of Transformation objects
|
private edu.isi.pegasus.common.util.XMLWriter |
mWriter
Handle the XML writer
|
static String |
PEGASUS_VENDOR_EXTENSION_KEY |
static String |
SCHEMA_LOCATION
The "not-so-official" location URL of the DAX schema definition.
|
static String |
SCHEMA_NAMESPACE
The "official" namespace URI of the site catalog schema.
|
static String |
SCHEMA_NAMESPACE_XSI
XSI SCHEMA NAMESPACE
|
static String |
SCHEMA_VERSION
The version to report.
|
private static String |
WF_API_KEY
The type of Abstract Workflow API generated
|
Constructor and Description |
---|
ADAG(String name)
The Simple constructor for the DAX object
|
ADAG(String name,
int index,
int count)
DAX Constructor
|
Modifier and Type | Method and Description |
---|---|
private ADAG |
addAbstractJob(AbstractJob ajob)
Add AbstractJob to the DAX
|
private ADAG |
addAbstractJobs(List<AbstractJob> ajobs)
Add AbstractJobs to the DAX
|
ADAG |
addDAG(DAG dag)
Add a DAG job to the DAX
|
ADAG |
addDAGs(List<DAG> dags)
Add multiple DAG jobs to the DAX
|
ADAG |
addDAX(DAX dax)
Add a DAX job to the DAX
|
ADAG |
addDAXs(List<DAX> daxs)
Add multiple DAX jobs to the DAX
|
ADAG |
addDependency(AbstractJob parent,
AbstractJob child)
Add a parent child dependency between two jobs,dax,dag
|
ADAG |
addDependency(AbstractJob parent,
AbstractJob child,
String label)
Add a parent child dependency with a dependency label
|
ADAG |
addDependency(String parent,
String child)
Add a parent child dependency between two jobs,dax,dag
|
ADAG |
addDependency(String parent,
String child,
String label)
Add a parent child dependency with a dependency label
|
ADAG |
addExecutable(Executable executable)
Add Executable to the DAX
|
ADAG |
addExecutables(List<Executable> executables)
Add Multiple Executable objects to the DAX
|
ADAG |
addFile(File file)
Add a RC File object to the top of the DAX.
|
ADAG |
addFiles(List<File> files)
Add Files to the RC Section on top of the DAX
|
ADAG |
addInvoke(Invoke.WHEN when,
String what)
Add a Notification for this Workflow
|
ADAG |
addInvoke(Invoke invoke)
Add a Notification for this Workflow
|
ADAG |
addInvokes(List<Invoke> invokes)
Add a List of Notifications for this Workflow
|
ADAG |
addJob(Job job)
Add Job to the DAX
|
ADAG |
addJobs(List<Job> jobs)
Add multiple Jobs to the DAX
|
ADAG |
addMetaData(String key,
String value)
Adds metadata to the workflow
|
ADAG |
addNotification(Invoke.WHEN when,
String what)
Add a Notification for this Workflow
|
ADAG |
addNotification(Invoke invoke)
Add a Notification for this Workflow
|
ADAG |
addNotifications(List<Invoke> invokes)
Add a List of Notifications for this Workflow
|
ADAG |
addTransformation(Transformation transformation)
Add Transformation to the DAX
|
ADAG |
addTransformations(List<Transformation> transformations)
Add Multiple Transformation to the DAX
|
private boolean |
containsAbstractJob(AbstractJob ajob)
Check if an abstractjob exists in the DAX
|
private boolean |
containsAbstractJobId(String ajobid)
Check if a jobid exists in the DAX
|
boolean |
containsDAG(DAG dag)
Check if a DAG job exists in the DAX
|
boolean |
containsDAGId(String dagid)
Check if a DAG job id exists in the DAX
|
boolean |
containsDAX(DAX dax)
Check if a DAX job exists in the DAX
|
boolean |
containsDAXId(String daxid)
Check if a DAX job id exists in the DAX
|
boolean |
containsExecutable(Executable executable)
Checks if a given executable exists in the DAX based Transformation Catalog
|
boolean |
containsJob(Job job)
Check if a job exists in the DAX
|
boolean |
containsJobId(String jobid)
Check if a jobid exists in the DAX
|
boolean |
containsTransformation(Transformation transformation)
Checks if a given Transformation exists in the DAX based Transformation Catalog
|
private static ADAG |
Diamond() |
private AbstractJob |
getAbstractJob(String ajobid)
Returns an abstract Job with id ajobid if present otherwise null.
|
int |
getCount()
Returns the total count of the dax collection.
|
DAG |
getDAG(String dagid)
Returns a DAG object with id dagid if present otherwise null.
|
List<DAG> |
getDAGs()
Get a list of all the DAG jobs.
|
DAX |
getDAX(String daxid)
Returns a DAX object with id daxid if present otherwise null.
|
List<DAX> |
getDAXs()
Get a list of all the DAX jobs.
|
Set<Edge> |
getEdges()
Returns a Set of all the Edge objects for the DAX.Returns empty if no dependencies.
|
Set<Edge> |
getEdges(String child)
Returns a list of Edge objects for a child job/dax/dag id.
|
Set<Executable> |
getExecutables()
Returns a set of Executable Objects stored as part of the inDAX Transformation Catalog;
|
List<File> |
getFiles()
Returns a list of File objects defined as the inDax Replica Catalog
|
int |
getIndex() |
List<Invoke> |
getInvoke()
Returns a list of Invoke objects associated with the workflow
|
Job |
getJob(String jobid)
Returns a Job object with id jobid if present otherwise null.
|
List<Job> |
getJobs()
Get a list of all the DAG jobs.
|
String |
getMetaData(String key)
Returns the metadata associated for a key if exists, else null
|
String |
getName()
Return the name/label of the dax
|
List<Invoke> |
getNotification()
Returns a list of Invoke objects associated with the workflow.
|
private Map<String,String> |
getPegasusVendorExtensions()
Returns pegasus vendor extensions that we encode in each workflow
|
Set<Transformation> |
getTransformations()
Returns a set of Transformation Objects (complex executables) stored in the DAX based
Transformation Catalog
|
static void |
main(String[] args)
Create an example DIAMOND DAX
|
void |
toXML(edu.isi.pegasus.common.util.XMLWriter writer)
Generates a DAX representation.
|
String |
toYAML()
Generates the YAML representation of a workflow.
|
void |
writeTo(Writer writer,
ADAG.FORMAT format)
Generate a DAX File out of this object;
|
void |
writeToFile(String daxfile)
Generate a DAX File out of this object in YAML format.
|
void |
writeToFile(String daxfile,
ADAG.FORMAT format)
Generate a DAX File out of this object;
|
void |
writeToSTDOUT()
Convenience function to write out the generated DAX to stdout in YAML format
|
void |
writeToSTDOUT(ADAG.FORMAT format)
Convenience function to write out the generated DAX to stdout
|
void |
writeToWriter(Writer writer,
boolean close)
Deprecated.
|
public static final String SCHEMA_NAMESPACE
public static final String SCHEMA_NAMESPACE_XSI
public static final String SCHEMA_LOCATION
public static final String SCHEMA_VERSION
public static final String PEGASUS_VENDOR_EXTENSION_KEY
public static ADAG.FORMAT DEFAULT_FORMAT
private static final String WF_API_KEY
private String mName
private int mIndex
private int mCount
private Map<String,AbstractJob> mJobs
DAG
,
DAX
,
Job
,
AbstractJob
private Set<Transformation> mTransformations
Transformation
private Set<Executable> mExecutables
Executable
private List<File> mFiles
File
private Map<String,Set<Edge>> mDependencies
Edge
private Set<MetaData> mMetaDataAttributes
private edu.isi.pegasus.common.util.XMLWriter mWriter
private edu.isi.pegasus.common.logging.LogManager mLogger
public ADAG(String name)
name
- DAX LABELpublic ADAG(String name, int index, int count)
name
- DAX Labelindex
- Index of DAX out of N DAX'scount
- Number of DAXS in a groupprivate Map<String,String> getPegasusVendorExtensions()
public String getName()
public int getIndex()
public int getCount()
public ADAG addInvoke(Invoke.WHEN when, String what)
when
- when to do the notificationwhat
- executable and arguments to invokepublic ADAG addNotification(Invoke.WHEN when, String what)
when
- when to do the notificationwhat
- executable and arguments to invokepublic ADAG addInvoke(Invoke invoke)
invoke
- the invoke objectpublic ADAG addNotification(Invoke invoke)
invoke
- the invoke objectpublic ADAG addInvokes(List<Invoke> invokes)
invokes
- List of Invoke objectspublic ADAG addNotifications(List<Invoke> invokes)
invokes
- List of Invoke objectspublic List<Invoke> getInvoke()
public List<Invoke> getNotification()
public ADAG addMetaData(String key, String value)
key
- key name for metadatavalue
- valuepublic String getMetaData(String key)
key
- the keypublic ADAG addFile(File file)
file
- File object to be added to the RC sectionFile
public ADAG addFiles(List<File> files)
files
- List of File objects to be added to the RC SectionFile
public List<File> getFiles()
public ADAG addExecutable(Executable executable)
executable
- Executable to be addedExecutable
public ADAG addExecutables(List<Executable> executables)
executables
- List of Executable objects to be addedExecutable
public Set<Executable> getExecutables()
public boolean containsExecutable(Executable executable)
executable
- the executablepublic ADAG addTransformation(Transformation transformation)
transformation
- Transformation object to be addedTransformation
public ADAG addTransformations(List<Transformation> transformations)
transformations
- List of Transformation objectsTransformation
public boolean containsTransformation(Transformation transformation)
transformation
- Transformationpublic Set<Transformation> getTransformations()
private ADAG addAbstractJob(AbstractJob ajob)
ajob
- AbstractJobJob
,
DAG
,
DAX
,
AbstractJob
private ADAG addAbstractJobs(List<AbstractJob> ajobs)
ajobs
- AbstractJobJob
,
DAG
,
DAX
,
AbstractJob
private AbstractJob getAbstractJob(String ajobid)
ajobid
- abstract job idprivate boolean containsAbstractJob(AbstractJob ajob)
ajob
- the abstract jobprivate boolean containsAbstractJobId(String ajobid)
ajobid
- the job idpublic ADAG addJob(Job job)
job
- the jobJob
,
AbstractJob
public ADAG addJobs(List<Job> jobs)
jobs
- List of jobs to addJob
,
AbstractJob
public boolean containsJob(Job job)
job
- the job to checkpublic boolean containsJobId(String jobid)
jobid
- the job idpublic Job getJob(String jobid)
jobid
- the job idpublic DAX getDAX(String daxid)
daxid
- the dax idpublic DAG getDAG(String dagid)
dagid
- the dagidpublic ADAG addDAG(DAG dag)
dag
- the DAG to be addedDAG
,
AbstractJob
public ADAG addDAGs(List<DAG> dags)
dags
- List of DAG jobs to be addedDAG
,
AbstractJob
public boolean containsDAG(DAG dag)
dag
- the dagpublic boolean containsDAGId(String dagid)
dagid
- the dagidpublic ADAG addDAX(DAX dax)
dax
- DAX to be addedDAX
,
AbstractJob
public ADAG addDAXs(List<DAX> daxs)
daxs
- LIST of DAX jobs to be addedDAX
,
AbstractJob
public boolean containsDAX(DAX dax)
dax
- the daxpublic boolean containsDAXId(String daxid)
daxid
- the dax idpublic ADAG addDependency(String parent, String child)
parent
- String job,dax,dag idchild
- String job,dax,dag,idpublic ADAG addDependency(AbstractJob parent, AbstractJob child)
parent
- Job|DAX|DAG objectchild
- Job|DAX|DAG objectpublic ADAG addDependency(String parent, String child, String label)
parent
- String job,dax,dag idchild
- String job,dax,dag idlabel
- String dependency labelpublic Set<Edge> getEdges(String child)
child
- the childpublic Set<Edge> getEdges()
public ADAG addDependency(AbstractJob parent, AbstractJob child, String label)
parent
- Job|DAX|DAG objectchild
- Job|DAX|DAG objectlabel
- String label for annotationpublic void writeToFile(String daxfile)
daxfile
- The file to write the DAX topublic void writeToFile(String daxfile, ADAG.FORMAT format)
daxfile
- The file to write the DAX toformat
- how should the file be formattedpublic void writeTo(Writer writer, ADAG.FORMAT format)
writer
- the writer to the dax fileformat
- how should the file be formattedpublic void writeToSTDOUT()
public void writeToSTDOUT(ADAG.FORMAT format)
format
- how should the file be formattedpublic void writeToWriter(Writer writer, boolean close)
writer
- A Writer objectclose
- Whether writer should be closed on return.public String toYAML()
public void toXML(edu.isi.pegasus.common.util.XMLWriter writer)
writer
- the xml writerpublic static void main(String[] args)
args
- main argsprivate static ADAG Diamond()