Pegasus launches a monitoring daemon called pegasus-monitord per workflow (a single daemon is launched if a user submits a hierarchical workflow). pegasus-monitord parses the workflow and job logs in the submit directory and populates a database. This chapter gives an overview of pegasus-monitord and describes the schema of the runtime database.
Pegasus-monitord follows workflows by parsing DAGMan's dagman.out file. In addition to generating the jobstate.log file, which contains the various states that a job goes through during the workflow execution, pegasus-monitord can also mine information from jobs' submit and output files, and either populate a database or write a file with NetLogger events containing this information. Pegasus-monitord can also send notifications to users in real time as it parses the workflow execution logs.
Pegasus-monitord is automatically invoked by pegasus-run, and tracks workflows in real time. By default, it produces the jobstate.log file and a SQLite database, which contains all the information listed in the Stampede schema. When a workflow fails and is re-submitted with a rescue DAG, pegasus-monitord will automatically pick up from where it left off and continue to write the jobstate.log file and populate the database.
If, after the workflow has already finished, users need to re-create the jobstate.log file, or re-populate the database from scratch, pegasus-monitord's --replay option should be used when running it manually.
In addition to SQLite, pegasus-monitord supports other types of databases, such as MySQL and Postgres. Users will need to install the low-level database drivers, and can use the --dest command-line option, or the pegasus.monitord.output property to select where the logs should go.
As an example, the command:
$ pegasus-monitord -r diamond-0.dag.dagman.out
will launch pegasus-monitord in replay mode. In this case, if a jobstate.log file already exists, it will be rotated and a new file will be created. It will also create/use a SQLite database in the workflow's run directory, with the name of diamond-0.stampede.db. If the database already exists, it will make sure to remove any references to the current workflow before it populates the database. In this case, pegasus-monitord will process the workflow information from start to finish, including any restarts that may have happened.
Users can specify an alternative database for the events, as illustrated by the following examples:
$ pegasus-monitord -r -d mysql://username:userpass@hostname/database_name diamond-0.dag.dagman.out
$ pegasus-monitord -r -d sqlite:////tmp/diamond-0.db diamond-0.dag.dagman.out
In the first example, pegasus-monitord will send the data to the database_name database located at server hostname, using the username and userpass provided. In the second example, pegasus-monitord will store the data in the /tmp/diamond-0.db SQLite database.
For absolute paths, four slashes are required when specifying an alternative SQLite database: the sqlite:/// prefix is followed by the path, which itself begins with a slash.
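The four-slash rule is easy to get wrong when the URL is assembled by hand. The following is a minimal sketch of a helper that builds the URL from an absolute path; the function name sqlite_url is hypothetical and is not part of Pegasus.

```python
def sqlite_url(db_path: str) -> str:
    """Build a SQLite connection URL from an absolute database path.

    Hypothetical helper for illustration only; not part of Pegasus.
    """
    if not db_path.startswith("/"):
        raise ValueError("expected an absolute path, got: %r" % db_path)
    # "sqlite:///" ends with three slashes; the absolute path contributes
    # the fourth one.
    return "sqlite:///" + db_path


print(sqlite_url("/tmp/diamond-0.db"))  # sqlite:////tmp/diamond-0.db
```

The resulting string can be passed to the -d (--dest) option shown in the examples above.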
Users should also be aware that in all cases, with the exception of SQLite, the database should exist before pegasus-monitord is run (as it creates all needed tables but does not create the database itself).
Finally, the following example:
$ pegasus-monitord -r --dest diamond-0.bp diamond-0.dag.dagman.out
sends events to the diamond-0.bp file. (Please note that in replay mode, any data in the file will be overwritten.)
One important detail is that while processing a workflow, pegasus-monitord will automatically detect if/when sub-workflows are initiated, and will automatically track those sub-workflows as well. In this case, although pegasus-monitord will create a separate jobstate.log file in each workflow directory, the database at the top-level workflow will contain the information from not only the main workflow, but also from all sub-workflows.
Pegasus-monitord generates a number of files in each workflow directory:
jobstate.log: contains a summary of workflow and job execution.
monitord.log: contains any log messages generated by pegasus-monitord. It is not overwritten when pegasus-monitord restarts. This file is not generated in replay mode, as all log messages from pegasus-monitord are output to the console. Also, when sub-workflows are involved, only the top-level workflow will have this log file. Starting with releases 3.1.1 and 4.0, the monitord.log file is rotated if it already exists.
monitord.started: contains a timestamp indicating when pegasus-monitord was started. This file is overwritten every time pegasus-monitord starts.
monitord.done: contains a timestamp indicating when pegasus-monitord finished. This file is overwritten every time pegasus-monitord starts.
monitord.info: contains pegasus-monitord state information, which allows it to resume processing if a workflow does not finish properly and a rescue DAG is submitted. This file is erased when pegasus-monitord is executed in replay mode.
monitord.recover: contains pegasus-monitord state information that allows it to detect that a previous instance of pegasus-monitord failed (or was killed) midway through parsing a workflow's execution logs. This file is only present while pegasus-monitord is running, as it is deleted when it ends and the monitord.info file is generated.
monitord.subwf.db: contains information that helps pegasus-monitord track when sub-workflows fail and are re-planned/re-tried. It is overwritten when pegasus-monitord is started in replay mode.
monitord-notifications.log: contains the log file for notification-related messages. Normally, this file only includes logs for failed notifications, but can be populated with all notification information when pegasus-monitord is run in verbose mode via the -v command-line option.
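When inspecting a run directory, it can help to see at a glance which of the files listed above are present. The sketch below only checks for file existence; the function name monitord_files_present is hypothetical, and the temporary directory stands in for a real workflow directory.

```python
import tempfile
from pathlib import Path

# File names taken from the list above.
MONITORD_FILES = [
    "jobstate.log",
    "monitord.log",
    "monitord.started",
    "monitord.done",
    "monitord.info",
    "monitord.recover",
    "monitord.subwf.db",
    "monitord-notifications.log",
]


def monitord_files_present(run_dir):
    """Map each known pegasus-monitord file name to True if it exists."""
    run_dir = Path(run_dir)
    return {name: (run_dir / name).is_file() for name in MONITORD_FILES}


# Demonstration against a throwaway directory holding two of the files.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "jobstate.log").write_text("")
    (Path(tmp) / "monitord.started").write_text("")
    found = monitord_files_present(tmp)

for name, present in found.items():
    print(name, "present" if present else "missing")
```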
pegasus-monitord can publish events to several backends at the same time. This is configured through properties matching pegasus.catalog.workflow.<variable-name>.url.
For example, to populate an AMQP endpoint and a file in addition to the default SQLite database, you can configure the following properties:
pegasus.catalog.workflow.amqp.url amqp://vahi:XXXXX@amqp.isi.edu:5672/panorama/monitoring
pegasus.catalog.workflow.file.url file:///lfs1/work/monitord/amqp/nl.bp
If you only want to override the default SQLite destination, you can specify the pegasus.catalog.workflow.url property.
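To illustrate the naming pattern, the following sketch extracts the backend name and URL from properties of this form. It is not how pegasus-monitord reads its configuration; the function name workflow_backends and the "default" label used for the unnamed pegasus.catalog.workflow.url property are assumptions made for this example.

```python
import re

# Matches pegasus.catalog.workflow.<variable-name>.url and the unnamed
# pegasus.catalog.workflow.url form.
KEY_PATTERN = re.compile(
    r"^pegasus\.catalog\.workflow\.(?:(?P<name>[^.\s]+)\.)?url$"
)
# Key and value may be separated by "=" or by whitespace.
SEPARATOR = re.compile(r"\s*=\s*|\s+")


def workflow_backends(properties_text):
    """Return {backend name: URL} for the workflow catalog URL properties."""
    backends = {}
    for line in properties_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        parts = SEPARATOR.split(line, maxsplit=1)
        if len(parts) != 2:
            continue
        key, value = parts
        match = KEY_PATTERN.match(key)
        if match:
            # "default" is a label invented here for the unnamed property.
            backends[match.group("name") or "default"] = value
    return backends


example = """
pegasus.catalog.workflow.amqp.url amqp://vahi:XXXXX@amqp.isi.edu:5672/panorama/monitoring
pegasus.catalog.workflow.file.url file:///lfs1/work/monitord/amqp/nl.bp
"""
for name, url in workflow_backends(example).items():
    print(name, "->", url)
```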
Pegasus takes in a DAX, which is composed of tasks, and plans it into a Condor DAG (the executable workflow), which consists of jobs. With clustering, multiple tasks in the DAX can be captured in a single job in the executable workflow. When DAGMan executes a job, a job instance is populated. Job instances capture information as seen by DAGMan. If DAGMan retries a job on detecting a failure, a new job instance is populated. When DAGMan finds that a job instance has finished, an invocation is associated with the job instance. For a clustered job, multiple invocations will be associated with a single job instance. If a PRE script or POST script is associated with a job instance, then invocations are populated in the database for the corresponding job instance.
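The relationship between tasks, jobs, job instances and invocations can be sketched with a few plain data classes. This is only an illustration of the cardinalities described above; the class and field names are hypothetical and do not correspond to the actual tables or columns of the schema.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Invocation:
    executable: str
    exitcode: int


@dataclass
class JobInstance:
    attempt: int  # 1 for the first run, 2 for the first retry, ...
    invocations: List[Invocation] = field(default_factory=list)


@dataclass
class Job:
    name: str
    task_ids: List[str]  # more than one task when the job is clustered
    instances: List[JobInstance] = field(default_factory=list)

    def new_instance(self) -> JobInstance:
        """Record one execution attempt by DAGMan (initial run or retry)."""
        instance = JobInstance(attempt=len(self.instances) + 1)
        self.instances.append(instance)
        return instance


# A clustered job made of two DAX tasks: the first attempt fails, so
# DAGMan retries it and a second job instance is created.
job = Job(name="merge_job_1", task_ids=["ID0001", "ID0002"])
first = job.new_instance()
first.invocations += [Invocation("task-a", 0), Invocation("task-b", 1)]
retry = job.new_instance()
retry.invocations += [Invocation("task-a", 0), Invocation("task-b", 0)]

print(len(job.instances), [len(i.invocations) for i in job.instances])
```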
The current schema version is 4.0; it is stored in the schema_info table.
Starting with Pegasus 4.x, the monitoring and statistics database schema has changed. If you want to use pegasus-statistics, pegasus-analyzer and pegasus-plots against a 3.x database, you will need to upgrade the schema first using the schema upgrade tool, /usr/share/pegasus/sql/schema_tool.py or /path/to/pegasus-4.x/share/pegasus/sql/schema_tool.py.
Upgrading the schema is required for people using a MySQL database to store their monitoring information if it was set up with the 3.x monitoring tools.
If your setup uses the default SQLite database, then the databases for new runs with Pegasus 4.x are automatically created with the correct schema. In this case, you only need to upgrade the SQLite databases from older runs if you wish to query them with the newer clients.
To upgrade the database:
For a SQLite database:
$ cd /to/the/workflow/directory/with/3.x.monitord.db
Check the database version:
$ /usr/share/pegasus/sql/schema_tool.py -c connString=sqlite:////to/the/workflow/directory/with/workflow.stampede.db
2012-02-29T01:29:43.330476Z INFO netlogger.analysis.schema.schema_check.SchemaCheck.init |
2012-02-29T01:29:43.330708Z INFO netlogger.analysis.schema.schema_check.SchemaCheck.check_schema.start |
2012-02-29T01:29:43.348995Z INFO netlogger.analysis.schema.schema_check.SchemaCheck.check_schema | Current version set to: 3.1.
2012-02-29T01:29:43.349133Z ERROR netlogger.analysis.schema.schema_check.SchemaCheck.check_schema | Schema version 3.1 found - expecting 4.0 - database admin will need to run upgrade tool.
Convert the database to be version 4.x compliant:
$ /usr/share/pegasus/sql/schema_tool.py -u connString=sqlite:////to/the/workflow/directory/with/workflow.stampede.db
2012-02-29T01:35:35.046317Z INFO netlogger.analysis.schema.schema_check.SchemaCheck.init |
2012-02-29T01:35:35.046554Z INFO netlogger.analysis.schema.schema_check.SchemaCheck.check_schema.start |
2012-02-29T01:35:35.064762Z INFO netlogger.analysis.schema.schema_check.SchemaCheck.check_schema | Current version set to: 3.1.
2012-02-29T01:35:35.064902Z ERROR netlogger.analysis.schema.schema_check.SchemaCheck.check_schema | Schema version 3.1 found - expecting 4.0 - database admin will need to run upgrade tool.
2012-02-29T01:35:35.065001Z INFO netlogger.analysis.schema.schema_check.SchemaCheck.upgrade_to_4_0 | Upgrading to schema version 4.0.
Verify that the database has been converted to version 4.x:
$ /usr/share/pegasus/sql/schema_tool.py -c connString=sqlite:////to/the/workflow/directory/with/workflow.stampede.db
2012-02-29T01:39:17.218902Z INFO netlogger.analysis.schema.schema_check.SchemaCheck.init |
2012-02-29T01:39:17.219141Z INFO netlogger.analysis.schema.schema_check.SchemaCheck.check_schema.start |
2012-02-29T01:39:17.237492Z INFO netlogger.analysis.schema.schema_check.SchemaCheck.check_schema | Current version set to: 4.0.
2012-02-29T01:39:17.237624Z INFO netlogger.analysis.schema.schema_check.SchemaCheck.check_schema | Schema up to date.
For upgrading a MySQL database, the steps remain the same. The only thing that changes is the connection string to the database, e.g.:
$ /usr/share/pegasus/sql/schema_tool.py -u connString=mysql://username:password@server:port/dbname
After the database has been upgraded you can use either 3.x or 4.x clients to query the database with pegasus-statistics, as well as pegasus-plots and pegasus-analyzer.
Kickstart records capture the raw status in addition to the exitcode. The exitcode is derived from the raw status. Starting with the Pegasus 4.0 release, pegasus-monitord stores the raw status in all exitcode columns (i.e., the invocation and job instance table columns). If an exitcode is encountered while parsing the DAGMan log files, the value is converted to the corresponding raw status before it is stored. All user tools (pegasus-analyzer and pegasus-statistics) then convert the raw status to an exitcode when retrieving it from the database.
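The two conversions can be sketched as follows. This assumes the raw status uses the conventional POSIX wait-status layout (exit code in the high byte, terminating signal in the low seven bits); the text above does not spell out the encoding, and the function names are hypothetical, so the actual Pegasus conversion may differ in its details.

```python
def exitcode_to_raw(exitcode: int) -> int:
    """Encode a plain exit code as a raw status (assumed POSIX layout)."""
    return exitcode << 8


def raw_to_exitcode(raw_status: int) -> int:
    """Decode a raw status into an exit code (assumed POSIX layout).

    A process killed by a signal is reported as the negative signal
    number, the same convention Python's subprocess module uses.
    """
    signal_number = raw_status & 0x7F
    if signal_number:
        return -signal_number
    return raw_status >> 8


print(exitcode_to_raw(1))    # 256
print(raw_to_exitcode(256))  # 1
print(raw_to_exitcode(9))    # -9 (terminated by signal 9)
```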
Starting with the 4.0 release, there is a multiplier factor associated with the jobs in the job_instance table. It defaults to one, unless the user associates a Pegasus profile key named cores with the job in the DAX. The factor can be used to get more accurate statistics for jobs that run on multiple processors/cores, or for MPI jobs.
pegasus-statistics uses the multiplier factor when computing the following metrics:
In the summary, the workflow cumulative job wall time.
In the summary, the cumulative job wall time as seen from the submit side.
In the jobs file, the multiplier factor is listed along with the multiplied kickstart time.
In the breakdown file, where statistics are listed per transformation, the mean, min, max and average values take the multiplier factor into account.
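As a worked example of the effect on the cumulative wall time, the sketch below weights each job's duration by its multiplier factor. That the factor is applied as a simple product is an assumption based on the "multiplied kickstart time" wording above; the function name and the sample numbers are made up for illustration.

```python
def cumulative_wall_time(job_instances):
    """Sum durations (in seconds), each weighted by its multiplier factor.

    job_instances is an iterable of (duration_seconds, multiplier_factor).
    """
    return sum(duration * factor for duration, factor in job_instances)


# One single-core job of 120 s and one job of 300 s with cores set to 4.
runs = [(120.0, 1), (300.0, 4)]
print(cumulative_wall_time(runs))  # 120 * 1 + 300 * 4 = 1320.0
```

Without the multiplier, the same two jobs would contribute only 420 seconds, understating the resources the four-core job actually occupied.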