6.7. Publishing to AMQP Message Servers

The workflow events generated by pegasus-monitord can also be used to publish to an AMQP message server such as RabbitMQ in addition to the stampede workflow database.

6.7.1. Configuration

In order to get pegasus-monitord to populate to a message queue, you can set the following property

pegasus.catalog.workflow.amqp.url amqp://[USERNAME:PASSWORD@]amqp.isi.edu[:port]/<exchange_name> 

The routing key set for the messages matches the name of the stampede workflow event being sent. By default, if you enable AMQP population only the following events are sent to the server

  • stampede.job_inst.tag

  • stampede.inv.end

  • stampede.wf.plan

To configure additional events, you can specify a comma separated list of events that need to be sent using the property pegasus.catalog.workflow.amqp.events . For example

pegasus.catalog.workflow.amqp.events = stampede.xwf.*,stampede.static.*


To get all events you can just specify * as the value to the property. Monitord, RabbitMQ, ElasticSearch Example

The AMQP support in Monitord is still a work in progress, but even the current functionality provides basic support for getting the monitoring data into ElasticSearch. In our development environment, we use a RabbitMQ instance with a simple exhange/queue. The configuration required for Pegasus is:

# help Pegasus developers collect data on integrity failures                                        
pegasus.monitord.encoding = json                                                                    
pegasus.catalog.workflow.amqp.url = amqp://friend:donatedata@msgs.pegasus.isi.edu:5672/prod/workflows

On the other side of the queue, Logstash is configured to receive the messages and forward them to ElasticSearch. The Logstash pipeline looks something like:

input {
  rabbitmq {
    type => "workflow-events"
    host => "msg.pegasus.isi.edu"
    vhost => "prod"
    queue => "workflows-es"
    heartbeat => 30
    durable => true
    password => "XXXXXX"
    user => "prod-logstash"

filter {
  if [type] == "workflow-events" {
    mutate {
      convert => {
        "dur" => "float"
        "remote_cpu_time" => "float"
    date {
      # set @timestamp from the ts of the actual event
      match => [ "ts", "UNIX" ]
    date {
      match => [ "start_time", "UNIX" ]
      target => "start_time_human"
    fingerprint {
      # create unique document ids
      source => "ts"
      concatenate_sources => true
      method => "SHA1"
      key => "Pegasus Event"
      target => "[@metadata][fingerprint]"
output {
  if [type] == "workflow-events" {
    elasticsearch {
      "hosts" => ["es1.isi.edu:9200", "es2.isi.edu:9200"]
      "sniffing" => false
      "document_type" => "workflow-events"
      "document_id" => "%{[@metadata][fingerprint]}"
      "index" => "workflow-events-%{+YYYY.MM.dd}"
      "template" => "/usr/share/logstash/templates/workflow-events.json"
      "template_name" => "workflow-events-*"
      "template_overwrite" => true


Once the data is ElasticSearch, you can easily create for example Grafana dashboard like: