Source code for phys2cvr.signal

#!/usr/bin/env python3
"""
Signal analysis module for phys2cvr.

Attributes
----------
LGR :
    Logger
"""

import logging
from copy import deepcopy
from pathlib import Path, PosixPath

import numpy as np
import scipy.interpolate as spint
import scipy.stats as sct
from scipy.signal import butter, filtfilt

LGR = logging.getLogger(__name__)
LGR.setLevel(logging.INFO)


[docs] def spc(ts): """ Compute signal percentage change over time series (ts). Timeseries are divided by the mean. Timeseries that have a mean of 0 are divided by 1 instead. Parameters ---------- ts : numpy.ndarray A timeseries or set of timeseries; last dimension is assumed to be time. Returns ------- numpy.ndarray Signal percentage change version of the input ts. """ m = ts.mean(axis=-1)[..., np.newaxis] md = deepcopy(m) md[md == 0] = 1 ts = (ts - m) / md ts[np.isnan(ts)] = 0 return ts
[docs] def create_hrf(freq=40): """ Create a canonical haemodynamic response function sampled at the given freq. Parameters ---------- freq : float Sampling frequency used to resample the haemodynamic response function. Returns ------- hrf : np.ndarray Haemodynamic response function. """ # Create HRF RT = 1 / freq fMRI_T = 16 p = [6, 16, 1, 1, 6, 0, 32] # Modelled hemodynamic response function - {mixture of Gammas} dt = RT / fMRI_T u = np.arange(0, p[6] / dt + 1, 1) - p[5] / dt a1 = p[0] / p[2] b1 = 1 / p[2] a2 = p[1] / p[3] b2 = 1 / p[3] hrf = ( sct.gamma.pdf(u * dt, a1, scale=b1) - sct.gamma.pdf(u * dt, a2, scale=b2) / p[4] ) / dt time_axis = np.arange(0, int(p[6] / RT + 1), 1) * fMRI_T hrf = hrf[time_axis] min_hrf = 1e-9 * min(hrf[hrf > 10 * np.finfo(float).eps]) if min_hrf < 10 * np.finfo(float).eps: min_hrf = 10 * np.finfo(float).eps hrf[hrf == 0] = min_hrf hrf = hrf / max(hrf) return hrf
[docs] def filter_signal(data, tr, lowcut=0.02, highcut=0.04, order=9, axis=-1): """ Create a bandpass filter within lower and upper threshold, then filter. Parameters ---------- data : np.ndarray Data to filter. tr : float TR of functional files. lowcut : float Low frequency threshold. highcut : float High frequency threshold. order : int Butterworth filter order. axis : int The axis along which the filter is applied. Returns ------- filt_data : np.ndarray Bandpass-filtered data. """ nyq = (1 / tr) / 2 low = lowcut / nyq high = highcut / nyq a, b = butter(int(order), [low, high], btype='band') return filtfilt(a, b, data, axis=axis)
def endtidal_interpolation(signal, peaks, axis=-1): """ Compute end-tidal interpolation of signal. Parameters ---------- signal : np.ndarray-like or list The signal that needs to be interpolated peaks : 1d array-like or list The index of the peaks (or point of interest) to interpolate at. axis : int, optional The axis of signal to interpolate through. Default is last axis. Returns ------- IntETSignal The end tidal interpolation of the signal """ peaks = np.sort(np.unique(peaks)) nx = np.arange(signal.size) f = spint.interp1d(peaks, signal[peaks], fill_value='extrapolate', axis=axis) return f(nx) def convolve_signal(signal, freq, response_function='hrf', mode='full'): """ Convolve provided signal with provided response function. Parameters ---------- signal : 1D np.ndarray or list The signal to convolve freq : int or float The sampling frequency of `signal` response_function : {`hrf`, `rrf`, `crf`, `icrf`}, None, str, path, or 1D array-like , optional Name of the response function to be used in the convolution of the regressor of interest or path to a 1D file containing one. Default is `hrf`. For `rrf` and `crf`, `phys2denoise` must be installed (see extra installs). - `None` (or a string version of none) will skip the convolution - `hrf` will use an internally generated canonical HRF - `rrf` will use `phys2denoise`'s RRF - `crf` will use `phys2denoise`'s CRF - `icrf` will use `phys2denoise`'s iCRF - Alternatively, specify a valid path file containing a custom one. - Alternatively, specify a custom one directly as np.ndarray or list. mode : {'full', 'valid', 'same'} str, optional Convolution mode, see numpy.convolve. Returns ------- colvolved_signal : 1D np.ndarray The convolved version of `signal`. Raises ------ ImportError If phys2denoise is not installed and RRF, CRF, or iCRF are called. NotImplementedError If signal has more than 1 dimension. If response function is not of a supported type or was not found in system. OSError If response function is a path but it does not exist. ValueError If the response function is a list or ndarray but is not numeric. """ signal = signal.squeeze() if signal.ndim > 2: raise NotImplementedError( 'Convolution on data that has more than 1 dimension is not supported yet.' ) # Check if RF is a string representing a path or a Nonetype if type(response_function) is str: if Path(response_function).exists(): response_function = Path(response_function) LGR.debug(f'{response_function} is a valid file') else: response_function = response_function.lower() response_function = ( None if response_function == 'none' else response_function ) if type(response_function) in [list, np.ndarray]: # Check if RF is or should be a 1darray-like convolving_function = ( np.array(response_function) if type(response_function) is list else response_function ) if not np.issubdtype(convolving_function.dtype, np.number): raise ValueError( 'Provided function is not a numeric ndarray-like variable.' ) elif response_function is None: LGR.info('Skipping convolution with response function') return signal elif response_function == 'hrf': convolving_function = create_hrf(freq) elif response_function == 'rrf': try: from phys2denoise.metrics.responses import rrf except ImportError: raise ImportError( 'phys2denoise is required for the use of RRF response functions. ' 'Please see install instructions.' ) convolving_function = rrf(1 / freq) elif response_function == 'crf': try: from phys2denoise.metrics.responses import crf except ImportError: raise ImportError( 'phys2denoise is required for the use of CRF response functions. ' 'Please see install instructions.' ) convolving_function = crf(1 / freq) elif response_function == 'icrf': try: from phys2denoise.metrics.responses import icrf except ImportError: raise ImportError( 'phys2denoise is required for the use of iCRF response functions. ' 'Please see install instructions.' ) convolving_function = icrf(1 / freq) elif type(response_function) is PosixPath: if not Path(response_function).exists(): raise OSError(f'{response_function} not found in system.') # Local import to avoid circularity from .io import load_array # noqa: ABS101 convolving_function = load_array(response_function) else: raise NotImplementedError( f'Response function "{response_function}" is not supported yet or file was ' 'not found.' ) convolved_signal = np.convolve(signal, convolving_function, mode=mode) convolved_signal = np.interp( convolved_signal, (convolved_signal.min(), convolved_signal.max()), (signal.min(), signal.max()), ) return convolved_signal
[docs] def resample_signal_samples(ts, samples, axis=-1): """ Resample timeseries based on desired number of samples. Parameters ---------- ts : np.ndarray The timeseries to be resampled samples : int Desired number of samples. axis : int The axis (dimension) over which the interpolation should be applied - by default it's -1, i.e., the last dimension. Returns ------- numpy.ndarray Resampled timeseries. """ len_tp = ts.shape[axis] regr_t = np.linspace(0, len_tp - 1, samples) time_t = np.linspace(0, len_tp - 1, len_tp) f = spint.interp1d(time_t, ts, fill_value='extrapolate', axis=axis) return f(regr_t)
[docs] def resample_signal_freqs(ts, freq1, freq2, axis=-1): """ Resample timeseries based on current and desired frequency. Parameters ---------- ts : np.ndarray The timeseries to be resampled. freq1 : float Current frequency. freq2 : float Desired frequency. axis : int The axis (dimension) over which the interpolation should happen - by default it's -1, i.e. the last dimension. Returns ------- numpy.ndarray Resampled timeseries. """ len_tp = ts.shape[axis] len_s = (len_tp - 1) / freq1 regr_t = np.linspace(0, len_s, int(len_s * freq2) + 1) time_t = np.linspace(0, len_s, len_tp) f = spint.interp1d(time_t, ts, fill_value='extrapolate', axis=axis) return f(regr_t)
""" Copyright 2021-2026, Stefano Moia & phys2cvr contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """