# -*- coding: utf-8 -*-
"""
This module allows to convert standard data representations
(e.g., a spike train stored as Neo SpikeTrain object)
into other representations useful to perform calculations on the data.
An example is the representation of a spike train as a sequence of 0-1 values
(binned spike train).
.. autosummary::
:toctree: _toctree/conversion
BinnedSpikeTrain
BinnedSpikeTrainView
binarize
Examples
********
>>> import neo
>>> import quantities as pq
>>> from elephant.conversion import BinnedSpikeTrain
>>> spiketrains = [
... neo.SpikeTrain([0.5, 0.7, 1.2, 3.1, 4.3, 5.5, 6.7], t_stop=9, units='s'),
... neo.SpikeTrain([0.1, 0.7, 1.2, 2.2, 4.3, 5.5, 8.0], t_stop=9, units='s')
... ]
>>> bst = BinnedSpikeTrain(spiketrains, bin_size=1 * pq.s)
>>> bst # doctest: +ELLIPSIS
BinnedSpikeTrain(t_start=0.0 s, t_stop=9.0 s, bin_size=1.0 s; shape=(2, 9), ...
>>> bst.to_array()
array([[2, 1, 0, 1, 1, 1, 1, 0, 0],
[2, 1, 1, 0, 1, 1, 0, 0, 1]], dtype=int32)
Binarizing the binned matrix.
>>> bst.to_bool_array()
array([[ True, True, False, True, True, True, True, False, False],
[ True, True, True, False, True, True, False, False, True]])
>>> bst_binary = bst.binarize()
>>> bst_binary # doctest: +ELLIPSIS
BinnedSpikeTrainView(t_start=0.0 s, t_stop=9.0 s, bin_size=1.0 s; shape=(2, ...
>>> bst_binary.to_array()
array([[1, 1, 0, 1, 1, 1, 1, 0, 0],
[1, 1, 1, 0, 1, 1, 0, 0, 1]], dtype=int32)
Slicing.
>>> bst.time_slice(t_stop=3.5 * pq.s) # doctest: +ELLIPSIS
BinnedSpikeTrainView(t_start=0.0 s, t_stop=3.0 s, bin_size=1.0 s; shape=(2, ...
>>> bst[0, 1:-3] # doctest: +ELLIPSIS
BinnedSpikeTrainView(t_start=1.0 s, t_stop=6.0 s, bin_size=1.0 s; shape=(1, ...
Generate a realisation of spike trains from the binned version.
>>> print(bst.to_spike_trains(spikes='center')[0])
[0.33333333 0.66666667 1.5 3.5 4.5 5.5
6.5 ] s
>>> print(bst.to_spike_trains(spikes='center')[1])
[0.33333333 0.66666667 1.5 2.5 4.5 5.5
8.5 ] s
Check the correctness of a spike trains realosation
>>> BinnedSpikeTrain(bst.to_spike_trains(), bin_size=bst.bin_size) == bst
True
Rescale the units of a binned spike train without changing the data.
>>> bst.rescale('ms')
>>> bst # doctest: +ELLIPSIS
BinnedSpikeTrain(t_start=0.0 ms, t_stop=9000.0 ms, bin_size=1000.0 ms; ...
:copyright: Copyright 2014-2024 by the Elephant team, see `doc/authors.rst`.
:license: BSD, see LICENSE.txt for details.
"""
from __future__ import division, print_function, unicode_literals
import math
import warnings
import neo
import numpy as np
import quantities as pq
import scipy.sparse as sps
from elephant.utils import is_binary, is_time_quantity, \
check_neo_consistency, round_binning_errors
__all__ = [
"binarize",
"BinnedSpikeTrain"
]
[docs]
def binarize(spiketrain, sampling_rate=None, t_start=None, t_stop=None,
return_times=False):
"""
Return an array indicating if spikes occurred at individual time points.
The array contains boolean values identifying whether at least one spike
occurred in the corresponding time bin. Time bins start at `t_start`
and end at `t_stop`, spaced in `1/sampling_rate` intervals.
Accepts either a `neo.SpikeTrain`, a `pq.Quantity` array, or a plain
`np.ndarray`.
Returns a boolean array with each element indicating the presence or
absence of a spike in that time bin.
Optionally also returns an array of time points corresponding to the
elements of the boolean array. The units of this array will be the same as
the units of the neo.SpikeTrain, if any.
Parameters
----------
spiketrain : neo.SpikeTrain or pq.Quantity or np.ndarray
The spike times. Does not have to be sorted.
sampling_rate : float or pq.Quantity, optional
The sampling rate to use for the time points.
If not specified, retrieved from the `sampling_rate` attribute of
`spiketrain`.
Default: None
t_start : float or pq.Quantity, optional
The start time to use for the time points.
If not specified, retrieved from the `t_start` attribute of
`spiketrain`. If this is not present, defaults to `0`. Any element of
`spiketrain` lower than `t_start` is ignored.
Default: None
t_stop : float or pq.Quantity, optional
The stop time to use for the time points.
If not specified, retrieved from the `t_stop` attribute of
`spiketrain`. If this is not present, defaults to the maximum value of
`spiketrain`. Any element of `spiketrain` higher than `t_stop` is
ignored.
Default: None
return_times : bool, optional
If True, also return the corresponding time points.
Default: False
Returns
-------
values : np.ndarray of bool
A True value at a particular index indicates the presence of one or
more spikes at the corresponding time point.
times : np.ndarray or pq.Quantity, optional
The time points. This will have the same units as `spiketrain`.
If `spiketrain` has no units, this will be an `np.ndarray` array.
Raises
------
TypeError
If `spiketrain` is an `np.ndarray` and `t_start`, `t_stop`, or
`sampling_rate` is a `pq.Quantity`.
ValueError
If `sampling_rate` is not explicitly defined and not present as an
attribute of `spiketrain`.
Notes
-----
Spike times are placed in the bin of the closest time point, going to the
higher bin if exactly between two bins.
So in the case where the bins are `5.5` and `6.5`, with the spike time
being `6.0`, the spike will be placed in the `6.5` bin.
The upper edge of the last bin, equal to `t_stop`, is inclusive. That is,
a spike time exactly equal to `t_stop` will be included.
If `spiketrain` is a `pq.Quantity` or `neo.SpikeTrain` and `t_start`,
`t_stop` or `sampling_rate` is not, then the arguments that are not
`pq.Quantity` will be assumed to have the same units as `spiketrain`.
"""
# get the values from spiketrain if they are not specified.
if sampling_rate is None:
sampling_rate = getattr(spiketrain, 'sampling_rate', None)
if sampling_rate is None:
raise ValueError('sampling_rate must either be explicitly defined '
'or must be an attribute of spiketrain')
if t_start is None:
t_start = getattr(spiketrain, 't_start', 0)
if t_stop is None:
t_stop = getattr(spiketrain, 't_stop', np.max(spiketrain))
# we don't actually want the sampling rate, we want the sampling period
sampling_period = 1. / sampling_rate
# figure out what units, if any, we are dealing with
if hasattr(spiketrain, 'units'):
units = spiketrain.units
spiketrain = spiketrain.magnitude
else:
units = None
# convert everything to the same units, then get the magnitude
if hasattr(sampling_period, 'units'):
if units is None:
raise TypeError('sampling_period cannot be a Quantity if '
'spiketrain is not a quantity')
sampling_period = sampling_period.rescale(units).magnitude
if hasattr(t_start, 'units'):
if units is None:
raise TypeError('t_start cannot be a Quantity if '
'spiketrain is not a quantity')
t_start = t_start.rescale(units).magnitude
if hasattr(t_stop, 'units'):
if units is None:
raise TypeError('t_stop cannot be a Quantity if '
'spiketrain is not a quantity')
t_stop = t_stop.rescale(units).magnitude
# figure out the bin edges
edges = np.arange(t_start - sampling_period / 2,
t_stop + sampling_period * 3 / 2,
sampling_period)
# we don't want to count any spikes before t_start or after t_stop
if edges[-2] > t_stop:
edges = edges[:-1]
if edges[1] < t_start:
edges = edges[1:]
edges[0] = t_start
edges[-1] = t_stop
# this is where we actually get the binarized spike train
res = np.histogram(spiketrain, edges)[0].astype('bool')
# figure out what to output
if not return_times:
return res
if units is None:
return res, np.arange(t_start, t_stop + sampling_period,
sampling_period)
return res, pq.Quantity(np.arange(t_start, t_stop + sampling_period,
sampling_period), units=units)
###########################################################################
#
# Methods to calculate parameters, t_start, t_stop, bin size,
# number of bins
#
###########################################################################
[docs]
class BinnedSpikeTrain(object):
"""
Class which calculates a binned spike train and provides methods to
transform the binned spike train to a boolean matrix or a matrix with
counted time points.
A binned spike train represents the occurrence of spikes in a certain time
frame.
I.e., a time series like [0.5, 0.7, 1.2, 3.1, 4.3, 5.5, 6.7] is
represented as [0, 0, 1, 3, 4, 5, 6]. The outcome is dependent on given
parameter such as size of bins, number of bins, start and stop points.
A boolean matrix represents the binned spike train in a binary (True/False)
manner. Its rows represent the number of spike trains and the columns
represent the binned index position of a spike in a spike train.
The calculated matrix entry containing `True` indicates a spike.
A matrix with counted time points is calculated the same way, but its
entries contain the number of spikes that occurred in the given bin of the
given spike train.
Note that with most common parameter combinations spike times can end up
on bin edges. This makes the binning susceptible to rounding errors which
is accounted for by moving spikes which are within tolerance of the next
bin edge into the following bin. This can be adjusted using the tolerance
parameter and turned off by setting `tolerance=None`.
Parameters
----------
spiketrains : neo.SpikeTrain or list of neo.SpikeTrain or np.ndarray
Spike train(s) to be binned.
bin_size : pq.Quantity, optional
Width of a time bin.
Default: None
n_bins : int, optional
Number of bins of the binned spike train.
Default: None
t_start : pq.Quantity, optional
Time of the left edge of the first bin (left extreme; included).
Default: None
t_stop : pq.Quantity, optional
Time of the right edge of the last bin (right extreme; excluded).
Default: None
tolerance : float, optional
Tolerance for rounding errors in the binning process and in the input
data
Default: 1e-8
sparse_format : {'csr', 'csc'}, optional
The sparse matrix format. By default, CSR format is used to perform
slicing and computations efficiently.
Default: 'csr'
Raises
------
AttributeError
If less than 3 optional parameters are `None`.
TypeError
If `spiketrains` is an np.ndarray with dimensionality different than
NxM or
if type of `n_bins` is not an `int` or `n_bins` < 0.
ValueError
When number of bins calculated from `t_start`, `t_stop` and `bin_size`
differs from provided `n_bins` or
if `t_stop` of any spike train is smaller than any `t_start` or
if any spike train does not cover the full [`t_start`, t_stop`] range.
Warns
-----
UserWarning
If some spikes fall outside of [`t_start`, `t_stop`] range
Notes
-----
There are four minimal configurations of the optional parameters which have
to be provided, otherwise a `ValueError` will be raised:
* t_start, n_bins, bin_size
* t_start, n_bins, t_stop
* t_start, bin_size, t_stop
* t_stop, n_bins, bin_size
If `spiketrains` is a `neo.SpikeTrain` or a list thereof, it is enough to
explicitly provide only one parameter: `n_bins` or `bin_size`. The
`t_start` and `t_stop` will be calculated from given `spiketrains` (max
`t_start` and min `t_stop` of `neo.SpikeTrain`s`).
Missing parameter will be calculated automatically.
All parameters will be checked for consistency. A corresponding error will
be raised, if one of the four parameters does not match the consistency
requirements.
"""
[docs]
def __init__(self, spiketrains, bin_size=None, n_bins=None, t_start=None,
t_stop=None, tolerance=1e-8, sparse_format="csr"):
if sparse_format not in ("csr", "csc"):
raise ValueError(f"Invalid 'sparse_format': {sparse_format}. "
"Available: 'csr' and 'csc'")
# Converting spiketrains to a list, if spiketrains is one
# SpikeTrain object
if isinstance(spiketrains, neo.SpikeTrain):
spiketrains = [spiketrains]
# The input params will be rescaled later to unit-less floats
self.tolerance = tolerance
self._t_start = t_start
self._t_stop = t_stop
self.n_bins = n_bins
self._bin_size = bin_size
self.units = None # will be set later
# Check all parameter, set also missing values
self._resolve_input_parameters(spiketrains)
# Now create the sparse matrix
self.sparse_matrix = self._create_sparse_matrix(
spiketrains, sparse_format=sparse_format)
@property
def shape(self):
"""
The shape of the sparse matrix.
"""
return self.sparse_matrix.shape
@property
def bin_size(self):
"""
Bin size quantity.
"""
return pq.Quantity(self._bin_size, units=self.units, copy=False)
@property
def t_start(self):
"""
t_start quantity; spike times below this value have been ignored.
"""
return pq.Quantity(self._t_start, units=self.units, copy=False)
@property
def t_stop(self):
"""
t_stop quantity; spike times above this value have been ignored.
"""
return pq.Quantity(self._t_stop, units=self.units, copy=False)
def __repr__(self):
return f"{type(self).__name__}(t_start={str(self.t_start)}, " \
f"t_stop={str(self.t_stop)}, bin_size={str(self.bin_size)}; " \
f"shape={self.shape}, " \
f"format={self.sparse_matrix.__class__.__name__})"
def rescale(self, units):
"""
Inplace rescaling to the new quantity units.
Parameters
----------
units : pq.Quantity or str
New quantity units.
Raises
------
TypeError
If the input units are not quantities.
"""
if isinstance(units, str):
units = pq.Quantity(1, units=units)
if units == self.units:
# do nothing
return
if not isinstance(units, pq.Quantity):
raise TypeError("The input units must be quantities or string")
scale = self.units.rescale(units).item()
self._t_stop *= scale
self._t_start *= scale
self._bin_size *= scale
self.units = units
def __resolve_binned(self, spiketrains):
spiketrains = np.asarray(spiketrains)
if spiketrains.ndim != 2 or spiketrains.dtype == np.dtype('O'):
raise ValueError("If the input is not a spiketrain(s), it "
"must be an MxN numpy array, each cell of "
"which represents the number of (binned) "
"spikes that fall in an interval - not "
"raw spike times.")
if self.n_bins is not None:
raise ValueError("When the input is a binned matrix, 'n_bins' "
"must be set to None - it's extracted from the "
"input shape.")
self.n_bins = spiketrains.shape[1]
if self._bin_size is None:
if self._t_start is None or self._t_stop is None:
raise ValueError("To determine the bin size, both 't_start' "
"and 't_stop' must be set")
self._bin_size = (self._t_stop - self._t_start) / self.n_bins
if self._t_start is None and self._t_stop is None:
raise ValueError("Either 't_start' or 't_stop' must be set")
if self._t_start is None:
self._t_start = self._t_stop - self._bin_size * self.n_bins
if self._t_stop is None:
self._t_stop = self._t_start + self._bin_size * self.n_bins
def _resolve_input_parameters(self, spiketrains):
"""
Calculates `t_start`, `t_stop` from given spike trains.
The start and stop points are calculated from given spike trains only
if they are not calculable from given parameters or the number of
parameters is less than three.
Parameters
----------
spiketrains : neo.SpikeTrain or list or np.ndarray of neo.SpikeTrain
"""
def get_n_bins():
n_bins = (self._t_stop - self._t_start) / self._bin_size
if isinstance(n_bins, pq.Quantity):
n_bins = n_bins.simplified.item()
n_bins = round_binning_errors(n_bins, tolerance=self.tolerance)
return n_bins
def check_n_bins_consistency():
if self.n_bins != get_n_bins():
raise ValueError(
"Inconsistent arguments: t_start ({t_start}), "
"t_stop ({t_stop}), bin_size ({bin_size}), and "
"n_bins ({n_bins})".format(
t_start=self.t_start, t_stop=self.t_stop,
bin_size=self.bin_size, n_bins=self.n_bins))
def check_consistency():
if self.t_start >= self.t_stop:
raise ValueError("t_start must be smaller than t_stop")
if not isinstance(self.n_bins, int) or self.n_bins <= 0:
raise TypeError("The number of bins ({}) must be a positive "
"integer".format(self.n_bins))
if not _check_neo_spiketrain(spiketrains):
# a binned numpy matrix
self.__resolve_binned(spiketrains)
self.units = self._bin_size.units
check_n_bins_consistency()
check_consistency()
self._t_start = self._t_start.rescale(self.units).item()
self._t_stop = self._t_stop.rescale(self.units).item()
self._bin_size = self._bin_size.rescale(self.units).item()
return
if self._bin_size is None and self.n_bins is None:
raise ValueError("Either 'bin_size' or 'n_bins' must be given")
try:
check_neo_consistency(spiketrains,
object_type=neo.SpikeTrain,
t_start=self._t_start,
t_stop=self._t_stop,
tolerance=self.tolerance)
except ValueError as er:
# different t_start/t_stop
raise ValueError(er, "If you want to bin over the shared "
"[t_start, t_stop] interval, provide "
"shared t_start and t_stop explicitly, "
"which can be obtained like so: "
"t_start, t_stop = elephant.utils."
"get_common_start_stop_times(spiketrains)"
)
if self._t_start is None:
self._t_start = spiketrains[0].t_start
if self._t_stop is None:
self._t_stop = spiketrains[0].t_stop
# At this point, all spiketrains share the same units.
self.units = spiketrains[0].units
# t_start and t_stop are checked to be time quantities in the
# check_neo_consistency call.
self._t_start = self._t_start.rescale(self.units).item()
self._t_stop = self._t_stop.rescale(self.units).item()
start_shared = max(st.t_start.rescale(self.units).item()
for st in spiketrains)
stop_shared = min(st.t_stop.rescale(self.units).item()
for st in spiketrains)
tolerance = self.tolerance
if tolerance is None:
tolerance = 0
if self._t_start < start_shared - tolerance \
or self._t_stop > stop_shared + tolerance:
raise ValueError("'t_start' ({t_start}) or 't_stop' ({t_stop}) is "
"outside of the shared [{start_shared}, "
"{stop_shared}] interval".format(
t_start=self.t_start, t_stop=self.t_stop,
start_shared=start_shared,
stop_shared=stop_shared))
if self.n_bins is None:
# bin_size is provided
self._bin_size = self._bin_size.rescale(self.units).item()
self.n_bins = get_n_bins()
elif self._bin_size is None:
# n_bins is provided
self._bin_size = (self._t_stop - self._t_start) / self.n_bins
else:
# both n_bins are bin_size are given
self._bin_size = self._bin_size.rescale(self.units).item()
check_n_bins_consistency()
check_consistency()
@property
def bin_edges(self):
"""
Returns all time edges as a quantity array with :attr:`n_bins` bins.
The borders of all time steps between :attr:`t_start` and
:attr:`t_stop` with a step :attr:`bin_size`. It is crucial for many
analyses that all bins have the same size, so if
:attr:`t_stop` - :attr:`t_start` is not divisible by :attr:`bin_size`,
there will be some leftover time at the end
(see https://github.com/NeuralEnsemble/elephant/issues/255).
The length of the returned array should match :attr:`n_bins`.
Returns
-------
bin_edges : pq.Quantity
All edges in interval [:attr:`t_start`, :attr:`t_stop`] with
:attr:`n_bins` bins are returned as a quantity array.
"""
bin_edges = np.linspace(self._t_start, self._t_start + self.n_bins *
self._bin_size,
num=self.n_bins + 1, endpoint=True)
return pq.Quantity(bin_edges, units=self.units, copy=False)
@property
def bin_centers(self):
"""
Returns each center time point of all bins between :attr:`t_start` and
:attr:`t_stop` points.
The center of each bin of all time steps between start and stop.
Returns
-------
bin_edges : pq.Quantity
All center edges in interval (:attr:`start`, :attr:`stop`).
"""
start = self._t_start + self._bin_size / 2
stop = start + (self.n_bins - 1) * self._bin_size
bin_centers = np.linspace(start=start,
stop=stop,
num=self.n_bins, endpoint=True)
bin_centers = pq.Quantity(bin_centers, units=self.units, copy=False)
return bin_centers
def to_sparse_bool_array(self):
"""
Getter for boolean version of the sparse matrix, calculated from
sparse matrix with counted time points.
Returns
-------
scipy.sparse.csr_matrix or scipy.sparse.csc_matrix
Sparse matrix, binary, boolean version.
See also
--------
scipy.sparse.csr_matrix
to_bool_array
"""
# Return sparse Matrix as a copy
spmat_copy = self.sparse_matrix.copy()
spmat_copy.data = spmat_copy.data.astype(bool)
return spmat_copy
def __eq__(self, other):
if not isinstance(other, BinnedSpikeTrain):
return False
if self.n_bins != other.n_bins:
return
dt_start = other.t_start.rescale(self.units).item() - self._t_start
dt_stop = other.t_stop.rescale(self.units).item() - self._t_stop
dbin_size = other.bin_size.rescale(self.units).item() - self._bin_size
tol = 0 if self.tolerance is None else self.tolerance
if any(abs(diff) > tol for diff in [dt_start, dt_stop, dbin_size]):
return False
sp1 = self.sparse_matrix
sp2 = other.sparse_matrix
if sp1.__class__ is not sp2.__class__ or sp1.shape != sp2.shape \
or sp1.data.shape != sp2.data.shape:
return False
return (sp1.data == sp2.data).all() and \
(sp1.indptr == sp2.indptr).all() and \
(sp1.indices == sp2.indices).all()
def copy(self):
"""
Copies the binned sparse matrix and returns a view. Any changes to
the copied object won't affect the original object.
Returns
-------
BinnedSpikeTrainView
A copied view of itself.
"""
return BinnedSpikeTrainView(t_start=self._t_start,
t_stop=self._t_stop,
bin_size=self._bin_size,
units=self.units,
sparse_matrix=self.sparse_matrix.copy(),
tolerance=self.tolerance)
def __iter_sparse_matrix(self):
spmat = self.sparse_matrix
if isinstance(spmat, sps.csc_matrix):
warnings.warn("The sparse matrix format is CSC. For better "
"performance, specify the CSR format while "
"constructing a "
"BinnedSpikeTrain(sparse_format='csr')")
spmat = spmat.tocsr()
# taken from csr_matrix.__iter__()
i0 = 0
for i1 in spmat.indptr[1:]:
indices = spmat.indices[i0:i1]
data = spmat.data[i0:i1]
yield indices, data
i0 = i1
def __getitem__(self, item):
"""
Returns a binned slice view of itself; `t_start` and `t_stop` will be
set accordingly to the second slicing argument, if any.
Parameters
----------
item : int or slice or tuple
Spike train and bin index slicing, passed to
``self.sparse_matrix``.
Returns
-------
BinnedSpikeTrainView
A slice of itself that carry the original data. Any changes to
the returned binned sparse matrix will affect the original data.
"""
# taken from csr_matrix.__getitem__
row, col = self.sparse_matrix._validate_indices(item)
spmat = self.sparse_matrix[item]
if np.isscalar(spmat):
# data with one element
spmat = sps.csr_matrix(([spmat], ([0], [0])), dtype=spmat.dtype)
if isinstance(col, (int, np.integer)):
start, stop, stride = col, col + 1, 1
elif isinstance(col, slice):
start, stop, stride = col.indices(self.n_bins)
else:
raise TypeError(f"The second slice argument ({col}), which "
"corresponds to bin indices, must be either int "
"or slice.")
t_start = self._t_start + start * self._bin_size
t_stop = self._t_start + stop * self._bin_size
bin_size = stride * self._bin_size
bst = BinnedSpikeTrainView(t_start=t_start,
t_stop=t_stop,
bin_size=bin_size,
units=self.units,
sparse_matrix=spmat,
tolerance=self.tolerance)
return bst
def __setitem__(self, key, value):
"""
Changes the values of ``self.sparse_matrix`` according to `key` and
`value`. A shortcut to ``self.sparse_matrix[key] = value``.
Parameters
----------
key : int or list or tuple or slice
The binned sparse matrix keys (axes slice) to change.
value : int or list or tuple or slice
New values of the sparse matrix selection.
"""
self.sparse_matrix[key] = value
def time_slice(self, t_start=None, t_stop=None, copy=False):
"""
Returns a view or a copied view of currently binned spike trains with
``(t_start, t_stop)`` time slice. Only valid (fully overlapping) bins
are sliced.
Parameters
----------
t_start, t_stop : pq.Quantity or None, optional
Start and stop times or Nones.
Default: None
copy : bool, optional
Copy the sparse matrix or not.
Default: False
Returns
-------
BinnedSpikeTrainView
A time slice of itself.
"""
if not is_time_quantity(t_start, t_stop, allow_none=True):
raise TypeError("t_start and t_stop must be quantities")
if t_start is None and t_stop is None and not copy:
return self
if t_start is None:
start_index = 0
else:
t_start = t_start.rescale(self.units).item()
start_index = (t_start - self._t_start) / self._bin_size
start_index = math.ceil(start_index)
start_index = max(start_index, 0)
if t_stop is None:
stop_index = self.n_bins
else:
t_stop = t_stop.rescale(self.units).item()
stop_index = (t_stop - self._t_start) / self._bin_size
stop_index = round_binning_errors(stop_index,
tolerance=self.tolerance)
stop_index = min(stop_index, self.n_bins)
stop_index = max(stop_index, start_index)
spmat = self.sparse_matrix[:, start_index: stop_index]
if copy:
spmat = spmat.copy()
t_start = self._t_start + start_index * self._bin_size
t_stop = self._t_start + stop_index * self._bin_size
bst = BinnedSpikeTrainView(t_start=t_start,
t_stop=t_stop,
bin_size=self._bin_size,
units=self.units,
sparse_matrix=spmat,
tolerance=self.tolerance)
return bst
def to_spike_trains(self, spikes="random", as_array=False,
annotate_bins=False):
"""
Generate spike trains from the binned spike train object. This function
is inverse to binning such that
.. code-block:: python
BinnedSpikeTrain(binned_st.to_spike_trains()) == binned_st
The object bin size is stored in resulting
``spiketrain.annotations['bin_size']``.
Parameters
----------
spikes : {"left", "center", "random"}, optional
Specifies how to generate spikes inside bins.
* "left": align spikes from left to right to have equal inter-
spike interval;
* "center": align spikes around center to have equal inter-spike
interval;
* "random": generate spikes from a homogenous Poisson process;
it's the fastest mode.
Default: "random"
as_array : bool, optional
If True, numpy arrays are returned; otherwise, wrap the arrays in
`neo.SpikeTrain`.
Default: False
annotate_bins : bool, optional
If `as_array` is False, this flag allows to include the bin index
in resulting ``spiketrain.array_annotations['bins']``.
Default: False
Returns
-------
spiketrains : list of neo.SpikeTrain
A list of spike trains - one possible realisation of spiketrains
that could have been used as the input to `BinnedSpikeTrain`.
"""
description = f"generated from {self.__class__.__name__}"
shift = 0
if spikes == "center":
shift = 1
spikes = "left"
spiketrains = []
for indices, spike_count in self.__iter_sparse_matrix():
bin_indices = np.repeat(indices, spike_count)
t_starts = self._t_start + bin_indices * self._bin_size
if spikes == "random":
spiketrain = np.random.uniform(low=0, high=self._bin_size,
size=spike_count.sum())
spiketrain += t_starts
spiketrain.sort()
elif spikes == "left":
spiketrain = [np.arange(shift, count + shift) / (count + shift)
for count in spike_count]
spiketrain = np.hstack(spiketrain) * self._bin_size
spiketrain += t_starts
else:
raise ValueError(f"Invalid 'spikes' mode: '{spikes}'")
# account for the last bin
spiketrain = spiketrain[spiketrain <= self._t_stop]
if not as_array:
array_ants = None
if annotate_bins:
array_ants = dict(bins=bin_indices)
spiketrain = neo.SpikeTrain(spiketrain, t_start=self._t_start,
t_stop=self._t_stop,
units=self.units, copy=False,
description=description,
array_annotations=array_ants,
bin_size=self.bin_size)
spiketrains.append(spiketrain)
return spiketrains
def get_num_of_spikes(self, axis=None):
"""
Compute the number of binned spikes.
Parameters
----------
axis : int, optional
If `None`, compute the total num. of spikes.
Otherwise, compute num. of spikes along axis.
If axis is `1`, compute num. of spikes per spike train (row).
Default is `None`.
Returns
-------
n_spikes_per_row : int or np.ndarray
The number of binned spikes.
"""
if axis is None:
return self.sparse_matrix.sum(axis=axis)
n_spikes_per_row = self.sparse_matrix.sum(axis=axis)
n_spikes_per_row = np.ravel(n_spikes_per_row)
return n_spikes_per_row
@property
def spike_indices(self):
"""
A list of lists for each spike train (i.e., rows of the binned matrix),
that in turn contains for each spike the index into the binned matrix
where this spike enters.
In contrast to `self.sparse_matrix.nonzero()`, this function will
report two spikes falling in the same bin as two entries.
Examples
--------
>>> import elephant.conversion as conv
>>> import neo as n
>>> import quantities as pq
>>> st = n.SpikeTrain([0.5, 0.7, 1.2, 3.1, 4.3, 5.5, 6.7] * pq.s,
... t_stop=10.0 * pq.s)
>>> x = conv.BinnedSpikeTrain(st, n_bins=10, bin_size=1 * pq.s,
... t_start=0 * pq.s)
>>> print(x.spike_indices)
[array([0, 0, 1, 3, 4, 5, 6], dtype=int32)]
>>> print(x.sparse_matrix.nonzero()[1])
[0 1 3 4 5 6]
>>> print(x.to_array())
[[2 1 0 1 1 1 1 0 0 0]]
"""
spike_idx = []
for indices, spike_count in self.__iter_sparse_matrix():
# Extract each non-zeros column index and how often it exists,
# i.e., how many spikes fall in this column
n_cols = np.repeat(indices, spike_count)
spike_idx.append(n_cols)
return spike_idx
@property
def is_binary(self):
"""
Returns True if the sparse matrix contains binary values only.
Beware, that the function does not know if the input was binary
because e.g `to_bool_array()` was used before or if the input is just
sparse (i.e. only one spike per bin at maximum).
Returns
-------
bool
True for binary input, False otherwise.
"""
return is_binary(self.sparse_matrix.data)
def to_bool_array(self):
"""
Returns a matrix, in which the rows correspond to the spike trains and
the columns correspond to the bins in the `BinnedSpikeTrain`.
`True` indicates a spike in given bin of given spike train and
`False` indicates lack of spikes.
Returns
-------
numpy.ndarray
Returns a dense matrix representation of the sparse matrix,
with `True` indicating a spike and `False` indicating a no-spike.
The columns represent the index position of the bins and rows
represent the number of spike trains.
See also
--------
scipy.sparse.csr_matrix
scipy.sparse.csr_matrix.toarray
Examples
--------
>>> import elephant.conversion as conv
>>> import neo as n
>>> import quantities as pq
>>> a = n.SpikeTrain([0.5, 0.7, 1.2, 3.1, 4.3, 5.5, 6.7] * pq.s,
... t_stop=10.0 * pq.s)
>>> x = conv.BinnedSpikeTrain(a, n_bins=10, bin_size=1 * pq.s,
... t_start=0 * pq.s)
>>> print(x.to_bool_array())
[[ True True False True True True True False False False]]
"""
return self.to_array(dtype=bool)
def to_array(self, dtype=None):
"""
Returns a dense matrix, calculated from the sparse matrix, with counted
time points of spikes. The rows correspond to spike trains and the
columns correspond to bins in a `BinnedSpikeTrain`.
Entries contain the count of spikes that occurred in the given bin of
the given spike train.
Returns
-------
matrix : np.ndarray
Matrix with spike counts. Columns represent the index positions of
the binned spikes and rows represent the spike trains.
Examples
--------
>>> import elephant.conversion as conv
>>> import neo as n
>>> import quantities as pq
>>> a = n.SpikeTrain([0.5, 0.7, 1.2, 3.1, 4.3, 5.5, 6.7] * pq.s,
... t_stop=10.0 * pq.s)
>>> x = conv.BinnedSpikeTrain(a, n_bins=10, bin_size=1 * pq.s,
... t_start=0 * pq.s)
>>> print(x.to_array())
[[2 1 0 1 1 1 1 0 0 0]]
See also
--------
scipy.sparse.csr_matrix
scipy.sparse.csr_matrix.toarray
"""
array = self.sparse_matrix.toarray()
if dtype is not None:
array = array.astype(dtype)
return array
def binarize(self, copy=True):
"""
Clip the internal array (no. of spikes in a bin) to `0` (no spikes) or
`1` (at least one spike) values only.
Parameters
----------
copy : bool, optional
If True, a **shallow** copy - a view of `BinnedSpikeTrain` - is
returned with the data array filled with zeros and ones. Otherwise,
the binarization (clipping) is done in-place. A shallow copy
means that :attr:`indices` and :attr:`indptr` of a sparse matrix
is shared with the original sparse matrix. Only the data is copied.
If you want to perform a deep copy, call
:func:`BinnedSpikeTrain.copy` prior to binarizing.
Default: True
Returns
-------
bst : BinnedSpikeTrain or BinnedSpikeTrainView
A (view of) `BinnedSpikeTrain` with the sparse matrix data clipped
to zeros and ones.
"""
spmat = self.sparse_matrix
if copy:
data = np.ones(len(spmat.data), dtype=spmat.data.dtype)
spmat = spmat.__class__(
(data, spmat.indices, spmat.indptr),
shape=spmat.shape, copy=False)
bst = BinnedSpikeTrainView(t_start=self._t_start,
t_stop=self._t_stop,
bin_size=self._bin_size,
units=self.units,
sparse_matrix=spmat,
tolerance=self.tolerance)
else:
spmat.data[:] = 1
bst = self
return bst
@property
def sparsity(self):
"""
The sparsity of the sparse matrix computed as the no. of nonzero
elements divided by the matrix size.
Returns
-------
float
"""
num_nonzero = self.sparse_matrix.data.shape[0]
shape = self.sparse_matrix.shape
size = shape[0] * shape[1]
return num_nonzero / size
def _create_sparse_matrix(self, spiketrains, sparse_format):
"""
Converts `neo.SpikeTrain` objects to a scipy sparse matrix, which
contains the binned spike times, and
stores it in :attr:`sparse_matrix`.
Parameters
----------
spiketrains : neo.SpikeTrain or list of neo.SpikeTrain
Spike trains to bin.
"""
# The data type for numeric values
data_dtype = np.int32
if sparse_format == 'csr':
sparse_format = sps.csr_matrix
else:
# csc
sparse_format = sps.csc_matrix
if not _check_neo_spiketrain(spiketrains):
# a binned numpy array
sparse_matrix = sparse_format(spiketrains, dtype=data_dtype)
return sparse_matrix
# Get index dtype that can accomodate the largest index
# (this is the same dtype that will be used for the index arrays of the
# sparse matrix, so already using it here avoids array duplication)
shape = (len(spiketrains), self.n_bins)
numtype = np.int32
if max(shape) > np.iinfo(numtype).max:
numtype = np.int64
row_ids, column_ids = [], []
# data
counts = []
n_discarded = 0
# all spiketrains carry the same units
scale_units = 1 / self._bin_size
for idx, st in enumerate(spiketrains):
times = st.magnitude
times = times[(times >= self._t_start) & (
times <= self._t_stop)] - self._t_start
bins = times * scale_units
# shift spikes that are very close
# to the right edge into the next bin
bins = round_binning_errors(bins, tolerance=self.tolerance)
valid_bins = bins[bins < self.n_bins]
n_discarded += len(bins) - len(valid_bins)
f, c = np.unique(valid_bins, return_counts=True)
# f inherits the dtype np.int32 from bins, but c is created in
# np.unique with the default int dtype (usually np.int64)
c = c.astype(data_dtype)
column_ids.append(f)
counts.append(c)
row_ids.append(np.repeat(idx, repeats=len(f)).astype(numtype))
if n_discarded > 0:
warnings.warn("Binning discarded {} last spike(s) of the "
"input spiketrain".format(n_discarded))
# Stacking preserves the data type. In any case, while creating
# the sparse matrix, a copy is performed even if we set 'copy' to False
# explicitly (however, this might change in future scipy versions -
# this depends on scipy csr matrix initialization implementation).
counts = np.hstack(counts)
column_ids = np.hstack(column_ids)
row_ids = np.hstack(row_ids)
sparse_matrix = sparse_format((counts, (row_ids, column_ids)),
shape=shape, dtype=data_dtype,
copy=False)
return sparse_matrix
[docs]
class BinnedSpikeTrainView(BinnedSpikeTrain):
"""
A view of :class:`BinnedSpikeTrain`.
This class is used to avoid deep copies in several functions of a binned
spike train object like :meth:`BinnedSpikeTrain.binarize`,
:meth:`BinnedSpikeTrain.time_slice`, etc.
Parameters
----------
t_start, t_stop : float
Unit-less start and stop times that share the same units.
bin_size : float
Unit-less bin size that was used used in binning the `sparse_matrix`.
units : pq.Quantity
The units of input spike trains.
sparse_matrix : scipy.sparse.csr_matrix
Binned sparse matrix.
tolerance : float or None, optional
The tolerance property of the original `BinnedSpikeTrain`.
Default: 1e-8
Warnings
--------
This class is an experimental feature.
"""
[docs]
def __init__(self, t_start, t_stop, bin_size, units, sparse_matrix,
tolerance=1e-8):
self._t_start = t_start
self._t_stop = t_stop
self._bin_size = bin_size
self.n_bins = sparse_matrix.shape[1]
self.units = units.copy()
self.sparse_matrix = sparse_matrix
self.tolerance = tolerance
def _check_neo_spiketrain(query):
"""
Checks if given input contains neo.SpikeTrain objects
Parameters
----------
query
Object to test for `neo.SpikeTrain` objects
Returns
-------
bool
True if `matrix` is a neo.SpikeTrain or a list or tuple thereof,
otherwise False.
"""
# Check for single spike train
if isinstance(query, neo.SpikeTrain):
return True
# Check for list, tuple, or SpikeTrainList
try:
return all(map(_check_neo_spiketrain, query))
except TypeError:
pass
return False
def discretise_spiketimes(spiketrains, sampling_rate):
"""
Rounds down all spike times in the input spike train(s)
to multiples of the sampling_rate
Parameters
----------
spiketrains : neo.SpikeTrain or list of neo.SpikeTrain
The spiketrain(s) to discretise
sampling_rate : pq.Quantity
The desired sampling rate
Returns
-------
neo.SpikeTrain or list of neo.SpikeTrain
The discretised spiketrain(s)
Examples
--------
>>> import neo
>>> import numpy as np
>>> import quantities as pq
>>> from elephant import conversion
>>>
>>> np.random.seed(1)
>>> times = (np.arange(10) + np.random.uniform(size=10)) * pq.ms
>>> spiketrain = neo.SpikeTrain(times, t_stop=10*pq.ms)
>>>
>>> spiketrain.times
array([0.417022 , 1.72032449, 2.00011437, 3.30233257, 4.14675589,
5.09233859, 6.18626021, 7.34556073, 8.39676747, 9.53881673]) * ms
>>>
>>> discretised_spiketrain = conversion.discretise_spiketimes(spiketrain,
... 1 / pq.ms)
>>> discretised_spiketrain.times
array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]) * ms
"""
# spiketrains type check
was_single_spiketrain = False
if isinstance(spiketrains, neo.SpikeTrain):
spiketrains = [spiketrains]
was_single_spiketrain = True
elif isinstance(spiketrains, list):
for st in spiketrains:
if not isinstance(st, (np.ndarray, neo.SpikeTrain)):
raise TypeError(
"spiketrains must be a SpikeTrain, a numpy ndarray, or a "
"list of one of those, not %s." % type(spiketrains))
else:
raise TypeError(
"spiketrains must be a SpikeTrain or a list of SpikeTrain objects,"
" not %s." % type(spiketrains))
if not isinstance(sampling_rate, pq.Quantity):
raise TypeError(
"The 'sampling_rate' must be pq.Quantity.\n"
"Found: %s." % type(sampling_rate))
units = spiketrains[0].times.units
mag_sampling_rate = sampling_rate.rescale(1/units).magnitude.flatten()
new_spiketrains = []
for spiketrain in spiketrains:
mag_t_start = spiketrain.t_start.rescale(units).magnitude.flatten()
mag_times = spiketrain.times.magnitude.flatten()
discrete_times = (mag_times // (1 / mag_sampling_rate)
/ mag_sampling_rate)
mask = discrete_times < mag_t_start
if np.any(mask):
warnings.warn(f'{mask.sum()} spike(s) would be before t_start '
'and are set to t_start instead.')
discrete_times[mask] = mag_t_start
discrete_times *= units
new_spiketrain = spiketrain.duplicate_with_new_data(discrete_times)
new_spiketrain.sampling_rate = sampling_rate
new_spiketrains.append(new_spiketrain)
if was_single_spiketrain:
new_spiketrains = new_spiketrains[0]
return new_spiketrains