import itertools
from typing import Iterable, List, NamedTuple, Union, Optional
import numpy as np
from scipy.signal import oaconvolve
from elephant.conversion import BinnedSpikeTrain
def total_spiking_probability_edges(
    spike_trains: BinnedSpikeTrain,
    surrounding_window_sizes: Optional[List[int]] = None,
    observed_window_sizes: Optional[List[int]] = None,
    crossover_window_sizes: Optional[List[int]] = None,
    max_delay: int = 25,
    normalize: bool = False,
):
    r"""
    Estimate the functional connectivity and delay times of a neural network
    using Total Spiking Probability Edges (TSPE).

    This algorithm uses a normalized cross correlation between pairs of
    spike trains at different delay times to get a cross-correlogram.
    Afterwards, a series of convolutions with multiple edge filters
    on the cross-correlogram are performed, in order to estimate the
    connectivity between neurons and thus allowing the discrimination
    between inhibitory and excitatory effects.

    The default window sizes and maximum delay were optimized using
    in-silico generated spike trains.

    **Background:**

    - On an excitatory connection the spike rate increases and decreases again
      due to the refractory period which results in local maxima in the
      cross-correlogram followed by downwards slope.
    - On an inhibitory connection the spike rate decreases and after refractory
      period, increases again which results in local minima surrounded by high
      values in the cross-correlogram.
    - An edge filter can be used to interpret the cross-correlogram and
      accentuate the local maxima and minima.

    **Procedure:**

    1. Compute normalized cross-correlation :math:`NCC` of spike trains of all
       neuron pairs.
    2. Convolve :math:`NCC` with edge filter :math:`g_{i}` to compute
       :math:`SPE`.
    3. Convolve :math:`SPE` with corresponding running total filter
       :math:`h_{i}` to account for different lengths after convolution with
       edge filter.
    4. Compute :math:`TSPE` using the sum of all :math:`SPE` for all different
       filter pairs.
    5. Compute the connectivity matrix by using the index of the TSPE values
       with the highest absolute values.

    **Normalized Cross-Correlation:**

    .. math::
        NCC_{XY}(d) = \frac{1}{N} \sum_{i=-\infty}^{\infty}{ \frac{ (y_{(i)} -
        \bar{y}) \cdot (x_{(i-d)} - \bar{x}) }{ \sigma_x \cdot \sigma_y }}

    **Edge Filter:**

    .. math::
        g_{(i)} = \begin{cases}
        - \frac{1}{a} & 0 \lt i \leq a \\
        \frac{2}{b} & a+c \lt i \leq a + b + c \\
        - \frac{1}{a} & a+b+2c \lt i \leq 2a + b + 2c \\
        0 & \mathrm{otherwise}
        \end{cases}

    where :math:`a` is the parameter `surrounding_window_size`, :math:`b`
    `observed_window_size`, and :math:`c` is the parameter
    `crossover_window_size`.

    **Spiking Probability Edges:**

    .. math::
        SPE_{X \rightarrow Y(d)} = NCC_{XY}(d) * g(i)

    **Total Spiking Probability Edges:**

    .. math::
        TSPE_{X \rightarrow Y}(d) = \sum_{n=1}^{N_a \cdot N_b \cdot N_c}
        {SPE_{X \rightarrow Y}^{(n)}(d) * h(i)^{(n)} }

    :cite:`functional_connectivity-de_blasi19_169`

    Parameters
    ----------
    spike_trains : (N, ) elephant.conversion.BinnedSpikeTrain
        A binned spike train containing all neurons for connectivity
        estimation.
    surrounding_window_sizes : List[int], optional
        Array of window sizes for the surrounding area of the point of
        interest. This corresponds to parameter `a` of the edge filter in
        :cite:`functional_connectivity-de_blasi19_169`. Value is given in
        units of the number of bins according to the binned spike trains
        `spike_trains`.
        Default: [3, 4, 5, 6, 7, 8]
    observed_window_sizes : List[int], optional
        Array of window sizes for the observed area. This corresponds to
        parameter `b` of the edge filter and the length of the running filter
        as defined in :cite:`functional_connectivity-de_blasi19_169`. Value is
        given in units of the number of bins according to the binned spike
        trains `spike_trains`.
        Default: [2, 3, 4, 5, 6]
    crossover_window_sizes : List[int], optional
        Array of window sizes for the crossover between surrounding and
        observed window. This corresponds to parameter `c` of the edge filter
        in :cite:`functional_connectivity-de_blasi19_169`. Value is given in
        units of the number of bins according to the binned spike trains
        `spike_trains`.
        Default: [0]
    max_delay : int, optional
        Defines the max delay when performing the normalized
        cross-correlations. Value is given in units of the number of bins
        according to the binned spike trains `spike_trains`.
        Default: 25
    normalize : bool, optional
        Normalize the output [experimental].
        Default: False

    Returns
    -------
    connectivity_matrix : (N, N) np.ndarray
        Square matrix of the connectivity estimation between neurons.
        Positive values describe an excitatory connection while
        negative values describe an inhibitory connection.
    delay_matrix : (N, N) np.ndarray
        Square matrix of the estimated delay times between neuron activities.
    """
    # Falsy (None or empty) window-size arguments fall back to the defaults
    # optimized in the reference publication.
    if not surrounding_window_sizes:
        surrounding_window_sizes = [3, 4, 5, 6, 7, 8]
    if not observed_window_sizes:
        observed_window_sizes = [2, 3, 4, 5, 6]
    if not crossover_window_sizes:
        crossover_window_sizes = [0]

    n_neurons, n_bins = spike_trains.shape

    filter_pairs = generate_filter_pairs(
        surrounding_window_sizes, observed_window_sizes, crossover_window_sizes
    )

    # Calculate normalized cross-correlation for different delays.
    # The delay range is from 0 to max_delay and includes
    # padding for the filter convolution.
    max_padding = max(surrounding_window_sizes) + max(crossover_window_sizes)
    delay_times = list(range(-max_padding, max_delay + max_padding))
    NCC_d = normalized_cross_correlation(spike_trains, delay_times=delay_times)

    # Normalize to counter network bursts: divide each delay slice by the
    # sum of its off-diagonal entries. Iterate directly over the positions
    # of the delay axis instead of indexing with the (partly negative)
    # delay values, which only covered every slice once via negative-index
    # wraparound.
    if normalize:
        # Off-diagonal mask is loop-invariant; build it once.
        off_diagonal = ~np.identity(n_neurons, dtype=bool)
        for delay_index in range(NCC_d.shape[2]):
            NCC_d[:, :, delay_index] /= np.sum(
                NCC_d[:, :, delay_index][off_diagonal]
            )

    # Apply edge and running total filter
    tspe_matrix = np.zeros((n_neurons, n_neurons, max_delay))
    # `filter_pair` (not `filter`) so the builtin is not shadowed
    for filter_pair in filter_pairs:
        # Select ncc_window based on needed filter padding
        NCC_window = NCC_d[
            :,
            :,
            max_padding
            - filter_pair.needed_padding: max_delay
            + max_padding
            + filter_pair.needed_padding,
        ]

        # Compute two convolutions with edge- and running total filter
        x1 = oaconvolve(
            NCC_window, np.expand_dims(filter_pair.edge_filter, (0, 1)),
            mode="valid", axes=2
        )
        x2 = oaconvolve(
            x1, np.expand_dims(filter_pair.running_total_filter, (0, 1)),
            mode="full", axes=2
        )
        tspe_matrix += x2

    # Take maxima of absolute of delays to get estimation for connectivity
    connectivity_matrix_index = np.argmax(np.abs(tspe_matrix),
                                          axis=2, keepdims=True)
    connectivity_matrix = np.take_along_axis(tspe_matrix,
                                             connectivity_matrix_index, axis=2
                                             ).squeeze(axis=2)
    # Squeeze only the delay axis so the result keeps its (N, N) shape even
    # for a single-neuron input.
    delay_matrix = connectivity_matrix_index.squeeze(axis=2)

    return connectivity_matrix, delay_matrix
def normalized_cross_correlation(
    spike_trains: BinnedSpikeTrain,
    delay_times: Union[int, List[int], Iterable[int]] = 0,
) -> np.ndarray:
    r"""
    Normalized cross-correlation using the standard deviation.

    Computes the normalized cross-correlation between all spike trains
    inside a `BinnedSpikeTrain` object at the given delay time(s).

    The underlying formula is:

    .. math::
        NCC_{X \rightarrow Y}(d) = \frac{1}{N_{bins}}
        \sum_{i=-\infty}^{\infty}{
        \frac{(y_{(i)} - \bar{y}) \cdot (x_{(i-d)} - \bar{x})}
        {\sigma_x \cdot \sigma_y}}

    The subtraction of mean-values in the numerator is omitted, since it
    offers little added accuracy but increases the compute-time considerably.

    Parameters
    ----------
    spike_trains : elephant.conversion.BinnedSpikeTrain
        Binned spike trains of all neurons.
    delay_times : int or List[int] or Iterable[int], optional
        Delay(s), in units of bins, at which to evaluate the correlation.
        Default: 0

    Returns
    -------
    np.ndarray
        Correlation values of shape (n_neurons, n_neurons, n_delays),
        where entry ``[i, j, k]`` is the correlation of neuron ``j``'s
        train with neuron ``i``'s train shifted by ``delay_times[k]`` bins.
    """
    n_neurons, n_bins = spike_trains.shape

    # Get sparse array of BinnedSpikeTrain
    # NOTE(review): sparse-matrix arithmetic below — subtracting the dense
    # row means densifies to an np.matrix, so `spike_trains_std` and
    # `std_factors` are np.matrix objects; verify against scipy version.
    spike_trains_array = spike_trains.sparse_matrix

    # Get std deviation of spike trains
    # (np.std subtracts the mean itself; zeroing first keeps the result
    # identical while making the intent explicit)
    spike_trains_zeroed = spike_trains_array - spike_trains_array.mean(axis=1)
    spike_trains_std = np.std(spike_trains_zeroed, ddof=1, axis=1)
    # Outer product of the per-neuron std deviations: pairwise
    # normalization factors sigma_x * sigma_y
    std_factors = spike_trains_std @ spike_trains_std.T

    # Accept a single int, a list, or any other iterable of delays
    if isinstance(delay_times, int):
        delay_times = [delay_times]
    elif isinstance(delay_times, list):
        pass
    elif isinstance(delay_times, Iterable):
        delay_times = list(delay_times)

    # Loop over delay times
    NCC_d = np.zeros((len(delay_times), n_neurons, n_neurons))

    for index, delay_time in enumerate(delay_times):
        # Uses theoretical zero-padding for shifted values,
        # but since $0 \cdot x = 0$ values can simply be omitted
        # (the slices drop the bins that would pair with zero padding)
        if delay_time == 0:
            CC = spike_trains_array[:, :] @ spike_trains_array[:, :
                                                               ].transpose()
        elif delay_time > 0:
            CC = (
                spike_trains_array[:, delay_time:]
                @ spike_trains_array[:, :-delay_time].transpose()
            )
        else:
            CC = (
                spike_trains_array[:, :delay_time]
                @ spike_trains_array[:, -delay_time:].transpose()
            )

        # Convert CC to dense matrix before performing the division
        CC = CC.toarray()

        # Normalize using std deviation
        NCC = CC / std_factors / n_bins

        # Compute cross correlation at given delay time
        NCC_d[index, :, :] = NCC

    # Move delay_time axis to back of array
    # Makes index using neurons more intuitive → (n_neuron, n_neuron,
    # delay_times)
    NCC_d = np.moveaxis(NCC_d, 0, -1)

    return NCC_d
def generate_edge_filter(
    surrounding_window_size: int,
    observed_window_size: int,
    crossover_window_size: int,
) -> np.ndarray:
    r"""Generate an edge filter.

    With a = surrounding_window_size, b = observed_window_size and
    c = crossover_window_size, the filter is the piecewise defined function:

    .. math::
        g_{(i)} = \begin{cases}
        - \frac{1}{a} & 0 \lt i \leq a \\
        \frac{2}{b} & a+c \lt i \leq a + b + c \\
        - \frac{1}{a} & a+b+2c \lt i \leq 2a + b + 2c \\
        0 & \mathrm{otherwise}
        \end{cases}
    """
    a = surrounding_window_size
    b = observed_window_size
    c = crossover_window_size

    # Build the filter directly from its five constant segments
    # (surround, gap, observed, gap, surround) rather than evaluating
    # piecewise conditions per sample; total length is 2a + b + 2c.
    surround = np.full(a, -(1 / a), dtype=np.float64)
    gap = np.zeros(c, dtype=np.float64)
    observed = np.full(b, 2 / b, dtype=np.float64)

    return np.concatenate((surround, gap, observed, gap, surround))
def generate_running_total_filter(observed_window_size: int) -> np.ndarray:
    """Return the running-total (boxcar) filter: a float vector of ones
    with length `observed_window_size`."""
    return np.full(observed_window_size, 1.0)
class TspeFilterPair(NamedTuple):
    """Pairing of an edge filter with its running-total filter, plus the
    window-size parameters they were generated from."""

    edge_filter: np.ndarray  # edge filter g(i), length 2a + b + 2c
    running_total_filter: np.ndarray  # boxcar filter h(i), length b
    needed_padding: int  # extra bins (a + c) required around the NCC window
    surrounding_window_size: int  # parameter a of the edge filter
    observed_window_size: int  # parameter b of the edge filter
    crossover_window_size: int  # parameter c of the edge filter
def generate_filter_pairs(
    surrounding_window_sizes: List[int],
    observed_window_sizes: List[int],
    crossover_window_sizes: List[int],
) -> List[TspeFilterPair]:
    """Generate filter pairs of edge and running-total filters for every
    combination (Cartesian product) of the given window-size parameters.
    """
    # One TspeFilterPair per (a, b, c) combination; the running-total
    # filter length is tied to the observed window size b, and the padding
    # needed around the NCC window is a + c.
    return [
        TspeFilterPair(
            edge_filter=generate_edge_filter(a, b, c),
            running_total_filter=generate_running_total_filter(b),
            needed_padding=a + c,
            surrounding_window_size=a,
            observed_window_size=b,
            crossover_window_size=c,
        )
        for a, b, c in itertools.product(
            surrounding_window_sizes,
            observed_window_sizes,
            crossover_window_sizes,
        )
    ]