Source code for elephant.spectral

# -*- coding: utf-8 -*-
"""
Identification of spectral properties in analog signals (e.g., the power
spectrum).

:copyright: Copyright 2015-2016 by the Elephant team, see AUTHORS.txt.
:license: Modified BSD, see LICENSE.txt for details.
"""

import warnings

import numpy as np
import scipy.signal
import scipy.fftpack as fftpack
import scipy.signal.signaltools as signaltools
from scipy.signal.windows import get_window
from six import string_types
import quantities as pq
import neo


def _welch(x, y, fs=1.0, window='hanning', nperseg=256, noverlap=None,
          nfft=None, detrend='constant', scaling='density', axis=-1):
    """
    A helper function to estimate cross spectral density using Welch's method.
    This function is a slightly modified version of `scipy.signal.welch()` with
    modifications based on `matplotlib.mlab._spectral_helper()`.

    Welch's method [1]_ computes an estimate of the cross spectral density
    by dividing the data into overlapping segments, computing a modified
    periodogram for each segment and averaging the cross-periodograms.

    Parameters
    ----------
    x, y : array_like
        Time series of measurement values
    fs : float, optional
        Sampling frequency of the `x` and `y` time series in units of Hz.
        Defaults to 1.0.
    window : str or tuple or array_like, optional
        Desired window to use. See `get_window` for a list of windows and
        required parameters. If `window` is array_like it will be used
        directly as the window and its length will be used for nperseg.
        Defaults to 'hanning'.
    nperseg : int, optional
        Length of each segment.  Defaults to 256.
    noverlap: int, optional
        Number of points to overlap between segments. If None,
        ``noverlap = nperseg / 2``.  Defaults to None.
    nfft : int, optional
        Length of the FFT used, if a zero padded FFT is desired.  If None,
        the FFT length is `nperseg`. Defaults to None.
    detrend : str or function, optional
        Specifies how to detrend each segment. If `detrend` is a string,
        it is passed as the ``type`` argument to `detrend`. If it is a
        function, it takes a segment and returns a detrended segment.
        Defaults to 'constant'.
    scaling : { 'density', 'spectrum' }, optional
        Selects between computing the power spectral density ('density')
        where Pxx has units of V**2/Hz if x is measured in V and computing
        the power spectrum ('spectrum') where Pxx has units of V**2 if x is
        measured in V. Defaults to 'density'.
    axis : int, optional
        Axis along which the periodogram is computed; the default is over
        the last axis (i.e. ``axis=-1``).

    Returns
    -------
    f : ndarray
        Array of sample frequencies.
    Pxy : ndarray
        Cross spectral density or cross spectrum of x and y.

    Notes
    -----
    An appropriate amount of overlap will depend on the choice of window
    and on your requirements.  For the default 'hanning' window an
    overlap of 50% is a reasonable trade off between accurately estimating
    the signal power, while not over counting any of the data.  Narrower
    windows may require a larger overlap.

    If `noverlap` is 0, this method is equivalent to Bartlett's method [2]_.

    References
    ----------
    .. [1] P. Welch, "The use of the fast Fourier transform for the
           estimation of power spectra: A method based on time averaging
           over short, modified periodograms", IEEE Trans. Audio
           Electroacoust. vol. 15, pp. 70-73, 1967.
    .. [2] M.S. Bartlett, "Periodogram Analysis and Continuous Spectra",
           Biometrika, vol. 37, pp. 1-16, 1950.
    """
    # TODO: This function should be replaced by `scipy.signal.csd()`, which
    # will appear in SciPy 0.16.0.

    # The checks for if y is x are so that we can use the same function to
    # obtain both power spectrum and cross spectrum without doing extra
    # calculations.
    same_data = y is x
    # Make sure we're dealing with a numpy array. If y and x were the same
    # object to start with, keep them that way
    x = np.asarray(x)
    if same_data:
        y = x
    else:
        if x.shape != y.shape:
            raise ValueError("x and y must be of the same shape.")
        y = np.asarray(y)

    if x.size == 0:
        return np.empty(x.shape), np.empty(x.shape)

    if axis != -1:
        x = np.rollaxis(x, axis, len(x.shape))
        if not same_data:
            y = np.rollaxis(y, axis, len(y.shape))

    if x.shape[-1] < nperseg:
        warnings.warn('nperseg = %d, is greater than x.shape[%d] = %d, using '
                      'nperseg = x.shape[%d]'
                      % (nperseg, axis, x.shape[axis], axis))
        nperseg = x.shape[-1]

    if isinstance(window, string_types) or type(window) is tuple:
        win = get_window(window, nperseg)
    else:
        win = np.asarray(window)
        if len(win.shape) != 1:
            raise ValueError('window must be 1-D')
        if win.shape[0] > x.shape[-1]:
            raise ValueError('window is longer than x.')
        nperseg = win.shape[0]

    if scaling == 'density':
        scale = 1.0 / (fs * (win * win).sum())
    elif scaling == 'spectrum':
        scale = 1.0 / win.sum()**2
    else:
        raise ValueError('Unknown scaling: %r' % scaling)

    if noverlap is None:
        noverlap = nperseg // 2
    elif noverlap >= nperseg:
        raise ValueError('noverlap must be less than nperseg.')

    if nfft is None:
        nfft = nperseg
    elif nfft < nperseg:
        raise ValueError('nfft must be greater than or equal to nperseg.')

    if not hasattr(detrend, '__call__'):
        detrend_func = lambda seg: signaltools.detrend(seg, type=detrend)
    elif axis != -1:
        # Wrap this function so that it receives a shape that it could
        # reasonably expect to receive.
        def detrend_func(seg):
            seg = np.rollaxis(seg, -1, axis)
            seg = detrend(seg)
            return np.rollaxis(seg, axis, len(seg.shape))
    else:
        detrend_func = detrend

    step = nperseg - noverlap
    indices = np.arange(0, x.shape[-1] - nperseg + 1, step)

    for k, ind in enumerate(indices):
        x_dt = detrend_func(x[..., ind:ind + nperseg])
        xft = fftpack.fft(x_dt * win, nfft)
        if same_data:
            yft = xft
        else:
            y_dt = detrend_func(y[..., ind:ind + nperseg])
            yft = fftpack.fft(y_dt * win, nfft)
        if k == 0:
            Pxy = (xft * yft.conj())
        else:
            Pxy *= k / (k + 1.0)
            Pxy += (xft * yft.conj()) / (k + 1.0)
    Pxy *= scale
    f = fftpack.fftfreq(nfft, 1.0 / fs)

    if axis != -1:
        Pxy = np.rollaxis(Pxy, -1, axis)

    return f, Pxy


[docs]def welch_psd(signal, num_seg=8, len_seg=None, freq_res=None, overlap=0.5, fs=1.0, window='hanning', nfft=None, detrend='constant', return_onesided=True, scaling='density', axis=-1): """ Estimates power spectrum density (PSD) of a given AnalogSignal using Welch's method, which works in the following steps: 1. cut the given data into several overlapping segments. The degree of overlap can be specified by parameter *overlap* (default is 0.5, i.e. segments are overlapped by the half of their length). The number and the length of the segments are determined according to parameter *num_seg*, *len_seg* or *freq_res*. By default, the data is cut into 8 segments. 2. apply a window function to each segment. Hanning window is used by default. This can be changed by giving a window function or an array as parameter *window* (for details, see the docstring of `scipy.signal.welch()`) 3. compute the periodogram of each segment 4. average the obtained periodograms to yield PSD estimate These steps are implemented in `scipy.signal`, and this function is a wrapper which provides a proper set of parameters to `scipy.signal.welch()`. Some parameters for scipy.signal.welch(), such as `nfft`, `detrend`, `window`, `return_onesided` and `scaling`, also works for this function. Parameters ---------- signal: Neo AnalogSignal or Quantity array or Numpy ndarray Time series data, of which PSD is estimated. When a Quantity array or Numpy ndarray is given, sampling frequency should be given through the keyword argument `fs`, otherwise the default value (`fs=1.0`) is used. num_seg: int, optional Number of segments. The length of segments is adjusted so that overlapping segments cover the entire stretch of the given data. This parameter is ignored if *len_seg* or *freq_res* is given. Default is 8. len_seg: int, optional Length of segments. This parameter is ignored if *freq_res* is given. Default is None (determined from other parameters). freq_res: Quantity or float, optional Desired frequency resolution of the obtained PSD estimate in terms of the interval between adjacent frequency bins. When given as a float, it is taken as frequency in Hz. Default is None (determined from other parameters). overlap: float, optional Overlap between segments represented as a float number between 0 (no overlap) and 1 (complete overlap). Default is 0.5 (half-overlapped). fs: Quantity array or float, optional Specifies the sampling frequency of the input time series. When the input is given as an AnalogSignal, the sampling frequency is taken from its attribute and this parameter is ignored. Default is 1.0. window, nfft, detrend, return_onesided, scaling, axis: optional These arguments are directly passed on to scipy.signal.welch(). See the respective descriptions in the docstring of `scipy.signal.welch()` for usage. Returns ------- freqs: Quantity array or Numpy ndarray Frequencies associated with the power estimates in `psd`. `freqs` is always a 1-dimensional array irrespective of the shape of the input data. Quantity array is returned if `signal` is AnalogSignal or Quantity array. Otherwise Numpy ndarray containing frequency in Hz is returned. psd: Quantity array or Numpy ndarray PSD estimates of the time series in `signal`. Quantity array is returned if `data` is AnalogSignal or Quantity array. Otherwise Numpy ndarray is returned. """ # initialize a parameter dict (to be given to scipy.signal.welch()) with # the parameters directly passed on to scipy.signal.welch() params = {'window': window, 'nfft': nfft, 'detrend': detrend, 'return_onesided': return_onesided, 'scaling': scaling, 'axis': axis} # add the input data to params. When the input is AnalogSignal, the # data is added after rolling the axis for time index to the last data = np.asarray(signal) if isinstance(signal, neo.AnalogSignal): data = np.rollaxis(data, 0, len(data.shape)) params['x'] = data # if the data is given as AnalogSignal, use its attribute to specify # the sampling frequency if hasattr(signal, 'sampling_rate'): params['fs'] = signal.sampling_rate.rescale('Hz').magnitude else: params['fs'] = fs if overlap < 0: raise ValueError("overlap must be greater than or equal to 0") elif 1 <= overlap: raise ValueError("overlap must be less then 1") # determine the length of segments (i.e. *nperseg*) according to given # parameters if freq_res is not None: if freq_res <= 0: raise ValueError("freq_res must be positive") dF = freq_res.rescale('Hz').magnitude \ if isinstance(freq_res, pq.quantity.Quantity) else freq_res nperseg = int(params['fs'] / dF) if nperseg > data.shape[axis]: raise ValueError("freq_res is too high for the given data size") elif len_seg is not None: if len_seg <= 0: raise ValueError("len_seg must be a positive number") elif data.shape[axis] < len_seg: raise ValueError("len_seg must be shorter than the data length") nperseg = len_seg else: if num_seg <= 0: raise ValueError("num_seg must be a positive number") elif data.shape[axis] < num_seg: raise ValueError("num_seg must be smaller than the data length") # when only *num_seg* is given, *nperseg* is determined by solving the # following equation: # num_seg * nperseg - (num_seg-1) * overlap * nperseg = data.shape[-1] # ----------------- =============================== ^^^^^^^^^^^ # summed segment lengths total overlap data length nperseg = int(data.shape[axis] / (num_seg - overlap * (num_seg - 1))) params['nperseg'] = nperseg params['noverlap'] = int(nperseg * overlap) freqs, psd = scipy.signal.welch(**params) # attach proper units to return values if isinstance(signal, pq.quantity.Quantity): if 'scaling' in params and params['scaling'] is 'spectrum': psd = psd * signal.units * signal.units else: psd = psd * signal.units * signal.units / pq.Hz freqs = freqs * pq.Hz return freqs, psd
[docs]def welch_cohere(x, y, num_seg=8, len_seg=None, freq_res=None, overlap=0.5, fs=1.0, window='hanning', nfft=None, detrend='constant', scaling='density', axis=-1): """ Estimates coherence between a given pair of analog signals. The estimation is performed with Welch's method: the given pair of data are cut into short segments, cross-spectra are calculated for each pair of segments, and the cross-spectra are averaged and normalized by respective auto_spectra. By default the data are cut into 8 segments with 50% overlap between neighboring segments. These numbers can be changed through respective parameters. Parameters ---------- x, y: Neo AnalogSignal or Quantity array or Numpy ndarray A pair of time series data, between which coherence is computed. The shapes and the sampling frequencies of `x` and `y` must be identical. When `x` and `y` are not of AnalogSignal, sampling frequency should be specified through the keyword argument `fs`, otherwise the default value (`fs=1.0`) is used. num_seg: int, optional Number of segments. The length of segments is adjusted so that overlapping segments cover the entire stretch of the given data. This parameter is ignored if *len_seg* or *freq_res* is given. Default is 8. len_seg: int, optional Length of segments. This parameter is ignored if *freq_res* is given. Default is None (determined from other parameters). freq_res: Quantity or float, optional Desired frequency resolution of the obtained coherence estimate in terms of the interval between adjacent frequency bins. When given as a float, it is taken as frequency in Hz. Default is None (determined from other parameters). overlap: float, optional Overlap between segments represented as a float number between 0 (no overlap) and 1 (complete overlap). Default is 0.5 (half-overlapped). fs: Quantity array or float, optional Specifies the sampling frequency of the input time series. When the input time series are given as AnalogSignal, the sampling frequency is taken from their attribute and this parameter is ignored. Default is 1.0. window, nfft, detrend, scaling, axis: optional These arguments are directly passed on to a helper function `elephant.spectral._welch()`. See the respective descriptions in the docstring of `elephant.spectral._welch()` for usage. Returns ------- freqs: Quantity array or Numpy ndarray Frequencies associated with the estimates of coherency and phase lag. `freqs` is always a 1-dimensional array irrespective of the shape of the input data. Quantity array is returned if `x` and `y` are of AnalogSignal or Quantity array. Otherwise Numpy ndarray containing frequency in Hz is returned. coherency: Numpy ndarray Estimate of coherency between the input time series. For each frequency coherency takes a value between 0 and 1, with 0 or 1 representing no or perfect coherence, respectively. When the input arrays `x` and `y` are multi-dimensional, `coherency` is of the same shape as the inputs and frequency is indexed along either the first or the last axis depending on the type of the input: when the input is AnalogSignal, the first axis indexes frequency, otherwise the last axis does. phase_lag: Quantity array or Numpy ndarray Estimate of phase lag in radian between the input time series. For each frequency phase lag takes a value between -PI and PI, positive values meaning phase precession of `x` ahead of `y` and vice versa. Quantity array is returned if `x` and `y` are of AnalogSignal or Quantity array. Otherwise Numpy ndarray containing phase lag in radian is returned. The axis for frequency index is determined in the same way as for `coherency`. """ # initialize a parameter dict (to be given to _welch()) with # the parameters directly passed on to _welch() params = {'window': window, 'nfft': nfft, 'detrend': detrend, 'scaling': scaling, 'axis': axis} # When the input is AnalogSignal, the axis for time index is rolled to # the last xdata = np.asarray(x) ydata = np.asarray(y) if isinstance(x, neo.AnalogSignal): xdata = np.rollaxis(xdata, 0, len(xdata.shape)) ydata = np.rollaxis(ydata, 0, len(ydata.shape)) # if the data is given as AnalogSignal, use its attribute to specify # the sampling frequency if hasattr(x, 'sampling_rate'): params['fs'] = x.sampling_rate.rescale('Hz').magnitude else: params['fs'] = fs if overlap < 0: raise ValueError("overlap must be greater than or equal to 0") elif 1 <= overlap: raise ValueError("overlap must be less then 1") # determine the length of segments (i.e. *nperseg*) according to given # parameters if freq_res is not None: if freq_res <= 0: raise ValueError("freq_res must be positive") dF = freq_res.rescale('Hz').magnitude \ if isinstance(freq_res, pq.quantity.Quantity) else freq_res nperseg = int(params['fs'] / dF) if nperseg > xdata.shape[axis]: raise ValueError("freq_res is too high for the given data size") elif len_seg is not None: if len_seg <= 0: raise ValueError("len_seg must be a positive number") elif xdata.shape[axis] < len_seg: raise ValueError("len_seg must be shorter than the data length") nperseg = len_seg else: if num_seg <= 0: raise ValueError("num_seg must be a positive number") elif xdata.shape[axis] < num_seg: raise ValueError("num_seg must be smaller than the data length") # when only *num_seg* is given, *nperseg* is determined by solving the # following equation: # num_seg * nperseg - (num_seg-1) * overlap * nperseg = data.shape[-1] # ----------------- =============================== ^^^^^^^^^^^ # summed segment lengths total overlap data length nperseg = int(xdata.shape[axis] / (num_seg - overlap * (num_seg - 1))) params['nperseg'] = nperseg params['noverlap'] = int(nperseg * overlap) freqs, Pxy = _welch(xdata, ydata, **params) freqs, Pxx = _welch(xdata, xdata, **params) freqs, Pyy = _welch(ydata, ydata, **params) coherency = np.abs(Pxy)**2 / (np.abs(Pxx) * np.abs(Pyy)) phase_lag = np.angle(Pxy) # attach proper units to return values if isinstance(x, pq.quantity.Quantity): freqs = freqs * pq.Hz phase_lag = phase_lag * pq.rad # When the input is AnalogSignal, the axis for frequency index is # rolled to the first to comply with the Neo convention about time axis if isinstance(x, neo.AnalogSignal): coherency = np.rollaxis(coherency, -1) phase_lag = np.rollaxis(phase_lag, -1) return freqs, coherency, phase_lag