Source code for elephant.causality.granger

# -*- coding: utf-8 -*-
.. current_module elephant.causality

This module provides function to estimate causal influences of signals on each

Granger causality
Granger causality is a method to determine causal influence of one signal on
another based on autoregressive modelling. It was developed by Nobel prize
laureate Clive Granger and has been adopted in various numerical fields ever
since :cite:`granger-Granger69_424`. In its simplest form, the
method tests whether the past values of one signal help to reduce the
prediction error of another signal, compared to the past values of the latter
signal alone. If it does reduce the prediction error, the first signal is said
to Granger cause the other signal.

The user must be mindful of the method's limitations, which are assumptions of
covariance stationary data, linearity imposed by the underlying autoregressive
modelling as well as the fact that the variables not included in the model will
not be accounted for :cite:`granger-Seth07_1667`.

The mathematical implementation of Granger causality methods in this module
closely follows :cite:`granger-Ding06_0608035`.

Overview of Functions
Various formulations of Granger causality have been developed. In this module
you will find function for time-series data to test pairwise Granger causality

Time-series Granger causality

.. autosummary::
    :toctree: toctree/causality/



.. bibliography:: ../bib/elephant.bib
   :labelprefix: gr
   :keyprefix: granger-
   :style: unsrt

:copyright: Copyright 2014-2020 by the Elephant team, see `doc/authors.rst`.
:license: Modified BSD, see LICENSE.txt for details.

from __future__ import division, print_function, unicode_literals

import warnings
from collections import namedtuple

import numpy as np
from neo.core import AnalogSignal

__all__ = (

# the return type of pairwise_granger() function
Causality = namedtuple('Causality',

def _bic(cov, order, dimension, length):
    Calculate Bayesian Information Criterion

    cov : np.ndarray
        covariance matrix of auto regressive model
    order : int
        order of autoregressive model
    dimension : int
        dimensionality of the data
    length : int
        number of time samples

    criterion : float
       Bayesian Information Criterion
    sign, log_det_cov = np.linalg.slogdet(cov)
    criterion = 2 * log_det_cov \
        + 2*(dimension**2)*order*np.log(length)/length

    return criterion

def _aic(cov, order, dimension, length):
    Calculate Akaike Information Criterion

    cov : np.ndarray
        covariance matrix of auto regressive model
    order : int
        order of autoregressive model
    dimension : int
        dimensionality of the data
    length : int
        number of time samples

    criterion : float
        Akaike Information Criterion
    sign, log_det_cov = np.linalg.slogdet(cov)
    criterion = 2 * log_det_cov \
        + 2*(dimension**2)*order/length

    return criterion

def _lag_covariances(signals, dimension, max_lag):
    Determine covariances of time series and time shift of itself up to a
    maximal lag

    signals: np.ndarray
        time series data
    dimension : int
        number of time series
    max_lag: int
        maximal time lag to be considered

    lag_corr : np.ndarray
        correlations matrices of lagged signals

    Covariance of shifted signals calculated according to the following

        x: d-dimensional signal
        x^T: transpose of d-dimensional signal
        N: number of time points
        \tau: lag

        C(\tau) = \sum_{i=0}^{N-\tau} x[i]*x^T[\tau+i]

    length = np.size(signals[0])

    if length < max_lag:
        raise ValueError("Maximum lag larger than size of data")

    # centralize time series
    signals_mean = (signals - np.mean(signals, keepdims=True)).T

    lag_covariances = np.zeros((max_lag+1, dimension, dimension))

    # determine lagged covariance for different time lags
    for lag in range(0, max_lag+1):
        lag_covariances[lag] = \
                np.mean(np.einsum('ij,ik -> ijk', signals_mean[:length-lag],
                                  signals_mean[lag:]), axis=0)

    return lag_covariances

def _yule_walker_matrix(data, dimension, order):
    Generate matrix for Yule-Walker equation

    data : np.ndarray
        correlation of data shifted with lags up to order
    dimension : int
        dimensionality of data (e.g. number of channels)
    order : int
        order of the autoregressive model

    yule_walker_matrix : np.ndarray
        matrix in Yule-Walker equation

        Yule-Walker Matrix M is a block-structured symmetric matrix with
        dimension (d \cdot p)\times(d \cdot p)

        d: dimension of signal
        p: order of autoregressive model
        C(\tau): time-shifted covariances \tau -> d \times d matrix

        The blocks of size (d \times d) are set as follows:

        M_ij = C(j-i)^T

        where 1 \leq i \leq j \leq p. The other entries are determined by
    lag_covariances : np.ndarray


    lag_covariances = _lag_covariances(data, dimension, order)

    yule_walker_matrix = np.zeros((dimension*order, dimension*order))

    for block_row in range(order):
        for block_column in range(block_row, order):
            yule_walker_matrix[block_row*dimension: (block_row+1)*dimension,
                               (block_column+1)*dimension] = \

                               (block_row+1)*dimension] = \
    return yule_walker_matrix, lag_covariances

def _vector_arm(signals, dimension, order):
    Determine coefficients of autoregressive model from time series data.

    Coefficients of autoregressive model calculated via solving the linear

    M A = C

    M: Yule-Waler Matrix
    A: Coefficients of autoregressive model
    C: Time-shifted covariances with positive lags

    Covariance matrix C_0 is then given by

    C_0 = C[0] - \sum_{i=0}^{p-1} A[i]C[i+1]

    where p is the orde of the autoregressive model.

    signals : np.ndarray
        time series data
    order : int
        order of the autoregressive model

    coeffs: np.ndarray
        coefficients of the autoregressive model
    covar_mat : np.ndarray
        covariance matrix of


    yule_walker_matrix, lag_covariances = \
        _yule_walker_matrix(signals, dimension, order)

    positive_lag_covariances = np.reshape(lag_covariances[1:],
                                          (dimension*order, dimension))

    lstsq_coeffs = \
        np.linalg.lstsq(yule_walker_matrix, positive_lag_covariances)[0]

    coeffs = []
    for index in range(order):
        coeffs.append(lstsq_coeffs[index*dimension:(index+1)*dimension, ].T)

    coeffs = np.stack(coeffs)

    cov_matrix = np.copy(lag_covariances[0])
    for i in range(order):
        cov_matrix -= np.matmul(coeffs[i], lag_covariances[i+1])

    return coeffs, cov_matrix

def _optimal_vector_arm(signals, dimension, max_order,
    Determine optimal auto regressive model by choosing optimal order via
    Information Criterion

    signals : np.ndarray
        time series data
    dimension : int
        dimensionality of the data
    max_order : int
        maximal order to consider
    information_criterion : str
        A function to compute the information criterion:
            `bic` for Bayesian information_criterion,
            `aic` for Akaike information criterion
        Default: aic

    optimal_coeffs: np.ndarray
        coefficients of the autoregressive model
    optimal_cov_mat : np.ndarray
        covariance matrix of
    optimal_order : int
        optimal order

    length = np.size(signals[0])

    optimal_ic = np.infty
    optimal_order = 1
    optimal_coeffs = np.zeros((dimension, dimension, optimal_order))
    optimal_cov_matrix = np.zeros((dimension, dimension))

    for order in range(1, max_order + 1):
        coeffs, cov_matrix = _vector_arm(signals, dimension, order)

        if information_criterion == 'aic':
            temp_ic = _aic(cov_matrix, order, dimension, length)
        elif information_criterion == 'bic':
            temp_ic = _bic(cov_matrix, order, dimension, length)
            raise ValueError("The specified information criterion is not"
                             "available. Please use 'aic' or 'bic'.")

        if temp_ic < optimal_ic:
            optimal_ic = temp_ic
            optimal_order = order
            optimal_coeffs = coeffs
            optimal_cov_matrix = cov_matrix

    return optimal_coeffs, optimal_cov_matrix, optimal_order

[docs]def pairwise_granger(signals, max_order, information_criterion='aic'): r""" Determine Granger Causality of two time series Parameters ---------- signals : (N, 2) np.ndarray or neo.AnalogSignal A matrix with two time series (second dimension) that have N time points (first dimension). max_order : int Maximal order of autoregressive model. information_criterion : {'aic', 'bic'}, optional A function to compute the information criterion: `bic` for Bayesian information_criterion, `aic` for Akaike information criterion, Default: 'aic'. Returns ------- Causality A `namedtuple` with the following attributes: directional_causality_x_y : float The Granger causality value for X influence onto Y. directional_causality_y_x : float The Granger causality value for Y influence onto X. instantaneous_causality : float The remaining channel interdependence not accounted for by the directional causalities (e.g. shared input to X and Y). total_interdependence : float The sum of the former three metrics. It measures the dependence of X and Y. If the total interdependence is positive, X and Y are not independent. Denote covariance matrix of signals X by C|X - a real number Y by C|Y - a real number (X,Y) by C|XY - a (2 \times 2) matrix directional causality X -> Y given by log(C|X / C|XY_00) directional causality Y -> X given by log(C|Y / C|XY_11) instantaneous causality of X,Y given by log(C|XY_00 / C|XY_11) total interdependence of X,Y given by log( {C|X \cdot C|Y} / det{C|XY} ) Raises ------ ValueError If the provided signal does not have a shape of Nx2. If the determinant of the prediction error covariance matrix is not positive. Warns ----- UserWarning If the log determinant of the prediction error covariance matrix is below the tolerance level of 1e-7. Notes ----- The formulas used in this implementation follows :cite:`granger-Ding06_0608035`. The only difference being that we change the equation 47 in the following way: -R(k) - A(1)R(k - 1) - ... - A(m)R(k - m) = 0. This forumlation allows for the usage of R values without transposition (i.e. directly) in equation 48. Examples -------- Example 1. Independent variables. >>> import numpy as np >>> from elephant.causality.granger import pairwise_granger >>> pairwise_granger(np.random.uniform(size=(1000, 2)), max_order=2) Causality(directional_causality_x_y=0.0, directional_causality_y_x=-0.0, instantaneous_causality=0.0, total_interdependence=0.0) Example 2. Dependent variables. Y depends on X but not vice versa. .. math:: \begin{array}{ll} X_t \sim \mathcal{N}(0, 1) \\ Y_t = 3.5 \cdot X_{t-1} + \epsilon, \; \epsilon \sim\mathcal{N}(0, 1) \end{array} In this case, the directional causality is non-zero. >>> x = np.random.randn(1001) >>> y = 3.5 * x[:-1] + np.random.randn(1000) >>> signals = np.array([x[1:], y]).T # N x 2 matrix >>> pairwise_granger(signals, max_order=1) Causality(directional_causality_x_y=2.64, directional_causality_y_x=0.0, instantaneous_causality=0.0, total_interdependence=2.64) """ if isinstance(signals, AnalogSignal): signals = signals.magnitude if not (signals.ndim == 2 and signals.shape[1] == 2): raise ValueError("The input 'signals' must be of dimensions Nx2.") # transpose (N,2) -> (2,N) for mathematical convenience signals = signals.T # signal_x and signal_y are (1, N) arrays signal_x, signal_y = np.expand_dims(signals, axis=1) coeffs_x, var_x, p_1 = _optimal_vector_arm(signal_x, 1, max_order, information_criterion) coeffs_y, var_y, p_2 = _optimal_vector_arm(signal_y, 1, max_order, information_criterion) coeffs_xy, cov_xy, p_3 = _optimal_vector_arm(signals, 2, max_order, information_criterion) sign, log_det_cov = np.linalg.slogdet(cov_xy) tolerance = 1e-7 if sign <= 0: raise ValueError( "Determinant of covariance matrix must be always positive: " "In this case its sign is {}".format(sign)) if log_det_cov <= tolerance: warnings.warn("The value of the log determinant is at or below the " "tolerance level. Proceeding with computation.", UserWarning) directional_causality_y_x = np.log(var_x[0]) - np.log(cov_xy[0, 0]) directional_causality_x_y = np.log(var_y[0]) - np.log(cov_xy[1, 1]) instantaneous_causality = \ np.log(cov_xy[0, 0]) + np.log(cov_xy[1, 1]) - log_det_cov instantaneous_causality = np.asarray(instantaneous_causality) total_interdependence = np.log(var_x[0]) + np.log(var_y[0]) - log_det_cov # Round GC according to following scheme: # Note that standard error scales as 1/sqrt(sample_size) # Calculate significant figures according to standard error length = np.size(signal_x) asymptotic_std_error = 1/np.sqrt(length) est_sig_figures = int((-1)*np.around(np.log10(asymptotic_std_error))) directional_causality_x_y_round = np.around(directional_causality_x_y, est_sig_figures) directional_causality_y_x_round = np.around(directional_causality_y_x, est_sig_figures) instantaneous_causality_round = np.around(instantaneous_causality, est_sig_figures) total_interdependence_round = np.around(total_interdependence, est_sig_figures) return Causality( directional_causality_x_y=directional_causality_x_y_round.item(), directional_causality_y_x=directional_causality_y_x_round.item(), instantaneous_causality=instantaneous_causality_round.item(), total_interdependence=total_interdependence_round.item())
[docs]def conditional_granger(signals, max_order, information_criterion='aic'): r""" Determine conditional Granger Causality of the second time series on the first time series, given the third time series. In other words, for time series X_t, Y_t and Z_t, this function tests if Y_t influences X_t via Z_t. Parameters ---------- signals : (N, 3) np.ndarray or neo.AnalogSignal A matrix with three time series (second dimension) that have N time points (first dimension). The time series to be conditioned on is the third. max_order : int Maximal order of autoregressive model. information_criterion : {'aic', 'bic'}, optional A function to compute the information criterion: `bic` for Bayesian information_criterion, `aic` for Akaike information criterion, Default: 'aic'. Returns ------- conditional_causality_xy_z_round : float The value of conditional causality of Y_t on X_t given Z_t. Zero value indicates that causality of Y_t on X_t is solely dependent on Z_t. Raises ------ ValueError If the provided signal does not have a shape of Nx3. Notes ----- The formulas used in this implementation follows :cite:`granger-Ding06_0608035`. Specifically, the Eq 35. """ if isinstance(signals, AnalogSignal): signals = signals.magnitude if not (signals.ndim == 2 and signals.shape[1] == 3): raise ValueError("The input 'signals' must be of dimensions Nx3.") # transpose (N,3) -> (3,N) for mathematical convenience signals = signals.T # signal_x, signal_y and signal_z are (1, N) arrays signal_x, signal_y, signal_z = np.expand_dims(signals, axis=1) signals_xz = np.vstack([signal_x, signal_z]) coeffs_xz, cov_xz, p_1 = _optimal_vector_arm( signals_xz, dimension=2, max_order=max_order, information_criterion=information_criterion) coeffs_xyz, cov_xyz, p_2 = _optimal_vector_arm( signals, dimension=3, max_order=max_order, information_criterion=information_criterion) conditional_causality_xy_z = np.log(cov_xz[0, 0]) - np.log(cov_xyz[0, 0]) # Round conditional GC according to following scheme: # Note that standard error scales as 1/sqrt(sample_size) # Calculate significant figures according to standard error length = np.size(signal_x) asymptotic_std_error = 1/np.sqrt(length) est_sig_figures = int((-1)*np.around(np.log10(asymptotic_std_error))) conditional_causality_xy_z_round = np.around(conditional_causality_xy_z, est_sig_figures) return conditional_causality_xy_z_round