# -*- coding: utf-8 -*-
"""
.. current_module elephant.causality
Overview
--------
This module provides function to estimate causal influences of signals on each
other.
Granger causality
~~~~~~~~~~~~~~~~~
Granger causality is a method to determine causal influence of one signal on
another based on autoregressive modelling. It was developed by Nobel prize
laureate Clive Granger and has been adopted in various numerical fields ever
since :cite:`granger-Granger69_424`. In its simplest form, the
method tests whether the past values of one signal help to reduce the
prediction error of another signal, compared to the past values of the latter
signal alone. If it does reduce the prediction error, the first signal is said
to Granger cause the other signal.
Limitations
+++++++++++
The user must be mindful of the method's limitations, which are assumptions of
covariance stationary data, linearity imposed by the underlying autoregressive
modelling as well as the fact that the variables not included in the model will
not be accounted for :cite:`granger-Seth07_1667`.
Implementation
++++++++++++++
The mathematical implementation of Granger causality methods in this module
closely follows :cite:`granger-Ding06_0608035`.
Overview of Functions
---------------------
Various formulations of Granger causality have been developed. In this module
you will find function for time-series data to test pairwise Granger causality
(`pairwise_granger`).
Time-series Granger causality
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autosummary::
:toctree: toctree/causality/
pairwise_granger
conditional_granger
References
----------
.. bibliography:: ../bib/elephant.bib
:labelprefix: gr
:keyprefix: granger-
:style: unsrt
:copyright: Copyright 2014-2020 by the Elephant team, see `doc/authors.rst`.
:license: Modified BSD, see LICENSE.txt for details.
"""
from __future__ import division, print_function, unicode_literals
import warnings
from collections import namedtuple
import numpy as np
from neo.core import AnalogSignal
__all__ = (
"Causality",
"pairwise_granger",
"conditional_granger"
)
# the return type of pairwise_granger() function
Causality = namedtuple('Causality',
['directional_causality_x_y',
'directional_causality_y_x',
'instantaneous_causality',
'total_interdependence'])
def _bic(cov, order, dimension, length):
"""
Calculate Bayesian Information Criterion
Parameters
----------
cov : np.ndarray
covariance matrix of auto regressive model
order : int
order of autoregressive model
dimension : int
dimensionality of the data
length : int
number of time samples
Returns
-------
criterion : float
Bayesian Information Criterion
"""
sign, log_det_cov = np.linalg.slogdet(cov)
criterion = 2 * log_det_cov \
+ 2*(dimension**2)*order*np.log(length)/length
return criterion
def _aic(cov, order, dimension, length):
"""
Calculate Akaike Information Criterion
Parameters
----------
cov : np.ndarray
covariance matrix of auto regressive model
order : int
order of autoregressive model
dimension : int
dimensionality of the data
length : int
number of time samples
Returns
-------
criterion : float
Akaike Information Criterion
"""
sign, log_det_cov = np.linalg.slogdet(cov)
criterion = 2 * log_det_cov \
+ 2*(dimension**2)*order/length
return criterion
def _lag_covariances(signals, dimension, max_lag):
r"""
Determine covariances of time series and time shift of itself up to a
maximal lag
Parameters
----------
signals: np.ndarray
time series data
dimension : int
number of time series
max_lag: int
maximal time lag to be considered
Returns
-------
lag_corr : np.ndarray
correlations matrices of lagged signals
Covariance of shifted signals calculated according to the following
formula:
x: d-dimensional signal
x^T: transpose of d-dimensional signal
N: number of time points
\tau: lag
C(\tau) = \sum_{i=0}^{N-\tau} x[i]*x^T[\tau+i]
"""
length = np.size(signals[0])
if length < max_lag:
raise ValueError("Maximum lag larger than size of data")
# centralize time series
signals_mean = (signals - np.mean(signals, keepdims=True)).T
lag_covariances = np.zeros((max_lag+1, dimension, dimension))
# determine lagged covariance for different time lags
for lag in range(0, max_lag+1):
lag_covariances[lag] = \
np.mean(np.einsum('ij,ik -> ijk', signals_mean[:length-lag],
signals_mean[lag:]), axis=0)
return lag_covariances
def _yule_walker_matrix(data, dimension, order):
r"""
Generate matrix for Yule-Walker equation
Parameters
----------
data : np.ndarray
correlation of data shifted with lags up to order
dimension : int
dimensionality of data (e.g. number of channels)
order : int
order of the autoregressive model
Returns
-------
yule_walker_matrix : np.ndarray
matrix in Yule-Walker equation
Yule-Walker Matrix M is a block-structured symmetric matrix with
dimension (d \cdot p)\times(d \cdot p)
where
d: dimension of signal
p: order of autoregressive model
C(\tau): time-shifted covariances \tau -> d \times d matrix
The blocks of size (d \times d) are set as follows:
M_ij = C(j-i)^T
where 1 \leq i \leq j \leq p. The other entries are determined by
symmetry.
lag_covariances : np.ndarray
"""
lag_covariances = _lag_covariances(data, dimension, order)
yule_walker_matrix = np.zeros((dimension*order, dimension*order))
for block_row in range(order):
for block_column in range(block_row, order):
yule_walker_matrix[block_row*dimension: (block_row+1)*dimension,
block_column*dimension:
(block_column+1)*dimension] = \
lag_covariances[block_column-block_row].T
yule_walker_matrix[block_column*dimension:
(block_column+1)*dimension,
block_row*dimension:
(block_row+1)*dimension] = \
lag_covariances[block_column-block_row]
return yule_walker_matrix, lag_covariances
def _vector_arm(signals, dimension, order):
r"""
Determine coefficients of autoregressive model from time series data.
Coefficients of autoregressive model calculated via solving the linear
equation
M A = C
where
M: Yule-Waler Matrix
A: Coefficients of autoregressive model
C: Time-shifted covariances with positive lags
Covariance matrix C_0 is then given by
C_0 = C[0] - \sum_{i=0}^{p-1} A[i]C[i+1]
where p is the orde of the autoregressive model.
Parameters
----------
signals : np.ndarray
time series data
order : int
order of the autoregressive model
Returns
-------
coeffs: np.ndarray
coefficients of the autoregressive model
ry
covar_mat : np.ndarray
covariance matrix of
"""
yule_walker_matrix, lag_covariances = \
_yule_walker_matrix(signals, dimension, order)
positive_lag_covariances = np.reshape(lag_covariances[1:],
(dimension*order, dimension))
lstsq_coeffs = \
np.linalg.lstsq(yule_walker_matrix, positive_lag_covariances)[0]
coeffs = []
for index in range(order):
coeffs.append(lstsq_coeffs[index*dimension:(index+1)*dimension, ].T)
coeffs = np.stack(coeffs)
cov_matrix = np.copy(lag_covariances[0])
for i in range(order):
cov_matrix -= np.matmul(coeffs[i], lag_covariances[i+1])
return coeffs, cov_matrix
def _optimal_vector_arm(signals, dimension, max_order,
information_criterion='aic'):
"""
Determine optimal auto regressive model by choosing optimal order via
Information Criterion
Parameters
----------
signals : np.ndarray
time series data
dimension : int
dimensionality of the data
max_order : int
maximal order to consider
information_criterion : str
A function to compute the information criterion:
`bic` for Bayesian information_criterion,
`aic` for Akaike information criterion
Default: aic
Returns
-------
optimal_coeffs: np.ndarray
coefficients of the autoregressive model
optimal_cov_mat : np.ndarray
covariance matrix of
optimal_order : int
optimal order
"""
length = np.size(signals[0])
optimal_ic = np.infty
optimal_order = 1
optimal_coeffs = np.zeros((dimension, dimension, optimal_order))
optimal_cov_matrix = np.zeros((dimension, dimension))
for order in range(1, max_order + 1):
coeffs, cov_matrix = _vector_arm(signals, dimension, order)
if information_criterion == 'aic':
temp_ic = _aic(cov_matrix, order, dimension, length)
elif information_criterion == 'bic':
temp_ic = _bic(cov_matrix, order, dimension, length)
else:
raise ValueError("The specified information criterion is not"
"available. Please use 'aic' or 'bic'.")
if temp_ic < optimal_ic:
optimal_ic = temp_ic
optimal_order = order
optimal_coeffs = coeffs
optimal_cov_matrix = cov_matrix
return optimal_coeffs, optimal_cov_matrix, optimal_order
[docs]def pairwise_granger(signals, max_order, information_criterion='aic'):
r"""
Determine Granger Causality of two time series
Parameters
----------
signals : (N, 2) np.ndarray or neo.AnalogSignal
A matrix with two time series (second dimension) that have N time
points (first dimension).
max_order : int
Maximal order of autoregressive model.
information_criterion : {'aic', 'bic'}, optional
A function to compute the information criterion:
`bic` for Bayesian information_criterion,
`aic` for Akaike information criterion,
Default: 'aic'.
Returns
-------
Causality
A `namedtuple` with the following attributes:
directional_causality_x_y : float
The Granger causality value for X influence onto Y.
directional_causality_y_x : float
The Granger causality value for Y influence onto X.
instantaneous_causality : float
The remaining channel interdependence not accounted for by
the directional causalities (e.g. shared input to X and Y).
total_interdependence : float
The sum of the former three metrics. It measures the dependence
of X and Y. If the total interdependence is positive, X and Y
are not independent.
Denote covariance matrix of signals
X by C|X - a real number
Y by C|Y - a real number
(X,Y) by C|XY - a (2 \times 2) matrix
directional causality X -> Y given by
log(C|X / C|XY_00)
directional causality Y -> X given by
log(C|Y / C|XY_11)
instantaneous causality of X,Y given by
log(C|XY_00 / C|XY_11)
total interdependence of X,Y given by
log( {C|X \cdot C|Y} / det{C|XY} )
Raises
------
ValueError
If the provided signal does not have a shape of Nx2.
If the determinant of the prediction error covariance matrix is not
positive.
Warns
-----
UserWarning
If the log determinant of the prediction error covariance matrix is
below the tolerance level of 1e-7.
Notes
-----
The formulas used in this implementation follows
:cite:`granger-Ding06_0608035`. The only difference being that we change
the equation 47 in the following way:
-R(k) - A(1)R(k - 1) - ... - A(m)R(k - m) = 0.
This forumlation allows for the usage of R values without transposition
(i.e. directly) in equation 48.
Examples
--------
Example 1. Independent variables.
>>> import numpy as np
>>> from elephant.causality.granger import pairwise_granger
>>> pairwise_granger(np.random.uniform(size=(1000, 2)), max_order=2)
Causality(directional_causality_x_y=0.0,
directional_causality_y_x=-0.0,
instantaneous_causality=0.0,
total_interdependence=0.0)
Example 2. Dependent variables. Y depends on X but not vice versa.
.. math::
\begin{array}{ll}
X_t \sim \mathcal{N}(0, 1) \\
Y_t = 3.5 \cdot X_{t-1} + \epsilon, \;
\epsilon \sim\mathcal{N}(0, 1)
\end{array}
In this case, the directional causality is non-zero.
>>> x = np.random.randn(1001)
>>> y = 3.5 * x[:-1] + np.random.randn(1000)
>>> signals = np.array([x[1:], y]).T # N x 2 matrix
>>> pairwise_granger(signals, max_order=1)
Causality(directional_causality_x_y=2.64,
directional_causality_y_x=0.0,
instantaneous_causality=0.0,
total_interdependence=2.64)
"""
if isinstance(signals, AnalogSignal):
signals = signals.magnitude
if not (signals.ndim == 2 and signals.shape[1] == 2):
raise ValueError("The input 'signals' must be of dimensions Nx2.")
# transpose (N,2) -> (2,N) for mathematical convenience
signals = signals.T
# signal_x and signal_y are (1, N) arrays
signal_x, signal_y = np.expand_dims(signals, axis=1)
coeffs_x, var_x, p_1 = _optimal_vector_arm(signal_x, 1, max_order,
information_criterion)
coeffs_y, var_y, p_2 = _optimal_vector_arm(signal_y, 1, max_order,
information_criterion)
coeffs_xy, cov_xy, p_3 = _optimal_vector_arm(signals, 2, max_order,
information_criterion)
sign, log_det_cov = np.linalg.slogdet(cov_xy)
tolerance = 1e-7
if sign <= 0:
raise ValueError(
"Determinant of covariance matrix must be always positive: "
"In this case its sign is {}".format(sign))
if log_det_cov <= tolerance:
warnings.warn("The value of the log determinant is at or below the "
"tolerance level. Proceeding with computation.",
UserWarning)
directional_causality_y_x = np.log(var_x[0]) - np.log(cov_xy[0, 0])
directional_causality_x_y = np.log(var_y[0]) - np.log(cov_xy[1, 1])
instantaneous_causality = \
np.log(cov_xy[0, 0]) + np.log(cov_xy[1, 1]) - log_det_cov
instantaneous_causality = np.asarray(instantaneous_causality)
total_interdependence = np.log(var_x[0]) + np.log(var_y[0]) - log_det_cov
# Round GC according to following scheme:
# Note that standard error scales as 1/sqrt(sample_size)
# Calculate significant figures according to standard error
length = np.size(signal_x)
asymptotic_std_error = 1/np.sqrt(length)
est_sig_figures = int((-1)*np.around(np.log10(asymptotic_std_error)))
directional_causality_x_y_round = np.around(directional_causality_x_y,
est_sig_figures)
directional_causality_y_x_round = np.around(directional_causality_y_x,
est_sig_figures)
instantaneous_causality_round = np.around(instantaneous_causality,
est_sig_figures)
total_interdependence_round = np.around(total_interdependence,
est_sig_figures)
return Causality(
directional_causality_x_y=directional_causality_x_y_round.item(),
directional_causality_y_x=directional_causality_y_x_round.item(),
instantaneous_causality=instantaneous_causality_round.item(),
total_interdependence=total_interdependence_round.item())
[docs]def conditional_granger(signals, max_order, information_criterion='aic'):
r"""
Determine conditional Granger Causality of the second time series on the
first time series, given the third time series. In other words, for time
series X_t, Y_t and Z_t, this function tests if Y_t influences X_t via Z_t.
Parameters
----------
signals : (N, 3) np.ndarray or neo.AnalogSignal
A matrix with three time series (second dimension) that have N time
points (first dimension). The time series to be conditioned on is the
third.
max_order : int
Maximal order of autoregressive model.
information_criterion : {'aic', 'bic'}, optional
A function to compute the information criterion:
`bic` for Bayesian information_criterion,
`aic` for Akaike information criterion,
Default: 'aic'.
Returns
-------
conditional_causality_xy_z_round : float
The value of conditional causality of Y_t on X_t given Z_t. Zero value
indicates that causality of Y_t on X_t is solely dependent on Z_t.
Raises
------
ValueError
If the provided signal does not have a shape of Nx3.
Notes
-----
The formulas used in this implementation follows
:cite:`granger-Ding06_0608035`. Specifically, the Eq 35.
"""
if isinstance(signals, AnalogSignal):
signals = signals.magnitude
if not (signals.ndim == 2 and signals.shape[1] == 3):
raise ValueError("The input 'signals' must be of dimensions Nx3.")
# transpose (N,3) -> (3,N) for mathematical convenience
signals = signals.T
# signal_x, signal_y and signal_z are (1, N) arrays
signal_x, signal_y, signal_z = np.expand_dims(signals, axis=1)
signals_xz = np.vstack([signal_x, signal_z])
coeffs_xz, cov_xz, p_1 = _optimal_vector_arm(
signals_xz, dimension=2, max_order=max_order,
information_criterion=information_criterion)
coeffs_xyz, cov_xyz, p_2 = _optimal_vector_arm(
signals, dimension=3, max_order=max_order,
information_criterion=information_criterion)
conditional_causality_xy_z = np.log(cov_xz[0, 0]) - np.log(cov_xyz[0, 0])
# Round conditional GC according to following scheme:
# Note that standard error scales as 1/sqrt(sample_size)
# Calculate significant figures according to standard error
length = np.size(signal_x)
asymptotic_std_error = 1/np.sqrt(length)
est_sig_figures = int((-1)*np.around(np.log10(asymptotic_std_error)))
conditional_causality_xy_z_round = np.around(conditional_causality_xy_z,
est_sig_figures)
return conditional_causality_xy_z_round