multitask

sofia_redux.toolkit.utilities.multiprocessing.multitask(func, iterable, args, kwargs, jobs=None, skip=None, max_nbytes='1M', force_threading=False, force_processes=False, logger=None)

Process a series of tasks in serial, or in parallel using joblib.

multitask is used to run a function multiple times on a series of arguments. Tasks may be run in series (default), or in parallel using multi-processing via the joblib package.

If an error is encountered while attempting to process in parallel with joblib, an attempt will be made to process the tasks in series.

The function to process multiple times (func) must take one of the following forms:

1. result[i] = func(args, iterable[i])
2. result[i] = func(args, kwargs, iterable[i])

Here, “args” is the same object as the args argument passed to multitask, i.e., the full argument list. Setting kwargs to None implies that func takes form 1; any other value makes multitask assume that func is of form 2.

Since this is a non-standard method of specifying a function, it is highly likely the user will have to define their own func. For example, to use multitask to add ten to a series of numbers:

>>> from sofia_redux.toolkit.utilities.multiprocessing import multitask
>>> numbers = list(range(5))
>>> numbers
[0, 1, 2, 3, 4]
>>> def add_ten(args, i):
...     return args[i] + 10
>>> multitask(add_ten, range(len(numbers)), numbers, None)
[10, 11, 12, 13, 14]

In the above example, iterable has been set to range(len(numbers)) indicating that multitask should supply i to add_ten multiple times (0, 1, 2, 3, 4). Note that kwargs is explicitly set to None indicating that add_ten is of form 1. While multitask may seem like overkill in this example, it can be highly adaptable for complex procedures.
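
For functions of form 2, kwargs is handed to func unchanged as the second argument. The sketch below illustrates only that calling convention. Note that call_like_multitask is a hypothetical serial stand-in written for this example (it is not the library's implementation), and add_offset and the 'offset' key are likewise illustrative names:

```python
def call_like_multitask(func, iterable, args, kwargs):
    """Hypothetical serial stand-in: mimics only the documented call forms."""
    if kwargs is None:
        # Form 1: result[i] = func(args, iterable[i])
        return [func(args, i) for i in iterable]
    # Form 2: result[i] = func(args, kwargs, iterable[i])
    return [func(args, kwargs, i) for i in iterable]


def add_offset(args, kwargs, i):
    # args is the full list of numbers; kwargs carries shared options.
    return args[i] + kwargs['offset']


numbers = list(range(5))
result = call_like_multitask(
    add_offset, range(len(numbers)), numbers, {'offset': 10})
print(result)
```

With sofia_redux installed, the corresponding call would presumably be multitask(add_offset, range(len(numbers)), numbers, {'offset': 10}).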

The skip parameter can be used to skip processing of certain tasks. For example:

>>> skip = [False] * len(numbers)
>>> skip[2] = True
>>> multitask(add_ten, range(len(numbers)), numbers, None, skip=skip)
[10, 11, 13, 14]
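
Because skipped tasks are omitted, the positions in the result no longer line up with the positions in iterable. A minimal sketch of one way to re-align them, using plain Python on the output shown above (the names kept and by_index are illustrative only):

```python
numbers = [0, 1, 2, 3, 4]
skip = [False, False, True, False, False]

# Output of the skip example above, copied in so this block runs standalone.
results = [10, 11, 13, 14]

# Indices of the tasks that were actually processed, in order.
kept = [i for i, skipped in zip(range(len(numbers)), skip) if not skipped]

# Map each processed index back to its result.
by_index = dict(zip(kept, results))

# The result length is len(iterable) minus the number of skipped tasks.
assert len(results) == len(numbers) - sum(skip)
```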

By default, parallelization is managed with the loky backend. If calling code is known to perform better with the threading backend, it should be called within the joblib parallel_backend context manager:

>>> from joblib import parallel_backend
>>> with parallel_backend('threading', n_jobs=2):
...    multitask(add_ten, range(len(numbers)), numbers, None)
[10, 11, 12, 13, 14]

Parameters:
func : function

The function to repeat multiple times on different sets of arguments. Must be of the form func(args, i) if kwargs is None, or func(args, kwargs, i) otherwise, where i is a member of iterable.

iterable : iterable

A Python object that can be iterated through such that each member can be passed to func as the final argument.

args

Anything that should be passed to func as the first argument. The intended use is such that the output of func is result[i] = f(args[iterable[i]]).

kwargs : None or anything

If set to None, multitask assumes func is of the form func(args, i). Otherwise, multitask assumes func is of the form func(args, kwargs, i).

jobs : int, optional

If set to a positive integer, processes tasks in parallel using jobs threads. If negative, sets the number of threads to the number of CPUs + jobs + 1, i.e., if jobs = -1, multitask will use all available CPUs.

skip : array_like of bool, optional

Should be of length len(iterable), where a value of True signifies that processing should be skipped for that task and that the task should be omitted from the final result.

max_nbytes : int or str or None, optional

Threshold on the size of arrays passed to the workers that triggers automated memory mapping in temp_folder. Can be an int in bytes, or a human-readable string, e.g., ‘1M’ for 1 megabyte. Use None to disable memmapping of large arrays. Only active when the backend is “loky” or “multiprocessing”. The default is currently set to ‘1M’ for consistency with the joblib library. Note that memmapping disallows in-place modification of data, so if this functionality is required, set max_nbytes to None.

force_threading : bool, optional

If True, force joblib to run parallel jobs using threads so that shared memory can be used. Otherwise, threading will only occur when parallel processing is spawned from a child process of the main thread.

force_processes : bool, optional

If True, force joblib to run parallel jobs using CPUs rather than threads. This can sometimes lead to unexpected outcomes when the multiprocessing is launched from a non-main thread. In this case, pickling the arguments before processing and the return values during processing is recommended.

logger : logging.Logger, optional

If supplied, will attempt to produce sensibly ordered logs for the multiprocessing tasks for all handlers.

Returns:
result : list

The final output where result[i] = func(args, iterable[i]). Will be of length len(iterable) if skip is None, otherwise len(iterable) - sum(skip).
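
As a rough illustration of how a negative jobs value can be read, the sketch below follows the convention joblib uses for negative n_jobs, so that jobs = -1 yields all CPUs. The helper resolve_jobs and its n_cpus parameter are hypothetical names introduced here; this is not how multitask itself is implemented:

```python
import os


def resolve_jobs(jobs, n_cpus=None):
    """Hypothetical helper: turn a jobs setting into a worker count."""
    if n_cpus is None:
        # os.cpu_count() may return None, so fall back to a single CPU.
        n_cpus = os.cpu_count() or 1
    if jobs is None:
        # The documented default is serial processing.
        return 1
    if jobs < 0:
        # jobs = -1 -> all CPUs, jobs = -2 -> all but one, and so on.
        return max(n_cpus + jobs + 1, 1)
    # Positive values are used as given. jobs = 0 is not described
    # above, so this sketch simply returns it unchanged.
    return jobs


print(resolve_jobs(-1, n_cpus=8))
print(resolve_jobs(-2, n_cpus=8))
```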