The Pegasus Workflow Mapper now supports job-level and workflow-level notifications. In the DAX, you can specify for a job or for the workflow:
the event when the notification needs to be sent
the executable that needs to be invoked.
The notifications are issued from the submit host by the pegasus-monitord daemon, which monitors the Condor logs for the workflow. When it invokes a notification executable, pegasus-monitord sets environment variables that describe the state of the job and the workflow.
The Pegasus release comes with default notification clients that send notifications via email or jabber.
Currently, you specify notifications for jobs and for the workflow using invoke elements.
An invoke element can be a sub-element of the following elements in the DAX schema:
job - to associate notifications with a compute job in the DAX.
dax - to associate notifications with a dax job in the DAX.
dag - to associate notifications with a dag job in the DAX.
executable - to associate notifications with a job that uses a particular executable.
The invoke element can be specified at the root element level of the DAX to indicate workflow level notifications.
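To make the two placements concrete, here is a small Python sketch that builds a DAX-like XML tree with the standard library's xml.etree.ElementTree: one invoke directly under the root (workflow level) and one nested inside a job (job level). It is not the Pegasus DAX generator API. The real DAX root also carries an XML namespace and version attributes, which are omitted here, and the notify_workflow and notify_job script paths are hypothetical.

```python
import xml.etree.ElementTree as ET


def build_dax() -> ET.Element:
    """Build a minimal DAX-like tree with workflow- and job-level invokes."""
    # Root element. The real DAX schema also declares a namespace and
    # version attributes; they are omitted in this sketch.
    adag = ET.Element("adag", name="example")

    # Workflow-level notification: <invoke> is a direct child of the root.
    wf_invoke = ET.SubElement(adag, "invoke", when="at_end")
    wf_invoke.text = "/path/to/notify_workflow arg1"  # hypothetical script

    # Job-level notification: <invoke> is nested inside a <job>.
    job = ET.SubElement(adag, "job", id="ID000001", namespace="example",
                        name="mDiffFit", version="1.0")
    job_invoke = ET.SubElement(job, "invoke", when="on_error")
    job_invoke.text = "/path/to/notify_job arg1"  # hypothetical script
    return adag


print(ET.tostring(build_dax(), encoding="unicode"))
```

Only the position of the invoke element differs between the two levels; the element itself looks the same.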
The invoke element may be specified multiple times, as needed. It has a mandatory when attribute, which takes one of the following values:
Table 6.6. Invoke Element attributes and meaning.
| Value of the when attribute | Meaning |
|---|---|
| never | (default) Never notify of anything. This is useful to temporarily disable an existing notification. |
| start | Create a notification when the job is submitted. |
| on_error | After a job finishes with failure (exitcode != 0). |
| on_success | After a job finishes with success (exitcode == 0). |
| at_end | After a job finishes, regardless of exitcode. |
| all | Like start and at_end combined. |
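The table can be read as a predicate over two kinds of job events: submission and completion with an exit code. The Python sketch below encodes that reading. The "start"/"end" event names and the fires() function are illustrative assumptions, not part of pegasus-monitord.

```python
from typing import Optional

WHEN_VALUES = {"never", "start", "on_error", "on_success", "at_end", "all"}


def fires(when: str, event: str, exitcode: Optional[int] = None) -> bool:
    """Decide whether an <invoke when=...> fires for an observed job event.

    event is "start" (job submitted) or "end" (job finished with exitcode).
    """
    if when not in WHEN_VALUES:
        raise ValueError(f"invalid when value: {when!r}")
    if event == "start":
        return when in ("start", "all")
    if event == "end":
        if exitcode is None:
            raise ValueError("an 'end' event needs an exit code")
        if when in ("at_end", "all"):
            return True
        if when == "on_success":
            return exitcode == 0
        if when == "on_error":
            return exitcode != 0
        return False  # "never" or "start"
    raise ValueError(f"unknown event: {event!r}")
```

For example, fires("all", "end", 1) and fires("on_error", "end", 1) are both true, while fires("on_success", "end", 1) is false.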
You can specify multiple invoke elements with the same when value in the DAX. This allows you to have multiple notifications for the same event.
The following example illustrates this.
    <job id="ID000001" namespace="example" name="mDiffFit" version="1.0" node-label="preprocess" >
        <argument>-a top -T 6 -i <file name="f.a"/> -o <file name="f.b1"/></argument>
        <!-- profiles are optional -->
        <profile namespace="execution" key="site">isi_viz</profile>
        <profile namespace="condor" key="getenv">true</profile>
        <uses name="f.a" link="input" register="false" transfer="true" type="data" />
        <uses name="f.b" link="output" register="false" transfer="true" type="data" />
        <!-- 'WHEN' enumeration: never, start, on_error, on_success, at_end, all -->
        <invoke when="start">/path/to/notify1 arg1 arg2</invoke>
        <invoke when="start">/path/to/notify1 arg3 arg4</invoke>
        <invoke when="on_success">/path/to/notify2 arg3 arg4</invoke>
    </job>
In the above example, the executable notify1 is invoked twice when the job is submitted (when="start"): once with arguments arg1 and arg2, and a second time with arguments arg3 and arg4.
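One way to see why notify1 runs twice is to group the invoke children of that job by their when value. The Python sketch below does this with xml.etree.ElementTree on a trimmed copy of the job (only the invoke elements are kept). The invokes_by_event() helper is hypothetical and only illustrates that each invoke is an independent notification.

```python
import xml.etree.ElementTree as ET
from collections import defaultdict
from typing import Dict, List

# Trimmed copy of the job above: only the <invoke> children are kept.
JOB_XML = """
<job id="ID000001" namespace="example" name="mDiffFit" version="1.0">
  <invoke when="start">/path/to/notify1 arg1 arg2</invoke>
  <invoke when="start">/path/to/notify1 arg3 arg4</invoke>
  <invoke when="on_success">/path/to/notify2 arg3 arg4</invoke>
</job>
"""


def invokes_by_event(job_xml: str) -> Dict[str, List[str]]:
    job = ET.fromstring(job_xml.strip())
    grouped: Dict[str, List[str]] = defaultdict(list)
    for inv in job.findall("invoke"):
        # Duplicates for the same "when" value are kept, in document order:
        # each one becomes a separate notification.
        grouped[inv.get("when")].append((inv.text or "").strip())
    return dict(grouped)


for when, actions in invokes_by_event(JOB_XML).items():
    print(when, actions)
```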
The DAX Generator API chapter describes how to add notifications to the DAX using the DAX APIs.
While planning a workflow, Pegasus writes out a notifications file in the submit directory that contains all the notifications that need to be sent for the workflow. pegasus-monitord reads this file to determine which notifications to send and when. Workflow and job entries in the file have the following format:
ENTITY_TYPE ID NOTIFICATION_CONDITION ACTION
ENTITY_TYPE is one of the following keywords:
WORKFLOW - indicates workflow level notification
JOB - indicates notifications for a job in the executable workflow
DAXJOB - indicates notifications for a DAX Job in the executable workflow
DAGJOB - indicates notifications for a DAG Job in the executable workflow
ID is the identifier for the entity. Its meaning depends on the entity type:
WORKFLOW - ID is the wf_uuid
JOB|DAXJOB|DAGJOB - ID is the job identifier in the executable workflow ( DAG ).
NOTIFICATION_CONDITION is the condition under which the notification is sent. The notification conditions are enumerated in Table 6.6.
ACTION is what happens when the condition is satisfied: the executable followed by its arguments.
INVOCATION lines are generated only for clustered jobs, to specify finer-grained notifications for each constituent job (invocation) of the cluster. They have the following format:
INVOCATION JOB_IDENTIFIER INV.ID NOTIFICATION_CONDITION ACTION
JOB_IDENTIFIER is the job identifier in the executable workflow (DAG).
INV.ID indicates the index of the task in the clustered job for which the notification needs to be sent.
NOTIFICATION_CONDITION is the condition under which the notification is sent. The notification conditions are enumerated in Table 6.6.
ACTION is what happens when the condition is satisfied: the executable followed by its arguments.
A sample generated notifications file is listed below.
    WORKFLOW d2c4f79c-8d5b-4577-8c46-5031f4d704e8 on_error /bin/date1
    INVOCATION merge_vahi-preprocess-1.0_PID1_ID1 1 on_success /bin/date_executable
    INVOCATION merge_vahi-preprocess-1.0_PID1_ID1 1 on_success /bin/date_executable
    INVOCATION merge_vahi-preprocess-1.0_PID1_ID1 1 on_error /bin/date_executable
    INVOCATION merge_vahi-preprocess-1.0_PID1_ID1 2 on_success /bin/date_executable
    INVOCATION merge_vahi-preprocess-1.0_PID1_ID1 2 on_error /bin/date_executable
    DAXJOB subdax_black_ID000003 on_error /bin/date13
    JOB analyze_ID00004 on_success /bin/date
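The two line formats above are simple enough to parse by splitting on whitespace. The Python sketch below reads a few lines from the sample into named tuples. The Notification type and parse_notifications() are hypothetical helpers, not the parser pegasus-monitord uses, and they assume ACTION is the remainder of the line after the fixed fields.

```python
from typing import List, NamedTuple, Optional

ENTITY_TYPES = {"WORKFLOW", "JOB", "DAXJOB", "DAGJOB"}


class Notification(NamedTuple):
    entity_type: str       # WORKFLOW, JOB, DAXJOB, DAGJOB or INVOCATION
    entity_id: str         # wf_uuid, or job identifier in the DAG
    inv_id: Optional[int]  # task index; only set for INVOCATION lines
    condition: str         # one of the "when" values from Table 6.6
    action: str            # executable followed by its arguments


def parse_notifications(text: str) -> List[Notification]:
    entries = []
    for line in text.splitlines():
        fields = line.split()
        if not fields:
            continue  # blank line
        kind = fields[0]
        # Simplification: ACTION is re-joined with single spaces, so runs of
        # whitespace inside the original command line are not preserved.
        if kind == "INVOCATION":
            _, job_id, inv_id, cond, *action = fields
            entries.append(Notification(kind, job_id, int(inv_id), cond, " ".join(action)))
        elif kind in ENTITY_TYPES:
            _, entity_id, cond, *action = fields
            entries.append(Notification(kind, entity_id, None, cond, " ".join(action)))
        else:
            raise ValueError(f"unrecognized line: {line!r}")
    return entries


SAMPLE = """\
WORKFLOW d2c4f79c-8d5b-4577-8c46-5031f4d704e8 on_error /bin/date1
INVOCATION merge_vahi-preprocess-1.0_PID1_ID1 1 on_success /bin/date_executable
INVOCATION merge_vahi-preprocess-1.0_PID1_ID1 2 on_error /bin/date_executable
DAXJOB subdax_black_ID000003 on_error /bin/date13
JOB analyze_ID00004 on_success /bin/date
"""

for entry in parse_notifications(SAMPLE):
    print(entry)
```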
Whenever pegasus-monitord enters a workflow (or sub-workflow) directory, it reads the notifications file generated by Pegasus. pegasus-monitord matches events in the running workflow against the notifications specified in that file and runs the script specified in a notification when the notification matches an event in the workflow. Note that there is a delay between an event happening in the workflow and pegasus-monitord processing the log file and executing the corresponding notification script.
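The matching step can be pictured with the Python sketch below. It is a simplified model, not pegasus-monitord's code. It assumes an observed event is identified by entity type, ID, an optional task index, and either "start" or "end" with an exit code. It also applies the PEGASUS_EVENT substitution for the "all" condition that is listed with the environment variables further on. The match() helper and the /bin/notify_all script are hypothetical.

```python
from typing import List, Optional, Tuple

# (entity_type, entity_id, inv_id, condition, action), i.e. one line of the
# notifications file after splitting it into its fields.
Entry = Tuple[str, str, Optional[int], str, str]


def condition_matches(cond: str, event: str, exitcode: Optional[int]) -> bool:
    if event == "start":
        return cond in ("start", "all")
    # event == "end": the job (or task) finished with `exitcode`
    return (cond in ("at_end", "all")
            or (cond == "on_success" and exitcode == 0)
            or (cond == "on_error" and exitcode != 0))


def match(entries: List[Entry], entity_type: str, entity_id: str, event: str,
          exitcode: Optional[int] = None,
          inv_id: Optional[int] = None) -> List[Tuple[str, str]]:
    """Return (action, PEGASUS_EVENT value) pairs for one observed event."""
    if event not in ("start", "end"):
        raise ValueError(f"unknown event: {event!r}")
    if event == "end" and exitcode is None:
        raise ValueError("an 'end' event needs an exit code")
    hits = []
    for etype, eid, einv, cond, action in entries:
        if (etype, eid, einv) != (entity_type, entity_id, inv_id):
            continue
        if condition_matches(cond, event, exitcode):
            # "all" is reported as the concrete event that triggered it.
            reported = cond if cond != "all" else ("start" if event == "start" else "at_end")
            hits.append((action, reported))
    return hits


ENTRIES: List[Entry] = [
    ("JOB", "analyze_ID00004", None, "on_success", "/bin/date"),
    ("JOB", "analyze_ID00004", None, "all", "/bin/notify_all"),  # hypothetical
]
print(match(ENTRIES, "JOB", "analyze_ID00004", "end", exitcode=0))
```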
The following command line options (and properties) can change how pegasus-monitord handles notifications:
--no-notifications (pegasus.monitord.notifications=False): Will disable notifications completely.
--notifications-max=nn (pegasus.monitord.notifications.max=nn): Will limit the number of concurrent notification scripts to nn. Once pegasus-monitord reaches this number, it will wait until one notification script finishes before starting a new one. Notifications happening during this time will be queued by the system. The default number of concurrent notification scripts for pegasus-monitord is 10.
--notifications-timeout=nn (pegasus.monitord.notifications.timeout=nn): Changes how long pegasus-monitord waits for a notification script to finish. By default, pegasus-monitord waits as long as it takes (possibly indefinitely) for a notification script to end. With this option, pegasus-monitord waits at most nn seconds before killing the notification script.
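The two limits above amount to a bounded worker pool plus a per-script timeout. The Python sketch below models that with concurrent.futures and subprocess. It is an illustration under those assumptions, not pegasus-monitord's implementation. The default of 10 concurrent scripts mirrors the documented default, and a timeout of None means "wait indefinitely".

```python
import os
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional, Sequence


def run_notification(cmd: Sequence[str], extra_env: Dict[str, str],
                     timeout: Optional[float]) -> Optional[int]:
    """Run one notification script; return its exit code, or None on timeout."""
    env = {**os.environ, **extra_env}  # inherit the environment, add PEGASUS_* vars
    try:
        # With timeout=None this waits as long as the script takes;
        # otherwise subprocess.run kills the child once the timeout expires.
        return subprocess.run(list(cmd), env=env, timeout=timeout).returncode
    except subprocess.TimeoutExpired:
        return None


def run_notifications(commands: List[Sequence[str]], extra_env: Dict[str, str],
                      max_concurrent: int = 10,
                      timeout: Optional[float] = None) -> List[Optional[int]]:
    # At most `max_concurrent` scripts run at once; further submissions wait
    # in the executor's queue until a worker thread frees up.
    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        futures = [pool.submit(run_notification, cmd, extra_env, timeout)
                   for cmd in commands]
        return [f.result() for f in futures]


if __name__ == "__main__":
    demo = [[sys.executable, "-c", "print('notified')"]]
    print(run_notifications(demo, {"PEGASUS_EVENT": "at_end"}, timeout=5))
```

A thread pool is used here only because each worker spends its time blocked on a child process; queued scripts start in submission order as workers free up.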
It is also important to understand that pegasus-monitord will not issue any notifications when it is executed in replay mode.
Whenever a notification in the notifications file matches an event in the running workflow, pegasus-monitord runs the corresponding script specified in the ACTION field of the notifications file. pegasus-monitord sets the following environment variables for each notification script it starts:
PEGASUS_EVENT: The NOTIFICATION_CONDITION that caused the notification. In the case of the "all" condition, pegasus-monitord replaces it with the actual event that caused the match (e.g. "start" or "at_end").
PEGASUS_EVENT_TIMESTAMP: Timestamp in EPOCH format for the event (better for automated processing).
PEGASUS_EVENT_TIMESTAMP_ISO: Same as above, but in ISO format (better for human readability).
PEGASUS_SUBMIT_DIR: The submit directory for the workflow (usually the value of "submit_dir" in the braindump.txt file).
PEGASUS_STDOUT: For workflow notifications, this will correspond to the dagman.out file for that workflow. For job and invocation notifications, this field will contain the output file (stdout) for that particular job instance.
PEGASUS_STDERR: For job and invocation notifications, this field will contain the error file (stderr) for the particular executable job instance. This field does not exist in case of workflow notifications.
PEGASUS_WFID: Contains the workflow id for this notification in the form of DAX_LABEL + DAX_INDEX (from the braindump.txt file).
PEGASUS_JOBID: For workflow notifications, this contains the workflow wf_uuid (from the braindump.txt file). For job and invocation notifications, this field contains the job identifier in the executable workflow (DAG) for the particular notification.
PEGASUS_INVID: Contains the index of the task in the clustered job for the notification.
PEGASUS_STATUS: For workflow notifications, this contains DAGMan's exit code. For job and invocation notifications, this field contains the exit code for the particular job/task. Please note that this field is not present for 'start' notification events.
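A notification script reads these variables from its environment. The Python sketch below (a hypothetical summarize() function) formats them into a one-line log entry. It treats PEGASUS_STATUS, PEGASUS_STDERR and PEGASUS_INVID as optional, because the first two are documented as absent for some notifications and the task index only makes sense for clustered jobs.

```python
import os
from typing import Mapping


def summarize(env: Mapping[str, str] = os.environ) -> str:
    """Format the PEGASUS_* variables of one notification as a log line."""
    parts = [
        env.get("PEGASUS_EVENT_TIMESTAMP_ISO", "?"),
        env["PEGASUS_EVENT"],
        "wf=" + env["PEGASUS_WFID"],
        "id=" + env.get("PEGASUS_JOBID", "?"),
    ]
    if "PEGASUS_INVID" in env:  # task index inside a clustered job
        parts.append("task=" + env["PEGASUS_INVID"])
    if "PEGASUS_STATUS" in env:  # not set for 'start' events
        parts.append("status=" + env["PEGASUS_STATUS"])
    if "PEGASUS_STDERR" in env:  # not set for workflow notifications
        parts.append("stderr=" + env["PEGASUS_STDERR"])
    return " ".join(parts)
```

Called with no argument inside a notification script, summarize() reads the variables pegasus-monitord set; passing a dict makes it easy to try out by hand.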
Pegasus ships with two reference notification scripts. You can use them as a starting point when creating your own notification scripts or, if they already do what you need, use them directly in your workflows. The scripts are:
libexec/notification/email - sends email, including the output from pegasus-status (default) or pegasus-analyzer.
    $ ./libexec/notification/email --help
    Usage: email [options]

    Options:
      -h, --help            show this help message and exit
      -t TO_ADDRESS, --to=TO_ADDRESS
                            The To: email address. Defines the recipient for the
                            notification.
      -f FROM_ADDRESS, --from=FROM_ADDRESS
                            The From: email address. Defaults to the required To:
                            address.
      -r REPORT, --report=REPORT
                            Include workflow report. Valid values are: none
                            pegasus-analyzer pegasus-status (default)
libexec/notification/jabber - sends simple notifications to Jabber/GTalk. This can be useful for job failures.
    $ ./libexec/notification/jabber --help
    Usage: jabber [options]

    Options:
      -h, --help            show this help message and exit
      -i JABBER_ID, --jabberid=JABBER_ID
                            Your jabber id. Example: firstname.lastname@example.org
      -p PASSWORD, --password=PASSWORD
                            Your jabber password
      -s HOST, --host=HOST  Jabber host, if different from the host in your jabber
                            id. For Google talk, set this to talk.google.com
      -r RECIPIENT, --recipient=RECIPIENT
                            Jabber id of the recipient. Not necessary if you want
                            to send to your own jabber id
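If neither reference script fits, a custom one can be a small program. The hypothetical Python sketch below mirrors the --to and --from options of the email script and builds a message from the PEGASUS_* variables with the standard library's email.message.EmailMessage. It is not the shipped script and does not send anything. Delivery (for example with smtplib) is left out because it depends on the local mail setup.

```python
import argparse
import os
from email.message import EmailMessage
from typing import Mapping, Optional, Sequence


def build_message(to_addr: str, from_addr: Optional[str],
                  env: Mapping[str, str]) -> EmailMessage:
    msg = EmailMessage()
    msg["To"] = to_addr
    msg["From"] = from_addr or to_addr  # default From: to the To: address
    event = env["PEGASUS_EVENT"]
    job_id = env.get("PEGASUS_JOBID", "?")
    msg["Subject"] = f"Pegasus notification: {event} {job_id}"
    lines = [
        f"event: {event}",
        f"submit dir: {env.get('PEGASUS_SUBMIT_DIR', '?')}",
        # PEGASUS_STATUS is absent for 'start' events.
        f"status: {env.get('PEGASUS_STATUS', 'n/a')}",
    ]
    msg.set_content("\n".join(lines) + "\n")
    return msg


def main(argv: Optional[Sequence[str]] = None,
         env: Mapping[str, str] = os.environ) -> EmailMessage:
    parser = argparse.ArgumentParser(description="Hypothetical email notifier")
    parser.add_argument("-t", "--to", dest="to_address", required=True)
    parser.add_argument("-f", "--from", dest="from_address")
    args = parser.parse_args(argv)
    return build_message(args.to_address, args.from_address, env)
```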
For example, if the DAX generator is written in Python and you want notifications on 'at_end' events (successful or failed):
    # job level notifications - in this case for at_end events
    # (job and pegasus_home are assumed to be defined earlier in the DAX generator)
    job.invoke('at_end', pegasus_home + "/libexec/notification/email --to email@example.com")
See the notifications example for a full workflow that uses notifications.