{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Parallel\n", "\n", "The executed version of this tutorial is at https://elephant.readthedocs.io/en/latest/tutorials/parallel.html\n", "\n", "The `elephant.parallel` module provides a simple interface for parallelizing multiple calls to any user-specified function. The typical use case is calling a function many times with different parameters.\n", "\n", "\n", "## Available executors\n", "\n", "`elephant.parallel` has three interfaces to choose from, depending on whether the computation runs on a laptop/PC or on a cluster machine with many nodes and MPI installed.\n", "\n", "* `ProcessPoolExecutor` is a wrapper of Python's built-in `concurrent.futures.ProcessPoolExecutor`. It is recommended for laptops and personal computers;\n", "* `MPIPoolExecutor` is a wrapper of `mpi4py.futures.MPIPoolExecutor`. It is recommended for cluster nodes with MPI-2 installed;\n", "* `MPICommExecutor` is a wrapper of `mpi4py.futures.MPICommExecutor`. It is a legacy MPI-1 alternative to `MPIPoolExecutor` and is recommended only for cluster nodes that do not support the MPI-2 protocol.\n", "\n", "Besides these three, a `SingleProcess` executor is available as a fallback option for testing execution in a single process (no speedup).\n", "\n", "All the classes listed above have the same API and can be used interchangeably.\n", "\n", "\n", "## How to use\n", "\n", "Let's say you want to call some function `my_function()` for each element in a list `iterables_list` like so:\n", "\n", "(eq. 1) `results = [my_function(arg) for arg in iterables_list]`.\n", "\n", "If the implementation of `my_function` does not itself use parallelization, you can obtain the results by computing `my_function()` asynchronously for each element of the argument list. Then the result of eq. 1 is equivalent to\n", "\n", "(eq. 
2) `results = Executor().execute(my_function, iterables_list)`,\n", "\n", "where `Executor` can be any of the available executors listed above. For more information about parallel executors in Python, refer to https://docs.python.org/3/library/concurrent.futures.html.\n", "\n", "**Note**. To run this notebook successfully, the `mpi4py` package must be installed in one of the following ways:\n", "* `conda install -c conda-forge mpi4py`\n", "* `pip install mpi4py`, if you have manually installed OpenMPI with `sudo apt install libopenmpi-dev openmpi-bin`\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Examples\n", "\n", "### Example 1. Computing the mean firing rate\n", "\n", "The `mean_firing_rate()` function in `elephant.statistics` takes a single spiketrain as input. Let's parallelize it by computing the firing rates of 8 spiketrains with random spike times." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import quantities as pq\n", "\n", "from elephant.spike_train_generation import homogeneous_poisson_process\n", "from elephant.statistics import mean_firing_rate, time_histogram\n", "\n", "from elephant.parallel import SingleProcess, ProcessPoolExecutor\n", "\n", "try:\n", "    import mpi4py\n", "    from elephant.parallel.mpi import MPIPoolExecutor, MPICommExecutor\n", "except ImportError:\n", "    raise ImportError(\"To run this tutorial, please install mpi4py with 'pip install mpi4py' and restart the Jupyter notebook kernel\")" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "rate = 10 * pq.Hz\n", "spiketrains = [homogeneous_poisson_process(rate, t_stop=10*pq.s) for _ in range(8)]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We start with a sanity check by computing the mean firing rate of the spiketrains with the `SingleProcess` executor, which runs in the main process with no parallelization." 
] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[array(8.9) * 1/s,\n", " array(9.6) * 1/s,\n", " array(9.2) * 1/s,\n", " array(8.7) * 1/s,\n", " array(10.) * 1/s,\n", " array(9.4) * 1/s,\n", " array(7.9) * 1/s,\n", " array(9.4) * 1/s]" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "firing_rate0 = SingleProcess().execute(mean_firing_rate, spiketrains)\n", "firing_rate0" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's verify that all three other executors produce the same result, but now with parallelization turned on." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[array(8.9) * 1/s,\n", " array(9.6) * 1/s,\n", " array(9.2) * 1/s,\n", " array(8.7) * 1/s,\n", " array(10.) * 1/s,\n", " array(9.4) * 1/s,\n", " array(7.9) * 1/s,\n", " array(9.4) * 1/s]" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "firing_rate1 = ProcessPoolExecutor().execute(mean_firing_rate, spiketrains)\n", "firing_rate1" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[array(8.9) * 1/s,\n", " array(9.6) * 1/s,\n", " array(9.2) * 1/s,\n", " array(8.7) * 1/s,\n", " array(10.) * 1/s,\n", " array(9.4) * 1/s,\n", " array(7.9) * 1/s,\n", " array(9.4) * 1/s]" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "firing_rate2 = MPIPoolExecutor().execute(mean_firing_rate, spiketrains)\n", "firing_rate2" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[array(8.9) * 1/s,\n", " array(9.6) * 1/s,\n", " array(9.2) * 1/s,\n", " array(8.7) * 1/s,\n", " array(10.) 
* 1/s,\n", " array(9.4) * 1/s,\n", " array(7.9) * 1/s,\n", " array(9.4) * 1/s]" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "firing_rate3 = MPICommExecutor().execute(mean_firing_rate, spiketrains)\n", "firing_rate3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "All executors produce identical output, as intended.\n", "\n", "#### Note about MPI executors\n", "\n", "If you print the detailed information about either of the MPI executors, you will notice that in this notebook they use only one node (core), which is equivalent to the `SingleProcess` executor." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "MPIPoolExecutor(max_workers=None, {'maxprocs': '1', 'soft': '1', 'host': 'PC0Y2Q1V', 'arch': 'x86_64', 'thread_level': 'MPI_THREAD_MULTIPLE', 'ompi_np': '1'})\n" ] } ], "source": [ "print(MPIPoolExecutor())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is because MPI executors require Python scripts to be launched via `mpiexec` with the `-m mpi4py.futures` flag:\n", "\n", "```\n", "mpiexec -n numprocs python -m mpi4py.futures pyfile [arg] ...\n", "```\n", "\n", "If you run the code without the `mpiexec` command, as in this notebook, MPI features are still available, but only on a single core (`maxprocs=1`). For more information on how to launch MPI processes in Python, refer to https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html#command-line." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Example 2. Custom functions and positional arguments\n", "\n", "Sometimes you might want to iterate over the second (or third, etc.) argument of a function. To do this, you need to create a custom function that passes its first input argument into the right position of the original function. Below is an example of how to compute time histograms of spiketrains with different `bin_size` values (the second argument)." 
] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "# step 1: initialize the first argument - spiketrains\n", "spiketrains = [homogeneous_poisson_process(rate, t_stop=10*pq.s) for _ in range(8)]\n", "\n", "# step 2: define your custom function\n", "def my_custom_function(bin_size):\n", "    # specify all custom keyword options here\n", "    return time_histogram(spiketrains, bin_size, output='counts')" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "collapsed": true, "jupyter": { "outputs_hidden": true } }, "outputs": [], "source": [ "bin_size_list = np.linspace(0.1, 1, num=8) * pq.s\n", "\n", "time_hist = ProcessPoolExecutor().execute(my_custom_function, bin_size_list)" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[AnalogSignal with 1 channels of length 100; units dimensionless; datatype float64 \n", " sampling rate: 10.0 1/s\n", " time: 0.0 s to 10.0 s,\n", " AnalogSignal with 1 channels of length 43; units dimensionless; datatype float64 \n", " sampling rate: 4.375 1/s\n", " time: 0.0 s to 9.82857142857143 s,\n", " AnalogSignal with 1 channels of length 28; units dimensionless; datatype float64 \n", " sampling rate: 2.7999999999999994 1/s\n", " time: 0.0 s to 10.000000000000002 s,\n", " AnalogSignal with 1 channels of length 20; units dimensionless; datatype float64 \n", " sampling rate: 2.0588235294117645 1/s\n", " time: 0.0 s to 9.714285714285715 s,\n", " AnalogSignal with 1 channels of length 16; units dimensionless; datatype float64 \n", " sampling rate: 1.627906976744186 1/s\n", " time: 0.0 s to 9.82857142857143 s,\n", " AnalogSignal with 1 channels of length 13; units dimensionless; datatype float64 \n", " sampling rate: 1.346153846153846 1/s\n", " time: 0.0 s to 9.657142857142858 s,\n", " AnalogSignal with 1 channels of length 11; units dimensionless; datatype float64 \n", " sampling rate: 1.1475409836065573 1/s\n", " time: 0.0 s to 
9.585714285714285 s,\n", " AnalogSignal with 1 channels of length 10; units dimensionless; datatype float64 \n", " sampling rate: 1.0 1/s\n", " time: 0.0 s to 10.0 s]" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "time_hist" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`time_hist` contains 8 `AnalogSignal` objects, one per `bin_size` in `bin_size_list`.\n", "\n", "### Benchmark\n", "\n", "Finally, let's check whether `ProcessPoolExecutor` brings any speedup compared to sequential processing." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "import warnings\n", "warnings.filterwarnings(\"ignore\")\n", "\n", "# initialize the iteration list\n", "bin_size_list = np.linspace(0.1, 1, 100) * pq.s" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "873 ms ± 35.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n" ] } ], "source": [ "# sequential processing\n", "%timeit [time_histogram(spiketrains, bin_size) for bin_size in bin_size_list]" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "374 ms ± 4.29 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n" ] } ], "source": [ "# with parallelization\n", "%timeit ProcessPoolExecutor(max_workers=4).execute(my_custom_function, bin_size_list)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" } }, "nbformat": 4, "nbformat_minor": 4 }