# -*- coding: utf-8 -*-
"""
Unitary Event (UE) analysis is a statistical method that
enables to analyze in a time resolved manner excess spike correlation
between simultaneously recorded neurons by comparing the empirical
spike coincidences (precision of a few ms) to the expected number
based on the firing rates of the neurons.
References:
- Gruen, Diesmann, Grammont, Riehle, Aertsen (1999) J Neurosci Methods,
94(1): 67-79.
- Gruen, Diesmann, Aertsen (2002a,b) Neural Comput, 14(1): 43-80; 81-19.
- Gruen S, Riehle A, and Diesmann M (2003) Effect of cross-trial
nonstationarity on joint-spike events Biological Cybernetics 88(5):335-351.
- Gruen S (2009) Data-driven significance estimation of precise spike
correlation. J Neurophysiology 101:1126-1140 (invited review)
:copyright: Copyright 2015-2016 by the Elephant team, see AUTHORS.txt.
:license: Modified BSD, see LICENSE.txt for details.
"""
import warnings

import neo
import numpy as np
import quantities as pq
import scipy.special

import elephant.conversion as conv
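
# A minimal usage sketch: given `spiketrains`, a list of trials, each a
# list of simultaneously recorded neo.SpikeTrain objects (the variable
# name and parameter values below are placeholders), the time-resolved
# joint surprise is obtained with
#
#   >>> UE = jointJ_window_analysis(
#   ...     spiketrains, binsize=5 * pq.ms, winsize=100 * pq.ms,
#   ...     winstep=10 * pq.ms, pattern_hash=[3])
#   >>> UE['Js']  # joint surprise per analysis window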


def hash_from_pattern(m, N, base=2):
"""
Calculate for a spike pattern or a matrix of spike patterns
(provide each pattern as a column) composed of N neurons a
unique number.
Parameters:
-----------
m: 2-dim ndarray
spike patterns represented as a binary matrix (i.e., matrix of 0's and 1's).
Rows and columns correspond to patterns and neurons, respectively.
N: integer
number of neurons is required to be equal to the number
of rows
base: integer
base for calculation of hash values from binary
sequences (= pattern).
Default is 2
Returns:
--------
list of integers:
An array containing the hash values of each pattern,
shape: (number of patterns)
Raises:
-------
ValueError: if matrix m has wrong orientation
Examples:
---------
descriptive example:
m = [0
1
1]
N = 3
base = 2
hash = 0*2^2 + 1*2^1 + 1*2^0 = 3
second example:
>>> import numpy as np
>>> m = np.array([[0, 1, 0, 0, 1, 1, 0, 1],
[0, 0, 1, 0, 1, 0, 1, 1],
[0, 0, 0, 1, 0, 1, 1, 1]])
>>> hash_from_pattern(m,N=3)
array([0, 4, 2, 1, 6, 5, 3, 7])
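
    The hash is invertible; `inverse_hash_from_pattern` (defined below)
    recovers the patterns from the hash values (a quick consistency check,
    reusing `m` from the example above):

    >>> np.array_equal(inverse_hash_from_pattern(
    ...     hash_from_pattern(m, N=3), N=3), m)
    True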
"""
    # check the consistency between the shape of m and the number of
    # neurons N
    if N != np.shape(m)[0]:
        raise ValueError('patterns in the matrix should be column entries')
    # check that the matrix is binary
    if not np.all((np.array(m) == 0) + (np.array(m) == 1)):
        raise ValueError('patterns should be zero or one')
    # generate the base-`base` place values in descending order,
    # i.e. [base**(N-1), ..., base**1, base**0]
    v = base ** np.arange(N - 1, -1, -1)
    # calculate the hash values as the scalar product of the place
    # values and the patterns
    return np.dot(v, m)


def inverse_hash_from_pattern(h, N, base=2):
"""
Calculate the 0-1 spike patterns (matrix) from hash values
Parameters:
-----------
h: list of integers
list or array of hash values, length: number of patterns
N: integer
number of neurons
base: integer
base for calculation of the number from binary
sequences (= pattern).
Default is 2
Raises:
-------
ValueError: if the hash is not compatible with the number
of neurons hash value should not be larger than the biggest
possible hash number with given number of neurons
(e.g. for N = 2, max(hash) = 2^1 + 2^0 = 3
, or for N = 4, max(hash) = 2^3 + 2^2 + 2^1 + 2^0 = 15)
Returns:
--------
numpy.array:
A matrix of shape: (N, number of patterns)
Examples
---------
>>> import numpy as np
>>> h = np.array([3,7])
>>> N = 4
>>> inverse_hash_from_pattern(h,N)
array([[1, 1],
[1, 1],
[0, 1],
[0, 0]])
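
    Hash values larger than the maximum for the given `N` are rejected:

    >>> inverse_hash_from_pattern(np.array([16]), N=4)
    Traceback (most recent call last):
        ...
    ValueError: hash value is not compatible with the number of neurons N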
"""
    # check that the hash values are not greater than the greatest possible
    # value for N neurons with the given base
    if np.any(h > np.sum([base ** x for x in range(N)])):
        raise ValueError(
            "hash value is not compatible with the number of neurons N")
    # check that the hash values are integers
    if not np.all(np.int64(h) == h):
        raise ValueError("hash values are not integers")
    m = np.zeros((N, len(h)), dtype=int)
    for j, hh in enumerate(h):
        # decompose the hash value digit by digit; the least significant
        # digit corresponds to the last neuron (row N - 1)
        i = N - 1
        while i >= 0 and hh != 0:
            m[i, j] = hh % base
            hh //= base  # integer division; `/=` would produce floats
            i -= 1
    return m


def n_emp_mat(mat, N, pattern_hash, base=2):
"""
Count the occurrences of spike coincidence patterns
in the given spike trains.
Parameters:
-----------
mat: 2-dim ndarray
binned spike trains of N neurons. Rows and columns correspond
to neurons and temporal bins, respectively.
N: integer
number of neurons
pattern_hash: list of integers
hash values representing the spike coincidence patterns
of which occurrences are counted.
base: integer
Base which was used to generate the hash values.
Default is 2
Returns:
--------
N_emp: list of integers
number of occurrences of the given patterns in the given spike trains
indices: list of lists of integers
indices indexing the bins where the given spike patterns are found
in `mat`. Same length as `pattern_hash`
indices[i] = N_emp[i] = pattern_hash[i]
Raises:
-------
ValueError: if mat is not zero-one matrix
Examples:
---------
>>> mat = np.array([[1, 0, 0, 1, 1],
[1, 0, 0, 1, 0]])
>>> pattern_hash = np.array([1,3])
>>> n_emp, n_emp_indices = N_emp_mat(mat, N,pattern_hash)
>>> print n_emp
[ 0. 2.]
>>> print n_emp_indices
[array([]), array([0, 3])]
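
    The lengths of the index arrays match the counts:

    >>> [len(idx) for idx in n_emp_indices]
    [0, 2]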
"""
# check if the mat is zero-one matrix
if not np.all((np.array(mat) == 0) + (np.array(mat) == 1)):
raise ValueError("entries of mat should be either one or zero")
h = hash_from_pattern(mat, N, base=base)
N_emp = np.zeros(len(pattern_hash))
indices = []
for idx_ph, ph in enumerate(pattern_hash):
indices_tmp = np.where(h == ph)[0]
indices.append(indices_tmp)
N_emp[idx_ph] = len(indices_tmp)
return N_emp, indices


def n_emp_mat_sum_trial(mat, N, pattern_hash):
"""
Calculates empirical number of observed patterns summed across trials
Parameters:
-----------
mat: 3d numpy array or elephant BinnedSpikeTrain object
Binned spike trains represented as a binary matrix (i.e., matrix of 0's and 1's),
segmented into trials. Trials should contain an identical number of neurons and
an identical number of time bins.
the entries are zero or one
0-axis --> trials
1-axis --> neurons
2-axis --> time bins
N: integer
number of neurons
pattern_hash: list of integers
array of hash values, length: number of patterns
Returns:
--------
N_emp: list of integers
numbers of occurences of the given spike patterns in the given spike trains,
summed across trials. Same length as `pattern_hash`.
idx_trials: list of lists of integers
list of indices of mat for each trial in which
the specific pattern has been observed.
0-axis --> trial
1-axis --> list of indices for the chosen trial per
entry of `pattern_hash`
Raises:
-------
ValueError: if matrix mat has wrong orientation
ValueError: if mat is not zero-one matrix
Examples:
---------
>>> mat = np.array([[[1, 1, 1, 1, 0],
[0, 1, 1, 1, 0],
[0, 1, 1, 0, 1]],
[[1, 1, 1, 1, 1],
[0, 1, 1, 1, 1],
[1, 1, 0, 1, 0]]])
>>> pattern_hash = np.array([4,6])
>>> N = 3
>>> n_emp_sum_trial, n_emp_sum_trial_idx =
n_emp_mat_sum_trial(mat, N,pattern_hash)
>>> n_emp_sum_trial
array([ 1., 3.])
>>> n_emp_sum_trial_idx
[[array([0]), array([3])], [array([], dtype=int64), array([2, 4])]]
"""
    # check the consistency between the shape of mat and the number of
    # neurons N
    if N != np.shape(mat)[1]:
        raise ValueError('the entries of mat should be a list of a '
                         'list where 0-axis is trials and 1-axis is neurons')
    num_patt = len(pattern_hash)
    N_emp = np.zeros(num_patt)
    idx_trials = []
    # check that mat is a binary (zero-one) matrix
    if not np.all((np.array(mat) == 0) + (np.array(mat) == 1)):
        raise ValueError("entries of mat should be either one or zero")
for mat_tr in mat:
N_emp_tmp, indices_tmp = n_emp_mat(mat_tr, N, pattern_hash, base=2)
idx_trials.append(indices_tmp)
N_emp += N_emp_tmp
return N_emp, idx_trials


def _n_exp_mat_analytic(mat, N, pattern_hash):
    """
    Calculates the expected joint probability for each spike pattern
    analytically
    """
    marg_prob = np.mean(mat, 1, dtype=float)
    # marg_prob needs to be a column vector, so we
    # build a two-dimensional array with 1 column
    # and len(marg_prob) rows
    marg_prob = np.reshape(marg_prob, (len(marg_prob), 1))
    m = inverse_hash_from_pattern(pattern_hash, N)
    nrep = np.shape(m)[1]
    # per-pattern probability: the product over neurons of p_i where the
    # pattern requires a spike and (1 - p_i) where it requires none
    pmat = np.multiply(m, np.tile(marg_prob, (1, nrep))) + \
        np.multiply(1 - m, np.tile(1 - marg_prob, (1, nrep)))
    return np.prod(pmat, axis=0) * float(np.shape(mat)[1])
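
# For illustration: for mat = [[1, 1, 1, 1], [0, 1, 0, 1], [0, 0, 1, 0]]
# (the matrix used in the `n_exp_mat` docstring below), the marginal firing
# probabilities are [1.0, 0.5, 0.25], so the expected count of pattern
# 6 = [1, 1, 0] is 1.0 * 0.5 * (1 - 0.25) * 4 bins = 1.5.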


def _n_exp_mat_surrogate(mat, N, pattern_hash, n_surr=1):
    """
    Calculates the expected joint probability for each spike pattern from
    spike-time-randomization surrogates
    """
    if len(pattern_hash) > 1:
        raise ValueError('surrogate method works only for one pattern!')
    N_exp_array = np.zeros(n_surr)
    for rz_idx in range(n_surr):
        # shuffle each row of the zero-one matrix independently,
        # i.e. randomize the spike times of each neuron
        mat_surr = np.array(mat)
        for row in mat_surr:
            np.random.shuffle(row)
        N_exp_array[rz_idx] = n_emp_mat(mat_surr, N, pattern_hash)[0][0]
    return N_exp_array


def n_exp_mat(mat, N, pattern_hash, method='analytic', n_surr=1):
"""
Calculates the expected joint probability for each spike pattern
Parameters:
-----------
mat: 2d numpy array
the entries are zero or one
0-axis --> neurons
1-axis --> time bins
pattern_hash: list of integers
array of hash values, length: number of patterns
method: string
method with which the expectency should be caculated
'analytic' -- > analytically
'surr' -- > with surrogates (spike time randomization)
Default is 'analytic'
n_surr: integer
number of surrogates for constructing the distribution of expected joint probability.
Default is 1 and this number is needed only when method = 'surr'
kwargs:
-------
Raises:
-------
ValueError: if matrix m has wrong orientation
Returns:
--------
if method is analytic:
numpy.array:
An array containing the expected joint probability of each pattern,
shape: (number of patterns,)
if method is surr:
numpy.ndarray, 0-axis --> different realizations,
length = number of surrogates
1-axis --> patterns
Examples:
---------
>>> mat = np.array([[1, 1, 1, 1],
[0, 1, 0, 1],
[0, 0, 1, 0]])
>>> pattern_hash = np.array([5,6])
>>> N = 3
>>> n_exp_anal = n_exp_mat(mat,N, pattern_hash, method = 'analytic')
>>> n_exp_anal
[ 0.5 1.5 ]
>>>
>>>
>>> n_exp_surr = n_exp_mat(
mat, N,pattern_hash, method = 'surr', n_surr = 5000)
>>> print n_exp_surr
[[ 1. 1.]
[ 2. 0.]
[ 2. 0.]
...,
[ 2. 0.]
[ 2. 0.]
[ 1. 1.]]
"""
    # check that mat is a binary (zero-one) matrix
    if not np.all((np.array(mat) == 0) + (np.array(mat) == 1)):
        raise ValueError("entries of mat should be either one or zero")
    if method == 'analytic':
        return _n_exp_mat_analytic(mat, N, pattern_hash)
    if method == 'surr':
        return _n_exp_mat_surrogate(mat, N, pattern_hash, n_surr)
    raise ValueError("method should be either 'analytic' or 'surr'")


def n_exp_mat_sum_trial(
        mat, N, pattern_hash, method='analytic_TrialByTrial', **kwargs):
"""
Calculates the expected joint probability
for each spike pattern sum over trials
Parameters:
-----------
mat: 3d numpy array or elephant BinnedSpikeTrain object
Binned spike trains represented as a binary matrix (i.e., matrix of 0's and 1's),
segmented into trials. Trials should contain an identical number of neurons and
an identical number of time bins.
the entries are zero or one
0-axis --> trials
1-axis --> neurons
2-axis --> time bins
N: integer
number of neurons
pattern_hash: list of integers
array of hash values, length: number of patterns
method: string
method with which the unitary events whould be computed
'analytic_TrialByTrial' -- > calculate the expectency
(analytically) on each trial, then sum over all trials.
'analytic_TrialAverage' -- > calculate the expectency
by averaging over trials.
(cf. Gruen et al. 2003)
'surrogate_TrialByTrial' -- > calculate the distribution
of expected coincidences by spike time randomzation in
each trial and sum over trials.
Default is 'analytic_trialByTrial'
kwargs:
-------
n_surr: integer
number of surrogate to be used
Default is 1
Returns:
--------
numpy.array:
An array containing the expected joint probability of
each pattern summed over trials,shape: (number of patterns,)
Examples:
--------
>>> mat = np.array([[[1, 1, 1, 1, 0],
[0, 1, 1, 1, 0],
[0, 1, 1, 0, 1]],
[[1, 1, 1, 1, 1],
[0, 1, 1, 1, 1],
[1, 1, 0, 1, 0]]])
>>> pattern_hash = np.array([5,6])
>>> N = 3
>>> n_exp_anal = n_exp_mat_sum_trial(mat, N, pattern_hash)
>>> print n_exp_anal
array([ 1.56, 2.56])
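
    The value for pattern 5 = [1, 0, 1] can be checked by hand: in trial 1
    the marginal firing probabilities are [0.8, 0.6, 0.6], giving
    0.8 * (1 - 0.6) * 0.6 * 5 bins = 0.96 expected coincidences; trial 2
    gives 1.0 * (1 - 0.8) * 0.6 * 5 = 0.6; the sum is 1.56.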
"""
    # check the consistency between the shape of mat and the number of
    # neurons N
    if N != np.shape(mat)[1]:
        raise ValueError('the entries of mat should be a list of a '
                         'list where 0-axis is trials and 1-axis is neurons')
    if method == 'analytic_TrialByTrial':
        n_exp = np.zeros(len(pattern_hash))
        for mat_tr in mat:
            n_exp += n_exp_mat(mat_tr, N, pattern_hash, method='analytic')
    elif method == 'analytic_TrialAverage':
        n_exp = n_exp_mat(
            np.mean(mat, 0), N, pattern_hash,
            method='analytic') * np.shape(mat)[0]
    elif method == 'surrogate_TrialByTrial':
        n_surr = kwargs.get('n_surr', 1)
        n_exp = np.zeros(n_surr)
        for mat_tr in mat:
            n_exp += n_exp_mat(mat_tr, N, pattern_hash,
                               method='surr', n_surr=n_surr)
    else:
        raise ValueError(
            "method should be one of 'analytic_TrialByTrial', "
            "'analytic_TrialAverage' or 'surrogate_TrialByTrial'")
    return n_exp


def gen_pval_anal(
        mat, N, pattern_hash, method='analytic_TrialByTrial', **kwargs):
"""
computes the expected coincidences and a function to calculate
p-value for given empirical coincidences
this function generate a poisson distribution with the expected
value calculated by mat. it returns a function which gets
the empirical coincidences, `n_emp`, and calculates a p-value
as the area under the poisson distribution from `n_emp` to infinity
Parameters:
-----------
mat: 3d numpy array or elephant BinnedSpikeTrain object
Binned spike trains represented as a binary matrix (i.e., matrix of 0's and 1's),
segmented into trials. Trials should contain an identical number of neurons and
an identical number of time bins.
the entries are zero or one
0-axis --> trials
1-axis --> neurons
2-axis --> time bins
N: integer
number of neurons
pattern_hash: list of integers
array of hash values, length: number of patterns
method: string
method with which the unitary events whould be computed
'analytic_TrialByTrial' -- > calculate the expectency
(analytically) on each trial, then sum over all trials.
''analytic_TrialAverage' -- > calculate the expectency
by averaging over trials.
Default is 'analytic_trialByTrial'
(cf. Gruen et al. 2003)
kwargs:
-------
n_surr: integer
number of surrogate to be used
Default is 1
Returns:
--------
pval_anal:
a function which calculates the p-value for
the given empirical coincidences
n_exp: list of floats
expected coincidences
Examples:
--------
>>> mat = np.array([[[1, 1, 1, 1, 0],
[0, 1, 1, 1, 0],
[0, 1, 1, 0, 1]],
[[1, 1, 1, 1, 1],
[0, 1, 1, 1, 1],
[1, 1, 0, 1, 0]]])
>>> pattern_hash = np.array([5,6])
>>> N = 3
>>> pval_anal,n_exp = gen_pval_anal(mat, N,pattern_hash)
>>> n_exp
array([ 1.56, 2.56])
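
    The returned function can then be evaluated at the empirically observed
    coincidence counts (the counts used here are illustrative assumptions):

    >>> p = pval_anal(np.array([2., 3.]))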
"""
    if method == 'analytic_TrialByTrial' or method == 'analytic_TrialAverage':
        n_exp = n_exp_mat_sum_trial(mat, N, pattern_hash, method=method)

        def pval(n_emp):
            # Poisson upper tail P(X >= n_emp) for a Poisson distribution
            # with mean n_exp, expressed via the regularized incomplete
            # gamma function
            p = 1. - scipy.special.gammaincc(n_emp, n_exp)
            return p
    elif method == 'surrogate_TrialByTrial':
        n_surr = kwargs.get('n_surr', 1)
        n_exp = n_exp_mat_sum_trial(
            mat, N, pattern_hash, method=method, n_surr=n_surr)

        def pval(n_emp):
            if len(n_emp) > 1:
                raise ValueError(
                    'in surrogate method the p_value can be calculated '
                    'only for one pattern!')
            # empirical distribution of the expected coincidence counts
            hist = np.bincount(np.int64(n_exp))
            exp_dist = hist / float(np.sum(hist))
            # p-value: mass of the surrogate distribution at or above n_emp
            return np.sum(exp_dist[int(n_emp[0]):])
    else:
        raise ValueError(
            "method should be one of 'analytic_TrialByTrial', "
            "'analytic_TrialAverage' or 'surrogate_TrialByTrial'")
    return pval, n_exp


def jointJ(p_val):
"""Surprise measurement
logarithmic transformation of joint-p-value into surprise measure
for better visualization as the highly significant events are
indicated by very low joint-p-values
Parameters:
-----------
p_val: list of floats
p-values of statistical tests for different pattern.
Returns:
--------
J: list of floats
list of surprise measure
Examples:
---------
>>> p_val = np.array([0.31271072, 0.01175031])
>>> jointJ(p_val)
array([0.3419968 , 1.92481736])
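
    For orientation: the conventional significance level p = 0.05
    corresponds to a surprise of J = log10(19) ≈ 1.28.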
"""
    p_arr = np.array(p_val)
    # suppress the divide-by-zero warning raised for p-values of exactly
    # 0 or 1; the surprise is +/- inf in these cases
    with np.errstate(divide='ignore'):
        Js = np.log10(1 - p_arr) - np.log10(p_arr)
    return Js


def _rate_mat_avg_trial(mat):
    """
    Calculates the average firing rate of each neuron across trials,
    in units of spikes per bin
    """
    num_tr, N, nbins = np.shape(mat)
    psth = np.zeros(N)
    for tr, mat_tr in enumerate(mat):
        psth += np.sum(mat_tr, axis=1)
    return psth / float(nbins) / float(num_tr)


def _bintime(t, binsize):
    """
    Converts a time quantity (or an array of times) to an integer
    number of bins of width `binsize`
    """
    t_dl = t.rescale('ms').magnitude
    binsize_dl = binsize.rescale('ms').magnitude
    return np.floor(np.array(t_dl) / binsize_dl).astype(int)
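
# For illustration: _bintime(7.5 * pq.ms, 5 * pq.ms) == 1, i.e. the time
# 7.5 ms falls into the second 5-ms bin (bin indices start at 0).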


def _winpos(t_start, t_stop, winsize, winstep, position='left-edge'):
"""
Calculates the position of the analysis window
"""
t_start_dl = t_start.rescale('ms').magnitude
t_stop_dl = t_stop.rescale('ms').magnitude
winsize_dl = winsize.rescale('ms').magnitude
winstep_dl = winstep.rescale('ms').magnitude
# left side of the window time
if position == 'left-edge':
ts_winpos = np.arange(
t_start_dl, t_stop_dl - winsize_dl + winstep_dl, winstep_dl) * pq.ms
else:
raise ValueError(
'the current version only returns left-edge of the window')
return ts_winpos
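
# For illustration: with t_start = 0 * pq.ms, t_stop = 100 * pq.ms,
# winsize = 100 * pq.ms and winstep = 5 * pq.ms, _winpos returns a single
# left edge at 0 ms, since only one full window fits into the interval.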


def _UE(mat, N, pattern_hash, method='analytic_TrialByTrial', **kwargs):
"""
returns the default results of unitary events analysis
(Surprise, empirical coincidences and index of where it happened
in the given mat, n_exp and average rate of neurons)
"""
rate_avg = _rate_mat_avg_trial(mat)
n_emp, indices = n_emp_mat_sum_trial(mat, N, pattern_hash)
    if method == 'surrogate_TrialByTrial':
        n_surr = kwargs.get('n_surr', 1)
        dist_exp, n_exp = gen_pval_anal(
            mat, N, pattern_hash, method, n_surr=n_surr)
        # for the surrogate method, report the mean of the surrogate
        # distribution as the expected number of coincidences
        n_exp = np.mean(n_exp)
    else:
        dist_exp, n_exp = gen_pval_anal(mat, N, pattern_hash, method)
pval = dist_exp(n_emp)
Js = jointJ(pval)
return Js, rate_avg, n_exp, n_emp, indices


def jointJ_window_analysis(
        data, binsize, winsize, winstep, pattern_hash,
        method='analytic_TrialByTrial', t_start=None,
        t_stop=None, binary=True, **kwargs):
"""
Calculates the joint surprise in a sliding window fashion
Parameters:
----------
data: list of neo.SpikeTrain objects
list of spike trains in different trials
0-axis --> Trials
1-axis --> Neurons
2-axis --> Spike times
binsize: Quantity scalar with dimension time
size of bins for descritizing spike trains
winsize: Quantity scalar with dimension time
size of the window of analysis
winstep: Quantity scalar with dimension time
size of the window step
pattern_hash: list of integers
list of interested patterns in hash values
(see hash_from_pattern and inverse_hash_from_pattern functions)
method: string
method with which the unitary events whould be computed
'analytic_TrialByTrial' -- > calculate the expectency
(analytically) on each trial, then sum over all trials.
'analytic_TrialAverage' -- > calculate the expectency
by averaging over trials.
(cf. Gruen et al. 2003)
'surrogate_TrialByTrial' -- > calculate the distribution
of expected coincidences by spike time randomzation in
each trial and sum over trials.
Default is 'analytic_trialByTrial'
t_start: float or Quantity scalar, optional
The start time to use for the time points.
If not specified, retrieved from the `t_start`
attribute of `spiketrain`.
t_stop: float or Quantity scalar, optional
The start time to use for the time points.
If not specified, retrieved from the `t_stop`
attribute of `spiketrain`.
kwargs:
-------
n_surr: integer
number of surrogate to be used
Default is 100
Returns:
-------
result: dictionary
Js: list of float
JointSurprise of different given patterns within each window
shape: different pattern hash --> 0-axis
different window --> 1-axis
indices: list of list of integers
list of indices of pattern within each window
shape: different pattern hash --> 0-axis
different window --> 1-axis
n_emp: list of integers
empirical number of each observed pattern.
shape: different pattern hash --> 0-axis
different window --> 1-axis
n_exp: list of floats
expeced number of each pattern.
shape: different pattern hash --> 0-axis
different window --> 1-axis
rate_avg: list of floats
average firing rate of each neuron
shape: different pattern hash --> 0-axis
different window --> 1-axis
"""
    if not isinstance(data[0][0], neo.SpikeTrain):
        raise ValueError(
            "structure of the data is not correct: 0-axis should be trials, "
            "1-axis units and 2-axis neo spike trains")
if t_start is None:
t_start = data[0][0].t_start.rescale('ms')
if t_stop is None:
t_stop = data[0][0].t_stop.rescale('ms')
# position of all windows (left edges)
t_winpos = _winpos(t_start, t_stop, winsize, winstep, position='left-edge')
t_winpos_bintime = _bintime(t_winpos, binsize)
winsize_bintime = _bintime(winsize, binsize)
winstep_bintime = _bintime(winstep, binsize)
    if winsize_bintime * binsize != winsize:
        warnings.warn(
            "ratio between winsize and binsize is not integer -- "
            "the actual window size is " + str(winsize_bintime * binsize))
    if winstep_bintime * binsize != winstep:
        warnings.warn(
            "ratio between winstep and binsize is not integer -- "
            "the actual window step is " + str(winstep_bintime * binsize))
num_tr, N = np.shape(data)[:2]
n_bins = int((t_stop - t_start) / binsize)
mat_tr_unit_spt = np.zeros((len(data), N, n_bins))
    for tr, sts in enumerate(data):
        sts = list(sts)
        bs = conv.BinnedSpikeTrain(
            sts, t_start=t_start, t_stop=t_stop, binsize=binsize)
        if binary is True:
            mat = bs.to_bool_array()
        else:
            raise ValueError(
                "The analysis works only on binary (zero-one) "
                "matrices at the moment")
        mat_tr_unit_spt[tr] = mat
num_win = len(t_winpos)
Js_win, n_exp_win, n_emp_win = (np.zeros(num_win) for _ in range(3))
rate_avg = np.zeros((num_win, N))
indices_win = {}
for i in range(num_tr):
indices_win['trial' + str(i)] = []
for i, win_pos in enumerate(t_winpos_bintime):
mat_win = mat_tr_unit_spt[:, :, win_pos:win_pos + winsize_bintime]
        if method == 'surrogate_TrialByTrial':
            n_surr = kwargs.get('n_surr', 100)
            Js_win[i], rate_avg[i], n_exp_win[i], n_emp_win[i], \
                indices_lst = _UE(
                    mat_win, N, pattern_hash, method, n_surr=n_surr)
        else:
            Js_win[i], rate_avg[i], n_exp_win[i], n_emp_win[i], \
                indices_lst = _UE(mat_win, N, pattern_hash, method)
for j in range(num_tr):
if len(indices_lst[j][0]) > 0:
indices_win[
'trial' + str(j)] = np.append(indices_win['trial' + str(j)], indices_lst[j][0] + win_pos)
    return {'Js': Js_win, 'indices': indices_win, 'n_emp': n_emp_win,
            'n_exp': n_exp_win, 'rate_avg': rate_avg / binsize}
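

# A minimal, self-contained demonstration (a sketch: the data are random
# and the parameter values are arbitrary). Two neurons fire independently
# over ten trials; the analysis looks for the full coincidence pattern
# [1, 1], i.e. hash 3.
if __name__ == '__main__':
    np.random.seed(0)
    spiketrains = [
        [neo.SpikeTrain(np.sort(np.random.uniform(0, 1000, 50)) * pq.ms,
                        t_stop=1000 * pq.ms) for _ in range(2)]
        for _ in range(10)]
    result = jointJ_window_analysis(
        spiketrains, binsize=5 * pq.ms, winsize=100 * pq.ms,
        winstep=100 * pq.ms, pattern_hash=[3])
    print(result['Js'])     # joint surprise per window
    print(result['n_emp'])  # empirical coincidence counts per window
    print(result['n_exp'])  # expected coincidence counts per window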