Source code for elephant.spectral

# -*- coding: utf-8 -*-
"""
Identification of spectral properties in analog signals (e.g., the power
spectrum).

:copyright: Copyright 2015-2016 by the Elephant team, see `doc/authors.rst`.
:license: Modified BSD, see LICENSE.txt for details.
"""

from __future__ import division, print_function, unicode_literals

import warnings

import numpy as np
import scipy.signal
import scipy.fftpack as fftpack
import scipy.signal.signaltools as signaltools
from scipy.signal.windows import get_window
from six import string_types
import quantities as pq
import neo


def _welch(x, y, fs=1.0, window='hanning', nperseg=256, noverlap=None,
           nfft=None, detrend='constant', scaling='density', axis=-1):
    """
    A helper function to estimate cross spectral density using Welch's method.

    Welch's method [1]_ computes an estimate of the cross spectral density
    by dividing the data into overlapping segments, computing a modified
    periodogram for each segment and averaging the cross-periodograms.

    Parameters
    ----------
    x : list or tuple or np.ndarray
        Time series of measurement values for the first signal.
        It will be converted to `np.ndarray`.
    y : list or tuple or np.ndarray
        Time series of measurement values for the second signal.
        It will be converted to `np.ndarray`.
    fs : float, optional
        Sampling frequency of the `x` and `y` time series in units of Hz.
        Default: 1.0.
    window : str or tuple or np.ndarray, optional
        Desired window to use.
        If `window` is a string, see `scipy.signal.get_window` for a list of
        windows and required parameters.
        If `window` is tuple or `np.ndarray`, it will be used directly as the
        window, and its length will be used for `nperseg`. If not string,
        it will be treated as `np.ndarray`, and the result must be a vector
        with length less than or equal to the length of the last axes of `x`
        and `y`.
        Default: 'hanning'.
    nperseg : int, optional
        Length of each segment.
        Default: 256.
    noverlap : int, optional
        Number of points to overlap between segments.
        If None, `noverlap` will be set to `nperseg`/2.
        Default: None.
    nfft : int, optional
        Length of the FFT used, if a zero-padded FFT is desired.
        If None, the FFT length is `nperseg`.
        Default: None.
    detrend : str or function, optional
        Specifies how to detrend each segment.
        If `detrend` is a string, it is passed as the `type` argument to
        `scipy.signal.detrend`.
        If it is a function, it takes a segment as input and returns a
        detrended segment.
        Default: 'constant'.
    scaling : {'density', 'spectrum'}, optional
        If 'density', computes the power spectral density where Pxx has units
        of V**2/Hz if `x` is measured in V.
        If 'spectrum', computes the power spectrum where Pxx has units of V**2
        if `x` is measured in V.
        Default: 'density'.
    axis : int, optional
        Axis along which the periodogram is computed.
        Default: last axis (-1).

    Returns
    -------
    f : np.ndarray
        Array of sample frequencies.
    Pxy : np.ndarray
        Cross spectral density or cross spectrum of `x` and `y`.

    Raises
    ------
    ValueError
        If `x` and `y` do not have the same shape.

        If `window` is tuple or `np.ndarray` and has more than 1 dimension.

        If length of `window` is greater than the length of the `axis`.

        If `scaling` is neither 'density' nor 'spectrum'.

        If `noverlap` is greater than or equal to `nperseg`.

        If `nfft` is less than `nperseg`.

    Warns
    -----
    UserWarning
        If `nperseg` is greater than the shape of the axis used (`axis`),
        `nperseg` will be changed to the shape of `axis`.

    Notes
    -----
    1. This function is a slightly modified version of `scipy.signal.welch`
       function, with modifications based on
       `matplotlib.mlab._spectral_helper`.
    2. An appropriate amount of overlap will depend on the choice of window
       and on your requirements. For the default 'hanning' window, an overlap
       of 50% is a reasonable trade off between accurately estimating
       the signal power, while not over counting any of the data. Narrower
       windows may require a larger overlap.
    3. If `noverlap` is 0, this method is equivalent to Bartlett's method
       [2]_.

    References
    ----------
    .. [1] P. Welch, "The use of the fast Fourier transform for the
           estimation of power spectra: A method based on time averaging
           over short, modified periodograms", IEEE Trans. Audio
           Electroacoust., vol. 15, pp. 70-73, 1967.
    .. [2] M.S. Bartlett, "Periodogram Analysis and Continuous Spectra",
           Biometrika, vol. 37, pp. 1-16, 1950.
    """
    # TODO: This function should be replaced by `scipy.signal.csd()`, which
    # will appear in SciPy 0.16.0.

    # The checks for if y is x are so that we can use the same function to
    # obtain both power spectrum and cross spectrum without doing extra
    # calculations.
    same_data = y is x
    # Make sure we're dealing with a numpy array. If y and x were the same
    # object to start with, keep them that way
    x = np.asarray(x)
    if same_data:
        y = x
    else:
        if x.shape != y.shape:
            raise ValueError("x and y must be of the same shape.")
        y = np.asarray(y)

    if x.size == 0:
        return np.empty(x.shape), np.empty(x.shape)

    if axis != -1:
        x = np.rollaxis(x, axis, len(x.shape))
        if not same_data:
            y = np.rollaxis(y, axis, len(y.shape))

    if x.shape[-1] < nperseg:
        warnings.warn('nperseg = %d, is greater than x.shape[%d] = %d, using '
                      'nperseg = x.shape[%d]'
                      % (nperseg, axis, x.shape[axis], axis))
        nperseg = x.shape[-1]

    if isinstance(window, string_types) or type(window) is tuple:
        win = get_window(window, nperseg)
    else:
        win = np.asarray(window)
        if len(win.shape) != 1:
            raise ValueError('window must be 1-D')
        if win.shape[0] > x.shape[-1]:
            raise ValueError('window is longer than x.')
        nperseg = win.shape[0]

    if scaling == 'density':
        scale = 1.0 / (fs * (win * win).sum())
    elif scaling == 'spectrum':
        scale = 1.0 / win.sum()**2
    else:
        raise ValueError('Unknown scaling: %r' % scaling)

    if noverlap is None:
        noverlap = nperseg // 2
    elif noverlap >= nperseg:
        raise ValueError('noverlap must be less than nperseg.')

    if nfft is None:
        nfft = nperseg
    elif nfft < nperseg:
        raise ValueError('nfft must be greater than or equal to nperseg.')

    if not hasattr(detrend, '__call__'):
        detrend_func = lambda seg: signaltools.detrend(seg, type=detrend)
    elif axis != -1:
        # Wrap this function so that it receives a shape that it could
        # reasonably expect to receive.
        def detrend_func(seg):
            seg = np.rollaxis(seg, -1, axis)
            seg = detrend(seg)
            return np.rollaxis(seg, axis, len(seg.shape))
    else:
        detrend_func = detrend

    step = nperseg - noverlap
    indices = np.arange(0, x.shape[-1] - nperseg + 1, step)

    for k, ind in enumerate(indices):
        x_dt = detrend_func(x[..., ind:ind + nperseg])
        xft = fftpack.fft(x_dt * win, nfft)
        if same_data:
            yft = xft
        else:
            y_dt = detrend_func(y[..., ind:ind + nperseg])
            yft = fftpack.fft(y_dt * win, nfft)
        if k == 0:
            Pxy = (xft * yft.conj())
        else:
            Pxy *= k / (k + 1.0)
            Pxy += (xft * yft.conj()) / (k + 1.0)
    Pxy *= scale
    f = fftpack.fftfreq(nfft, 1.0 / fs)

    if axis != -1:
        Pxy = np.rollaxis(Pxy, -1, axis)

    return f, Pxy


[docs]def welch_psd(signal, num_seg=8, len_seg=None, freq_res=None, overlap=0.5, fs=1.0, window='hanning', nfft=None, detrend='constant', return_onesided=True, scaling='density', axis=-1): """ Estimates power spectrum density (PSD) of a given `neo.AnalogSignal` using Welch's method. The PSD is obtained through the following steps: 1. Cut the given data into several overlapping segments. The degree of overlap can be specified by parameter `overlap` (default is 0.5, i.e. segments are overlapped by the half of their length). The number and the length of the segments are determined according to the parameters `num_seg`, `len_seg` or `freq_res`. By default, the data is cut into 8 segments; 2. Apply a window function to each segment. Hanning window is used by default. This can be changed by giving a window function or an array as parameter `window` (see Notes [2]); 3. Compute the periodogram of each segment; 4. Average the obtained periodograms to yield PSD estimate. Parameters ---------- signal : neo.AnalogSignal or pq.Quantity or np.ndarray Time series data, of which PSD is estimated. When `signal` is `pq.Quantity` or `np.ndarray`, sampling frequency should be given through the keyword argument `fs`. Otherwise, the default value is used (`fs` = 1.0). num_seg : int, optional Number of segments. The length of segments is adjusted so that overlapping segments cover the entire stretch of the given data. This parameter is ignored if `len_seg` or `freq_res` is given. Default: 8. len_seg : int, optional Length of segments. This parameter is ignored if `freq_res` is given. If None, it will be determined from other parameters. Default: None. freq_res : pq.Quantity or float, optional Desired frequency resolution of the obtained PSD estimate in terms of the interval between adjacent frequency bins. When given as a `float`, it is taken as frequency in Hz. If None, it will be determined from other parameters. Default: None. overlap : float, optional Overlap between segments represented as a float number between 0 (no overlap) and 1 (complete overlap). Default: 0.5 (half-overlapped). fs : pq.Quantity or float, optional Specifies the sampling frequency of the input time series. When the input is given as a `neo.AnalogSignal`, the sampling frequency is taken from its attribute and this parameter is ignored. Default: 1.0. window : str or tuple or np.ndarray, optional Desired window to use. See Notes [2]. Default: 'hanning'. nfft : int, optional Length of the FFT used. See Notes [2]. Default: None. detrend : str or function or False, optional Specifies how to detrend each segment. See Notes [2]. Default: 'constant'. return_onesided : bool, optional If True, return a one-sided spectrum for real data. If False return a two-sided spectrum. See Notes [2]. Default: True. scaling : {'density', 'spectrum'}, optional If 'density', computes the power spectral density where Pxx has units of V**2/Hz. If 'spectrum', computes the power spectrum where Pxx has units of V**2, if `signal` is measured in V and `fs` is measured in Hz. See Notes [2]. Default: 'density'. axis : int, optional Axis along which the periodogram is computed. See Notes [2]. Default: last axis (-1). Returns ------- freqs : pq.Quantity or np.ndarray Frequencies associated with the power estimates in `psd`. `freqs` is always a vector irrespective of the shape of the input data in `signal`. If `signal` is `neo.AnalogSignal` or `pq.Quantity`, a `pq.Quantity` array is returned. Otherwise, a `np.ndarray` containing frequency in Hz is returned. psd : pq.Quantity or np.ndarray PSD estimates of the time series in `signal`. If `signal` is `neo.AnalogSignal`, a `pq.Quantity` array is returned. Otherwise, the return is a `np.ndarray`. Raises ------ ValueError If `overlap` is not in the interval [0, 1). If `freq_res` is not positive. If `freq_res` is too high for the given data size. If `freq_res` is None and `len_seg` is not a positive number. If `freq_res` is None and `len_seg` is greater than the length of data on `axis`. If both `freq_res` and `len_seg` are None and `num_seg` is not a positive number. If both `freq_res` and `len_seg` are None and `num_seg` is greater than the length of data on `axis`. Notes ----- 1. The computation steps used in this function are implemented in `scipy.signal` module, and this function is a wrapper which provides a proper set of parameters to `scipy.signal.welch` function. 2. The parameters `window`, `nfft`, `detrend`, `return_onesided`, `scaling`, and `axis` are directly passed to the `scipy.signal.welch` function. See the respective descriptions in the docstring of `scipy.signal.welch` for usage. 3. When only `num_seg` is given, parameter `nperseg` of `scipy.signal.welch` function is determined according to the expression `signal.shape[axis]` / (`num_seg` - `overlap` * (`num_seg` - 1)) converted to integer. See Also -------- scipy.signal.welch """ # initialize a parameter dict (to be given to scipy.signal.welch()) with # the parameters directly passed on to scipy.signal.welch() params = {'window': window, 'nfft': nfft, 'detrend': detrend, 'return_onesided': return_onesided, 'scaling': scaling, 'axis': axis} # add the input data to params. When the input is AnalogSignal, the # data is added after rolling the axis for time index to the last data = np.asarray(signal) if isinstance(signal, neo.AnalogSignal): data = np.rollaxis(data, 0, len(data.shape)) params['x'] = data # if the data is given as AnalogSignal, use its attribute to specify # the sampling frequency if hasattr(signal, 'sampling_rate'): params['fs'] = signal.sampling_rate.rescale('Hz').magnitude else: params['fs'] = fs if overlap < 0: raise ValueError("overlap must be greater than or equal to 0") elif 1 <= overlap: raise ValueError("overlap must be less then 1") # determine the length of segments (i.e. *nperseg*) according to given # parameters if freq_res is not None: if freq_res <= 0: raise ValueError("freq_res must be positive") dF = freq_res.rescale('Hz').magnitude \ if isinstance(freq_res, pq.quantity.Quantity) else freq_res nperseg = int(params['fs'] / dF) if nperseg > data.shape[axis]: raise ValueError("freq_res is too high for the given data size") elif len_seg is not None: if len_seg <= 0: raise ValueError("len_seg must be a positive number") elif data.shape[axis] < len_seg: raise ValueError("len_seg must be shorter than the data length") nperseg = len_seg else: if num_seg <= 0: raise ValueError("num_seg must be a positive number") elif data.shape[axis] < num_seg: raise ValueError("num_seg must be smaller than the data length") # when only *num_seg* is given, *nperseg* is determined by solving the # following equation: # num_seg * nperseg - (num_seg-1) * overlap * nperseg = data.shape[-1] # ----------------- =============================== ^^^^^^^^^^^ # summed segment lengths total overlap data length nperseg = int(data.shape[axis] / (num_seg - overlap * (num_seg - 1))) params['nperseg'] = nperseg params['noverlap'] = int(nperseg * overlap) freqs, psd = scipy.signal.welch(**params) # attach proper units to return values if isinstance(signal, pq.quantity.Quantity): if 'scaling' in params and params['scaling'] == 'spectrum': psd = psd * signal.units * signal.units else: psd = psd * signal.units * signal.units / pq.Hz freqs = freqs * pq.Hz return freqs, psd
[docs]def welch_cohere(x, y, num_seg=8, len_seg=None, freq_res=None, overlap=0.5, fs=1.0, window='hanning', nfft=None, detrend='constant', scaling='density', axis=-1): r""" Estimates coherence between a given pair of analog signals. The estimation is performed with Welch's method: the given pair of data are cut into short segments, cross-spectra are calculated for each pair of segments, and the cross-spectra are averaged and normalized by respective auto-spectra. By default, the data are cut into 8 segments with 50% overlap between neighboring segments. These numbers can be changed through respective parameters. Parameters ---------- x : neo.AnalogSignal or pq.Quantity or np.ndarray First time series data of the pair between which coherence is computed. y : neo.AnalogSignal or pq.Quantity or np.ndarray Second time series data of the pair between which coherence is computed. The shapes and the sampling frequencies of `x` and `y` must be identical. When `x` and `y` are not `neo.AnalogSignal`, sampling frequency should be specified through the keyword argument `fs`. Otherwise, the default value is used (`fs` = 1.0). num_seg : int, optional Number of segments. The length of segments is adjusted so that overlapping segments cover the entire stretch of the given data. This parameter is ignored if `len_seg` or `freq_res` is given. Default: 8. len_seg : int, optional Length of segments. This parameter is ignored if `freq_res` is given. If None, it is determined from other parameters. Default: None. freq_res : pq.Quantity or float, optional Desired frequency resolution of the obtained coherence estimate in terms of the interval between adjacent frequency bins. When given as a `float`, it is taken as frequency in Hz. If None, it is determined from other parameters. Default: None. overlap : float, optional Overlap between segments represented as a float number between 0 (no overlap) and 1 (complete overlap). Default: 0.5 (half-overlapped). fs : pq.Quantity or float, optional Specifies the sampling frequency of the input time series. When the input time series are given as `neo.AnalogSignal`, the sampling frequency is taken from their attribute and this parameter is ignored. Default: 1.0. window : str or tuple or np.ndarray, optional Desired window to use. See Notes [1]. Default: 'hanning'. nfft : int, optional Length of the FFT used. See Notes [1]. Default: None. detrend : str or function or False, optional Specifies how to detrend each segment. See Notes [1]. Default: 'constant'. scaling : {'density', 'spectrum'}, optional If 'density', computes the power spectral density where Pxx has units of V**2/Hz. If 'spectrum', computes the power spectrum where Pxx has units of V**2, if `signal` is measured in V and `fs` is measured in Hz. See Notes [1]. Default: 'density'. axis : int, optional Axis along which the periodogram is computed. See Notes [1]. Default: last axis (-1). Returns ------- freqs : pq.Quantity or np.ndarray Frequencies associated with the estimates of coherency and phase lag. `freqs` is always a vector irrespective of the shape of the input data. If `x` and `y` are `neo.AnalogSignal` or `pq.Quantity`, a `pq.Quantity` array is returned. Otherwise, a `np.ndarray` containing frequency in Hz is returned. coherency : np.ndarray Estimate of coherency between the input time series. For each frequency, coherency takes a value between 0 and 1, with 0 or 1 representing no or perfect coherence, respectively. When the input arrays `x` and `y` are multi-dimensional, `coherency` is of the same shape as the inputs, and the frequency is indexed depending on the type of the input. If the input is `neo.AnalogSignal`, the first axis indexes frequency. Otherwise, frequency is indexed by the last axis. phase_lag : pq.Quantity or np.ndarray Estimate of phase lag in radian between the input time series. For each frequency, phase lag takes a value between :math:`-\pi` and :math:`\pi`, with positive values meaning phase precession of `x` ahead of `y`, and vice versa. If `x` and `y` are `neo.AnalogSignal` or `pq.Quantity`, a `pq.Quantity` array is returned. Otherwise, a `np.ndarray` containing phase lag in radian is returned. The axis for frequency index is determined in the same way as for `coherency`. Raises ------ ValueError If `overlap` is not in the interval [0, 1). If `freq_res` is not positive. If `freq_res` is too high for the given data size. If `freq_res` is None and `len_seg` is not a positive number. If `freq_res` is None and `len_seg` is greater than the length of data on `axis`. If both `freq_res` and `len_seg` are None and `num_seg` is not a positive number. If both `freq_res` and `len_seg` are None and `num_seg` is greater than the length of data on `axis`. Notes ----- 1. The parameters `window`, `nfft`, `detrend`, `scaling`, and `axis` are directly passed to the helper function `_welch`. See the respective descriptions in the docstring of `_welch` for usage. 2. When only `num_seg` is given, parameter `nperseg` for `_welch` function is determined according to the expression `x.shape[axis]` / (`num_seg` - `overlap` * (`num_seg` - 1)) converted to integer. See Also -------- spectral._welch """ # initialize a parameter dict (to be given to _welch()) with # the parameters directly passed on to _welch() params = {'window': window, 'nfft': nfft, 'detrend': detrend, 'scaling': scaling, 'axis': axis} # When the input is AnalogSignal, the axis for time index is rolled to # the last xdata = np.asarray(x) ydata = np.asarray(y) if isinstance(x, neo.AnalogSignal): xdata = np.rollaxis(xdata, 0, len(xdata.shape)) ydata = np.rollaxis(ydata, 0, len(ydata.shape)) # if the data is given as AnalogSignal, use its attribute to specify # the sampling frequency if hasattr(x, 'sampling_rate'): params['fs'] = x.sampling_rate.rescale('Hz').magnitude else: params['fs'] = fs if overlap < 0: raise ValueError("overlap must be greater than or equal to 0") elif 1 <= overlap: raise ValueError("overlap must be less then 1") # determine the length of segments (i.e. *nperseg*) according to given # parameters if freq_res is not None: if freq_res <= 0: raise ValueError("freq_res must be positive") dF = freq_res.rescale('Hz').magnitude \ if isinstance(freq_res, pq.quantity.Quantity) else freq_res nperseg = int(params['fs'] / dF) if nperseg > xdata.shape[axis]: raise ValueError("freq_res is too high for the given data size") elif len_seg is not None: if len_seg <= 0: raise ValueError("len_seg must be a positive number") elif xdata.shape[axis] < len_seg: raise ValueError("len_seg must be shorter than the data length") nperseg = len_seg else: if num_seg <= 0: raise ValueError("num_seg must be a positive number") elif xdata.shape[axis] < num_seg: raise ValueError("num_seg must be smaller than the data length") # when only *num_seg* is given, *nperseg* is determined by solving the # following equation: # num_seg * nperseg - (num_seg-1) * overlap * nperseg = data.shape[-1] # ----------------- =============================== ^^^^^^^^^^^ # summed segment lengths total overlap data length nperseg = int(xdata.shape[axis] / (num_seg - overlap * (num_seg - 1))) params['nperseg'] = nperseg params['noverlap'] = int(nperseg * overlap) freqs, Pxy = _welch(xdata, ydata, **params) freqs, Pxx = _welch(xdata, xdata, **params) freqs, Pyy = _welch(ydata, ydata, **params) coherency = np.abs(Pxy)**2 / (np.abs(Pxx) * np.abs(Pyy)) phase_lag = np.angle(Pxy) # attach proper units to return values if isinstance(x, pq.quantity.Quantity): freqs = freqs * pq.Hz phase_lag = phase_lag * pq.rad # When the input is AnalogSignal, the axis for frequency index is # rolled to the first to comply with the Neo convention about time axis if isinstance(x, neo.AnalogSignal): coherency = np.rollaxis(coherency, -1) phase_lag = np.rollaxis(phase_lag, -1) return freqs, coherency, phase_lag