# -*- coding: utf-8 -*-
"""
Unitary Event (UE) analysis is a statistical method to analyze, in a
time-resolved manner, excess spike correlation between simultaneously
recorded neurons. It compares the empirical number of spike coincidences
(at a precision of a few ms) to the number expected from the firing rates
of the neurons (see :cite:`unitary_event_analysis-Gruen99_67`).

Background
----------

It has been proposed that cortical neurons organize dynamically into functional
groups (“cell assemblies”) by the temporal structure of their joint spiking
activity. The Unitary Events analysis method detects conspicuous patterns of
synchronous spike activity among simultaneously recorded single neurons. The
statistical significance of a pattern is evaluated by comparing the empirical
number of occurrences to the number expected given the firing rates of the
neurons. Key elements of the method are the proper formulation of the null
hypothesis and the derivation of the corresponding count distribution of
synchronous spike events used in the significance test. The analysis is
performed in a sliding window manner and yields a time-resolved measure of
significant spike synchrony. For further reading, see
:cite:`unitary_event_analysis-Riehle97_1950,unitary_event_analysis-Gruen02_43,\
unitary_event_analysis-Gruen02_81,unitary_event_analysis-Gruen03,\
unitary_event_analysis-Gruen09_1126,unitary_event_analysis-Gruen99_67`.
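
Examples
--------

A minimal usage sketch (hand-made `neo.SpikeTrain` objects stand in for real
recordings; two trials of two simultaneously recorded neurons):

>>> import neo
>>> import quantities as pq
>>> from elephant.unitary_event_analysis import jointJ_window_analysis
>>> st1 = neo.SpikeTrain([12, 37, 41] * pq.ms, t_stop=100 * pq.ms)
>>> st2 = neo.SpikeTrain([13, 38, 80] * pq.ms, t_stop=100 * pq.ms)
>>> spiketrains = [[st1, st2], [st1, st2]]  # 0-axis: trials, 1-axis: neurons
>>> result = jointJ_window_analysis(spiketrains, bin_size=5 * pq.ms,
...                                 win_size=50 * pq.ms, win_step=25 * pq.ms)
>>> sorted(result.keys())
['Js', 'indices', 'input_parameters', 'n_emp', 'n_exp', 'rate_avg']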


Tutorial
--------

:doc:`View tutorial <../tutorials/unitary_event_analysis>`

Run tutorial interactively:

.. image:: https://mybinder.org/badge.svg
   :target: https://mybinder.org/v2/gh/NeuralEnsemble/elephant/master
            ?filepath=doc/tutorials/unitary_event_analysis.ipynb


Functions overview
------------------

.. autosummary::
    :toctree: _toctree/unitary_event_analysis/

    jointJ_window_analysis

:copyright: Copyright 2014-2024 by the Elephant team, see `doc/authors.rst`.
:license: Modified BSD, see LICENSE.txt for details.
"""

from __future__ import division, print_function, unicode_literals

import warnings
from collections import defaultdict

import neo
import numpy as np
import quantities as pq
import scipy.special

import elephant.conversion as conv
from elephant.utils import is_binary

__all__ = [
    "hash_from_pattern",
    "inverse_hash_from_pattern",
    "n_emp_mat",
    "n_emp_mat_sum_trial",
    "n_exp_mat",
    "n_exp_mat_sum_trial",
    "gen_pval_anal",
    "jointJ",
    "jointJ_window_analysis"
]


def hash_from_pattern(m, base=2):
    """
    Calculate a unique hash number for a spike pattern, or for a matrix of
    spike patterns (one pattern per column), composed of N neurons.

    Parameters
    ----------
    m: np.ndarray or list
        2-dim ndarray.
        Spike patterns represented as a binary matrix (i.e., matrix of 0's
        and 1's). Rows and columns correspond to neurons and patterns,
        respectively.
    base: integer
        The base for hash calculation.
        Default: 2

    Returns
    -------
    np.ndarray
        An array containing the hash value of each pattern,
        shape: (number of patterns,).

    Raises
    ------
    ValueError
        If matrix `m` has wrong orientation.

    Examples
    --------
    With `base=2`, the hash of `[0, 1, 1]` is `0*2^2 + 1*2^1 + 1*2^0 = 3`.

    >>> import numpy as np
    >>> hash_from_pattern([0, 1, 1])
    3

    >>> m = np.array([[0, 1, 0, 0, 1, 1, 0, 1],
    ...               [0, 0, 1, 0, 1, 0, 1, 1],
    ...               [0, 0, 0, 1, 0, 1, 1, 1]])

    >>> hash_from_pattern(m)
    array([0, 4, 2, 1, 6, 5, 3, 7])

    """
    m = np.asarray(m)
    n_neurons = m.shape[0]

    # check the entries of the matrix
    if not is_binary(m):
        raise ValueError('Patterns should be binary: 0 or 1')

    # generate the representation
    # (don't use numpy here: numpy integers are upper-bounded by int64,
    #  whereas Python ints are unbounded)
    powers = [base ** x for x in range(n_neurons)][::-1]

    # calculate the binary number by use of scalar product
    return np.dot(powers, m)


def inverse_hash_from_pattern(h, N, base=2):
    """
    Calculate the binary spike patterns (matrix) from hash values `h`.

    Parameters
    ----------
    h: list of int
        Array-like of integer hash values, one per pattern.
    N: integer
        The number of neurons.
    base: integer
        The base, used to generate the hash values.
        Default: 2

    Returns
    -------
    m: (N, P) np.ndarray
       A matrix of shape: (N, number of patterns)

    Raises
    ------
    ValueError
        If the hash is not compatible with the number of neurons.
        The hash value should not be larger than the largest
        possible hash number with the given number of neurons
        (e.g. for N = 2, max(hash) = 2^1 + 2^0 = 3, or for N = 4,
        max(hash) = 2^3 + 2^2 + 2^1 + 2^0 = 15).

    Examples
    ---------
    >>> import numpy as np
    >>> h = np.array([3, 7])
    >>> N = 4
    >>> print(inverse_hash_from_pattern(h, N))
    [[0 0]
     [0 1]
     [1 1]
     [1 1]]


    """
    h = np.asarray(h)  # this will cast to object type if h > int64
    if not all(isinstance(v, int) for v in h.tolist()):
        # .tolist() is necessary because np.int[64] is not int
        raise ValueError("hash values should be integers")

    # check if the hash values are not greater than the greatest possible
    # value for N neurons with the given base
    powers = np.array([base ** x for x in range(N)])[::-1]
    if any(h > sum(powers)):
        raise ValueError(f"hash value {h} is not compatible with the number "
                         f"of neurons {N}")
    m = h // np.expand_dims(powers, axis=1)
    m %= base  # m is a binary matrix now
    m = m.astype(int)  # convert object to int if the hash was > int64
    return m


def n_emp_mat(mat, pattern_hash, base=2):
    """
    Count the occurrences of spike coincidence patterns in the given spike
    trains.

    Parameters
    ----------
    mat : (N, M) np.ndarray
        Binned spike trains of N neurons. Rows and columns correspond
        to neurons and temporal bins, respectively.
    pattern_hash: list of int
        List of hash values representing the spike coincidence patterns
        whose occurrences are counted.
    base: integer
        The base, used to generate the hash values.
        Default: 2

    Returns
    -------
    N_emp: np.ndarray
        The number of occurrences of the given patterns in the given
        spike trains. Same length as `pattern_hash`.
    indices: list of list
        List of lists of int.
        For each pattern, the indices of the bins of `mat` in which the
        pattern is found, such that `indices[i]` and `N_emp[i]` correspond
        to `pattern_hash[i]`.

    Raises
    ------
    ValueError
        If `mat` is not a binary matrix.

    Examples
    --------
    >>> mat = np.array([[1, 0, 0, 1, 1],
    ...                 [1, 0, 0, 1, 0]])
    >>> pattern_hash = np.array([1,3])
    >>> n_emp, n_emp_indices = n_emp_mat(mat, pattern_hash)
    >>> print(n_emp)
    [0. 2.]
    >>> print(n_emp_indices)
    [array([], dtype=int64), array([0, 3])]

    """
    # check if the mat is zero-one matrix
    if not is_binary(mat):
        raise ValueError("entries of mat should be either one or zero")
    h = hash_from_pattern(mat, base=base)
    N_emp = np.zeros(len(pattern_hash))
    indices = []
    for idx_ph, ph in enumerate(pattern_hash):
        indices_tmp = np.where(h == ph)[0]
        indices.append(indices_tmp)
        N_emp[idx_ph] = len(indices_tmp)
    return N_emp, indices


def n_emp_mat_sum_trial(mat, pattern_hash):
    """
    Calculate the empirical number of observed patterns, summed across
    trials.

    Parameters
    ----------
    mat: np.ndarray
        Binned spike trains represented as a binary matrix (i.e., matrix of
        0's and 1's), segmented into trials. Trials should contain an
        identical number of neurons and an identical number of time bins.
         0-axis --> trials
         1-axis --> neurons
         2-axis --> time bins
    pattern_hash: list of int
         List of hash values, one per pattern.

    Returns
    -------
    N_emp: np.ndarray
        The number of occurrences of the given spike patterns in the given
        spike trains, summed across trials. Same length as `pattern_hash`.
    idx_trials: list of list
        For each trial, the indices of the bins in which the given patterns
        have been observed.
        0-axis --> trial
        1-axis --> list of indices for the chosen trial per entry of
                   `pattern_hash`

    Raises
    ------
    ValueError
        If `mat` has the wrong orientation.
        If `mat` is not a binary matrix.

    Examples
    ---------
    >>> mat = np.array([[[1, 1, 1, 1, 0],
    ...                  [0, 1, 1, 1, 0],
    ...                  [0, 1, 1, 0, 1]],
    ...                 [[1, 1, 1, 1, 1],
    ...                  [0, 1, 1, 1, 1],
    ...                  [1, 1, 0, 1, 0]]])
    >>> pattern_hash = np.array([4,6])
    >>> n_emp_sum_trial, n_emp_sum_trial_idx = n_emp_mat_sum_trial(
    ...                                                     mat, pattern_hash)
    >>> print(n_emp_sum_trial)
    [1. 3.]
    >>> n_emp_sum_trial_idx
    [[array([0]), array([3])], [array([], dtype=int64), array([2, 4])]]

    """
    num_patt = len(pattern_hash)
    N_emp = np.zeros(num_patt)

    idx_trials = []
    # check if the mat is zero-one matrix
    if not is_binary(mat):
        raise ValueError("entries of mat should be either one or zero")

    for mat_tr in mat:
        N_emp_tmp, indices_tmp = n_emp_mat(mat_tr, pattern_hash, base=2)
        idx_trials.append(indices_tmp)
        N_emp += N_emp_tmp

    return N_emp, idx_trials


def _n_exp_mat_analytic(mat, pattern_hash):
    """
    Calculates the expected joint probability for each spike pattern
    analytically.
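
    Under the null hypothesis of independent neurons with marginal firing
    probabilities ``p_i`` (the mean of row ``i`` of `mat`), the probability
    of a binary pattern ``m = (m_1, ..., m_N)`` in a single bin is
    ``P(m) = prod_i p_i**m_i * (1 - p_i)**(1 - m_i)``; the expected count is
    ``P(m)`` multiplied by the number of bins.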
    """
    marg_prob = np.mean(mat, 1, dtype=float)
    # marg_prob needs to be a column vector, so we
    # build a two dimensional array with 1 column
    # and len(marg_prob) rows
    marg_prob = np.expand_dims(marg_prob, axis=1)
    n_neurons = mat.shape[0]
    m = inverse_hash_from_pattern(pattern_hash, n_neurons)
    nrep = m.shape[1]
    # multiplying the marginal probability of each neuron according to the
    # pattern
    pmat = np.multiply(m, np.tile(marg_prob, (1, nrep))) \
        + np.multiply(1 - m, np.tile(1 - marg_prob, (1, nrep)))
    return np.prod(pmat, axis=0) * float(mat.shape[1])


def _n_exp_mat_surrogate(mat, pattern_hash, n_surrogates=1):
    """
    Calculates the expected joint probability for each spike pattern using
    spike-time randomization surrogates. Each surrogate permutes the spike
    times of every neuron independently (row-wise shuffling), which
    preserves the per-neuron spike counts.
    """
    if len(pattern_hash) > 1:
        raise ValueError('surrogate method works only for one pattern!')
    N_exp_array = np.zeros(n_surrogates)
    for surr_idx in range(n_surrogates):
        # shuffle the entries of each row of the zero-one matrix
        # independently (spike time randomization per neuron)
        mat_surr = np.copy(mat)
        for row in mat_surr:
            np.random.shuffle(row)
        N_exp_array[surr_idx] = n_emp_mat(mat_surr, pattern_hash)[0][0]
    return N_exp_array


def n_exp_mat(mat, pattern_hash, method='analytic', n_surrogates=1):
    """
    Calculates the expected joint probability for each spike pattern.

    Parameters
    ----------
    mat: np.ndarray
         Binned spike trains, with entries in the range [0, 1].
         The entries are binary, unless `mat` is a trial-averaged matrix
         computed by `n_exp_mat_sum_trial()` with the
         'analytic_TrialAverage' method, in which case they are floats.
         0-axis --> neurons
         1-axis --> time bins
    pattern_hash: list of int
         List of hash values, length: number of patterns
    method: {'analytic', 'surr'}, optional
         The method with which the expectation is calculated.
         'analytic' --> analytically
         'surr' --> with surrogates (spike time randomization)
         Default: 'analytic'
    n_surrogates: int, optional
         The number of surrogates for constructing the distribution of the
         expected joint probability. Used only when `method='surr'`.
         Default: 1

    Returns
    -------
    np.ndarray
        if method is 'analytic':
            An array containing the expected joint probability of each pattern,
            shape: (number of patterns,)
        if method is 'surr':
            0-axis --> different realizations, length = number of surrogates
            1-axis --> patterns

    Raises
    ------
    ValueError
        If the entries of `mat` are outside of the range [0, 1], or if
        `method` is not one of the options specified above.

    Examples
    --------
    >>> mat = np.array([[1, 1, 1, 1],
    ...                 [0, 1, 0, 1],
    ...                 [0, 0, 1, 0]])
    >>> pattern_hash = np.array([5,6])
    >>> n_exp_anal = n_exp_mat(mat, pattern_hash, method='analytic')
    >>> print(n_exp_anal)
    [0.5 1.5]
    >>> n_exp_surr = n_exp_mat(mat, pattern_hash, method='surr',
    ...                        n_surrogates=5000) # doctest: +SKIP
    >>> print(n_exp_surr) # doctest: +SKIP
    [[ 1.  1.]
     [ 2.  0.]
     [ 2.  0.]
     ...,
     [ 2.  0.]
     [ 2.  0.]
     [ 1.  1.]]

    """
    # check if the mat is in the range [0, 1]
    if not np.all((mat >= 0) & (mat <= 1)):
        raise ValueError("entries of mat should be in range [0, 1]")

    if method == 'analytic':
        return _n_exp_mat_analytic(mat, pattern_hash)
    if method == 'surr':
        return _n_exp_mat_surrogate(mat, pattern_hash,
                                    n_surrogates=n_surrogates)
    raise ValueError(f"Invalid method: {method}")


def n_exp_mat_sum_trial(mat, pattern_hash, method='analytic_TrialByTrial',
                        n_surrogates=1):
    """
    Calculates the expected joint probability for each spike pattern, summed
    over trials.

    Parameters
    ----------
    mat: np.ndarray
        Binned spike trains represented as a binary matrix (i.e., matrix of
        0's and 1's), segmented into trials. Trials should contain an
        identical number of neurons and an identical number of time bins.
         0-axis --> trials
         1-axis --> neurons
         2-axis --> time bins
    pattern_hash: list of int
         List of hash values, length: number of patterns
    method: str
         The method with which the unitary events should be computed.
         'analytic_TrialByTrial' --> calculate the expectancy
         (analytically) on each trial, then sum over all trials.
         'analytic_TrialAverage' --> calculate the expectancy
         by averaging over trials (cf. Gruen et al. 2003).
         'surrogate_TrialByTrial' --> calculate the distribution
         of expected coincidences by spike time randomization in
         each trial and sum over trials.
         Default: 'analytic_TrialByTrial'
    n_surrogates: int, optional
         The number of surrogates to be used. Used only when
         `method='surrogate_TrialByTrial'`.
         Default: 1

    Returns
    -------
    n_exp: np.ndarray
         An array containing the expected joint probability of each pattern,
         summed over trials, shape: (number of patterns,)

    Raises
    ------
    ValueError
        If `method` is not one of those specified above.

    Examples
    --------
    >>> mat = np.array([[[1, 1, 1, 1, 0],
    ...                  [0, 1, 1, 1, 0],
    ...                  [0, 1, 1, 0, 1]],
    ...                 [[1, 1, 1, 1, 1],
    ...                  [0, 1, 1, 1, 1],
    ...                  [1, 1, 0, 1, 0]]])

    >>> pattern_hash = np.array([5,6])
    >>> n_exp_anal = n_exp_mat_sum_trial(mat, pattern_hash)
    >>> print(n_exp_anal)
    [1.56 2.56]
    """
    if method == 'analytic_TrialByTrial':
        n_exp = np.zeros(len(pattern_hash))
        for mat_tr in mat:
            n_exp += n_exp_mat(mat_tr, pattern_hash,
                               method='analytic')
    elif method == 'analytic_TrialAverage':
        n_exp = n_exp_mat(
            np.mean(mat, axis=0), pattern_hash,
            method='analytic') * mat.shape[0]
    elif method == 'surrogate_TrialByTrial':
        n_exp = np.zeros(n_surrogates)
        for mat_tr in mat:
            n_exp += n_exp_mat(mat_tr, pattern_hash,
                               method='surr', n_surrogates=n_surrogates)
    else:
        raise ValueError(f"Invalid method: {method}")
    return n_exp


def gen_pval_anal(mat, pattern_hash, method='analytic_TrialByTrial',
                  n_surrogates=1):
    """
    Compute the expected coincidences and a function to calculate the
    p-value for the given empirical coincidences.

    This function generates a Poisson distribution with the expected
    value calculated from `mat`. It returns a function that takes
    the empirical coincidences, `n_emp`, and calculates the p-value
    as the area under the Poisson distribution from `n_emp` to infinity.
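
    Analytically, the p-value of observing `n_emp` or more coincidences
    under a Poisson distribution with mean `n_exp` is
    ``p = P(X >= n_emp) = 1 - Q(n_emp, n_exp)``, where ``Q`` is the
    regularized upper incomplete gamma function (`scipy.special.gammaincc`).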

    Parameters
    ----------
    mat: np.ndarray
        Binned spike trains represented as a binary matrix (i.e., matrix of
        0's and 1's), segmented into trials. Trials should contain an
        identical number of neurons and an identical number of time bins.
         0-axis --> trials
         1-axis --> neurons
         2-axis --> time bins
    pattern_hash: list of int
         List of hash values, length: number of patterns
    method: string
         The method with which the unitary events should be computed.
         'analytic_TrialByTrial' --> calculate the expectancy
         (analytically) on each trial, then sum over all trials.
         'analytic_TrialAverage' --> calculate the expectancy
         by averaging over trials (cf. Gruen et al. 2003).
         'surrogate_TrialByTrial' --> calculate the distribution
         of expected coincidences by spike time randomization in
         each trial and sum over trials.
         Default: 'analytic_TrialByTrial'
    n_surrogates: integer, optional
         The number of surrogates to be used. Used only when
         `method='surrogate_TrialByTrial'`.
         Default: 1

    Returns
    --------
    pval_anal: callable
         The function that calculates the p-value for the given empirical
         coincidences.
    n_exp: np.ndarray
        The expected number of coincidences.

    Raises
    ------
    ValueError
        If `method` is not one of those specified above.

    Examples
    --------
    >>> mat = np.array([[[1, 1, 1, 1, 0],
    ...                  [0, 1, 1, 1, 0],
    ...                  [0, 1, 1, 0, 1]],
    ...                 [[1, 1, 1, 1, 1],
    ...                  [0, 1, 1, 1, 1],
    ...                  [1, 1, 0, 1, 0]]])
    >>> pattern_hash = np.array([5, 6])
    >>> pval_anal, n_exp = gen_pval_anal(mat, pattern_hash)
    >>> print(n_exp)
    [1.56 2.56]

    """
    if method == 'analytic_TrialByTrial' or method == 'analytic_TrialAverage':
        n_exp = n_exp_mat_sum_trial(mat, pattern_hash, method=method)

        def pval(n_emp):
            p = 1. - scipy.special.gammaincc(n_emp, n_exp)
            return p
    elif method == 'surrogate_TrialByTrial':
        n_exp = n_exp_mat_sum_trial(
            mat, pattern_hash, method=method, n_surrogates=n_surrogates)

        def pval(n_emp):
            hist = np.bincount(n_exp.astype(int, casting='unsafe'))
            exp_dist = hist / float(np.sum(hist))
            if len(n_emp) > 1:
                raise ValueError('In the surrogate method the p-value can '
                                 'be calculated only for one pattern!')
            return np.sum(exp_dist[int(n_emp[0]):])
    else:
        raise ValueError("Method is not allowed: {method}".format(
            method=method))

    return pval, n_exp


def jointJ(p_val):
    """
    Surprise measurement.

    Logarithmic transformation of the joint p-value into a surprise measure,
    for better visualization: highly significant events are indicated by
    very low joint p-values.
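
    The joint surprise is defined as ``Js = log10((1 - p) / p)``; e.g.,
    ``p = 0.5`` maps to ``Js = 0`` and ``p = 0.01`` to ``Js ≈ 2``.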

    Parameters
    ----------
    p_val : float or list of float
        List of p-values of statistical tests for different patterns.

    Returns
    -------
    Js: np.ndarray
        An array of surprise measures.

    Examples
    --------
    >>> p_val = np.array([0.31271072,  0.01175031])
    >>> print(jointJ(p_val))
    [0.3419968  1.92481736]

    """
    p_arr = np.asarray(p_val)
    with np.errstate(divide='ignore'):
        # Ignore 'Division by zero' warning which happens when p_arr is 1.0 or
        # 0.0 (no spikes).
        Js = np.log10(1 - p_arr) - np.log10(p_arr)
    return Js


def _rate_mat_avg_trial(mat):
    """
    Calculates the average firing rate of each neuron across trials.
    """
    n_trials, n_neurons, n_bins = np.shape(mat)
    psth = np.zeros(n_neurons, dtype=np.float32)
    for mat_tr in mat:
        psth += np.sum(mat_tr, axis=1)
    return psth / (n_bins * n_trials)


def _bintime(t, bin_size):
    """
    Convert the time(s) `t` into units of `bin_size` (i.e., bin indices).
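
    For example, ``_bintime(100 * pq.ms, 5 * pq.ms)`` yields ``20``
    (``100 // 5``).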
    """
    t_dl = t.rescale('ms').magnitude
    bin_size_dl = bin_size.rescale('ms').item()
    return np.floor(np.array(t_dl) / bin_size_dl).astype(int)


def _winpos(t_start, t_stop, win_size, win_step, position='left-edge'):
    """
    Calculate the left-edge positions of the analysis windows.
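
    For example (output formatting may vary):

    >>> import quantities as pq
    >>> _winpos(0 * pq.ms, 100 * pq.ms, 50 * pq.ms,
    ...         25 * pq.ms)  # doctest: +SKIP
    array([ 0., 25., 50.]) * ms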
    """
    t_start_dl = t_start.rescale('ms').item()
    t_stop_dl = t_stop.rescale('ms').item()
    winsize_dl = win_size.rescale('ms').item()
    winstep_dl = win_step.rescale('ms').item()

    # left side of the window time
    if position == 'left-edge':
        ts_winpos = np.arange(
            t_start_dl, t_stop_dl - winsize_dl + winstep_dl,
            winstep_dl) * pq.ms
    else:
        raise ValueError(
            'the current version only returns left-edge of the window')
    return ts_winpos


def _UE(mat, pattern_hash, method='analytic_TrialByTrial', n_surrogates=1):
    """
    Return the default results of the unitary events analysis: surprise,
    average firing rate of the neurons, expected and empirical coincidence
    counts, and the indices of the bins in `mat` where the patterns occur.
    """
    rate_avg = _rate_mat_avg_trial(mat)
    n_emp, indices = n_emp_mat_sum_trial(mat, pattern_hash)
    if method == 'surrogate_TrialByTrial':
        dist_exp, n_exp = gen_pval_anal(
            mat, pattern_hash, method, n_surrogates=n_surrogates)
        n_exp = np.mean(n_exp)
    else:
        # 'analytic_TrialByTrial' or 'analytic_TrialAverage';
        # gen_pval_anal raises a ValueError for unknown methods
        dist_exp, n_exp = gen_pval_anal(mat, pattern_hash, method)
    pval = dist_exp(n_emp)
    Js = jointJ(pval)
    return Js, rate_avg, n_exp, n_emp, indices


def jointJ_window_analysis(spiketrains, bin_size=5 * pq.ms,
                           win_size=100 * pq.ms, win_step=5 * pq.ms,
                           pattern_hash=None, method='analytic_TrialByTrial',
                           t_start=None, t_stop=None, binary=True,
                           n_surrogates=100):
    """
    Calculates the joint surprise in a sliding window fashion.

    Implementation is based on :cite:`unitary_event_analysis-Gruen99_67`.

    Parameters
    ----------
    spiketrains : list
        A list of spike trains (`neo.SpikeTrain` objects) in different
        trials:

        * 0-axis --> Trials
        * 1-axis --> Neurons
        * 2-axis --> Spike times

    bin_size : pq.Quantity, optional
        The size of bins for discretizing spike trains.
        Default: 5 ms
    win_size : pq.Quantity, optional
        The size of the window of analysis.
        Default: 100 ms
    win_step : pq.Quantity, optional
        The size of the window step.
        Default: 5 ms
    pattern_hash : int or list of int or None, optional
        A list of patterns of interest in hash values (see
        `hash_from_pattern` and `inverse_hash_from_pattern` functions).
        If None, the pattern in which all neurons participate is used.
        Default: None
    method : str, optional
        The method with which to compute the unitary events:

        * 'analytic_TrialByTrial': calculate the analytical expectancy
          on each trial, then sum over all trials;
        * 'analytic_TrialAverage': calculate the expectancy by averaging
          over trials (cf. Gruen et al. 2003);
        * 'surrogate_TrialByTrial': calculate the distribution of expected
          coincidences by spike time randomization in each trial and sum
          over trials.

        Default: 'analytic_TrialByTrial'
    t_start, t_stop : float or pq.Quantity, optional
        The start and stop times to use for the time points.
        If not specified, retrieved from the `t_start` and `t_stop`
        attributes of the input spiketrains.
    binary : bool, optional
        Binarize the binned spike train objects (True) or not. Only binary
        matrices are supported at the moment.
        Default: True
    n_surrogates : int, optional
        The number of surrogates to be used.
        Default: 100

    Returns
    -------
    dict
        The values of the following keys have the shape of

        * different window --> 0-axis
        * different pattern hash --> 1-axis

        'Js': list of float
            JointSurprise of different given patterns within each window.
        'indices': list of list of int
            A list of indices of pattern within each window.
        'n_emp': list of int
            The empirical number of each observed pattern.
        'n_exp': list of float
            The expected number of each pattern.
        'rate_avg': list of float
            The average firing rate of each neuron.

        Additionally, the 'input_parameters' key stores the input
        parameters.

    Raises
    ------
    ValueError
        If `spiketrains` is not in the format specified above.
    NotImplementedError
        If `binary` is not True. The method works only with binary matrices
        at the moment.

    Warns
    -----
    UserWarning
        If the ratio between `win_size` or `win_step` and `bin_size` is not
        an integer.
    """
    if not isinstance(spiketrains[0][0], neo.SpikeTrain):
        raise ValueError(
            "structure of the data is not correct: 0-axis should be trials, "
            "1-axis units and 2-axis neo spike trains")

    if t_start is None:
        t_start = spiketrains[0][0].t_start
    if t_stop is None:
        t_stop = spiketrains[0][0].t_stop

    n_trials = len(spiketrains)
    n_neurons = len(spiketrains[0])
    if pattern_hash is None:
        pattern = [1] * n_neurons
        pattern_hash = hash_from_pattern(pattern)
    if np.issubdtype(type(pattern_hash), np.integer):
        pattern_hash = [int(pattern_hash)]

    # position of all windows (left edges)
    t_winpos = _winpos(t_start, t_stop, win_size, win_step,
                       position='left-edge')
    t_winpos_bintime = _bintime(t_winpos, bin_size)

    winsize_bintime = _bintime(win_size, bin_size)
    winstep_bintime = _bintime(win_step, bin_size)

    if winsize_bintime * bin_size != win_size:
        warnings.warn(f"The ratio between the win_size ({win_size}) and the "
                      f"bin_size ({bin_size}) is not an integer")

    if winstep_bintime * bin_size != win_step:
        warnings.warn(f"The ratio between the win_step ({win_step}) and the "
                      f"bin_size ({bin_size}) is not an integer")

    input_parameters = dict(pattern_hash=pattern_hash, bin_size=bin_size,
                            win_size=win_size, win_step=win_step,
                            method=method, t_start=t_start, t_stop=t_stop,
                            n_surrogates=n_surrogates)

    n_bins = int(((t_stop - t_start) / bin_size).simplified.item())

    mat_tr_unit_spt = np.zeros((len(spiketrains), n_neurons, n_bins),
                               dtype=np.int32)
    for trial, sts in enumerate(spiketrains):
        bs = conv.BinnedSpikeTrain(list(sts), t_start=t_start,
                                   t_stop=t_stop, bin_size=bin_size)
        if not binary:
            raise NotImplementedError(
                "The method works only with binary matrices at the moment")
        mat_tr_unit_spt[trial] = bs.to_bool_array()

    n_windows = len(t_winpos)
    n_hashes = len(pattern_hash)
    Js_win, n_exp_win, n_emp_win = np.zeros((3, n_windows, n_hashes),
                                            dtype=np.float32)
    rate_avg = np.zeros((n_windows, n_hashes, n_neurons), dtype=np.float32)
    indices_win = defaultdict(list)
    for i, win_pos in enumerate(t_winpos_bintime):
        mat_win = mat_tr_unit_spt[:, :, win_pos:win_pos + winsize_bintime]
        Js_win[i], rate_avg[i], n_exp_win[i], n_emp_win[i], indices_lst = \
            _UE(mat_win, pattern_hash=pattern_hash, method=method,
                n_surrogates=n_surrogates)
        for j in range(n_trials):
            if len(indices_lst[j][0]) > 0:
                indices_win[f"trial{j}"].append(indices_lst[j][0] + win_pos)
    for key in indices_win.keys():
        indices_win[key] = np.hstack(indices_win[key])

    return {'Js': Js_win,
            'indices': indices_win,
            'n_emp': n_emp_win,
            'n_exp': n_exp_win,
            'rate_avg': rate_avg / bin_size,
            'input_parameters': input_parameters}