Source code for elephant.spectral

# -*- coding: utf-8 -*-
"""
Identification of spectral properties in analog signals (e.g., the power
spectrum).

:copyright: Copyright 2015-2016 by the Elephant team, see `doc/authors.rst`.
:license: Modified BSD, see LICENSE.txt for details.
"""

from __future__ import division, print_function, unicode_literals

import neo
import warnings
import numpy as np
import quantities as pq
import scipy.signal

from elephant.utils import deprecated_alias


[docs]@deprecated_alias(num_seg='n_segments', len_seg='len_segment', freq_res='frequency_resolution') def welch_psd(signal, n_segments=8, len_segment=None, frequency_resolution=None, overlap=0.5, fs=1.0, window='hanning', nfft=None, detrend='constant', return_onesided=True, scaling='density', axis=-1): """ Estimates power spectrum density (PSD) of a given `neo.AnalogSignal` using Welch's method. The PSD is obtained through the following steps: 1. Cut the given data into several overlapping segments. The degree of overlap can be specified by parameter `overlap` (default is 0.5, i.e. segments are overlapped by the half of their length). The number and the length of the segments are determined according to the parameters `n_segments`, `len_segment` or `frequency_resolution`. By default, the data is cut into 8 segments; 2. Apply a window function to each segment. Hanning window is used by default. This can be changed by giving a window function or an array as parameter `window` (see Notes [2]); 3. Compute the periodogram of each segment; 4. Average the obtained periodograms to yield PSD estimate. Parameters ---------- signal : neo.AnalogSignal or pq.Quantity or np.ndarray Time series data, of which PSD is estimated. When `signal` is `pq.Quantity` or `np.ndarray`, sampling frequency should be given through the keyword argument `fs`. Otherwise, the default value is used (`fs` = 1.0). n_segments : int, optional Number of segments. The length of segments is adjusted so that overlapping segments cover the entire stretch of the given data. This parameter is ignored if `len_segment` or `frequency_resolution` is given. Default: 8. len_segment : int, optional Length of segments. This parameter is ignored if `frequency_resolution` is given. If None, it will be determined from other parameters. Default: None. frequency_resolution : pq.Quantity or float, optional Desired frequency resolution of the obtained PSD estimate in terms of the interval between adjacent frequency bins. When given as a `float`, it is taken as frequency in Hz. If None, it will be determined from other parameters. Default: None. overlap : float, optional Overlap between segments represented as a float number between 0 (no overlap) and 1 (complete overlap). Default: 0.5 (half-overlapped). fs : pq.Quantity or float, optional Specifies the sampling frequency of the input time series. When the input is given as a `neo.AnalogSignal`, the sampling frequency is taken from its attribute and this parameter is ignored. Default: 1.0. window : str or tuple or np.ndarray, optional Desired window to use. See Notes [2]. Default: 'hanning'. nfft : int, optional Length of the FFT used. See Notes [2]. Default: None. detrend : str or function or False, optional Specifies how to detrend each segment. See Notes [2]. Default: 'constant'. return_onesided : bool, optional If True, return a one-sided spectrum for real data. If False return a two-sided spectrum. See Notes [2]. Default: True. scaling : {'density', 'spectrum'}, optional If 'density', computes the power spectral density where Pxx has units of V**2/Hz. If 'spectrum', computes the power spectrum where Pxx has units of V**2, if `signal` is measured in V and `fs` is measured in Hz. See Notes [2]. Default: 'density'. axis : int, optional Axis along which the periodogram is computed. See Notes [2]. Default: last axis (-1). Returns ------- freqs : pq.Quantity or np.ndarray Frequencies associated with the power estimates in `psd`. `freqs` is always a vector irrespective of the shape of the input data in `signal`. If `signal` is `neo.AnalogSignal` or `pq.Quantity`, a `pq.Quantity` array is returned. Otherwise, a `np.ndarray` containing frequency in Hz is returned. psd : pq.Quantity or np.ndarray PSD estimates of the time series in `signal`. If `signal` is `neo.AnalogSignal`, a `pq.Quantity` array is returned. Otherwise, the return is a `np.ndarray`. Raises ------ ValueError If `overlap` is not in the interval `[0, 1)`. If `frequency_resolution` is not positive. If `frequency_resolution` is too high for the given data size. If `frequency_resolution` is None and `len_segment` is not a positive number. If `frequency_resolution` is None and `len_segment` is greater than the length of data at `axis`. If both `frequency_resolution` and `len_segment` are None and `n_segments` is not a positive number. If both `frequency_resolution` and `len_segment` are None and `n_segments` is greater than the length of data at `axis`. Notes ----- 1. The computation steps used in this function are implemented in `scipy.signal` module, and this function is a wrapper which provides a proper set of parameters to `scipy.signal.welch` function. 2. The parameters `window`, `nfft`, `detrend`, `return_onesided`, `scaling`, and `axis` are directly passed to the `scipy.signal.welch` function. See the respective descriptions in the docstring of `scipy.signal.welch` for usage. 3. When only `n_segments` is given, parameter `nperseg` of `scipy.signal.welch` function is determined according to the expression `signal.shape[axis] / (n_segments - overlap * (n_segments - 1))` converted to integer. See Also -------- scipy.signal.welch welch_cohere """ # initialize a parameter dict (to be given to scipy.signal.welch()) with # the parameters directly passed on to scipy.signal.welch() params = {'window': window, 'nfft': nfft, 'detrend': detrend, 'return_onesided': return_onesided, 'scaling': scaling, 'axis': axis} # add the input data to params. When the input is AnalogSignal, the # data is added after rolling the axis for time index to the last data = np.asarray(signal) if isinstance(signal, neo.AnalogSignal): data = np.rollaxis(data, 0, len(data.shape)) params['x'] = data # if the data is given as AnalogSignal, use its attribute to specify # the sampling frequency if hasattr(signal, 'sampling_rate'): params['fs'] = signal.sampling_rate.rescale('Hz').magnitude else: params['fs'] = fs if overlap < 0: raise ValueError("overlap must be greater than or equal to 0") elif 1 <= overlap: raise ValueError("overlap must be less then 1") # determine the length of segments (i.e. *nperseg*) according to given # parameters if frequency_resolution is not None: if frequency_resolution <= 0: raise ValueError("frequency_resolution must be positive") if isinstance(frequency_resolution, pq.quantity.Quantity): dF = frequency_resolution.rescale('Hz').magnitude else: dF = frequency_resolution nperseg = int(params['fs'] / dF) if nperseg > data.shape[axis]: raise ValueError("frequency_resolution is too high for the given " "data size") elif len_segment is not None: if len_segment <= 0: raise ValueError("len_seg must be a positive number") elif data.shape[axis] < len_segment: raise ValueError("len_seg must be shorter than the data length") nperseg = len_segment else: if n_segments <= 0: raise ValueError("n_segments must be a positive number") elif data.shape[axis] < n_segments: raise ValueError("n_segments must be smaller than the data length") # when only *n_segments* is given, *nperseg* is determined by solving # the following equation: # n_segments * nperseg - (n_segments-1) * overlap * nperseg = # data.shape[-1] # -------------------- =============================== ^^^^^^^^^^^ # summed segment lengths total overlap data length nperseg = int(data.shape[axis] / (n_segments - overlap * ( n_segments - 1))) params['nperseg'] = nperseg params['noverlap'] = int(nperseg * overlap) freqs, psd = scipy.signal.welch(**params) # attach proper units to return values if isinstance(signal, pq.quantity.Quantity): if 'scaling' in params and params['scaling'] == 'spectrum': psd = psd * signal.units * signal.units else: psd = psd * signal.units * signal.units / pq.Hz freqs = freqs * pq.Hz return freqs, psd
[docs]@deprecated_alias(x='signal_i', y='signal_j', num_seg='n_segments', len_seg='len_segment', freq_res='frequency_resolution') def welch_coherence(signal_i, signal_j, n_segments=8, len_segment=None, frequency_resolution=None, overlap=0.5, fs=1.0, window='hanning', nfft=None, detrend='constant', scaling='density', axis=-1): r""" Estimates coherence between a given pair of analog signals. The estimation is performed with Welch's method: the given pair of data are cut into short segments, cross-spectra are calculated for each pair of segments, and the cross-spectra are averaged and normalized by respective auto-spectra. By default, the data are cut into 8 segments with 50% overlap between neighboring segments. These numbers can be changed through respective parameters. Parameters ---------- signal_i : neo.AnalogSignal or pq.Quantity or np.ndarray First time series data of the pair between which coherence is computed. signal_j : neo.AnalogSignal or pq.Quantity or np.ndarray Second time series data of the pair between which coherence is computed. The shapes and the sampling frequencies of `signal_i` and `signal_j` must be identical. When `signal_i` and `signal_j` are not `neo.AnalogSignal`, sampling frequency should be specified through the keyword argument `fs`. Otherwise, the default value is used (`fs` = 1.0). n_segments : int, optional Number of segments. The length of segments is adjusted so that overlapping segments cover the entire stretch of the given data. This parameter is ignored if `len_seg` or `frequency_resolution` is given. Default: 8. len_segment : int, optional Length of segments. This parameter is ignored if `frequency_resolution` is given. If None, it is determined from other parameters. Default: None. frequency_resolution : pq.Quantity or float, optional Desired frequency resolution of the obtained coherence estimate in terms of the interval between adjacent frequency bins. When given as a `float`, it is taken as frequency in Hz. If None, it is determined from other parameters. Default: None. overlap : float, optional Overlap between segments represented as a float number between 0 (no overlap) and 1 (complete overlap). Default: 0.5 (half-overlapped). fs : pq.Quantity or float, optional Specifies the sampling frequency of the input time series. When the input time series are given as `neo.AnalogSignal`, the sampling frequency is taken from their attribute and this parameter is ignored. Default: 1.0. window : str or tuple or np.ndarray, optional Desired window to use. See Notes [1]. Default: 'hanning'. nfft : int, optional Length of the FFT used. See Notes [1]. Default: None. detrend : str or function or False, optional Specifies how to detrend each segment. See Notes [1]. Default: 'constant'. scaling : {'density', 'spectrum'}, optional If 'density', computes the power spectral density where Pxx has units of V**2/Hz. If 'spectrum', computes the power spectrum where Pxx has units of V**2, if `signal` is measured in V and `fs` is measured in Hz. See Notes [1]. Default: 'density'. axis : int, optional Axis along which the periodogram is computed. See Notes [1]. Default: last axis (-1). Returns ------- freqs : pq.Quantity or np.ndarray Frequencies associated with the estimates of coherency and phase lag. `freqs` is always a vector irrespective of the shape of the input data. If `signal_i` and `signal_j` are `neo.AnalogSignal` or `pq.Quantity`, a `pq.Quantity` array is returned. Otherwise, a `np.ndarray` containing frequency in Hz is returned. coherency : np.ndarray Estimate of coherency between the input time series. For each frequency, coherency takes a value between 0 and 1, with 0 or 1 representing no or perfect coherence, respectively. When the input arrays `signal_i` and `signal_j` are multi-dimensional, `coherency` is of the same shape as the inputs, and the frequency is indexed depending on the type of the input. If the input is `neo.AnalogSignal`, the first axis indexes frequency. Otherwise, frequency is indexed by the last axis. phase_lag : pq.Quantity or np.ndarray Estimate of phase lag in radian between the input time series. For each frequency, phase lag takes a value between :math:`-\pi` and :math:`\pi`, with positive values meaning phase precession of `signal_i` ahead of `signal_j`, and vice versa. If `signal_i` and `signal_j` are `neo.AnalogSignal` or `pq.Quantity`, a `pq.Quantity` array is returned. Otherwise, a `np.ndarray` containing phase lag in radian is returned. The axis for frequency index is determined in the same way as for `coherency`. Raises ------ ValueError Same as in :func:`welch_psd`. Notes ----- 1. The computation steps used in this function are implemented in `scipy.signal` module, and this function is a wrapper which provides a proper set of parameters to `scipy.signal.welch` function. 2. The parameters `window`, `nfft`, `detrend`, `return_onesided`, `scaling`, and `axis` are directly passed to the `scipy.signal.welch` function. See the respective descriptions in the docstring of `scipy.signal.welch` for usage. 3. When only `n_segments` is given, parameter `nperseg` of `scipy.signal.welch` function is determined according to the expression `signal.shape[axis] / (n_segments - overlap * (n_segments - 1))` converted to integer. See Also -------- welch_psd """ # TODO: code duplication with welch_psd() # initialize a parameter dict for scipy.signal.csd() params = {'window': window, 'nfft': nfft, 'detrend': detrend, 'scaling': scaling, 'axis': axis} # When the input is AnalogSignal, the axis for time index is rolled to # the last xdata = np.asarray(signal_i) ydata = np.asarray(signal_j) if isinstance(signal_i, neo.AnalogSignal): xdata = np.rollaxis(xdata, 0, len(xdata.shape)) ydata = np.rollaxis(ydata, 0, len(ydata.shape)) # if the data is given as AnalogSignal, use its attribute to specify # the sampling frequency if hasattr(signal_i, 'sampling_rate'): params['fs'] = signal_i.sampling_rate.rescale('Hz').magnitude else: params['fs'] = fs if overlap < 0: raise ValueError("overlap must be greater than or equal to 0") elif 1 <= overlap: raise ValueError("overlap must be less then 1") # determine the length of segments (i.e. *nperseg*) according to given # parameters if frequency_resolution is not None: if isinstance(frequency_resolution, pq.quantity.Quantity): dF = frequency_resolution.rescale('Hz').magnitude else: dF = frequency_resolution nperseg = int(params['fs'] / dF) if nperseg > xdata.shape[axis]: raise ValueError("frequency_resolution is too high for the given" "data size") elif len_segment is not None: if len_segment <= 0: raise ValueError("len_seg must be a positive number") elif xdata.shape[axis] < len_segment: raise ValueError("len_seg must be shorter than the data length") nperseg = len_segment else: if n_segments <= 0: raise ValueError("n_segments must be a positive number") elif xdata.shape[axis] < n_segments: raise ValueError("n_segments must be smaller than the data length") # when only *n_segments* is given, *nperseg* is determined by solving # the following equation: # n_segments * nperseg - (n_segments-1) * overlap * nperseg = # data.shape[-1] # ------------------- =============================== ^^^^^^^^^^^ # summed segment lengths total overlap data length nperseg = int(xdata.shape[axis] / (n_segments - overlap * ( n_segments - 1))) params['nperseg'] = nperseg params['noverlap'] = int(nperseg * overlap) freqs, Pxx = scipy.signal.welch(xdata, **params) _, Pyy = scipy.signal.welch(ydata, **params) _, Pxy = scipy.signal.csd(xdata, ydata, **params) coherency = np.abs(Pxy) ** 2 / (Pxx * Pyy) phase_lag = np.angle(Pxy) # attach proper units to return values if isinstance(signal_i, pq.quantity.Quantity): freqs = freqs * pq.Hz phase_lag = phase_lag * pq.rad # When the input is AnalogSignal, the axis for frequency index is # rolled to the first to comply with the Neo convention about time axis if isinstance(signal_i, neo.AnalogSignal): coherency = np.rollaxis(coherency, -1) phase_lag = np.rollaxis(phase_lag, -1) return freqs, coherency, phase_lag
def welch_cohere(*args, **kwargs): warnings.warn("'welch_cohere' is deprecated; use 'welch_coherence'", DeprecationWarning)