6.4. Notifications

The Pegasus Workflow Mapper now supports job-level and workflow-level notifications. For a job or for the workflow as a whole, you can specify the following in the DAX:

  • the event when the notification needs to be sent

  • the executable that needs to be invoked.

The notifications are issued from the submit host by the pegasus-monitord daemon, which monitors the Condor logs for the workflow. When it issues a notification, pegasus-monitord invokes the notifying executable and sets certain environment variables that contain information about the job and workflow state.

The Pegasus release comes with default notification clients that send notifications via email or jabber.

6.4.1. Specifying Notifications in the DAX

Currently, you can specify notifications for the jobs and the workflow by using invoke elements.

An invoke element can be a sub-element of the following elements in the DAX schema:

  • job - to associate notifications with a compute job in the DAX.

  • dax - to associate notifications with a dax job in the DAX.

  • dag - to associate notifications with a dag job in the DAX.

  • executable - to associate notifications with a job that uses a particular executable.

The invoke element can be specified at the root element level of the DAX to indicate workflow level notifications.

The invoke element may be specified multiple times, as needed. It has a mandatory when attribute, which takes one of the values listed in Table 6.6.

Table 6.6. Invoke Element attributes and meaning.

Value of when attribute    Meaning
never (default)            Never notify of anything. This is useful to temporarily disable an existing notification.
start                      Create a notification when the job is submitted.
on_error                   After a job finishes with failure (exitcode != 0).
on_success                 After a job finishes with success (exitcode == 0).
at_end                     After a job finishes, regardless of exitcode.
all                        Like start and at_end combined.
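
The semantics of the when values can be sketched as a small predicate. This is only an illustration of the table above, not code taken from Pegasus; the function name when_matches and the event labels "start" and "end" are hypothetical.

```python
# Hypothetical helper illustrating Table 6.6; not part of Pegasus.
def when_matches(when, event, exitcode=None):
    """Return True if an invoke element with this `when` value should fire.

    event is "start" (job submitted) or "end" (job finished).
    exitcode is only meaningful when event is "end".
    """
    if when == "never":
        return False
    if when == "start":
        return event == "start"
    if when == "on_error":
        return event == "end" and exitcode != 0
    if when == "on_success":
        return event == "end" and exitcode == 0
    if when == "at_end":
        return event == "end"
    if when == "all":
        # "all" behaves like start and at_end combined.
        return event in ("start", "end")
    raise ValueError("unknown when value: %r" % (when,))
```

For instance, when_matches("all", "start") is true, while when_matches("on_success", "end", exitcode=1) is false.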

You can specify multiple invoke elements with the same when attribute value in the DAX. This allows you to have multiple notifications for the same event.

The following example illustrates this.

<job id="ID000001" namespace="example" name="mDiffFit" version="1.0"
       node-label="preprocess" >
    <argument>-a top -T 6  -i <file name="f.a"/>  -o <file name="f.b1"/></argument>

    <!-- profiles are optional -->
    <profile namespace="execution" key="site">isi_viz</profile>
    <profile namespace="condor" key="getenv">true</profile>

    <uses name="f.a" link="input"  register="false" transfer="true" type="data" />
    <uses name="f.b" link="output" register="false" transfer="true" type="data" />

    <!-- 'WHEN' enumeration: never, start, on_error, on_success, at_end, all -->
    <invoke when="start">/path/to/notify1 arg1 arg2</invoke>
    <invoke when="start">/path/to/notify1 arg3 arg4</invoke>
    <invoke when="on_success">/path/to/notify2 arg3 arg4</invoke>
  </job>

In the above example, the executable notify1 will be invoked twice when the job is submitted ( when="start" ): once with arguments arg1 and arg2, and a second time with arguments arg3 and arg4.

The DAX Generator API chapter has information about how to add notifications to the DAX using the DAX APIs.

6.4.2. Notify File created by Pegasus in the submit directory

While planning a workflow, Pegasus writes out a notify file in the submit directory that contains all the notifications that need to be sent for the workflow. pegasus-monitord picks up this notifications file to determine what notifications need to be sent and when. The file contains two types of lines:

  1. ENTITY_TYPE ID NOTIFICATION_CONDITION ACTION

    • ENTITY_TYPE can be one of the following keywords

      • WORKFLOW - indicates workflow level notification

      • JOB - indicates notifications for a job in the executable workflow

      • DAXJOB - indicates notifications for a DAX Job in the executable workflow

      • DAGJOB - indicates notifications for a DAG Job in the executable workflow

    • ID indicates the identifier for the entity. Its meaning depends on the entity type:

      • WORKFLOW - ID is wf_uuid

      • JOB|DAXJOB|DAGJOB - ID is the job identifier in the executable workflow ( DAG ).

    • NOTIFICATION_CONDITION is the condition when the notification needs to be sent. The notification conditions are enumerated in Table 6.6.

    • ACTION is what needs to happen when the condition is satisfied. It is the executable plus its arguments.

  2. INVOCATION JOB_IDENTIFIER INV.ID NOTIFICATION_CONDITION ACTION

    The INVOCATION lines are only generated for clustered jobs, to specify the finer-grained notifications for each constituent job/invocation.

    • JOB_IDENTIFIER is the job identifier in the executable workflow ( DAG ).

    • INV.ID indicates the index of the task in the clustered job for which the notification needs to be sent.

    • NOTIFICATION_CONDITION is the condition when the notification needs to be sent. The notification conditions are enumerated in Table 6.6.

    • ACTION is what needs to happen when the condition is satisfied. It is the executable plus its arguments.

A sample generated notifications file is listed below.

WORKFLOW d2c4f79c-8d5b-4577-8c46-5031f4d704e8 on_error /bin/date1

INVOCATION merge_vahi-preprocess-1.0_PID1_ID1 1 on_success /bin/date_executable
INVOCATION merge_vahi-preprocess-1.0_PID1_ID1 1 on_success /bin/date_executable
INVOCATION merge_vahi-preprocess-1.0_PID1_ID1 1 on_error /bin/date_executable

INVOCATION merge_vahi-preprocess-1.0_PID1_ID1 2 on_success /bin/date_executable
INVOCATION merge_vahi-preprocess-1.0_PID1_ID1 2 on_error /bin/date_executable

DAXJOB subdax_black_ID000003 on_error /bin/date13
JOB    analyze_ID00004    on_success /bin/date
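
To make the two line formats concrete, here is a minimal parsing sketch. It assumes whitespace-separated fields exactly as described above and is not the parser used by pegasus-monitord; the names Notification and parse_notify_line are hypothetical.

```python
from collections import namedtuple

# Hypothetical record type; inv_id is None for non-INVOCATION lines.
Notification = namedtuple(
    "Notification", "entity_type entity_id inv_id condition action")

ENTITY_TYPES = {"WORKFLOW", "JOB", "DAXJOB", "DAGJOB"}


def parse_notify_line(line):
    """Parse one line of the notify file; return None for blank lines."""
    stripped = line.strip()
    if not stripped:
        return None
    kind = stripped.split(None, 1)[0]
    if kind == "INVOCATION":
        # INVOCATION JOB_IDENTIFIER INV.ID NOTIFICATION_CONDITION ACTION
        parts = stripped.split(None, 4)
        if len(parts) != 5:
            raise ValueError("malformed INVOCATION line: %r" % line)
        _, job_id, inv_id, condition, action = parts
        return Notification(kind, job_id, int(inv_id), condition, action)
    if kind in ENTITY_TYPES:
        # ENTITY_TYPE ID NOTIFICATION_CONDITION ACTION
        parts = stripped.split(None, 3)
        if len(parts) != 4:
            raise ValueError("malformed %s line: %r" % (kind, line))
        _, entity_id, condition, action = parts
        return Notification(kind, entity_id, None, condition, action)
    raise ValueError("unknown entity type: %r" % kind)
```

The ACTION field is kept as a single string, since it is the executable together with its arguments.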

6.4.3. Configuring pegasus-monitord for notifications

Whenever pegasus-monitord enters a workflow (or sub-workflow) directory, it will read the notifications file generated by Pegasus. Pegasus-monitord will match events in the running workflow against the notifications specified in the notifications file and will initiate the script specified in a notification when that notification matches an event in the workflow. It is important to note that there will be a delay between a certain event happening in the workflow, and pegasus-monitord processing the log file and executing the corresponding notification script.

The following command line options (and properties) can change how pegasus-monitord handles notifications:

  • --no-notifications (pegasus.monitord.notifications=False): Will disable notifications completely.

  • --notifications-max=nn (pegasus.monitord.notifications.max=nn): Will limit the number of concurrent notification scripts to nn. Once pegasus-monitord reaches this number, it will wait until one notification script finishes before starting a new one. Notifications happening during this time will be queued by the system. The default number of concurrent notification scripts for pegasus-monitord is 10.

  • --notifications-timeout=nn (pegasus.monitord.notifications.timeout=nn): This setting changes how long pegasus-monitord will wait for a notification script to finish. By default, pegasus-monitord will wait for as long as it takes (possibly indefinitely) until a notification script ends. With this option, pegasus-monitord will wait for at most nn seconds before killing the notification script.

It is also important to understand that pegasus-monitord will not issue any notifications when it is executed in replay mode.
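
The effect of the concurrency limit and the timeout can be illustrated with a short sketch. This only models the behaviour described above using the Python standard library; it is not how pegasus-monitord is implemented, and the function names run_one and run_notifications are hypothetical.

```python
import shlex
import subprocess
from concurrent.futures import ThreadPoolExecutor


def run_one(action, timeout=None):
    """Run one ACTION string; return its exit code, or None on timeout."""
    try:
        # subprocess.run kills the child process when the timeout expires.
        return subprocess.run(shlex.split(action), timeout=timeout).returncode
    except subprocess.TimeoutExpired:
        return None


def run_notifications(actions, max_concurrent=10, timeout=None):
    """Run ACTION strings with at most max_concurrent scripts at a time.

    Actions beyond the limit wait in the executor's queue until a
    running script finishes. Results are returned in input order.
    """
    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        return list(pool.map(lambda a: run_one(a, timeout), actions))
```

With timeout=None a script may run indefinitely, which mirrors the default described above.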

6.4.3.1. Environment set for the notification scripts

Whenever a notification in the notifications file matches an event in the running workflow, pegasus-monitord will run the corresponding script specified in the ACTION field of the notifications file. Pegasus-monitord will set the following environment variables for each notification script it starts:

  • PEGASUS_EVENT: The NOTIFICATION_CONDITION that caused the notification. In the case of the "all" condition, pegasus-monitord will replace it with the actual event that caused the match (e.g. "start" or "at_end").

  • PEGASUS_EVENT_TIMESTAMP: Timestamp in EPOCH format for the event (better for automated processing).

  • PEGASUS_EVENT_TIMESTAMP_ISO: Same as above, but in ISO format (better for human readability).

  • PEGASUS_SUBMIT_DIR: The submit directory for the workflow (usually the value from "submit_dir" in the braindump.txt file)

  • PEGASUS_STDOUT: For workflow notifications, this will correspond to the dagman.out file for that workflow. For job and invocation notifications, this field will contain the output file (stdout) for that particular job instance.

  • PEGASUS_STDERR: For job and invocation notifications, this field will contain the error file (stderr) for the particular executable job instance. This field does not exist in case of workflow notifications.

  • PEGASUS_WFID: Contains the workflow id for this notification in the form of DAX_LABEL + DAX_INDEX (from the braindump.txt file).

  • PEGASUS_JOBID: For workflow notifications, this contains the workflow wf_uuid (from the braindump.txt file). For job and invocation notifications, this field contains the job identifier in the executable workflow ( DAG ) for the particular notification.

  • PEGASUS_INVID: Contains the index of the task in the clustered job for the notification.

  • PEGASUS_STATUS: For workflow notifications, this contains DAGMan's exit code. For job and invocation notifications, this field contains the exit code for the particular job/task. Please note that this field is not present for 'start' notification events.
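
A custom notification script can read these variables from its environment. The following is a minimal sketch of such a script, assuming only the variable names listed above; the summarize function and its output format are hypothetical. Variables that are not set for a given event (for example PEGASUS_STATUS on 'start' events) are simply skipped.

```python
import os
import sys


def summarize(env):
    """Build a one-line summary from the PEGASUS_* variables in env."""
    fields = [
        ("event", env.get("PEGASUS_EVENT")),
        ("time", env.get("PEGASUS_EVENT_TIMESTAMP_ISO")),
        ("workflow", env.get("PEGASUS_WFID")),
        ("job", env.get("PEGASUS_JOBID")),
        # Not present for 'start' notification events.
        ("status", env.get("PEGASUS_STATUS")),
        # Not present for workflow notifications.
        ("stderr", env.get("PEGASUS_STDERR")),
    ]
    return " ".join("%s=%s" % (k, v) for k, v in fields if v is not None)


if __name__ == "__main__":
    # pegasus-monitord sets the variables before starting the script.
    sys.stdout.write(summarize(os.environ) + "\n")
```

Taking the environment as a parameter keeps the function easy to try out with a plain dictionary before wiring the script into a workflow.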

6.4.4. Default Notification Scripts

Pegasus ships with two reference notification scripts. These can be used as a starting point when creating your own notification scripts, or, if the default ones are all you need, you can use them directly in your workflows. The scripts are:

  • libexec/notification/email - sends email, including the output from pegasus-status (default) or pegasus-analyzer.

    $ ./libexec/notification/email --help
    Usage: email [options]
    
    Options:
      -h, --help            show this help message and exit
      -t TO_ADDRESS, --to=TO_ADDRESS
                            The To: email address. Defines the recipient for the
                            notification.
      -f FROM_ADDRESS, --from=FROM_ADDRESS
                            The From: email address. Defaults to the required To:
                            address.
      -r REPORT, --report=REPORT
                            Include workflow report. Valid values are: none
                            pegasus-analyzer pegasus-status (default)
    
  • libexec/notification/jabber - sends simple notifications to Jabber/GTalk. This can be useful for job failures.

    $ ./libexec/notification/jabber --help
    Usage: jabber [options]
    
    Options:
      -h, --help            show this help message and exit
      -i JABBER_ID, --jabberid=JABBER_ID
                            Your jabber id. Example: user@jabberhost.com
      -p PASSWORD, --password=PASSWORD
                            Your jabber password
      -s HOST, --host=HOST  Jabber host, if different from the host in your jabber
                            id. For Google talk, set this to talk.google.com
      -r RECIPIENT, --recipient=RECIPIENT
                            Jabber id of the recipient. Not necessary if you want
                            to send to your own jabber id
    

For example, if the DAX generator is written in Python and you want notifications on 'at_end' events (successful or failed):

# job level notifications - in this case for at_end events
job.invoke('at_end', pegasus_home + "/libexec/notification/email --to me@somewhere.edu")

For a full workflow that uses notifications, please see the notifications example.