6. Monitoring
6.1. Monitoring Database
Pegasus launches a monitoring daemon called pegasus-monitord per workflow (a single daemon is launched if a user submits a hierarchical workflow). pegasus-monitord parses the workflow and job logs in the submit directory and populates a database. This section gives an overview of pegasus-monitord and describes the schema of the runtime database.
6.1.1. pegasus-monitord
pegasus-monitord is used to follow workflows, parsing the output of DAGMan’s dagman.out file. In addition to generating the jobstate.log file, which contains the various states that a job goes through during the workflow execution, pegasus-monitord can also be used to mine information from jobs’ submit and output files, and either populate a database or write a file with NetLogger events containing this information. pegasus-monitord can also send notifications to users in real time as it parses the workflow execution logs.
pegasus-monitord is automatically invoked by pegasus-run, and tracks workflows in real time. By default, it produces the jobstate.log file and a SQLite database, which contains all the information listed in the Stampede schema. When a workflow fails and is re-submitted with a rescue DAG, pegasus-monitord will automatically pick up where it left off and continue to write the jobstate.log file and populate the database.
If, once the workflow has completed, users need to re-create the jobstate.log file, or re-populate the database from scratch, pegasus-monitord’s --replay (or -r) option should be used when running it manually.
6.1.1.1. Populating different backend databases
In addition to SQLite, pegasus-monitord supports other types of databases, such as MySQL and Postgres. Users will need to install the low-level database drivers, and can use the --dest command-line option or the pegasus.monitord.output property to select where the logs should go.
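For instance, a minimal properties sketch for this, assuming the property accepts the same database URL forms as the --dest/-d option (the credentials, host, and database name are placeholders):

pegasus.monitord.output = mysql://username:userpass@hostname/database_name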
As an example, the command:
$ pegasus-monitord -r diamond-0.dag.dagman.out
will launch pegasus-monitord in replay mode. In this case, if a jobstate.log file already exists, it will be rotated and a new file will be created. It will also create/use a SQLite database in the workflow’s run directory, with the name diamond-0.stampede.db. If the database already exists, pegasus-monitord will make sure to remove any references to the current workflow before it populates the database. In this case, pegasus-monitord will process the workflow information from start to finish, including any restarts that may have happened.
Users can specify an alternative database for the events, as illustrated by the following examples:
$ pegasus-monitord -r -d mysql://username:userpass@hostname/database_name diamond-0.dag.dagman.out
$ pegasus-monitord -r -d sqlite:////tmp/diamond-0.db diamond-0.dag.dagman.out
In the first example, pegasus-monitord will send the data to the database_name database located on the server hostname, using the username and userpass provided. In the second example, pegasus-monitord will store the data in the /tmp/diamond-0.db SQLite database.
Note
For absolute paths, four slashes are required when specifying an alternative database path in SQLite.
Users should also be aware that in all cases, with the exception of SQLite, the database should exist before pegasus-monitord is run (as it creates all needed tables but does not create the database itself).
Finally, the following example:
$ pegasus-monitord -r --dest diamond-0.bp diamond-0.dag.dagman.out
sends events to the diamond-0.bp file (note that in replay mode, any existing data in the file will be overwritten).
One important detail is that while processing a workflow, pegasus-monitord will automatically detect if/when sub-workflows are initiated, and will track those sub-workflows as well. In this case, although pegasus-monitord will create a separate jobstate.log file in each workflow directory, the database at the top-level workflow will contain the information not only from the main workflow, but also from all sub-workflows.
6.1.1.3. Multiple Endpoints
pegasus-monitord can be used to publish events to different backends at the same time. The configuration of this is managed through properties matching pegasus.catalog.workflow.<variable-name>.url.
For example, to enable population to an AMQP endpoint and a file, in addition to the default SQLite database, you can configure the following:
pegasus.catalog.workflow.amqp.url amqp://vahi:XXXXX@amqp.isi.edu:5672/panorama/monitoring
pegasus.catalog.workflow.file.url file:///lfs1/work/monitord/amqp/nl.bp
If you want to override only the default SQLite population, then you can specify the pegasus.catalog.workflow.url property.
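For example, to redirect the default population to a different SQLite file, a sketch reusing the SQLite URL form from the earlier pegasus-monitord example (the path is illustrative):

pegasus.catalog.workflow.url = sqlite:////tmp/diamond-0.db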
6.1.2. Overview of the Workflow Database Schema
Pegasus takes in an abstract workflow, which is composed of tasks. Pegasus plans it into an executable workflow (a Condor DAG) that consists of jobs. In case of clustering, multiple tasks in the DAX can be captured into a single job in the executable workflow. When DAGMan executes a job, a job instance is populated. Job instances capture information as seen by DAGMan. If DAGMan retries a job after detecting a failure, a new job instance is populated. When DAGMan finds that a job instance has finished, an invocation is associated with the job instance. In case of a clustered job, multiple invocations are associated with a single job instance. If a pre-script or post-script is associated with a job instance, then invocations are also populated in the database for the corresponding job instance.
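As an illustration of this hierarchy, the short Python sketch below opens the SQLite Stampede database created for the diamond-0 workflow from the earlier examples and counts the rows in the job_instance and invocation tables mentioned above. This is only a sketch: the table names are assumed to match the schema description, and the database path should be adjusted to your run directory.

import sqlite3

# Path to the Stampede database in the workflow run directory (illustrative).
db = sqlite3.connect("diamond-0.stampede.db")
cur = db.cursor()

# One row per DAGMan job instance (retries add rows), and one row per
# invocation associated with a job instance (clustered jobs add several).
for table in ("job_instance", "invocation"):
    count, = cur.execute("SELECT COUNT(*) FROM {0}".format(table)).fetchone()
    print("{0}: {1} rows".format(table, count))

db.close()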
6.1.2.1. Storing of Exitcode in the database
Kickstart records capture the raw status in addition to the exitcode; the exitcode is derived from the raw status. Since Pegasus 4.0, all exitcode columns (i.e., the invocation and job instance table columns) are stored with the raw status by pegasus-monitord. If an exitcode is encountered while parsing the DAGMan log files, the value is converted to the corresponding raw status before it is stored. The user tools pegasus-analyzer and pegasus-statistics then convert the raw status back to an exitcode when retrieving it from the database.
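For illustration, the sketch below interprets a raw status using the standard POSIX wait-status convention, which is the usual relationship between a raw status and an exitcode. It is an approximation of the conversion the user tools perform, not Pegasus code.

import os

def raw_status_to_exitcode(raw_status):
    # Normal termination: the exitcode lives in the high byte of the status.
    if os.WIFEXITED(raw_status):
        return os.WEXITSTATUS(raw_status)
    # Termination by signal: commonly reported as the negative signal number.
    if os.WIFSIGNALED(raw_status):
        return -os.WTERMSIG(raw_status)
    return raw_status

print(raw_status_to_exitcode(256))  # raw status 256 corresponds to exitcode 1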
6.1.2.2. Multiplier Factor
Since Pegasus 4.0, there is a multiplier factor associated with the jobs in the job_instance table. It defaults to one, unless the user associates a Pegasus profile key named cores with the job in the abstract workflow. The factor can be used to get more accurate statistics for jobs that run on multiple processors/cores or for MPI jobs.
The multiplier factor is used by pegasus-statistics when computing the following metrics:
In the summary, the workflow cumulative job wall time.
In the summary, the cumulative job wall time as seen from the submit side.
In the jobs file, the multiplier factor is listed along with the multiplied kickstart time.
In the breakdown file, where statistics are listed per transformation, the mean, min, max, and average values take the multiplier factor into account.
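As a purely illustrative calculation, a job that carries a Pegasus profile of cores set to 4 and has a kickstart duration of 100 seconds contributes 4 * 100 = 400 seconds to the cumulative (multiplied) job wall time:

cores = 4                 # multiplier factor, taken from the Pegasus "cores" profile
kickstart_time = 100.0    # seconds of runtime recorded by kickstart
multiplied_time = cores * kickstart_time
print(multiplied_time)    # 400.0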
6.2. Stampede Workflow Events
All the events generated by the system (the Pegasus planner and the monitoring daemon) are formatted as NetLogger BP events. The NetLogger events that Pegasus generates are described in a YANG schema file that can be found in the share/pegasus/schema/ directory. The Stampede YANG schema is described below.
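For illustration only, a NetLogger BP event is a single line of space-separated key=value pairs; a hypothetical stampede.xwf.start event, with all values made up, might look like the following:

ts=2024-01-01T10:00:00.000000Z event=stampede.xwf.start level=Info xwf.id=6a7b8c9d-1e2f-3a4b-5c6d-7e8f9a0b1c2d restart_count=0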
6.2.1. Typedefs
The following typedefs are used in the YANG schema to describe certain event attributes.
distinguished-name
typedef distinguished-name { type string; }
uuid
typedef uuid { type string { length "36"; pattern '[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}'; } }
intbool
typedef intbool { type uint8 { range "0 .. 1"; } }
nl_ts
typedef nl_ts { type string { pattern '(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[\+\-]\d{2}:\d{2}))|(\d{1,9}(\.\d+)?)'; } }
peg_inttype
typedef peg_inttype { type uint8 { range "0 .. 11"; } }
peg_strtype
typedef peg_strtype { type enumeration { enum "unknown" { value 0; } enum "compute" { value 1; } enum "stage-in-tx" { value 2; } enum "stage-out-tx" { value 3; } enum "registration" { value 4; } enum "inter-site-tx" { value 5; } enum "create-dir" { value 6; } enum "staged-compute" { value 7; } enum "cleanup" { value 8; } enum "chmod" { value 9; } enum "dax" { value 10; } enum "dag" { value 11; } } }
condor_jobstates
typedef condor_jobstates { type enumeration { enum "PRE_SCRIPT_STARTED" { value 0; } enum "PRE_SCRIPT_TERMINATED" { value 1; } enum "PRE_SCRIPT_SUCCESS" { value 2; } enum "PRE_SCRIPT_FAILED" { value 3; } enum "SUBMIT" { value 4; } enum "GRID_SUBMIT" { value 5; } enum "GLOBUS_SUBMIT" { value 6; } enum "SUBMIT_FAILED" { value 7; } enum "EXECUTE" { value 8; } enum "REMOTE_ERROR" { value 9; } enum "IMAGE_SIZE" { value 10; } enum "JOB_TERMINATED" { value 11; } enum "JOB_SUCCESS" { value 12; } enum "JOB_FAILURE" { value 13; } enum "JOB_HELD" { value 14; } enum "JOB_EVICTED" { value 15; } enum "JOB_RELEASED" { value 16; } enum "POST_SCRIPT_STARTED" { value 17; } enum "POST_SCRIPT_TERMINATED" { value 18; } enum "POST_SCRIPT_SUCCESS" { value 19; } enum "POST_SCRIPT_FAILED" { value 20; } } }
condor_wfstates
typedef condor_wfstates { type enumeration { enum "WORKFLOW_STARTED" { value 0; } enum "WORKFLOW_TERMINATED" { value 1; } } }
6.2.2. Groupings
Groupings are sets of common attributes that different types of events refer to. The following groupings are defined.
base-event - Common components in all events
ts - Timestamp, ISO8601 or numeric seconds since 1/1/1970
level - Severity level of event. Subset of NetLogger BP levels. For ‘*.end’ events, if status is non-zero then level should be Error.
xwf.id - DAG workflow UUID
grouping base-event { description "Common components in all events"; leaf ts { type nl_ts; mandatory true; description "Timestamp, ISO8601 or numeric seconds since 1/1/1970"; } leaf level { type enumeration { enum "Info" { value 0; } enum "Error" { value 1; } } description "Severity level of event. " + "Subset of NetLogger BP levels. " + "For '*.end' events, if status is non-zero then level should be Error."; } leaf xwf.id { type uuid; description "DAG workflow id"; } } // grouping base-event
base-job-inst - Common components for all job instance events
all attributes from base-event
job_inst.id - Job instance identifier, i.e., the submit sequence generated by monitord.
js.id - Jobstate identifier
job.id - Identifier for corresponding job in the DAG
grouping base-job-inst { description "Common components for all job instance events"; uses base-event; leaf job_inst.id { type int32; mandatory true; description "Job instance identifier i.e the submit sequence generated by monitord"; } leaf js.id { type int32; description "Jobstate identifier"; } leaf job.id { type string; mandatory true; description "Identifier for corresponding job in the DAG"; } }
sched-job-inst - Scheduled job instance.
all attributes from base-job-inst
sched.id - Identifier for job in scheduler
grouping sched-job-inst { description "Scheduled job instance"; uses base-job-inst; leaf sched.id { type string; mandatory true; description "Identifier for job in scheduler"; } }
base-metadata - Common components for all metadata events that describe metadata for an entity
all attributes from base-event
key - Key for the metadata tuple
value - Corresponding value of the key
grouping base-metadata { description "Common components for all metadata events that describe metadata for an entity."; uses base-event; leaf key { type string; mandatory true; description "Key for the metadata tuple"; } leaf value { type string; description "Corresponding value of the key"; } } // grouping base-metadata
6.2.3. Events
The system generates the following types of events, which are described in detail below.
stampede.wf.plan
container stampede.wf.plan { uses base-event; leaf submit.hostname { type inet:host; mandatory true; description "The hostname of the Pegasus submit host"; } leaf dax.label { type string; default "workflow"; description "Label for abstract workflow specification"; } leaf dax.index { type string; default "workflow"; description "Index for the DAX"; } leaf dax.version { type string; mandatory true; description "Version number for DAX"; } leaf dax.file { type string; mandatory true; description "Filename for the DAX"; } leaf dag.file.name { type string; mandatory true; description "Filename for the DAG"; } leaf planner.version { type string; mandatory true; description "Version string for Pegasus planner, e.g. 3.0.0cvs"; } leaf grid_dn { type distinguished-name; description "Grid DN of submitter"; } leaf user { type string; description "User name of submitter"; } leaf submit.dir { type string; mandatory true; description "Directory path from which workflow was submitted"; } leaf argv { type string; description "All arguments given to planner on command-line"; } leaf parent.xwf.id { type uuid; description "Parent workflow in DAG, if any"; } leaf root.xwf.id { type string; mandatory true; description "Root of workflow hierarchy, in DAG. " + "Use this workflow's UUID if it is the root"; } } // container stampede.wf.plan
stampede.static.start
container stampede.static.start { uses base-event; }
stampede.static.end
container stampede.static.end { uses base-event; } // container stampede.static.end
stampede.xwf.start
container stampede.xwf.start { uses base-event; leaf restart_count { type uint32; mandatory true; description "Number of times workflow was restarted (due to failures)"; } } // container stampede.xwf.start
stampede.xwf.end
container stampede.xwf.end { uses base-event; leaf restart_count { type uint32; mandatory true; description "Number of times workflow was restarted (due to failures)"; } leaf status { type int16; mandatory true; description "Status of workflow. 0=success, -1=failure"; } } // container stampede.xwf.end
stampede.task.info
container stampede.task.info { description "Information about task in DAX"; uses base-event; leaf transformation { type string; mandatory true; description "Logical name of the underlying executable"; } leaf argv { type string; description "All arguments given to transformation on command-line"; } leaf type { type peg_inttype; mandatory true; description "Type of task"; } leaf type_desc { type peg_strtype; mandatory true; description "String description of task type"; } leaf task.id { type string; mandatory true; description "Identifier for this task in the DAX"; } } // container stampede.task.info
stampede.task.edge
container stampede.task.edge { description "Represents child/parent relationship between two tasks in DAX"; uses base-event; leaf parent.task.id { type string; mandatory true; description "Parent task"; } leaf child.task.id { type string; mandatory true; description "Child task"; } } // container stampede.task.edge
stampede.wf.map.task_job
container stampede.wf.map.task_job { description "Relates a DAX task to a DAG job."; uses base-event; leaf task.id { type string; mandatory true; description "Identifier for the task in the DAX"; } leaf job.id { type string; mandatory true; description "Identifier for corresponding job in the DAG"; } } // container stampede.wf.map.task_job
stampede.xwf.map.subwf_job
container stampede.xwf.map.subwf_job { description "Relates a sub workflow to the corresponding job instance"; uses base-event; leaf subwf.id { type string; mandatory true; description "Sub Workflow Identifier / UUID"; } leaf job.id { type string; mandatory true; description "Identifier for corresponding job in the DAG"; } leaf job_inst.id { type int32; mandatory true; description "Job instance identifier i.e the submit sequence generated by monitord"; } } // container stampede.xwf.map.subwf_job
stampede.job.info
container stampede.job.info { description "A description of a job in the DAG"; uses base-event; leaf job.id { type string; mandatory true; description "Identifier for job in the DAG"; } leaf submit_file { type string; mandatory true; description "Name of file being submitted to the scheduler"; } leaf type { type peg_inttype; mandatory true; description "Type of task"; } leaf type_desc { type peg_strtype; mandatory true; description "String description of task type"; } leaf clustered { type intbool; mandatory true; description "Whether job is clustered or not"; } leaf max_retries { type uint32; mandatory true; description "How many retries are allowed for this job before giving up"; } leaf task_count { type uint32; mandatory true; description "Number of DAX tasks for this job. " + "Auxiliary jobs without a task in the DAX will have the value '0'"; } leaf executable { type string; mandatory true; description "Program to execute"; } leaf argv { type string; description "All arguments given to executable (on command-line)"; } } // container stampede.job.info
stampede.job.edge
container stampede.job.edge { description "Parent/child relationship between two jobs in the DAG"; uses base-event; leaf parent.job.id { type string; mandatory true; description "Parent job"; } leaf child.job.id { type string; mandatory true; description "Child job"; } } // container stampede.job.edge
stampede.job_inst.pre.start
container stampede.job_inst.pre.start { description "Start of a prescript for a job instance"; uses base-job-inst; } // container stampede.job_inst.pre.start
stampede.job_inst.pre.term
container stampede.job_inst.pre.term { description "Job prescript is terminated (success or failure not yet known)"; } // container stampede.job_inst.pre.term
stampede.job_inst.pre.end
container stampede.job_inst.pre.end { description "End of a prescript for a job instance"; uses base-job-inst; leaf status { type int32; mandatory true; description "Status of prescript. 0 is success, -1 is error"; } leaf exitcode { type int32; mandatory true; description "the exitcode with which the prescript exited"; } } // container stampede.job_inst.pre.end
stampede.job_inst.submit.start
container stampede.job_inst.submit.start { description "When job instance is going to be submitted. " + "Scheduler job id is not yet known"; uses sched-job-inst; } // container stampede.job_inst.submit.start
stampede.job_inst.submit.end
container stampede.job_inst.submit.end { description "When executable job is submitted"; uses sched-job-inst; leaf status { type int16; mandatory true; description "Status of workflow. 0=success, -1=failure"; } } // container stampede.job_inst.submit.end
stampede.job_inst.held.start
container stampede.job_inst.held.start { description "When Condor holds the jobs"; uses sched-job-inst; } // container stampede.job_inst.held.start
stampede.job_inst.held.end
container stampede.job_inst.held.end { description "When the job is released after being held"; uses sched-job-inst; leaf status { type int16; mandatory true; description "Status of workflow. 0=success, -1=failure"; } } // container stampede.job_inst.held.end
stampede.job_inst.main.start
container stampede.job_inst.main.start { description "Start of execution of a scheduler job"; uses sched-job-inst; leaf stdin.file { type string; description "Path to file containing standard input of job"; } leaf stdout.file { type string; mandatory true; description "Path to file containing standard output of job"; } leaf stderr.file { type string; mandatory true; description "Path to file containing standard error of job"; } } // container stampede.job_inst.main.start
stampede.job_inst.main.term
container stampede.job_inst.main.term { description "Job is terminated (success or failure not yet known)"; uses sched-job-inst; leaf status { type int32; mandatory true; description "Execution status. 0=means job terminated, -1=job was evicted, not terminated"; } } // container stampede.job_inst.main.term
stampede.job_inst.main.end
container stampede.job_inst.main.end { description "End of main part of scheduler job"; uses sched-job-inst; leaf stdin.file { type string; description "Path to file containing standard input of job"; } leaf stdout.file { type string; mandatory true; description "Path to file containing standard output of job"; } leaf stdout.text { type string; description "Text containing output of job"; } leaf stderr.file { type string; mandatory true; description "Path to file containing standard error of job"; } leaf stderr.text { type string; description "Text containing standard error of job"; } leaf user { type string; description "Scheduler's name for user"; } leaf site { type string; mandatory true; description "DAX name for the site at which the job ran"; } leaf work_dir { type string; description "Path to working directory"; } leaf local.dur { type decimal64 { fraction-digits 6; } units "seconds"; description "Duration as seen at the local node"; } leaf status { type int32; mandatory true; description "Execution status. 0=success, -1=failure"; } leaf exitcode { type int32; mandatory true; description "the exitcode with which the executable exited"; } leaf multiplier_factor { type int32; mandatory true; description "the multiplier factor for use in statistics"; } leaf cluster.start { type nl_ts; description "When the enclosing cluster started"; } leaf cluster.dur { type decimal64 { fraction-digits 6; } units "seconds"; description "Duration of enclosing cluster"; } } // container stampede.job_inst.main.end
stampede.job_inst.post.start
container stampede.job_inst.post.start { description "Start of a postscript for a job instance"; uses sched-job-inst; } // container stampede.job_inst.post.start
stampede.job_inst.post.term
container stampede.job_inst.post.term { description "Job postscript is terminated (success or failure not yet known)"; uses sched-job-inst; } // container stampede.job_inst.post.term
stampede.job_inst.post.end
container stampede.job_inst.post.end { description "End of a postscript for a job instance"; uses sched-job-inst; leaf status { type int32; mandatory true; description "Status of postscript. 0 is success, -1=failure"; } leaf exitcode { type int32; mandatory true; description "the exitcode with which the postscript exited"; } } // container stampede.job_inst.post.end
stampede.job_inst.host.info
container stampede.job_inst.host.info { description "Host information associated with a job instance"; uses base-job-inst; leaf site { type string; mandatory true; description "Site name"; } leaf hostname { type inet:host; mandatory true; description "Host name"; } leaf ip { type inet:ip-address; mandatory true; description "IP address"; } leaf total_memory { type uint64; description "Total RAM on host"; } leaf uname { type string; description "Operating system name"; } } // container stampede.job_inst.host.info
stampede.job_inst.image.info
container stampede.job_inst.image.info { description "Image size associated with a job instance"; uses base-job-inst; leaf size { type uint64; description "Image size"; } leaf sched.id { type string; mandatory true; description "Identifier for job in scheduler"; } } // container stampede.job_inst.image.info
stampede.job_inst.tag
container stampede.job_inst.tag { description "A tag event to tag errors at a job_instance level"; uses base-job-inst; leaf name { type string; description "Name of tagged event such as int.error"; } leaf count { type int32; mandatory true; description "count of occurrences of the events of type name for the job_instance"; } } // container stampede.job_inst.tag
stampede.job_inst.composite
container stampede.job_inst.composite { description "A de-normalized composite event at the job_instance level that captures all the job information. Useful when populating AMQP"; uses base-job-inst; leaf jobtype { type string; description "Type of job as classified by the planner."; } leaf stdin.file { type string; description "Path to file containing standard input of job"; } leaf stdout.file { type string; mandatory true; description "Path to file containing standard output of job"; } leaf stdout.text { type string; description "Text containing output of job"; } leaf stderr.file { type string; mandatory true; description "Path to file containing standard error of job"; } leaf stderr.text { type string; description "Text containing standard error of job"; } leaf user { type string; description "Scheduler's name for user"; } leaf site { type string; mandatory true; description "DAX name for the site at which the job ran"; } leaf hostname { type inet:host; mandatory true; description "Host name"; } leaf work_dir { type string; description "Path to working directory"; } leaf local.dur { type decimal64 { fraction-digits 6; } units "seconds"; description "Duration as seen at the local node"; } leaf status { type int32; mandatory true; description "Execution status. 0=success, -1=failure"; } leaf exitcode { type int32; mandatory true; description "the exitcode with which the executable exited"; } leaf multiplier_factor { type int32; mandatory true; description "the multiplier factor for use in statistics"; } leaf cluster.start { type nl_ts; description "When the enclosing cluster started"; } leaf cluster.dur { type decimal64 { fraction-digits 6; } units "seconds"; description "Duration of enclosing cluster"; } leaf int_error_count { type int32; mandatory true; description "number of integrity errors encountered"; } } // container stampede.job_inst.composite
stampede.inv.start
container stampede.inv.start { description "Start of an invocation"; uses base-event; leaf job_inst.id { type int32; mandatory true; description "Job instance identifier i.e the submit sequence generated by monitord"; } leaf job.id { type string; mandatory true; description "Identifier for corresponding job in the DAG"; } leaf inv.id { type int32; mandatory true; description "Identifier for invocation. " + "Sequence number, with -1=prescript and -2=postscript"; } } // container stampede.inv.start
stampede.inv.end
container stampede.inv.end { description "End of an invocation"; uses base-event; leaf job_inst.id { type int32; mandatory true; description "Job instance identifier i.e the submit sequence generated by monitord"; } leaf inv.id { type int32; mandatory true; description "Identifier for invocation. " + "Sequence number, with -1=prescript and -2=postscript"; } leaf job.id { type string; mandatory true; description "Identifier for corresponding job in the DAG"; } leaf start_time { type nl_ts; description "The start time of the event"; } leaf dur { type decimal64 { fraction-digits 6; } units "seconds"; description "Duration of invocation"; } leaf remote_cpu_time { type decimal64 { fraction-digits 6; } units "seconds"; description "remote CPU time computed as the stime + utime"; } leaf exitcode { type int32; description "the exitcode with which the executable exited"; } leaf transformation { type string; mandatory true; description "Transformation associated with this invocation"; } leaf executable { type string; mandatory true; description "Program executed for this invocation"; } leaf argv { type string; description "All arguments given to executable on command-line"; } leaf task.id { type string; description "Identifier for related task in the DAX"; } } // container stampede.inv.end
stampede.int.metric
container stampede.int.metric { description "additional task events picked up from the job stdout"; uses base-event; leaf job_inst.id { type int32; mandatory true; description "Job instance identifier i.e the submit sequence generated by monitord"; } leaf job.id { type string; mandatory true; description "Identifier for corresponding job in the DAG"; } leaf type{ type string; description "enumerated type of metrics check|compute"; } leaf file_type{ type string; description "enumerated type of file types input|output"; } leaf count{ type int32; description "number of integrity events grouped by type , file_type "; } leaf duration{ type float; description "duration in seconds it took to perform these events "; } } // container stampede.int.metric
stampede.static.meta.start
container stampede.static.meta.start { uses base-event; } // container stampede.static.meta.start
stampede.static.meta.end
container stampede.static.meta.end { uses base-event; } // container stampede.static.meta.end
stampede.xwf.meta
container stampede.xwf.meta { description "Metadata associated with a workflow"; uses base-metadata; } // container stampede.xwf.meta
stampede.task.meta
container stampede.task.meta { description "Metadata associated with a task"; uses base-metadata; leaf task.id { type string; description "Identifier for related task in the DAX"; } } // container stampede.task.meta
stampede.task.monitoring
container stampede.task.monitoring { description "additional task events picked up from the job stdout"; uses base-event; leaf job_inst.id { type int32; mandatory true; description "Job instance identifier i.e the submit sequence generated by monitord"; } leaf job.id { type string; mandatory true; description "Identifier for corresponding job in the DAG"; } leaf monitoring_event{ type string; description "the name of the monitoring event parsed from the job stdout"; } leaf key{ type string; description "user defined keys in their payload in the event defined in the job stdout"; } } // container stampede.task.meta
stampede.rc.meta
container stampede.rc.meta { description "Metadata associated with a file in the replica catalog"; uses base-metadata; leaf lfn.id { type string; description "Logical File Identifier for the file"; } } // container stampede.rc.meta
stampede.wf.map.file
container stampede.wf.map.file { description "Event that captures what task generates or consumes a particular file"; uses base-event; leaf lfn.id { type string; description "Logical File Identifier for the file"; } leaf task.id { type string; description "Identifier for related task in the DAX"; } } // container stampede.wf.map.file
6.3. Publishing to AMQP Message Servers
The workflow events generated by pegasus-monitord can also be published to an AMQP message server such as RabbitMQ, in addition to the Stampede workflow database.
Note
One thing to keep in mind: the workflow events are documented as conforming to the NetLogger requirements. When events are pushed to an AMQP endpoint, the dots (.) in the keys are replaced by underscores (_); for example, xwf.id becomes xwf_id.
6.3.1. Configuration
In order to get pegasus-monitord to publish to a message queue, you can set the following property:
pegasus.catalog.workflow.amqp.url amqp://[USERNAME:PASSWORD@]amqp.isi.edu[:port]/<exchange_name>
The routing key set for the messages matches the name of the Stampede workflow event being sent. By default, if you enable AMQP population, only the following events are sent to the server:
stampede.job_inst.composite
stampede.job_inst.tag
stampede.inv.end
stampede.wf.plan
To configure additional events, you can specify a comma-separated list of events that need to be sent using the property pegasus.catalog.workflow.amqp.events. For example:
pegasus.catalog.workflow.amqp.events = stampede.xwf.*,stampede.static.*
Note
To get all events, you can just specify * as the value of the property.
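For example, the following (illustrative) property line sends every event:

pegasus.catalog.workflow.amqp.events = *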
6.3.1.1. Monitord, RabbitMQ, ElasticSearch Example
The AMQP support in Monitord is still a work in progress, but even the current functionality provides basic support for getting the monitoring data into ElasticSearch. In our development environment, we use a RabbitMQ instance with a simple exchange/queue. The configuration required for Pegasus is:
# help Pegasus developers collect data on integrity failures
pegasus.monitord.encoding = json
pegasus.catalog.workflow.amqp.url = amqp://friend:donatedata@msgs.pegasus.isi.edu:5672/prod/workflows
On the other side of the queue, Logstash is configured to receive the messages and forward them to ElasticSearch. The Logstash pipeline looks something like:
input {
  rabbitmq {
    type => "workflow-events"
    host => "msg.pegasus.isi.edu"
    vhost => "prod"
    queue => "workflows-es"
    heartbeat => 30
    durable => true
    password => "XXXXXX"
    user => "prod-logstash"
  }
}
filter {
  if [type] == "workflow-events" {
    mutate {
      convert => {
        "dur" => "float"
        "remote_cpu_time" => "float"
      }
    }
    date {
      # set @timestamp from the ts of the actual event
      match => [ "ts", "UNIX" ]
    }
    date {
      match => [ "start_time", "UNIX" ]
      target => "start_time_human"
    }
    fingerprint {
      # create unique document ids
      source => "ts"
      concatenate_sources => true
      method => "SHA1"
      key => "Pegasus Event"
      target => "[@metadata][fingerprint]"
    }
  }
}
output {
  if [type] == "workflow-events" {
    elasticsearch {
      "hosts" => ["es1.isi.edu:9200", "es2.isi.edu:9200"]
      "sniffing" => false
      "document_type" => "workflow-events"
      "document_id" => "%{[@metadata][fingerprint]}"
      "index" => "workflow-events-%{+YYYY.MM.dd}"
      "template" => "/usr/share/logstash/templates/workflow-events.json"
      "template_name" => "workflow-events-*"
      "template_overwrite" => true
    }
  }
}
Once the data is in ElasticSearch, you can easily create, for example, Grafana dashboards on top of it.
6.3.1.2. A Pre-Configured Data Collection Pipeline
A containerized data-collection/visualization pipeline similar to what we use in production is available in a separate repository. The figure below illustrates the processes involved in the pipeline and how they are connected to one another. For more information regarding setup and usage, please refer to that repository.