Package pypiper Documentation

Package Overview

The pypiper package provides a framework for building robust, restartable bioinformatics pipelines. It handles common pipeline tasks like checkpointing, logging, and resource monitoring.

Key Features

  • Automatic Checkpointing: Resume pipelines from where they left off
  • Resource Monitoring: Track memory and CPU usage
  • Result Reporting: Integrate with pipestat for standardized results
  • Container Support: Run commands in Docker containers
  • Pipeline Management: Built-in logging and status tracking

Installation

pip install pypiper

Quick Example

from pypiper import PipelineManager

# Initialize a pipeline
pm = PipelineManager(
    name="my_pipeline",
    outfolder="results/"
)

# Run a command
pm.run("echo 'Hello, world!'")

# Stop the pipeline
pm.stop_pipeline()

API Reference

PipelineManager Class

The main class for building and managing pipelines:

PipelineManager

PipelineManager(name, outfolder, version=None, args=None, multi=False, dirty=False, recover=False, new_start=False, force_follow=False, cores=1, mem='1000M', config_file=None, output_parent=None, overwrite_checkpoints=False, logger_kwargs=None, pipestat_record_identifier=None, pipestat_schema=None, pipestat_results_file=None, pipestat_config=None, pipestat_pipeline_type=None, pipestat_result_formatter=None, **kwargs)

Bases: object

Base class for instantiating a PipelineManager object, the main class of Pypiper.

:param str name: Choose a name for your pipeline; it's used to name the output files, flags, etc.
:param str outfolder: Folder in which to store the results.
:param argparse.Namespace args: Optional args object from ArgumentParser; Pypiper will simply record these arguments from your script.
:param bool multi: Enables running multiple pipelines in one script or for interactive use. It simply disables the tee of the output, so you won't get output logged to a file.
:param bool dirty: Overrides the pipeline's clean_add() manual parameters, to never clean up intermediate files automatically. Useful for debugging; all cleanup files are added to the manual cleanup script.
:param bool recover: Specify recover mode, to overwrite lock files. If pypiper encounters a locked target, it will ignore the lock and recompute this step. Useful to restart a failed pipeline.
:param bool new_start: Start over and run every command, even if output exists.
:param bool force_follow: Force run all follow functions, even if the preceding command is not run. By default, follow functions are only run if the preceding command is run.
:param int cores: Number of processors to use; default 1.
:param str mem: Amount of memory to use. Default units are megabytes unless specified using the suffix [K|M|G|T].
:param str config_file: Path to pipeline configuration file; optional.
:param str output_parent: Path to folder in which the output folder will live.
:param bool overwrite_checkpoints: Whether to override the stage-skipping logic provided by the checkpointing system. This is useful if the calls to this manager's run() method will be coming from a class that implements pypiper.Pipeline, since such a class handles checkpointing logic automatically and sets this to True to protect against the case in which a restart begins upstream of a stage for which a checkpoint file already exists, but that depends on the upstream stage and thus should be rerun if its "parent" is rerun.
:param str pipestat_record_identifier: Record identifier used to report results via pipestat.
:param str pipestat_schema: Output schema used by pipestat to report results.
:param str pipestat_results_file: Path to the file backend for reporting results.
:param str pipestat_config: Path to pipestat configuration file.
:param str pipestat_pipeline_type: Sample- or project-level pipeline.
:param pipestat_result_formatter: Function used to style reported results; defaults to result_formatter_markdown.
:raise TypeError: If start or stop point(s) are provided both directly and via the args namespace, or if both stopping types (exclusive/prospective and inclusive/retrospective) are provided.

Source code in pypiper/manager.py
def __init__(
    self,
    name,
    outfolder,
    version=None,
    args=None,
    multi=False,
    dirty=False,
    recover=False,
    new_start=False,
    force_follow=False,
    cores=1,
    mem="1000M",
    config_file=None,
    output_parent=None,
    overwrite_checkpoints=False,
    logger_kwargs=None,
    pipestat_record_identifier=None,
    pipestat_schema=None,
    pipestat_results_file=None,
    pipestat_config=None,
    pipestat_pipeline_type=None,
    pipestat_result_formatter=None,
    **kwargs,
):
    # Params defines the set of options that could be updated via
    # command line args to a pipeline run, that can be forwarded
    # to Pypiper. If any pypiper arguments are passed
    # (via add_pypiper_args()), these will override the constructor
    # defaults for these arguments.

    # Establish default params
    params = {
        "dirty": dirty,
        "recover": recover,
        "new_start": new_start,
        "force_follow": force_follow,
        "config_file": config_file,
        "output_parent": output_parent,
        "cores": cores,
        "mem": mem,
        "testmode": False,
    }

    # Transform the command-line namespace into a Mapping.
    args_dict = vars(args) if args else dict()

    # Parse and store stage specifications that can determine pipeline
    # start and/or stop point.
    # First, add such specifications to the command-line namespace,
    # favoring the command-line spec if both are present.
    for cp_spec in set(CHECKPOINT_SPECIFICATIONS) & set(kwargs.keys()):
        args_dict.setdefault(cp_spec, kwargs[cp_spec])
    # Then, ensure that we set each such specification on this manager
    # so that we're guaranteed safe attribute access. If it's present,
    # remove the specification from the namespace that will be used to
    # update this manager's parameters Mapping.
    for optname in CHECKPOINT_SPECIFICATIONS:
        checkpoint = args_dict.pop(optname, None)
        setattr(self, optname, checkpoint)
    if self.stop_before and self.stop_after:
        raise TypeError(
            "Cannot specify both pre-stop and post-stop; "
            "got '{}' and '{}'".format(self.stop_before, self.stop_after)
        )

    # Update this manager's parameters with non-checkpoint-related
    # command-line parameterization.
    params.update(args_dict)

    # If no starting point was specified, assume that the pipeline's
    # execution is to begin right away and set the internal flag so that
    # run() is let loose to execute instructions given.
    self._active = not self.start_point

    # Pipeline-level variables to track global state and pipeline stats
    # Pipeline settings
    self.name = name
    self.tee = None
    self.overwrite_locks = params["recover"]
    self.new_start = params["new_start"]
    self.force_follow = params["force_follow"]
    self.dirty = params["dirty"]
    self.cores = params["cores"]
    self.output_parent = params["output_parent"]
    self.testmode = params["testmode"]

    # Establish the log file to check safety with logging keyword arguments.
    # Establish the output folder since it's required for the log file.
    self.outfolder = os.path.join(outfolder, "")  # trailing slash
    self.pipeline_log_file = pipeline_filepath(self, suffix=LOGFILE_SUFFIX)

    # Set up logger
    logger_kwargs = logger_kwargs or {}
    if logger_kwargs.get("logfile") == self.pipeline_log_file:
        raise ValueError(
            f"The logfile given for the pipeline manager's logger matches that which will be used by the manager itself: {self.pipeline_log_file}"
        )
    default_logname = ".".join([__name__, self.__class__.__name__, self.name])
    self._logger = None
    if args:
        logger_builder_method = "logger_via_cli"
        try:
            self._logger = logger_via_cli(args, **logger_kwargs)
        except logmuse.est.AbsentOptionException as e:
            # Defer logger construction to init_logger.
            self.debug(f"logger_via_cli failed: {e}")
    if self._logger is None:
        logger_builder_method = "init_logger"
        # covers cases of bool(args) being False, or failure of logger_via_cli.
        # strict is only for logger_via_cli.
        logger_kwargs = {k: v for k, v in logger_kwargs.items() if k != "strict"}
        try:
            name = logger_kwargs.pop("name")
        except KeyError:
            name = default_logname
        self._logger = logmuse.init_logger(name, **logger_kwargs)
    self.debug(f"Logger set with {logger_builder_method}")

    # Keep track of an ID for the number of processes attempted
    self.proc_count = 0

    # We use this memory to pass a memory limit to processes like java that
    # can take a memory limit, so they don't get killed by a SLURM (or other
    # cluster manager) overage. However, with java, the -Xmx argument can only
    # limit the *heap* space, not total memory use; so occasionally SLURM will
    # still kill these processes because total memory goes over the limit.
    # As a kind of hack, we'll set the java processes heap limit to 95% of the
    # total memory limit provided.
    # This will give a little breathing room for non-heap java memory use.

    if not params["mem"].endswith(("K", "M", "G", "T")):
        # No unit suffix given; assume the memory is in megabytes.
        self.mem = params["mem"] + "M"
    else:
        self.mem = params["mem"]

    self.javamem = str(int(int(self.mem[:-1]) * 0.95)) + self.mem[-1:]

    self.container = None
    self.clean_initialized = False

    # Do some cores math for split processes
    # If a pipeline wants to run a process using half the cores, or 1/4 of the cores,
    # this can lead to complications if the number of cores is not evenly divisible.
    # Here we add a few variables so that pipelines can easily divide the cores evenly.
    # 50/50 split
    self.cores1of2a = int(self.cores) / 2 + int(self.cores) % 2
    self.cores1of2 = int(self.cores) / 2

    # 75/25 split
    self.cores1of4 = int(self.cores) / 4
    self.cores3of4 = int(self.cores) - int(self.cores1of4)

    self.cores1of8 = int(self.cores) / 8
    self.cores7of8 = int(self.cores) - int(self.cores1of8)

    self.pl_version = version
    # Set relative output_parent directory to absolute
    # not necessary after all. . .
    # if self.output_parent and not os.path.isabs(self.output_parent):
    #   self.output_parent = os.path.join(os.getcwd(), self.output_parent)

    # File paths:
    self.make_sure_path_exists(self.outfolder)
    self.pipeline_profile_file = pipeline_filepath(self, suffix="_profile.tsv")

    # Stats and figures are general and so lack the pipeline name.
    self.pipeline_stats_file = pipeline_filepath(self, filename="stats.yaml")

    # Record commands used and provide manual cleanup script.
    self.pipeline_commands_file = pipeline_filepath(self, suffix="_commands.sh")
    self.cleanup_file = pipeline_filepath(self, suffix="_cleanup.sh")

    # Pipeline status variables
    self.peak_memory = 0  # memory high water mark
    self.starttime = time.time()
    self.last_timestamp = self.starttime  # time of the last call to timestamp()

    self.locks = []
    self.running_procs = {}
    self.completed_procs = {}

    self.wait = True  # turn off for debugging

    # Initialize status and flags
    self.status = "initializing"
    # as part of the beginning of the pipeline, clear any flags set by
    # previous runs of this pipeline
    clear_flags(self)

    # In-memory holder for report_result
    self.stats_dict = {}

    # Result formatter to pass to pipestat
    self.pipestat_result_formatter = (
        pipestat_result_formatter or result_formatter_markdown
    )

    # Checkpoint-related parameters
    self.overwrite_checkpoints = overwrite_checkpoints or self.new_start
    self.halt_on_next = False
    self.prev_checkpoint = None
    self.curr_checkpoint = None

    # Pypiper can keep track of intermediate files to clean up at the end
    self.cleanup_list = []
    self.cleanup_list_conditional = []

    # Register handler functions to deal with interrupt and termination signals;
    # If received, we would then clean up properly (set pipeline status to FAIL, etc).
    signal.signal(signal.SIGINT, self._signal_int_handler)
    signal.signal(signal.SIGTERM, self._signal_term_handler)

    # pipestat setup
    self.pipestat_record_identifier = (
        pipestat_record_identifier or DEFAULT_SAMPLE_NAME
    )
    self.pipestat_pipeline_type = pipestat_pipeline_type or "sample"

    # don't force default pipestat_results_file value unless
    # pipestat config not provided
    if pipestat_config is None and pipestat_results_file is None:
        self.pipestat_results_file = self.pipeline_stats_file
    elif pipestat_results_file:
        self.pipestat_results_file = pipestat_results_file
        self.pipeline_stats_file = self.pipestat_results_file

    def _get_arg(args_dict, arg_name):
        """safely get argument from arg dict -- return None if doesn't exist"""
        return None if arg_name not in args_dict else args_dict[arg_name]

    self._pipestat_manager = PipestatManager(
        record_identifier=self.pipestat_record_identifier
        or _get_arg(args_dict, "pipestat_sample_name")
        or DEFAULT_SAMPLE_NAME,
        pipeline_name=self.name,
        schema_path=pipestat_schema
        or _get_arg(args_dict, "pipestat_schema")
        or default_pipestat_output_schema(sys.argv[0]),
        results_file_path=self.pipestat_results_file
        or _get_arg(args_dict, "pipestat_results_file")
        or self.pipeline_stats_file,
        config_file=pipestat_config or _get_arg(args_dict, "pipestat_config"),
        multi_pipelines=multi,
        pipeline_type=self.pipestat_pipeline_type,
    )

    self.start_pipeline(args, multi)

    # Handle config file if it exists

    # Read YAML config file
    # TODO: This section should become a function, so toolkits can use it
    # to locate a config file.
    config_to_load = None  # start with nothing

    if config_file:
        config_to_load = config_file
    else:
        cmdl_config_file = getattr(args, "config_file", None)
        if cmdl_config_file:
            if os.path.isabs(cmdl_config_file):
                # Absolute custom config file specified
                if os.path.isfile(cmdl_config_file):
                    config_to_load = cmdl_config_file
                else:
                    self.debug("Can't find custom config file: " + cmdl_config_file)
                    pass
            else:
                # Relative custom config file specified
                # Set path to be relative to pipeline script
                pipedir = os.path.dirname(sys.argv[0])
                abs_config = os.path.join(pipedir, cmdl_config_file)
                if os.path.isfile(abs_config):
                    config_to_load = abs_config
                else:
                    self.debug("File: {}".format(__file__))
                    self.debug("Can't find custom config file: " + abs_config)
                    pass
            if config_to_load is not None:
                pass
                self.debug("\nUsing custom config file: {}".format(config_to_load))
        else:
            # No custom config file specified. Check for default
            default_config = default_pipeline_config(sys.argv[0])
            if os.path.isfile(default_config):
                config_to_load = default_config
                self.debug(
                    "Using default pipeline config file: {}".format(config_to_load)
                )

    # Finally load the config we found.
    if config_to_load is not None:
        self.debug("\nLoading config file: {}\n".format(config_to_load))
        self.config = AttMapEcho(load_yaml(config_to_load))
    else:
        self.debug("No config file")
        self.config = None
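The memory-limit normalization and Java heap sizing performed in __init__ above can be sketched as a standalone helper (normalize_mem is a hypothetical name, not part of pypiper's API):

```python
def normalize_mem(mem):
    """Mimic PipelineManager's memory handling: append a default 'M'
    (megabytes) suffix when no unit is given, then size a Java heap
    limit at 95% of the total to leave room for non-heap memory use."""
    if not mem.endswith(("K", "M", "G", "T")):
        mem = mem + "M"  # no unit suffix; assume megabytes
    # 95% of the numeric part, keeping the same unit suffix
    javamem = str(int(int(mem[:-1]) * 0.95)) + mem[-1]
    return mem, javamem

print(normalize_mem("1000"))  # ('1000M', '950M')
print(normalize_mem("16G"))   # ('16G', '15G')
```

Passing, say, "16G" yields a "15G" Java heap, matching the self.javamem computation in the constructor.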

halted property

halted

Is the managed pipeline in a paused/halted state?

:return bool: Whether the managed pipeline is in a paused/halted state.

pipestat property

pipestat

pipestat.PipestatManager object to use for pipeline results reporting and status management

Depending on the object configuration it can report to a YAML-formatted file or PostgreSQL database. Please refer to pipestat documentation for more details: http://pipestat.databio.org/

:return pipestat.PipestatManager: object to use for results reporting

callprint

callprint(cmd, shell=None, lock_file=None, nofail=False, container=None)

Prints the command, executes it, and then prints the memory use and return code of the command.

Uses Python's subprocess.Popen() to execute the given command. The shell argument is simply passed along to Popen(). You should use shell=False (the default) where possible, because it enables memory profiling. Use shell=True if you require shell features like redirects (>) or wildcards (*), but this prevents the script from monitoring memory use. Commands joined by pipes (|) are split into separate subprocesses run within Python, so memory profiling remains possible.

cmd can also be a series of multiple commands, which will be run in succession.

:param str | Iterable[str] cmd: Bash command(s) to be run.
:param str lock_file: A lock file name.
:param bool nofail: Should the pipeline bail on a nonzero return from a process? Default: False. Nofail can be used to implement non-essential parts of the pipeline; if these processes fail, they will not cause the pipeline to bail out.
:param bool shell: Whether the command should be run in its own shell; default: None (will try to determine based on the command).
:param str container: Named Docker container in which to execute.
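The pipe-splitting idea described above can be illustrated with plain subprocess chaining (this is a sketch of the concept, not pypiper's actual parse_cmd implementation):

```python
import shlex
import subprocess

# Split a piped command at '|' so each stage runs as a direct child of
# Python, with each stage's stdout wired to the next stage's stdin.
cmd = "printf 'a\\nb\\nc\\n' | wc -l"
stages = [shlex.split(part) for part in cmd.split("|")]

procs = []
for i, args in enumerate(stages):
    stdin = procs[i - 1].stdout if i > 0 else None
    procs.append(subprocess.Popen(args, stdin=stdin, stdout=subprocess.PIPE))
for p in procs[:-1]:
    p.stdout.close()  # drop parent's copy so SIGPIPE can propagate

out = procs[-1].communicate()[0].decode().strip()
print(out)  # 3
```

Because each stage is a direct child process rather than hidden behind a shell, its PID and memory use can be tracked individually, which is what makes profiling piped commands possible.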

Source code in pypiper/manager.py
def callprint(self, cmd, shell=None, lock_file=None, nofail=False, container=None):
    """
    Prints the command, and then executes it, then prints the memory use and
    return code of the command.

    Uses python's subprocess.Popen() to execute the given command. The shell argument is simply
    passed along to Popen(). You should use shell=False (default) where possible, because this enables memory
    profiling. You should use shell=True if you require shell functions like redirects (>) or stars (*), but this
    will prevent the script from monitoring memory use. The pipes (|) will be used to split the command into
    subprocesses run within python, so the memory profiling is possible.

    cmd can also be a series (a dict object) of multiple commands, which will be run in succession.

    :param str | Iterable[str] cmd: Bash command(s) to be run.
    :param str lock_file: a lock file name
    :param bool nofail: Should the pipeline bail on a nonzero return from a process? Default: False
        Nofail can be used to implement non-essential parts of the pipeline; if these processes fail,
        they will not cause the pipeline to bail out.
    :param bool shell: if the command should be run in its own shell, default: None (will try
        to determine based on the command)
    :param container: Named Docker container in which to execute.
    :param container: str
    """
    # The Popen shell argument works like this:
    # if shell=False, then we format the command (with split()) to be a list of command and its arguments.
    # Split the command to use shell=False;
    # leave it together to use shell=True;

    def get_mem_child_sum(proc):
        try:
            # get children processes
            children = proc.children(recursive=True)
            # get RSS memory of each child proc and sum all
            mem_sum = proc.memory_info().rss
            if children:
                mem_sum += sum([x.memory_info().rss for x in children])
            # return in gigs
            return mem_sum / 1e9
        except (psutil.NoSuchProcess, psutil.ZombieProcess) as e:
            self.warning(e)
            self.warning(
                "Warning: couldn't add memory use for process: {}".format(proc.pid)
            )
            return 0

    def display_memory(memval):
        return None if memval < 0 else "{}GB".format(round(memval, 3))

    def make_hash(o):
        """
        Convert the object to string and hash it, return None in case of failure
        :param o: object of any type, in our case it is a dict
        :return str: hashed string representation of the dict
        """
        try:
            hsh = md5(str(o).encode("utf-8")).hexdigest()[:10]
        except Exception as e:
            self.debug(
                "Could not create hash for '{}', caught exception: {}".format(
                    str(o), e.__class__.__name__
                )
            )
            hsh = None
        return hsh

    if container:
        cmd = "docker exec " + container + " " + cmd

    if self.testmode:
        self._report_command(cmd)
        return 0, 0

    self.debug("Command: {}".format(cmd))
    param_list = parse_cmd(cmd, shell)
    # cast all commands to str and concatenate for hashing
    conc_cmd = "".join([str(x["args"]) for x in param_list])
    self.debug("Hashed command '{}': {}".format(conc_cmd, make_hash(conc_cmd)))
    processes = []
    running_processes = []
    completed_processes = []
    start_time = time.time()
    for i in range(len(param_list)):
        running_processes.append(i)
        if i == 0:
            processes.append(psutil.Popen(preexec_fn=os.setsid, **param_list[i]))
        else:
            param_list[i]["stdin"] = processes[i - 1].stdout
            processes.append(psutil.Popen(preexec_fn=os.setsid, **param_list[i]))
        self.running_procs[processes[-1].pid] = {
            "proc_name": get_proc_name(param_list[i]["args"]),
            "start_time": start_time,
            "container": container,
            "p": processes[-1],
            "args_hash": make_hash(conc_cmd),
            "local_proc_id": self.process_counter(),
        }

    self._report_command(cmd, [x.pid for x in processes])
    # Capture the subprocess output in <pre> tags to make it format nicely
    # if the markdown log file is displayed as HTML.
    self.info("<pre>")

    local_maxmems = [-1] * len(running_processes)
    returncodes = [None] * len(running_processes)
    proc_wrapup_text = [None] * len(running_processes)

    if not self.wait:
        self.info("</pre>")
        ids = [x.pid for x in processes]
        self.debug("Not waiting for subprocesses: " + str(ids))
        return 0, -1

    def proc_wrapup(i):
        """
        :param i: internal ID number of the subprocess
        """
        returncode = processes[i].returncode
        current_pid = processes[i].pid

        info = "PID: {pid};\tCommand: {cmd};\tReturn code: {ret};\tMemory used: {mem}".format(
            pid=current_pid,
            cmd=self.running_procs[current_pid]["proc_name"],
            ret=processes[i].returncode,
            mem=display_memory(local_maxmems[i]),
        )

        # report process profile
        self._report_profile(
            self.running_procs[current_pid]["proc_name"],
            lock_file,
            time.time() - self.running_procs[current_pid]["start_time"],
            local_maxmems[i],
            current_pid,
            self.running_procs[current_pid]["args_hash"],
            self.running_procs[current_pid]["local_proc_id"],
        )

        # Remove this as a running subprocess
        self.running_procs[current_pid]["info"] = info
        self.running_procs[current_pid]["returncode"] = returncode
        self.completed_procs[current_pid] = self.running_procs[current_pid]
        del self.running_procs[current_pid]
        running_processes.remove(i)
        completed_processes.append(i)
        proc_wrapup_text[i] = info
        returncodes[i] = returncode
        return info

    sleeptime = 0.0001

    while running_processes:
        self.debug("running")
        for i in running_processes:
            local_maxmems[i] = max(
                local_maxmems[i], (get_mem_child_sum(processes[i]))
            )
            self.peak_memory = max(self.peak_memory, local_maxmems[i])
            self.debug(processes[i])
            if not self._attend_process(processes[i], sleeptime):
                proc_wrapup_text[i] = proc_wrapup(i)

        # the sleeptime is extremely short at the beginning and gets longer exponentially
        # (+ constant to prevent copious checks at the very beginning)
        # = more precise mem tracing for short processes
        sleeptime = min((sleeptime + 0.25) * 3, 60 / len(processes))

    # All jobs are done, print a final closing and job info
    info = (
        "Elapsed time: "
        + str(datetime.timedelta(seconds=self.time_elapsed(start_time)))
        + "."
    )
    info += " Running peak memory: {pipe}.".format(
        pipe=display_memory(self.peak_memory)
    )
    # if len(proc_wrapup_text) == 1:
    # info += " {}".format(proc_wrapup_text[0])

    for i in completed_processes:
        info += "  \n  {}".format(self.completed_procs[processes[i].pid]["info"])

    info += "\n"  # finish out the
    self.info("</pre>")
    self.info("Command completed. {info}".format(info=info))

    for rc in returncodes:
        if rc != 0:
            msg = (
                "Subprocess returned nonzero result. Check above output for details"
            )
            self._triage_error(SubprocessError(msg), nofail)

    return [returncodes, local_maxmems]

checkprint

checkprint(cmd, shell=None, nofail=False)

Just like callprint, but checks output -- so you can get a variable in Python corresponding to the return value of the command you call. This is equivalent to running subprocess.check_output() instead of subprocess.call().

:param str | Iterable[str] cmd: Bash command(s) to be run.
:param bool | str shell: Whether the command should be run in its own shell. Optional. Default: "guess" -- run() will try to guess whether the command should be run in a shell (based on the presence of a pipe (|) or redirect (>)). To force a process to run as a direct subprocess, set shell to False; to force a shell, set it to True.
:param bool nofail: Should the pipeline bail on a nonzero return from a process? Default: False. Nofail can be used to implement non-essential parts of the pipeline; if these processes fail, they will not cause the pipeline to bail out.
:return str: Text output by the executed subprocess (check_output).
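For a simple command run without a shell, checkprint boils down to splitting the string into argv and capturing its output; a minimal standalone sketch:

```python
import shlex
import subprocess

# What checkprint amounts to for a simple, non-shell command:
# split the string into an argv list, capture stdout, decode, strip.
cmd = "echo hello world"
out = subprocess.check_output(shlex.split(cmd)).decode().strip()
print(out)  # hello world
```

Unlike callprint, the captured text is returned to the caller instead of only being logged, so it can feed later pipeline decisions.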

Source code in pypiper/manager.py
def checkprint(self, cmd, shell=None, nofail=False):
    """
    Just like callprint, but checks output -- so you can get a variable
    in python corresponding to the return value of the command you call.
    This is equivalent to running subprocess.check_output()
    instead of subprocess.call().
    :param str | Iterable[str] cmd: Bash command(s) to be run.
    :param bool | str shell: If command requires should be run in its own shell. Optional.
        Default: "guess" -- `run()` will try to guess if the command should be run in a
        shell (based on the presence of a pipe (|) or redirect (>), To force a process to
        run as a direct subprocess, set `shell` to False; to force a shell, set True.
    :param bool nofail: Should the pipeline bail on a nonzero return from a process? Default: False
        Nofail can be used to implement non-essential parts of the pipeline; if these processes fail,
        they will not cause the pipeline to bail out.
    :return str: text output by the executed subprocess (check_output)
    """

    self._report_command(cmd)
    if self.testmode:
        return ""

    likely_shell = check_shell(cmd, shell)

    if shell is None:
        shell = likely_shell

    if not shell:
        if likely_shell:
            self.debug(
                "Should this command run in a shell instead of directly in a subprocess?"
            )
        cmd = shlex.split(cmd)

    try:
        return subprocess.check_output(cmd, shell=shell).decode().strip()
    except Exception as e:
        self._triage_error(e, nofail)

clean_add

clean_add(regex, conditional=False, manual=False)

Add files (or regexs) to a cleanup list, to delete when this pipeline completes successfully. When making a call with run that produces intermediate files that should be deleted after the pipeline completes, you flag these files for deletion with this command. Files added with clean_add will only be deleted upon success of the pipeline.

:param str regex: A Unix-style regular expression that matches files to delete (can also be a file name).
:param bool conditional: True means the files will only be deleted if no other pipelines are currently running; otherwise they are added to a manual cleanup script called {pipeline_name}_cleanup.sh.
:param bool manual: True means the files will just be added to a manual cleanup script.
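clean_add expands its pattern with glob and records matches relative to the output folder, so the generated cleanup script keeps working if the results directory moves. A minimal sketch of that matching step (illustrative only, not pypiper's implementation):

```python
import glob
import os
import tempfile

# Create a fake output folder holding one intermediate and one final file.
outfolder = tempfile.mkdtemp()
for name in ("sample.tmp.bam", "sample.final.bam"):
    open(os.path.join(outfolder, name), "w").close()

# Expand the cleanup pattern and store matches relative to outfolder,
# mirroring how entries would land in {pipeline_name}_cleanup.sh.
matches = glob.glob(os.path.join(outfolder, "*.tmp.bam"))
relative = sorted(os.path.relpath(f, outfolder) for f in matches)
print(relative)  # ['sample.tmp.bam']
```

Only the intermediate file matches the pattern; the final output is left alone, which is the intended use of cleanup flags.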

Source code in pypiper/manager.py
def clean_add(self, regex, conditional=False, manual=False):
    """
    Add files (or regexs) to a cleanup list, to delete when this pipeline completes successfully.
    When making a call with run that produces intermediate files that should be
    deleted after the pipeline completes, you flag these files for deletion with this command.
    Files added with clean_add will only be deleted upon success of the pipeline.

    :param str regex:  A unix-style regular expression that matches files to delete
        (can also be a file name).
    :param bool conditional: True means the files will only be deleted if no other
        pipelines are currently running; otherwise they are added to a manual cleanup script
        called {pipeline_name}_cleanup.sh
    :param bool manual: True means the files will just be added to a manual cleanup script.
    """

    # TODO: print this message (and several below) in debug
    # print("Adding regex to cleanup: {}".format(regex))
    if self.dirty:
        # Override the user-provided option and force manual cleanup.
        manual = True

    if not self.clean_initialized:
        # Make cleanup files relative to the cleanup script in case the result folder moves.
        with open(self.cleanup_file, "a") as myfile:
            clean_init = 'DIR="$(cd -P -- "$(dirname -- "$0")" && pwd -P)"'
            myfile.write(clean_init + "\n")
            myfile.write("cd ${DIR}\n")
            self.clean_initialized = True

    if manual:
        filenames = glob.glob(regex)
        if not filenames:
            self.info("No files match cleanup pattern: {}".format(regex))
        for filename in filenames:
            try:
                with open(self.cleanup_file, "a") as myfile:
                    if os.path.isabs(filename):
                        relative_filename = os.path.relpath(
                            filename, self.outfolder
                        )
                        absolute_filename = filename
                    else:
                        relative_filename = os.path.relpath(
                            filename, self.outfolder
                        )
                        absolute_filename = os.path.abspath(
                            os.path.join(self.outfolder, relative_filename)
                        )
                    if os.path.isfile(absolute_filename):
                        # print("Adding file to cleanup: {}".format(filename))
                        myfile.write("rm " + relative_filename + "\n")
                    elif os.path.isdir(absolute_filename):
                        # print("Adding directory to cleanup: {}".format(filename))
                        # first, add all filenames in the directory
                        myfile.write("rm " + relative_filename + "/*\n")
                        # and the directory itself
                        myfile.write("rmdir " + relative_filename + "\n")
                    else:
                        self.info(
                            "File not added to cleanup: {}".format(
                                relative_filename
                            )
                        )
            except Exception as e:
                self.error(
                    "Error in clean_add on path {}: {}".format(filename, str(e))
                )
    elif conditional:
        self.cleanup_list_conditional.append(regex)
    else:
        self.cleanup_list.append(regex)
        # TODO: what's the "absolute" list?
        # Remove it from the conditional list if added to the absolute list
        while regex in self.cleanup_list_conditional:
            self.cleanup_list_conditional.remove(regex)
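In manual mode, clean_add writes `rm` commands into a `{pipeline_name}_cleanup.sh` script using paths relative to the output folder, so the script keeps working if the results directory is moved. A minimal stdlib sketch of that bookkeeping (the `cleanup_entry` helper is hypothetical, not part of pypiper):

```python
import os

def cleanup_entry(path, outfolder):
    # Hypothetical helper mirroring the manual-mode logic above:
    # store the path relative to the output folder so the generated
    # cleanup script still works if the results directory is moved.
    relative = os.path.relpath(path, outfolder)
    return "rm " + relative

print(cleanup_entry("/data/results/aligned.tmp.bam", "/data/results"))
# → rm aligned.tmp.bam
```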

complete

complete()

Stop a completely finished pipeline.

Source code in pypiper/manager.py
def complete(self):
    """Stop a completely finished pipeline."""
    self.stop_pipeline(status=COMPLETE_FLAG)

fail_pipeline

fail_pipeline(exc, dynamic_recover=False)

If the pipeline does not complete, this function will stop the pipeline gracefully. It sets the status flag to failed and skips the normal success completion procedure.

:param Exception exc: Exception to raise.
:param bool dynamic_recover: Whether to recover e.g. for job termination.

Source code in pypiper/manager.py
def fail_pipeline(self, exc: Exception, dynamic_recover: bool = False):
    """
    If the pipeline does not complete, this function will stop the pipeline gracefully.
    It sets the status flag to failed and skips the normal success completion procedure.

    :param Exception exc: Exception to raise.
    :param bool dynamic_recover: Whether to recover e.g. for job termination.
    """
    # Take care of any active running subprocess
    sys.stdout.flush()
    self._terminate_running_subprocesses()

    if dynamic_recover:
        # job was terminated, not failed due to a bad process.
        # flag this run as recoverable.
        if len(self.locks) < 1:
            # If there is no process locked, then recovery will be automatic.
            self.info("No locked process. Dynamic recovery will be automatic.")
        # make a copy of self.locks to iterate over since we'll be clearing them as we go
        # set a recovery flag for each lock.
        for lock_file in self.locks[:]:
            recover_file = self._recoverfile_from_lockfile(lock_file)
            self.info("Setting dynamic recover file: {}".format(recover_file))
            self._create_file(recover_file)
            self.locks.remove(lock_file)

    # Produce cleanup script
    self._cleanup(dry_run=True)

    # Finally, set the status to failed and close out with a timestamp
    if not self._failed:  # and not self._completed:
        self.timestamp("### Pipeline failed at: ")
        total_time = datetime.timedelta(seconds=self.time_elapsed(self.starttime))
        self.info("Total time: " + str(total_time))
        self.info("Failure reason: " + str(exc))
        self.pipestat.set_status(
            record_identifier=self._pipestat_manager.record_identifier,
            status_identifier="failed",
        )

    if isinstance(exc, str):
        exc = RuntimeError(exc)

    raise exc

get_elapsed_time

get_elapsed_time()

Parse the pipeline profile file, collect the unique and last duplicated runtimes, and sum them up. If the profile is not found, an estimate is calculated (correct only if the pipeline was not rerun).

:return int: sum of runtimes in seconds

Source code in pypiper/manager.py
def get_elapsed_time(self):
    """
    Parse the pipeline profile file, collect the unique and last duplicated
    runtimes and sum them up. In case the profile is not found, an estimate
    is calculated (which is correct only in case the pipeline was not rerun)

    :return int: sum of runtimes in seconds
    """
    if os.path.isfile(self.pipeline_profile_file):
        df = _pd.read_csv(
            self.pipeline_profile_file,
            sep="\t",
            comment="#",
            names=PROFILE_COLNAMES,
        )
        try:
            df["runtime"] = _pd.to_timedelta(df["runtime"])
        except ValueError:
            # return runtime estimate
            # this happens if old profile style is mixed with the new one
            # and the columns do not match
            return self.time_elapsed(self.starttime)
        unique_df = df[~df.duplicated("cid", keep="last").values]
        return sum(unique_df["runtime"].apply(lambda x: x.total_seconds()))
    return self.time_elapsed(self.starttime)
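The deduplication step above keeps only the last profile entry per command id, so a rerun command is not counted twice. A small pandas sketch of that aggregation, with made-up command ids and runtimes:

```python
import pandas as pd

# When a pipeline was rerun, a command id ("cid") may appear more than
# once in the profile; only the last occurrence counts toward the total.
rows = [
    {"cid": "trim", "runtime": "0:00:10"},
    {"cid": "align", "runtime": "0:01:00"},
    {"cid": "align", "runtime": "0:00:30"},  # rerun: supersedes the first
]
df = pd.DataFrame(rows)
df["runtime"] = pd.to_timedelta(df["runtime"])
unique_df = df[~df.duplicated("cid", keep="last").values]
total = unique_df["runtime"].apply(lambda x: x.total_seconds()).sum()
# → 40.0 seconds (10 for trim + 30 for the last align)
```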

get_stat

get_stat(key)

Returns a stat that was previously reported. This is necessary for reporting new stats that are derived from two stats, one of which may have been reported by an earlier run. For example, if you first use report_result to report (number of trimmed reads), and then in a later stage want to report alignment rate, then this second stat (alignment rate) will require knowing the first stat (number of trimmed reads); however, that may not have been calculated in the current pipeline run, so we must retrieve it from the stats.yaml output file. This command will retrieve such previously reported stats if they were not already calculated in the current pipeline run.

:param key: key of stat to retrieve

Source code in pypiper/manager.py
def get_stat(self, key):
    """
    Returns a stat that was previously reported. This is necessary for reporting new stats that are
    derived from two stats, one of which may have been reported by an earlier run. For example,
    if you first use report_result to report (number of trimmed reads), and then in a later stage
    want to report alignment rate, then this second stat (alignment rate) will require knowing the
    first stat (number of trimmed reads); however, that may not have been calculated in the current
    pipeline run, so we must retrieve it from the stats.yaml output file. This command will retrieve
    such previously reported stats if they were not already calculated in the current pipeline run.
    :param key: key of stat to retrieve
    """

    try:
        return self.stats_dict[key]
    except KeyError:
        self._refresh_stats()
        try:
            return self.stats_dict[key]
        except KeyError:
            self.warning("Missing stat '{}'".format(key))
            return None
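The lookup pattern above (in-memory dict first, refresh from the stats file only on a miss) can be sketched with the standard library. The `StatsCache` class and its `_on_disk` dict are hypothetical stand-ins for the manager and its stats.yaml file:

```python
class StatsCache:
    """Hypothetical sketch of get_stat's two-step lookup."""

    def __init__(self, on_disk):
        self.stats_dict = {}      # stats reported in the current run
        self._on_disk = on_disk   # stands in for the stats.yaml file

    def _refresh_stats(self):
        # Pull previously reported stats into memory.
        self.stats_dict.update(self._on_disk)

    def get_stat(self, key):
        try:
            return self.stats_dict[key]
        except KeyError:
            self._refresh_stats()
            return self.stats_dict.get(key)  # None if truly missing

# A stat reported by an earlier run can feed a derived stat now.
cache = StatsCache({"Trimmed_reads": 1000000})
rate = 850000 / cache.get_stat("Trimmed_reads")
# → 0.85
```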

halt

halt(checkpoint=None, finished=False, raise_error=True)

Stop the pipeline before completion point.

:param str checkpoint: Name of stage just reached or just completed.
:param bool finished: Whether the indicated stage was just finished (True), or just reached (False)
:param bool raise_error: Whether to raise an exception to truly halt execution.

Source code in pypiper/manager.py
def halt(self, checkpoint=None, finished=False, raise_error=True):
    """
    Stop the pipeline before completion point.

    :param str checkpoint: Name of stage just reached or just completed.
    :param bool finished: Whether the indicated stage was just finished
        (True), or just reached (False)
    :param bool raise_error: Whether to raise an exception to truly
        halt execution.
    """
    self.stop_pipeline(PAUSE_FLAG)
    self._active = False
    if raise_error:
        raise PipelineHalt(checkpoint, finished)

make_sure_path_exists staticmethod

make_sure_path_exists(path)

Creates all directories in a path if it does not exist.

:param str path: Path to create.
:raises Exception: if the path creation attempt hits an error with a code indicating a cause other than pre-existence.

Source code in pypiper/manager.py
@staticmethod
def make_sure_path_exists(path):
    """
    Creates all directories in a path if it does not exist.

    :param str path: Path to create.
    :raises Exception: if the path creation attempt hits an error with
        a code indicating a cause other than pre-existence.
    """
    try:
        os.makedirs(path)
    except OSError as exception:
        if exception.errno != errno.EEXIST:
            raise
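The create-and-tolerate-pre-existence pattern above is, on Python 3, essentially `os.makedirs(path, exist_ok=True)`. A short stdlib sketch (the `ensure_dir` name is hypothetical):

```python
import errno
import os
import tempfile

def ensure_dir(path):
    # Same semantics as make_sure_path_exists: create all components,
    # swallow "already exists", re-raise any other OSError.
    try:
        os.makedirs(path)
    except OSError as exc:
        if exc.errno != errno.EEXIST:
            raise

base = tempfile.mkdtemp()
deep = os.path.join(base, "a", "b", "c")
ensure_dir(deep)
ensure_dir(deep)  # second call is a no-op, not an error
```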

process_counter

process_counter()

Increments the process counter with regard to the follow state: if the currently executed command is a follow function of another command, the counter is not incremented.

:return str | int: current counter state: a number if the counter has been incremented, or the number of the previous process plus "f" otherwise

Source code in pypiper/manager.py
def process_counter(self):
    """
    Increments process counter with regard to the follow state:
    if currently executed command is a follow function of another one, the counter is not incremented.

    :return str | int: current counter state, a number if the counter has been incremented or the number
    of the previous process plus "f" otherwise
    """
    try:
        if self.in_follow:
            return str(self.proc_count) + "f"
        else:
            self.proc_count += 1
            return self.proc_count
    except AttributeError:
        self.proc_count += 1
        return self.proc_count

report_object

report_object(key, filename, anchor_text=None, anchor_image=None, annotation=None, nolog=False, result_formatter=None, force_overwrite=True)

Writes a key:value pair to self.pipeline_stats_file. Note: this function will be deprecated. Using report_result is recommended.

:param str key: name (key) of the object
:param str filename: relative path to the file (relative to parent output dir)
:param str anchor_text: text used as the link anchor text or caption to refer to the object. If not provided, defaults to the key.
:param str anchor_image: a path to an HTML-displayable image thumbnail (so, .png or .jpg, for example). If a path, the path should be relative to the parent output dir.
:param str annotation: By default, the figures will be annotated with the pipeline name, so you can tell which pipeline records which figures. If you want, you can change this.
:param bool nolog: Turn on this flag to NOT print this result in the logfile. Use sparingly in case you will be printing the result in a different format.
:param str result_formatter: function for formatting via pipestat backend
:param bool force_overwrite: overwrite results if they already exist?
:return str reported_result: the reported result is returned as a list of formatted strings.

Source code in pypiper/manager.py
def report_object(
    self,
    key,
    filename,
    anchor_text=None,
    anchor_image=None,
    annotation=None,
    nolog=False,
    result_formatter=None,
    force_overwrite=True,
):
    """
    Writes a key:value pair to self.pipeline_stats_file. Note: this function
        will be deprecated. Using report_result is recommended.

    :param str key: name (key) of the object
    :param str filename: relative path to the file (relative to parent
        output dir)
    :param str anchor_text: text used as the link anchor text or caption to
        refer to the object. If not provided, defaults to the key.
    :param str anchor_image: a path to an HTML-displayable image thumbnail
        (so, .png or .jpg, for example). If a path, the path should be
        relative to the parent output dir.
    :param str annotation: By default, the figures will be annotated with
        the pipeline name, so you can tell which pipeline records which
        figures. If you want, you can change this.
    :param bool nolog: Turn on this flag to NOT print this result in the
        logfile. Use sparingly in case you will be printing the result in a
        different format.
    :param str result_formatter: function for formatting via pipestat backend
    :param bool force_overwrite: overwrite results if they already exist?
    :return str reported_result: the reported result is returned as a list of formatted strings.
    """
    warnings.warn(
        "This function may be removed in future release. "
        "The recommended way to report pipeline results is using PipelineManager.pipestat.report().",
        category=DeprecationWarning,
    )
    rf = result_formatter or self.pipestat_result_formatter
    # Default annotation is current pipeline name.
    annotation = str(annotation or self.name)
    # In case the value is passed with trailing whitespace.
    filename = str(filename).strip()
    if anchor_text:
        anchor_text = str(anchor_text).strip()
    else:
        anchor_text = str(key).strip()
    # better to use a relative path in this file
    # convert any absolute paths into relative paths

    values = {
        "path": filename,
        "thumbnail_path": anchor_image,
        "title": anchor_text,
        "annotation": annotation,
    }
    val = {key: values}

    reported_result = self.pipestat.report(
        values=val,
        record_identifier=self.pipestat_record_identifier,
        result_formatter=rf,
        force_overwrite=force_overwrite,
    )

    if not nolog:
        if isinstance(
            reported_result, bool
        ):  # Pipestat can return False if results are NOT reported.
            self.info("Result successfully reported? " + str(reported_result))
        else:
            for r in reported_result:
                self.info(r)

report_result

report_result(key, value, nolog=False, result_formatter=None, force_overwrite=True)

Writes a key:value pair to self.pipeline_stats_file.

:param str key: name (key) of the stat
:param dict value: value of the stat to report.
:param bool nolog: Turn on this flag to NOT print this result in the logfile. Use sparingly in case you will be printing the result in a different format.
:param str result_formatter: function for formatting via pipestat backend
:param bool force_overwrite: overwrite results if they already exist?
:return str reported_result: the reported result is returned as a list of formatted strings.

Source code in pypiper/manager.py
def report_result(
    self, key, value, nolog=False, result_formatter=None, force_overwrite=True
):
    """
    Writes a key:value pair to self.pipeline_stats_file.

    :param str key: name (key) of the stat
    :param dict value: value of the stat to report.
    :param bool nolog: Turn on this flag to NOT print this result in the
        logfile. Use sparingly in case you will be printing the result in a
        different format.
    :param str result_formatter: function for formatting via pipestat backend
    :param bool force_overwrite: overwrite results if they already exist?
    :return str reported_result: the reported result is returned as a list of formatted strings.

    """
    # keep the value in memory:
    self.stats_dict[key] = value

    rf = result_formatter or self.pipestat_result_formatter

    reported_result = self.pipestat.report(
        values={key: value},
        record_identifier=self.pipestat_record_identifier,
        result_formatter=rf,
        force_overwrite=force_overwrite,
    )

    if not nolog:
        if isinstance(
            reported_result, bool
        ):  # Pipestat can return False if results are NOT reported.
            self.info("Result successfully reported? " + str(reported_result))
        else:
            for r in reported_result:
                self.info(r)

    return reported_result

run

run(cmd, target=None, lock_name=None, shell=None, nofail=False, clean=False, follow=None, container=None, default_return_code=0)

The primary workhorse function of PipelineManager, this runs a command.

This is the command execution function, which enforces race-free file-locking, enables restartability, and allows multiple pipelines to produce/use the same files. The function will wait for the file lock if it exists, and will not produce new output (by default) if the target output file already exists. If the output is to be created, it first creates a lock file to prevent other calls to run (for example, in parallel pipelines) from touching the file while it is being created. It also records the memory usage of the process and provides some logging output.

:param str | list[str] cmd: Shell command(s) to be run.
:param str | Sequence[str] target: Output file(s) to produce, optional. If all target files exist, the command will not be run. If no target is given, a lock_name must be provided.
:param str lock_name: Name of lock file. Optional.
:param bool shell: Whether the command should be run in its own shell. Optional. Default: None -- will try to determine whether the command requires a shell.
:param bool nofail: Whether the pipeline should proceed past a nonzero return from a process, default False; nofail can be used to implement non-essential parts of the pipeline; if a 'nofail' command fails, the pipeline is free to continue execution.
:param bool clean: True means the target file will be automatically added to an auto cleanup list. Optional.
:param callable follow: Function to call after executing (each) command.
:param str container: Name for Docker container in which to run commands.
:param Any default_return_code: Return code to use; may be used to discriminate between runs that did not execute any commands and runs that did.
:return int: Return code of process. If a list of commands is passed, this is the maximum of all return codes for all commands.
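The restartability described above hinges on atomic lock-file creation (the source's `_create_file_racefree`). A stdlib sketch of that mechanism, with a hypothetical helper name:

```python
import errno
import os
import tempfile

def create_lock_racefree(path):
    # Sketch of the atomic lock creation run() relies on: O_CREAT|O_EXCL
    # raises EEXIST if another pipeline created the lock first, so exactly
    # one caller wins the race to produce the target.
    fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    os.close(fd)

lock = os.path.join(tempfile.mkdtemp(), "lock.align")
create_lock_racefree(lock)      # first caller wins the race
try:
    create_lock_racefree(lock)  # a second caller sees the existing lock
    raced = False
except OSError as e:
    raced = e.errno == errno.EEXIST
```

On seeing the lock, run() waits for it to clear (or, in recover mode, overwrites it) rather than racing the other pipeline.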

Source code in pypiper/manager.py
def run(
    self,
    cmd,
    target=None,
    lock_name=None,
    shell=None,
    nofail=False,
    clean=False,
    follow=None,
    container=None,
    default_return_code=0,
):
    """
    The primary workhorse function of PipelineManager, this runs a command.

    This is the command execution function, which enforces
    race-free file-locking, enables restartability, and allows multiple
    pipelines to produce/use the same files. The function will wait for the file
    lock if it exists, and not produce new output (by default) if the
    target output file already exists. If the output is to be created,
    it will first create a lock file to prevent other calls to run
    (for example, in parallel pipelines) from touching the file while it
    is being created. It also records the memory of the process and
    provides some logging output.

    :param str | list[str] cmd: Shell command(s) to be run.
    :param str | Sequence[str] target: Output file(s) to produce, optional.
        If all target files exist, the command will not be run. If no target
        is given, a lock_name must be provided.
    :param str lock_name: Name of lock file. Optional.
    :param bool shell: Whether the command should be run in its own shell.
        Optional. Default: None --will try to determine whether the
        command requires a shell.
    :param bool nofail: Whether the pipeline should proceed past a nonzero return from
        a process, default False; nofail can be used to implement
        non-essential parts of the pipeline; if a 'nofail' command fails,
        the pipeline is free to continue execution.
    :param bool clean: True means the target file will be automatically added
        to an auto cleanup list. Optional.
    :param callable follow: Function to call after executing (each) command.
    :param str container: Name for Docker container in which to run commands.
    :param Any default_return_code: Return code to use, might be used to discriminate
        between runs that did not execute any commands and runs that did.
    :return int: Return code of process. If a list of commands is passed,
        this is the maximum of all return codes for all commands.
    """

    def _max_ret_code(codes_list):
        """
        Return the maximum of a list of return codes.

        :param list[int] codes_list: List of return codes to compare.
        :return int: Maximum of list.
        """
        # filter out codes that are None
        codes_list = [code for code in codes_list if code is not None]
        # get the max of the remaining codes
        if codes_list:
            return max(codes_list)
        # if no codes are left, return None
        return None

    # validate default return code
    if default_return_code is not None and not isinstance(default_return_code, int):
        raise TypeError("default_return_code must be an int or None")

    # If the pipeline's not been started, skip ahead.
    if not self._active:
        cmds = [cmd] if isinstance(cmd, str) else cmd
        cmds_text = [c if isinstance(c, str) else " ".join(c) for c in cmds]
        self.info(
            "Pipeline is inactive; skipping {} command(s):\n{}".format(
                len(cmds), "\n".join(cmds_text)
            )
        )
        return default_return_code

    # Short-circuit if the checkpoint file exists and the manager's not
    # been configured to overwrite such files.
    if self.curr_checkpoint is not None:
        check_fpath = checkpoint_filepath(self.curr_checkpoint, self)
        if os.path.isfile(check_fpath) and not self.overwrite_checkpoints:
            self.info(
                "Checkpoint file exists for '{}' ('{}'), and the {} has "
                "been configured to not overwrite checkpoints; "
                "skipping command '{}'".format(
                    self.curr_checkpoint, check_fpath, self.__class__.__name__, cmd
                )
            )
            return default_return_code

    # TODO: consider making the logic such that locking isn't implied, or
    # TODO (cont.): that we can make it otherwise such that it's not
    # TODO (cont.): strictly necessary to provide target or lock_name.
    # The default lock name is based on the target name.
    # Therefore, a targetless command that you want
    # to lock must specify a lock_name manually.
    if target is None and lock_name is None:
        self.fail_pipeline(
            Exception("You must provide either a target or a lock_name.")
        )

    # Downstream code requires target to be a list, so convert if only
    # a single item was given
    if not is_multi_target(target) and target is not None:
        target = [target]

    # Downstream code requires a list of locks; convert
    if isinstance(lock_name, str):
        lock_name = [lock_name]

    # Default lock_name (if not provided) is based on the target file name,
    # but placed in the parent pipeline outfolder
    self.debug(
        "Lock_name {}; target '{}', outfolder '{}'".format(
            lock_name, target, self.outfolder
        )
    )
    lock_name = lock_name or make_lock_name(target, self.outfolder)
    lock_files = [self._make_lock_path(ln) for ln in lock_name]

    process_return_code = default_return_code
    local_maxmem = 0

    # Decide how to do follow-up.
    if not follow:
        call_follow = lambda: None
    elif not hasattr(follow, "__call__"):
        # Warn about non-callable argument to follow-up function.
        self.warning(
            "Follow-up function is not callable and won't be used: {}".format(
                type(follow)
            )
        )
        call_follow = lambda: None
    else:
        # Wrap the follow-up function so that the log shows what's going on.
        # additionally, the in_follow attribute is set to enable proper command count handling
        def call_follow():
            self.debug("Follow:")
            self.in_follow = True
            follow()
            self.in_follow = False

    # The while=True loop here is unlikely to be triggered, and is just a
    # wrapper to prevent race conditions; the lock_file must be created by
    # the current loop. If not, we loop again and then re-do the tests.
    # The recover and newstart options inform the pipeline to run a command
    # in a scenario where it normally would not. We use these "local" flags
    # to allow us to report on the state of the pipeline in the first round
    # as normal, but then proceed on the next iteration through the outer
    # loop. The proceed_through_locks is a flag that is set if any lockfile
    # is found that needs to be recovered or overwritten. It instructs us to
    # ignore lock files on the next iteration.
    local_recover = False
    local_newstart = False
    proceed_through_locks = False

    while True:
        ##### Tests block
        # Base case: All targets exists and not set to overwrite targets break loop, don't run process.
        # os.path.exists returns True for either a file or directory; .isfile is file-only
        if (
            target is not None
            and all([os.path.exists(t) for t in target])
            and not any([os.path.isfile(l) for l in lock_files])
            and not local_newstart
        ):
            for tgt in target:
                if os.path.exists(tgt):
                    self.info("Target exists: `" + tgt + "`  ")
            if self.new_start:
                self.info("New start mode; run anyway.  ")
                # Set the local_newstart flag so the command will run anyway.
                # Doing this in here instead of outside the loop allows us
                # to still report the target existence.
                local_newstart = True
                continue
            # Normally we don't run the follow, but if you want to force. . .
            if self.force_follow:
                call_follow()
            # Increment process count
            increment_info_pattern = "Skipped command: `{}`\nCommand ID incremented by: `{}`. Current ID: `{}`\n"
            if isinstance(cmd, list):
                for c in cmd:
                    count = len(parse_cmd(c, shell))
                    self.proc_count += count
                    self.debug(
                        increment_info_pattern.format(
                            str(c), count, self.proc_count
                        )
                    )
            else:
                count = len(parse_cmd(cmd, shell))
                self.proc_count += count
                self.debug(
                    increment_info_pattern.format(str(cmd), count, self.proc_count)
                )
            break  # Do not run command

        # Scenario 1: Lock file exists, but we're supposed to overwrite target; Run process.
        if not proceed_through_locks:
            for lock_file in lock_files:
                recover_file = self._recoverfile_from_lockfile(lock_file)
                if os.path.isfile(lock_file):
                    self.info("Found lock file: {}".format(lock_file))
                    if self.overwrite_locks:
                        self.info("Overwriting target...")
                        proceed_through_locks = True
                    elif os.path.isfile(recover_file):
                        self.info(
                            "Found dynamic recovery file ({}); "
                            "overwriting target...".format(recover_file)
                        )
                        # remove the lock file which will then be promptly re-created for the current run.
                        local_recover = True
                        proceed_through_locks = True
                        # the recovery flag is now spent; remove so we don't accidentally re-recover a failed job
                        os.remove(recover_file)
                    else:  # don't overwrite locks
                        self._wait_for_lock(lock_file)
                        # when it's done loop through again to try one more
                        # time (to see if the target exists now)
                        continue

        # If you get to this point, the target doesn't exist, and the lock_file doesn't exist
        # (or we should overwrite). Create the lock (if you can).
        # Initialize lock in master lock list
        for lock_file in lock_files:
            self.locks.append(lock_file)
            if self.overwrite_locks or local_recover:
                self._create_file(lock_file)
            else:
                try:
                    self._create_file_racefree(lock_file)  # Create lock
                except OSError as e:
                    if e.errno == errno.EEXIST:  # File already exists
                        self.info(
                            "Lock file created after test! Looping again: {}".format(
                                lock_file
                            )
                        )

                        # Since a lock file was created by a different source,
                        # we need to reset this flag to re-check the locks.
                        proceed_through_locks = False
                        continue  # Go back to start

        ##### End tests block
        # If you make it past these tests, we should proceed to run the process.

        if target is not None:
            self.info(
                "Target to produce: {}  ".format(
                    ",".join(["`" + x + "`" for x in target])
                )
            )
        else:
            self.info("Targetless command, running...  ")

        if isinstance(cmd, list):  # Handle command lists
            for cmd_i in cmd:
                list_ret, maxmem = self.callprint(
                    cmd_i, shell, lock_file, nofail, container
                )
                maxmem = max(maxmem) if isinstance(maxmem, Iterable) else maxmem
                local_maxmem = max(local_maxmem, maxmem)
                list_ret = (
                    _max_ret_code(list_ret)
                    if isinstance(list_ret, Iterable)
                    else list_ret
                )
                process_return_code = _max_ret_code([process_return_code, list_ret])

        else:  # Single command (most common)
            process_return_code, local_maxmem = self.callprint(
                cmd, shell, lock_file, nofail, container
            )  # Run command
            if isinstance(process_return_code, list):
                process_return_code = _max_ret_code(process_return_code)

        # For temporary files, you can specify a clean option to automatically
        # add them to the clean list, saving you a manual call to clean_add
        if target is not None and clean:
            for tgt in target:
                self.clean_add(tgt)

        call_follow()
        for lock_file in lock_files:
            os.remove(lock_file)  # Remove lock file
            self.locks.remove(lock_file)

        # If you make it to the end of the while loop, you're done
        break

    return process_return_code
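The lock handling above relies on atomic, race-free file creation via `_create_file_racefree`. A minimal sketch of the underlying idea (a hypothetical standalone helper, not pypiper's actual implementation) uses `os.O_CREAT | os.O_EXCL`, so the existence check and the creation happen in a single system call with no window for another process to sneak in:

```python
import errno
import os
import tempfile

def create_lock_racefree(path):
    """Atomically create a lock file; raise OSError(EEXIST) if it already exists."""
    # O_CREAT | O_EXCL makes creation fail if the file exists, with no
    # gap between the existence test and the creation.
    fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    os.close(fd)

lock = os.path.join(tempfile.mkdtemp(), "lock.my_step")
create_lock_racefree(lock)          # first caller wins
try:
    create_lock_racefree(lock)      # second caller loses the race
except OSError as e:
    assert e.errno == errno.EEXIST  # mirrors the EEXIST branch handled above
```

This is why the code above loops back when it catches `EEXIST`: another process created the lock between the check and the create, so the locks must be re-examined.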

start_pipeline

start_pipeline(args=None, multi=False)

Initialize the pipeline: tee output, print diagnostics, and create temp files. You provide only the output directory (used for pipeline stats, log, and status flag files).

Source code in pypiper/manager.py
def start_pipeline(self, args=None, multi=False):
    """
    Initialize the pipeline: tee output, print diagnostics, and create temp files.
    You provide only the output directory (used for pipeline stats, log, and status flag files).
    """
    # Perhaps this could all just be put into __init__, but I just kind of like the idea of a start function
    # self.make_sure_path_exists(self.outfolder)

    # By default, Pypiper will mirror every operation so it is displayed both
    # on sys.stdout **and** to a log file. Unfortunately, interactive python sessions
    # ruin this by interfering with stdout. So, for interactive mode, we do not enable
    # the tee subprocess, sending all output to screen only.
    # Starting multiple PipelineManagers in the same script has the same problem, and
    # must therefore be run in interactive_mode.

    interactive_mode = multi or not hasattr(__main__, "__file__")
    if interactive_mode:
        self.warning(
            "Warning: You're running an interactive python session. "
            "This works, but pypiper cannot tee the output, so results "
            "are only logged to screen."
        )
    else:
        sys.stdout = Unbuffered(sys.stdout)
        # sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)  # Unbuffer output

        # The tee subprocess must be instructed to ignore TERM and INT signals;
        # Instead, I will clean up this process in the signal handler functions.
        # This is required because otherwise, if pypiper receives a TERM or INT,
        # the tee will be automatically terminated by python before I have a chance to
        # print some final output (for example, about when the process stopped),
        # and so those things don't end up in the log files because the tee
        # subprocess is dead. Instead, I will handle the killing of the tee process
        # manually (in the exit handler).

        # The "-a" flag appends to the log file rather than overwriting it.

        tee = subprocess.Popen(
            ["tee", "-a", self.pipeline_log_file],
            stdin=subprocess.PIPE,
            preexec_fn=self._ignore_interrupts,
        )

        # If the pipeline is terminated with SIGTERM/SIGINT,
        # make sure we kill this spawned tee subprocess as well.
        # atexit.register(self._kill_child_process, tee.pid, proc_name="tee")
        os.dup2(tee.stdin.fileno(), sys.stdout.fileno())
        os.dup2(tee.stdin.fileno(), sys.stderr.fileno())

        self.tee = tee

    # For some reason, this exit handler function MUST be registered after
    # the one that kills the tee process.
    atexit.register(self._exit_handler)

    # A future possibility to avoid this tee, is to use a Tee class; this works for anything printed here
    # by pypiper, but can't tee the subprocess output. For this, it would require using threading to
    # simultaneously capture and display subprocess output. I shelve this for now and stick with the tee option.
    # sys.stdout = Tee(self.pipeline_log_file)

    # Record the git version of the pipeline and pypiper used. This gets (if it is in a git repo):
    # dir: the directory where the code is stored
    # hash: the commit id of the last commit in this repo
    # date: the date of the last commit in this repo
    # diff: a summary of any differences in the current (run) version vs. the committed version

    # Wrapped in try blocks so that the code will not fail if the pipeline or pypiper are not git repositories
    gitvars = {}
    try:
        # pypiper dir
        ppd = os.path.dirname(os.path.realpath(__file__))
        gitvars["pypiper_dir"] = ppd
        gitvars["pypiper_hash"] = (
            subprocess.check_output(
                "cd " + ppd + "; git rev-parse --verify HEAD 2>/dev/null",
                shell=True,
            )
            .decode()
            .strip()
        )
        gitvars["pypiper_date"] = (
            subprocess.check_output(
                "cd " + ppd + "; git show -s --format=%ai HEAD 2>/dev/null",
                shell=True,
            )
            .decode()
            .strip()
        )
        gitvars["pypiper_diff"] = (
            subprocess.check_output(
                "cd " + ppd + "; git diff --shortstat HEAD 2>/dev/null", shell=True
            )
            .decode()
            .strip()
        )
        gitvars["pypiper_branch"] = (
            subprocess.check_output(
                "cd " + ppd + "; git branch | grep '*' 2>/dev/null", shell=True
            )
            .decode()
            .strip()
        )
    except Exception:
        pass
    try:
        # pipeline dir
        pld = os.path.dirname(os.path.realpath(sys.argv[0]))
        gitvars["pipe_dir"] = pld
        gitvars["pipe_hash"] = (
            subprocess.check_output(
                "cd " + pld + "; git rev-parse --verify HEAD 2>/dev/null",
                shell=True,
            )
            .decode()
            .strip()
        )
        gitvars["pipe_date"] = (
            subprocess.check_output(
                "cd " + pld + "; git show -s --format=%ai HEAD 2>/dev/null",
                shell=True,
            )
            .decode()
            .strip()
        )
        gitvars["pipe_diff"] = (
            subprocess.check_output(
                "cd " + pld + "; git diff --shortstat HEAD 2>/dev/null", shell=True
            )
            .decode()
            .strip()
        )
        gitvars["pipe_branch"] = (
            subprocess.check_output(
                "cd " + pld + "; git branch | grep '*' 2>/dev/null", shell=True
            )
            .decode()
            .strip()
        )
    except Exception:
        pass

    # Print out a header section in the pipeline log:
    # Wrap things in backticks to prevent markdown from interpreting underscores as emphasis.
    # print("----------------------------------------")
    def logfmt(key, value=None, padding=16):
        padded_key = key.rjust(padding)
        formatted_val = f"`{value}`" if value else ""
        return f"* {padded_key}: {formatted_val}"

    self.info("### Pipeline run code and environment:\n")
    self.info(logfmt("Command", str(" ".join(sys.argv))))
    self.info(logfmt("Compute host", platform.node()))
    self.info(logfmt("Working dir", os.getcwd()))
    self.info(logfmt("Outfolder", self.outfolder))
    self.info(logfmt("Log file", self.pipeline_log_file))
    self.timestamp(logfmt("Start time"))

    self.info("\n### Version log:\n")
    self.info(logfmt("Python version", platform.python_version()))
    try:
        self.info(logfmt("Pypiper dir", gitvars["pypiper_dir"].strip()))
        self.info(logfmt("Pypiper version", __version__))
        self.info(logfmt("Pypiper hash", gitvars["pypiper_hash"]))
        self.info(logfmt("Pypiper branch", gitvars["pypiper_branch"]))
        self.info(logfmt("Pypiper date", gitvars["pypiper_date"]))
        if gitvars["pypiper_diff"]:
            self.info(logfmt("Pypiper diff", gitvars["pypiper_diff"]))
    except KeyError:
        # It is ok if keys aren't set; it means pypiper isn't in a git repo.
        pass

    try:
        self.info(logfmt("Pipeline dir", gitvars["pipe_dir"].strip()))
        self.info(logfmt("Pipeline version", self.pl_version))
        self.info(logfmt("Pipeline hash", gitvars["pipe_hash"]).strip())
        self.info(logfmt("Pipeline branch", gitvars["pipe_branch"]).strip())
        self.info(logfmt("Pipeline date", gitvars["pipe_date"]).strip())
        if gitvars["pipe_diff"] != "":
            self.info(logfmt("Pipeline diff", gitvars["pipe_diff"]).strip())
    except KeyError:
        # It is ok if keys aren't set, it means the pipeline isn't a git repo.
        pass

    # self.info all arguments (if any)
    self.info("\n### Arguments passed to pipeline:\n")
    for arg, val in sorted((vars(args) if args else dict()).items()):
        argtext = "`{}`".format(arg)
        valtext = "`{}`".format(val)
        self.info("* {}:  {}".format(argtext.rjust(20), valtext))

    self.info("\n### Initialized Pipestat Object:\n")
    results = self._pipestat_manager.__str__().split("\n")
    for i in results:
        self.info("* " + i)
    self.info("* Sample name: " + self.pipestat_record_identifier + "\n")
    self.info("\n----------------------------------------\n")
    self.status = "running"
    self.pipestat.set_status(
        record_identifier=self._pipestat_manager.record_identifier,
        status_identifier="running",
    )

    # Record the start in PIPE_profile and PIPE_commands output files so we
    # can trace which run they belong to

    with open(self.pipeline_commands_file, "a") as myfile:
        myfile.write(
            "# Pipeline started at "
            + time.strftime("%m-%d %H:%M:%S", time.localtime(self.starttime))
            + "\n\n"
        )

    with open(self.pipeline_profile_file, "a") as myfile:
        myfile.write(
            "# Pipeline started at "
            + time.strftime("%m-%d %H:%M:%S", time.localtime(self.starttime))
            + "\n\n"
            + "# "
            + "\t".join(PROFILE_COLNAMES)
            + "\n"
        )
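The comments above mention a shelved pure-Python `Tee` alternative to the `tee` subprocess. A minimal sketch of that idea follows (illustrative only; as the source notes, a Python-level tee catches pypiper's own prints but cannot capture subprocess output, which is why the `tee` subprocess plus `os.dup2` approach is used instead):

```python
import os
import sys
import tempfile

class Tee:
    """Mirror everything written to stdout into a log file as well."""
    def __init__(self, log_path):
        self.log = open(log_path, "a")
        self.stdout = sys.stdout

    def write(self, text):
        # Every write goes to both the real stdout and the log file.
        self.stdout.write(text)
        self.log.write(text)

    def flush(self):
        self.stdout.flush()
        self.log.flush()

log_path = os.path.join(tempfile.mkdtemp(), "pipeline_log.md")
sys.stdout = Tee(log_path)
print("### Pipeline run code and environment:")
sys.stdout.flush()
sys.stdout = sys.stdout.stdout  # restore the plain stdout
```

Because child processes inherit file descriptors rather than Python objects, only the `os.dup2` redirection into a real `tee` process can mirror subprocess output too.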

stop_pipeline

stop_pipeline(status=COMPLETE_FLAG)

Terminate the pipeline.

This is the "healthy" pipeline completion function, to be run by the pipeline at the end of the script. It sets the status flag to completed and records some time and memory statistics to the log file.

Source code in pypiper/manager.py
def stop_pipeline(self, status=COMPLETE_FLAG):
    """
    Terminate the pipeline.

    This is the "healthy" pipeline completion function, to be run by the
    pipeline at the end of the script. It sets the status flag to completed
    and records some time and memory statistics to the log file.
    """
    # self._set_status_flag(status)
    self.pipestat.set_status(
        record_identifier=self._pipestat_manager.record_identifier,
        status_identifier=status,
    )
    self._cleanup()
    elapsed_time_this_run = str(
        datetime.timedelta(seconds=self.time_elapsed(self.starttime))
    )
    self.report_result("Time", elapsed_time_this_run, nolog=True)
    self.report_result("Success", time.strftime("%m-%d-%H:%M:%S"), nolog=True)

    self.info("\n### Pipeline completed. Epilogue")
    # print("* " + "Total elapsed time".rjust(20) + ":  "
    #       + str(datetime.timedelta(seconds=self.time_elapsed(self.starttime))))
    self.info(
        "* " + "Elapsed time (this run)".rjust(30) + ":  " + elapsed_time_this_run
    )
    self.info(
        "* "
        + "Total elapsed time (all runs)".rjust(30)
        + ":  "
        + str(datetime.timedelta(seconds=round(self.get_elapsed_time())))
    )
    self.info(
        "* "
        + "Peak memory (this run)".rjust(30)
        + ":  "
        + str(round(self.peak_memory, 4))
        + " GB"
    )
    # self.info("* " + "Total peak memory (all runs)".rjust(30) + ":  " +
    #     str(round(self.peak_memory, 4)) + " GB")
    if self.halted:
        return

    t = time.strftime("%Y-%m-%d %H:%M:%S")
    self.info("* " + "Pipeline completed time".rjust(30) + ": " + t)
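The elapsed-time strings in the epilogue come from wrapping whole seconds in `datetime.timedelta`, whose string form is `H:MM:SS` (with days prepended for long runs):

```python
import datetime

elapsed_seconds = 3661  # hypothetical run time in seconds
formatted = str(datetime.timedelta(seconds=elapsed_seconds))
print(formatted)  # → 1:01:01
```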

time_elapsed staticmethod

time_elapsed(time_since)

Returns the number of seconds that have elapsed since the time_since parameter.

:param float time_since: Time as a float given by time.time().

Source code in pypiper/manager.py
@staticmethod
def time_elapsed(time_since):
    """
    Returns the number of seconds that have elapsed since the time_since parameter.

    :param float time_since: Time as a float given by time.time().
    """
    return round(time.time() - time_since, 0)
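Since `time_elapsed` is a static method, it is just a rounded difference of `time.time()` values. The equivalent computation, with a hypothetical start time 90 seconds in the past:

```python
import time

start = time.time() - 90  # pretend the pipeline started 90 seconds ago
# Same computation as time_elapsed(start): whole seconds since `start`.
elapsed = round(time.time() - start, 0)
print(elapsed)  # → 90.0
```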

timestamp

timestamp(message='', checkpoint=None, finished=False, raise_error=True)

Print message, time, and time elapsed, perhaps creating checkpoint.

This prints your given message, along with the current time, and time elapsed since the previous timestamp() call. If you specify a HEADING by beginning the message with "###", it surrounds the message with newlines for easier readability in the log file. If a checkpoint is designated, an empty file is created corresponding to the name given. Depending on how this manager's been configured, the value of the checkpoint, and whether this timestamp indicates initiation or completion of a group of pipeline steps, this call may stop the pipeline's execution.

:param str message: Message to timestamp.
:param str checkpoint: Name of checkpoint; this tends to reflect the processing logic about to be or having just been completed. Providing an argument here means a checkpoint file will be created, facilitating arbitrary starting and stopping points for the pipeline.
:param bool finished: Whether this call represents the completion of a conceptual unit of the pipeline's processing.
:param bool raise_error: Whether to raise an exception if the checkpoint or current state indicates that a halt should occur.

Source code in pypiper/manager.py
def timestamp(self, message="", checkpoint=None, finished=False, raise_error=True):
    """
    Print message, time, and time elapsed, perhaps creating checkpoint.

    This prints your given message, along with the current time, and time
    elapsed since the previous timestamp() call.  If you specify a
    HEADING by beginning the message with "###", it surrounds the message
    with newlines for easier readability in the log file. If a checkpoint
    is designated, an empty file is created corresponding to the name
    given. Depending on how this manager's been configured, the value of
    the checkpoint, and whether this timestamp indicates initiation or
    completion of a group of pipeline steps, this call may stop the
    pipeline's execution.

    :param str message: Message to timestamp.
    :param str checkpoint: Name of checkpoint; this tends to be something
        that reflects the processing logic about to be or having just been
        completed. Provision of an argument to this parameter means that
        a checkpoint file will be created, facilitating arbitrary starting
        and stopping point for the pipeline as desired.
    :param bool finished: Whether this call represents the completion of a
        conceptual unit of a pipeline's processing
    :param raise_error: Whether to raise exception if
        checkpoint or current state indicates that a halt should occur.
    """

    # Halt if the manager's state has been set such that this call
    # should halt the pipeline.
    if self.halt_on_next:
        self.halt(checkpoint, finished, raise_error=raise_error)

    # Determine action to take with respect to halting if needed.
    if checkpoint:
        if finished:
            # Write the file.
            self._checkpoint(checkpoint)
            self.prev_checkpoint = checkpoint
            self.curr_checkpoint = None
        else:
            self.prev_checkpoint = self.curr_checkpoint
            self.curr_checkpoint = checkpoint
            self._checkpoint(self.prev_checkpoint)
        # Handle the two halting conditions.
        if (finished and checkpoint == self.stop_after) or (
            not finished and checkpoint == self.stop_before
        ):
            self.halt(checkpoint, finished, raise_error=raise_error)
        # Determine if we've started executing.
        elif checkpoint == self.start_point:
            self._active = True
        # If this is a prospective checkpoint, set the current checkpoint
        # accordingly and whether we should halt the pipeline on the
        # next timestamp call.
        if not finished and checkpoint == self.stop_after:
            self.halt_on_next = True

    elapsed = self.time_elapsed(self.last_timestamp)
    t = time.strftime("%m-%d %H:%M:%S")
    if checkpoint is None:
        msg = "{m} ({t}) elapsed: {delta_t} _TIME_".format(
            m=message, t=t, delta_t=elapsed
        )
    else:
        msg = "{m} ({t}) ({status} {stage}) elapsed: {delta_t} _TIME_".format(
            m=message,
            t=t,
            status="finished" if finished else "starting",
            stage=checkpoint,
            delta_t=elapsed,
        )
    if re.match("^###", message):
        msg = "\n{}\n".format(msg)
    self.info(msg)
    self.last_timestamp = time.time()
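The checkpoint mechanism boils down to empty marker files: a stage is skipped when its checkpoint file already exists from a prior run. A minimal sketch of that idea (hypothetical file layout and helper names, not pypiper's internal `_checkpoint` implementation):

```python
import os
import tempfile

checkpoint_dir = tempfile.mkdtemp()

def checkpoint_path(stage):
    # An empty flag file whose existence marks the stage as completed.
    return os.path.join(checkpoint_dir, "checkpoint_{}.flag".format(stage))

def run_stage(stage, action):
    flag = checkpoint_path(stage)
    if os.path.exists(flag):
        return "skipped"          # stage finished in a previous run
    action()
    open(flag, "w").close()       # write the empty checkpoint file
    return "ran"

results = []
results.append(run_stage("align", lambda: None))  # first run: executes
results.append(run_stage("align", lambda: None))  # restart: skipped
print(results)  # → ['ran', 'skipped']
```

This is why `timestamp(..., checkpoint=..., finished=True)` writes the file only on completion: a restart then resumes at the first stage whose flag is missing.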