Source code for elephant.parallel.parallel
import concurrent.futures
from functools import update_wrapper, partial
class SingleProcess(object):
"""
A fall-back parallel context that executes jobs sequentially.
"""
def __repr__(self):
return "{name}({extra})".format(name=self.__class__.__name__,
extra=self._extra_repr())
def _extra_repr(self):
return ""
@staticmethod
def _update_handler(handler, **kwargs):
handler_wrapper = partial(handler, **kwargs)
update_wrapper(handler_wrapper, handler)
return handler_wrapper
def execute(self, handler, args_iterate, **kwargs):
"""
Executes the queue of
`[handler(arg, **kwargs) for arg in args_iterate]` in a single process
(no speedup).
Parameters
----------
handler : callable
A function to be executed for each argument in `args_iterate`.
args_iterate : list
A list of (different) values of the first argument of the `handler`
function.
kwargs
Additional key arguments to `handler`.
Returns
-------
results : list
The result of applying the `handler` for each `arg` in the
`args_iterate`. The `i`-th item of the resulting list corresponds
to `args_iterate[i]` (the order is preserved).
"""
handler = self._update_handler(handler, **kwargs)
results = [handler(arg) for arg in args_iterate]
return results
[docs]
class ProcessPoolExecutor(SingleProcess):
"""
The wrapper of Python built-in `concurrent.futures.ProcessPoolExecutor`
class.
`ProcessPoolExecutor` is recommended to use if you have one physical
machine (laptop or PC).
Parameters
----------
max_workers : int or None
The maximum number of processes that can be used to
execute the given calls. If None or not given then as many
worker processes will be created as the machine has processors.
Default: None
"""
[docs]
def __init__(self, max_workers=None):
self.max_workers = max_workers
def _extra_repr(self):
return "max_workers={0}".format(self.max_workers)
def _create_executor(self):
return concurrent.futures.ProcessPoolExecutor(self.max_workers)
def execute(self, handler, args_iterate, **kwargs):
"""
Executes the queue of
`[handler(arg, **kwargs) for arg in args_iterate]` in multiple
processes within one machine (`ProcessPoolExecutor`) or multiple
nodes (`MPIPoolExecutor` and `MPICommExecutor`).
Parameters
----------
handler : callable
A function to be executed for each argument in `args_iterate`.
args_iterate : list
A list of (different) values of the first argument of the `handler`
function.
kwargs
Additional key arguments to `handler`.
Returns
-------
results : list
The result of applying the `handler` for each `arg` in the
`args_iterate`. The `i`-th item of the resulting list corresponds
to `args_iterate[i]` (the order is preserved).
"""
handler = self._update_handler(handler, **kwargs)
# if not initialized, MPICommExecutor crashes if run without
# -m mpi4py.futures mode
results = []
with self._create_executor() as executor:
results = executor.map(handler, args_iterate)
# print(executor, results)
results = list(results) # convert a map to a list
return results