11.2. Job Clustering

A large number of workflows executed through the Pegasus Workflow Management System are composed of several jobs that run for only a few seconds or so. The overhead of running any job on the grid is usually 60 seconds or more. Hence, it makes sense to cluster small independent jobs into a larger job. This is done while mapping an abstract workflow to an executable workflow. Site specific or transformation specific criteria are taken into consideration while clustering smaller jobs into a larger job in the executable workflow. The user is allowed to control the granularity of this clustering on a per transformation, per site basis.

11.2.1. Overview

The abstract workflow is mapped onto the various sites by the Site Selector. This semi-executable workflow is then passed to the clustering module. The clustering of the workflow can either be

  • level based horizontal clustering - where you can denote how many jobs get clustered into a single clustered job per level, or how many clustered jobs should be created per level of the workflow

  • level based runtime clustering - similar to horizontal clustering, but the clusters at each level are created taking the job runtimes into account.

  • label based clustering (label clustering) - where all jobs with the same label value are clustered into a single clustered job

The clustering module clusters the jobs into larger/clustered jobs that can then be executed on the remote sites. The execution can either be sequential on a single node or on multiple nodes using MPI. To specify which clustering technique to use, the user has to pass the --cluster option to pegasus-plan.

11.2.1.1. Generating Clustered Executable Workflow

The clustering of a workflow is activated by passing the --cluster|-C option to pegasus-plan. The clustering granularity of a particular logical transformation on a particular site is dependent upon the clustering technique being used. The executable that is used for running the clustered job on a particular site is determined as explained in section 7.

#Running pegasus-plan to generate clustered workflows

$ pegasus-plan --dax example.dax --dir ./dags -s siteX --output local \
               --cluster [comma separated list of clustering techniques] --verbose

Valid clustering techniques are horizontal and label.

The submit files of the clustered jobs follow the naming convention merge_NAME_IDX.sub. The NAME is derived from the logical transformation name. The IDX is an integer that runs from 1 to the total number of clustered jobs created for that transformation; for example, clustering the jobs of a transformation B into two clustered jobs yields merge_B_1.sub and merge_B_2.sub. Each of the submit files has a corresponding input file, following the naming convention merge_NAME_IDX.in. The input file contains the respective execution targets and the arguments for each of the jobs that make up the clustered job.

11.2.1.1.1. Horizontal Clustering

In the case of horizontal clustering, each job in the workflow is associated with a level. The levels of the workflow are determined by doing a modified Breadth First Traversal of the workflow starting from the root nodes. The level associated with a node is its furthest distance from a root node, instead of the shortest distance as in normal BFS. For each level, the jobs are grouped by the site on which they have been scheduled by the Site Selector. Only jobs of the same type (txnamespace, txname, txversion) can be clustered into a larger job. To use horizontal clustering, the user needs to set the --cluster option of pegasus-plan to horizontal.
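
For example, a minimal invocation that enables horizontal clustering (reusing the example.dax and siteX from the earlier example) might look like:

$ pegasus-plan --dax example.dax --dir ./dags -s siteX --output local --cluster horizontal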

11.2.1.1.1.1. Controlling Clustering Granularity

The number of jobs that are clustered into a single large job is determined by the value of two parameters associated with the smaller jobs. Both these parameters are specified by the use of PEGASUS namespace profile keys. The keys can be specified at any of the placeholders for the profiles (abstract transformation in the DAX, site in the site catalog, transformation in the transformation catalog). The normal overloading semantics apply, i.e. a profile in the transformation catalog overrides the one in the site catalog, which in turn overrides the one in the DAX. The two parameters are described below.

  • clusters.size factor

    The clusters.size factor denotes how many jobs need to be merged into a single clustered job. It is specified via the use of the PEGASUS namespace profile key 'clusters.size'. For example, say that at a particular level 4 jobs referring to logical transformation B have been scheduled to siteX, and the clusters.size factor associated with job B for siteX is 3. This will result in 2 clustered jobs, one composed of 3 jobs and another of the remaining 1 job. The clusters.size factor can be specified in the transformation catalog as follows:

    # multiple line text-based transformation catalog: 2014-09-30T16:05:01.731-07:00
    tr B {
            site siteX {
                    profile pegasus "clusters.size" "3" 
                    pfn "/shared/PEGASUS/bin/jobB"
                    arch "x86"
                    os "LINUX"
                    type "INSTALLED"
            }
    }
    
    tr C {
            site siteX {
                    profile pegasus "clusters.size" "2" 
                    pfn "/shared/PEGASUS/bin/jobC"
                    arch "x86"
                    os "LINUX"
                    type "INSTALLED"
            }
    }
    
    

    Figure 11.1. Clustering by clusters.size


  • clusters.num factor

    The clusters.num factor denotes how many clustered jobs the user wants per level per site. It is specified via the use of the PEGASUS namespace profile key 'clusters.num'. For example, say that at a particular level 4 jobs referring to logical transformation B have been scheduled to siteX, and the clusters.num factor associated with job B for siteX is 3. This will result in 3 clustered jobs, one composed of 2 jobs and the others of a single job each. The clusters.num factor in the transformation catalog can be specified as follows:

    # multiple line text-based transformation catalog: 2014-09-30T16:06:23.397-07:00
    tr B {
            site siteX {
                    profile pegasus "clusters.num" "3" 
                    pfn "/shared/PEGASUS/bin/jobB"
                    arch "x86"
                    os "LINUX"
                    type "INSTALLED"
            }
    }
    
    tr C {
            site siteX {
                    profile pegasus "clusters.num" "2" 
                    pfn "/shared/PEGASUS/bin/jobC"
                    arch "x86"
                    os "LINUX"
                    type "INSTALLED"
            }
    }
    
    

    In the case where both factors are associated with a job, the clusters.num value supersedes the clusters.size value.

    # multiple line text-based transformation catalog: 2014-09-30T16:08:01.537-07:00
    tr B {
            site siteX {
                    profile pegasus "clusters.num" "3" 
                    profile pegasus "clusters.size" "3" 
                    pfn "/shared/PEGASUS/bin/jobB"
                    arch "x86"
                    os "LINUX"
                    type "INSTALLED"
            }
    }
    

    In the above case, the jobs referring to logical transformation B scheduled on siteX will be clustered on the basis of the 'clusters.num' value. Hence, if there are 4 jobs referring to logical transformation B scheduled to siteX, then 3 clustered jobs will be created.

    Figure 11.2. Clustering by clusters.num


11.2.1.1.2. Runtime Clustering

Workflows often consist of jobs of the same type that have varying runtimes. Two or more instances of the same job with varying inputs can differ significantly in their runtimes. A simple way to think about this is running the same program on two distinct input sets, where one input is small (1 MB) compared to the other, which is 10 GB in size. In such a case the two jobs will have significantly differing runtimes. When such jobs are clustered using horizontal clustering, the benefits of job clustering may be lost if all the smaller jobs get clustered together while all the larger jobs are clustered together, producing clustered jobs with very unbalanced runtimes. In such scenarios it is beneficial to cluster jobs together such that all clustered jobs have similar runtimes.

In the case of runtime clustering, jobs in the workflow are associated with a level. The levels of the workflow are determined in the same manner as in horizontal clustering. For each level, the jobs are grouped by the site on which they have been scheduled by the Site Selector. Only jobs of the same type (txnamespace, txname, txversion) can be clustered into a larger job. To use runtime clustering, the user needs to set the --cluster option of pegasus-plan to horizontal, and set the Pegasus property pegasus.clusterer.preference to Runtime.
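
For example, runtime clustering can be enabled as follows (reusing the example.dax and siteX from the earlier examples):

#PEGASUS PROPERTIES FILE
pegasus.clusterer.preference Runtime

$ pegasus-plan --dax example.dax --dir ./dags -s siteX --output local --cluster horizontal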

Runtime clustering supports two modes of operation.

  1. Clusters jobs together such that the clustered job's runtime does not exceed a user-specified maxruntime.

    The basic algorithm for grouping jobs into clusters is as follows:

    // clusters.maxruntime - the maximum runtime for which the clustered job should run.
    // j.runtime - the runtime of the job j.
    1. Create a set of jobs of the same type (txnamespace, txname, txversion), and that run on the same site.
    2. Sort the jobs in decreasing order of their runtime.
    3. For each job j, repeat
      a. If j.runtime > clusters.maxruntime then
            ignore j (j is left unclustered).
      // Sum of runtimes of jobs already in the bin + j.runtime <= clusters.maxruntime
      b. If j can be added to any existing bin (clustered job) then
            Add j to bin
         Else
            Add a new bin
            Add job j to newly added bin
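
    As an illustration, consider a hypothetical trace of this algorithm (the runtimes and the clusters.maxruntime value below are assumed purely for illustration):

    // Assumed: clusters.maxruntime = 300, job runtimes = 200, 150, 100, 100
    // After sorting (decreasing): 200, 150, 100, 100
    // 200 -> bin 1 (total 200); 150 does not fit in bin 1 (200 + 150 > 300) -> new bin 2 (total 150)
    // 100 -> fits in bin 1 (total 300); 100 -> does not fit in bin 1, fits in bin 2 (total 250)
    // Result: 2 clustered jobs, with runtimes 300 and 250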

    The runtime of a job, and the maximum runtime for which a clustered job should run, are determined by the value of two parameters associated with the jobs.

    • runtime

      expected runtime for a job

    • clusters.maxruntime

      maxruntime for the clustered job, i.e. group as many jobs as possible into a cluster, as long as the clustered job's runtime does not exceed clusters.maxruntime.

  2. Clusters all the jobs into a fixed number of clusters (clusters.num), such that the runtimes of the clustered jobs are similar.

    The basic algorithm for grouping jobs into clusters is as follows:

    // clusters.num - the number of clustered jobs to create.
    // j.runtime - the runtime of the job j.
    1. Create a set of jobs of the same type (txnamespace, txname, txversion), and that run on the same site.
    2. Sort the jobs in decreasing order of their runtime.
    3. Create a heap containing clusters.num number of clustered jobs.
    4. For each job j, repeat
      a. Get the clustered job cj having the shortest total runtime
      b. Add job j to clustered job cj
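
    As an illustration, a hypothetical trace (the runtimes below are assumed purely for illustration):

    // Assumed: clusters.num = 2, job runtimes = 100, 80, 60, 40
    // After sorting (decreasing): 100, 80, 60, 40; start with 2 empty clustered jobs cj1 and cj2
    // 100 -> cj1 (total 100); 80 -> cj2 (total 80); 60 -> cj2 (total 140); 40 -> cj1 (total 140)
    // Result: 2 clustered jobs with similar runtimes of 140 each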

    The runtime of a job, and the number of clustered jobs to create, are determined by the value of two parameters associated with the jobs.

    • runtime

      expected runtime for a job

    • clusters.num

      The clusters.num factor denotes how many clustered jobs the user wants per level per site.

Note

Users should specify either clusters.maxruntime or clusters.num. If both are specified, then the clusters.num profile will be ignored by the clustering engine.

All of these parameters are specified by the use of PEGASUS namespace profile keys. The keys can be specified at any of the placeholders for the profiles (abstract transformation in the DAX, site in the site catalog, transformation in the transformation catalog). The normal overloading semantics apply, i.e. a profile in the transformation catalog overrides the one in the site catalog, which in turn overrides the one in the DAX. The use of these keys is illustrated in the example below.

# multiple line text-based transformation catalog: 2014-09-30T16:09:40.610-07:00
#Cluster all jobs of type B at siteX, into 2 clusters such that the 2 clusters have similar runtimes
tr B {
        site siteX {
                profile pegasus "clusters.num" "2" 
                profile pegasus "runtime" "100" 
                pfn "/shared/PEGASUS/bin/jobB"
                arch "x86"
                os "LINUX"
                type "INSTALLED"
        }
}

#Cluster all jobs of type C at siteX, such that the duration of the clustered job does not exceed 300.
tr C {
        site siteX {
                profile pegasus "clusters.maxruntime" "300" 
                profile pegasus "runtime" "100" 
                pfn "/shared/PEGASUS/bin/jobC"
                arch "x86"
                os "LINUX"
                type "INSTALLED"
        }
}

Figure 11.3. Clustering by runtime


In the above case, the jobs referring to logical transformation B scheduled on siteX will be clustered into 2 clustered jobs with similar runtimes, while the jobs referring to logical transformation C will be grouped such that no clustered job runs for longer than the 300 seconds specified by the clusters.maxruntime property. In the above case we assume that all jobs referring to transformations B and C run for 100 seconds. For jobs with significantly differing runtimes, the runtime property should be associated with the jobs in the DAX.

In addition to the above two profiles, we need to inform pegasus-plan to use runtime clustering. This is done by setting the following property:

pegasus.clusterer.preference Runtime

11.2.1.1.3. Label Clustering

In label based clustering, the user labels the workflow. All jobs having the same label value are clustered into a single clustered job. This allows the user to create clusters, or use a clustering technique, that is specific to his workflows. If there is no label associated with a job, the job is not clustered and is executed as is.

Figure 11.4. Label-based clustering



Since the jobs in a cluster in this case are not independent, it is important that the jobs are executed in the correct order. This is done by doing a topological sort on the jobs in each cluster. To use label based clustering, the user needs to set the --cluster option of pegasus-plan to label.
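
For example (again reusing example.dax and siteX):

$ pegasus-plan --dax example.dax --dir ./dags -s siteX --output local --cluster label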

11.2.1.1.3.1. Labelling the Workflow

The labels for the jobs in the workflow are specified by associating pegasus profile keys with the jobs during the DAX generation process. The user can choose which profile key to use for labeling the workflow. By default, it is assumed that the user is using the PEGASUS profile key label to associate the labels. To use another key in the pegasus namespace, the user needs to set the following property:

  • pegasus.clusterer.label.key
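
For example, to designate user_label as the labeling key, the property can be set as follows:

#PEGASUS PROPERTIES FILE
pegasus.clusterer.label.key user_label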

With pegasus.clusterer.label.key set to user_label, the job description in the DAX looks as follows:

<adag >
...
  <job id="ID000004" namespace="app" name="analyze" version="1.0" level="1" >
    <argument>-a bottom -T60  -i <filename file="user.f.c1"/>  -o <filename file="user.f.d"/></argument>
    <profile namespace="pegasus" key="user_label">p1</profile>
    <uses file="user.f.c1" link="input" register="true" transfer="true"/>
    <uses file="user.f.c2" link="input" register="true" transfer="true"/>
    <uses file="user.f.d" link="output" register="true" transfer="true"/>
  </job>
...
</adag>
  • The above states that the pegasus profiles with the key user_label are to be used for designating clusters.

  • Each job with the same value for pegasus profile key user_label appears in the same cluster.

11.2.1.1.4. Recursive Clustering

In some cases, a user may want to use a combination of clustering techniques. For example, a user may want some jobs in the workflow to be horizontally clustered and others to be label clustered. This can be achieved by specifying a comma separated list of clustering techniques with the --cluster option of pegasus-plan. In this case the clustering techniques are applied one after the other on the workflow, in the order specified on the command line.

For example

$ pegasus-plan --dax example.dax --dir ./dags --cluster label,horizontal -s siteX --output local --verbose

Figure 11.5. Recursive clustering


11.2.1.2. Execution of the Clustered Job

The execution of the clustered job on the remote site involves the execution of the smaller constituent jobs either

  • sequentially on a single node of the remote site

    The clustered job is executed using pegasus-cluster, a wrapper tool written in C that is distributed as part of PEGASUS. It takes in the jobs passed to it and executes them sequentially on a single node. To use pegasus-cluster for executing any clustered job on a siteX, there needs to be an entry in the transformation catalog for an executable with the logical name seqexec and the namespace pegasus.

    #site  transformation   pfn            type                 architecture    profiles
    
    siteX    pegasus::seqexec     /usr/pegasus/bin/pegasus-cluster INSTALLED       INTEL32::LINUX NULL

    If the entry is not specified, Pegasus will attempt to create a default path on the basis of the environment profile PEGASUS_HOME specified in the site catalog for the remote site.

  • on multiple nodes of the remote site using an MPI based task management tool called Pegasus MPI Cluster (PMC)

    The clustered job is executed using pegasus-mpi-cluster, a wrapper MPI program written in C that is distributed as part of PEGASUS. A PMC job consists of a single master process (this process is rank 0 in MPI parlance) and several worker processes. These processes follow the standard master-worker architecture. The master process manages the workflow and assigns workflow tasks to workers for execution. The workers execute the tasks and return the results to the master. Communication between the master and the workers is accomplished using a simple text-based protocol implemented using MPI_Send and MPI_Recv. PMC relies on a shared filesystem on the remote site to manage the individual tasks' stdout and stderr and stage them back to the submit host as part of its own stdout/stderr.

    The input format for PMC is a DAG based format similar to Condor DAGMan's. PMC follows the dependencies specified in the DAG to release the jobs in the right order, and executes parallel jobs via the workers when possible. The input file for PMC is automatically generated by the Pegasus Planner when generating the executable workflow. PMC allows for finer grained control on how each task is executed. This can be enabled by associating the following pegasus profiles with the jobs in the DAX.

    Table 11.1. Pegasus Profiles that can be associated with jobs in the DAX for PMC

    • pmc_request_memory

      This key is used to set the -m option for pegasus-mpi-cluster. It specifies the amount of memory in MB that a job requires. This profile is usually set in the DAX for each job.

    • pmc_request_cpus

      This key is used to set the -c option for pegasus-mpi-cluster. It specifies the number of CPUs that a job requires. This profile is usually set in the DAX for each job.

    • pmc_priority

      This key is used to set the -p option for pegasus-mpi-cluster. It specifies the priority for a job. This profile is usually set in the DAX for each job. Negative values are allowed for priorities.

    • pmc_task_arguments

      This key is used to pass extra arguments to the PMC task at planning time. They are added to the very end of the argument string constructed for the task in the PMC file, and hence allow overriding of any argument constructed by the planner for any particular task in the PMC job.
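
    For illustration, a hypothetical job description in the DAX carrying these profiles might look as follows (the job, its arguments, and the values shown are assumptions, not planner output):

    <job id="ID000005" namespace="app" name="analyze" version="1.0">
      <argument>-a top -T60 -i <filename file="user.f.a"/> -o <filename file="user.f.b"/></argument>
      <profile namespace="pegasus" key="pmc_request_memory">2048</profile>
      <profile namespace="pegasus" key="pmc_request_cpus">4</profile>
      <profile namespace="pegasus" key="pmc_priority">10</profile>
    </job>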

    Refer to the pegasus-mpi-cluster man page in the command line tools chapter to learn more about PMC and how it schedules individual tasks.
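
    For reference, the PMC input file is a simple text DAG; a rough sketch of its shape (the task names and commands here are illustrative, not actual planner output) is:

    TASK taskA /bin/myjob -a top
    TASK taskB /bin/myjob -a bottom
    EDGE taskA taskB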

    It is recommended to have a pegasus::mpiexec entry in the transformation catalog to specify the path to PMC on the remote site, and to specify the relevant globus profiles such as xcount, host_xcount and maxwalltime to control the size of the MPI job.

    # multiple line text-based transformation catalog: 2014-09-30T16:11:11.947-07:00
    tr pegasus::mpiexec {
            site siteX {
                    profile globus "host_xcount" "1" 
                    profile globus "xcount" "32" 
                    pfn "/usr/pegasus/bin/pegasus-mpi-cluster"
                    arch "x86"
                    os "LINUX"
                    type "INSTALLED"
            }
    }

    If the entry is not specified, Pegasus will attempt to create a default path on the basis of the environment profile PEGASUS_HOME specified in the site catalog for the remote site.

    Tip

    Users are encouraged to use label based clustering in conjunction with PMC.

11.2.1.2.1. Specification of Method of Execution for Clustered Jobs

The method of execution of the clustered job (whether to launch via mpiexec or seqexec) can be specified

  1. globally in the properties file

    The user can set a property in the properties file that results in all the clustered jobs of the workflow being executed by the same type of executable.

    #PEGASUS PROPERTIES FILE
    pegasus.clusterer.job.aggregator seqexec|mpiexec

    In the above example, all the clustered jobs on the remote sites are going to be launched via the executable specified by the property value, as long as the property value is not overridden in the site catalog.

  2. associating profile key job.aggregator with the site in the site catalog

    <site handle="siteX" gridlaunch = "/shared/PEGASUS/bin/kickstart">
        <profile namespace="env" key="GLOBUS_LOCATION" >/home/shared/globus</profile>
        <profile namespace="env" key="LD_LIBRARY_PATH">/home/shared/globus/lib</profile>
        <profile namespace="pegasus" key="job.aggregator" >seqexec</profile>
        <lrc url="rls://siteX.edu" />
        <gridftp  url="gsiftp://siteX.edu/" storage="/home/shared/work" major="2" minor="4" patch="0" />
        <jobmanager universe="transfer" url="siteX.edu/jobmanager-fork" major="2" minor="4" patch="0" />
        <jobmanager universe="vanilla" url="siteX.edu/jobmanager-condor" major="2" minor="4" patch="0" />
        <workdirectory >/home/shared/storage</workdirectory>
      </site>

    In the above example, all the clustered jobs on a siteX are going to be executed via seqexec, as long as the value is not overridden in the transformation catalog.

  3. associating profile key job.aggregator with the transformation that is being clustered, in the transformation catalog

    # multiple line text-based transformation catalog: 2014-09-30T16:11:52.230-07:00
    tr B {
            site siteX {
                    profile pegasus "clusters.size" "3" 
                    profile pegasus "job.aggregator" "mpiexec" 
                    pfn "/shared/PEGASUS/bin/jobB"
                    arch "x86"
                    os "LINUX"
                    type "INSTALLED"
            }
    }
    

    In the above example, all the clustered jobs that consist of transformation B on siteX will be executed via mpiexec.

    Note

    The clustering of jobs on a site happens only if

    • there exists an entry in the transformation catalog for the clustering executable that has been determined by the above 3 rules

    • the number of jobs being clustered on the site is more than 1

11.2.1.3. Outstanding Issues

  1. Label Clustering

    More rigorous checks are required to ensure that the labeling scheme applied by the user is valid.