Source code for elephant.spectral

# -*- coding: utf-8 -*-
"""
Identification of spectral properties in analog signals (e.g., the power
spectrum).

.. autosummary::
    :toctree: _toctree/spectral

    welch_psd
    welch_coherence
    multitaper_psd
    multitaper_cross_spectrum
    segmented_multitaper_cross_spectrum
    multitaper_coherence


:copyright: Copyright 2014-2024 by the Elephant team, see `doc/authors.rst`.
:license: Modified BSD, see LICENSE.txt for details.
"""

import warnings

import neo
import numpy as np
import quantities as pq
import scipy.signal

__all__ = [
    "welch_psd",
    "welch_coherence",
    "multitaper_psd",
    "multitaper_cross_spectrum",
    "segmented_multitaper_cross_spectrum",
    "multitaper_coherence"
]


[docs] def welch_psd(signal, n_segments=8, len_segment=None, frequency_resolution=None, overlap=0.5, fs=1.0, window='hann', nfft=None, detrend='constant', return_onesided=True, scaling='density', axis=-1): """ Estimates power spectrum density (PSD) of a given `neo.AnalogSignal` using Welch's method. The PSD is obtained through the following steps: 1. Cut the given data into several overlapping segments. The degree of overlap can be specified by parameter `overlap` (default is 0.5, i.e. segments are overlapped by the half of their length). The number and the length of the segments are determined according to the parameters `n_segments`, `len_segment` or `frequency_resolution`. By default, the data is cut into 8 segments; 2. Apply a window function to each segment. Hanning window is used by default. This can be changed by giving a window function or an array as parameter `window` (see Notes [2]); 3. Compute the periodogram of each segment; 4. Average the obtained periodograms to yield PSD estimate. Parameters ---------- signal : neo.AnalogSignal or pq.Quantity or np.ndarray Time series data, of which PSD is estimated. When `signal` is `pq.Quantity` or `np.ndarray`, sampling frequency should be given through the keyword argument `fs`. Otherwise, the default value is used (`fs` = 1.0). n_segments : int, optional Number of segments. The length of segments is adjusted so that overlapping segments cover the entire stretch of the given data. This parameter is ignored if `len_segment` or `frequency_resolution` is given. Default: 8 len_segment : int, optional Length of segments. This parameter is ignored if `frequency_resolution` is given. If None, it will be determined from other parameters. Default: None frequency_resolution : pq.Quantity or float, optional Desired frequency resolution of the obtained PSD estimate in terms of the interval between adjacent frequency bins. When given as a `float`, it is taken as frequency in Hz. If None, it will be determined from other parameters. Default: None overlap : float, optional Overlap between segments represented as a float number between 0 (no overlap) and 1 (complete overlap). Default: 0.5 (half-overlapped) fs : pq.Quantity or float, optional Specifies the sampling frequency of the input time series. When the input is given as a `neo.AnalogSignal`, the sampling frequency is taken from its attribute and this parameter is ignored. Default: 1.0 window : str or tuple or np.ndarray, optional Desired window to use. See Notes [2]. Default: 'hann' nfft : int, optional Length of the FFT used. See Notes [2]. Default: None detrend : str or function or False, optional Specifies how to detrend each segment. See Notes [2]. Default: 'constant' return_onesided : bool, optional If True, return a one-sided spectrum for real data. If False return a two-sided spectrum. See Notes [2]. Default: True scaling : {'density', 'spectrum'}, optional If 'density', computes the power spectral density where Pxx has units of V**2/Hz. If 'spectrum', computes the power spectrum where Pxx has units of V**2, if `signal` is measured in V and `fs` is measured in Hz. See Notes [2]. Default: 'density' axis : int, optional Axis along which the periodogram is computed. See Notes [2]. Default: last axis (-1) Returns ------- freqs : pq.Quantity or np.ndarray Frequencies associated with the power estimates in `psd`. `freqs` is always a vector irrespective of the shape of the input data in `signal`. If `signal` is `neo.AnalogSignal` or `pq.Quantity`, a `pq.Quantity` array is returned. Otherwise, a `np.ndarray` containing frequency in Hz is returned. psd : pq.Quantity or np.ndarray PSD estimates of the time series in `signal`. If `signal` is `neo.AnalogSignal`, a `pq.Quantity` array is returned. Otherwise, the return is a `np.ndarray`. Raises ------ ValueError If `overlap` is not in the interval `[0, 1)`. If `frequency_resolution` is not positive. If `frequency_resolution` is too high for the given data size. If `frequency_resolution` is None and `len_segment` is not a positive number. If `frequency_resolution` is None and `len_segment` is greater than the length of data at `axis`. If both `frequency_resolution` and `len_segment` are None and `n_segments` is not a positive number. If both `frequency_resolution` and `len_segment` are None and `n_segments` is greater than the length of data at `axis`. Notes ----- 1. The computation steps used in this function are implemented in `scipy.signal` module, and this function is a wrapper which provides a proper set of parameters to `scipy.signal.welch` function. 2. The parameters `window`, `nfft`, `detrend`, `return_onesided`, `scaling`, and `axis` are directly passed to the `scipy.signal.welch` function. See the respective descriptions in the docstring of `scipy.signal.welch` for usage. 3. When only `n_segments` is given, parameter `nperseg` of `scipy.signal.welch` function is determined according to the expression `signal.shape[axis] / (n_segments - overlap * (n_segments - 1))` converted to integer. See Also -------- scipy.signal.welch welch_cohere Examples -------- >>> import neo >>> import numpy as np >>> import quantities as pq >>> from elephant.spectral import welch_psd >>> signal = neo.AnalogSignal(np.cos(np.linspace(0, 2 * np.pi, num=100)), ... sampling_rate=20 * pq.Hz, units='mV') Sampling frequency will be taken as `signal.sampling_rate`. >>> freq, psd = welch_psd(signal) >>> freq array([ 0. , 0.90909091, 1.81818182, 2.72727273, 3.63636364, 4.54545455, 5.45454545, 6.36363636, 7.27272727, 8.18181818, 9.09090909, 10. ]) * Hz >>> psd # noqa array([[1.09566410e-03, 2.33607943e-02, 1.35436832e-03, 6.74408723e-05, 1.00810196e-05, 2.40079315e-06, 7.35821437e-07, 2.58361700e-07, 9.44183422e-08, 3.14573483e-08, 6.82050475e-09, 1.18183354e-10]]) * mV**2/Hz """ # 'hanning' window was removed with release of scipy 1.9.0, it was # deprecated since 1.1.0. if window == 'hanning': warnings.warn("'hanning' is deprecated and was removed from scipy " "with release 1.9.0. Please use 'hann' instead", DeprecationWarning) window = 'hann' # initialize a parameter dict (to be given to scipy.signal.welch()) with # the parameters directly passed on to scipy.signal.welch() params = {'window': window, 'nfft': nfft, 'detrend': detrend, 'return_onesided': return_onesided, 'scaling': scaling, 'axis': axis} # add the input data to params. When the input is AnalogSignal, the # data is added after rolling the axis for time index to the last data = np.asarray(signal) if isinstance(signal, neo.AnalogSignal): data = np.rollaxis(data, 0, len(data.shape)) params['x'] = data # if the data is given as AnalogSignal, use its attribute to specify # the sampling frequency if hasattr(signal, 'sampling_rate'): params['fs'] = signal.sampling_rate.rescale('Hz').magnitude else: params['fs'] = fs if overlap < 0: raise ValueError("overlap must be greater than or equal to 0") elif 1 <= overlap: raise ValueError("overlap must be less then 1") # determine the length of segments (i.e. *nperseg*) according to given # parameters if frequency_resolution is not None: if frequency_resolution <= 0: raise ValueError("frequency_resolution must be positive") if isinstance(frequency_resolution, pq.quantity.Quantity): dF = frequency_resolution.rescale('Hz').magnitude else: dF = frequency_resolution nperseg = int(params['fs'] / dF) if nperseg > data.shape[axis]: raise ValueError("frequency_resolution is too high for the given " "data size") elif len_segment is not None: if len_segment <= 0: raise ValueError("len_seg must be a positive number") elif data.shape[axis] < len_segment: raise ValueError("len_seg must be shorter than the data length") nperseg = len_segment else: if n_segments <= 0: raise ValueError("n_segments must be a positive number") elif data.shape[axis] < n_segments: raise ValueError("n_segments must be smaller than the data length") # when only *n_segments* is given, *nperseg* is determined by solving # the following equation: # n_segments * nperseg - (n_segments-1) * overlap * nperseg = # data.shape[-1] # -------------------- =============================== ^^^^^^^^^^^ # summed segment lengths total overlap data length nperseg = int(data.shape[axis] / (n_segments - overlap * ( n_segments - 1))) params['nperseg'] = nperseg params['noverlap'] = int(nperseg * overlap) freqs, psd = scipy.signal.welch(**params) # attach proper units to return values if isinstance(signal, pq.quantity.Quantity): if 'scaling' in params and params['scaling'] == 'spectrum': psd = psd * signal.units * signal.units else: psd = psd * signal.units * signal.units / pq.Hz freqs = freqs * pq.Hz return freqs, psd
[docs] def multitaper_psd(signal, fs=1, nw=4, num_tapers=None, peak_resolution=None, attach_units=True): """ Estimates power spectrum density (PSD) of a given 'neo.AnalogSignal' using the Multitaper method. The PSD is obtained through the following steps: 1. Calculate 'num_tapers' approximately independent estimates of the spectrum by multiplying the signal with the discrete prolate spheroidal functions (also known as Slepian function) and calculate the PSD of each tapered signal. 2. Average the approximately independent estimates of each signal to decrease overall variance of the estimates Parameters ---------- signal : neo.AnalogSignal or pq.Quantity or np.ndarray Time series data of which PSD is estimated. When `signal` is np.ndarray sampling frequency should be given through keyword argument `fs`. Signal should be passed as (n_channels, n_samples) fs : float, optional Specifies the sampling frequency of the input time series Default: 1.0. nw : float, optional Time bandwidth product Default: 4.0. num_tapers : int, optional Number of tapers used in 1. to obtain estimate of PSD. By default, [2*nw] - 1 is chosen. Default: None. peak_resolution : pq.Quantity float, optional Quantity in Hz determining the number of tapers used for analysis. Fine peak resolution --> low numerical value --> low number of tapers High peak resolution --> high numerical value --> high number of tapers When given as a `float`, it is taken as frequency in Hz. Default: None. attach_units: bool, optional If True and signals is an instance of pq.Quantity, units are attached to the estimated cross spectrum. Default: True Notes ----- 1. There is a parameter hierarchy regarding nw, num_tapers and peak_resolution. If peak_resolution is provided, it determines both nw and the num_tapers. Specifying num_tapers has an effect only if peak_resolution is not provided. Returns ------- freqs : np.ndarray Frequencies associated with power estimate in `psd` psd : np.ndarray PSD estimate of the time series in `signal` Raises ------ ValueError If `peak_resolution` is not a positive number. If `peak_resolution` is None and `num_tapers` is not a positive number. TypeError If `peak_resolution` is None and `num_tapers` is not an int. """ # When the input is AnalogSignal, the data is added after rolling the axis # for time index to the last data = np.asarray(signal) if isinstance(signal, neo.AnalogSignal): data = np.moveaxis(data, 0, 1) fs = signal.sampling_rate if isinstance(fs, pq.Quantity): fs = fs.rescale('Hz').magnitude # Add a dim if data has only one dimension if data.ndim == 1: data = data[np.newaxis, :] length_signal = np.shape(data)[1] # If peak resolution is pq.Quantity, get magnitude if isinstance(peak_resolution, pq.quantity.Quantity): peak_resolution = peak_resolution.rescale('Hz').magnitude # Determine time-halfbandwidth product from given parameters if peak_resolution is not None: if peak_resolution <= 0: raise ValueError("peak_resolution must be positive") nw = length_signal / fs * peak_resolution / 2 num_tapers = int(np.floor(2*nw) - 1) if num_tapers is None: num_tapers = int(np.floor(2*nw) - 1) else: if not isinstance(num_tapers, int): raise TypeError("num_tapers must be integer") if num_tapers <= 0: raise ValueError("num_tapers must be positive") # Generate frequencies of PSD estimate freqs = np.fft.rfftfreq(length_signal, d=1/fs) # Generate Slepian sequences slepian_fcts = scipy.signal.windows.dpss(M=length_signal, NW=nw, Kmax=num_tapers, sym=False) # Calculate approximately independent spectrum estimates # Use broadcasting to match dim for point-wise multiplication # Shape: (n_channels, n_tapers, n_samples) tapered_signal = data[:, np.newaxis, :] * slepian_fcts # Determine Fourier transform of tapered signal spectrum_estimates = np.abs(np.fft.rfft(tapered_signal, axis=-1))**2 spectrum_estimates[..., 1:] *= 2 # Average Fourier transform windowed signal psd = np.mean(spectrum_estimates, axis=-2) / fs # Attach proper units to return values if isinstance(signal, pq.quantity.Quantity) and attach_units: psd = psd * signal.units * signal.units / pq.Hz freqs = freqs * pq.Hz return freqs, psd
def segmented_multitaper_psd(signal, n_segments=1, len_segment=None, frequency_resolution=None, overlap=0.5, fs=1, nw=4, num_tapers=None, peak_resolution=None): """ Estimates power spectrum density (PSD) of a given 'neo.AnalogSignal' using Multitaper method The PSD is obtained through the following steps: 1. Cut the given data into several overlapping segments. The degree of overlap can be specified by parameter `overlap` (default is 0.5, i.e. segments are overlapped by the half of their length). The number and the length of the segments are determined according to the parameters `n_segments`, `len_segment` or `frequency_resolution`. By default, the data is not cut into segments; 2. Calculate 'num_tapers' approximately independent estimates of the spectrum by multiplying the signal with the discrete prolate spheroidal functions (also known as Slepian function) and calculate the PSD of each tapered segment 3. Average the approximately independent estimates of each segment to decrease overall variance of the estimates 4. Average the obtained estimates for each segment Parameters ---------- signal : neo.AnalogSignal or pq.Quantity or np.ndarray Time series data of which PSD is estimated. When `signal` is np.ndarray sampling frequency should be given through keyword argument `fs`. Signal should be passed as (n_channels, n_samples) fs : float, optional Specifies the sampling frequency of the input time series Default: 1.0. n_segments : int, optional Number of segments. The length of segments is adjusted so that overlapping segments cover the entire stretch of the given data. This parameter is ignored if `len_segment` or `frequency_resolution` is given. Default: 1. len_segment : int, optional Length of segments. This parameter is ignored if `frequency_resolution` is given. If None, it will be determined from other parameters. Default: None. frequency_resolution : pq.Quantity or float, optional Desired frequency resolution of the obtained PSD estimate in terms of the interval between adjacent frequency bins. When given as a `float`, it is taken as frequency in Hz. If None, it will be determined from other parameters. Default: None. overlap : float, optional Overlap between segments represented as a float number between 0 (no overlap) and 1 (complete overlap). Default: 0.5 (half-overlapped). nw : float, optional Time bandwidth product Default: 4.0. num_tapers : int, optional Number of tapers used in 1. to obtain estimate of PSD. By default, [2*nw] - 1 is chosen. Default: None. peak_resolution : pq.Quantity float, optional Quantity in Hz determining the number of tapers used for analysis. Fine peak resolution --> low numerical value --> low number of tapers High peak resolution --> high numerical value --> high number of tapers When given as a `float`, it is taken as frequency in Hz. Default: None. Notes ----- 1. There is a parameter hierarchy regarding n_segments and len_segment. The former parameter is ignored if the latter one is passed. 2. There is a parameter hierarchy regarding nw, num_tapers and peak_resolution. If peak_resolution is provided, it determines both nw and the num_tapers. Specifying num_tapers has an effect only if peak_resolution is not provided. Returns ------- freqs : np.ndarray Frequencies associated with power estimate in `psd` psd : np.ndarray PSD estimate of the time series in `signal` Raises ------ ValueError If `peak_resolution` is None and `num_tapers` is not a positive number. If `frequency_resolution` is too high for the given data size. If `frequency_resolution` is None and `len_segment` is not a positive number. If `frequency_resolution` is None and `len_segment` is greater than the length of data at `axis`. If both `frequency_resolution` and `len_segment` are None and `n_segments` is not a positive number. If both `frequency_resolution` and `len_segment` are None and `n_segments` is greater than the length of data at `axis`. TypeError If `peak_resolution` is None and `num_tapers` is not an int. """ # When the input is AnalogSignal, the data is added after rolling the axis # for time index to the last data = np.asarray(signal) if isinstance(signal, neo.AnalogSignal): data = np.moveaxis(data, 0, 1) fs = signal.sampling_rate # Add a dim if data has only one dimension if data.ndim == 1: data = data[np.newaxis, :] # Number of data points in time series length_signal = np.shape(data)[1] psd_params_dict = { 'nw': nw, 'num_tapers': num_tapers, 'peak_resolution': peak_resolution, 'attach_units': False } freqs, psd = _segmented_apply_func( data=data, fs=fs, n_segments=n_segments, len_segment=len_segment, overlap=overlap, frequency_resolution=frequency_resolution, func=multitaper_psd, func_params_dict=psd_params_dict) # Attach proper units to return values if isinstance(signal, pq.quantity.Quantity): psd = psd * signal.units * signal.units / pq.Hz freqs = freqs * pq.Hz return freqs, psd
[docs] def multitaper_cross_spectrum(signals, fs=1.0, nw=4.0, num_tapers=None, peak_resolution=None, return_onesided=True, attach_units=True): """ Estimates the cross spectrum of a given `neo.AnalogSignal` using the Multitaper method. The cross spectrum is obtained through the following steps: 1. Obtain approximately independent estimates of the spectrum for each signal by multiplying it with tapering functions (obtained as the discrete prolate spheroidal functions, also known as slepian functions) and calculating the cross spectrum for the tapered signals. The number of tapering functions (and hence the number of the estimates) is specified by the parameter `num_tapers`. 2. Average the approximately independent estimates of each tapered instance of the signal to decrease overall variance of the estimates. Parameters ---------- signals : neo.AnalogSignal or pq.Quantity or np.ndarray Time series data of which PSD is estimated. When `signal` is `np.ndarray` or `pq.Quantity`, it needs to be passed as a 2-dimensional array of shape (n_channels, n_samples), and the data sampling frequency should be given through the keyword argument `fs`. fs : float, optional Specifies the sampling frequency of the input time series Default: 1.0 nw : float, optional Time-halfbandwidth product. This parameter can be used to determine the number of tapers following the equation: num_tapers = 2*nw - 1 It can be determined by multiplying the duration of the signal with the desired half-peak resolution frequency: n_samples/fs * peak_resolution/2. Default: 4.0 num_tapers : int, optional Number of tapers used in step 2 (see above) to obtain estimate of PSD. By default, [2*nw] - 1 is chosen. Default: None (see Notes [1]) peak_resolution : pq.Quantity float, optional Quantity in Hz determining the number of tapers used for analysis. Fine peak resolution --> low numerical value --> low number of tapers High peak resolution --> high numerical value --> high number of tapers When given as a `float`, it is taken as frequency in Hz. Default: None (see Notes [1]) return_onesided : bool, optional If True, return a one-sided spectrum for real data. If False return a two-sided spectrum. Default: True attach_units : bool, optional If True and signals is instance of pq.Quantity, units are attached to the estimated cross spectrum. Default: True Notes ----- 1. There is a parameter hierarchy regarding `nw`, `num_tapers` and `peak_resolution`. If peak_resolution is provided, it determines both `nw` and the `num_tapers`. Specifying `num_tapers` has an effect only if `peak_resolution` is not provided. Returns ------- freqs : np.ndarray Frequencies associated with cross spectrum estimate in `cross_spec`. If signals is an instance of `neo.AnalogSignal` or `Quantity` and the `attach_units` is set to `True`, the returned values will have units of Hz. cross_spec : np.ndarray Estimate of the cross spectrum of time series in `signals`. If signals is an instance of `neo.AnalogSignal` or `Quantity` and the `attach_units` is set to `True`, the returned values will have units attached to them (i.e. signals.units ** 2/Hz). Raises ------ ValueError If `peak_resolution` is None and `num_tapers` is not a positive number. TypeError If `peak_resolution` is None and `num_tapers` is not an int. """ # When the input is AnalogSignal, fetch the underlying numpy array and swap # axes from (n_samples, n_channels) to (n_channels, n_samples) data = np.asarray(signals) if isinstance(signals, neo.AnalogSignal): data = np.moveaxis(data, 0, 1) # Number of data points in time series length_signal = np.shape(data)[1] # If the data is given as AnalogSignal, use its attribute to specify the # sampling frequency if hasattr(signals, 'sampling_rate'): fs = signals.sampling_rate.rescale('Hz') # If fs and peak resolution is pq.Quantity, get magnitude if isinstance(fs, pq.quantity.Quantity): fs = fs.rescale('Hz').magnitude # Determine time-halfbandwidth product from given parameters if peak_resolution is not None: if isinstance(peak_resolution, pq.quantity.Quantity): peak_resolution = peak_resolution.rescale('Hz').magnitude if peak_resolution <= 0: raise ValueError("peak_resolution must be positive") nw = length_signal / fs * peak_resolution / 2 num_tapers = int(np.floor(2*nw) - 1) if num_tapers is None: num_tapers = int(np.floor(2*nw) - 1) else: if not isinstance(num_tapers, int): raise TypeError("num_tapers must be integer") if num_tapers <= 0: raise ValueError("num_tapers must be positive") # Get slepian functions (sym='False' used for spectral analysis) slepian_fcts = scipy.signal.windows.dpss(M=length_signal, NW=nw, Kmax=num_tapers, sym=False) # Use broadcasting to match dime for point-wise multiplication tapered_signals = (data[:, np.newaxis] * slepian_fcts) if return_onesided: # Determine frequencies for real Fourier transform freqs = np.fft.rfftfreq(length_signal, 1 / fs) # Determine real Fourier transform of tapered signal spectrum_estimates = np.fft.rfft(tapered_signals, axis=-1) else: # Determine frequencies for full Fourier transform freqs = np.fft.fftfreq(length_signal, 1 / fs) # Determine full Fourier transform of tapered signal spectrum_estimates = np.fft.fft(tapered_signals, axis=-1) temp = (spectrum_estimates[np.newaxis, :, :, :] * np.conjugate(spectrum_estimates[:, np.newaxis, :, :])) # Average Fourier transform windowed signal cross_spec = np.mean(temp, axis=-2, dtype=np.complex64) / fs if attach_units and isinstance(signals, pq.quantity.Quantity): freqs = freqs * pq.Hz cross_spec = cross_spec * signals.units * signals.units / pq.Hz return freqs, cross_spec
def _segmented_apply_func(data, func, fs=1.0, n_segments=1, len_segment=None, frequency_resolution=None, overlap=0.5, func_params_dict=None): """ Estimate a spectral measure of a signal by applying it to segments of a signal and then averaging over the segments. The spectral measure is computed on each segment by a given function. The underlying assumption is the spectral measures computed for the segments are approximately independent. The spectral measure is obtained through the following steps 1. Cut the given data into several overlapping segments. The degree of overlap can be specified by parameter `overlap` (default is 0.5, i.e. segments are overlapped by the half of their length). The number and the length of the segments are determined according to the parameters `n_segments`, `len_segment` or `frequency_resolution`. By default, the `data` is not cut into segments; 2. Apply the function calculating the desired spectral measure. 3. Average the obtained estimates for each segment Parameters ---------- data : np.ndarray Time series data of which spectral measure is estimated. `data` is `np.ndarray` or `pq.Quantity` of shape (n_channels, n_samples), and the data sampling frequency should be given through the keyword argument `fs` in `func_params_dict`. func : callable Function calculating spectral measure. fs : float, optional Specifies the sampling frequency of the input time series Default: 1.0 n_segments : int, optional Number of segments. The length of segments is adjusted so that overlapping segments cover the entire stretch of the given data. This parameter is ignored if `len_segment` or `frequency_resolution` is given. Default: 1 len_segment : int, optional Length of segments. This parameter is ignored if `frequency_resolution` is given. If None, it will be determined from other parameters. Default: None frequency_resolution : pq.Quantity or float, optional Desired frequency resolution of the obtained PSD estimate in terms of the interval between adjacent frequency bins. When given as a `float`, it is taken as frequency in Hz. If None, it will be determined from other parameters. Default: None overlap : float, optional Overlap between segments represented as a float number between 0 (no overlap) and 1 (complete overlap). Default: 0.5 (half-overlapped) func_params_dict : dict, optional Arguments for function `func` returning spectral measure. Notes ----- 1. There is a parameter hierarchy regarding `frequency_resolution`, `len_segment` and `n_segments` (in decreasing order of priority). If the former parameter is passed, the latter parameter(s) is/are ignored. Returns ------- freqs : np.ndarray Frequencies associated with the estimated spectral measure avg_estimate : np.ndarray Estimated spectral measure for segmented data Raises ------ ValueError If `frequency_resolution` is too high for the given data size. If `frequency_resolution` is None and `len_segment` is not a positive number. If `frequency_resolution` is None and `len_segment` is greater than the length of data at `axis`. If both `frequency_resolution` and `len_segment` are None and `n_segments` is not a positive number. If both `frequency_resolution` and `len_segment` are None and `n_segments` is greater than the length of data at `axis`. """ # Number of data points in time series length_signal = np.shape(data)[1] # It is assumed that data has shape (n_channels, n_samples). To fetch # samples set correct axis axis = -1 func_params_dict.update(dict(fs=fs)) # If fs and peak resolution is pq.Quantity, get magnitude if isinstance(fs, pq.quantity.Quantity): fs = fs.rescale('Hz').magnitude # Determine length per segment - n_per_seg if frequency_resolution is not None: if frequency_resolution <= 0: raise ValueError("frequency_resolution must be positive") if isinstance(frequency_resolution, pq.quantity.Quantity): dF = frequency_resolution.rescale('Hz').magnitude else: dF = frequency_resolution n_per_seg = int(fs / dF) if n_per_seg > data.shape[axis]: raise ValueError("frequency_resolution is too high for the given " "data size") elif len_segment is not None: if len_segment <= 0: raise ValueError("len_seg must be a positive number") elif data.shape[axis] < len_segment: raise ValueError("len_seg must be shorter than the data length") n_per_seg = len_segment else: if n_segments <= 0: raise ValueError("n_segments must be a positive number") elif data.shape[axis] < n_segments: raise ValueError("n_segments must be smaller than the data length") # when only *n_segments* is given, *n_per_seg* is determined by solving # the following equation: # n_segments * n_per_seg - (n_segments-1) * overlap * n_per_seg = # data.shape[-1] # -------------------- =============================== ^^^^^^^^^^^ # summed segment lengths total overlap data length n_per_seg = int(data.shape[axis] / (n_segments - overlap * (n_segments - 1))) n_overlap = int(n_per_seg * overlap) n_overlap_step = n_per_seg - n_overlap n_segments = int((length_signal - n_overlap) / (n_per_seg - n_overlap)) # Generate frequencies for spectral measure estimate if func_params_dict.get('return_onesided'): freqs = np.fft.rfftfreq(n_per_seg, d=1/fs) elif func_params_dict.get('return_onesided') is None: # multitaper_psd uses rfft (i.e. no return_onesided parameter) freqs = np.fft.rfftfreq(n_per_seg, d=1/fs) else: freqs = np.fft.fftfreq(n_per_seg, d=1/fs) # Zero-pad signal to fit segment length remainder = length_signal % n_overlap_step data = np.pad(data, [(0, 0), (0, remainder)], mode='constant', constant_values=0) # Generate array for storing cross spectra estimates of segments seg_estimates = np.zeros((n_segments, data.shape[0], data.shape[0], len(freqs)), dtype=np.complex64) n_overlap_step = n_per_seg - n_overlap for i in range(n_segments): _, estimate = func( data[:, i * n_overlap_step:i * n_overlap_step + n_per_seg], **func_params_dict) # Workaround for mismatched dimensions if estimate.ndim != seg_estimates.ndim - 1: # Multitaper PSD seg_estimates[i] = estimate[:, np.newaxis, :] else: seg_estimates[i] = estimate avg_estimate = np.mean(seg_estimates, axis=0) return freqs, avg_estimate
[docs] def segmented_multitaper_cross_spectrum(signals, n_segments=1, len_segment=None, frequency_resolution=None, overlap=0.5, fs=1.0, nw=4.0, num_tapers=None, peak_resolution=None, return_onesided=True): """ Estimates the cross spectrum of a given `neo.AnalogSignal` using the Multitaper method on segments of the data. The segmented cross spectrum is obtained through the following steps: 1. Cut the given data into several overlapping segments. The degree of overlap can be specified by parameter `overlap` (default is 0.5, i.e. segments are overlapped by the half of their length). The number and the length of the segments are determined according to the parameters `n_segments`, `len_segment` or `frequency_resolution`. By default, the `data` is not cut into segments. 2. Calculate 'num_tapers' approximately independent estimates of the spectrum by multiplying the signal with the discrete prolate spheroidal functions (also known as Slepian function) and calculate the PSD of each tapered segment. 3. Average the approximately independent estimates of each segment to decrease overall variance of the estimates. 4. Average the obtained estimates for each segment. The estimation is done by applying `_segmented_apply_func` to :func:`multitaper_cross_spectrum`. See the latter function for further documentation. Parameters ---------- signals : neo.AnalogSignal or pq.Quantity or np.ndarray Time series data of which CSD is estimated. When `signal` is `np.ndarray` or `pq.Quantity`, it needs to be passed as a 2-dimensional array of shape (n_channels, n_samples), and the data sampling frequency should be given through the keyword argument `fs`. n_segments : int, optional Number of segments. The length of segments is adjusted so that overlapping segments cover the entire stretch of the given data. This parameter is ignored if `len_segment` or `frequency_resolution` is given. Default: 1 len_segment : int, optional Length of segments. This parameter is ignored if `frequency_resolution` is given. If None, it will be determined from other parameters. Default: None frequency_resolution : pq.Quantity or float, optional Desired frequency resolution of the obtained PSD estimate in terms of the interval between adjacent frequency bins. When given as a `float`, it is taken as frequency in Hz. If None, it will be determined from other parameters. Default: None overlap : float, optional Overlap between segments represented as a float number between 0 (no overlap) and 1 (complete overlap). Default: 0.5 (half-overlapped) fs : float, optional Specifies the sampling frequency of the input time series Default: 1.0 nw : float, optional Time-halfbandwidth product. This parameter can be used to determine the number of tapers following the equation: num_tapers = 2*nw - 1 It can be determined by multiplying the duration of the signal with the desired half-peak resolution frequency: n_samples/fs * peak_resolution/2. Default: 4.0 num_tapers : int, optional Number of tapers used in step 2 (see above) to obtain estimate of PSD. By default, [2*nw] - 1 is chosen. Default: None peak_resolution : pq.Quantity float, optional Quantity in Hz determining the number of tapers used for analysis. Fine peak resolution --> low numerical value --> low number of tapers High peak resolution --> high numerical value --> high number of tapers When given as a `float`, it is taken as frequency in Hz. Default: None return_onesided : bool, optional If True, return a one-sided spectrum for real data. If False return a two-sided spectrum. Default: True Returns ------- freqs : np.ndarray Frequencies associated with the cross spectrum estimate in `cross_spec`. If signals is an instance of `neo.AnalogSignal` or `Quantity`, the returned values will have units of Hz. cross_spec : np.ndarray Estimate of the cross spectrum of time series in `signals`. If signals is an instance of `neo.AnalogSignal` or `Quantity`, the returned values will have units attached to them (i.e. signals.units ** 2/Hz). See Also -------- :func:`multitaper_cross_spectrum` """ # When the input is AnalogSignal, fetch the underlying numpy array and swap # axes from (n_samples, n_channels) to (n_channels, n_samples) data = np.asarray(signals) if isinstance(signals, neo.AnalogSignal): data = np.moveaxis(data, 0, 1) fs = signals.sampling_rate # Initialize argument dictionary for multitaper_cross_spectrum called on # segments cross_spec_params_dict = { 'nw': nw, 'num_tapers': num_tapers, 'peak_resolution': peak_resolution, 'return_onesided': return_onesided, 'attach_units': False} # Apply segmentation freqs, cross_spec = _segmented_apply_func( data=data, fs=fs, n_segments=n_segments, len_segment=len_segment, overlap=overlap, frequency_resolution=frequency_resolution, func=multitaper_cross_spectrum, func_params_dict=cross_spec_params_dict) # Attach proper units to return values if isinstance(signals, pq.quantity.Quantity): cross_spec = cross_spec * signals.units * signals.units / pq.Hz freqs = freqs * pq.Hz return freqs, cross_spec
[docs] def multitaper_coherence(signal_i, signal_j, n_segments=1, len_segment=None, frequency_resolution=None, overlap=0.5, fs=1, nw=4, num_tapers=None, peak_resolution=None): """ Estimates the magnitude-squared coherence and phase-lag of two given `neo.AnalogSignal` using the Multitaper method. The function calls `segmented_multitaper_cross_spectrum` internally and thus both functions share parts of the respective signatures. .. math:: C(\omega)=\\frac{|S_{xy}(\omega)|^2}{S_{xx}(\omega)S_{yy}(\omega)} Parameters ---------- signal_i : neo.AnalogSignal or pq.Quantity or np.ndarray First time series data of the pair between which coherence is computed. signal_j : neo.AnalogSignal or pq.Quantity or np.ndarray Second time series data of the pair between which coherence is computed. The shapes and the sampling frequencies of `signal_i` and `signal_j` must be identical. When `signal_i` and `signal_j` are not `neo.AnalogSignal`, sampling frequency should be specified through the keyword argument `fs`. Otherwise, the default value is used (`fs` = 1.0). fs : float, optional Specifies the sampling frequency of the input time series Default: 1.0 nw : float, optional Time bandwidth product Default: 4.0 num_tapers : int, optional Number of tapers used in 1. to obtain estimate of PSD. By default, [2*nw] - 1 is chosen. Default: None peak_resolution : pq.Quantity float, optional Quantity in Hz determining the number of tapers used for analysis. Fine peak resolution --> low numerical value --> low number of tapers High peak resolution --> high numerical value --> high number of tapers When given as a `float`, it is taken as frequency in Hz. Default: None. n_segments : int, optional Number of segments. The length of segments is adjusted so that overlapping segments cover the entire stretch of the given data. This parameter is ignored if `len_segment` or `frequency_resolution` is given. Default: 1 len_segment : int, optional Length of segments. This parameter is ignored if `frequency_resolution` is given. If None, it will be determined from other parameters. Default: None frequency_resolution : pq.Quantity or float, optional Desired frequency resolution of the obtained coherence estimate in terms of the interval between adjacent frequency bins. When given as a `float`, it is taken as frequency in Hz. If None, it will be determined from other parameters. Default: None overlap : float, optional Overlap between segments represented as a float number between 0 (no overlap) and 1 (complete overlap). Default: 0.5 (half-overlapped) Returns ------- freqs : np.ndarray Frequencies associated with the magnitude-squared coherence estimate coherence : np.ndarray Magnitude-squared coherence estimate phase_lag : np.ndarray Phase lags associated with the magnitude-square coherence estimate """ if isinstance(signal_i, neo.core.AnalogSignal) and \ isinstance(signal_j, neo.core.AnalogSignal): signals = signal_i.merge(signal_j) elif isinstance(signal_i, np.ndarray) and isinstance(signal_j, np.ndarray): signals = np.vstack([signal_i, signal_j]) freqs, Pxy = segmented_multitaper_cross_spectrum( signals=signals, n_segments=n_segments, len_segment=len_segment, frequency_resolution=frequency_resolution, overlap=overlap, fs=fs, nw=nw, num_tapers=num_tapers, peak_resolution=peak_resolution) # Calculate magnitude-squared coherence. coherence = np.abs(Pxy[0, 1]) ** 2 / (Pxy[0, 0].real * Pxy[1, 1].real) phase_lag = np.angle(Pxy[0, 1]) return freqs, coherence, phase_lag
[docs] def welch_coherence(signal_i, signal_j, n_segments=8, len_segment=None, frequency_resolution=None, overlap=0.5, fs=1.0, window='hann', nfft=None, detrend='constant', scaling='density', axis=-1): r""" Estimates coherence between a given pair of analog signals. The estimation is performed with Welch's method: the given pair of data are cut into short segments, cross-spectra are calculated for each pair of segments, and the cross-spectra are averaged and normalized by respective auto-spectra. By default, the data are cut into 8 segments with 50% overlap between neighboring segments. These numbers can be changed through respective parameters. Parameters ---------- signal_i : neo.AnalogSignal or pq.Quantity or np.ndarray First time series data of the pair between which coherence is computed. signal_j : neo.AnalogSignal or pq.Quantity or np.ndarray Second time series data of the pair between which coherence is computed. The shapes and the sampling frequencies of `signal_i` and `signal_j` must be identical. When `signal_i` and `signal_j` are not `neo.AnalogSignal`, sampling frequency should be specified through the keyword argument `fs`. Otherwise, the default value is used (`fs` = 1.0). n_segments : int, optional Number of segments. The length of segments is adjusted so that overlapping segments cover the entire stretch of the given data. This parameter is ignored if `len_seg` or `frequency_resolution` is given. Default: 8 len_segment : int, optional Length of segments. This parameter is ignored if `frequency_resolution` is given. If None, it is determined from other parameters. Default: None frequency_resolution : pq.Quantity or float, optional Desired frequency resolution of the obtained coherence estimate in terms of the interval between adjacent frequency bins. When given as a `float`, it is taken as frequency in Hz. If None, it is determined from other parameters. Default: None overlap : float, optional Overlap between segments represented as a float number between 0 (no overlap) and 1 (complete overlap). Default: 0.5 (half-overlapped) fs : pq.Quantity or float, optional Specifies the sampling frequency of the input time series. When the input time series are given as `neo.AnalogSignal`, the sampling frequency is taken from their attribute and this parameter is ignored. Default: 1.0 window : str or tuple or np.ndarray, optional Desired window to use. See Notes [1]. Default: 'hann' nfft : int, optional Length of the FFT used. See Notes [1]. Default: None detrend : str or function or False, optional Specifies how to detrend each segment. See Notes [1]. Default: 'constant' scaling : {'density', 'spectrum'}, optional If 'density', computes the power spectral density where Pxx has units of V**2/Hz. If 'spectrum', computes the power spectrum where Pxx has units of V**2, if `signal` is measured in V and `fs` is measured in Hz. See Notes [1]. Default: 'density' axis : int, optional Axis along which the periodogram is computed. See Notes [1]. Default: last axis (-1) Returns ------- freqs : pq.Quantity or np.ndarray Frequencies associated with the estimates of coherency and phase lag. `freqs` is always a vector irrespective of the shape of the input data. If `signal_i` and `signal_j` are `neo.AnalogSignal` or `pq.Quantity`, a `pq.Quantity` array is returned. Otherwise, a `np.ndarray` containing frequency in Hz is returned. coherency : np.ndarray Estimate of coherency between the input time series. For each frequency, coherency takes a value between 0 and 1, with 0 or 1 representing no or perfect coherence, respectively. When the input arrays `signal_i` and `signal_j` are multidimensional, `coherency` is of the same shape as the inputs, and the frequency is indexed depending on the type of the input. If the input is `neo.AnalogSignal`, the first axis indexes frequency. Otherwise, frequency is indexed by the last axis. phase_lag : pq.Quantity or np.ndarray Estimate of phase lag in radian between the input time series. For each frequency, phase lag takes a value between :math:`-\pi` and :math:`\pi`, with positive values meaning phase precession of `signal_i` ahead of `signal_j`, and vice versa. If `signal_i` and `signal_j` are `neo.AnalogSignal` or `pq.Quantity`, a `pq.Quantity` array is returned. Otherwise, a `np.ndarray` containing phase lag in radian is returned. The axis for frequency index is determined in the same way as for `coherency`. Raises ------ ValueError Same as in :func:`welch_psd`. Notes ----- 1. The computation steps used in this function are implemented in `scipy.signal` module, and this function is a wrapper which provides a proper set of parameters to `scipy.signal.welch` function. 2. The parameters `window`, `nfft`, `detrend`, `return_onesided`, `scaling`, and `axis` are directly passed to the `scipy.signal.welch` function. See the respective descriptions in the docstring of `scipy.signal.welch` for usage. 3. When only `n_segments` is given, parameter `nperseg` of `scipy.signal.welch` function is determined according to the expression `signal.shape[axis] / (n_segments - overlap * (n_segments - 1))` converted to integer. See Also -------- welch_psd Examples -------- >>> import neo >>> import numpy as np >>> import quantities as pq >>> from elephant.spectral import welch_coherence >>> signal = neo.AnalogSignal(np.cos(np.linspace(0, 2 * np.pi, num=100)), ... sampling_rate=20 * pq.Hz, units='mV') Sampling frequency will be taken as `signal.sampling_rate`. >>> freq, coherency, phase_lag = welch_coherence(signal, signal) >>> freq array([ 0. , 0.90909091, 1.81818182, 2.72727273, 3.63636364, 4.54545455, 5.45454545, 6.36363636, 7.27272727, 8.18181818, 9.09090909, 10. ]) * Hz >>> coherency.flatten() array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]) >>> phase_lag.flatten() # doctest: +SKIP array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]) * rad """ # TODO: code duplication with welch_psd() # 'hanning' window was removed with release of scipy 1.9.0, it was # deprecated since 1.1.0. if window == 'hanning': warnings.warn("'hanning' is deprecated and was removed from scipy " "with release 1.9.0. Please use 'hann' instead", DeprecationWarning) window = 'hann' # initialize a parameter dict for scipy.signal.csd() params = {'window': window, 'nfft': nfft, 'detrend': detrend, 'scaling': scaling, 'axis': axis} # When the input is AnalogSignal, the axis for time index is rolled to # the last xdata = np.asarray(signal_i) ydata = np.asarray(signal_j) if isinstance(signal_i, neo.AnalogSignal): xdata = np.rollaxis(xdata, 0, len(xdata.shape)) ydata = np.rollaxis(ydata, 0, len(ydata.shape)) # if the data is given as AnalogSignal, use its attribute to specify # the sampling frequency if hasattr(signal_i, 'sampling_rate'): params['fs'] = signal_i.sampling_rate.rescale('Hz').magnitude else: params['fs'] = fs if overlap < 0: raise ValueError("overlap must be greater than or equal to 0") elif 1 <= overlap: raise ValueError("overlap must be less then 1") # determine the length of segments (i.e. *nperseg*) according to given # parameters if frequency_resolution is not None: if isinstance(frequency_resolution, pq.quantity.Quantity): dF = frequency_resolution.rescale('Hz').magnitude else: dF = frequency_resolution nperseg = int(params['fs'] / dF) if nperseg > xdata.shape[axis]: raise ValueError("frequency_resolution is too high for the given" "data size") elif len_segment is not None: if len_segment <= 0: raise ValueError("len_seg must be a positive number") if xdata.shape[axis] < len_segment: raise ValueError("len_seg must be shorter than the data length") nperseg = len_segment else: if n_segments <= 0: raise ValueError("n_segments must be a positive number") if xdata.shape[axis] < n_segments: raise ValueError("n_segments must be smaller than the data length") # when only *n_segments* is given, *nperseg* is determined by solving # the following equation: # n_segments * nperseg - (n_segments-1) * overlap * nperseg = # data.shape[-1] # ------------------- =============================== ^^^^^^^^^^^ # summed segment lengths total overlap data length nperseg = int(xdata.shape[axis] / (n_segments - overlap * ( n_segments - 1))) params['nperseg'] = nperseg params['noverlap'] = int(nperseg * overlap) freqs, Pxx = scipy.signal.welch(xdata, **params) _, Pyy = scipy.signal.welch(ydata, **params) _, Pxy = scipy.signal.csd(xdata, ydata, **params) coherency = np.abs(Pxy) ** 2 / (Pxx * Pyy) phase_lag = np.angle(Pxy) # attach proper units to return values if isinstance(signal_i, pq.quantity.Quantity): freqs = freqs * pq.Hz phase_lag = phase_lag * pq.rad # When the input is AnalogSignal, the axis for frequency index is # rolled to the first to comply with the Neo convention about time axis if isinstance(signal_i, neo.AnalogSignal): coherency = np.rollaxis(coherency, -1) phase_lag = np.rollaxis(phase_lag, -1) return freqs, coherency, phase_lag
def welch_cohere(*args, **kwargs): warnings.warn("'welch_cohere' is deprecated; use 'welch_coherence'", DeprecationWarning) return welch_coherence(*args, **kwargs)