Source code for elephant.causality.granger

# -*- coding: utf-8 -*-
"""
This module provides function to estimate causal influences of signals on each
other.


Granger causality
*****************
Granger causality is a method to determine causal influence of one signal on
another based on autoregressive modelling. It was developed by Nobel prize
laureate Clive Granger and has been adopted in various numerical fields ever
since :cite:`granger-Granger69_424`. In its simplest form, the
method tests whether the past values of one signal help to reduce the
prediction error of another signal, compared to the past values of the latter
signal alone. If it does reduce the prediction error, the first signal is said
to Granger cause the other signal.
Granger causality analysis can be extended to the spectral domain investigating
the influnece signals have onto each other in a frequency resolved manner. It
relies on estimating the cross-spectrum of time series and decomposing them
into a transfer function and a noise covariance matrix. The method to estimate
the spectral Granger causality is non-parametric in the sense that it does not
require to fit an autoregressive model (see :cite:`granger-Dhamala08_354`).

Limitations
-----------
The user must be mindful of the method's limitations, which are assumptions of
covariance stationary data, linearity imposed by the underlying autoregressive
modelling as well as the fact that the variables not included in the model will
not be accounted for :cite:`granger-Seth07_1667`.
When estimating spectral Granger causality the user must be familiar with
basics the multitaper method to estimate power- and cross-spectra (e.g.
sampling frequency, DPSS, time-half bandwidth product).

Implementation
--------------
The mathematical implementation of Granger causality methods in this module
closely follows :cite:`granger-Ding06_0608035`.
The implementation of spectral Granger causality follows
:cite:`granger-Dhamala08_354`, :cite:`granger-Wen13_20110610` and
:cite:`granger-Wilson72_420` for the spectral matrix factorization.


Overview of Functions
---------------------
Various formulations of Granger causality have been developed. In this module
you will find function for time-series data to test pairwise Granger causality
(`pairwise_granger`).

Time-series Granger causality
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autosummary::
    :toctree: _toctree/causality/

    pairwise_granger
    conditional_granger

Spectral Granger causality
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autosummary::
    :toctree: _toctree/causality/

    pairwise_spectral_granger


Tutorial
--------

:doc:`View tutorial <../tutorials/granger_causality>`

Run tutorial interactively:

.. image:: https://mybinder.org/badge.svg
   :target: https://mybinder.org/v2/gh/NeuralEnsemble/elephant/master
            ?filepath=doc/tutorials/granger_causality.ipynb

:copyright: Copyright 2014-2024 by the Elephant team, see `doc/authors.rst`.
:license: Modified BSD, see LICENSE.txt for details.
"""

from __future__ import division, print_function, unicode_literals

import warnings
from collections import namedtuple

import numpy as np
import quantities as pq
import neo
from neo.core import AnalogSignal
from elephant.spectral import segmented_multitaper_cross_spectrum, multitaper_psd


__all__ = (
    "Causality",
    "pairwise_granger",
    "conditional_granger",
    "pairwise_spectral_granger"
)


# the return type of pairwise_granger(), pairwise_spectral_granger() function
Causality = namedtuple('Causality',
                       ['directional_causality_x_y',
                        'directional_causality_y_x',
                        'instantaneous_causality',
                        'total_interdependence'])


def _bic(cov, order, dimension, length):
    """
    Calculate Bayesian Information Criterion

    Parameters
    ----------
    cov : np.ndarray
        covariance matrix of auto regressive model
    order : int
        order of autoregressive model
    dimension : int
        dimensionality of the data
    length : int
        number of time samples

    Returns
    -------
    criterion : float
       Bayesian Information Criterion
    """
    sign, log_det_cov = np.linalg.slogdet(cov)
    criterion = 2 * log_det_cov \
        + 2*(dimension**2)*order*np.log(length)/length

    return criterion


def _aic(cov, order, dimension, length):
    """
    Calculate Akaike Information Criterion

    Parameters
    ----------
    cov : np.ndarray
        covariance matrix of auto regressive model
    order : int
        order of autoregressive model
    dimension : int
        dimensionality of the data
    length : int
        number of time samples

    Returns
    -------
    criterion : float
        Akaike Information Criterion
    """
    sign, log_det_cov = np.linalg.slogdet(cov)
    criterion = 2 * log_det_cov \
        + 2*(dimension**2)*order/length

    return criterion


def _lag_covariances(signals, dimension, max_lag):
    r"""
    Determine covariances of time series and time shift of itself up to a
    maximal lag

    Parameters
    ----------
    signals : np.ndarray
        time series data
    dimension : int
        number of time series
    max_lag : int
        maximal time lag to be considered

    Returns
    -------
    lag_corr : np.ndarray
        correlations matrices of lagged signals

    Covariance of shifted signals calculated according to the following
    formula:

        x: d-dimensional signal
        x^T: transpose of d-dimensional signal
        N: number of time points
        \tau: lag

        C(\tau) = \sum_{i=0}^{N-\tau} x[i]*x^T[\tau+i]

    """
    length = np.size(signals[0])

    if length < max_lag:
        raise ValueError("Maximum lag larger than size of data")

    # centralize time series
    signals_mean = (signals - np.mean(signals, keepdims=True)).T

    lag_covariances = np.zeros((max_lag+1, dimension, dimension))

    # determine lagged covariance for different time lags
    for lag in range(0, max_lag+1):
        lag_covariances[lag] = \
                np.mean(np.einsum('ij,ik -> ijk', signals_mean[:length-lag],
                                  signals_mean[lag:]), axis=0)

    return lag_covariances


def _yule_walker_matrix(data, dimension, order):
    r"""
    Generate matrix for Yule-Walker equation

    Parameters
    ----------
    data : np.ndarray
        correlation of data shifted with lags up to order
    dimension : int
        dimensionality of data (e.g. number of channels)
    order : int
        order of the autoregressive model

    Returns
    -------
    yule_walker_matrix : np.ndarray
        matrix in Yule-Walker equation

        Yule-Walker Matrix M is a block-structured symmetric matrix with
        dimension (d \cdot p)\times(d \cdot p)

        where
        d: dimension of signal
        p: order of autoregressive model
        C(\tau): time-shifted covariances \tau -> d \times d matrix

        The blocks of size (d \times d) are set as follows:

        M_ij = C(j-i)^T

        where 1 \leq i \leq j \leq p. The other entries are determined by
        symmetry.
    lag_covariances : np.ndarray

    """

    lag_covariances = _lag_covariances(data, dimension, order)

    yule_walker_matrix = np.zeros((dimension*order, dimension*order))

    for block_row in range(order):
        for block_column in range(block_row, order):
            yule_walker_matrix[block_row*dimension: (block_row+1)*dimension,
                               block_column*dimension:
                               (block_column+1)*dimension] = \
                lag_covariances[block_column-block_row].T

            yule_walker_matrix[block_column*dimension:
                               (block_column+1)*dimension,
                               block_row*dimension:
                               (block_row+1)*dimension] = \
                lag_covariances[block_column-block_row]
    return yule_walker_matrix, lag_covariances


def _vector_arm(signals, dimension, order):
    r"""
    Determine coefficients of autoregressive model from time series data.

    Coefficients of autoregressive model calculated via solving the linear
    equation

    M A = C

    where
    M: Yule-Waler Matrix
    A: Coefficients of autoregressive model
    C: Time-shifted covariances with positive lags

    Covariance matrix C_0 is then given by

    C_0 = C[0] - \sum_{i=0}^{p-1} A[i]C[i+1]

    where p is the orde of the autoregressive model.

    Parameters
    ----------
    signals : np.ndarray
        time series data
    order : int
        order of the autoregressive model

    Returns
    -------
    coeffs : np.ndarray
        coefficients of the autoregressive model
        ry
    covar_mat : np.ndarray
        covariance matrix of

    """

    yule_walker_matrix, lag_covariances = \
        _yule_walker_matrix(signals, dimension, order)

    positive_lag_covariances = np.reshape(lag_covariances[1:],
                                          (dimension*order, dimension))

    lstsq_coeffs = np.linalg.lstsq(yule_walker_matrix,
                                   positive_lag_covariances,
                                   rcond=None)[0]

    coeffs = []
    for index in range(order):
        coeffs.append(lstsq_coeffs[index*dimension:(index+1)*dimension, ].T)

    coeffs = np.stack(coeffs)

    cov_matrix = np.copy(lag_covariances[0])
    for i in range(order):
        cov_matrix -= np.matmul(coeffs[i], lag_covariances[i+1])

    return coeffs, cov_matrix


def _optimal_vector_arm(signals, dimension, max_order,
                        information_criterion='aic'):
    """
    Determine optimal auto regressive model by choosing optimal order via
    Information Criterion

    Parameters
    ----------
    signals : np.ndarray
        time series data
    dimension : int
        dimensionality of the data
    max_order : int
        maximal order to consider
    information_criterion : str
        A function to compute the information criterion:
            `bic` for Bayesian information_criterion,
            `aic` for Akaike information criterion
        Default: aic

    Returns
    -------
    optimal_coeffs : np.ndarray
        coefficients of the autoregressive model
    optimal_cov_mat : np.ndarray
        covariance matrix of
    optimal_order : int
        optimal order
    """

    length = np.size(signals[0])

    optimal_ic = np.infty
    optimal_order = 1
    optimal_coeffs = np.zeros((dimension, dimension, optimal_order))
    optimal_cov_matrix = np.zeros((dimension, dimension))

    for order in range(1, max_order + 1):
        coeffs, cov_matrix = _vector_arm(signals, dimension, order)

        if information_criterion == 'aic':
            temp_ic = _aic(cov_matrix, order, dimension, length)
        elif information_criterion == 'bic':
            temp_ic = _bic(cov_matrix, order, dimension, length)
        else:
            raise ValueError("The specified information criterion is not"
                             "available. Please use 'aic' or 'bic'.")

        if temp_ic < optimal_ic:
            optimal_ic = temp_ic
            optimal_order = order
            optimal_coeffs = coeffs
            optimal_cov_matrix = cov_matrix

    return optimal_coeffs, optimal_cov_matrix, optimal_order


def _bracket_operator(spectrum, num_freqs, num_signals):
    r"""
    Implementation of the $[ \cdot ]^{+}$ from 'The Factorization of Matricial
    Spectral Densities', Wilson 1972, SiAM J Appl Math, Definition 1.2 (ii).

    Paramaters
    ----------
    spectrum : np.ndarray
        Cross-spectrum of multivariate time series
    num_freqs : int
        Number of frequencies
    num_signals : int
        Number of time series

    Returns
    -------
    causal_part : np.ndarray
        Causal part of cross-spectrum of multivariate time series
    """
    # Get coefficients from spectrum
    causal_part = np.fft.ifft(spectrum, axis=0)

    # Throw away acausal part
    causal_part[(num_freqs + 1) // 2:] = 0

    # Treat coefficient belonging to 0
    causal_part[0] /= 2

    # Back-transformation
    causal_part = np.fft.fft(causal_part, axis=0)

    # Adjust zero frequency part to ensure convergence by setting entries
    # below diagonal to zero at zero-frequency
    if num_signals > 1:
        indices = np.tril_indices(num_signals, k=-1)
        causal_part[0, indices[0], indices[1]] = 0

    return causal_part


def _dagger(matrix_array):
    r"""
    Return Hermitian conjugate of matrix array.

    Parameters
    ----------
    matrix_array : np.ndarray
        Array of matrices the Hermitian conjugate of which needs to be
        determined

    Returns
    -------
    matrix_array_dagger : np.ndarray
        Hermitian conjugate of matrix_array
    """

    if matrix_array.ndim == 2:
        matrix_array_dagger = np.transpose(matrix_array.conj(), axes=(1, 0))

    else:
        matrix_array_dagger = np.transpose(matrix_array.conj(), axes=(0, 2, 1))

    return matrix_array_dagger


def _spectral_factorization(cross_spectrum, num_iterations, term_crit=1e-12):
    r"""
    Determine the spectral matrix factorization of the cross-spectrum
    (denoted by S) of multiple time series:
        S = H \Sigma H^{\dagger}
    Here, \Sigma is the covariance matrix, H the transfer function.
    We follow the algorithm outlined in 'The Factorization of Matricial
    Spectral Densities', Wilson 1972, SiAM J Appl Math.
    The algorithm iteratively calculates approximations for the spectral
    factorization and terminates if either the maximum number of iterations is
    reached or the difference between the cross spectrum and the approximate
    cross spectrum calculated from the covariance matrix and transfer function
    is sufficiently small.

    Parameters
    ----------
    cross_spectrum : np.ndarray
        Cross spectrum to be decomposed in covariance matrix and transfer
        function
    num_iterations : int
        Maximal number of iterations of iterative algorithm
    term_crit : float
        Termination criterion for iteration step in spectral matrix
        factorization
        Default: 1e-12


    Returns
    ------
    cov_matrix : np.ndarray
        Covariance matrix of spectral factorization
    transfer_function : np.ndarray
        Transfer function of spectral factorization
    """
    # spectral_density_function = np.fft.ifft(cross_spectrum, axis=0)
    spectral_density_function = np.copy(cross_spectrum)

    # Resolve dimensions
    num_freqs = np.shape(spectral_density_function)[0]
    num_signals = np.shape(spectral_density_function)[1]

    # Initialization
    identity = np.identity(num_signals)
    factorization = np.zeros(np.shape(spectral_density_function),
                             dtype='complex128')

    # Estimate initial conditions
    try:
        initial_cond = np.linalg.cholesky(cross_spectrum[0].real)
    except np.linalg.LinAlgError:
        raise ValueError('Could not calculate Cholesky decomposition of real'
                         + ' part of zero frequency estimate of cross-spectrum'
                         + '. This might suggest a problem with the input')

    factorization += initial_cond

    converged = False
    # Iteration for calculating spectral factorization
    for i in range(num_iterations):

        factorization_old = np.copy(factorization)

        # Implementation of Eq. 3.1 from "The Factorization of Matricial
        # Spectral Densities", Wilson 1972, SiAM J Appl Math
        X = np.linalg.solve(factorization,
                            spectral_density_function)
        Y = np.linalg.solve(factorization,
                            _dagger(X))
        Y += identity
        Y = _bracket_operator(Y, num_freqs, num_signals)

        factorization = np.matmul(factorization, Y)

        diff = factorization - factorization_old
        error = np.max(np.abs(diff))
        if error < term_crit:
            print(f'Spectral factorization converged after {i} steps')
            converged=True
            break

    if not converged:
        raise Exception("Spectral factorization did not converge after "
                        + f"{num_iterations} steps. Try to increase "
                        + "'num_iterations', or lower the allowed error "
                        + " in the termination criterion, currently "
                        + f"{term_crit}")

    cov_matrix = np.matmul(factorization[0],
                           _dagger(factorization[0]))

    transfer_function = np.matmul(factorization,
                                  np.linalg.inv(factorization[0]))

    return cov_matrix, transfer_function


[docs] def pairwise_granger(signals, max_order, information_criterion='aic'): r""" Determine Granger Causality of two time series Parameters ---------- signals : (N, 2) np.ndarray or neo.AnalogSignal A matrix with two time series (second dimension) that have N time points (first dimension). max_order : int Maximal order of autoregressive model. information_criterion : {'aic', 'bic'}, optional A function to compute the information criterion: * `bic` for Bayesian information_criterion, * `aic` for Akaike information criterion, Default: 'aic' Returns ------- Causality A `namedtuple` with the following attributes: directional_causality_x_y : float The Granger causality value for X influence onto Y. directional_causality_y_x : float The Granger causality value for Y influence onto X. instantaneous_causality : float The remaining channel interdependence not accounted for by the directional causalities (e.g. shared input to X and Y). total_interdependence : float The sum of the former three metrics. It measures the dependence of X and Y. If the total interdependence is positive, X and Y are not independent. Denote covariance matrix of signals X by C|X - a real number Y by C|Y - a real number (X,Y) by C|XY - a (2 \times 2) matrix directional causality X -> Y given by log(C|X / C|XY_00) directional causality Y -> X given by log(C|Y / C|XY_11) instantaneous causality of X,Y given by log(C|XY_00 / C|XY_11) total interdependence of X,Y given by log( {C|X \cdot C|Y} / det{C|XY} ) Raises ------ ValueError If the provided signal does not have a shape of Nx2. If the determinant of the prediction error covariance matrix is not positive. Warns ----- UserWarning If the log determinant of the prediction error covariance matrix is below the tolerance level of 1e-7. Notes ----- The formulas used in this implementation follows :cite:`granger-Ding06_0608035`. The only difference being that we change the equation 47 in the following way: -R(k) - A(1)R(k - 1) - ... - A(m)R(k - m) = 0. This forumlation allows for the usage of R values without transposition (i.e. directly) in equation 48. Examples -------- Example 1. Independent variables. >>> import numpy as np >>> from elephant.causality.granger import pairwise_granger >>> pairwise_granger(np.random.uniform(size=(1000, 2)), max_order=2) # noqa Causality(directional_causality_x_y=-0.0, directional_causality_y_x=0.0, instantaneous_causality=0.0, total_interdependence=0.0) Example 2. Dependent variables. Y depends on X but not vice versa. .. math:: \begin{array}{ll} X_t \sim \mathcal{N}(0, 1) \\ Y_t = 3.5 \cdot X_{t-1} + \epsilon, \; \epsilon \sim\mathcal{N}(0, 1) \end{array} In this case, the directional causality is non-zero. >>> np.random.seed(31) >>> x = np.random.randn(1001) >>> y = 3.5 * x[:-1] + np.random.randn(1000) >>> signals = np.array([x[1:], y]).T # N x 2 matrix >>> pairwise_granger(signals, max_order=1) # noqa Causality(directional_causality_x_y=2.64, directional_causality_y_x=-0.0, instantaneous_causality=0.0, total_interdependence=2.64) """ if isinstance(signals, AnalogSignal): signals = signals.magnitude if not (signals.ndim == 2 and signals.shape[1] == 2): raise ValueError("The input 'signals' must be of dimensions Nx2.") # transpose (N,2) -> (2,N) for mathematical convenience signals = signals.T # signal_x and signal_y are (1, N) arrays signal_x, signal_y = np.expand_dims(signals, axis=1) coeffs_x, var_x, p_1 = _optimal_vector_arm(signal_x, 1, max_order, information_criterion) coeffs_y, var_y, p_2 = _optimal_vector_arm(signal_y, 1, max_order, information_criterion) coeffs_xy, cov_xy, p_3 = _optimal_vector_arm(signals, 2, max_order, information_criterion) sign, log_det_cov = np.linalg.slogdet(cov_xy) tolerance = 1e-7 if sign <= 0: raise ValueError( "Determinant of covariance matrix must be always positive: " "In this case its sign is {}".format(sign)) if log_det_cov <= tolerance: warnings.warn("The value of the log determinant is at or below the " "tolerance level. Proceeding with computation.", UserWarning) directional_causality_y_x = np.log(var_x[0]) - np.log(cov_xy[0, 0]) directional_causality_x_y = np.log(var_y[0]) - np.log(cov_xy[1, 1]) instantaneous_causality = \ np.log(cov_xy[0, 0]) + np.log(cov_xy[1, 1]) - log_det_cov instantaneous_causality = np.asarray(instantaneous_causality) total_interdependence = np.log(var_x[0]) + np.log(var_y[0]) - log_det_cov # Round GC according to following scheme: # Note that standard error scales as 1/sqrt(sample_size) # Calculate significant figures according to standard error length = np.size(signal_x) asymptotic_std_error = 1/np.sqrt(length) est_sig_figures = int((-1)*np.around(np.log10(asymptotic_std_error))) directional_causality_x_y_round = np.around(directional_causality_x_y, est_sig_figures) directional_causality_y_x_round = np.around(directional_causality_y_x, est_sig_figures) instantaneous_causality_round = np.around(instantaneous_causality, est_sig_figures) total_interdependence_round = np.around(total_interdependence, est_sig_figures) return Causality( directional_causality_x_y=directional_causality_x_y_round.item(), directional_causality_y_x=directional_causality_y_x_round.item(), instantaneous_causality=instantaneous_causality_round.item(), total_interdependence=total_interdependence_round.item())
[docs] def conditional_granger(signals, max_order, information_criterion='aic'): r""" Determine conditional Granger Causality of the second time series on the first time series, given the third time series. In other words, for time series X_t, Y_t and Z_t, this function tests if Y_t influences X_t via Z_t. Parameters ---------- signals : (N, 3) np.ndarray or neo.AnalogSignal A matrix with three time series (second dimension) that have N time points (first dimension). The time series to be conditioned on is the third. max_order : int Maximal order of autoregressive model. information_criterion : {'aic', 'bic'}, optional A function to compute the information criterion: * `bic` for Bayesian information_criterion, * `aic` for Akaike information criterion, Default: 'aic' Returns ------- conditional_causality_xy_z_round : float The value of conditional causality of Y_t on X_t given Z_t. Zero value indicates that causality of Y_t on X_t is solely dependent on Z_t. Raises ------ ValueError If the provided signal does not have a shape of Nx3. Notes ----- The formulas used in this implementation follows :cite:`granger-Ding06_0608035`. Specifically, the Eq 35. """ if isinstance(signals, AnalogSignal): signals = signals.magnitude if not (signals.ndim == 2 and signals.shape[1] == 3): raise ValueError("The input 'signals' must be of dimensions Nx3.") # transpose (N,3) -> (3,N) for mathematical convenience signals = signals.T # signal_x, signal_y and signal_z are (1, N) arrays signal_x, signal_y, signal_z = np.expand_dims(signals, axis=1) signals_xz = np.vstack([signal_x, signal_z]) coeffs_xz, cov_xz, p_1 = _optimal_vector_arm( signals_xz, dimension=2, max_order=max_order, information_criterion=information_criterion) coeffs_xyz, cov_xyz, p_2 = _optimal_vector_arm( signals, dimension=3, max_order=max_order, information_criterion=information_criterion) conditional_causality_xy_z = np.log(cov_xz[0, 0]) - np.log(cov_xyz[0, 0]) # Round conditional GC according to following scheme: # Note that standard error scales as 1/sqrt(sample_size) # Calculate significant figures according to standard error length = np.size(signal_x) asymptotic_std_error = 1/np.sqrt(length) est_sig_figures = int((-1)*np.around(np.log10(asymptotic_std_error))) conditional_causality_xy_z_round = np.around(conditional_causality_xy_z, est_sig_figures) return conditional_causality_xy_z_round
[docs] def pairwise_spectral_granger(signal_i, signal_j, fs=1, nw=4, num_tapers=None, peak_resolution=None, n_segments=1, len_segment=None, frequency_resolution=None, overlap=0.5, num_iterations=300, term_crit=1e-12): r"""Determine spectral Granger Causality of two signals. The spectral Granger Causality is obtained through the following steps: 1. Determine the cross spectrum of the two signals by applying :func:`segmented_multitaper_cross_spectrum` to the joint signal. See the documentation of this function for the hierarchy of the parameters used for the estimation of the cross spectrum. 2. Calculate the spectral factorization of the cross spectrum decomposing the cross spectrum into the covariance matrix and the transfer function. 3. Calculate the directional and instantaneous spectral Granger Causality from the power spectra, the transfer function and the covariance matrix (see e.g. Wen et al., 2013, Phil Trans R Soc, eq. 2.10 ff). Parameters ---------- signal_i : neo.AnalogSignal or pq.Quantity or np.ndarray First time series data of the pair between which spectral Granger Causality is computed. signal_j : neo.AnalogSignal or pq.Quantity or np.ndarray Second time series data of the pair between which spectral Granger Causality is computed. The shapes and the sampling frequencies of `signal_i` and `signal_j` must be identical. When `signal_i` and `signal_j` are not `neo.AnalogSignal`, sampling frequency should be specified through the keyword argument `fs`. Otherwise, the default value is used (`fs` = 1.0). fs : float, optional Specifies the sampling frequency of the input time series Default: 1.0 nw : float, optional Time bandwidth product Default: 4.0 num_tapers : int, optional Number of tapers used in 1. to obtain estimate of PSD. By default, [2*nw] - 1 is chosen. Default: None peak_resolution : pq.Quantity float, optional Quantity in Hz determining the number of tapers used for analysis. Fine peak resolution --> low numerical value --> low number of tapers High peak resolution --> high numerical value --> high number of tapers When given as a `float`, it is taken as frequency in Hz. Default: None. n_segments : int, optional Number of segments. The length of segments is adjusted so that overlapping segments cover the entire stretch of the given data. This parameter is ignored if `len_segment` or `frequency_resolution` is given. Default: 1 len_segment : int, optional Length of segments. This parameter is ignored if `frequency_resolution` is given. If None, it will be determined from other parameters. Default: None frequency_resolution : pq.Quantity or float, optional Desired frequency resolution of the obtained spectral Granger Causality estimate in terms of the interval between adjacent frequency bins. When given as a `float`, it is taken as frequency in Hz. If None, it will be determined from other parameters. Default: None overlap : float, optional Overlap between segments represented as a float number between 0 (no overlap) and 1 (complete overlap). Default: 0.5 (half-overlapped) num_iterations : int Number of iterations for algorithm to estimate spectral factorization. Default: 300 term_crit : float Termination criterion for iteration step in spectral matrix factorization Default: 1e-12 Returns ------- freqs : np.ndarray Frequencies associated with the spectral Granger Causality estimate. Causality A `namedtuple` with the following attributes: directional_causality_x_y : np.ndarray Spectrally resolved Granger causality influence of `signal_i`, abbreviated by X, onto `signal_j`, abbreviated by Y. directional_causality_y_x : np.ndarray Spectrally resolved Granger causality influence of `signal_j`, abbreviated by Y, onto `signal_i`, abbreviated by Y. instantaneous_causality : np.ndarray The remaining spectrally resolved channel interdependence not accounted for by the directional causalities (e.g. shared input to X, i.e. `signal_i`, and Y, i.e. `signal_j`). total_interdependence : np.ndarray The sum of the former three metrics. It measures the dependence of X, i.e. `signal_i` and Y, i.e. `signal_j`. If the total interdependence is positive, X and Y are not independent. """ if isinstance(signal_i, neo.core.AnalogSignal) and \ isinstance(signal_j, neo.core.AnalogSignal): signals = signal_i.merge(signal_j) elif isinstance(signal_i, np.ndarray) and isinstance(signal_j, np.ndarray): signals = np.vstack([signal_i, signal_j]) # Calculate cross spectrum for signals freqs, S = segmented_multitaper_cross_spectrum( signals=signals, n_segments=n_segments, len_segment=len_segment, frequency_resolution=frequency_resolution, overlap=overlap, fs=fs, nw=nw, num_tapers=num_tapers, peak_resolution=peak_resolution, return_onesided=False) # Remove units attached by the multitaper_cross_spectrum if isinstance(S, pq.Quantity): S = S.magnitude # Transpose cross spectrum due to different conventions used in # segemented_multitaper_cross_spectrum and the calculations of spectral # Granger causality S = np.transpose(S, axes=(1, 0, 2)) # Move frequencies to first axis - Needed for _spectral_factorization S = np.transpose(S, axes=(2, 0, 1)) # Decompose cross spectrum into covariance and transfer function C, H = _spectral_factorization(S, num_iterations=num_iterations) # Take positive frequencies mask = (freqs >= 0) freqs = freqs[mask] S = S[mask] H = H[mask] # Calculate spectral Granger Causality. # Formulae follow Wen et al., 2013, Phil Trans R Soc H_tilde_xx = H[:, 0, 0] + C[0, 1]/C[0, 0]*H[:, 0, 1] H_tilde_yy = H[:, 1, 1] + C[0, 1]/C[1, 1]*H[:, 1, 0] directional_causality_y_x = np.log(S[:, 0, 0].real / (H_tilde_xx * C[0, 0] * H_tilde_xx.conj()).real) directional_causality_x_y = np.log(S[:, 1, 1].real / (H_tilde_yy * C[1, 1] * H_tilde_yy.conj()).real) instantaneous_causality = np.log( (H_tilde_xx * C[0, 0] * H_tilde_xx.conj()).real * (H_tilde_yy * C[1, 1] * H_tilde_yy.conj()).real) instantaneous_causality -= np.linalg.slogdet(S)[1] total_interdependence = (directional_causality_x_y + directional_causality_y_x + instantaneous_causality) spectral_causality = Causality( directional_causality_x_y=directional_causality_x_y, directional_causality_y_x=directional_causality_y_x, instantaneous_causality=instantaneous_causality, total_interdependence=total_interdependence) return freqs, spectral_causality