9.6. Workflow of Workflows

9.6.1. Galactic Plane

The Galactic Plane workflow is a workflow of many Montage workflows. Its output is a set of tiles that viewer software can assemble into a seamless image that can be scrolled and zoomed. Because this is a production workflow rather than a teaching example, it can be harder to get running in your environment.

Highlights of the example are:

  • The subworkflow DAXes are generated by jobs in the parent workflow. This is an example of how to build more dynamic workflows: if a job in your workflow needs to determine the number of jobs in the next level, that job can create a subworkflow with the right number of jobs.

  • DAGMan job categories are used to limit the number of concurrent jobs in certain places. Here they limit the number of concurrent connections to the data find service, and also the number of concurrent subworkflows, which keeps disk usage on the compute cluster manageable.

  • Job priorities are used to make sure we overlap staging and computation. Pegasus sets default priorities, which for most jobs are fine, but the priority of the data find job is set explicitly to a higher priority.

  • A specific output site is defined in the site catalog and specified with the --output option of the subworkflows.
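
To make the effect of a category limit concrete, here is a minimal sketch of category throttling. It is a toy model written for this guide, not DAGMan's scheduler: the function name `schedule`, the tick-based durations, and the limit values are all assumptions made for illustration. It starts jobs in submit order, holds back any job whose category is already at its limit, and reports the peak concurrency seen per category.

```python
from collections import Counter, deque


def schedule(jobs, limits):
    """Toy model of category throttling (not DAGMan itself).

    jobs:   list of (name, category, duration_in_ticks), in submit order;
            durations must be at least 1
    limits: dict mapping category -> max concurrent jobs (must be >= 1);
            categories without an entry are unlimited
    Returns the peak number of simultaneously running jobs per category.
    """
    waiting = deque(jobs)
    running = []          # entries are [name, category, ticks_left]
    peak = Counter()

    while waiting or running:
        # Start every waiting job whose category still has a free slot.
        active = Counter(cat for _, cat, _ in running)
        still_waiting = deque()
        while waiting:
            name, cat, ticks = waiting.popleft()
            if active[cat] < limits.get(cat, float("inf")):
                running.append([name, cat, ticks])
                active[cat] += 1
            else:
                still_waiting.append((name, cat, ticks))
        waiting = still_waiting

        # Record the high-water mark for each category.
        for cat, count in active.items():
            peak[cat] = max(peak[cat], count)

        # Advance time by one tick and drop finished jobs.
        for job in running:
            job[2] -= 1
        running = [job for job in running if job[2] > 0]

    return dict(peak)


# Hypothetical workload: five subworkflows and five setup jobs.
jobs = [("tile-%05d" % i, "subworkflow", 3) for i in range(5)]
jobs += [("setup-%05d" % i, "remote_tile_setup", 1) for i in range(5)]
peaks = schedule(jobs, {"subworkflow": 2, "remote_tile_setup": 3})
print(peaks)
```

In this model, no category ever exceeds its limit even though ten jobs are ready at once, which is the behavior the category settings in this workflow rely on.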

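The DAX API supports subworkflows. The following excerpt from the parent workflow generator adds, for each tile, a setup job and the subworkflow job for that tile: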

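    import os
    from Pegasus.DAX3 import *

    # Setup job for one tile; it produces the tarball (mdagtar) for the tile.
    remote_tile_setup = Job(namespace="gp", name="remote_tile_setup", version="1.0")
    remote_tile_setup.addArguments("%05d" % (tile_id))
    # The DAGMan category lets the number of concurrent setup jobs be limited.
    remote_tile_setup.addProfile(Profile("dagman", "CATEGORY", "remote_tile_setup"))
    # Neither file is registered; the output tarball is transferred.
    remote_tile_setup.uses(params, link=Link.INPUT, register=False)
    remote_tile_setup.uses(mdagtar, link=Link.OUTPUT, register=False, transfer=True)
    uberdax.addJob(remote_tile_setup)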
...
    # Subworkflow job for the tile; the arguments are passed to the planner.
    subwf = DAX("%05d.dax" % (tile_id), "ID%05d" % (tile_id))
    subwf.addArguments("-Dpegasus.schema.dax=%s/etc/dax-2.1.xsd" % (os.environ["PEGASUS_HOME"]),
                       "-Dpegasus.catalog.replica.file=%s/rc.data" % (tile_work_dir),
                       "-Dpegasus.catalog.site.file=%s/sites.xml" % (work_dir),
                       "-Dpegasus.transfer.links=true",
                       "--sites", cluster_name,
                       "--cluster", "horizontal",
                       "--basename", "tile-%05d" % (tile_id),
                       "--force",
                       "--output", output_name)
    # The category lets the number of concurrent subworkflows be limited.
    subwf.addProfile(Profile("dagman", "CATEGORY", "subworkflow"))
    # The DAX file for the subworkflow is an input of the subworkflow job.
    subwf.uses(subdax_file, link=Link.INPUT, register=False)
    uberdax.addDAX(subwf)
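
Because the argument list is rebuilt for every tile, it can help to factor it into a small function that can be checked without running the planner. The sketch below is one way to do that using only the standard language features. The helper name `subworkflow_args` and all of the paths and site names in the usage example are hypothetical; only the option strings are taken from the excerpt above.

```python
def subworkflow_args(tile_id, pegasus_home, work_dir, tile_work_dir,
                     cluster_name, output_name):
    """Return the planner arguments for one tile as a list of strings."""
    return [
        "-Dpegasus.schema.dax=%s/etc/dax-2.1.xsd" % pegasus_home,
        "-Dpegasus.catalog.replica.file=%s/rc.data" % tile_work_dir,
        "-Dpegasus.catalog.site.file=%s/sites.xml" % work_dir,
        "-Dpegasus.transfer.links=true",
        "--sites", cluster_name,
        "--cluster", "horizontal",
        "--basename", "tile-%05d" % tile_id,
        "--force",
        # The output site must match a site defined in the site catalog.
        "--output", output_name,
    ]


# Hypothetical values, for illustration only.
args = subworkflow_args(
    tile_id=42,
    pegasus_home="/opt/pegasus",
    work_dir="/scratch/gp",
    tile_work_dir="/scratch/gp/00042",
    cluster_name="cluster",
    output_name="local",
)
for arg in args:
    print(arg)
```

Assuming addArguments accepts any number of positional arguments, as its use in the excerpt suggests, the list could then be passed with subwf.addArguments(*args).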