Source code for elephant.spike_train_generation

# -*- coding: utf-8 -*-
"""
Functions to generate spike trains from analog signals,
or to generate random spike trains.

Some functions are based on the NeuroTools stgen module, which was mostly
written by Eilif Muller, or from the NeuroTools signals.analogs module.

:copyright: Copyright 2015 by the Elephant team, see `doc/authors.rst`.
:license: Modified BSD, see LICENSE.txt for details.
"""

from __future__ import division, print_function, unicode_literals

import warnings
from functools import partial

import neo
import numpy as np
import quantities as pq

from elephant.spike_train_surrogates import dither_spike_train
from elephant.utils import deprecated_alias

__all__ = [
    "spike_extraction",
    "threshold_detection",
    "peak_detection",
    "homogeneous_poisson_process",
    "inhomogeneous_poisson_process",
    "homogeneous_gamma_process",
    "single_interaction_process",
    "compound_poisson_process"
]


[docs]@deprecated_alias(extr_interval='interval') def spike_extraction(signal, threshold=0.0 * pq.mV, sign='above', time_stamps=None, interval=(-2 * pq.ms, 4 * pq.ms)): """ Return the peak times for all events that cross threshold and the waveforms. Usually used for extracting spikes from a membrane potential to calculate waveform properties. Parameters ---------- signal : neo.AnalogSignal An analog input signal. threshold : pq.Quantity, optional Contains a value that must be reached for an event to be detected. Default: 0.0 * pq.mV. sign : {'above', 'below'}, optional Determines whether to count threshold crossings that cross above or below the threshold. Default: 'above'. time_stamps : pq.Quantity, optional If `spike_train` is a `pq.Quantity` array, `time_stamps` provides the time stamps around which the waveform is extracted. If it is None, the function `peak_detection` is used to calculate the time_stamps from signal. Default: None. interval : tuple of pq.Quantity Specifies the time interval around the `time_stamps` where the waveform is extracted. Default: (-2 * pq.ms, 4 * pq.ms). Returns ------- result_st : neo.SpikeTrain Contains the time_stamps of each of the spikes and the waveforms in `result_st.waveforms`. See Also -------- elephant.spike_train_generation.peak_detection """ # Get spike time_stamps if time_stamps is None: time_stamps = peak_detection(signal, threshold, sign=sign) elif hasattr(time_stamps, 'times'): time_stamps = time_stamps.times elif isinstance(time_stamps, pq.Quantity): raise TypeError("time_stamps must be None, a pq.Quantity array or" + " expose the.times interface") if len(time_stamps) == 0: return neo.SpikeTrain(time_stamps, units=signal.times.units, t_start=signal.t_start, t_stop=signal.t_stop, waveforms=np.array([]), sampling_rate=signal.sampling_rate) # Unpack the extraction interval from tuple or array extr_left, extr_right = interval if extr_left > extr_right: raise ValueError("interval[0] must be < interval[1]") if any(np.diff(time_stamps) < interval[1]): warnings.warn("Waveforms overlap.", UserWarning) data_left = (extr_left * signal.sampling_rate).simplified.magnitude data_right = (extr_right * signal.sampling_rate).simplified.magnitude data_stamps = (((time_stamps - signal.t_start) * signal.sampling_rate).simplified).magnitude data_stamps = data_stamps.astype(int) borders_left = data_stamps + data_left borders_right = data_stamps + data_right borders = np.dstack((borders_left, borders_right)).flatten() waveforms = np.array( np.split(np.array(signal), borders.astype(int))[1::2]) * signal.units # len(np.shape(waveforms)) == 1 if waveforms do not have the same width. # this can occur when extraction interval indexes beyond the signal. # Workaround: delete spikes shorter than the maximum length with if len(np.shape(waveforms)) == 1: max_len = (np.array([len(x) for x in waveforms])).max() to_delete = np.array([idx for idx, x in enumerate(waveforms) if len(x) < max_len]) waveforms = np.delete(waveforms, to_delete, axis=0) waveforms = np.array(waveforms) warnings.warn("Waveforms " + ("{:d}, " * len(to_delete)).format(*to_delete) + "exceeded signal and had to be deleted. " + "Change 'interval' to keep.") waveforms = waveforms[:, np.newaxis, :] return neo.SpikeTrain(time_stamps, units=signal.times.units, t_start=signal.t_start, t_stop=signal.t_stop, sampling_rate=signal.sampling_rate, waveforms=waveforms, left_sweep=extr_left)
[docs]def threshold_detection(signal, threshold=0.0 * pq.mV, sign='above'): """ Returns the times when the analog signal crosses a threshold. Usually used for extracting spike times from a membrane potential. Adapted from version in NeuroTools. Parameters ---------- signal : neo.AnalogSignal An analog input signal. threshold : pq.Quantity, optional Contains a value that must be reached for an event to be detected. Default: 0.0 * pq.mV. sign : {'above', 'below'}, optional Determines whether to count threshold crossings that cross above or below the threshold. Default: 'above'. Returns ------- result_st : neo.SpikeTrain Contains the spike times of each of the events (spikes) extracted from the signal. """ if not isinstance(threshold, pq.Quantity): raise ValueError('threshold must be a pq.Quantity') if sign not in ('above', 'below'): raise ValueError("sign should be 'above' or 'below'") if sign == 'above': cutout = np.where(signal > threshold)[0] else: # sign == 'below' cutout = np.where(signal < threshold)[0] if len(cutout) == 0: events_base = np.zeros(0) else: take = np.where(np.diff(cutout) > 1)[0] + 1 take = np.append(0, take) time = signal.times events = time[cutout][take] events_base = events.magnitude if events_base is None: # This occurs in some Python 3 builds due to some # bug in quantities. events_base = np.array( [event.magnitude for event in events]) # Workaround result_st = neo.SpikeTrain(events_base, units=signal.times.units, t_start=signal.t_start, t_stop=signal.t_stop) return result_st
[docs]@deprecated_alias(format='as_array') def peak_detection(signal, threshold=0.0 * pq.mV, sign='above', as_array=False): """ Return the peak times for all events that cross threshold. Usually used for extracting spike times from a membrane potential. Similar to spike_train_generation.threshold_detection. Parameters ---------- signal : neo.AnalogSignal An analog input signal. threshold : pq.Quantity, optional Contains a value that must be reached for an event to be detected. Default: 0.*pq.mV. sign : {'above', 'below'}, optional Determines whether to count threshold crossings that cross above or below the threshold. Default: 'above'. as_array : bool, optional If True, a NumPy array of the resulting peak times is returned instead of a (default) `neo.SpikeTrain` object. Default: False. format : {None, 'raw'}, optional .. deprecated:: 0.8.0 Whether to return as SpikeTrain (None) or as a plain array of times ('raw'). Deprecated. Use `as_array=False` for None format and `as_array=True` otherwise. Default: None. Returns ------- result_st : neo.SpikeTrain Contains the spike times of each of the events (spikes) extracted from the signal. """ if not isinstance(threshold, pq.Quantity): raise ValueError("threshold must be a pq.Quantity") if sign not in ('above', 'below'): raise ValueError("sign should be 'above' or 'below'") if as_array in (None, 'raw'): warnings.warn("'format' is deprecated; use as_array=True", DeprecationWarning) as_array = bool(as_array) if sign == 'above': cutout = np.where(signal > threshold)[0] peak_func = np.argmax else: # sign == 'below' cutout = np.where(signal < threshold)[0] peak_func = np.argmin if len(cutout) == 0: events_base = np.zeros(0) else: # Select thr crossings lasting at least 2 dtps, np.diff(cutout) > 2 # This avoids empty slices border_start = np.where(np.diff(cutout) > 1)[0] border_end = border_start + 1 borders = sorted(np.r_[0, border_start, border_end, len(cutout) - 1]) true_borders = cutout[borders] right_borders = true_borders[1::2] + 1 true_borders = np.sort(np.append(true_borders[0::2], right_borders)) # Workaround for bug that occurs when signal goes below thr for 1 dtp, # Workaround eliminates empty slices from np. split backward_mask = np.absolute(np.ediff1d(true_borders, to_begin=1)) > 0 forward_mask = np.absolute(np.ediff1d(true_borders[::-1], to_begin=1)[::-1]) > 0 true_borders = true_borders[backward_mask * forward_mask] split_signal = np.split(np.array(signal), true_borders)[1::2] maxima_idc_split = np.array([peak_func(x) for x in split_signal]) max_idc = maxima_idc_split + true_borders[0::2] events = signal.times[max_idc] events_base = events.magnitude if events_base is None: # This occurs in some Python 3 builds due to some # bug in quantities. events_base = np.array( [event.magnitude for event in events]) # Workaround result_st = neo.SpikeTrain(events_base, units=signal.times.units, t_start=signal.t_start, t_stop=signal.t_stop) if as_array: result_st = result_st.magnitude return result_st
def _homogeneous_process(interval_generator, mean_rate, t_start, t_stop, as_array): """ Returns a spike train whose spikes are a realization of a random process generated by the function `interval_generator` with the given rate, starting at time `t_start` and stopping `time t_stop`. """ t_start = t_start.rescale(t_stop.units) n_spikes_expected = int(np.ceil( ((t_stop - t_start) * mean_rate).simplified)) if n_spikes_expected < 0: raise ValueError("Expected no. of spikes: {n_spikes} < 0. The firing " "rate ({rate}) cannot be negative and t_stop " "({t_stop}) must be greater than t_start " "({t_start})".format(n_spikes=n_spikes_expected, rate=mean_rate, t_stop=t_stop, t_start=t_start)) spikes = [] if n_spikes_expected > 0: # 3 STDs corresponds to 99.7% n_spikes_expected = int(np.ceil( n_spikes_expected + 3 * np.sqrt(n_spikes_expected))) t_last = t_start.simplified.magnitude while True: isi = interval_generator(size=n_spikes_expected) spikes = np.r_[spikes, t_last + np.cumsum(isi)] # Check if not whole time range is covered. index_last_spike = spikes.searchsorted(t_stop.simplified.magnitude) if index_last_spike < len(spikes): spikes = spikes[:index_last_spike] spikes = (spikes / mean_rate.units).rescale(t_stop.units) break t_last = spikes[-1] if as_array: spikes = spikes.magnitude else: spikes = neo.SpikeTrain(spikes, t_start=t_start, t_stop=t_stop, units=t_stop.units) return spikes
[docs]def homogeneous_poisson_process(rate, t_start=0.0 * pq.ms, t_stop=1000.0 * pq.ms, as_array=False, refractory_period=None): """ Returns a spike train whose spikes are a realization of a Poisson process with the given rate, starting at time `t_start` and stopping time `t_stop`. All numerical values should be given as Quantities, e.g. `100*pq.Hz`. Parameters ---------- rate : pq.Quantity The rate of the discharge. t_start : pq.Quantity, optional The beginning of the spike train. Default: 0 * pq.ms. t_stop : pq.Quantity, optional The end of the spike train. Default: 1000 * pq.ms. as_array : bool, optional If True, a NumPy array of sorted spikes is returned, rather than a `neo.SpikeTrain` object. Default: False. refractory_period : pq.Quantity or None, optional `pq.Quantity` scalar with dimension time. The time period after one spike no other spike is emitted. Default: None. Returns ------- spiketrain : neo.SpikeTrain or np.ndarray Homogeneous Poisson process realization, stored in `neo.SpikeTrain` if `as_array` is False (default) and `np.ndarray` otherwise. Raises ------ ValueError If one of `rate`, `t_start` and `t_stop` is not of type `pq.Quantity`. If `refractory_period` is not None or not of type `pq.Quantity`. If `refractory_period` is not None and the period between two successive spikes (`1 / rate`) is smaller than the `refractory_period`. Examples -------- >>> import quantities as pq >>> spikes = homogeneous_poisson_process(50*pq.Hz, t_start=0*pq.ms, ... t_stop=1000*pq.ms) >>> spikes = homogeneous_poisson_process( ... 20*pq.Hz, t_start=5000*pq.ms, t_stop=10000*pq.ms, as_array=True) >>> spikes = homogeneous_poisson_process(50*pq.Hz, t_start=0*pq.ms, ... t_stop=1000*pq.ms, refractory_period = 3*pq.ms) """ if not (isinstance(t_start, pq.Quantity) and isinstance(t_stop, pq.Quantity)): raise ValueError("t_start and t_stop must be of type pq.Quantity") if not isinstance(rate, pq.Quantity): raise ValueError("rate must be of type pq.Quantity") if not isinstance(refractory_period, pq.Quantity) and \ refractory_period is not None: raise ValueError("refractory_period must be of type pq.Quantity or" "None") rate = rate.simplified # Case without a refractory period if refractory_period is None: interval_generator = partial(np.random.exponential, scale=1. / rate.magnitude) return _homogeneous_process( interval_generator, rate, t_start, t_stop, as_array) # Case with a refractory period refractory_period = refractory_period.simplified if rate * refractory_period >= 1.: raise ValueError("Period between two successive spikes must be larger " "than the refractory period. Decrease either the " "firing rate or the refractory period.") effective_rate = rate / (1. - rate * refractory_period) def interval_generator_refractory(size): return refractory_period.magnitude + \ np.random.exponential(1. / effective_rate.magnitude, size) # we subtract refractory_period from t_start to be added later on # in interval_generator_refractory() spiketrain = _homogeneous_process(interval_generator_refractory, rate, t_start - refractory_period, t_stop, as_array) if not as_array: spiketrain.t_start = t_start return spiketrain
[docs]def inhomogeneous_poisson_process(rate, as_array=False, refractory_period=None): """ Returns a spike train whose spikes are a realization of an inhomogeneous Poisson process with the given rate profile. Parameters ---------- rate : neo.AnalogSignal A `neo.AnalogSignal` representing the rate profile evolving over time. Its values have all to be `>=0`. The output spiketrain will have `t_start = rate.t_start` and `t_stop = rate.t_stop` as_array : bool, optional If True, a NumPy array of sorted spikes is returned, rather than a SpikeTrain object. Default: False. refractory_period : pq.Quantity or None, optional `pq.Quantity` scalar with dimension time. The time period after one spike no other spike is emitted. Default: None. Returns ------- spiketrain : neo.SpikeTrain or np.ndarray Inhomogeneous Poisson process realization, of type `neo.SpikeTrain` if `as_array` is False (default) and `np.ndarray` otherwise. Raises ------ ValueError If `rate` contains a negative value. If `refractory_period` is not None or not of type `pq.Quantity`. If `refractory_period` is not None and the period between two successive spikes (`1 / rate`) is smaller than the `refractory_period`. """ # Check rate contains only positive values if np.any(rate < 0) or rate.size == 0: raise ValueError( 'rate must be a positive non empty signal, representing the' 'rate at time t') if not isinstance(refractory_period, pq.Quantity) and \ refractory_period is not None: raise ValueError("refractory_period must be of type pq.Quantity or" "None") rate_max = np.max(rate) if refractory_period is not None: if (rate_max * refractory_period).simplified >= 1.: raise ValueError( "Period between two successive spikes must be larger " "than the refractory period. Decrease either the " "firing rate or the refractory period.") # effective rate parameter for the refractory period case rate = rate / (1. - (rate * refractory_period).simplified) rate_max = np.max(rate) # Generate n hidden Poisson SpikeTrains with rate equal # to the peak rate homogeneous_poiss = homogeneous_poisson_process( rate=rate_max, t_stop=rate.t_stop, t_start=rate.t_start) # Compute the rate profile at each spike time by interpolation rate_interpolated = _analog_signal_linear_interp( signal=rate, times=homogeneous_poiss.times) # Accept each spike at time t with probability rate(t)/max_rate random_uniforms = np.random.uniform(size=len(homogeneous_poiss)) * rate_max spikes = homogeneous_poiss[random_uniforms < rate_interpolated.flatten()] if refractory_period is not None: refractory_period = refractory_period.rescale( rate.t_stop.units).magnitude # thinning in average cancels the effect of the effective firing rate spikes = _thinning_for_refractory_period(spikes.magnitude, refractory_period) if not as_array: spikes = neo.SpikeTrain(spikes * rate.t_stop.units, t_start=rate.t_start, t_stop=rate.t_stop) else: if as_array: spikes = spikes.magnitude return spikes
def _thinning_for_refractory_period(spiketrain, refractory_period): """ Function to thin out a spiketrain, that every ISI is greater than the refractory period. Parameters ---------- spiketrain : np.ndarray Magnitude of a spiketrain. refractory_period : float Magnitude of a refractory period. Returns ------- thinned_spiketrain : np.ndarray thinned out spiketrain """ thinned_spiketrain = [] previous_spike = -refractory_period for spike in spiketrain: if spike > previous_spike + refractory_period: thinned_spiketrain.append(spike) previous_spike = spike return np.array(thinned_spiketrain) def _analog_signal_linear_interp(signal, times): """ Compute the linear interpolation of a signal at desired times. Given the `signal` (neo.AnalogSignal) taking value `s0` and `s1` at two consecutive time points `t0` and `t1` `(t0 < t1)`, for every time `t` in `times`, such that `t0<t<=t1` is returned the value of the linear interpolation, given by: `s = ((s1 - s0) / (t1 - t0)) * t + s0`. Parameters ---------- signal : neo.AnalogSignal The analog signal containing the discretization of the function to interpolate times : pq.Quantity The time points for which the interpolation is computed Returns ------- out: pq.Quantity representing the values of the interpolated signal at the times given by times Notes ----- If `signal` has sampling period `sampling_period=signal.sampling_period`, its values are defined at `t=signal.times`, such that `t[i] = signal.t_start + i * sampling_period` The last of such times is lower than signal.t_stop`:t[-1] = signal.t_stop - sampling_period`. For the interpolation at times t such that `t[-1] <= t <= signal.t_stop`, the value of `signal` at `signal.t_stop` is taken to be that at time `t[-1]`. """ sampling_period = signal.sampling_period t_start = signal.t_start.rescale(signal.times.units) t_stop = signal.t_stop.rescale(signal.times.units) # Extend the signal (as a dimensionless array) copying the last value # one time, and extend its times to t_stop signal_extended = np.vstack( [signal.magnitude, signal[-1].magnitude]).flatten() times_extended = np.hstack([signal.times, t_stop]) * signal.times.units time_ids = (times - t_start) / sampling_period time_ids = np.floor(time_ids.simplified.magnitude).astype(np.int32) # Compute the slope of the signal at each time in times signal_1 = signal_extended[time_ids] signal_2 = signal_extended[time_ids + 1] slope = (signal_2 - signal_1) / sampling_period # Interpolate the signal at each time in times by linear interpolation return (signal_1 + slope * (times - times_extended[time_ids])) * signal.units
[docs]def homogeneous_gamma_process(a, b, t_start=0.0 * pq.ms, t_stop=1000.0 * pq.ms, as_array=False): """ Returns a spike train whose spikes are a realization of a gamma process with the given parameters, starting at time `t_start` and stopping time `t_stop` (average rate will be `b/a`). All numerical values should be given as Quantities, e.g. `100*pq.Hz`. Parameters ---------- a : int or float The shape parameter of the gamma distribution. b : pq.Quantity The rate parameter of the gamma distribution. t_start : pq.Quantity, optional The beginning of the spike train. Default: 0 * pq.ms. t_stop : pq.Quantity, optional The end of the spike train. Default: 1000 * pq.ms. as_array : bool, optional If True, a NumPy array of sorted spikes is returned, rather than a `neo.SpikeTrain` object. Default: False. Returns ------- spiketrain : neo.SpikeTrain or np.ndarray Homogeneous Gamma process realization, stored in `neo.SpikeTrain` if `as_array` is False (default) and `np.ndarray` otherwise. Raises ------ ValueError If `t_start` and `t_stop` are not of type `pq.Quantity`. Examples -------- >>> import quantities as pq >>> spikes = homogeneous_gamma_process(2.0, 50*pq.Hz, 0*pq.ms, ... 1000*pq.ms) >>> spikes = homogeneous_gamma_process( ... 5.0, 20*pq.Hz, 5000*pq.ms, 10000*pq.ms, as_array=True) """ # note that the rate of the gamma distribution is called 'b' and not 'rate' # to avoid false thoughts that 'rate' could be the mean firing rate, which # equals to b / a if not (isinstance(t_start, pq.Quantity) and isinstance(t_stop, pq.Quantity)): raise ValueError("t_start and t_stop must be of type pq.pq.Quantity") b = b.rescale(1 / t_start.units).simplified rate = b / a theta = 1. / b.magnitude interval_generator = partial(np.random.gamma, shape=a, scale=theta) return _homogeneous_process(interval_generator, rate, t_start, t_stop, as_array)
def inhomogeneous_gamma_process(rate, shape_factor, as_array=False): """ Returns a spike train whose spikes are a realization of an inhomogeneous Gamma process with the given rate profile and the given shape factor. The implementation using operational time is inspired by Nawrot et al. (2018) [1]_. Parameters ---------- rate : neo.AnalogSignal A `neo.AnalogSignal` representing the rate profile evolving over time. Its values have all to be `>=0`. The output spiketrain will have `t_start = rate.t_start` and `t_stop = rate.t_stop` shape_factor : float The shape factor of the Gamma process as_array : bool, optional If True, a NumPy array of sorted spikes is returned, rather than a SpikeTrain object. Default: False. Returns ------- spiketrain : neo.SpikeTrain or np.ndarray Inhomogeneous Poisson process realization, of type `neo.SpikeTrain` if `as_array` is False (default) and `np.ndarray` otherwise. Raises ------ ValueError If `rate` is not a neo AnalogSignal If `rate` contains a negative value. References ---------- .. [1] Nawrot, M., Boucsein, C., Denker, M., Rodriguez Molina, V., Riehle A., Aertsen A., & Rotter, S. (2008). Measurement of variability dynamics in cortical spike trains. Journal of Neuroscience Methods, 169, 374–390. """ if not isinstance(rate, neo.AnalogSignal): raise ValueError('rate must be a neo AnalogSignal') # Check rate contains only positive values if np.any(rate < 0) or rate.size == 0: raise ValueError( 'rate must be a positive non empty signal, representing the' 'rate at time t') # Operational time corresponds to the integral of the firing rate over time operational_time = np.cumsum( (rate*rate.sampling_period).simplified.magnitude) operational_time = np.hstack((0., operational_time)) # The time points at which the firing rates are given real_time = np.hstack((rate.times.simplified.magnitude, rate.t_stop.simplified.magnitude)) spiketrain_operational_time = homogeneous_gamma_process( a=shape_factor, b=shape_factor*1.*pq.Hz, t_start=0.*pq.s, t_stop=operational_time[-1]*pq.s, as_array=True) # indices where between which points in operational time the spikes lie indices = np.searchsorted(operational_time, spiketrain_operational_time) # In real time the spikes are first aligned to the left border of the bin. # Note that indices are greater than 0 because 'operational_time' was # padded with zeros. spiketrain = real_time[indices - 1] # the relative position of the spikes in the operational time bins positions_in_bins = \ (spiketrain_operational_time - operational_time[indices-1]) / ( operational_time[indices]-operational_time[indices-1]) # add the positions in the bin times the sampling period in real time spiketrain += rate.sampling_period.simplified.magnitude * positions_in_bins if as_array: return spiketrain return neo.SpikeTrain(spiketrain, units=pq.s, t_stop=rate.t_stop) @deprecated_alias(n='n_spiketrains') def _n_poisson(rate, t_stop, t_start=0.0 * pq.ms, n_spiketrains=1): """ Generates one or more independent Poisson spike trains. Parameters ---------- rate : pq.Quantity scalar or pq.Quantity array Expected firing rate (frequency) of each output SpikeTrain. Can be one of: * a single pq.Quantity value: expected firing rate of each output SpikeTrain * a pq.Quantity array: rate[i] is the expected firing rate of the i-th output SpikeTrain t_stop : pq.Quantity Single common stop time of each output SpikeTrain. Must be > t_start. t_start : pq.Quantity, optional Single common start time of each output SpikeTrain. Must be < t_stop. Default: 0 * pq.ms n_spiketrains: int, optional If rate is a single pq.Quantity value, n specifies the number of SpikeTrains to be generated. If rate is an array, n is ignored and the number of SpikeTrains is equal to len(rate). Default: 1 Returns ------- list of neo.SpikeTrain Each SpikeTrain contains one of the independent Poisson spike trains, either n SpikeTrains of the same rate, or len(rate) SpikeTrains with varying rates according to the rate parameter. The time unit of the SpikeTrains is given by t_stop. """ # Check that the provided input is Hertz if not isinstance(rate, pq.Quantity): raise ValueError('rate must be a pq.Quantity') try: rate.rescale(pq.Hz) except ValueError: raise ValueError('rate argument must have rate unit (1/time)') # Check t_start < t_stop and create their strip dimensions if not t_start < t_stop: raise ValueError( 't_start (={}) must be < t_stop (={})'.format(t_start, t_stop)) # Set number n of output spike trains (specified or set to len(rate)) if not (isinstance(n_spiketrains, int) and n_spiketrains > 0): raise ValueError( 'n (={}) must be a positive integer'.format(str(n_spiketrains))) rate_dl = rate.simplified.magnitude.flatten() # Check rate input parameter if len(rate_dl) == 1: if rate_dl < 0: raise ValueError('rate (={}) must be non-negative.'.format(rate)) rates = np.array([rate_dl] * n_spiketrains) else: rates = rate_dl.flatten() if any(rates < 0): raise ValueError('rate must have non-negative elements.') return [homogeneous_poisson_process(rate * pq.Hz, t_start, t_stop) for rate in rates]
[docs]@deprecated_alias(rate_c='coincidence_rate', n='n_spiketrains', return_coinc='return_coincidences') def single_interaction_process( rate, coincidence_rate, t_stop, n_spiketrains=2, jitter=0 * pq.ms, coincidences='deterministic', t_start=0 * pq.ms, min_delay=0 * pq.ms, return_coincidences=False): """ Generates a multidimensional Poisson SIP (single interaction process) plus independent Poisson processes A Poisson SIP consists of Poisson time series which are independent except for simultaneous events in all of them. This routine generates a SIP plus additional parallel independent Poisson processes. See _[1]. Parameters ---------- t_stop : pq.Quantity Total time of the simulated processes. The events are drawn between 0 and `t_stop`. rate : pq.Quantity Overall mean rate of the time series to be generated (coincidence rate `coincidence_rate` is subtracted to determine the background rate). Can be: * a float, representing the overall mean rate of each process. If so, it must be higher than `coincidence_rate`. * an iterable of floats (one float per process), each float representing the overall mean rate of a process. If so, all the entries must be larger than `coincidence_rate`. coincidence_rate : pq.Quantity Coincidence rate (rate of coincidences for the n-dimensional SIP). The SIP spike trains will have coincident events with rate `coincidence_rate` plus independent 'background' events with rate `rate-rate_coincidence`. n_spiketrains : int, optional If `rate` is a single pq.Quantity value, `n_spiketrains` specifies the number of SpikeTrains to be generated. If rate is an array, `n_spiketrains` is ignored and the number of SpikeTrains is equal to `len(rate)`. Default: 2 jitter : pq.Quantity, optional Jitter for the coincident events. If `jitter == 0`, the events of all n correlated processes are exactly coincident. Otherwise, they are jittered around a common time randomly, up to +/- `jitter`. Default: 0 * pq.ms coincidences : {'deterministic', 'stochastic'}, optional Whether the total number of injected coincidences must be determin- istic (i.e. rate_coincidence is the actual rate with which coincidences are generated) or stochastic (i.e. rate_coincidence is the mean rate of coincidences): * 'deterministic': deterministic rate * 'stochastic': stochastic rate Default: 'deterministic' t_start : pq.Quantity, optional Starting time of the series. If specified, it must be lower than `t_stop`. Default: 0 * pq.ms min_delay : pq.Quantity, optional Minimum delay between consecutive coincidence times. Default: 0 * pq.ms return_coincidences : bool, optional Whether to return the coincidence times for the SIP process Default: False Returns ------- output: list Realization of a SIP consisting of n Poisson processes characterized by synchronous events (with the given jitter) If `return_coinc` is `True`, the coincidence times are returned as a second output argument. They also have an associated time unit (same as `t_stop`). References ---------- .. [1] Kuhn, Aertsen, Rotter (2003) Neural Comput 15(1):67-101 Examples -------- >>> import quantities as pq >>> import elephant.spike_train_generation as stg # TODO: check if rate_coincidence=4 is correct. >>> sip, coinc = stg.single_interaction_process( ... rate=20*pq.Hz, coincidence_rate=4, ... t_stop=1*pq.s, n_spiketrains=10, return_coincidences = True) """ # Check if n is a positive integer if not (isinstance(n_spiketrains, int) and n_spiketrains > 0): raise ValueError( 'n (={}) must be a positive integer'.format(n_spiketrains)) if coincidences not in ('deterministic', 'stochastic'): raise ValueError( "coincidences must be 'deterministic' or 'stochastic'") # Assign time unit to jitter, or check that its existing unit is a time # unit jitter = abs(jitter) # Define the array of rates from input argument rate. Check that its length # matches with n if rate.ndim == 0: if rate < 0 * pq.Hz: raise ValueError( 'rate (={}) must be non-negative.'.format(rate)) rates_b = np.repeat(rate, n_spiketrains) else: rates_b = rate.flatten() if not all(rates_b >= 0. * pq.Hz): raise ValueError('*rate* must have non-negative elements') # Check: rate>=rate_coincidence if np.any(rates_b < coincidence_rate): raise ValueError( 'all elements of *rate* must be >= *rate_coincidence*') # Check min_delay < 1./rate_coincidence if not (coincidence_rate == 0 * pq.Hz or min_delay < 1. / coincidence_rate): raise ValueError( "'*min_delay* (%s) must be lower than 1/*rate_coincidence* (%s)." % (str(min_delay), str((1. / coincidence_rate).rescale( min_delay.units)))) # Generate the n Poisson processes there are the basis for the SIP # (coincidences still lacking) embedded_poisson_trains = _n_poisson( rate=rates_b - coincidence_rate, t_stop=t_stop, t_start=t_start) # Convert the trains from neo SpikeTrain objects to simpler pq.Quantity # objects embedded_poisson_trains = [ emb.view(pq.Quantity) for emb in embedded_poisson_trains] # Generate the array of times for coincident events in SIP, not closer than # min_delay. The array is generated as a pq.Quantity. if coincidences == 'deterministic': # P. Bouss: we want the closest approximation to the average # coincidence count. n_coincidences = (t_stop - t_start) * coincidence_rate # Conversion to integer necessary for python 2 n_coincidences = int(round(n_coincidences.simplified.item())) while True: coinc_times = t_start + \ np.sort(np.random.random(n_coincidences)) * ( t_stop - t_start) if len(coinc_times) < 2 or min(np.diff(coinc_times)) >= min_delay: break else: # coincidences == 'stochastic' while True: coinc_times = homogeneous_poisson_process( rate=coincidence_rate, t_stop=t_stop, t_start=t_start) if len(coinc_times) < 2 or min(np.diff(coinc_times)) >= min_delay: break coinc_times = coinc_times.simplified units = coinc_times.units # Set the coincidence times to T-jitter if larger. This ensures that # the last jittered spike time is <T effective_t_stop = t_stop - jitter coinc_times = np.minimum(coinc_times.magnitude, effective_t_stop.simplified.magnitude) coinc_times = coinc_times * units # Replicate coinc_times n times, and jitter each event in each array by # +/- jitter (within (t_start, t_stop)) embedded_coinc = coinc_times + \ np.random.random( (len(rates_b), len(coinc_times))) * 2 * jitter - jitter embedded_coinc = embedded_coinc + \ (t_start - embedded_coinc) * (embedded_coinc < t_start) - \ (t_stop - embedded_coinc) * (embedded_coinc > t_stop) # Inject coincident events into the n SIP processes generated above, and # merge with the n independent processes sip_process = [ np.sort(np.concatenate(( embedded_poisson_trains[m].rescale(t_stop.units), embedded_coinc[m].rescale(t_stop.units))) * t_stop.units) for m in range(len(rates_b))] # Convert back sip_process and coinc_times from pq.Quantity objects to # neo.SpikeTrain objects sip_process = [ neo.SpikeTrain(t, t_start=t_start, t_stop=t_stop).rescale(t_stop.units) for t in sip_process] coinc_times = [ neo.SpikeTrain(t, t_start=t_start, t_stop=t_stop).rescale(t_stop.units) for t in embedded_coinc] # Return the processes in the specified output_format if not return_coincidences: output = sip_process else: output = sip_process, coinc_times return output
def _pool_two_spiketrains(spiketrain_1, spiketrain_2, extremes='inner'): """ Pool the spikes of two spike trains a and b into a unique spike train. Parameters ---------- spiketrain_1, spiketrain_2 : neo.SpikeTrain Spiketrains to be pooled. extremes : {'inner', 'outer'}, optional Only spikes of a and b in the specified extremes are considered. * 'inner': pool all spikes from max(a.tstart_ b.t_start) to min(a.t_stop, b.t_stop) * 'outer': pool all spikes from min(a.tstart_ b.t_start) to max(a.t_stop, b.t_stop) Default: 'inner' Returns ------- neo.SpikeTrain containing all spikes of the two input spiketrains falling in the specified extremes """ unit = spiketrain_1.units spiketrain_2 = spiketrain_2.rescale(unit) times_1_dimless = spiketrain_1.magnitude times_2_dimless = spiketrain_2.rescale(unit).magnitude times = np.sort(np.concatenate((times_1_dimless, times_2_dimless))) if extremes == 'outer': t_start = min(spiketrain_1.t_start, spiketrain_2.t_start) t_stop = max(spiketrain_1.t_stop, spiketrain_2.t_stop) elif extremes == 'inner': t_start = max(spiketrain_1.t_start, spiketrain_2.t_start) t_stop = min(spiketrain_1.t_stop, spiketrain_2.t_stop) times = times[times > t_start.magnitude] times = times[times < t_stop.magnitude] else: raise ValueError( 'extremes (%s) can only be "inner" or "outer"' % extremes) return neo.SpikeTrain(times=times, units=unit, t_start=t_start, t_stop=t_stop) def _pool_spiketrains(spiketrains, extremes='inner'): """ Pool spikes from any number of spike trains into a unique spike train. Parameters ---------- spiketrains: list of neo.SpikeTrain A list of spiketrains to merge. extremes: str, optional Only spikes of a and b in the specified extremes are considered. * 'inner': pool all spikes from min(a.t_start b.t_start) to max(a.t_stop, b.t_stop) * 'outer': pool all spikes from max(a.tstart_ b.t_start) to min(a.t_stop, b.t_stop) Default: 'inner' Returns ------- neo.SpikeTrain containing all spikes in trains falling in the specified extremes """ merge_trains = spiketrains[0] for spiketrain in spiketrains[1:]: merge_trains = _pool_two_spiketrains( merge_trains, spiketrain, extremes=extremes) t_start, t_stop = merge_trains.t_start, merge_trains.t_stop merge_trains = sorted(merge_trains) merge_trains = np.squeeze(merge_trains) merge_trains = neo.SpikeTrain( merge_trains, t_stop=t_stop, t_start=t_start, units=spiketrains[0].units) return merge_trains def _sample_int_from_pdf(probability_density, n_samples): """ Draw n independent samples from the set {0,1,...,L}, where L=len(a)-1, according to the probability distribution a. a[j] is the probability to sample j, for each j from 0 to L. Parameters ---------- probability_density : np.ndarray Probability vector (i..e array of sum 1) that at each entry j carries the probability to sample j (j=0,1,...,len(a)-1). n_samples : int Number of samples generated with the function Returns ------- np.ndarray An array of n samples taking values between `0` and `n=len(a)-1`. """ cumulative_distribution = np.cumsum(probability_density) random_uniforms = np.random.uniform(0, 1, size=n_samples) random_uniforms = np.repeat(np.expand_dims(random_uniforms, axis=1), repeats=len(probability_density), axis=1) return (cumulative_distribution < random_uniforms).sum(axis=1) def _mother_proc_cpp_stat(A, t_stop, rate, t_start=0 * pq.ms): """ Generate the hidden ("mother") Poisson process for a Compound Poisson Process (CPP). Parameters ---------- A : np.ndarray Amplitude distribution. A[j] represents the probability of a synchronous event of size j. The sum over all entries of a must be equal to one. t_stop : pq.Quantity The stopping time of the mother process rate : pq.Quantity Homogeneous rate of the n spike trains that will be generated by the CPP function t_start : pq.Quantity, optional The starting time of the mother process Default: 0 pq.ms Returns ------- Poisson spike train representing the mother process generating the CPP """ n_spiketrains = len(A) - 1 # expected amplitude exp_amplitude = np.dot(A, np.arange(n_spiketrains + 1)) # expected rate of the mother process exp_mother_rate = (n_spiketrains * rate) / exp_amplitude return homogeneous_poisson_process( rate=exp_mother_rate, t_stop=t_stop, t_start=t_start) def _cpp_hom_stat(A, t_stop, rate, t_start=0 * pq.ms): """ Generate a Compound Poisson Process (CPP) with amplitude distribution A and heterogeneous firing rates r=r[0], r[1], ..., r[-1]. Parameters ---------- A : np.ndarray Amplitude distribution. A[j] represents the probability of a synchronous event of size j. The sum over all entries of A must be equal to one. t_stop : pq.Quantity The end time of the output spike trains rate : pq.Quantity Average rate of each spike train generated t_start : pq.Quantity, optional The start time of the output spike trains Default: 0 pq.ms Returns ------- list of neo.SpikeTrain with n elements, having average firing rate r and correlated such to form a CPP with amplitude distribution a """ # Generate mother process and associated spike labels mother = _mother_proc_cpp_stat( A=A, t_stop=t_stop, rate=rate, t_start=t_start) labels = _sample_int_from_pdf(A, len(mother)) n_spiketrains = len(A) - 1 # Number of trains in output spiketrains = [[]] * n_spiketrains try: # Faster but more memory-consuming approach n_mother_trains = len(mother) # number of spikes in the mother process spike_matrix = np.zeros((n_spiketrains, n_mother_trains), dtype=bool) # for each spike, take its label for spike_id, label in enumerate(labels): # choose label random trains train_ids = np.random.choice(n_spiketrains, label, replace=False) # and set the spike matrix for that train for train_id in train_ids: spike_matrix[train_id, spike_id] = True # and spike to True for train_id, row in enumerate(spike_matrix): spiketrains[train_id] = mother[row].view(pq.Quantity) except MemoryError: # Slower (~2x) but less memory-consuming approach print('memory case') for mother_spiketrain, label in zip(mother, labels): train_ids = np.random.choice(n_spiketrains, label) for train_id in train_ids: spiketrains[train_id].append(mother_spiketrain) return [neo.SpikeTrain(times=spiketrain, t_start=t_start, t_stop=t_stop) for spiketrain in spiketrains] def _cpp_het_stat(A, t_stop, rates, t_start=0. * pq.ms): """ Generate a Compound Poisson Process (CPP) with amplitude distribution A and heterogeneous firing rates r=r[0], r[1], ..., r[-1]. Parameters ---------- A : np.ndarray CPP's amplitude distribution. A[j] represents the probability of a synchronous event of size j among the generated spike trains. The sum over all entries of A must be equal to one. t_stop : pq.Quantity The end time of the output spike trains rates : pq.Quantity Array of firing rates of each spike train generated with t_start : pq.Quantity, optional The start time of the output spike trains Default: 0 pq.ms Returns ------- list of neo.SpikeTrain List of neo.SpikeTrains with different firing rates, forming a CPP with amplitude distribution `A`. """ # Computation of Parameters of the two CPPs that will be merged # (uncorrelated with heterog. rates + correlated with homog. rates) n_spiketrains = len(rates) # number of output spike trains # amplitude expectation expected_amplitude = np.dot(A, np.arange(n_spiketrains + 1)) r_sum = np.sum(rates) # sum of all output firing rates r_min = np.min(rates) # minimum of the firing rates # rate of the uncorrelated CPP r_uncorrelated = r_sum - n_spiketrains * r_min # rate of the correlated CPP r_correlated = r_sum / expected_amplitude - r_uncorrelated # rate of the hidden mother process r_mother = r_uncorrelated + r_correlated # Check the analytical constraint for the amplitude distribution if A[1] < (r_uncorrelated / r_mother).rescale( pq.dimensionless).magnitude: raise ValueError('A[1] too small / A[i], i>1 too high') # Compute the amplitude distribution of the correlated CPP, and generate it A = A * (r_mother / r_correlated).magnitude A[1] = A[1] - r_uncorrelated / r_correlated compound_poisson_spiketrains = _cpp_hom_stat( A, t_stop, r_min, t_start) # Generate the independent heterogeneous Poisson processes poisson_spiketrains = \ [homogeneous_poisson_process(rate - r_min, t_start, t_stop) for rate in rates] # Pool the correlated CPP and the corresponding Poisson processes return [_pool_two_spiketrains(compound_poisson_spiketrain, poisson_spiketrain) for compound_poisson_spiketrain, poisson_spiketrain in zip(compound_poisson_spiketrains, poisson_spiketrains)]
[docs]@deprecated_alias(A='amplitude_distribution') def compound_poisson_process( rate, amplitude_distribution, t_stop, shift=None, t_start=0 * pq.ms): """ Generate a Compound Poisson Process (CPP; see _[1]) with a given `amplitude_distribution` :math:`A` and stationary marginal rates `rate`. The CPP process is a model for parallel, correlated processes with Poisson spiking statistics at pre-defined firing rates. It is composed of `len(A)-1` spike trains with a correlation structure determined by the amplitude distribution :math:`A`: A[j] is the probability that a spike occurs synchronously in any `j` spike trains. The CPP is generated by creating a hidden mother Poisson process, and then copying spikes of the mother process to `j` of the output spike trains with probability `A[j]`. Note that this function decorrelates the firing rate of each SpikeTrain from the probability for that SpikeTrain to participate in a synchronous event (which is uniform across SpikeTrains). Parameters ---------- rate : pq.Quantity Average rate of each spike train generated. Can be: - a single value, all spike trains will have same rate rate - an array of values (of length len(A)-1), each indicating the firing rate of one process in output amplitude_distribution : np.ndarray or list CPP's amplitude distribution :math:`A`. `A[j]` represents the probability of a synchronous event of size `j` among the generated spike trains. The sum over all entries of :math:`A` must be equal to one. t_stop : pq.Quantity The end time of the output spike trains. shift : pq.Quantity, optional If `None`, the injected synchrony is exact. If shift is a `pq.Quantity`, all the spike trains are shifted independently by a random amount in the interval `[-shift, +shift]`. Default: None t_start : pq.Quantity, optional The `t_start` time of the output spike trains. Default: 0 pq.ms Returns ------- list of neo.SpikeTrain SpikeTrains with specified firing rates forming the CPP with amplitude distribution :math:`A`. References ---------- .. [1] Staude, Rotter, Gruen (2010) J Comput Neurosci 29:327-350. """ if not isinstance(amplitude_distribution, np.ndarray): amplitude_distribution = np.array(amplitude_distribution) # Check A is a probability distribution (it sums to 1 and is positive) if abs(sum(amplitude_distribution) - 1) > np.finfo('float').eps: raise ValueError( "'amplitude_distribution' must be a probability vector: " "sum(A) = {} != 1".format(sum(amplitude_distribution))) if np.any(amplitude_distribution < 0): raise ValueError("'amplitude_distribution' must be a probability " "vector with positive entries") # Check that the rate is not an empty pq.Quantity if rate.ndim == 1 and len(rate) == 0: raise ValueError('Rate is an empty pq.Quantity array') # Return empty spike trains for specific parameters if amplitude_distribution[0] == 1 or np.sum(np.abs(rate.magnitude)) == 0: return [neo.SpikeTrain([] * t_stop.units, t_stop=t_stop, t_start=t_start)] * ( len(amplitude_distribution) - 1) # Homogeneous rates if rate.ndim == 0: compound_poisson_spiketrains = _cpp_hom_stat( A=amplitude_distribution, t_stop=t_stop, rate=rate, t_start=t_start) # Heterogeneous rates else: compound_poisson_spiketrains = _cpp_het_stat( A=amplitude_distribution, t_stop=t_stop, rates=rate, t_start=t_start) if shift is not None: # Dither the output spiketrains compound_poisson_spiketrains = \ [dither_spike_train(spiketrain, shift=shift, edges=True)[0] for spiketrain in compound_poisson_spiketrains] return compound_poisson_spiketrains
# Alias for :func:`compound_poisson_process` cpp = compound_poisson_process