Skip to content

Package ubiquerg Documentation

Package Overview

Ubiquerg is a utility package with a collection of helpful universally useful functions. The name means work (erg) everywhere (ubique), indicating the intention for these to be low-level functions that can be used in lots of different places.

Installation

pip install ubiquerg

API Reference

CLI Tools

cli_tools

Functions for working with command-line interaction

VersionInHelpParser

VersionInHelpParser(version=None, **kwargs)

Bases: ArgumentParser

Overwrites the inherited init. Saves the version as an object attribute for further use.

Source code in ubiquerg/cli_tools.py
20
21
22
23
24
25
26
27
28
29
def __init__(self, version: str | None = None, **kwargs: Any) -> None:
    """Initialize the parser, storing the version for later use.

    Args:
        version: program version string; when provided, a ``--version``
            flag is registered on the parser.
        **kwargs: passed through to ``argparse.ArgumentParser``.
    """
    super(VersionInHelpParser, self).__init__(**kwargs)
    self.version = version
    if self.version is None:
        return
    self.add_argument(
        "--version",
        action="version",
        version="%(prog)s {}".format(self.version),
    )

arg_defaults

arg_defaults(subcommand=None, unique=False, top_level=False)

Get argument defaults by subcommand from a parser.

Parameters:

Name Type Description Default
subcommand str | None

subcommand to get defaults for

None
unique bool

whether only unique flat dict of dests and defaults mapping should be returned

False

Returns:

Name Type Description
dict dict[str, Any] | dict[str, dict[str, Any]]

defaults by subcommand

Source code in ubiquerg/cli_tools.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def arg_defaults(
    self,
    subcommand: str | None = None,
    unique: bool = False,
    top_level: bool = False,
) -> dict[str, Any] | dict[str, dict[str, Any]]:
    """Get argument defaults by subcommand from a parser.

    Args:
        subcommand: subcommand to get defaults for
        unique: whether only unique flat dict of dests and defaults mapping should be returned

    Returns:
        dict: defaults by subcommand
    """
    if top_level:
        top_level_actions = self.top_level_args()
        defaults_dict = {}
        for tla in top_level_actions:
            if hasattr(tla, "default") and hasattr(tla, "dest"):
                defaults_dict.update({tla.dest: tla.default})
        return defaults_dict

    if subcommand is not None and subcommand not in self.subcommands():
        raise ValueError(
            "'{}' not found in this parser commands: {}".format(
                subcommand, str(self.subcommands())
            )
        )
    subs = (
        self.subparsers().choices
        if subcommand is None
        else {subcommand: self.subparsers().choices[subcommand]}
    )
    defaults = {}
    for subcmd, sub in subs.items():
        defaults_dict = {}
        for action in sub._actions:
            if isinstance(action, _HelpAction):
                continue
            if hasattr(action, "default") and hasattr(action, "dest"):
                defaults_dict.update({action.dest: action.default})
        defaults[subcmd] = defaults_dict
    if unique:
        unique_defaults = {}
        for k, v in defaults.items():
            unique_defaults = merge_dicts(unique_defaults, v)
        return unique_defaults
    return defaults

dests_by_subparser

dests_by_subparser(subcommand=None, top_level=False)

Get argument dests by subcommand from a parser.

Parameters:

Name Type Description Default
subcommand str | None

subcommand to get dests for

None

Returns:

Name Type Description
dict list[str] | dict[str, list[str]]

dests by subcommand

Source code in ubiquerg/cli_tools.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def dests_by_subparser(
    self, subcommand: str | None = None, top_level: bool = False
) -> list[str] | dict[str, list[str]]:
    """Get argument dests by subcommand from a parser.

    Args:
        subcommand: subcommand to get dests for

    Returns:
        dict: dests by subcommand
    """
    if top_level:
        top_level_actions = self.top_level_args()
        dest_list = []
        for tla in top_level_actions:
            if hasattr(tla, "dest"):
                dest_list.append(tla.dest)
        return dest_list

    if subcommand is not None and subcommand not in self.subcommands():
        raise ValueError(
            "'{}' not found in this parser commands: {}".format(
                subcommand, str(self.subcommands())
            )
        )
    subs = (
        self.subparsers().choices
        if subcommand is None
        else {subcommand: self.subparsers().choices[subcommand]}
    )
    dests = {}
    for subcmd, sub in subs.items():
        dest_list: list[str] = []
        for action in sub._actions:
            if isinstance(action, _HelpAction):
                continue
            if hasattr(action, "dest"):
                dest_list.append(action.dest)
        dests[subcmd] = dest_list
    return dests

format_help

format_help()

Add version information to help text.

Source code in ubiquerg/cli_tools.py
31
32
33
34
def format_help(self) -> str:
    """Prepend version information to the inherited help text."""
    if self.version is None:
        prefix = ""
    else:
        prefix = "version: {}\n".format(str(self.version))
    return prefix + super(VersionInHelpParser, self).format_help()

subcommands

subcommands()

Get subcommands defined by a parser.

Returns:

Type Description
list[str]

list[str]: subcommands defined within this parser

Source code in ubiquerg/cli_tools.py
58
59
60
61
62
63
64
def subcommands(self) -> list[str]:
    """Get subcommands defined by a parser.

    Returns:
        list[str]: subcommands defined within this parser
    """
    # Iterating the choices mapping yields the subcommand names.
    return [name for name in self.subparsers().choices]

subparsers

subparsers()

Get the subparser associated with a parser.

Returns:

Type Description
_SubParsersAction

argparse._SubparsersAction: action defining the subparsers

Source code in ubiquerg/cli_tools.py
36
37
38
39
40
41
42
43
44
45
def subparsers(self) -> _SubParsersAction:
    """Get the subparser associated with a parser.

    Returns:
        argparse._SubparsersAction: action defining the subparsers

    Raises:
        ValueError: if the parser does not define exactly one subparser action
    """
    found = [action for action in self._actions if isinstance(action, _SubParsersAction)]
    if len(found) == 1:
        return found[0]
    raise ValueError("Expected exactly 1 subparser, got {}".format(len(found)))

suppress_defaults

suppress_defaults()

Change the parser's argument defaults to argparse.SUPPRESS.

This prevents them from showing up in the argparse.Namespace object after argument parsing.

Source code in ubiquerg/cli_tools.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def suppress_defaults(self) -> None:
    """Change every argument default to argparse.SUPPRESS.

    This prevents the arguments from showing up in the argparse.Namespace
    object after parsing: argparse skips namespace attributes whose
    default is SUPPRESS unless a value was actually provided.
    """
    # Suppress the *default* of top-level actions, not the dest.
    # (The previous code set ``tla.dest = SUPPRESS``, which would make any
    # supplied value land on a namespace attribute literally named
    # "==SUPPRESS==" and lose the argument's real name.)
    for tla in self.top_level_args():
        if hasattr(tla, "default"):
            tla.default = SUPPRESS
    # Suppress defaults for every subcommand parser's actions as well.
    for sub in self.subparsers().choices.values():
        for sa in sub._actions:
            if hasattr(sa, "default"):
                sa.default = SUPPRESS

top_level_args

top_level_args()

Get actions not associated with any subparser.

Help and version are also excluded.

Returns:

Type Description
list[Any]

list[argparse.Action]: list of argument actions

Source code in ubiquerg/cli_tools.py
47
48
49
50
51
52
53
54
55
56
def top_level_args(self) -> list[Any]:
    """Get actions not associated with any subparser.

    Help and version actions are also excluded.

    Returns:
        list[argparse.Action]: list of argument actions
    """
    skipped = (_SubParsersAction, _HelpAction, _VersionAction)
    return [action for action in self._actions if not isinstance(action, skipped)]

convert_value

convert_value(val)

Convert string to the most appropriate type.

Converts to one of: bool, str, int, None or float

Parameters:

Name Type Description Default
val Any

the string to convert

required

Returns:

Type Description
bool | str | int | float | None

bool | str | int | float | None: converted string to the most appropriate type

Source code in ubiquerg/cli_tools.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def convert_value(val: Any) -> bool | str | int | float | None:
    """Convert string to the most appropriate type.

    Converts to one of: bool, str, int, None or float

    Args:
        val: the string to convert

    Returns:
        bool | str | int | float | None: converted string to the most appropriate type
    """
    if val is None:
        return None
    if isinstance(val, (bool, int, float)):
        return val
    if not isinstance(val, str):
        try:
            val = str(val)
        except Exception:
            raise ValueError(
                "The input has to be of type convertible to 'str', got '{}'".format(type(val))
            )

    # val is definitely a string at this point
    if val == "None":
        return None
    if val.lower() == "true":
        return True
    if val.lower() == "false":
        return False

    try:
        return int(val)
    except ValueError:
        try:
            return float(val)
        except ValueError:
            return val

query_yes_no

query_yes_no(question, default='no')

Ask a yes/no question via input() and return their answer.

Parameters:

Name Type Description Default
question str

a string that is presented to the user.

required
default str

the presumed answer if the user just hits Enter.

'no'

Returns:

Name Type Description
bool bool

True for "yes" or False for "no"

Source code in ubiquerg/cli_tools.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def query_yes_no(question: str, default: str = "no") -> bool:
    """Ask a yes/no question via input() and return their answer.

    Args:
        question: a string that is presented to the user.
        default: the presumed answer if the user just hits <Enter>.

    Returns:
        bool: True for "yes" or False for "no"

    Raises:
        ValueError: if the default answer is not recognized
    """
    answers = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
    prompts = {None: "[y/n]", "yes": "[Y/n]", "no": "[y/N]"}
    try:
        prompt = prompts[None if default is None else default.lower()]
    except (AttributeError, KeyError):
        raise ValueError("invalid default answer: {}".format(default))
    msg = "{q} {p} ".format(q=question, p=prompt)
    # Loop until the reply (or the default, on a bare <Enter>) is recognized.
    while True:
        sys.stdout.write(msg)
        try:
            reply = input() or default
            return answers[reply.lower()]
        except (KeyError, AttributeError):
            sys.stdout.write("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")

Collection Utilities

collection

Tools for working with collections

deep_update

deep_update(old, new)

Recursively update nested dict, modifying in place.

Parameters:

Name Type Description Default
old dict[Any, Any]

dict to update

required
new Mapping[Any, Any]

dict with new values

required
Source code in ubiquerg/collection.py
37
38
39
40
41
42
43
44
45
46
47
48
def deep_update(old: dict[Any, Any], new: Mapping[Any, Any]) -> None:
    """Recursively update nested dict, modifying in place.

    Args:
        old: dict to update
        new: dict with new values
    """
    for key, incoming in new.items():
        current = old.get(key)
        # Recurse only when both sides are non-empty mappings; otherwise
        # the incoming value simply replaces whatever was there.
        if isinstance(incoming, Mapping) and incoming and isinstance(current, Mapping):
            deep_update(current, incoming)
        else:
            old[key] = incoming

is_collection_like

is_collection_like(c)

Determine whether an object is collection-like.

Parameters:

Name Type Description Default
c Any

Object to test as collection

required

Returns:

Name Type Description
bool bool

Whether the argument is a (non-string) collection

Source code in ubiquerg/collection.py
51
52
53
54
55
56
57
58
59
60
def is_collection_like(c: Any) -> bool:
    """Determine whether an object is collection-like.

    Args:
        c: Object to test as collection

    Returns:
        bool: Whether the argument is a (non-string) collection
    """
    # Strings are iterable but are deliberately not treated as collections.
    if isinstance(c, str):
        return False
    return isinstance(c, Iterable)

merge_dicts

merge_dicts(x, y)

Merge dictionaries.

Parameters:

Name Type Description Default
x dict[Any, Any]

dict to merge

required
y dict[Any, Any]

dict to merge

required

Returns:

Name Type Description
Mapping dict[Any, Any]

merged dict

Source code in ubiquerg/collection.py
22
23
24
25
26
27
28
29
30
31
32
33
34
def merge_dicts(x: dict[Any, Any], y: dict[Any, Any]) -> dict[Any, Any]:
    """Merge dictionaries.

    Neither input is modified; on key conflicts, values from ``y`` win.

    Args:
        x: dict to merge
        y: dict to merge

    Returns:
        Mapping: merged dict
    """
    return {**x, **y}

powerset

powerset(items, min_items=None, include_full_pop=True, nonempty=False)

Build the powerset of a collection of items.

Parameters:

Name Type Description Default
items Iterable[T]

"Pool" of all items, the population for which to build the power set

required
min_items int | None

Minimum number of individuals from the population to allow in any given subset

None
include_full_pop bool

Whether to include the full population in the powerset (default True to accord with genuine definition)

True
nonempty bool

force each subset returned to be nonempty

False

Returns:

Type Description
list[tuple[T, ...]]

list[object]: Sequence of subsets of the population, in nondecreasing size order

Raises:

Type Description
TypeError

if minimum item count is specified but is not an integer

ValueError

if minimum item count is insufficient to guarantee nonempty subsets

Source code in ubiquerg/collection.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def powerset(
    items: Iterable[T],
    min_items: int | None = None,
    include_full_pop: bool = True,
    nonempty: bool = False,
) -> list[tuple[T, ...]]:
    """Build the powerset of a collection of items.

    Args:
        items: "Pool" of all items, the population for which to build the power set
        min_items: Minimum number of individuals from the population to allow in any given subset
        include_full_pop: Whether to include the full population in the powerset (default True to accord with genuine definition)
        nonempty: force each subset returned to be nonempty

    Returns:
        list[object]: Sequence of subsets of the population, in nondecreasing size order

    Raises:
        TypeError: if minimum item count is specified but is not an integer
        ValueError: if minimum item count is insufficient to guarantee nonempty subsets
    """
    if min_items is None:
        min_items = int(nonempty)
    elif not isinstance(min_items, int):
        raise TypeError(
            "Min items count for each subset isn't an integer: {} ({})".format(
                min_items, type(min_items)
            )
        )
    elif nonempty and min_items < 1:
        raise ValueError(
            "When minimum item count is {}, nonempty subsets cannot be guaranteed.".format(
                min_items
            )
        )
    # Materialize to guard against consuming a one-shot iterator; the
    # population should be small anyway if a powerset is being built.
    pool = list(items)
    if not pool or len(pool) < min_items:
        return []
    upper = len(pool) + (1 if include_full_pop else 0)
    subsets: list[tuple[T, ...]] = []
    for k in range(min_items, upper):
        subsets.extend(itertools.combinations(pool, k))
    return subsets

uniqify

uniqify(seq)

Return only unique items in a sequence, preserving order.

Parameters:

Name Type Description Default
seq list[T]

List of items to uniqify

required

Returns:

Type Description
list[T]

list[object]: Original list with duplicates removed

Source code in ubiquerg/collection.py
63
64
65
66
67
68
69
70
71
72
73
74
75
def uniqify(seq: list[T]) -> list[T]:
    """Return only unique items in a sequence, preserving order.

    Args:
        seq: List of items to uniqify

    Returns:
        list[object]: Original list with duplicates removed
    """
    # dict preserves insertion order (guaranteed since Python 3.7), so
    # fromkeys gives order-preserving de-duplication without the original
    # side-effect-in-comprehension trick (``not seen.add(x)``) that needed
    # a type-ignore to pass static checks.
    return list(dict.fromkeys(seq))

Environment Utilities

environment

Environment-related utilities

TmpEnv

TmpEnv(overwrite=False, **kwargs)

Bases: object

Temporary environment variable setting.

Source code in ubiquerg/environment.py
15
16
17
18
19
20
21
22
23
24
25
26
27
def __init__(self, overwrite: bool = False, **kwargs: str) -> None:
    """Record the requested environment variables, refusing conflicts.

    Args:
        overwrite: whether variables already set to a different value
            may be overwritten
        **kwargs: mapping of environment variable names to desired values

    Raises:
        ValueError: if overwrite is disallowed and any variable is
            already set to a different value
    """
    if not overwrite:
        conflicting = [
            name for name, want in kwargs.items() if os.getenv(name, want) != want
        ]
        if conflicting:
            raise ValueError(
                "{} variable(s) already set: {}".format(
                    len(conflicting), ", ".join(conflicting)
                )
            )
    self._kvs = kwargs
    # Remember pre-existing values so they can be restored later.
    self._originals = {k: os.environ[k] for k in kwargs if k in os.environ}

File Operations

files

Functions facilitating file operations

checksum

checksum(path, blocksize=int(2000000000.0))

Generate a md5 checksum for the file contents in the provided path.

Parameters:

Name Type Description Default
path str

path to file for which to generate checksum

required
blocksize int

number of bytes to read per iteration, default: 2GB

int(2000000000.0)

Returns:

Name Type Description
str str

checksum hash

Source code in ubiquerg/files.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def checksum(path: str, blocksize: int = int(2e9)) -> str:
    """Generate a md5 checksum for the file contents in the provided path.

    Args:
        path: path to file for which to generate checksum
        blocksize: number of bytes to read per iteration, default: 2GB

    Returns:
        str: checksum hash
    """
    digest = md5(usedforsecurity=False)
    # Stream the file in chunks so arbitrarily large files fit in memory.
    with open(path, "rb") as handle:
        while chunk := handle.read(blocksize):
            digest.update(chunk)
    return digest.hexdigest()

create_file_racefree

create_file_racefree(file)

Create a file, but fail if the file already exists.

This function will thus only succeed if this process actually creates the file; if the file already exists, it will cause an OSError, solving race conditions.

Parameters:

Name Type Description Default
file str

File to create

required

Raises:

Type Description
OSError

if the file to be created already exists

Source code in ubiquerg/files.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def create_file_racefree(file: str) -> str:
    """Create a file, but fail if the file already exists.

    This function will thus only succeed if this process actually creates
    the file; if the file already exists, it will cause an OSError,
    solving race conditions.

    Args:
        file: File to create

    Returns:
        str: the path that was created

    Raises:
        OSError: if the file to be created already exists
    """
    # O_EXCL combined with O_CREAT guarantees failure when the file exists,
    # so exactly one competing process can win the creation race.
    fd = os.open(file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    os.close(fd)
    return file

create_lock

create_lock(filepath, wait_max=10)

Securely create a lock file.

Parameters:

Name Type Description Default
filepath str

path to a file to lock

required
wait_max int

max wait time if the file in question is already locked

10
Source code in ubiquerg/files.py
285
286
287
288
289
290
291
292
293
294
295
def create_lock(filepath: str, wait_max: int = 10) -> None:
    """Securely create a lock file.

    Blocks (via wait_for_lock) until any existing lock on the file is
    released, then creates the lock.

    Args:
        filepath: path to a file to lock
        wait_max: max wait time if the file in question is already locked
    """
    lock_path = make_lock_path(filepath)
    # wait until no lock is present
    wait_for_lock(lock_path, wait_max)
    # NOTE(review): _create_lock is defined elsewhere in this module;
    # presumably it handles the race where another process grabs the lock
    # between the wait above and creation — confirm in ubiquerg/files.py.
    _create_lock(lock_path, filepath, wait_max)

filesize_to_str

filesize_to_str(size)

Convert the numeric bytes to the size string.

Parameters:

Name Type Description Default
size int | float

file size to convert

required

Returns:

Name Type Description
str str | int | float

file size string

Source code in ubiquerg/files.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def filesize_to_str(size: int | float) -> str | int | float:
    """Convert the numeric bytes to the size string.

    Args:
        size: file size in bytes to convert

    Returns:
        str: human-readable file size string for numeric input; the
            original object (after a warning) for non-numeric input
    """
    if not isinstance(size, (int, float)):
        warn("size argument was neither an int nor a float, returning the original object")
        return size
    for unit in FILE_SIZE_UNITS:
        if size < 1024:
            return "{}{}".format(round(size, 1), unit)
        size /= 1024
    # Size exceeds the largest known unit: express it in that unit.
    # (The old code fell through here, emitted the misleading "neither an
    # int nor a float" warning, and returned the silently-divided number.)
    return "{}{}".format(round(size * 1024, 1), FILE_SIZE_UNITS[-1])

make_lock_path

make_lock_path(lock_name_base)

Create lock file path(s) from the given base name(s).

Parameters:

Name Type Description Default
lock_name_base str | list[str]

Lock file names

required

Returns:

Type Description
str | list[str]

str | list[str]: Path to the lock files

Source code in ubiquerg/files.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def make_lock_path(lock_name_base: str | list[str]) -> str | list[str]:
    """Create lock-file path(s) from the given base name(s).

    Args:
        lock_name_base: Lock file name, or a list of them

    Returns:
        str | list[str]: Path to the lock files
    """

    def _lockify(candidate):
        # Prefix only the basename, preserving any directory component.
        directory, filename = os.path.split(candidate)
        if not filename.startswith(LOCK_PREFIX):
            filename = LOCK_PREFIX + filename
        return os.path.join(directory, filename) if directory else filename

    if isinstance(lock_name_base, list):
        return [_lockify(item) for item in lock_name_base]
    return _lockify(lock_name_base)

remove_lock

remove_lock(filepath)

Remove lock.

Parameters:

Name Type Description Default
filepath str

path to the file to remove the lock for. Not the path to the lock!

required

Returns:

Name Type Description
bool bool

whether the lock was found and removed

Source code in ubiquerg/files.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def remove_lock(filepath: str) -> bool:
    """Remove lock.

    Args:
        filepath: path to the file to remove the lock for. Not the path to the lock!

    Returns:
        bool: whether the lock was found and removed
    """
    # EAFP: attempt removal and report whether a lock actually existed.
    try:
        os.remove(make_lock_path(filepath))
    except FileNotFoundError:
        return False
    return True

size

size(path, size_str=True)

Get the size of a file or directory or list of them in the provided path.

Parameters:

Name Type Description Default
path str | list[str]

path or list of paths to the file or directories to check size of

required
size_str bool

whether the size should be converted to a human-readable string, e.g. convert B to MB

True

Returns:

Type Description
int | str | None

int | str: file size or file size string

Source code in ubiquerg/files.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def size(path: str | list[str], size_str: bool = True) -> int | str | None:
    """Get the size of a file or directory or list of them in the provided path.

    Args:
        path: path or list of paths to the file or directories to check size of
        size_str: whether the size should be converted to a human-readable string, e.g. convert B to MB

    Returns:
        int | str: file size or file size string
    """

    if isinstance(path, list):
        s_list = sum(filter(None, [size(x, size_str=False) for x in path]))
        return filesize_to_str(s_list) if size_str else s_list

    if os.path.isfile(path):
        s = os.path.getsize(path)
    elif os.path.isdir(path):
        s = 0
        symlinks = []
        for dirpath, dirnames, filenames in os.walk(path):
            for f in filenames:
                fp = os.path.join(dirpath, f)
                if not os.path.islink(fp):
                    s += os.path.getsize(fp)
                else:
                    s += os.lstat(fp).st_size
                    symlinks.append(fp)
        if len(symlinks) > 0:
            _LOGGER.info("{} symlinks were found: {}".format(len(symlinks), "\n".join(symlinks)))
    else:
        warn("size could not be determined for: {}".format(path))
        s = None
    return filesize_to_str(s) if size_str and s is not None else s

untar

untar(src, dst, **kwargs)

Unpack a path to a target folder.

All the required directories will be created. Additional keyword arguments are passed through to tarfile.extractall().

Tarfile filter background (PEP 706):

Python 3.12 added a filter parameter to extractall() with three options: "fully_trusted" (no restrictions), "tar" (some restrictions), and "data" (strict: rejects absolute paths, symlinks to absolute targets, etc.). In 3.12-3.13, the default is "fully_trusted" but a DeprecationWarning is emitted if no filter is specified. In 3.14, the default changed to "data".

This matters for refgenie because refgenie server archives contain absolute symlinks (child assets like bwa_index symlink to parent assets like fasta using the build server's absolute path). These symlinks are always broken on the client anyway (the client rewrites them), but the "data" filter crashes with AbsoluteLinkError before extraction even finishes.

Callers extracting refgenie archives should pass filter="fully_trusted" to allow these absolute symlinks through. Once refgenie's archive creation stops including absolute symlinks, callers should switch to filter="data" for security hardening.

Parameters:

Name Type Description Default
src str

path to unpack

required
dst str

path to output folder

required
**kwargs

passed to tarfile.extractall (e.g. filter="fully_trusted")

{}
Source code in ubiquerg/files.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def untar(src: str, dst: str, **kwargs) -> None:
    """Unpack a tar archive into a target folder.

    All required directories are created as needed. Additional keyword
    arguments are forwarded to ``tarfile.extractall()``.

    Tarfile filter background (PEP 706): Python 3.12 added a ``filter``
    parameter to extractall() with three options — "fully_trusted" (no
    restrictions), "tar" (some restrictions), and "data" (strict: rejects
    absolute paths, symlinks to absolute targets, etc.). On 3.12-3.13 the
    default is "fully_trusted" with a DeprecationWarning when no filter is
    specified; on 3.14 the default changed to "data".

    This matters for refgenie because refgenie server archives contain
    absolute symlinks (child assets like bwa_index symlink to parent
    assets like fasta using the build server's absolute path). These
    symlinks are always broken on the client anyway (the client rewrites
    them), but the "data" filter crashes with AbsoluteLinkError before
    extraction even finishes. Callers extracting refgenie archives should
    therefore pass filter="fully_trusted"; once refgenie's archive
    creation stops including absolute symlinks, switch to filter="data"
    for security hardening.

    Args:
        src: path to unpack
        dst: path to output folder
        **kwargs: passed to tarfile.extractall (e.g. filter="fully_trusted")
    """
    with topen(src) as archive:
        archive.extractall(path=dst, **kwargs)

wait_for_lock

wait_for_lock(lock_file, wait_max=30)

Just sleep until the lock_file does not exist.

Parameters:

Name Type Description Default
lock_file str

Lock file to wait upon

required
wait_max int

max wait time if the file in question is already locked

30
Source code in ubiquerg/files.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def wait_for_lock(lock_file: str, wait_max: int = 30) -> None:
    """Just sleep until the lock_file does not exist.

    Backs off with growing sleep intervals (capped at 10s). If the lock
    file's modification time advances while waiting, the wait budget is
    reset, on the grounds that the lock holder is still making progress.

    Args:
        lock_file: Lock file to wait upon
        wait_max: max wait time if the file in question is already locked

    Raises:
        RuntimeError: if wait_max elapses and the lock file still exists
            without having been refreshed
    """
    sleeptime = 0.001
    first_message_flag = False
    dot_count = 0
    totaltime = 0
    ori_timestamp = None
    if os.path.isfile(lock_file):
        ori_timestamp = _get_file_mod_time(lock_file)
    while os.path.isfile(lock_file):
        if first_message_flag is False:
            _LOGGER.info(f"Waiting for file lock: {os.path.basename(lock_file)}")
            first_message_flag = True
        else:
            sys.stderr.write(".")
            dot_count += 1
            if dot_count % 60 == 0:
                # Wrap the progress-dot line every 60 dots. (Previously
                # this wrote an empty string, which was a no-op.)
                sys.stderr.write("\n")
        sys.stderr.flush()
        time.sleep(sleeptime)
        totaltime += sleeptime
        sleeptime = min((sleeptime + 0.1) * 1.25, 10)
        if totaltime >= wait_max:
            if os.path.isfile(lock_file):
                timestamp = _get_file_mod_time(lock_file)
                if ori_timestamp and timestamp > ori_timestamp:
                    # Lock was refreshed by its holder: restart the budget.
                    ori_timestamp = timestamp
                    totaltime = 0
                    sleeptime = 0.001
                    continue
                raise RuntimeError(
                    "The maximum wait time ({}) has been reached and the lock "
                    "file still exists.".format(wait_max)
                )
    if first_message_flag:
        _LOGGER.info(f" File unlocked: {os.path.basename(lock_file)}")

File Locking

file_locking

OneLocker

OneLocker(filepath, wait_max=10, strict_ro_locks=False)

A simple mutual-exclusion file locker.

Uses a single lock file for exclusive access. Unlike ThreeLocker, this does not distinguish between read and write locks — any lock is exclusive. Simpler and sufficient when concurrent readers are not needed.

Source code in ubiquerg/file_locking.py
390
391
392
393
394
def __init__(self, filepath: str, wait_max: int = 10, strict_ro_locks: bool = False):
    """Set up a locker for the given file.

    Args:
        filepath: path to the file to be locked
        wait_max: max time to wait for an existing lock to clear
        strict_ro_locks: presumably enforces strict locking even for
            read-only files — confirm against set_file_path and the lock
            methods in ubiquerg/file_locking.py
    """
    self.wait_max = wait_max
    self.strict_ro_locks = strict_ro_locks
    self.set_file_path(filepath)
    # Neither lock kind is held initially.
    self.locked = {READ: False, WRITE: False}

ThreeLocker

ThreeLocker(filepath, wait_max=10, strict_ro_locks=False)

Bases: object

A class to lock files for reading and writing.

It uses a three-lock system, with separate read-lock, write-lock, and universal-lock (or lock-lock). The universal lock is used to lock the locks, to prevent race conditions between read and write locks. It allows multiple simultaneous readers, as long as there is no writer. It creates lock files in the same directory as the file to be locked.

Warning

These locks are NOT re-entrant. If a process already holds a lock on a file and tries to acquire the same lock again, it will deadlock (wait forever for itself to release the lock). Do not nest lock contexts on the same file.

Source code in ubiquerg/file_locking.py
45
46
47
48
49
def __init__(self, filepath: str, wait_max: int = 10, strict_ro_locks: bool = False):
    """Initialize a three-lock (read/write/universal) locker for a file.

    Args:
        filepath: path to the file to be locked
        wait_max: max seconds to wait for an existing lock before erroring
        strict_ro_locks: strict read-only lock behavior
            # NOTE(review): semantics enforced elsewhere in the class — confirm
    """
    self.wait_max = wait_max
    self.strict_ro_locks = strict_ro_locks
    # Derives and stores the read/write/universal lock-file paths.
    self.set_file_path(filepath)
    # Tracks which lock types this instance currently holds.
    self.locked = {READ: False, WRITE: False}

create_read_lock

create_read_lock(filepath=None, wait_max=None)

Securely create a read lock file.

Parameters:

Name Type Description Default
filepath str

path to a file to lock

None
wait_max int

max wait time if the file in question is already locked

None
Source code in ubiquerg/file_locking.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def create_read_lock(self, filepath: str | None = None, wait_max: int | None = None) -> None:
    """Securely create a read lock file.

    Acquires the universal (lock-lock) first to serialize lock-state
    changes, waits for any writer to finish, then places the read lock.

    Args:
        filepath: path to a file to lock; defaults to this locker's filepath
        wait_max: max wait time if the file in question is already locked;
            defaults to this locker's wait_max
    """
    filepath = self.filepath if filepath is None else filepath
    wait_max = self.wait_max if wait_max is None else wait_max
    # The universal lock serializes all lock manipulation, preventing races
    # between concurrent readers and writers creating their lock files.
    wait_for_lock(self.lock_paths[UNIVERSAL], wait_max)
    _create_lock(self.lock_paths[UNIVERSAL], filepath, wait_max)
    # Readers may coexist with other readers, but not with a writer.
    wait_for_lock(self.lock_paths[WRITE], wait_max)
    _create_lock(self.lock_paths[READ], filepath, wait_max)
    # Release the universal lock so others can manipulate locks again.
    _remove_lock(self.lock_paths[UNIVERSAL])

create_write_lock

create_write_lock(filepath=None, wait_max=None)

Securely create a write lock file.

Parameters:

Name Type Description Default
filepath str

path to a file to lock

None
wait_max int

max wait time if the file in question is already locked

None
Source code in ubiquerg/file_locking.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def create_write_lock(self, filepath: str | None = None, wait_max: int | None = None) -> None:
    """Securely create a write lock file.

    Acquires the universal (lock-lock) first, waits for all current
    readers and any writer to finish, then places both a read and a
    write lock (a writer excludes everyone, readers included).

    Args:
        filepath: path to a file to lock; defaults to this locker's filepath
        wait_max: max wait time if the file in question is already locked;
            defaults to this locker's wait_max
    """
    filepath = self.filepath if filepath is None else filepath
    wait_max = self.wait_max if wait_max is None else wait_max
    # The universal lock serializes all lock manipulation, preventing races
    # between concurrent readers and writers creating their lock files.
    wait_for_lock(self.lock_paths[UNIVERSAL], wait_max)
    _create_lock(self.lock_paths[UNIVERSAL], filepath, wait_max)
    # Glob for reader lock files only once the universal lock is held, so
    # no new reader can appear between the listing and the wait below.
    read_lock_paths = glob.glob(
        self.lock_paths[READ_GLOB]
    )  # must occur after universal lock is set
    all_lock_paths = read_lock_paths + [self.lock_paths[WRITE]]
    wait_for_locks(all_lock_paths, wait_max)
    # A writer holds both lock types to exclude readers and other writers.
    _create_lock(self.lock_paths[READ], filepath, wait_max)
    _create_lock(self.lock_paths[WRITE], filepath, wait_max)
    _remove_lock(self.lock_paths[UNIVERSAL])

ensure_locked

ensure_locked(lock_type=WRITE)

Decorator to apply to functions to make sure they only happen when locked.

Source code in ubiquerg/file_locking.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def ensure_locked(lock_type: str = WRITE):  # decorator factory
    """Decorator to apply to functions to make sure they only happen when locked."""

    def _decorate(func):
        @functools.wraps(func)
        def _guarded(self, *args, **kwargs):
            locker = self.locker
            # No locker at all means the object cannot be locked.
            if not locker:
                raise OSError("File not lockable. File locker not provided.")
            # The required lock type must already be held before calling.
            if not locker.locked[lock_type]:
                raise OSError(
                    f"This function must use a context manager to {lock_type}-lock the file"
                )
            return func(self, *args, **kwargs)

        return _guarded

    return _decorate

locked_read_file

locked_read_file(filepath, create_file=False)

Read a file contents into memory after locking the file.

This will prevent other ThreeLocker-protected processes from writing to the file while it is being read.

Parameters:

Name Type Description Default
filepath

path to the file that should be read

required
create_file bool

whether to create the file if it doesn't exist

False

Returns:

Name Type Description
str str

file contents

Source code in ubiquerg/file_locking.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
def locked_read_file(filepath, create_file: bool = False) -> str:
    """Read a file contents into memory after locking the file.

    This will prevent other ThreeLocker-protected processes from writing to the
    file while it is being read.

    Args:
        filepath: path to the file that should be read
        create_file: whether to create the file if it doesn't exist

    Returns:
        str: file contents

    Raises:
        FileNotFoundError: if the file is missing and create_file is False
    """
    # Missing file: either create an empty one (racefree) or fail loudly.
    if not os.path.exists(filepath):
        if not create_file:
            raise FileNotFoundError(f"No such file: {filepath}")
        _LOGGER.info("File does not exist, but create_file is true. Creating...")
        create_file_racefree(filepath)
        return ""
    # Hold a read lock for the duration of the read.
    with read_lock(filepath), open(filepath, "r") as fh:
        return fh.read()

make_all_lock_paths

make_all_lock_paths(filepath)

Create a collection of paths to lock files with given name as base.

Source code in ubiquerg/file_locking.py
368
369
370
371
372
373
374
375
376
377
378
def make_all_lock_paths(filepath: str) -> dict[str, str]:
    """Create a collection of paths to lock files with given name as base.

    Args:
        filepath: path to the file whose lock-file paths should be derived

    Returns:
        dict[str, str]: mapping from lock type (READ, WRITE, UNIVERSAL,
            READ_GLOB) to the corresponding lock-file path
    """
    # Split once; the directory/name pair is invariant across lock types.
    base, name = os.path.split(filepath)
    lock_paths = {}
    # Renamed loop variable: original shadowed the builtin `type`.
    for lock_type in (READ, WRITE, UNIVERSAL, READ_GLOB):
        # A falsy lock type gets the bare prefix with no type infix.
        prefix = f"{LOCK_PREFIX}-{lock_type}-" if lock_type else LOCK_PREFIX
        # Avoid double-prefixing a name that is already a lock-file name.
        lock_name = name if name.startswith(prefix) else prefix + name
        lock_paths[lock_type] = os.path.join(base, lock_name) if base else lock_name
    return lock_paths

read_lock

read_lock(obj)

Read-lock a filepath or object with locker attribute.

Parameters:

Name Type Description Default
obj object

filepath string or object with locker attribute

required

Yields:

Name Type Description
object object

the locked object

Warning

Locks are NOT re-entrant. Do not nest lock contexts on the same file, or the process will deadlock waiting for itself::

# WRONG - will deadlock:
with read_lock(cfg):
    with read_lock(cfg):  # Deadlock!
        ...

# RIGHT - lock once at the top level:
with read_lock(cfg):
    do_work(cfg)  # Pass already-locked object
Source code in ubiquerg/file_locking.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
@contextmanager
def read_lock(obj: object) -> object:
    """Read-lock a filepath or object with locker attribute.

    Args:
        obj: filepath string or object with locker attribute

    Yields:
        object: the locked object

    Raises:
        AttributeError: if obj is neither a string nor carries a locker

    Warning:
        Locks are NOT re-entrant. Do not nest lock contexts on the same file,
        or the process will deadlock waiting for itself::

            # WRONG - will deadlock:
            with read_lock(cfg):
                with read_lock(cfg):  # Deadlock!
                    ...

            # RIGHT - lock once at the top level:
            with read_lock(cfg):
                do_work(cfg)  # Pass already-locked object
    """
    # A bare filepath gets its own throwaway locker; anything else must
    # already carry one via a `locker` attribute.
    if isinstance(obj, str):
        locker = ThreeLocker(obj)
    elif hasattr(obj, "locker"):
        locker = obj.locker
    else:
        raise AttributeError(f"Cannot lock: {obj}.")

    # handle a premature Ctrl+C exit from this context manager
    old_sigterm = None
    old_sigint = None
    try:
        old_sigterm = getsignal(SIGTERM)
        old_sigint = getsignal(SIGINT)
        signal(SIGTERM, locker._interrupt_handler)
        signal(SIGINT, locker._interrupt_handler)
        # If this is run in a thread, the signal module is not available and raises an exception.
        # ValueError: signal only works in main thread of the main interpreter
        # That's fine; in this case, we don't need to handle signals anyway.
    except ValueError as e:
        _LOGGER.error(f"Failed to set interrupt handler: {e}")

    # NOTE(review): if read_lock() itself raises, the saved signal handlers
    # are not restored (the try/finally starts after it) — confirm intended.
    locker.read_lock()

    try:
        yield obj
    finally:
        # Always release the lock; restore handlers only if we saved them.
        locker.read_unlock()
        if old_sigterm is not None:
            try:
                signal(SIGTERM, old_sigterm)
                signal(SIGINT, old_sigint)
            except ValueError:
                pass

wait_for_locks

wait_for_locks(lock_paths, wait_max=10)

Wait for lock files to be removed.

Parameters:

Name Type Description Default
lock_paths list | str

path to a file to lock

required
wait_max int

max wait time if the file in question is already locked

10
Source code in ubiquerg/file_locking.py
327
328
329
330
331
332
333
334
335
336
337
def wait_for_locks(lock_paths: list | str, wait_max: int = 10):
    """Wait for lock files to be removed.

    Args:
        lock_paths: one lock-file path, or a list of them
        wait_max: max wait time if the file in question is already locked
    """
    # Normalize the single-path case to a list, then wait on each in turn.
    paths = lock_paths if isinstance(lock_paths, list) else [lock_paths]
    for path in paths:
        wait_for_lock(path, wait_max)

write_lock

write_lock(obj)

Write-lock file path or object with locker attribute.

Parameters:

Name Type Description Default
obj object

filepath string or object with locker attribute

required

Yields:

Name Type Description
object object

the locked object

Warning

Locks are NOT re-entrant. Do not nest lock contexts on the same file, or the process will deadlock waiting for itself::

# WRONG - will deadlock:
with write_lock(cfg):
    with write_lock(cfg):  # Deadlock!
        cfg.write()

# RIGHT - lock once at the top level:
with write_lock(cfg):
    do_work_and_write(cfg)  # Don't re-lock inside
Source code in ubiquerg/file_locking.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
@contextmanager
def write_lock(obj: object) -> object:
    """Write-lock file path or object with locker attribute.

    Args:
        obj: filepath string or object with locker attribute

    Yields:
        object: the locked object

    Raises:
        AttributeError: if obj is neither a string nor carries a locker

    Warning:
        Locks are NOT re-entrant. Do not nest lock contexts on the same file,
        or the process will deadlock waiting for itself::

            # WRONG - will deadlock:
            with write_lock(cfg):
                with write_lock(cfg):  # Deadlock!
                    cfg.write()

            # RIGHT - lock once at the top level:
            with write_lock(cfg):
                do_work_and_write(cfg)  # Don't re-lock inside
    """
    # A bare filepath gets its own throwaway locker; anything else must
    # already carry one via a `locker` attribute.
    if isinstance(obj, str):
        locker = ThreeLocker(obj)
    elif hasattr(obj, "locker"):
        locker = obj.locker
    else:
        raise AttributeError(f"Cannot lock: {obj}.")

    # handle a premature Ctrl+C exit from this context manager
    old_sigterm = None
    old_sigint = None
    try:
        old_sigterm = getsignal(SIGTERM)
        old_sigint = getsignal(SIGINT)
        signal(SIGTERM, locker._interrupt_handler)
        signal(SIGINT, locker._interrupt_handler)
        # In a non-main thread, signal() raises ValueError; that is fine —
        # no handler is needed in that case.
    except ValueError as e:
        _LOGGER.error(f"Failed to set interrupt handler: {e}")

    # NOTE(review): if write_lock() itself raises, the saved signal handlers
    # are not restored (the try/finally starts after it) — confirm intended.
    locker.write_lock()
    try:
        yield obj
    finally:
        # Always release the lock; restore handlers only if we saved them.
        locker.write_unlock()
        if old_sigterm is not None:
            try:
                signal(SIGTERM, old_sigterm)
                signal(SIGINT, old_sigint)
            except ValueError:
                pass

Path Utilities

paths

Filesystem utility functions

expandpath

expandpath(path)

Expand a filesystem path that may or may not contain user/env vars.

Parameters:

Name Type Description Default
path str

path to expand

required

Returns:

Name Type Description
str str

expanded version of input path

Source code in ubiquerg/paths.py
13
14
15
16
17
18
19
20
21
22
def expandpath(path: str) -> str:
    """Expand a filesystem path that may or may not contain user/env vars.

    Args:
        path: path to expand

    Returns:
        str: expanded version of input path
    """
    # First resolve "~"/"~user", then substitute any environment variables.
    home_expanded = os.path.expanduser(path)
    return os.path.expandvars(home_expanded)

mkabs

mkabs(path, reldir=None)

Make sure a path is absolute.

If not already absolute, it's made absolute relative to a given directory (or file). Also expands ~ and environment variables for kicks.

Parameters:

Name Type Description Default
path str | None

Path to make absolute

required
reldir str | None

Relative directory to make path absolute from if it's not already absolute

None

Returns:

Name Type Description
str str | None

Absolute path

Source code in ubiquerg/paths.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def mkabs(path: str | None, reldir: str | None = None) -> str | None:
    """Make sure a path is absolute.

    If not already absolute, it's made absolute relative to a given directory (or file).
    Also expands ~ and environment variables for kicks.

    Args:
        path: Path to make absolute
        reldir: Relative directory to make path absolute from if it's not already absolute

    Returns:
        str: Absolute path
    """

    def xpand(path):
        return os.path.expandvars(os.path.expanduser(path))

    if path is None:
        return path

    if is_url(path):
        return path

    if os.path.isabs(xpand(path)):
        return xpand(path)

    if not reldir:
        return os.path.abspath(xpand(path))

    if os.path.isdir(reldir):
        return os.path.join(xpand(reldir), xpand(path))
    else:
        return os.path.join(xpand(os.path.dirname(reldir)), xpand(path))

parse_registry_path

parse_registry_path(rpstring, defaults=None)

Parse a 'registry path' string into components.

A registry path is a string that is kind of like a URL, providing a unique identifier for a particular asset, like protocol::namespace/item.subitem:tag. You can use the defaults argument to change the names of the entries in the return dict, and to provide defaults in case of missing values.

Parameters:

Name Type Description Default
rpstring str

string to parse

required
defaults list[tuple[str, Any]] | None

A list of 5 tuples with name of the 5 entries, and a default value in case it is missing (can be 'None')

None

Returns:

Type Description
dict | None

dict | None: dict with one element for each parsed entry in the path

Source code in ubiquerg/paths.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def parse_registry_path(
    rpstring: str,
    defaults: list[tuple[str, Any]] | None = None,
) -> dict | None:
    """Parse a 'registry path' string into components.

    A registry path is a string that is kind of like a URL, providing a unique
    identifier for a particular asset, like
    protocol::namespace/item.subitem:tag. You can use the `defaults` argument to
    change the names of the entries in the return dict, and to provide defaults
    in case of missing values.

    Args:
        rpstring: string to parse
        defaults: A list of 5 tuples with name of the 5 entries, and a default value in case it is missing (can be 'None')

    Returns:
        dict | None: dict with one element for each parsed entry in the path
    """

    if defaults is None:
        defaults = [
            ("protocol", None),
            ("namespace", None),
            ("item", None),
            ("subitem", None),
            ("tag", None),
        ]

    # This commented regex is the same without protocol
    # ^(?:([0-9a-zA-Z_-]+)\/)?([0-9a-zA-Z_-]+)(?::([0-9a-zA-Z_.-]+))?$
    # regex = "^(?:([0-9a-zA-Z_-]+)(?:::|:\/\/))?(?:([0-9a-zA-Z_-]+)\/)?([0-9a-zA-Z_-]+)(?::([0-9a-zA-Z_.-]+))?$"
    regex = r"^(?:([0-9a-zA-Z._-]+)(?:::|:\/\/))?(?:([0-9a-zA-Z_-]+)\/)?([0-9a-zA-Z_-]+)(?:\.([0-9a-zA-Z_-]+))?(?::([0-9a-zA-Z_.,|+()-]+))?$"
    # This regex matches strings like:
    # protocol://namespace/item:tag
    # or: protocol::namespace/item:tag
    # The names 'protocol', 'namespace', 'item', and 'tag' are generic and
    # you can use this function for whatever you like in this format... The
    # regex can handle any of these missing and will parse correctly into the
    # same element
    # For instance, you can leave the tag or protocol or both off:
    # ucsc://hg38/bowtie2_index
    # hg38/bowtie2_index
    # With no delimiters, it will match the item name:
    # bowtie2_index

    res = re.match(regex, rpstring)
    if not res:
        return None
    # position 0: parent namespace
    # position 1: namespace
    # position 2: primary name
    # position 3: tag
    captures = res.groups()
    parsed_identifier = {
        defaults[0][0]: captures[0] or defaults[0][1],
        defaults[1][0]: captures[1] or defaults[1][1],
        defaults[2][0]: captures[2] or defaults[2][1],
        defaults[3][0]: captures[3] or defaults[3][1],
        defaults[4][0]: captures[4] or defaults[4][1],
    }
    return parsed_identifier

parse_registry_path_strict

parse_registry_path_strict(input_string, require_protocol=False, require_namespace=False, require_item=True, require_subitem=False, require_tag=False)

Parse and validate a registry path with required component checks.

This function parses a registry path and returns the parsed dictionary only if all required components are present. Returns None otherwise. Can be used as a boolean check (truthy/falsy) or to get the parsed components.

Parameters:

Name Type Description Default
input_string str

String to parse and validate as a registry path

required
require_protocol bool

If True, protocol component must be present

False
require_namespace bool

If True, namespace component must be present

False
require_item bool

If True, item component must be present (default: True)

True
require_subitem bool

If True, subitem component must be present

False
require_tag bool

If True, tag component must be present

False

Returns:

Type Description
dict[str, Any] | None

dict | None: Parsed registry path dict if valid and all required components present, else None

Example

>>> result = parse_registry_path_strict("namespace/item:tag")
>>> result['namespace']
'namespace'
>>> parse_registry_path_strict("item", require_namespace=True)
None

Can be used as a boolean check:

>>> if parse_registry_path_strict("namespace/item", require_namespace=True):
...     print("Valid!")
Valid!

Get specific components:

>>> result = parse_registry_path_strict("protocol::namespace/item.subitem:tag", require_protocol=True)
>>> result['protocol']
'protocol'

Source code in ubiquerg/paths.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def parse_registry_path_strict(
    input_string: str,
    require_protocol: bool = False,
    require_namespace: bool = False,
    require_item: bool = True,
    require_subitem: bool = False,
    require_tag: bool = False,
) -> dict[str, Any] | None:
    """Parse and validate a registry path with required component checks.

    Parses a registry path and returns the parsed dictionary only if every
    required component is present; returns None otherwise. Usable either as
    a truthy/falsy validity check or to retrieve the parsed components.

    Args:
        input_string: String to parse and validate as a registry path
        require_protocol: If True, protocol component must be present
        require_namespace: If True, namespace component must be present
        require_item: If True, item component must be present (default: True)
        require_subitem: If True, subitem component must be present
        require_tag: If True, tag component must be present

    Returns:
        dict | None: Parsed registry path dict if valid and all required components present, else None

    Example:
        >>> result = parse_registry_path_strict("namespace/item:tag")
        >>> result['namespace']
        'namespace'
        >>> parse_registry_path_strict("item", require_namespace=True)
        None
        >>> # Can be used as a boolean check
        >>> if parse_registry_path_strict("namespace/item", require_namespace=True):
        ...     print("Valid!")
        Valid!
        >>> # Get specific components
        >>> result = parse_registry_path_strict("protocol::namespace/item.subitem:tag", require_protocol=True)
        >>> result['protocol']
        'protocol'
    """
    parsed = parse_registry_path(input_string)
    if parsed is None:
        return None

    # Pair each component name with whether the caller requires it.
    required_flags = (
        ("protocol", require_protocol),
        ("namespace", require_namespace),
        ("item", require_item),
        ("subitem", require_subitem),
        ("tag", require_tag),
    )
    # Valid only when every required component parsed to a truthy value.
    if all(parsed.get(name) for name, needed in required_flags if needed):
        return parsed
    return None

System Utilities

system

System utility functions

is_command_callable

is_command_callable(cmd)

Check if command can be called.

Parameters:

Name Type Description Default
cmd str

actual command to check for callability

required

Returns:

Name Type Description
bool bool

whether given command's call succeeded

Raises:

Type Description
TypeError

if the alleged command isn't a string

ValueError

if the alleged command is empty

Source code in ubiquerg/system.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def is_command_callable(cmd: str) -> bool:
    """Check if command can be called.

    Args:
        cmd: actual command to check for callability

    Returns:
        bool: whether given command's call succeeded

    Raises:
        TypeError: if the alleged command isn't a string
        ValueError: if the alleged command is empty
    """
    if not isinstance(cmd, str):
        raise TypeError("Alleged command isn't a string: {} ({})".format(cmd, type(cmd)))
    if not cmd:
        raise ValueError("Empty command to check for callability")
    # A directory, or an existing file without execute permission, is not callable.
    non_exec_file = os.path.isfile(cmd) and not os.access(cmd, os.X_OK)
    if os.path.isdir(cmd) or non_exec_file:
        return False
    # POSIX: a PATH search via shutil.which decides callability.
    if os.name != "nt":
        return shutil.which(cmd) is not None
    # Windows: probe with `where`, which searches PATH for the command.
    try:
        subprocess.run(
            ["where", cmd],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except subprocess.CalledProcessError:
        return False
    return True

is_writable

is_writable(folder, check_exist=False, create=False)

Make sure a folder is writable.

Given a folder, check that it exists and is writable. Errors if requested on a non-existent folder. Otherwise, make sure the first existing parent folder is writable such that this folder could be created.

Parameters:

Name Type Description Default
folder str | None

Folder to check for writeability

required
check_exist bool

Throw an error if it doesn't exist?

False
create bool

Create the folder if it doesn't exist?

False
Source code in ubiquerg/system.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def is_writable(folder: str | None, check_exist: bool = False, create: bool = False) -> bool:
    """Make sure a folder is writable.

    Given a folder, check that it exists and is writable. Errors if requested on
    a non-existent folder. Otherwise, make sure the first existing parent folder
    is writable such that this folder could be created.

    Args:
        folder: Folder to check for writeability
        check_exist: Throw an error if it doesn't exist?
        create: Create the folder if it doesn't exist?
    """
    folder = folder or "."

    if os.path.exists(folder):
        return os.access(folder, os.W_OK) and os.access(folder, os.X_OK)
    elif create:
        os.makedirs(folder, exist_ok=True)
        return True
    elif check_exist:
        raise OSError("Folder not found: {}".format(folder))
    else:
        parent = os.path.dirname(folder)
        if not parent or parent == folder:
            return False
        return is_writable(parent, check_exist)

Web Utilities

web

Web-related utilities

has_scheme

has_scheme(maybe_url)

Check whether a string starts with a URI scheme (e.g. s3://, gs://, file://).

Parameters:

Name Type Description Default
maybe_url str

string to check

required

Returns:

Name Type Description
bool bool

whether string starts with a URI scheme

Source code in ubiquerg/web.py
22
23
24
25
26
27
28
29
30
31
def has_scheme(maybe_url: str) -> bool:
    """Check whether a string starts with a URI scheme (e.g. s3://, gs://, file://).

    Args:
        maybe_url: string to check

    Returns:
        bool: whether string starts with a URI scheme
    """
    # Coerce to str so non-string inputs (e.g. Path) are handled uniformly.
    candidate = str(maybe_url)
    return bool(_SCHEME_REGEX.match(candidate))

is_url

is_url(maybe_url)

Determine whether a path is a URL.

Parameters:

Name Type Description Default
maybe_url str

path to investigate as URL

required

Returns:

Name Type Description
bool bool

whether path appears to be a URL

Source code in ubiquerg/web.py
34
35
36
37
38
39
40
41
42
43
def is_url(maybe_url: str) -> bool:
    """Determine whether a path is a URL.

    Args:
        maybe_url: path to investigate as URL

    Returns:
        bool: whether path appears to be a URL
    """
    # Coerce to str so non-string inputs (e.g. Path) are handled uniformly.
    candidate = str(maybe_url)
    return bool(_URL_REGEX.match(candidate))