Examples
Get started
See examples/getStarted/
Sort 5 files simultaneously:
1. from pyppl import PyPPL, Proc, Channel
2. pSort = Proc(desc = 'Sort files.')
3. pSort.input = {"infile:file": Channel.fromPattern("./data/*.txt")}
4. pSort.output = "outfile:file:{{i.infile | fn}}.sorted"
5. pSort.forks = 5
6. pSort.exdir = './export'
7. pSort.script = """
sort -k1r {{i.infile}} > {{o.outfile}}
"""
8. PyPPL().start(pSort).run()
Line 1: Import the modules.
Line 2: Define the process with a description.
Line 3: Define the input data for the process.
Line 4: Define the output. Templates are applied here as well.
Line 5: Define how many jobs run simultaneously.
Line 6: Set the directory to export the output files to.
Line 7: Set the script to run.
Line 8: Set the starting process and run the pipeline.
> ls -l ./export
total 0
-rw-rw-rw- 1 pwwang pwwang 44 Sep 14 20:50 test1.sorted
-rw-rw-rw- 1 pwwang pwwang 56 Sep 14 20:50 test2.sorted
-rw-rw-rw- 1 pwwang pwwang 59 Sep 14 20:50 test3.sorted
-rw-rw-rw- 1 pwwang pwwang 58 Sep 14 20:50 test4.sorted
-rw-rw-rw- 1 pwwang pwwang 58 Sep 14 20:50 test5.sorted
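The output names above come from the `{{i.infile | fn}}` template: the `fn` filter strips the directory and the last extension from the input path, so `./data/test1.txt` becomes `test1.sorted`. Conceptually (this is a sketch, not pyppl's actual implementation), `fn` behaves like:

```python
import os

def fn(path):
    # Drop the directory and the last extension, mimicking the behavior
    # of pyppl's `fn` template filter (conceptual sketch only).
    return os.path.splitext(os.path.basename(path))[0]

print(fn("./data/test1.txt"))  # test1
```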
Infer input channel from dependent process
See examples/inputFromDependent/
If a process depends on another one, its input channel can be inferred from the output channel of the process it depends on.
Sort 5 files and then add line numbers to each line.
from pyppl import PyPPL, Proc, Channel
pSort = Proc(desc = 'Sort files.')
pSort.input = {"infile:file": Channel.fromPattern("./data/*.txt")}
pSort.output = "outfile:file:{{i.infile | fn}}.sorted"
pSort.forks = 5
pSort.script = """
sort -k1r {{i.infile}} > {{o.outfile}}
"""
pAddPrefix = Proc(desc = 'Add line number to each line.')
pAddPrefix.depends = pSort
# automatically inferred from pSort.output
pAddPrefix.input = "infile:file"
pAddPrefix.output = "outfile:file:{{i.infile | fn}}.ln"
pAddPrefix.exdir = './export'
pAddPrefix.forks = 5
pAddPrefix.script = """
paste -d. <(seq 1 $(wc -l {{i.infile}} | cut -f1 -d' ')) {{i.infile}} > {{o.outfile}}
"""
PyPPL().start(pSort).run()
> head -3 ./export/test1.ln
1.8984
2.663
3.625
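The `paste -d. <(seq 1 N) file` trick in the script pairs each line with its 1-based index, joined by a dot. A pure-Python equivalent, for illustration only:

```python
def add_line_numbers(lines):
    # Prefix each line with "<1-based index>." -- the same effect as
    # `paste -d. <(seq 1 N) file` in the process script above.
    return ["%d.%s" % (idx, line) for idx, line in enumerate(lines, start=1)]

print(add_line_numbers(["8984", "663", "625"]))  # ['1.8984', '2.663', '3.625']
```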
Modify input channel
See examples/transformInputChannels/
Sort 5 files, add line numbers, and merge them into one file.
from pyppl import PyPPL, Proc, Channel
pSort = Proc(desc = 'Sort files.')
pSort.input = {"infile:file": Channel.fromPattern("./data/*.txt")}
pSort.output = "outfile:file:{{i.infile | fn}}.sorted"
pSort.forks = 5
pSort.script = """
sort -k1r {{i.infile}} > {{o.outfile}}
"""
pAddPrefix = Proc(desc = 'Add line number to each line.')
pAddPrefix.depends = pSort
pAddPrefix.input = "infile:file" # automatically inferred from pSort.output
pAddPrefix.output = "outfile:file:{{i.infile | fn}}.ln"
pAddPrefix.forks = 5
pAddPrefix.script = """
paste -d. <(seq 1 $(wc -l {{i.infile}} | cut -f1 -d' ')) {{i.infile}} > {{o.outfile}}
"""
pMergeFiles = Proc(desc = 'Merge files, each as a column.')
pMergeFiles.depends = pAddPrefix
# Transform it into a list of files
# ["test1.ln", "test2.ln", ..., "test5.ln"]
pMergeFiles.input = {"infiles:files": lambda ch: [ch.flatten()]}
pMergeFiles.output = "outfile:file:mergedfile.txt"
pMergeFiles.exdir = "./export"
pMergeFiles.script = """
paste {{i.infiles | asquote}} > {{o.outfile}}
"""
PyPPL().start(pSort).run()
> head -3 ./export/mergedfile.txt
1.8984 1.6448 1.2915 1.7269 1.7692
2.663 2.3369 2.26223 2.3866 2.7536
3.625 3.28984 3.25945 3.29971 3.30204
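The `lambda ch: [ch.flatten()]` transform above collapses the 5-row channel (one file per job) into a single row containing all 5 files, so `pMergeFiles` runs as a single job. Treating a channel as a list of rows, the idea is roughly this (a sketch, not pyppl's actual `Channel` implementation):

```python
def flatten(channel):
    # A channel is essentially a list of rows (tuples);
    # flatten yields every value across all rows.
    return [value for row in channel for value in row]

# Hypothetical channel as produced upstream: one row per job.
ch = [("test1.ln",), ("test2.ln",), ("test3.ln",)]

merged = [flatten(ch)]  # a single row -> a single job receiving all files
print(merged)
```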
Use a different language
See examples/differentLang/
Plot heatmap using R.
from pyppl import PyPPL, Proc
pHeatmap = Proc(desc = 'Draw heatmap.')
pHeatmap.input = {'seed': 8525}
pHeatmap.output = "outfile:file:heatmap.png"
pHeatmap.exdir = './export'
# Use full path "/path/to/Rscript" if it's not in $PATH
# You can also use a shebang in script
# in this case: #!/usr/bin/env Rscript
pHeatmap.lang = 'Rscript'
pHeatmap.script = """
set.seed({{i.seed}})
mat = matrix(rnorm(100), ncol=10)
png(filename = "{{o.outfile}}")
heatmap(mat)
dev.off()
"""
PyPPL().start(pHeatmap).run()
./export/heatmap.png
Use args
See examples/useArgs/
If the jobs share the same set of configurations (in this case, the number of rows and columns of the matrix), these can be set in pXXX.args. Another benefit is that the channels stay intact when the configurations are not supposed to be part of the channel.
from pyppl import PyPPL, Proc
pHeatmap = Proc(desc = 'Draw heatmap.')
pHeatmap.input = {'seed': [1,2,3]}
pHeatmap.output = "outfile:file:heatmap{{i.seed}}.png"
pHeatmap.exdir = "./export"
pHeatmap.forks = 3
pHeatmap.args.ncol = 10
pHeatmap.args.nrow = 10
pHeatmap.lang = 'Rscript' # or /path/to/Rscript if it's not in $PATH
pHeatmap.script = """
set.seed({{i.seed}})
mat = matrix(rnorm({{args.ncol * args.nrow}}), ncol={{args.ncol}})
png(filename = "{{o.outfile}}", width=150, height=150)
heatmap(mat)
dev.off()
"""
PyPPL().start(pHeatmap).run()
| ./export/heatmap1.png | ./export/heatmap2.png | ./export/heatmap3.png |
|---|---|---|
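In the example above, each job receives its own seed from the input channel, while args.ncol and args.nrow are shared by all jobs. A rough plain-Python sketch of that distinction (not pyppl internals):

```python
# Sketch: input columns fan out into one job per row,
# while args are a single shared set of options.
input_data = {"seed": [1, 2, 3]}
args = {"ncol": 10, "nrow": 10}

jobs = [{"i": {"seed": seed}, "args": args} for seed in input_data["seed"]]

print(len(jobs))        # one job per seed
print(jobs[0]["args"])  # every job sees the same args
```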
Use a different runner
See /examples/differentRunner/
from pyppl import PyPPL, Proc, Channel
pSort = Proc(desc = 'Sort files.')
pSort.input = {"infile:file": Channel.fromPattern("./data/*.txt")}
pSort.output = "outfile:file:{{i.infile | fn}}.sorted"
# specify the runner
pSort.runner = 'sge'
# specify the runner options
pSort.sgeRunner = {
"sge.q" : "1-day"
}
pSort.forks = 5
pSort.exdir = './export'
pSort.script = """
sort -k1r {{i.infile}} > {{o.outfile}}
"""
PyPPL().start(pSort).run()
# or run all processes with the sge runner:
PyPPL().start(pSort).run('sge')
# or:
PyPPL({
'default': {
'runner': 'sge',
'sgeRunner': {'sge.q': '1-day'}
}
}).start(pSort).run()
# or use a temporary profile
PyPPL().start(pSort).run({
'runner': 'sge',
'sgeRunner': {'sge.q': '1-day'}
})
Use Jinja2 as template engine
See /examples/useJinja2/
from pyppl import PyPPL, Proc, Channel
pSort = Proc(desc = 'Sort files.')
pSort.input = {"infile:file": Channel.fromPattern("./data/*.txt")}
# Notice the difference between the built-in template engine and Jinja2
pSort.output = "outfile:file:{{ fn(i.infile) }}.sorted"
# pSort.output = "outfile:file:{{i.infile | fn}}.sorted"
pSort.forks = 5
# You have to have Jinja2 installed (pip install Jinja2)
pSort.template = 'Jinja2'
pSort.exdir = './export'
pSort.script = """
sort -k1r {{i.infile}} > {{o.outfile}}
"""
PyPPL().start(pSort).run()
Debug your script
See /examples/debugScript/
You can go directly to <workdir>/<job.index>/job.script to debug your script, or print values out through the PyPPL log system.
from pyppl import PyPPL, Proc
pHeatmap = Proc(desc = 'Draw heatmap.')
pHeatmap.input = {'seed': [1,2,3,4,5]}
pHeatmap.output = "outfile:file:heatmap{{i.seed}}.png"
pHeatmap.exdir = "./export"
# Don't cache jobs for debugging
pHeatmap.cache = False
# Output debug information for all jobs, but don't echo stdout and stderr
pHeatmap.echo = {'jobs': range(5), 'type': ''}
pHeatmap.args.ncol = 10
pHeatmap.args.nrow = 10
pHeatmap.lang = 'Rscript' # or /path/to/Rscript if it's not in $PATH
pHeatmap.script = """
set.seed({{i.seed}})
mat = matrix(rnorm({{args.ncol * args.nrow}}), ncol={{args.ncol}})
png(filename = "{{o.outfile}}", width=150, height=150)
# have to be on stderr
cat("pyppl.log.debug:Plotting heatmap #{{job.index | @plus: 1}} ...", file = stderr())
heatmap(mat)
dev.off()
"""
PyPPL().start(pHeatmap).run({
'_log': {
'levels' : 'basic',
'leveldiffs': []
}
})
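As the R script shows, messages written to stderr with a `pyppl.log.<level>:` prefix are picked up by the log system. A conceptual sketch of how such a line could be split into level and message (a hypothetical helper, not pyppl's actual parsing code):

```python
def parse_log_line(line):
    # Lines like "pyppl.log.debug:message" carry a level and a message;
    # anything else is treated as plain stderr output (returns None).
    prefix = "pyppl.log."
    if not line.startswith(prefix):
        return None
    level, _, message = line[len(prefix):].partition(":")
    return level, message

print(parse_log_line("pyppl.log.debug:Plotting heatmap #1 ..."))
```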
Switch runner profiles
See examples/siwthcRunnerProfile/
We can define a set of runner profiles in a json file (./profiles.json):
{
    "default": {
        "runner": "local",
        "forks" : 1,
        "sgeRunner": {
            "sge.q": "1-day"
        }
    },
    "local5": {
        "runner": "local",
        "forks": 5
    },
    "sge7days": {
        "runner": "sge",
        "sgeRunner": {
            "sge.q": "7-days"
        }
    }
}
or you can use a .yaml file (pyyaml is required):
default:
  runner: local
  forks : 1
  sgeRunner:
    sge.q: 1-day
local5:
  runner: local
  forks : 5
sge7days:
  runner: sge
  sgeRunner:
    sge.q: 7-days
To switch profile:
# default profile (default)
PyPPL(cfgfile = "./profiles.json").start(pHeatmap).run()
# switch to local5 or sge7days:
PyPPL(cfgfile = "./profiles.json").start(pHeatmap).run('local5')
PyPPL(cfgfile = "./profiles.json").start(pHeatmap).run('sge7days')
# You may also use a runner name as the profile,
# which means running with that runner using the default options:
PyPPL(cfgfile = "./profiles.json").start(pHeatmap).run('sge')
# will use the 1-day queue (from the default profile)
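Conceptually, picking a profile means overlaying the named profile on top of default. A sketch of that idea (this is not pyppl's actual merging logic, which may, for instance, deep-merge nested options):

```python
def resolve_profile(profiles, name):
    # Sketch: start from the "default" profile and overlay the named one.
    config = dict(profiles.get("default", {}))
    config.update(profiles.get(name, {}))
    return config

profiles = {
    "default":  {"runner": "local", "forks": 1,
                 "sgeRunner": {"sge.q": "1-day"}},
    "local5":   {"runner": "local", "forks": 5},
    "sge7days": {"runner": "sge",
                 "sgeRunner": {"sge.q": "7-days"}},
}

print(resolve_profile(profiles, "local5"))
```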
Draw the pipeline chart
PyPPL can generate the pipeline graph in the DOT language.
from pyppl import PyPPL, Proc
p1 = Proc()
p2 = Proc()
p3 = Proc()
p4 = Proc()
p5 = Proc()
p6 = Proc()
p7 = Proc()
p8 = Proc()
p9 = Proc()
"""
p1 p8
/ \ /
p2 p3
\ /
p4 p9
/ \ /
p5 p6 (export)
\ /
p7 (export)
"""
p2.depends = p1
p3.depends = p1, p8
p4.depends = p2, p3
p4.exdir = "./export"
p5.depends = p4
p6.depends = p4, p9
p6.exdir = "./export"
p7.depends = p5, p6
p7.exdir = "./export"
# make sure at least one job is created.
p1.input = {"in": [0]}
p8.input = {"in": [0]}
p9.input = {"in": [0]}
PyPPL().start(p1, p8, p9).flowchart().run()
drawFlowchart.pyppl.dot:
digraph PyPPL {
"p8" [color="#259229" fillcolor="#ffffff" fontcolor="#000000" shape="box" style="filled"]
"p1" [color="#259229" fillcolor="#ffffff" fontcolor="#000000" shape="box" style="filled"]
"p9" [color="#259229" fillcolor="#ffffff" fontcolor="#000000" shape="box" style="filled"]
"p7" [color="#d63125" fillcolor="#ffffff" fontcolor="#c71be4" shape="box" style="filled"]
"p5" [color="#000000" fillcolor="#ffffff" fontcolor="#000000" shape="box" style="rounded,filled"]
"p4" [color="#000000" fillcolor="#ffffff" fontcolor="#c71be4" shape="box" style="rounded,filled"]
"p2" [color="#000000" fillcolor="#ffffff" fontcolor="#000000" shape="box" style="rounded,filled"]
"p3" [color="#000000" fillcolor="#ffffff" fontcolor="#000000" shape="box" style="rounded,filled"]
"p6" [color="#000000" fillcolor="#ffffff" fontcolor="#c71be4" shape="box" style="rounded,filled"]
"p2" -> "p4"
"p3" -> "p4"
"p1" -> "p2"
"p1" -> "p3"
"p6" -> "p7"
"p4" -> "p5"
"p4" -> "p6"
"p5" -> "p7"
"p8" -> "p3"
"p9" -> "p6"
}
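The edges in the DOT output mirror the depends relations: each dependency becomes an edge from the upstream process to the downstream one. Generating such a digraph from a dependency map is straightforward (an illustrative sketch, not pyppl's flowchart code):

```python
def to_dot(depends):
    # depends maps each process to the processes it depends on;
    # every dependency becomes an edge "dep" -> "proc".
    lines = ["digraph PyPPL {"]
    for proc in sorted(depends):
        for dep in depends[proc]:
            lines.append('    "%s" -> "%s"' % (dep, proc))
    lines.append("}")
    return "\n".join(lines)

depends = {
    "p2": ["p1"], "p3": ["p1", "p8"], "p4": ["p2", "p3"],
    "p5": ["p4"], "p6": ["p4", "p9"], "p7": ["p5", "p6"],
}
print(to_dot(depends))
```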
To generate the svg file, you need Graphviz installed.
drawFlowchart.pyppl.svg: