Skip to content

Package pipestat Documentation

Class PipestatError

Base exception type for this package

Class SamplePipestatManager

Pipestat standardizes reporting of pipeline results and pipeline status management. It formalizes a way for pipeline developers and downstream tools developers to communicate -- results produced by a pipeline can easily and reliably become an input for downstream analyses. A PipestatManager object exposes an API for interacting with the results and pipeline status and can be backed by either a YAML-formatted file or a database.

def __init__(self, **kwargs)

Initialize the PipestatManager object


  • record_identifier (str): record identifier to report for. Thiscreates a weak bound to the record, which can be overridden in this object method calls
  • schema_path (str): path to the output schema that formalizesthe results structure
  • results_file_path (str): YAML file to report into, if file isused as the object back-end
  • database_only (bool): whether the reported data should not bestored in the memory, but only in the database
  • config_file (str): path to the configuration file
  • config_dict (dict): a mapping with the config file content
  • flag_file_dir (str): path to directory containing flag files
  • show_db_logs (bool): Defaults to False, toggles showing database logs
  • pipeline_type (str): "sample" or "project"
  • pipeline_name (str): name of the current pipeline, defaults to
  • result_formatter (str): function for formatting result
  • multi_pipelines (bool): allows for running multiple pipelines for one file backend
  • output_dir (str): target directory for report generation via summarize and table generation via table.
def clear_status(self, *args, **kwargs)
def config_path(self)

Config path. None if the config was not provided or if provided as a mapping of the config contents


  • str: path to the provided config
def count_records(self, *args, **kwargs)
def data(self)

Data object


  • yacman.YAMLConfigManager: the object that stores the reported data
def db_url(self)

Database URL, generated based on config credentials


  • str: database URL


  • PipestatDatabaseError: if the object is not backed by a database
def file(self)

File path that the object is reporting the results into


  • str: file path that the object is reporting the results into
def get_status(self, *args, **kwargs)
def highlighted_results(self)

Highlighted results


  • List[str]: a collection of highlighted results
def initialize_dbbackend(*args, **kwargs)
def link(self, *args, **kwargs)
def list_recent_results(self, *args, **kwargs)
def output_dir(self)

Output directory for report and stats generation


  • str: path to output_dir
def pipeline_name(self)

Pipeline name


  • str: Pipeline name
def pipeline_type(self)

Pipeline type: "sample" or "project"


  • str: pipeline type
def project_name(self)

Project name the object writes the results to


  • str: project name the object writes the results to
def record_count(self)

Number of records reported


  • int: number of records reported
def record_identifier(self)

Pipeline type: "sample" or "project"


  • str: pipeline type
def remove(self, *args, **kwargs)
def remove_record(self, *args, **kwargs)
def report(self, *args, **kwargs)
def result_schemas(self)

Result schema mappings


  • dict: schemas that formalize the structure of each resultin a canonical jsonschema way
def retrieve_one(self, *args, **kwargs)
def schema(self)

Schema mapping


  • ParsedSchema: schema object that formalizes the results structure
def schema_path(self)

Schema path


  • str: path to the provided schema
def select_distinct(self, *args, **kwargs)
def select_records(self, *args, **kwargs)
def set_status(self, *args, **kwargs)
def status_schema(self)

Status schema mapping


  • dict: schema that formalizes the pipeline status structure
def status_schema_source(self)

Status schema source


  • dict: source of the schema that formalizesthe pipeline status structure
def summarize(self, *args, **kwargs)
def table(self, *args, **kwargs)

Class ProjectPipestatManager

Pipestat standardizes reporting of pipeline results and pipeline status management. It formalizes a way for pipeline developers and downstream tools developers to communicate -- results produced by a pipeline can easily and reliably become an input for downstream analyses. A PipestatManager object exposes an API for interacting with the results and pipeline status and can be backed by either a YAML-formatted file or a database.

def __init__(self, **kwargs)

Initialize the PipestatManager object


  • record_identifier (str): record identifier to report for. Thiscreates a weak bound to the record, which can be overridden in this object method calls
  • schema_path (str): path to the output schema that formalizesthe results structure
  • results_file_path (str): YAML file to report into, if file isused as the object back-end
  • database_only (bool): whether the reported data should not bestored in the memory, but only in the database
  • config_file (str): path to the configuration file
  • config_dict (dict): a mapping with the config file content
  • flag_file_dir (str): path to directory containing flag files
  • show_db_logs (bool): Defaults to False, toggles showing database logs
  • pipeline_type (str): "sample" or "project"
  • pipeline_name (str): name of the current pipeline, defaults to
  • result_formatter (str): function for formatting result
  • multi_pipelines (bool): allows for running multiple pipelines for one file backend
  • output_dir (str): target directory for report generation via summarize and table generation via table.
def clear_status(self, *args, **kwargs)
def config_path(self)

Config path. None if the config was not provided or if provided as a mapping of the config contents


  • str: path to the provided config
def count_records(self, *args, **kwargs)
def data(self)

Data object


  • yacman.YAMLConfigManager: the object that stores the reported data
def db_url(self)

Database URL, generated based on config credentials


  • str: database URL


  • PipestatDatabaseError: if the object is not backed by a database
def file(self)

File path that the object is reporting the results into


  • str: file path that the object is reporting the results into
def get_status(self, *args, **kwargs)
def highlighted_results(self)

Highlighted results


  • List[str]: a collection of highlighted results
def initialize_dbbackend(*args, **kwargs)
def link(self, *args, **kwargs)
def list_recent_results(self, *args, **kwargs)
def output_dir(self)

Output directory for report and stats generation


  • str: path to output_dir
def pipeline_name(self)

Pipeline name


  • str: Pipeline name
def pipeline_type(self)

Pipeline type: "sample" or "project"


  • str: pipeline type
def project_name(self)

Project name the object writes the results to


  • str: project name the object writes the results to
def record_count(self)

Number of records reported


  • int: number of records reported
def record_identifier(self)

Pipeline type: "sample" or "project"


  • str: pipeline type
def remove(self, *args, **kwargs)
def remove_record(self, *args, **kwargs)
def report(self, *args, **kwargs)
def result_schemas(self)

Result schema mappings


  • dict: schemas that formalize the structure of each resultin a canonical jsonschema way
def retrieve_one(self, *args, **kwargs)
def schema(self)

Schema mapping


  • ParsedSchema: schema object that formalizes the results structure
def schema_path(self)

Schema path


  • str: path to the provided schema
def select_distinct(self, *args, **kwargs)
def select_records(self, *args, **kwargs)
def set_status(self, *args, **kwargs)
def status_schema(self)

Status schema mapping


  • dict: schema that formalizes the pipeline status structure
def status_schema_source(self)

Status schema source


  • dict: source of the schema that formalizesthe pipeline status structure
def summarize(self, *args, **kwargs)
def table(self, *args, **kwargs)

Class PipestatBoss

PipestatBoss simply holds Sample or Project Managers that are child classes of PipestatManager. :param list[str] pipeline_list: list that holds pipeline types, e.g. ['sample','project'] :param str record_identifier: record identifier to report for. This creates a weak bound to the record, which can be overridden in this object method calls :param str schema_path: path to the output schema that formalizes the results structure :param str results_file_path: YAML file to report into, if file is used as the object back-end :param bool database_only: whether the reported data should not be stored in the memory, but only in the database :param str | dict config: path to the configuration file or a mapping with the config file content :param str flag_file_dir: path to directory containing flag files :param bool show_db_logs: Defaults to False, toggles showing database logs :param str pipeline_type: "sample" or "project" :param str result_formatter: function for formatting result :param bool multi_pipelines: allows for running multiple pipelines for one file backend :param str output_dir: target directory for report generation via summarize and table generation via table.

def __init__(self, pipeline_list: Optional[list]=None, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

Class PipestatManager

Pipestat standardizes reporting of pipeline results and pipeline status management. It formalizes a way for pipeline developers and downstream tools developers to communicate -- results produced by a pipeline can easily and reliably become an input for downstream analyses. A PipestatManager object exposes an API for interacting with the results and pipeline status and can be backed by either a YAML-formatted file or a database.

def __init__(self, project_name: Optional[str]=None, record_identifier: Optional[str]=None, schema_path: Optional[str]=None, results_file_path: Optional[str]=None, database_only: Optional[bool]=True, config_file: Optional[str]=None, config_dict: Optional[dict]=None, flag_file_dir: Optional[str]=None, show_db_logs: bool=False, pipeline_type: Optional[str]=None, pipeline_name: Optional[str]=None, result_formatter: staticmethod=<function default_formatter at 0x70949b3ae680>, multi_pipelines: bool=False, output_dir: Optional[str]=None)

Initialize the PipestatManager object


  • record_identifier (str): record identifier to report for. Thiscreates a weak bound to the record, which can be overridden in this object method calls
  • schema_path (str): path to the output schema that formalizesthe results structure
  • results_file_path (str): YAML file to report into, if file isused as the object back-end
  • database_only (bool): whether the reported data should not bestored in the memory, but only in the database
  • config_file (str): path to the configuration file
  • config_dict (dict): a mapping with the config file content
  • flag_file_dir (str): path to directory containing flag files
  • show_db_logs (bool): Defaults to False, toggles showing database logs
  • pipeline_type (str): "sample" or "project"
  • pipeline_name (str): name of the current pipeline, defaults to
  • result_formatter (str): function for formatting result
  • multi_pipelines (bool): allows for running multiple pipelines for one file backend
  • output_dir (str): target directory for report generation via summarize and table generation via table.
def check_multi_results(self)
def clear_status(self, *args, **kwargs)
def config_path(self)

Config path. None if the config was not provided or if provided as a mapping of the config contents


  • str: path to the provided config
def count_records(self, *args, **kwargs)
def data(self)

Data object


  • yacman.YAMLConfigManager: the object that stores the reported data
def db_url(self)

Database URL, generated based on config credentials


  • str: database URL


  • PipestatDatabaseError: if the object is not backed by a database
def file(self)

File path that the object is reporting the results into


  • str: file path that the object is reporting the results into
def get_status(self, *args, **kwargs)
def highlighted_results(self)

Highlighted results


  • List[str]: a collection of highlighted results
def initialize_dbbackend(*args, **kwargs)
def initialize_filebackend(self, record_identifier, results_file_path, flag_file_dir)
def link(self, *args, **kwargs)
def list_recent_results(self, *args, **kwargs)
def output_dir(self)

Output directory for report and stats generation


  • str: path to output_dir
def pipeline_name(self)

Pipeline name


  • str: Pipeline name
def pipeline_type(self)

Pipeline type: "sample" or "project"


  • str: pipeline type
def process_schema(self, schema_path)
def project_name(self)

Project name the object writes the results to


  • str: project name the object writes the results to
def record_count(self)

Number of records reported


  • int: number of records reported
def record_identifier(self)

Pipeline type: "sample" or "project"


  • str: pipeline type
def remove(self, *args, **kwargs)
def remove_record(self, *args, **kwargs)
def report(self, *args, **kwargs)
def resolve_results_file_path(self, results_file_path)

Replace {record_identifier} in results_file_path if it exists.


  • results_file_path (str): YAML file to report into, if file isused as the object back-end
def result_schemas(self)

Result schema mappings


  • dict: schemas that formalize the structure of each resultin a canonical jsonschema way
def retrieve_history(self, record_identifier: str=None, result_identifier: Union[str, List[str], NoneType]=None) -> Union[Any, Dict[str, Any]]

Retrieve a single record's history


  • record_identifier (str): single record_identifier
  • result_identifier (str): single result_identifier or list of result identifiers


  • ``: a mapping with filtered historical results
def retrieve_many(self, record_identifiers: List[str], result_identifier: Optional[str]=None) -> Union[Any, Dict[str, Any]]


  • record_identifiers (``): list of record identifiers
  • result_identifier (str): single record_identifier


  • ``: a mapping with filteredresults reported for the record
def retrieve_one(self, *args, **kwargs)
def schema(self)

Schema mapping


  • ParsedSchema: schema object that formalizes the results structure
def schema_path(self)

Schema path


  • str: path to the provided schema
def select_distinct(self, *args, **kwargs)
def select_records(self, *args, **kwargs)
def set_status(self, *args, **kwargs)
def status_schema(self)

Status schema mapping


  • dict: schema that formalizes the pipeline status structure
def status_schema_source(self)

Status schema source


  • dict: source of the schema that formalizesthe pipeline status structure
def summarize(self, *args, **kwargs)
def table(self, *args, **kwargs)

Version Information: pipestat v0.9.2, generated by lucidoc v0.4.4