Process sets

Suppose you have a set of predefined processes that you use together every time you face a similar problem (e.g. formatting a file and plotting the data, or analyzing next-generation sequencing data). Without further help, you have to configure and call each of those processes every time.

Process sets are designed for this situation. Define a ProcSet with those processes, adjust the dependencies, input and arguments once, and you can re-use the set with very little configuration.

For example:

pTrimmomaticPE.input             = <input channel>
pAlignPEByBWA.depends            = pTrimmomaticPE
pSortSam.depends                 = pAlignPEByBWA
pMarkDuplicates.depends          = pSortSam
pIndexBam.depends                = pMarkDuplicates
pRealignerTargetCreator.depends  = pIndexBam
pIndelRealigner.depends          = pIndexBam, pRealignerTargetCreator
pBaseRecalibrator.depends        = pIndelRealigner
pPrintReads.depends              = pIndelRealigner, pBaseRecalibrator
pPrintReads.exportdir            = exdir

pAlignPEByBWA.args.ref            = <reference>
pRealignerTargetCreator.args.ref  = <reference>
pIndelRealigner.args.ref          = <reference>
pBaseRecalibrator.args.ref        = <reference>
pBaseRecalibrator.args.knownSites = <dbsnp>
pPrintReads.args.ref              = <reference>

PyPPL(forks = 100, runner_runner = 'sge', runner_sge_q = '4-days') \
    .start(pTrimmomaticPE).run()

This is a commonly used pipeline that cleans up Whole Genome Sequencing data starting from the raw reads, following the GATK best practices. It is needed pretty much every time new raw read files arrive.

With a ProcSet defined, you don't need to configure and call those processes every time:

from pyppl import ProcSet

aFastqPE2Bam = ProcSet(
    pTrimmomaticPE,
    pAlignPEByBWA,
    pSortSam,
    pMarkDuplicates,
    pIndexBam,
    pRealignerTargetCreator,
    pIndelRealigner,
    pBaseRecalibrator,
    pPrintReads
)
# dependency adjustment
aFastqPE2Bam.pIndelRealigner.depends = 'pIndexBam, pRealignerTargetCreator'
aFastqPE2Bam.pPrintReads.depends     = 'pIndelRealigner, pBaseRecalibrator'
# input adjustment
# args adjustment
aFastqPE2Bam.pAlignPEByBWA.args.ref            = <reference>
aFastqPE2Bam.pRealignerTargetCreator.args.ref  = <reference>
aFastqPE2Bam.pIndelRealigner.args.ref          = <reference>
aFastqPE2Bam.pBaseRecalibrator.args.ref        = <reference>
aFastqPE2Bam.pBaseRecalibrator.args.knownSites = <dbsnp>
aFastqPE2Bam.pPrintReads.args.ref              = <reference>

After that, each run only needs to set the input and the export directory, then start the procset:

aFastqPE2Bam.input = channel.fromPairs ( datadir + '/*.fastq.gz' )
aFastqPE2Bam.exdir = exdir
PyPPL(runner_sge_q = '1-day').start(aFastqPE2Bam).run()

Initializing a procset

As the previous example shows, you only need to pass all the processes to the constructor to build a procset. However, there are several things to note:

  1. The dependencies are automatically constructed by the order of the processes.
    a = ProcSet (p1, p2, p3)
    # The dependencies will be p1 -> p2 -> p3
    
  2. The starting and ending processes default to the first and last processes, respectively. If you modify the dependencies, check whether the starting and ending processes have changed as well.
    a = ProcSet(p1, p2, p3)
    # a.starts == [p1]
    # a.ends   == [p3]
    #                                / p2
    # change the dependencies to  p1      >
    #                                \ p3
    # both p2, p3 depend on p1, and p3 depends on p2
    a.p3.depends = [p1, p2]
    # but remember the ending processes are changed from [p3] to [p2, p3]
    a.ends = [p2, p3]
    
    You can also specify the dependencies manually:
    a = ProcSet(p1, p2, p3, depends = False)
    a.p2.depends = p1
    a.p3.depends = [p1, p2]
    a.starts = [p1]
    a.ends   = [p2, p3]
    
  3. If you have one process used twice in the procset, copy it with a different id:
    a = ProcSet(
        p1,
        p2,
        p1.copy(id = 'p1copy')
    )
    # then to access the 2nd p1: a.p1copy
    
  4. Each process is copied by the procset, so the original one can still be used.
  5. The tag of each process is regenerated by the id of the procset.
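
The ordering rules in the list above can be illustrated without the library. The following is a minimal sketch in plain Python, not PyPPL's implementation: `linear_layout` is a hypothetical helper, process ids stand in for process objects, and rejecting duplicate ids is an assumption of the sketch.

```python
def linear_layout(ids):
    """Chain process ids in the given order (illustration only, not PyPPL code).

    Returns (depends, starts, ends), where depends maps each id to the
    ids it depends on.
    """
    if not ids:
        raise ValueError('at least one process id is required')
    if len(set(ids)) != len(ids):
        # assumption of this sketch: a process used twice must be copied
        # with a different id first, so duplicates are rejected here
        raise ValueError('duplicate process id; copy it with a new id')
    depends = {ids[0]: []}
    # every later process depends on the one listed just before it
    for previous, current in zip(ids, ids[1:]):
        depends[current] = [previous]
    # the first process is the start, the last one is the end
    return depends, [ids[0]], [ids[-1]]


depends, starts, ends = linear_layout(['p1', 'p2', 'p1copy'])
```

If the dependencies are changed afterwards, `starts` and `ends` in this sketch stay as they were, which is why item 2 asks you to update them yourself.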

Accessing processes of a procset

There are several ways to access the processes of a procset:

  • As an attribute: ps.pXXX
  • As keys: ps['pXXX'], ps['pXXX, pYYY']
  • As indexes: ps[0], ps[:2]
  • Or wildcard selector: ps['pGATK*']

If a process is selected as an attribute, then we get the process itself. Otherwise, we will get a Proxy object so that we are able to set/get the attributes of those processes being selected.
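
To make the selector forms concrete, here is a minimal sketch of how such selectors could be resolved to process ids. It is not the library's code: `select` is a hypothetical function, the process ids are made up, and wildcard matching via the standard `fnmatch` module is an assumption.

```python
from fnmatch import fnmatchcase


def select(ids, selector):
    """Resolve a selector to a list of process ids (illustration only)."""
    if isinstance(selector, int):
        # a single index
        return [ids[selector]]
    if isinstance(selector, slice):
        # a slice of the process list
        return ids[selector]
    # comma-separated names; each name may contain wildcards
    patterns = [part.strip() for part in selector.split(',')]
    return [pid for pid in ids
            if any(fnmatchcase(pid, pattern) for pattern in patterns)]


ids = ['pTrim', 'pGATKRealign', 'pGATKRecal', 'pPrint']  # hypothetical ids
first = select(ids, 0)
first_two = select(ids, slice(None, 2))
named = select(ids, 'pTrim, pPrint')
gatk = select(ids, 'pGATK*')
```

In this sketch the result always follows the order of the processes in the set, not the order in which names appear in the selector.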

Delegating attributes of processes to a procset

You can delegate an attribute of a process directly to the procset:

aFastqPE2Bam.delegate('args.ref', 'pAlignPEByBWA')
Then when you want to set args.ref for pAlignPEByBWA, you can just do:
aFastqPE2Bam.args.ref = '/path/to/hg19.fa'
You may also use starts/ends to refer to the start/end processes.

Delegate an attribute to multiple processes:

aFastqPE2Bam.delegate('args.ref', 'pAlignPEByBWA, pPrintReads')
# or
aFastqPE2Bam.delegate('args.ref', 'pAlignPEByBWA', 'pPrintReads')
# or
aFastqPE2Bam.delegate('args.ref', aFastqPE2Bam.pAlignPEByBWA, aFastqPE2Bam.pPrintReads)

Note

For attributes that have sub-attributes (e.g. p.args.params.inopts.cnames), you may delegate just the leading part; an assignment to the full attribute will still follow the delegation. For example:

procset.delegate('args.params', 'p1,p2')
procset.args.params.inopts.cnames = True # only affects p1, p2
Keep in mind that shorter delegations always come first. In the above case, if we have another delegation: procset.delegate('args.params.inopts', 'p3'), then the assignment will still affect p1, p2 (the first delegation) and p3 (the second delegation).
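
The prefix rule described above can be sketched as follows. This is an illustration under assumptions, not PyPPL's implementation: `delegated_targets` is a hypothetical helper, and delegations are modelled as a plain dict from attribute path to process ids.

```python
def delegated_targets(delegations, attr_path):
    """Return the process ids affected by assigning to attr_path.

    A delegation applies when its key is a leading part of attr_path.
    Shorter delegations are applied first (illustration only).
    """
    parts = attr_path.split('.')
    targets = []
    # sort by the number of path components, shortest first
    ordered = sorted(delegations.items(), key=lambda item: item[0].count('.'))
    for key, proc_ids in ordered:
        key_parts = key.split('.')
        if parts[:len(key_parts)] == key_parts:
            for pid in proc_ids:
                if pid not in targets:
                    targets.append(pid)
    return targets


delegations = {
    'args.params.inopts': ['p3'],
    'args.params': ['p1', 'p2'],
}
affected = delegated_targets(delegations, 'args.params.inopts.cnames')
partial = delegated_targets(delegations, 'args.params.outopts')
unrelated = delegated_targets(delegations, 'args.ref')
```

Here the assignment to `args.params.inopts.cnames` reaches p1 and p2 through the shorter delegation and p3 through the longer one, matching the behaviour described in the note.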

Default delegations

By default, input/depends are delegated to the start processes, and exdir/exhow/exow/expart to the end processes. Importantly, as procset.starts is a list, the values for input/depends must be a list as well, with elements corresponding to each start process.

Besides, there are two special attributes for procsets: input2 and depends2. Unlike input and depends, input2 and depends2 pass everything they get to each process, instead of passing the corresponding element to each process.

For example, the element-wise assignment of input works like this:

# procset.starts = [procset.p1, procset.p2]
procset.input = [['a'], ['b']]
# then:
# procset.p1.config['input'] == ['a']
# procset.p2.config['input'] == ['b']
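
The difference between input and input2 (and likewise depends and depends2) can be sketched in plain Python. The helpers `spread` and `broadcast` are hypothetical names, and the length check is an assumption of the sketch rather than documented library behaviour.

```python
def spread(starts, values):
    """Element-wise assignment, as described for input and depends."""
    if len(values) != len(starts):
        # assumption: one value is expected per start process
        raise ValueError('need exactly one value per start process')
    return dict(zip(starts, values))


def broadcast(starts, value):
    """Whole-value assignment, as described for input2 and depends2."""
    return {pid: value for pid in starts}


starts = ['p1', 'p2']
per_process = spread(starts, [['a'], ['b']])   # like procset.input
shared = broadcast(starts, ['a'])              # like procset.input2
```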

Set attribute value for specific processes of a procset

There are several ways to do that:

# refer to the process directly
aFastqPE2Bam.pPrintReads.args.tmpdir = '/tmp'
aFastqPE2Bam.pPrintReads.runner = 'local'
# refer to the index of the process
aFastqPE2Bam[8].args.tmpdir = '/tmp'
aFastqPE2Bam[8].runner = 'local'
# refer to the name of the process
aFastqPE2Bam['pPrintReads'].args.tmpdir = '/tmp'
aFastqPE2Bam['pPrintReads'].runner = 'local'

# for multiple processes
aFastqPE2Bam[:3].args.tmpdir = '/tmp'
aFastqPE2Bam[0,1,3].args.tmpdir = '/tmp'
aFastqPE2Bam['pTrimmomaticPE', 'pPrintReads'].args.tmpdir = '/tmp'
aFastqPE2Bam['pTrimmomaticPE, pPrintReads'].args.tmpdir = '/tmp'

# or you may use starts/ends to refer to the start/end processes
# has to be done after procset.starts/procset.ends assigned
# or initialized with depends = True
aFastqPE2Bam['starts'].args.tmpdir = '/tmp'
aFastqPE2Bam['ends'].args.tmpdir = '/tmp'

Hint

If an attribute is delegated to other processes, you can still set its value for a specific process with the methods above.

Defining modules of a procset

We can define modules for a procset and switch them on later in specific scenarios. To define a module:

# aBam2SCNV is a predefined procset
@aBam2SCNV.module
# may also specify the module name:
#  @aBam2SCNV.module('plots')
# otherwise:
#  if the function name starts with "aBam2SCNV_", the rest is used as the module name
#  otherwise the full function name is used as the module name
def aBam2SCNV_plots(ps):
    # in case the state has been modified in other modules
    ps.restoreStates()
    ps.ends = 'pCNVkitScatter, pCNVkitDiagram, pCNVkitHeatmap, pCNVkitReport, pCNVkit2Vcf'

    ps['pCNVkitScatter'].depends = ps['pCNVkitFix, pCNVkitSeg']
    ps['pCNVkitDiagram'].depends = ps['pCNVkitFix, pCNVkitSeg']
    ps['pCNVkitHeatmap'].depends = ps['pCNVkitSeg']
    ps['pCNVkitReport'].depends  = ps['pCNVkitFix, pCNVkitSeg']

# to switch on the module:
aBam2SCNV.modules.plots()
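
The naming rule given in the comments of the module definition above can be written down as a small function. This is only a sketch of the rule as described; `module_name` is a hypothetical helper, not part of the library.

```python
def module_name(procset_id, func_name, explicit=None):
    """Derive a module name from a function name (illustration only)."""
    if explicit is not None:
        # a name passed to the decorator takes precedence
        return explicit
    prefix = procset_id + '_'
    if func_name.startswith(prefix):
        # strip the "<procset id>_" prefix
        return func_name[len(prefix):]
    # otherwise the full function name is the module name
    return func_name


stripped = module_name('aBam2SCNV', 'aBam2SCNV_plots')
unchanged = module_name('aBam2SCNV', 'make_plots')
given = module_name('aBam2SCNV', 'anything', explicit='plots')
```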

Since a module definition changes the state of a procset, including the start and end processes and the dependencies, we may need to reset that state with procset.restoreStates() when defining the next module.
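
The idea behind restoring the state can be sketched as a snapshot taken at construction time. The class below is a hypothetical stand-in that tracks only dependencies, starts and ends; how the library actually stores and restores its state is not shown in this document.

```python
import copy


class ToyProcSet:
    """Tracks only the state a module may change (illustration only)."""

    def __init__(self, depends, starts, ends):
        self.depends, self.starts, self.ends = depends, starts, ends
        # keep an independent snapshot of the initial state
        self._initial = copy.deepcopy((depends, starts, ends))

    def restore_states(self):
        # hand out a fresh copy so the snapshot itself is never modified
        self.depends, self.starts, self.ends = copy.deepcopy(self._initial)


ps = ToyProcSet({'p1': [], 'p2': ['p1']}, ['p1'], ['p2'])

# a first "module" changes ends and dependencies
ps.ends = ['p1', 'p2']
ps.depends['p2'] = []

# a second "module" starts from the original state again
ps.restore_states()
```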

Setting a procset as start procset for a pipeline

You can do this just like setting a process as the starting process of a pipeline. In fact, the starting processes of the procset (procset.starts) are set as the starting processes of the pipeline.

The dependency of procsets and processes

A procset can depend on procsets and/or processes; just treat the procsets as processes. A process can also depend on procsets and/or processes.

What am I?   | Whom am I depending on? | Real relations
-------------|-------------------------|------------------------------
ProcSet (a1) | ProcSet (a2)            | a1.starts depends on a2.ends
Proc (p)     | ProcSet (a)             | p depends on a.ends

Note

You have to specify depends for start processes of a procset.
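
The relations in the table above amount to a simple rule: a dependent procset receives the dependency on its start processes, and a procset that is depended on is represented by its end processes. A minimal sketch, with `ToyProc`, `ToyProcSet` and `resolve_dependency` as hypothetical names that are not part of the library:

```python
class ToyProc:
    def __init__(self, id):
        self.id = id


class ToyProcSet:
    def __init__(self, starts, ends):
        self.starts, self.ends = starts, ends


def resolve_dependency(dependent, dependency):
    """Return (ids that get the dependency, ids that are depended on)."""
    receivers = dependent.starts if isinstance(dependent, ToyProcSet) else [dependent]
    providers = dependency.ends if isinstance(dependency, ToyProcSet) else [dependency]
    return [p.id for p in receivers], [p.id for p in providers]


a2 = ToyProcSet(starts=[ToyProc('p1')], ends=[ToyProc('p2')])
a1 = ToyProcSet(starts=[ToyProc('p3')], ends=[ToyProc('p4')])
p = ToyProc('p5')

set_on_set = resolve_dependency(a1, a2)   # a1.starts depends on a2.ends
proc_on_set = resolve_dependency(p, a2)   # p depends on a2.ends
```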

Copying a procset

ProcSet.copy(id = None, tag = 'notag', depends = True)

You may copy a procset. All the processes in the procset will be copied and, if depends == True, the dependencies as well as the starting and ending processes will be switched to the corresponding copied processes.

You can keep the ids of the processes unchanged but give them a new tag, and also give the procset a new id instead of the variable name:

a = ProcSet(p1, p2, p3)
# access the processes:
# a.p1, a.p2, a.p3
a2 = a.copy(tag = 'copied')
# a2.procs == [
# <proc with id "p1" and tag "copied">,
# <proc with id "p2" and tag "copied">,
# <proc with id "p3" and tag "copied">,
# ]
# a2.id == 'a2'
# to access the processes:
# a2.p1, a2.p2, a2.p3
a2 = a.copy('newps', tag = 'copied')
# a2.id == 'newps'
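
What "switching the dependencies to the corresponding copied processes" means can be sketched with plain objects. This is an illustration under assumptions, not the library's copy logic: `ToyProc` and `copy_procs` are hypothetical, and all dependencies are assumed to point to processes inside the same set.

```python
class ToyProc:
    def __init__(self, id, tag='notag'):
        self.id, self.tag, self.depends = id, tag, []


def copy_procs(procs, tag):
    """Copy processes and re-point dependencies to the copies."""
    # ids are kept, every copy gets the new tag
    copies = {p.id: ToyProc(p.id, tag) for p in procs}
    for p in procs:
        # look up the copy of each dependency by id
        copies[p.id].depends = [copies[d.id] for d in p.depends]
    return [copies[p.id] for p in procs]


p1, p2 = ToyProc('p1'), ToyProc('p2')
p2.depends = [p1]
c1, c2 = copy_procs([p1, p2], tag='copied')
```

After the copy, `c2` depends on `c1` rather than on the original `p1`, and the originals keep their own tag and dependencies.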