6.7. Publishing to AMQP Message Servers

The workflow events generated by pegasus-monitord can also be published to an AMQP message server such as RabbitMQ, in addition to being stored in the stampede workflow database.


One thing to keep in mind: the workflow events are documented as conforming to the netlogger requirements. When events are pushed to an AMQP endpoint, the periods (.) in the event keys are replaced by underscores (_).
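
For example, here is a minimal sketch of that key transformation in Python. The event payload below is hypothetical and only illustrates the renaming, not the full set of stampede attributes:

# Hypothetical netlogger-style event with the dotted keys the
# stampede schema documents.
event = {
    "event": "stampede.inv.end",
    "xwf.id": "7193de8c-a28d-4eca-84c1-c4d2b8d8b2e2",
    "dur": 0.521,
}

# When the event is pushed to AMQP, periods in the keys become underscores.
# Only keys are rewritten; values such as "stampede.inv.end" are untouched.
amqp_payload = {key.replace(".", "_"): value for key, value in event.items()}
# amqp_payload == {"event": "stampede.inv.end",
#                  "xwf_id": "7193de8c-...", "dur": 0.521}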

6.7.1. Configuration

In order to get pegasus-monitord to publish to a message queue, you can set the following property:

pegasus.catalog.workflow.amqp.url = amqp://[USERNAME:PASSWORD@]amqp.isi.edu[:port]/<exchange_name>
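
For example, a hypothetical setting might look like the following (the host, credentials, and exchange name here are placeholders; as the example in Section 6.7.2 shows, a virtual host can also precede the exchange name in the path):

pegasus.catalog.workflow.amqp.url = amqp://panda:secret@rabbit.example.com:5672/workflows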

The routing key set for the messages matches the name of the stampede workflow event being sent. By default, if you enable AMQP population, only the following events are sent to the server:

  • stampede.job_inst.tag

  • stampede.inv.end

  • stampede.wf.plan

To configure additional events, you can specify a comma-separated list of events that need to be sent using the property pegasus.catalog.workflow.amqp.events. For example:

pegasus.catalog.workflow.amqp.events = stampede.xwf.*,stampede.static.*


To send all events, you can specify * as the value of the property:
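
pegasus.catalog.workflow.amqp.events = *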

6.7.2. Monitord, RabbitMQ, ElasticSearch Example

The AMQP support in Monitord is still a work in progress, but even the current functionality provides basic support for getting the monitoring data into ElasticSearch. In our development environment, we use a RabbitMQ instance with a simple exchange/queue. The configuration required for Pegasus is:

# help Pegasus developers collect data on integrity failures                                        
pegasus.monitord.encoding = json                                                                    
pegasus.catalog.workflow.amqp.url = amqp://friend:donatedata@msgs.pegasus.isi.edu:5672/prod/workflows
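
To sanity-check that events are actually reaching the server, you can attach a small consumer to the exchange. The sketch below uses the pika Python client and assumes the exchange from the URL above ("workflows" on vhost "prod") is a topic exchange, so a "#" binding receives everything; adjust names and credentials to your own setup:

import pika

# Connection URL mirrors the pegasus.catalog.workflow.amqp.url property,
# minus the exchange name (pika expects only the vhost in the URL path).
params = pika.URLParameters("amqp://friend:donatedata@msgs.pegasus.isi.edu:5672/prod")
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Bind a throwaway queue to the exchange; monitord sets the routing key
# to the stampede event name, so "#" catches all events on a topic exchange.
result = channel.queue_declare(queue="", exclusive=True)
channel.queue_bind(exchange="workflows", queue=result.method.queue,
                   routing_key="#")

def on_message(ch, method, properties, body):
    # Print the event name (routing key) and the JSON payload.
    print(method.routing_key, body)

channel.basic_consume(queue=result.method.queue,
                      on_message_callback=on_message, auto_ack=True)
channel.start_consuming()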

On the other side of the queue, Logstash is configured to receive the messages and forward them to ElasticSearch. The Logstash pipeline looks something like:

input {
  rabbitmq {
    type => "workflow-events"
    host => "msg.pegasus.isi.edu"
    vhost => "prod"
    queue => "workflows-es"
    heartbeat => 30
    durable => true
    password => "XXXXXX"
    user => "prod-logstash"
  }
}

filter {
  if [type] == "workflow-events" {
    mutate {
      convert => {
        "dur" => "float"
        "remote_cpu_time" => "float"
      }
    }
    date {
      # set @timestamp from the ts of the actual event
      match => [ "ts", "UNIX" ]
    }
    date {
      match => [ "start_time", "UNIX" ]
      target => "start_time_human"
    }
    fingerprint {
      # create unique document ids
      source => "ts"
      concatenate_sources => true
      method => "SHA1"
      key => "Pegasus Event"
      target => "[@metadata][fingerprint]"
    }
  }
}

output {
  if [type] == "workflow-events" {
    elasticsearch {
      "hosts" => ["es1.isi.edu:9200", "es2.isi.edu:9200"]
      "sniffing" => false
      "document_type" => "workflow-events"
      "document_id" => "%{[@metadata][fingerprint]}"
      "index" => "workflow-events-%{+YYYY.MM.dd}"
      "template" => "/usr/share/logstash/templates/workflow-events.json"
      "template_name" => "workflow-events-*"
      "template_overwrite" => true
    }
  }
}


Once the data is in ElasticSearch, you can easily create, for example, a Grafana dashboard like the one shown below:

6.7.3. A Pre-Configured Data Collection Pipeline

In this repository, we provide a containerized data-collection/visualization pipeline similar to what we use in production. The figure below illustrates the processes involved in the pipeline and how they are connected to one another. For more information regarding setup and usage, please visit the repository linked above.