Source code for piogrowth.fit_spline

import itertools
from collections import namedtuple

import numpy as np
import pandas as pd
from pandas.api.types import is_datetime64_any_dtype
from scipy.interpolate import make_splrep, splev

# Lower bound, central value and upper bound of a spline smoothing factor,
# as produced by ``get_smoothing_range``.
SmoothingRange = namedtuple("SmoothingRange", "s_min s s_max")


def get_smoothing_range(m: int):
    """Return the smoothing-factor range used for B-spline fitting with scipy.

    The range is centred on ``m`` and spans ``m - sqrt(2 * m)`` to
    ``m + sqrt(2 * m)``; both bounds are truncated to integers with ``int``.

    Parameters
    ----------
    m : int
        Central value of the range (typically the number of data points).

    Returns
    -------
    SmoothingRange
        Named tuple ``(s_min, s, s_max)`` where ``s`` is ``m`` itself.
    """
    half_width = np.sqrt(2 * m)
    lower_bound = int(m - half_width)
    upper_bound = int(m + half_width)
    return SmoothingRange(lower_bound, m, upper_bound)
[docs] def fit_spline_and_derivatives( s: pd.Series, smoothing_factor: float = 1000.0, ) -> tuple[pd.DataFrame, pd.DataFrame]: """Fit B-splines to each column in the DataFrame and compute specified derivatives. Values cannot be missing as NaNs, i.e. on rolling median of data. Parameters ---------- s: pd.Series Input Series with time series data smoothing_factor: float Smoothing factor for the spline fitting. Returns: dict[str, pd.DataFrame]: Dictionary containing the fitted spline and its derivatives. """ # drop NaN values s = s.dropna() if len(s) < 4: raise ValueError( "Not enough data points to fit a spline. Need at least 4 non-NaN values." ) if not is_datetime64_any_dtype(s.index.dtype): raise TypeError("Index of the input Series must be datetime type.") x = (s.index - s.index[0]).total_seconds().to_numpy() / 3_600 # convert to hours bspl = make_splrep( x, s, s=smoothing_factor, k=3, ) s_fitted = pd.Series( splev(x, bspl), index=s.index, ) # for order in derivative_ord_ers: der = bspl.derivative(nu=1) s_first_derivative = pd.Series( der(x), index=s.index, ) return s_fitted, s_first_derivative
def fit_spline_and_derivatives_one_batch(
    df: pd.DataFrame,
    smoothing_factor: float = 1000.0,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Fit a spline to every column of a NaN-free DataFrame.

    Each column is passed to ``fit_spline_and_derivatives`` on its own; the
    results are gathered into two frames that share the index of ``df``.
    Output column labels are the string form of the input column labels.

    Parameters
    ----------
    df : pd.DataFrame
        Time series data, one series per column, without missing values
        (e.g. the output of a rolling median).
    smoothing_factor : float
        Smoothing factor forwarded to the spline fit of every column.

    Returns
    -------
    tuple[pd.DataFrame, pd.DataFrame]
        The fitted spline values and their first derivatives.

    Raises
    ------
    AssertionError
        If ``df`` contains NaN values. The check is an ``assert`` statement,
        so it is skipped when Python runs with ``-O``.
    """
    assert not df.isna().any().any(), "Input DataFrame contains NaN values"

    fitted_frame = pd.DataFrame(index=df.index)
    derivative_frame = pd.DataFrame(index=df.index)
    for column in df.columns:
        fitted_column, derivative_column = fit_spline_and_derivatives(
            df[column], smoothing_factor
        )
        label = f"{column}"
        fitted_frame[label] = fitted_column
        derivative_frame[label] = derivative_column
    return fitted_frame, derivative_frame
[docs] def fit_splines_to_segments( s: pd.Series, peaks: pd.Series, smoothing_factor: float = 100.0 ) -> tuple[pd.DataFrame, pd.DataFrame]: """Fit splines to segments of the time series data between detected peaks. Parameters ---------- s : pd.Series _description_ peaks : pd.Series _description_ smoothing_factor : float, optional _description_, by default 100.0 Returns ------- tuple[pd.DataFrame, pd.DataFrame] _description_ """ peak_timepoints = [s.index.min(), *peaks.dropna().index, s.index.max()] res_fitted, res_derivative, res_max, res_idx_max = [], [], [], [] for start, end in itertools.pairwise(peak_timepoints): s_segment = s[start:end] if len(s_segment) < 4: continue s_segment_fitted, s_segment_derivative = fit_spline_and_derivatives( s_segment, smoothing_factor=smoothing_factor ) res_fitted.append(s_segment_fitted) res_derivative.append(s_segment_derivative) idx_max = s_segment_derivative.idxmax() res_max.append(s_segment.loc[idx_max]) res_idx_max.append(idx_max) res_fitted = pd.concat(res_fitted).sort_index() res_fitted = res_fitted.loc[~res_fitted.index.duplicated(keep="first")] res_derivative = pd.concat(res_derivative).sort_index() res_derivative = res_derivative.loc[~res_derivative.index.duplicated(keep="first")] res_max = pd.Series(res_max, index=res_idx_max).sort_index() return res_fitted, res_derivative, res_max
def fit_growth_data_w_peaks(
    df_wide: pd.DataFrame,
    peaks: pd.DataFrame,
    smoothing_factor: float = 100.0,
) -> tuple[pd.DataFrame, pd.DataFrame, dict[object, pd.Series]]:
    """Fit growth data with splines between detected peaks.

    Every column of ``df_wide`` is fitted independently: its NaN values are
    dropped, and the non-NaN entries of the same-named column in ``peaks``
    are passed to ``fit_splines_to_segments`` as segment boundaries.

    Parameters
    ----------
    df_wide : pd.DataFrame
        Wide-format time series data, one series per column.
    peaks : pd.DataFrame
        Detected peaks; must contain a column for every column of ``df_wide``.
    smoothing_factor : float, optional
        Smoothing factor forwarded to each segment fit, by default 100.0

    Returns
    -------
    tuple[pd.DataFrame, pd.DataFrame, dict]
        ``(fitted, first_derivative, maxima)``. The two frames share the index
        of ``df_wide``; timestamps without a fitted value are NaN. ``maxima``
        maps each column label to the third Series returned by
        ``fit_splines_to_segments`` for that column.
    """
    df_fitted = pd.DataFrame(index=df_wide.index)
    df_first_derivative = pd.DataFrame(index=df_wide.index)
    maxima_by_column = {}
    for col in df_wide.columns:
        s = df_wide[col].dropna()
        s_peaks = peaks[col].dropna()
        s_fitted, s_derivative, s_max = fit_splines_to_segments(
            s, s_peaks, smoothing_factor=smoothing_factor
        )
        # Column assignment aligns on the index of df_wide.
        df_fitted[col] = s_fitted
        df_first_derivative[col] = s_derivative
        maxima_by_column[col] = s_max
    return df_fitted, df_first_derivative, maxima_by_column