#!/usr/bin/env python3
"""
Module that creates the regressors used in phys2cvr.
Attributes
----------
LGR
Logger
"""
import logging
import os
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view as swv
from phys2cvr.io import export_regressor
from phys2cvr.signal import (
convolve_signal,
endtidal_interpolation,
resample_signal_freqs,
)
from phys2cvr.stats import x_corr
from phys2cvr.viz import plot_two_timeseries, plot_xcorr
LGR = logging.getLogger(__name__)
LGR.setLevel(logging.INFO)
[docs]
def create_legendre(degree, length):
"""
Produce the Legendre polynomials of order `degree`.
Parameters
----------
degree : int
Highest number of desired orders.
length : int
Length of the desired polynomials (number of samples).
Returns
-------
legendre : np.ndarray
A `degree`*`length` array which includes all the polynomials up to order `degree`.
"""
def _bonnet(d, x):
"""Use Bonnet method to create Leg polys."""
if d == 0:
return np.ones_like(x)
if d == 1:
return x
return ((2 * d - 1) * x * _bonnet(d - 1, x) - (d - 1) * _bonnet(d - 2, x)) / d
x = np.linspace(-1, 1, length)
legendre = np.empty((length, degree + 1), dtype='float32')
for n in range(degree + 1):
legendre[:, n] = _bonnet(n, x)
return legendre
def compute_petco2hrf(
co2, pidx, freq, outprefix, comp_endtidal=True, response_function='hfr', mode='full'
):
"""
Create the PetCO2 trace from CO2 trace, then convolve it to obtain the PetCO2hrf.
Parameters
----------
co2 : np.ndarray
CO2 (or physiological) regressor
pidx : np.ndarray
Indices of peaks
freq : str, int, or float
Sample frequency of the CO2 regressor
outprefix : str
Prefix of the output file (i.e., the regressor of interest).
comp_endtidal : bool
If True, interpolate input regressor
response_function : {`hrf`, `rrf`, `crf`}, None, str, path, or 1D array-like , optional
Name of the response function to be used in the convolution of the regressor of
interest or path to a 1D file containing one. Default is `hrf`.
See `.signal.convolve_signal` for more information.
mode : {'full', 'valid', 'same'} str, optional
Convolution mode, see numpy.convolve.
Returns
-------
petco2hrf : np.ndarray
Convolved PetCO2 trace.
Raises
------
NotImplementedError
If the provided CO2 is not a 1D array.
"""
co2 = co2.squeeze()
if co2.ndim > 1:
raise NotImplementedError(
'Arrays with more than 2 dimensions are not supported.'
)
if comp_endtidal and pidx is not None:
petco2 = endtidal_interpolation(co2, pidx, axis=-1)
# Plot PetCO2 vs CO2
plot_two_timeseries(
petco2, co2, f'{outprefix}_co2_vs_petco2.png', 'PetCO2', 'CO2', freq
)
# Demean and export
petco2 = petco2 - petco2.mean()
np.savetxt(f'{outprefix}_petco2.1D', petco2, fmt='%.18f')
elif comp_endtidal and pidx is None:
raise ValueError(
'End-tidal interpolation was requested, but peaks were not provided.'
)
else:
LGR.info(
'Skipping End Tidal interpolation of PetCO2 trace (if you provided raw CO2 '
'data, then you probably should not be doing this)'
)
petco2 = co2 - co2.mean()
# Plot convolved PetCO2 vs PetCO2
if response_function not in [None, 'none', 'None', 'NONE']:
petco2hrf = convolve_signal(petco2, freq, response_function, mode)
plot_two_timeseries(
petco2hrf,
petco2,
f'{outprefix}_petco2_vs_petco2hrf.png',
'Convolved PetCO2',
'PetCO2',
freq,
)
else:
LGR.info('Skipping convolution of PetCO2 trace')
petco2hrf = petco2
return petco2hrf
[docs]
def compute_bulk_shift(
func_upsampled,
petco2hrf,
freq,
outprefix,
trial_len=None,
n_trials=None,
abs_xcorr=False,
):
"""
Compute (initial) bulk shift of regressor.
Parameters
----------
func_upsampled : np.ndarray
Functional timeseries average upsampled at the frequency of the regressor of interest.
petco2hrf : np.ndarray
Regressor of interest
freq : str, int, or float
Sample frequency of petco2hrf
outprefix : list or path
Path to output directory for regressors.
trial_len : str or int, optional
Length of each single trial for tasks that have more than one
(E.g. BreathHold, CO2 challenges, ...)
Used to improve cross correlation estimation.
Default: None
n_trials : str or int, optional
Number of trials in the task.
Default: None
abs_xcorr : bool, optional
If True, the cross correlation will consider the maximum absolute
correlation, i.e. if a negative correlation is higher than the highest
positive, the negative correlation will be chosen instead.
Returns
-------
optshift : int
The index of optimal shifting computed via Xcorr
"""
first_tp, n_shifts = 0, None
if trial_len and n_trials:
# If both are specified, disregard two extreme _trial from matching.
LGR.info(f'Specified {n_trials} trials lasting {trial_len} seconds')
if n_trials > 3:
LGR.info('Ignoring first trial to improve bulk shift estimation')
first_tp = int(trial_len * freq)
else:
LGR.info('Using all trials for bulk shift estimation')
if n_trials > 4:
LGR.info('Ignoring last trial to improve bulk shift estimation')
n_shifts = first_tp * (n_trials - 2)
elif trial_len and not n_trials:
LGR.warning(
'The length of trial was specified, but the number of '
'trials was not. Using all available trials for bulk shift estimation'
)
elif not trial_len and n_trials:
LGR.warning(
'The number of trials was specified, but the length of '
'trial was not. Using all available trials for bulk shift estimation'
)
else:
LGR.info('Using all trials for bulk shift estimation.')
# Preparing breathhold and CO2 trace for Xcorr
func_cut = func_upsampled[first_tp:]
_, optshift, xcorr = x_corr(
func_cut, petco2hrf, n_shifts=n_shifts, offset=first_tp, abs_xcorr=abs_xcorr
)
LGR.info(f'Cross correlation estimated a bulk shift of {optshift / freq} seconds')
# Export estimated optimal shift in seconds
with open(f'{outprefix}_optshift.1D', 'w') as f:
print(f'{(optshift / freq):.4f}', file=f)
# Export xcorr figure
plot_xcorr(xcorr, outprefix, freq)
# This shouldn't happen, but still check
if optshift + func_upsampled.shape[0] > len(petco2hrf):
raise Exception(
f'The identified optimal shift {optshift / freq} removes too many samples to '
'continue.'
)
return optshift
[docs]
def create_fine_shift_regressors(
petco2hrf,
optshift,
lag_max,
freq,
func_size,
func_upsamp_size,
outprefix,
ext='.1D',
legacy=False,
):
"""
Compute fine shifts to further optimize shifts.
Parameters
----------
petco2hrf : np.ndarray
Regressor of interest
optshift : int
The index shift computed by the Xcorr/bulk shift
lag_max : int or float, optional
Limits (both positive and negative) of the temporal area to explore,
expressed in seconds.
freq : str, int, or float
Sample frequency of petco2hrf
func_size : int
Total timepoints of functional timeseries
func_upsamp_size : int
Total timepoints of functional timeseries, resampled at `freq` frequency
outprefix : list or path
Path to output directory for regressors.
ext : str, optional
Extension to be used for the exported regressors.
legacy : bool, optional
If True, exclude the upper lag limit from the regression estimation.
If True, the maximum number of regressors will be `(freq*lag_max*2)`
Returns
-------
petco2hrf_lagged : np.ndarray
The shifted versions of the regresosr of interest.
"""
outdir, base = os.path.split(outprefix)
regr_dir = os.path.join(outdir, 'regr')
os.makedirs(regr_dir, exist_ok=True)
outprefix = os.path.join(regr_dir, base)
neg_shifts = int(lag_max * freq)
pos_shifts = neg_shifts if legacy else neg_shifts + 1
# Padding regressor right for shifts if not enough timepoints
# Padding regressor left for shifts and update optshift if less than neg_shifts.
rpad = max(0, func_upsamp_size + optshift + pos_shifts - petco2hrf.shape[0])
lpad = max(0, neg_shifts - optshift)
petco2hrf = np.pad(petco2hrf, (int(lpad), int(rpad)), 'mean')
# Create sliding window view into petco2hrf, -1 because of reversed indexing
neg_idx = optshift - neg_shifts + lpad - 1
pos_idx = optshift + pos_shifts + lpad - 1
# select the right windows the other way round
petco2hrf_lagged = swv(petco2hrf, func_upsamp_size)[pos_idx:neg_idx:-1].copy()
petco2hrf_lagged = export_regressor(
petco2hrf_lagged, func_size, outprefix, 'shifts', ext
)
return petco2hrf_lagged
[docs]
def create_physio_regressor(
func_avg,
petco2hrf,
tr,
freq,
outprefix,
lag_max=None,
trial_len=None,
n_trials=None,
ext='.1D',
lagged_regression=True,
legacy=False,
abs_xcorr=False,
skip_xcorr=False,
):
"""
Create regressor(s) of interest for nifti GLM.
Parameters
----------
func_avg : np.ndarray
Average functional timeseries (1D)
petco2hrf : np.ndarray
Regressor of interest (e.g., CO2 regressor)
tr : str, int, or float
Repetition time (TR) of timeseries
freq : str, int, or float
Sample frequency of petco2hrf
outprefix : list or path
Path to output directory for computed regressors.
lag_max : int or float, optional
Limits (both positive and negative) for the estimated temporal lag,
expressed in seconds.
Default: 9 (i.e., -9 to +9 seconds)
trial_len : str or int, optional
Length of each individual trial for timeseries which include more than one trial
(e.g., multiple BreathHold trials, trials within CO2 challenges, ...)
Used to improve cross correlation estimation.
Default: None
n_trials : str or int, optional
Number of trials within the timeseries.
Default: None
ext : str, optional
Extension to be used for the exported regressors (e.g., .txt, .csv)
lagged_regression : bool, optional
Estimate regressors for each possible lag of `petco2hrf`.
legacy : bool, optional
If True, exclude the upper (positive) lag limit from the regression estimation,
i.e., the maximum number of regressors will be `(freq*lag_max*2)`
If False, the maximum number of regressors will be `(freq*lag_max*2)+1`
abs_xcorr : bool, optional
If True, the cross correlation will consider the maximum absolute
correlation, i.e., if a negative correlation is stronger than the strongest
positive, the negative correlation will be used.
skip_xcorr : bool, optional
If True, skip the cross correlation step.
Returns
-------
petco2hrf_demean : np.ndarray
The demeaned petco2hrf regressor, central in time (not shifted).
petco2hrf_lagged : np.ndarray
The other shifted versions of the regressor.
"""
# Upsample functional signal
func_upsampled = resample_signal_freqs(func_avg, 1 / tr, freq)
if skip_xcorr:
LGR.info('Skipping Bulk Shift Computation')
optshift = 0
else:
optshift = compute_bulk_shift(
func_upsampled, petco2hrf, freq, outprefix, trial_len, n_trials, abs_xcorr
)
petco2hrf_shift = petco2hrf[optshift : optshift + func_upsampled.shape[0]]
# Plot (shifted) regressor vs average ROI signal.
plot_two_timeseries(
petco2hrf_shift,
func_upsampled,
f'{outprefix}_petco2hrf_vs_avgroi.png',
'Optimally shifted regressor',
'Average ROI signal',
freq,
zscore=True,
)
petco2hrf_demean = export_regressor(
petco2hrf_shift, func_avg.shape[-1], outprefix, 'petco2hrf_simple', ext
)
# Initialise the shifts first.
petco2hrf_lagged = None
if lagged_regression and lag_max:
petco2hrf_lagged = create_fine_shift_regressors(
petco2hrf,
optshift,
lag_max,
freq,
func_avg.shape[-1],
func_upsampled.shape[-1],
outprefix,
ext,
legacy,
)
elif lagged_regression and not lag_max:
LGR.warning(
'Lagged regressors requested but maximum lag not provided. Skipping.'
)
else:
LGR.info('Skipping generation of lagged regressors.')
return petco2hrf_demean, petco2hrf_lagged
"""
Copyright 2021-2026, Stefano Moia & phys2cvr contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""