API
pyppl.template
-
desc
Template adaptor for PyPPL
-
variables
DEFAULT_ENVS (dict)
: The default environments for templates
class: Template
Base wrapper class for templates in PyPPL
method: __init__(self, source, **envs)
Template constructor
method: register_envs(self, **envs)
Register extra environment
-
params
**envs
: The environment
method: render(self, data)
Render the template
-
params
data (dict)
: The data used to render
class: TemplateJinja2
Jinja2 template wrapper
method: register_envs(self, **envs)
Register extra environment
-
params
**envs
: The environment
method: render(self, data)
Render the template
-
params
data (dict)
: The data used to render
class: TemplateLiquid
liquidpy template wrapper.
method: register_envs(self, **envs)
Register extra environment
-
params
**envs
: The environment
method: render(self, data)
Render the template
-
params
data (dict)
: The data used to render
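The adaptor pattern described above can be sketched with Python's built-in string.Template standing in for the Jinja2/liquidpy backends. The class below is illustrative only: it mirrors the `__init__`/`register_envs`/`render` API documented here, but is not pyppl's actual implementation.

```python
from string import Template as _StrTemplate

class MiniTemplate:
    """A minimal sketch of the Template adaptor, backed by string.Template."""

    def __init__(self, source, **envs):
        self.source = source
        self.envs = dict(envs)          # default environments

    def register_envs(self, **envs):
        """Register extra environment variables."""
        self.envs.update(envs)

    def render(self, data):
        """Render the template with data layered on top of the envs."""
        context = {**self.envs, **data}
        return _StrTemplate(self.source).substitute(context)

tpl = MiniTemplate("Hello $name from $pipeline", pipeline="PyPPL")
print(tpl.render({"name": "world"}))   # Hello world from PyPPL
```

A real backend wrapper would only differ in `render`, delegating to `jinja2.Template(...).render(...)` or liquidpy's equivalent.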
pyppl.procset
-
desc
The procset for a set of procs
class: ProcSet
The ProcSet for a set of processes
method: __init__(self, *procs)
Constructor
-
params
-
*procs (Proc)
: the set of processes -
**kwargs
: Other arguments to instantiate a ProcSet
depends (bool): Whether auto deduce depends.
Default: `True`
id (str): The id of the procset.
Default: `None` (the variable name)
tag (str): The tag of the processes. Default:
None
copy (bool): Whether copy the processes or just use them.
Default: `True`
-
method: copy(self, id, tag, depends)
Like Proc's copy function, copy a procset.
Each process will be copied.
-
params
-
id (str)
: Use a different id if you don't want to use the variable name
-
tag (str)
: The new tag of all copied processes -
depends (bool)
: Whether to copy the dependencies or not. Default: True
- dependencies for start processes will not be copied
-
-
returns
(ProcSet)
: The new procset
method: delegate(self, attr, *procs)
Delegate process attributes to procset.
-
params
-
*procs (str|Proc)
: The first argument is the name of the attribute.
- The rest of them should be Procs or Proc selectors.
-
method: delegated(self, name)
Get the delegated processes by a specific attribute name
-
params
name (str)
: the attribute name to query
-
returns
(Proxy)
: The set of processes
method: module(self, name)
A decorator used to define a module.
-
params
-
name (callable|str)
: The function to be decorated or the name of the module.
-
-
returns
(callable)
: The decorator
method: restore_states(self)
Restore the initial state of a procset
pyppl.proc
-
desc
Process for PyPPL
class: Proc
Process of a pipeline
property: cache
Should we cache the results or read results from cache?
property: id
The identity of the process
property: tag
The tag of the process
method: add_config(self, name, default, converter, runtime)
Add a plugin configuration
-
params
-
name (str)
: The name of the plugin configuration -
default (any)
: The default value -
converter (callable)
: The converter function for the value -
runtime (str)
: How should we deal with it when runtime_config is passed and its set counter > 1 -
override: Override the value
-
update: Update the value if it's a dict, otherwise override it
-
ignore: Ignore the value from runtime_config
-
-
method: copy(self, id, **kwargs)
Copy a process to a new one. Depends and nexts will be copied.
-
params
-
id
: The id of the new process -
kwargs
: Other arguments for constructing a process
-
method: run(self, runtime_config)
Run the process
-
params
runtime_config (simpleconf.Config)
: The runtime configuration
pyppl.utils
-
desc
Utility functions for PyPPL
method: always_list(data, trim)
Convert a string, or a list whose elements may be comma-separated strings, into a flat list
-
params
-
data
: the data to be converted -
trim
: trim the whitespaces for each item or not. Default: True
-
-
examples
data = ["a, b, c", "d"]
ret = always_list(data)
# ret == ["a", "b", "c", "d"]
-
returns
The split list
method: brief_list(blist, base)
Briefly show an integer list, combine the continuous numbers.
-
params
blist
: The list
-
returns
(str)
: The string to show for the brief list.
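The behavior described above can be sketched as follows. This is an illustrative reimplementation; the exact output format of pyppl.utils.brief_list may differ.

```python
def brief_list(blist):
    """Briefly show an integer list, combining continuous numbers,
    e.g. [0, 1, 2, 5, 6, 10] -> "0-2, 5-6, 10"."""
    if not blist:
        return ""
    nums = sorted(set(blist))
    ranges = []
    start = prev = nums[0]
    for num in nums[1:]:
        if num == prev + 1:           # still in a continuous run
            prev = num
            continue
        ranges.append((start, prev))  # close the current run
        start = prev = num
    ranges.append((start, prev))
    return ", ".join(str(a) if a == b else f"{a}-{b}" for a, b in ranges)

print(brief_list([0, 1, 2, 5, 6, 10]))  # 0-2, 5-6, 10
```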
method: chmod_x(filepath)
Make a file executable, or extract its shebang as the interpreter for the command line
-
params
filepath (path)
: The file path
-
returns
(list)
: with or without the path of the interpreter as the first element
and the script file as the last element
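A sketch of the described behavior: try to make the file executable; if that is not possible, fall back to the shebang as the interpreter. This is an assumption-laden stand-in, not pyppl's actual implementation.

```python
import os
import stat

def chmod_x(filepath):
    """Return a command line as a list: either [script] if the file could
    be made executable, or [interpreter..., script] taken from the shebang."""
    try:
        os.chmod(filepath, os.stat(filepath).st_mode | stat.S_IXUSR)
        return [str(filepath)]
    except OSError:
        # could not chmod (e.g. read-only filesystem): use the shebang
        with open(filepath) as fh:
            first = fh.readline().strip()
        if first.startswith("#!"):
            return first[2:].split() + [str(filepath)]
        raise
```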
method: filesig(filepath, dirsig)
Generate a signature for a file
-
params
dirsig
: Whether to expand the directory. Default: True
-
returns
The signature
method: format_secs(seconds)
Format a time duration
-
params
seconds
: the time duration in seconds
-
returns
The formatted string.
For example
: "01:01:01.001" stands for 1 hour, 1 minute, 1 second and 1 millisecond.
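A minimal sketch matching the HH:MM:SS.mmm format of the example above (illustrative; pyppl's implementation may differ in rounding details):

```python
def format_secs(seconds):
    """Format a duration in seconds as HH:MM:SS.mmm."""
    minutes, secs = divmod(seconds, 60)
    hours, minutes = divmod(int(minutes), 60)
    millis = round((secs - int(secs)) * 1000)
    return "%02d:%02d:%02d.%03d" % (hours, minutes, int(secs), millis)

print(format_secs(3661.001))  # 01:01:01.001
```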
method: funcsig(func)
Get the signature of a function. Try to get the source first; if that fails, try to get its name; otherwise return None.
-
params
func
: The function
-
returns
The signature
method: name2filename(name)
Convert any name to a valid filename
-
params
name (str)
: The name to be converted
-
returns
(str)
: The converted name
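One plausible way to do such a conversion is shown below. The replacement rules here are an assumption; the real pyppl.utils.name2filename may use different ones.

```python
import re

def name2filename(name):
    """Replace anything that is not alphanumeric, underscore, dash or
    dot with an underscore, then strip leading/trailing underscores."""
    cleaned = re.sub(r"[^\w.-]+", "_", str(name))
    return cleaned.strip("_")

print(name2filename("My pipeline: v1/2"))  # My_pipeline_v1_2
```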
method: try_deepcopy(obj, depth)
Try to deepcopy an object. If it fails, do a shallow copy instead.
-
params
-
obj (any)
: The object -
depth (int)
: A flag to avoid deep recursion
-
-
returns
(any)
: The copied object
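The fall-back behavior can be sketched like this; the role of depth as a recursion guard is an assumption based on the description above.

```python
import copy

def try_deepcopy(obj, depth=2):
    """Try to deepcopy an object; on failure, or once the depth flag
    runs out, fall back to a shallow copy."""
    if depth <= 0:
        return copy.copy(obj)
    try:
        return copy.deepcopy(obj)
    except Exception:
        # un-deepcopy-able members (locks, modules, ...): shallow copy
        return copy.copy(obj)

data = {"a": [1, 2], "b": {"c": 3}}
cloned = try_deepcopy(data)
cloned["a"].append(9)
print(data["a"])  # [1, 2] -- the original is untouched
```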
class: PQueue
A modified PriorityQueue, which allows jobs to be submitted in batch
method: __init__(self, maxsize, batch_len)
A priority queue for PyPPL jobs. Items are popped in batch order; a job that is done or needs a retry is re-queued into a later batch, so other jobs get a chance to run first.
-
params
-
maxsize
: The maxsize of the queue. Default: None -
batch_len
: What's the length of a batch
-
method: get(self)
Get an item from the queue
-
returns
(int, int)
: The index of the item and the batch of it
method: put(self, item, batch)
Put item to any batch
-
params
-
item (any)
: item to put -
batch (int)
: target batch
-
method: put_next(self, item, batch)
Put item to next batch
-
params
-
item (any)
: item to put -
batch (int)
: current batch
-
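The batch-based queueing described above can be sketched with the standard-library heapq. This is illustrative only; pyppl.utils.PQueue's internals and signatures differ (e.g. maxsize and batch_len are not modeled here).

```python
import heapq
import itertools

class MiniPQueue:
    """Sketch of a batch-aware priority queue: items are popped in
    (batch, insertion-order) order, so put_next() defers an item
    to the batch after the current one."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()   # tie-breaker within a batch

    def put(self, item, batch=0):
        heapq.heappush(self._heap, (batch, next(self._counter), item))

    def put_next(self, item, batch):
        """Put item into the batch after the given (current) one."""
        self.put(item, batch + 1)

    def get(self):
        batch, _, item = heapq.heappop(self._heap)
        return item, batch

q = MiniPQueue()
q.put("job0", 0)
q.put("job1", 0)
q.put_next("job0-retry", 0)       # goes into batch 1
print([q.get() for _ in range(3)])
# [('job0', 0), ('job1', 0), ('job0-retry', 1)]
```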
pyppl.job
-
desc
Job for PyPPL
class: Job
Job class
-
arguments
-
index (int)
: The index of the job -
proc (Proc)
: The process
-
property: logger
A logger wrapper to avoid instantiating a logger object for each job
-
params
-
*args (str)
: messages to be logged. -
**kwargs
: Other parameters for the logger.
-
property: signature
Calculate the signature of the job based on the input/output and the script. If a file does not exist, it will not be in the signature. The signature is like:
{
"i": {
"invar:var": <invar>,
"infile:file": <infile>,
"infiles:files": [<infiles...>]
},
"o": {
"outvar:var": <outvar>,
"outfile:file": <outfile>,
"outdir:dir": <outdir>
},
"mtime": <max mtime of input and script>,
}
-
returns
(Diot)
: The signature of the job
method: add_config(self, name, default, converter)
Add a config to plugin config, used for plugins
-
params
-
name (str)
: The name of the config. To protect your config, better use a prefix
-
default (Any)
: The default value for the config -
converter (callable)
: The converter for the value
-
method: build(self)
Initiate a job, make directory and prepare input, output and script.
method: cache(self)
Truly cache the job (by signature)
method: done(self, cached, status)
Do some cleanup when job finished
-
params
cached (bool)
: Whether this is running for a cached job.
method: kill(self)
Kill the job
-
returns
(bool)
: True if it succeeds, else False
method: poll(self)
Check the status of a running job
-
returns
-
(bool|str)
: True/False if the rcfile is generated, indicating whether the job succeeds; otherwise returns 'running'.
-
method: reset(self)
Clear the intermediate files and output files
method: retry(self)
Tell if the job is available to retry
-
returns
-
(bool|str)
: 'ignore' if errhow is 'ignore'; otherwise returns whether we could submit the job to retry.
-
method: submit(self)
Submit the job
-
returns
(bool)
: True if it succeeds, else False
pyppl.pyppl
-
desc
Pipeline for PyPPL
-
variables
-
SEPARATOR_LEN (int)
: the length of the separators in log -
PROCESSES (set)
: The process pool where the processes are registered -
TIPS (list)
: Some tips to show in log -
PIPELINES (dict)
: Existing pipelines
-
class: PyPPL
The class for the whole pipeline
method: __init__(self, config, name, config_files, **kwconfigs)
The construct for PyPPL
-
params
-
config (dict)
: the runtime configuration for the pipeline -
name (str)
: The name of the pipeline -
config_files (list)
: A list of runtime configuration files -
**kwconfigs
: flattened runtime configurations. For example, you can do PyPPL(forks = 10), or even PyPPL(logger_level = 'debug')
-
-
method: add_method(self, func, require)
Add a method to PyPPL object, used as a decorator
-
params
-
func (callable)
: the function to add -
require (str)
: Require which function to be called before this
-
method: run(self, profile)
Run the pipeline with certain profile
-
params
profile (str)
: The profile name
method: start(self, *anything)
Set the start processes for the pipeline
-
params
-
*anything
: Anything that can be converted to processes. Could be a string or a wildcard to search for processes, or the process itself.
-
-
pyppl.channel
-
desc
Channel for pyppl
class: Channel
The channel class, extended from list
method: attach(self, *names)
Attach columns to names of Channel, so we can access each column by:
ch.col0
== ch.colAt(0)
-
params
-
*names (str)
: The names. Must have the same length as the channel's width. None of them should be a Channel property name
-
flatten (bool)
: Whether to flatten the channel for the name being attached
-
method: cbind(self, *cols)
Add columns to the channel
-
params
*cols (any)
: The columns
-
returns
(Channel)
: The channel with the columns inserted.
method: colAt(self, index)
Fetch one column of a Channel
-
params
index (int|list)
: which column to fetch
-
returns
(Channel)
: The Channel with that column
method: col_at(self, index)
Fetch one column of a Channel
-
params
index (int|list)
: which column to fetch
-
returns
(Channel)
: The Channel with that column
method: collapse(self, col)
Do the reverse of expand. length: N -> 1; width: M -> M
-
params
col (int)
: the index of the column used to collapse
-
returns
(Channel)
: The collapsed Channel
method: copy(self)
Copy a Channel using copy.copy
-
returns
(Channel)
: The copied Channel
method: create(alist)
Create a Channel from a list
-
params
alist (list|Channel)
: The list, default: []
-
returns
(Channel)
: The Channel created from the list
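The "list of tuples" idea behind Channel can be sketched as below. This is illustrative: only create/width/length are shown, and the implementation is not pyppl's.

```python
class MiniChannel(list):
    """Sketch of a Channel: a list of tuples (rows), where each tuple
    position is a column."""

    @staticmethod
    def create(alist=None):
        alist = alist if alist is not None else []
        # wrap scalars into 1-tuples so every row has a uniform shape
        return MiniChannel(
            row if isinstance(row, tuple) else (row,) for row in alist)

    def width(self):
        return len(self[0]) if self else 0

    def length(self):
        return len(self)

ch = MiniChannel.create(["a", "b", "c"])
print(ch)                       # [('a',), ('b',), ('c',)]
print(ch.width(), ch.length())  # 1 3
```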
method: expand(self, col, pattern, ftype, sortby, reverse)
Expand the Channel according to the files in a directory column.
[(dir1/dir2, 1)].expand(0, "*")
will expand to
[(dir1/dir2/file1, 1), (dir1/dir2/file2, 1), ...]
length: 1 -> N
width: M -> M
-
params
-
col (int)
: the index of the column used to expand -
pattern (str)
: use a pattern to filter the files/dirs, default: *
-
ftype (str)
: the type of the files/dirs to include -
'dir', 'file', 'link' or 'any' (default)
-
sortby (str)
: how the list is sorted -
'name' (default), 'mtime', 'size'
-
reverse (bool)
: reverse sort. Default: False
-
-
returns
(Channel)
: The expanded Channel
method: filter(self, func)
Alias of python builtin filter
-
params
func (callable)
: the function. Default: None
-
returns
(Channel)
: The filtered Channel
method: filterCol(self, func, col)
Just filter on the specific column
-
params
-
func (callable)
: the function -
col (int)
: the column to filter
-
-
returns
(Channel)
: The filtered Channel
method: filter_col(self, func, col)
Just filter on the specific column
-
params
-
func (callable)
: the function -
col (int)
: the column to filter
-
-
returns
(Channel)
: The filtered Channel
method: flatten(self, col)
Convert a single-column Channel to a list (remove the tuple signs)
[(a,), (b,)]
to [a, b]
-
params
col (int)
: The column to flat. None for all columns (default)
-
returns
(list)
: The list converted from the Channel.
method: fold(self, nfold)
Fold a Channel. Make a row to n-length chunk rows
a1 a2 a3 a4
b1 b2 b3 b4
if nfold==2, fold(2) will change it to:
a1 a2
a3 a4
b1 b2
b3 b4
-
params
nfold (int)
: the size of the chunk
-
returns
(Channel)
: The new Channel
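The folding behavior shown in the rows above can be sketched as plain chunking (illustrative, not pyppl's code):

```python
def fold(rows, nfold):
    """Split every row into nfold-length chunks, each chunk becoming
    its own row: length N -> N * (width / nfold), width -> nfold."""
    out = []
    for row in rows:
        for i in range(0, len(row), nfold):
            out.append(tuple(row[i:i + nfold]))
    return out

ch = [("a1", "a2", "a3", "a4"), ("b1", "b2", "b3", "b4")]
print(fold(ch, 2))
# [('a1', 'a2'), ('a3', 'a4'), ('b1', 'b2'), ('b3', 'b4')]
```

unfold would do the reverse: merge every nfold consecutive rows back into one wide row.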
method: fromArgv()
Create a Channel from sys.argv[1:]
"python test.py a b c" creates a width=1 Channel
"python test.py a,1 b,2 c,3" creates a width=2 Channel
-
returns
(Channel)
: The Channel created from the command line arguments
method: fromChannels(*args)
Create a Channel from Channels
-
params
*args (any)
: The Channels or anything that can be created as a Channel
-
returns
(Channel)
: The Channel merged from other Channels
method: fromFile(filename, header, skip, delimit)
Create a Channel from a file's content. The file is like a matrix: each row is a row of the Channel, and each column a column of the Channel.
-
params
-
filename (file)
: the file -
header (bool)
: Whether the file contains a header. If True, the header will be attached, so you can use channel.<header> to fetch the column
-
skip (int)
: first lines to skip, default: 0
-
delimit (str)
: the delimiter for columns, default:
-
-
returns
(Channel)
: A Channel created from the file
method: fromPairs(pattern)
Create a width = 2 Channel from a pattern
-
params
pattern (str)
: the pattern
-
returns
(Channel)
: The Channel created from every 2 files matching the pattern
method: fromParams(*pnames)
Create a Channel from params
-
params
*pnames (str)
: The names of the option
-
returns
(Channel)
: The Channel created from pyparam.
method: fromPattern(pattern, ftype, sortby, reverse)
Create a Channel from a path pattern
-
params
-
pattern (str)
: the pattern with wild cards -
ftype (str)
: the type of the files/dirs to include -
'dir', 'file', 'link' or 'any' (default)
-
sortby (str)
: how the list is sorted -
'name' (default), 'mtime', 'size'
-
reverse (bool)
: reverse sort. Default: False
-
-
returns
(Channel)
: The Channel created from the path
method: from_argv()
Create a Channel from sys.argv[1:]
"python test.py a b c" creates a width=1 Channel
"python test.py a,1 b,2 c,3" creates a width=2 Channel
-
returns
(Channel)
: The Channel created from the command line arguments
method: from_channels(*args)
Create a Channel from Channels
-
params
*args (any)
: The Channels or anything that can be created as a Channel
-
returns
(Channel)
: The Channel merged from other Channels
method: from_file(filename, header, skip, delimit)
Create a Channel from a file's content. The file is like a matrix: each row is a row of the Channel, and each column a column of the Channel.
-
params
-
filename (file)
: the file -
header (bool)
: Whether the file contains a header. If True, the header will be attached, so you can use channel.<header> to fetch the column
-
skip (int)
: first lines to skip, default: 0
-
delimit (str)
: the delimiter for columns, default:
-
-
returns
(Channel)
: A Channel created from the file
method: from_pairs(pattern)
Create a width = 2 Channel from a pattern
-
params
pattern (str)
: the pattern
-
returns
(Channel)
: The Channel created from every 2 files matching the pattern
method: from_params(*pnames)
Create a Channel from params
-
params
*pnames (str)
: The names of the option
-
returns
(Channel)
: The Channel created from pyparam.
method: from_pattern(pattern, ftype, sortby, reverse)
Create a Channel from a path pattern
-
params
-
pattern (str)
: the pattern with wild cards -
ftype (str)
: the type of the files/dirs to include -
'dir', 'file', 'link' or 'any' (default)
-
sortby (str)
: how the list is sorted -
'name' (default), 'mtime', 'size'
-
reverse (bool)
: reverse sort. Default: False
-
-
returns
(Channel)
: The Channel created from the path
method: get(self, idx)
Get the element of a flattened channel
-
params
idx (int)
: The index of the element to get. Default: 0
-
return
(any)
: The element
method: insert(self, cidx, *cols)
Insert columns to a channel
-
params
-
cidx (int)
: Insert into which index of column? -
*cols (any)
: the columns to be bound to Channel
-
-
returns
(Channel)
: The combined Channel
method: length(self)
Get the length of a Channel
It's just an alias of len(chan)
-
returns
(int)
: The length of the Channel
method: map(self, func)
Alias of python builtin map
-
params
func (callable)
: the function
-
returns
(Channel)
: The transformed Channel
method: mapCol(self, func, col)
Map for a column
-
params
-
func (callable)
: the function -
col (int)
: the index of the column. Default:0
-
-
returns
(Channel)
: The transformed Channel
method: map_col(self, func, col)
Map for a column
-
params
-
func (callable)
: the function -
col (int)
: the index of the column. Default:0
-
-
returns
(Channel)
: The transformed Channel
method: nones(length, width)
Create a channel filled with Nones
-
params
-
length (int)
: The length of the channel -
width (int)
: The width of the channel
-
-
returns
(Channel)
: The created channel
method: rbind(self, *rows)
The multiple-argument version of row bind
-
params
*rows (any)
: the rows to be bound to Channel
-
returns
(Channel)
: The combined Channel
method: reduce(self, func)
Alias of python builtin reduce
-
params
func (callable)
: the function
-
returns
(Channel)
: The reduced value
method: reduceCol(self, func, col)
Reduce a column
-
params
-
func (callable)
: the function -
col (int)
: the column to reduce
-
-
returns
(Channel)
: The reduced value
method: reduce_col(self, func, col)
Reduce a column
-
params
-
func (callable)
: the function -
col (int)
: the column to reduce
-
-
returns
(Channel)
: The reduced value
method: repCol(self, nrep)
Repeat column and return a new channel
-
params
nrep (int)
: how many times to repeat.
-
returns
(Channel)
: The new channel with repeated columns
method: repRow(self, nrep)
Repeat row and return a new channel
-
params
nrep (int)
: how many times to repeat.
-
returns
(Channel)
: The new channel with repeated rows
method: rep_col(self, nrep)
Repeat column and return a new channel
-
params
nrep (int)
: how many times to repeat.
-
returns
(Channel)
: The new channel with repeated columns
method: rep_row(self, nrep)
Repeat row and return a new channel
-
params
nrep (int)
: how many times to repeat.
-
returns
(Channel)
: The new channel with repeated rows
method: rowAt(self, index)
Fetch one row of a Channel
-
params
index (int)
: which row to fetch
-
returns
(Channel)
: The Channel with that row
method: row_at(self, index)
Fetch one row of a Channel
-
params
index (int)
: which row to fetch
-
returns
(Channel)
: The Channel with that row
method: slice(self, start, length)
Fetch some columns of a Channel
-
params
-
start (int)
: from column to start -
length (int)
: how many columns to fetch, default: None (from start to the end)
-
-
returns
(Channel)
: The Channel with fetched columns
method: split(self, flatten)
Split a Channel to single-column Channels
-
returns
(list[Channel])
: The list of single-column Channels
method: t(self)
Transpose the channel
-
returns
(Channel)
: The transposed channel.
method: transpose(self)
Transpose the channel
-
returns
(Channel)
: The transposed channel.
method: unfold(self, nfold)
Do the reverse of self.fold
-
params
nfold (int)
: How many rows to combine each time. Default: 2
-
returns
(Channel)
: The unfolded Channel
method: unique(self)
Make the channel unique: remove duplicated rows, trying to keep the order
-
returns
(Channel)
: The channel with unique rows.
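Order-preserving deduplication can be sketched as below (illustrative, not pyppl's code):

```python
def unique_rows(rows):
    """Drop duplicated rows while keeping the first-seen order."""
    seen = set()
    out = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            out.append(row)
    return out

print(unique_rows([(1, "a"), (2, "b"), (1, "a"), (3, "c")]))
# [(1, 'a'), (2, 'b'), (3, 'c')]
```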
method: width(self)
Get the width of a Channel
-
returns
(int)
: The width of the Channel
pyppl.exception
-
desc
Exceptions for PyPPL
class: JobBuildingError
Failed to build the job
class: JobFailError
Job results validation failed
class: JobInputParseError
Failed to parse job input
class: JobOutputParseError
Failed to parse job output
class: PluginNoSuchPlugin
When trying to find a plugin that does not exist
class: ProcessAlreadyRegistered
Process already registered with the same id and tag
method: __init__(self, message, proc1, proc2)
Construct for ProcessAlreadyRegistered
-
params
-
message (str)
: The message, which makes the class compatible with Exception -
proc1 (Proc)
: the first Proc -
proc2 (Proc)
: the second Proc
-
class: ProcessAttributeError
Process AttributeError
class: ProcessInputError
Process Input error
class: ProcessOutputError
Process Output error
class: ProcessScriptError
Process script building error
class: PyPPLFindNoProcesses
When failing to find any processes with the given pattern
class: PyPPLInvalidConfigurationKey
When invalid configuration key passed
class: PyPPLMethodExists
Method has already been registered
class: PyPPLNameError
Pipeline name duplicated after being transformed by utils.name2filename
class: PyPPLResumeError
Try to resume when no start process has been specified
class: PyPPLWrongPositionMethod
Wrong position for plugin-added method
class: RunnerMorethanOneRunnerEnabled
When more than one runner is enabled
class: RunnerNoSuchRunner
When no such runner is found
class: RunnerTypeError
Wrong type of runner
pyppl.config
-
desc
Configuration for PyPPL
-
variables
-
DEFAULT_CFGFILES (tuple)
: default configuration files -
DEFAULT_CONFIG (dict)
: default configurations to be loaded -
config (dict)
: The default configuration that will be used in the whole session
-
method: load_config(default_config, *config_files)
Load the configurations
-
params
-
default_config (dict|path)
: A configuration dictionary or a path to a configuration file
-
*config_files
: A list of configuration or configuration files
-
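The layered-configuration idea can be sketched as a dict merge where later sources override earlier ones. This is illustrative: the real pyppl.config.load_config also accepts file paths and profile handling not modeled here.

```python
def load_config(default_config, *configs):
    """Merge configuration dicts: later sources override earlier ones,
    with dict values merged shallowly instead of replaced wholesale."""
    merged = dict(default_config)
    for cfg in configs:
        for key, value in cfg.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = {**merged[key], **value}
            else:
                merged[key] = value
    return merged

base = {"forks": 1, "logger": {"level": "info", "file": None}}
print(load_config(base, {"forks": 10}, {"logger": {"level": "debug"}}))
# {'forks': 10, 'logger': {'level': 'debug', 'file': None}}
```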
pyppl.jobmgr
-
desc
Job manager for PyPPL
-
variables
-
STATES (dict)
: Possible states for the job -
PBAR_MARKS (dict)
: The marks on progress bar for different states -
PBAR_LEVEL (dict)
: The levels for different states -
PBAR_SIZE (int)
: the size of the progress bar
-
class: Jobmgr
Job manager
method: __init__(self, jobs)
Job manager constructor
-
params
jobs (list)
: All jobs of a process
method: cleanup(self, ex)
Clean up the pipeline when
- Ctrl-C is hit
- an error is encountered and proc.errhow = 'terminate'
-
params
ex (Exception)
: The exception raised by workers
function: kill_worker(cls, killq)
The killing worker to kill the jobs
method: progressbar(self, event)
Generate the progress bar.
-
params
event (StateMachine event)
: The event including job as model.
method: start(self)
Start the queue.
method: worker(self)
The worker to build, submit and poll the jobs
pyppl.plugin
-
desc
Plugin system for PyPPL
-
variables
-
PMNAME (str)
: The name of the plugin manager -
hookimpl (pluggy.HookimplMarker)
: Used to mark the implementation of hooks -
hookspec (pluggy.HookspecMarker)
: Used to mark the hooks
-
method: cli_addcmd(commands)
PLUGIN API Add command and options to CLI
-
params
commands (Commands)
: The Commands instance
method: cli_execcmd(command, opts)
PLUGIN API Execute the command being added to CLI
-
params
-
command (str)
: The command -
opts (dict)
: The options
-
method: config_plugins(*plugins)
Parse configurations for plugins and enable/disable plugins accordingly.
-
params
-
*plugins ([any])
: The plugins. Plugins prefixed with 'no:' will be disabled.
-
method: disable_plugin(plugin)
Try to disable a plugin
-
params
plugin (any)
: A plugin or the name of a plugin
method: job_build(job, status)
PLUGIN API After a job has been built
-
params
-
job (Job)
: The Job instance -
status (str)
: The status of the job building -
True: The job is successfully built
-
False: The job failed to build
-
cached: The job is cached
-
-
method: job_done(job, status)
PLUGIN API After a job is done
-
params
-
job (Job)
: The Job instance -
status (str)
: The status of the job -
succeeded: The job is successfully done
-
failed: The job failed
-
cached: The job is cached
-
-
method: job_init(job)
PLUGIN API Right after job initiates
-
params
job (Job)
: The Job instance
method: job_kill(job, status)
PLUGIN API After a job has been killed
-
params
-
job (Job)
: The Job instance -
status (str)
: The status of the job killing -
'succeeded': The job is successfully killed
-
'failed': The job failed to be killed
-
-
method: job_poll(job, status)
PLUGIN API Poll the status of a job
-
params
-
job (Job)
: The Job instance -
status (str)
: The status of the job -
'running': The job is still running
-
'done': Polling is done, rcfile is generated
-
-
method: job_prebuild(job)
PLUGIN API Before a job starts to build
-
params
job (Job)
: The Job instance
method: job_submit(job, status)
PLUGIN API After a job has been submitted
-
params
-
job (Job)
: The Job instance -
status (str)
: The status of the job submission -
'succeeded': The job is successfully submitted
-
'failed': The job failed to be submitted
-
'running': The job is already running
-
-
method: job_succeeded(job)
PLUGIN API Tell whether a job is successfully done. One can add a less rigorous check; by default, only whether the return code is 0 is checked. Return False to mark the job as failed; otherwise the default status, or results from other plugins, will be used.
-
params
job (Job)
: The Job instance
method: logger_init(logger)
PLUGIN API Initiate the logger, mostly to manipulate levels
-
params
logger (Logger)
: The Logger instance
method: proc_init(proc)
PLUGIN API Right after a Proc being initialized
-
params
proc (Proc)
: The Proc instance
method: proc_postrun(proc, status)
PLUGIN API After a process is done
-
params
-
proc (Proc)
: The Proc instance -
status (str)
: succeeded/failed
-
method: proc_prerun(proc)
PLUGIN API Before a process starts. If False is returned, the process will not start. The value returned by the first plugin will be used, which means once a plugin stops a process from running, others cannot resume it.
-
params
proc (Proc)
: The Proc instance
method: pyppl_init(ppl)
PLUGIN API Right after a pipeline initiates
-
params
ppl (PyPPL)
: The PyPPL instance
method: pyppl_postrun(ppl)
PLUGIN API After the pipeline is done. If the pipeline fails, this won't run. Use proc_postrun(proc = proc, status = 'failed') instead.
-
params
ppl (PyPPL)
: The PyPPL instance
method: pyppl_prerun(ppl)
PLUGIN API Before the pipeline starts to run. If False is returned, the pipeline will not run. The value returned by the first plugin will be used, which means once a plugin stops the pipeline from running, others cannot resume it.
-
params
ppl (PyPPL)
: The PyPPL instance
method: setup(config)
PLUGIN API Add default configs
-
params
config (Config)
: The default configurations
class: PluginConfig
Plugin configuration for Proc/Job
method: __init__(self, *args, **kwargs)
Construct for PluginConfig
-
params
pconfig (dict)
: the default plugin configuration
method: add(self, name, default, converter, update)
Add a config item
-
params
-
name (str)
: The name of the config item. -
default (any)
: The default value -
converter (callable)
: The converter to convert the value whenever the value is set.
-
update (str)
: With set counter > 1, should we update the value or ignore it in .update()?
-
You can set plugin_config_check_update to False with .update to disable this
-
Could be ignore (don't update), replace (replace the whole value, even if it is a dictionary), or update (replace non-dict values and update dictionary values)
-
-
method: update(self, *args, **kwargs)
Update the configuration. Depends on the update argument given when the configuration item was added.
-
params
pconfig (dict)
: the configuration to update from
pyppl.logger
-
desc
Custom logger for PyPPL
-
variables
-
LOG_FORMAT (str)
: The format of loggers -
LOGTIME_FORMAT (str)
: The format of time for loggers -
GROUP_VALUES (dict)
: The values for each level group -
LEVEL_GROUPS (dict)
: The groups of levels -
THEMES (dict)
: The builtin themes -
SUBLEVELS (dict)
: the sub levels used to limit loggers of the same type
-
method: get_group(level)
Get the group name of the level
-
params
level (str)
: The level, should be UPPERCASE
-
returns
(str)
: The group name
method: get_value(level)
Get the value of the level
-
params
level (str)
: The level, should be UPPERCASE
-
returns
(int)
: The value of the group where the level is in.
method: init_levels(group, leveldiffs)
Initiate the levels, get real levels.
-
params
-
group (str)
: The group of levels -
leveldiffs (str|list)
: The diffs of levels
-
-
returns
(set)
: The real levels.
class: Logger
A wrapper of logger
method: __init__(self, name, bake)
The logger wrapper construct
-
params
-
name (str)
: The logger name. Default: PyPPL
-
bake (dict)
: The arguments to bake a new logger.
-
property: pbar
Mark the record as a progress record.
Allow logger.pbar.info
access
-
returns
(Logger)
: The Logger object itself
method: add_level(self, level, group)
-
params
-
level (str)
: The log level name. Make sure it's less than 7 characters
-
group (str)
: The group the level is to be added
-
method: add_sublevel(self, slevel, lines)
-
params
-
slevel (str)
: The debug level -
lines (int)
: The number of lines allowed for the debug level. A negative value means a summary will be printed
-
method: bake(self, **kwargs)
Bake the logger with certain arguments
-
params
**kwargs
: arguments used to bake a new logger
-
returns
(Logger)
: The new logger.
method: init(self, config)
Initiate the logger. Called by the constructor; also used to reinitiate the logger in case we want to change the config as the default_config.
-
params
config (Config)
: The configuration used to initiate logger.
class: Theme
The theme for the logger
-
variables
COLORS (dict)
: Color collections used to format theme
method: __init__(self, theme)
Construct for Theme
-
params
theme (str)
: the name of the theme
method: get_color(self, level)
Get the color for a given level
-
params
level
: The level
-
returns
The color of the level by the theme.
pyppl.runner
-
desc
Make runners as plugins for PyPPL
-
variables
-
RMNAME (str)
: The name of the runner manager -
RUNNERS (dict)
: All ever registered runners -
DEFAULT_POLL_INTERVAL (int)
: The default poll interval to check job status -
hookimpl (pluggy.HookimplMarker)
: The marker for runner hook Implementations -
hookspec (pluggy.HookspecMarker)
: The marker for runner hooks
-
method: current_runner()
Get current runner name
-
returns
(str)
: current runner name
method: isrunning(job)
RUNNER API Tell if the job is running
-
params
job (Job)
: the job instance
method: kill(job)
RUNNER API Try to kill the job
-
params
job (Job)
: the job instance
method: poll_interval()
Get the poll interval for current runner
-
returns
(int)
: poll interval for querying job status
method: register_runner(runner, name)
Register a runner
-
params
-
runner (callable)
: The runner, a module or a class object -
name (str)
: The name of the runner to be registered
-
method: runner_init(proc)
RUNNER API Initiate runner
-
params
proc (Proc)
: The Proc instance
method: script_parts(job, base)
RUNNER API Overwrite script parts
-
params
-
job (Job)
: the job instance -
base (Diot)
: The base script parts
-
method: submit(job)
RUNNER API Submit a job
-
params
job (Job)
: the job instance
method: use_runner(runner)
runner should be a module or the name of a module, with or without the "pyppl_runner_" prefix. To enable a runner, we need to disable the other runners.
-
params
runner (str)
: the name of runner
class: PyPPLRunnerLocal
PyPPL's default runner
method: isrunning(self, job)
Try to tell whether the job is still running.
-
params
job (Job)
: the job instance
-
returns
True if yes, otherwise False
method: kill(self, job)
Try to kill the running job when exiting
-
params
job (Job)
: the job instance
method: submit(self, job)
Try to submit the job
-
params
job (Job)
: the job instance