Source code for pyTENAX.tenax

import pandas as pd
import numpy as np
from scipy.special import gamma
from scipy.stats import weibull_min, norm, skewnorm, chi2
from scipy.optimize import root_scalar, minimize
import time
from scipy.optimize import curve_fit
from typing import Union, Tuple, Dict
from joblib import Parallel, delayed
from pyTENAX.smev import (
    _smev_inner_loop_numba_seq,
    _smev_inner_loop_numba,
    _NUMBA_AVAILABLE,
)


[docs] class TENAX: def __init__( self, return_period: list[Union[int, float]], durations: list[int], time_resolution: int, beta: Union[float, int] = 4, temp_time_hour: int = 24, alpha=0.05, n_monte_carlo=int(2e4), tolerance=0.1, min_event_duration=30, storm_separation_time=24, left_censoring: list = [0, 1], niter_smev=100, # why is this here? niter_tenax=100, temp_res_monte_carlo=0.001, temp_delta=10, init_param_guess=[0.7, 0, 2, 0], min_rain: Union[float, int] = 0, ) -> None: """Initialize the TENAX model with the specified parameters. The TEmperaturedependent Non-Asymptotic statistical model for eXtreme return levels (TENAX), is based on a parsimonious nonstationary and non-asymptotic theoretical framework that incorporates temperature as a covariate in a physically consistent manner. Parameters ---------- return_period : list[Union[int, float]] Return periods [years]. durations : list[int] Durations of interest [min]. time_resolution : int Temporal resolution of the precipitation data [min]. beta : Union[float, int], optional Shape parameter of the Generalized Normal for g(T). Defaults to 4. temp_time_hour : int, optional Time window to compute T [h]. Will be converted to negative if needed. Defaults to 24. alpha : float, optional Unitless significance level for the dependence of the shape on T. Defaults to 0.05. 0 → dependence always allowed; 1 → never allowed; (0, 1) → depends on statistical significance at alpha-level. n_monte_carlo : int, optional Number of elements in the MC samples. Defaults to int(2e4). tolerance : float, optional Maximum allowed fraction of missing data in one year. If exceeded, year will be disregarded. Defaults to 0.1. min_event_duration : int, optional Minimum event duration [min]. Defaults to 30. storm_separation_time : int, optional Separation time between independent storms [hours]. Defaults to 24. left_censoring : list, optional 2-element list with lower and upper probability limits for parameter estimation. Defaults to [0, 1]. niter_smev : int, optional Number of bootstrap iterations for SMEV uncertainty. Defaults to 100. niter_tenax : int, optional Number of bootstrap iterations for TENAX uncertainty. Defaults to 100. temp_res_monte_carlo : float, optional Resolution in T for the MC samples. Defaults to 0.001. temp_delta : int, optional Range in T of MC samples — explores temperatures up to this many degrees above and below observed values. Defaults to 10. init_param_guess : list, optional Initial values of Weibull parameters for optimisation. Defaults to [0.7, 0, 2, 0]. min_rain : Union[float, int], optional Minimum rainfall value. Defaults to 0. """ self.return_period = return_period self.durations = durations self.time_resolution = time_resolution self.beta = beta self.temp_time_hour = -abs(temp_time_hour) self.alpha = alpha self.n_monte_carlo = n_monte_carlo self.tolerance = tolerance self.min_event_duration = min_event_duration self.storm_separation_time = storm_separation_time self.left_censoring = left_censoring self.niter_smev = niter_smev self.niter_tenax = niter_tenax self.temp_res_monte_carlo = temp_res_monte_carlo self.temp_delta = temp_delta self.init_param_guess = init_param_guess self.min_rain = min_rain self.__incomplete_years_removed__ = False
[docs] def remove_incomplete_years( self, data_pr: pd.DataFrame, name_col="value", nan_to_zero=True, ) -> pd.DataFrame: """Delete incomplete years in precipitation data. An incomplete year is defined as a year where observations are missing above a given threshold. Parameters ---------- data_pr : pd.DataFrame Dataframe containing (hourly) precipitation values. name_col : str, optional Column name in `data_pr` with precipitation values. Defaults to "value". nan_to_zero : bool, optional Set `nan` to zero. Defaults to True. Returns ------- pd.DataFrame Dataframe containing (hourly) precipitation values with incomplete years removed. """ # Step 1: get resolution of dataset (MUST BE SAME in whole dataset!!!) time_res = ( (data_pr.index[-1] - data_pr.index[-2]).total_seconds() / 60 ) # Validate: if user provided time_resolution, it must match the data if ( self.time_resolution is not None and self.time_resolution != time_res ): raise ValueError( "time_resolution provided " f"({self.time_resolution} min) does not match " f"the resolution detected from data ({time_res} min)." ) # Step 2: Resample by year and count total and NaN values yearly_valid = data_pr.resample("YE").apply( lambda x: x.notna().sum() ) # Count not NaNs per year # Step 3: Estimate expected length of yearly timeseries expected = pd.DataFrame(index=yearly_valid.index) # 1440 = number of minutes in a day expected["Total"] = 1440 / time_res * 365 # Step 4: Calculate percentage of missing data per year valid_percentage = yearly_valid[name_col] / expected["Total"] # Step 5: Filter out years where more than tolerance% of values are NaN years_to_remove = valid_percentage[ valid_percentage < (1 - self.tolerance) ].index # Step 6: Remove data for those years from the original DataFrame data_cleanded = data_pr[ ~data_pr.index.year.isin(years_to_remove.year) ] # Replace NaN values with 0 in the specific column if nan_to_zero: data_cleanded.loc[:, name_col] = ( data_cleanded[name_col].fillna(0) ) self.time_resolution = time_res self.__incomplete_years_removed__ = True return data_cleanded
[docs] def get_ordinary_events( self, data: Union[pd.DataFrame, np.ndarray], dates: np.ndarray, name_col: str = "value", check_gaps=True, ) -> list: """Extract ordinary precipitation events from a time series. Groups timesteps at or above ``self.min_rain`` into independent storm events separated by at least ``self.storm_separation_time`` hours. Optionally removes events too close to dataset boundaries or data gaps. Parameters ---------- data : Union[pd.DataFrame, np.ndarray] Precipitation values. dates : np.ndarray Timestamps of the precipitation data. name_col : str, optional Column name to use when ``data`` is a DataFrame. Defaults to "value". check_gaps : bool, optional Remove events that fall within ``storm_separation_time`` of the dataset boundaries or internal data gaps. Defaults to True. Returns ------- list List of np.ndarray, each containing the timestamps of one ordinary event (values >= ``self.min_rain`` separated by more than ``self.storm_separation_time`` hours). """ if not self.__incomplete_years_removed__: raise ValueError( "You must run 'remove_incomplete_years' " "before running this function. " "If you are sure your data is complete, set " "self.__incomplete_years_removed__ = True " "to bypass this check." ) if isinstance(data, pd.DataFrame): data = np.array(data[name_col]) dates = dates.astype("datetime64[ns]") if self.min_rain == 0: above_threshold_indices = np.where(data > self.min_rain)[0] else: above_threshold_indices = np.where(data >= self.min_rain)[0] if len(above_threshold_indices) == 0: return [] # Get dates at above-threshold positions above_dates = dates[above_threshold_indices] # Compute time differences between consecutive above-threshold # timesteps (in nanoseconds) time_diffs_above = np.diff(above_dates).astype(np.int64) # Find where gaps exceed separation time (hours to nanoseconds) separation_ns = int(self.storm_separation_time * 3.6e12) gap_mask = time_diffs_above > separation_ns # Split indices at gap locations split_points = np.where(gap_mask)[0] + 1 # Split into groups of indices, then map back to dates index_groups = np.split(above_threshold_indices, split_points) # Convert to list of date arrays (same format as original) consecutive_values = [dates[group] for group in index_groups] if check_gaps: # Remove event too close to the start of the dataset if (consecutive_values[0][0] - dates[0]).item() < ( self.storm_separation_time * 3.6e12 ): # numpy dt is in nanoseconds consecutive_values.pop(0) else: pass # Remove event too close to the end of the dataset if (dates[-1] - consecutive_values[-1][-1]).item() < ( self.storm_separation_time * 3.6e12 ): # numpy dt is in nanoseconds consecutive_values.pop() else: pass # Locate OE that ends before gaps in data starts. # Calculate the differences between consecutive elements time_diffs = np.diff(dates) # difference of first element is time resolution time_res = time_diffs[0] # Identify gaps larger than separation time sep_td = np.timedelta64( int(self.storm_separation_time * 3.6e12), "ns" ) gap_indices_end = np.where(time_diffs > sep_td)[0] # Extend by one index to also check OE near gap start gap_indices_start = gap_indices_end + 1 match_info = [] for gap_idx in gap_indices_end: end_date = dates[gap_idx] start_date = end_date - np.timedelta64( int(self.storm_separation_time * 3.6e12), "ns" ) temp_date_array = np.arange(start_date, end_date, time_res) for i, sub_array in enumerate(consecutive_values): match_indices = np.where( np.isin(sub_array, temp_date_array) )[0] if match_indices.size > 0: match_info.append(i) for gap_idx in gap_indices_start: start_date = dates[gap_idx] end_date = start_date + np.timedelta64( int(self.storm_separation_time * 3.6e12), "ns" ) temp_date_array = np.arange(start_date, end_date, time_res) for i, sub_array in enumerate(consecutive_values): match_indices = np.where( np.isin(sub_array, temp_date_array) )[0] if match_indices.size > 0: match_info.append(i) for del_index in sorted(match_info, reverse=True): del consecutive_values[del_index] return consecutive_values
[docs] def remove_short( self, list_ordinary: list, ) -> Tuple[np.ndarray, np.ndarray, pd.DataFrame]: """Remove ordinary events that are too short. Parameters ---------- list_ordinary : list List of ordinary events as returned by `get_ordinary_events()`. Each event may contain pd.Timestamp or np.datetime64 values. Returns ------- arr_vals : np.ndarray Boolean array (all True) of length equal to the number of kept events, one entry per event that passed the duration filter. arr_dates : np.ndarray Array of (end, start) date tuples for each kept event. n_ordinary_per_year : pd.DataFrame DataFrame with the count of ordinary events per year. """ if not self.__incomplete_years_removed__: raise ValueError( "You must run 'remove_incomplete_years' " "before running this function. " "If you are sure your data is complete, set " "self.__incomplete_years_removed__ = True " "to bypass this check." ) # Convert pd.Timestamp events to np.datetime64 if needed if isinstance(list_ordinary[0][0], pd.Timestamp): list_ordinary = [ np.array([t.to_datetime64() for t in ev]) for ev in list_ordinary ] min_duration = np.timedelta64(int(self.min_event_duration), "m") time_res = np.timedelta64(int(self.time_resolution), "m") ll_short = [ (ev[-1] - ev[0]).astype("timedelta64[m]") + time_res >= min_duration for ev in list_ordinary ] ll_dates = [ (ev[-1], ev[0]) if keep else (np.nan, np.nan) for ev, keep in zip(list_ordinary, ll_short) ] arr_vals = np.array(ll_short)[ll_short] arr_dates = np.array(ll_dates)[ll_short] filtered_list = [ ev for ev, keep in zip(list_ordinary, ll_short) if keep ] list_year = pd.DataFrame( [ev[0].astype("datetime64[Y]").item().year for ev in filtered_list], columns=["year"], ) n_ordinary_per_year = list_year.reset_index().groupby(["year"]).count() return arr_vals, arr_dates, n_ordinary_per_year
[docs] def get_ordinary_events_values( self, data: np.ndarray, dates: np.ndarray, arr_dates_oe: np.ndarray, method: str = "vectorized", ) -> Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]: """Extract ordinary events and annual maxima from precipitation data. Parameters ---------- data : np.ndarray Full precipitation time series. dates : np.ndarray Timestamps of the full precipitation dataset. arr_dates_oe : np.ndarray End and start times of ordinary events as returned by `remove_short`. method : str, optional Backend used for the sliding-window maximum search. Defaults to ``"vectorized"``. One of: - ``"vectorized"`` — pure numpy, ``np.convolve`` per event. - ``"njit"`` — numba JIT-compiled loop, single-threaded. - ``"njit_parallel"`` — numba JIT-compiled loop, parallelised over events. Requires ``numba`` to be installed. Returns ------- dict_ordinary : dict Key is duration (str), value is a ``pd.DataFrame`` with columns ``year``, ``oe_time``, ``ordinary`` (event depth/intensity). dict_AMS : dict Key is duration (str), value is a ``pd.DataFrame`` with columns ``year`` and ``AMS`` (annual maximum value). """ if method in ("njit", "njit_parallel") and not _NUMBA_AVAILABLE: raise ImportError( "numba is required for method='njit'/'njit_parallel'. " "Install with: pip install numba" ) dict_ordinary = {} dict_AMS = {} time_index = dates.reshape(-1) n_events = arr_dates_oe.shape[0] oe_end = arr_dates_oe[:, 0].astype("datetime64[ns]") oe_start = arr_dates_oe[:, 1].astype("datetime64[ns]") start_indices = np.searchsorted(time_index, oe_start).astype(np.int64) end_indices = np.searchsorted(time_index, oe_end).astype(np.int64) ll_yrs = np.array([ oe_end[i].astype("datetime64[Y]").item().year for i in range(n_events) ], dtype=np.int64) unique_years = np.unique(ll_yrs) year_masks = {yr: ll_yrs == yr for yr in unique_years} data_int = np.round(data * 10000).astype(np.int64) for d in range(len(self.durations)): window_size = int(self.durations[d] / self.time_resolution) if method == "vectorized": ones_kernel = np.ones(window_size, dtype=np.int64) max_vals = np.empty(n_events, dtype=np.float64) max_global_idx = np.empty(n_events, dtype=np.int64) for i in range(n_events): si = start_indices[i] ei = end_indices[i] if si == ei: max_vals[i] = data_int[si] / 10000.0 max_global_idx[i] = si else: arr_conv = np.convolve( data_int[si:ei + 1], ones_kernel, "same" ) ll_idx = np.nanargmax(arr_conv) max_vals[i] = arr_conv[ll_idx] / 10000.0 max_global_idx[i] = si + ll_idx elif method == "njit": max_vals_int, max_global_idx = _smev_inner_loop_numba_seq( data_int, start_indices, end_indices, window_size, n_events ) max_vals = max_vals_int / 10000.0 else: # njit_parallel max_vals_int, max_global_idx = _smev_inner_loop_numba( data_int, start_indices, end_indices, window_size, n_events ) max_vals = max_vals_int / 10000.0 ll_dates_arr = time_index[max_global_idx] ams_vals = np.array([ np.max(max_vals[mask]) for _, mask in year_masks.items() ]) df_ams = pd.DataFrame({"year": unique_years, "AMS": ams_vals}) df_oe = pd.DataFrame({ "year": ll_yrs, "oe_time": ll_dates_arr, "ordinary": max_vals, }) dict_AMS[f"{self.durations[d]}"] = df_ams dict_ordinary[f"{self.durations[d]}"] = df_oe return dict_ordinary, dict_AMS
[docs] def associate_vars( self, dict_ordinary, data_temperature, dates_temperature, method="vectorized", ): """Associate temperature with each ordinary event. The associated temperature is the mean over the past ``temp_time_hour`` hours before the event. Events for which no temperature can be found are dropped. Parameters ---------- dict_ordinary : dict Dictionary of ordinary events as returned by `get_ordinary_events_values`. data_temperature : np.ndarray Full temperature time series. dates_temperature : np.ndarray Timestamps of the full temperature dataset. method : str, optional Backend used for the temperature association. Defaults to ``"iterrows"``. One of: - ``"iterrows"`` — pandas iterrows loop (original behaviour). - ``"vectorized"`` — fully vectorised numpy using cumulative-sum trick; O(1) per event after one O(n) precomputation pass. Returns ------- dict_ordinary : dict Ordinary events with an added ``T`` column, keyed by duration. Example: ``{"10": pd.DataFrame( columns=['year', 'oe_time', 'ordinary', 'T'])}``. dict_dropped_oe : dict Ordinary events dropped because no temperature was found, keyed by duration. n_ordinary_per_year_new : pd.DataFrame DataFrame with the count of ordinary events per year after dropping events without temperature. """ dict_dropped_oe = {} time_index = dates_temperature.reshape(-1) delta_time = np.timedelta64(int(self.temp_time_hour), "h") if method == "vectorized": # Precompute cumulative sum and valid-count once over full series nan_mask = np.isnan(data_temperature) data_filled = data_temperature.copy() data_filled[nan_mask] = 0.0 cumsum = np.cumsum(data_filled) count_valid = np.cumsum(~nan_mask) for d in self.durations: df_oe = dict_ordinary[f"{d}"] arr_dates_oe = np.array(df_oe["oe_time"], dtype=time_index.dtype) if method == "vectorized": # Vectorized nearest-neighbour search for all events at once idx = np.searchsorted(time_index, arr_dates_oe, side="right") idx = np.clip(idx, 1, len(time_index) - 1) dist_left = ( arr_dates_oe - time_index[idx - 1] ).astype(np.int64) dist_right = ( time_index[idx] - arr_dates_oe ).astype(np.int64) closest_idxs = np.where(dist_left <= dist_right, idx - 1, idx) # Vectorized window start indices start_times = time_index[closest_idxs] + delta_time start_idxs = np.searchsorted(time_index, start_times, side="left") # Cumsum trick: O(1) per event mean using precomputed arrays prev_cumsum = np.where( start_idxs > 0, cumsum[np.maximum(start_idxs - 1, 0)], 0.0 ) prev_count = np.where( start_idxs > 0, count_valid[np.maximum(start_idxs - 1, 0)], 0 ) sums = cumsum[closest_idxs] - prev_cumsum counts = count_valid[closest_idxs] - prev_count ll_vals = np.where(counts > 0, np.round(sums / counts, 3), np.nan) else: # iterrows ll_vals = [] df_time_index = pd.DataFrame({"time_index": time_index}) df_arr_dates_oe = pd.DataFrame({"oe_time": arr_dates_oe}) merged = pd.merge_asof( df_arr_dates_oe, df_time_index, left_on="oe_time", right_on="time_index", direction="nearest", ) for _, row in merged.iterrows(): end_time = row["time_index"] if end_time is None: continue end_dt = np.datetime64(end_time) closest_idx = np.searchsorted(time_index, end_dt) end_time_minus_delta = time_index[closest_idx] + delta_time start_time_idx = np.searchsorted(time_index, end_time_minus_delta) slc = data_temperature[start_time_idx:closest_idx + 1] if np.all(np.isnan(slc)): ll_vals.append(np.nan) else: ll_vals.append(np.nanmean(slc).round(decimals=3)) dict_ordinary[f"{d}"]["T"] = ll_vals dict_dropped_oe[f"{d}"] = dict_ordinary[f"{d}"][ dict_ordinary[f"{d}"]["T"].isna() ] dict_ordinary[f"{d}"] = ( dict_ordinary[f"{d}"] .dropna(subset=["T"]) .reset_index(drop=True) ) n_ordinary_per_year_new = ( dict_ordinary[f"{d}"] .groupby(["year"])["ordinary"] .count() .to_frame(name="N_oe") ) return dict_ordinary, dict_dropped_oe, n_ordinary_per_year_new
[docs] def magnitude_model( self, data_oe_prec, data_oe_temp, thr, b_set=None, b_exp=False, minimize_method='Nelder-Mead' ): """ Fits the data to the magnitude model of TENAX. Parameters ---------- data_oe_prec : numpy.ndarray Array of precipitation ordinary events data. data_oe_temp : numpy.ndarray Array of temperature ordinary events data. thr : numpy.float64 Magnitude of precipitation threshold. b_set : NoneType or float Set value of b. fits magnitude model with a specified value for b. b_exp : bool If True, uses the exponential rather than linear fit for b. minimize_method : str, optional Optimisation method passed to ``scipy.optimize.minimize``. Defaults to ``'Nelder-Mead'``. Tested also ``'L-BFGS-B'``, though it can lead to some instability Returns ------- phat : np.ndarray Fitted parameters ``[kappa_0, b, lambda_0, a]``. loglik : float Log-likelihood of the selected model. loglik_H1 : float or None Log-likelihood of the alternative hypothesis (H1). ``None`` when ``b_set`` is provided. loglik_H0shape : float or None Log-likelihood of the null hypothesis (constant shape). ``None`` when ``b_set`` is provided. """ # alpha=0 --> dependence of shape on T is always allowed # alpha=1 --> dependence of shape on T is never allowed # else --> dependence of shape on T depends on stat. significance P = data_oe_prec T = data_oe_temp init_g = self.init_param_guess alpha = self.alpha # Precompute split once — mask never changes during optimisation mask = P < thr T0, P1, T1 = T[mask], P[~mask], T[~mask] if minimize_method == 'L-BFGS-B': h0_options = {'gtol': 1e-8, 'ftol': 1e-8, 'maxiter': 1000} bounds = [(1e-6, None), (-0.3, 0.3), (1e-6, None), (-0.3, 0.3)] elif minimize_method == 'Nelder-Mead': h0_options = {'xatol': 1e-8, 'fatol': 1e-8, 'maxiter': 1000} bounds = None else: h0_options = {'maxiter': 1000} bounds = None if b_set: if b_exp: min_phat_bset = minimize( lambda theta: -wbl_leftcensor_loglik_bset_bexp( theta, T0, P1, T1, thr, b_set ), init_g, method=minimize_method, bounds=bounds) phat_bset = min_phat_bset.x loglik_bset = wbl_leftcensor_loglik_bset_bexp( phat_bset, T0, P1, T1, thr, b_set ) phat_bset[1] = b_set phat = phat_bset loglik = loglik_bset # TODO: figure this out, do we need these outputs? loglik_H1, loglik_H0shape = None, None else: min_phat_bset = minimize( lambda theta: -wbl_leftcensor_loglik_bset( theta, T0, P1, T1, thr, b_set ), init_g, method=minimize_method, bounds=bounds) phat_bset = min_phat_bset.x loglik_bset = wbl_leftcensor_loglik_bset( phat_bset, T0, P1, T1, thr, b_set ) phat_bset[1] = b_set phat = phat_bset loglik = loglik_bset # TODO: figure this out, do we need these outputs? loglik_H1, loglik_H0shape = None, None elif b_exp: min_phat_H1 = minimize( lambda theta: -wbl_leftcensor_loglik_exp( theta, T0, P1, T1, thr ), init_g, method=minimize_method) phat_H1 = min_phat_H1.x init_H0shape = np.array([phat_H1[0], 0, phat_H1[2], phat_H1[3]]) min_phat_H0shape = minimize( lambda theta: -wbl_leftcensor_loglik_H0shape( theta, T0, P1, T1, thr ), init_H0shape, method=minimize_method, bounds=bounds, options=h0_options) phat_H0shape = min_phat_H0shape.x phat_H0shape[1] = 0 loglik_H1 = wbl_leftcensor_loglik_exp( phat_H1, T0, P1, T1, thr ) loglik_H0shape = wbl_leftcensor_loglik_H0shape( phat_H0shape, T0, P1, T1, thr ) lambda_LR_shape = -2*(loglik_H0shape - loglik_H1) pval = chi2.sf(lambda_LR_shape, df=1) if alpha == 0: # dependence of shape on T is always allowed phat = phat_H1 loglik = loglik_H1 elif alpha == 1: # dependence of shape on T is never allowed phat = phat_H0shape loglik = loglik_H0shape elif pval <= alpha: # depends on stat. significance phat = phat_H1 loglik = loglik_H1 else: phat = phat_H0shape loglik = loglik_H0shape else: min_phat_H1 = minimize( lambda theta: -wbl_leftcensor_loglik( theta, T0, P1, T1, thr ), init_g, method=minimize_method) phat_H1 = min_phat_H1.x init_H0shape = np.array([phat_H1[0], 0, phat_H1[2], phat_H1[3]]) min_phat_H0shape = minimize( lambda theta: -wbl_leftcensor_loglik_H0shape( theta, T0, P1, T1, thr ), init_H0shape, method=minimize_method, bounds=bounds, options=h0_options) phat_H0shape = min_phat_H0shape.x phat_H0shape[1] = 0 loglik_H1 = wbl_leftcensor_loglik( phat_H1, T0, P1, T1, thr ) loglik_H0shape = wbl_leftcensor_loglik_H0shape( phat_H0shape, T0, P1, T1, thr ) lambda_LR_shape = -2*(loglik_H0shape - loglik_H1) pval = chi2.sf(lambda_LR_shape, df=1) if alpha == 0: # dependence of shape on T is always allowed phat = phat_H1 loglik = loglik_H1 elif alpha == 1: # dependence of shape on T is never allowed phat = phat_H0shape loglik = loglik_H0shape elif pval <= alpha: # depends on stat. significance phat = phat_H1 loglik = loglik_H1 else: phat = phat_H0shape loglik = loglik_H0shape return phat, loglik, loglik_H1, loglik_H0shape
[docs] def temperature_model( self, data_oe_temp, beta=0, method="norm" ): """Fit temperature data to the TENAX temperature model. Parameters ---------- data_oe_temp : np.ndarray Temperature data. beta : float, optional Shape parameter of the generalised normal distribution. If 0, uses ``self.beta``. Defaults to 0. method : str, optional Distribution to fit. ``"norm"`` uses the generalised normal; ``"skewnorm"`` uses a skewed normal. Defaults to ``"norm"``. Returns ------- g_phat : np.ndarray Fitted parameters. ``[mu, sigma]`` for ``"norm"``; ``[alpha, loc, scale]`` for ``"skewnorm"``. """ if beta == 0: beta = self.beta else: beta = beta if method == "norm": mu, sigma = norm.fit(data_oe_temp) init_g = [mu, sigma] g_phat = minimize( lambda par: -gen_norm_loglik(data_oe_temp, par, beta), init_g, method="Nelder-Mead", ).x elif method == "skewnorm": # Fit the skew-normal distribution # Initial guess for the parameters # Compute histogram data def skewnorm_pdf(x, alpha, loc, scale): return skewnorm.pdf(x, alpha, loc=loc, scale=scale) hist, bin_edges = np.histogram( data_oe_temp, bins=100, density=True ) # Bin centers for xdata xdata = (bin_edges[:-1] + bin_edges[1:]) / 2 initial_guess = [ -3, np.mean(data_oe_temp), np.std(data_oe_temp), ] # Guess for alpha, loc, scale g_phat, _ = curve_fit( skewnorm_pdf, xdata, hist, p0=initial_guess, maxfev=10000 ) g_phat = tuple(g_phat.reshape(1, -1)[0]) # g_phat = skewnorm.fit(data_oe_temp) #returns loc, scale, shape else: print("not given method - temperature model") g_phat = [] return g_phat
[docs] def model_inversion( self, F_phat, g_phat, n, Ts, gen_P_mc=False, gen_RL=True, temp_method="norm", method_root_scalar="brentq", b_exp=False ): """ Inversion of the TENAX model to predict return levels or plot model. Parameters ---------- F_phat : numpy.ndarray distribution values. F_phat = [kappa_0,b,lambda_0,a]. g_phat : numpy.ndarray [mu, sigma] of temperature distribution. n : float Mean number of ordinary events per year. Ts : numpy.ndarray Array of T values to use in the Monte Carlo. gen_P_mc : bool, optional Specify whether to generate Monte Carlo values for precipitation. The default is False. gen_RL : bool, optional Specify whether to generate return levels. The default is True. temp_method : str, optional Type of fit used for the temperature model. The default is "norm". method_root_scalar : str, optional method used for inversion. The default is "brentq". b_exp : bool If True, uses the exponential rather than linear fit for b. Returns ------- ret_lev : np.ndarray or list Return levels at periods specified in ``self.return_period``. Empty list ``[]`` if ``gen_RL=False``. T_mc : np.ndarray Monte Carlo generated temperature values, shape ``(n_monte_carlo, 1)``. P_mc : np.ndarray or list Monte Carlo generated precipitation values. Empty list ``[]`` if ``gen_P_mc=False``. """ P_mc = [] ret_lev = [] if temp_method == "skewnorm": pdf_values = skewnorm.pdf(Ts, *g_phat) df = np.vstack([pdf_values, Ts]) elif temp_method == "norm": pdf_values = gen_norm_pdf(Ts, g_phat[0], g_phat[1], self.beta) df = np.vstack([pdf_values, Ts]) else: print("not given correct method - temperature model") # Generates random T values according to the temperature model T_mc = randdf(self.n_monte_carlo, df, "pdf").T # Generates random P according to the magnitude model if b_exp: # exponential model for b wbl_phat = np.column_stack(( F_phat[2] * np.exp(F_phat[3] * T_mc), F_phat[0] * np.exp(F_phat[1] * T_mc) )) else: # linear model for b wbl_phat = np.column_stack(( F_phat[2] * np.exp(F_phat[3] * T_mc), F_phat[0] + F_phat[1] * T_mc )) # old vguess # vguess = 10 ** np.arange(np.log10(F_phat[2]), np.log10(5e2), 0.05 # test new vguess vguess = 10 ** np.arange(np.log10(0.05), np.log10(5e2), 0.05) if gen_RL: ret_lev = SMEV_Mc_inversion( wbl_phat, n, self.return_period, vguess, method_root_scalar=method_root_scalar, ) else: pass # Generate P_mc if needed if gen_P_mc: start = time.time() P_mc = weibull_min.ppf( np.random.rand(self.n_monte_carlo), c=wbl_phat[:, 1], scale=wbl_phat[:, 0], ) end = time.time() - start print(f"mc {self.n_monte_carlo}: {end}") else: pass return ret_lev, T_mc, P_mc
[docs] def TNX_tenax_bootstrap_uncertainty( self, P, T, blocks_id, Ts, temp_method="norm", method_root_scalar="brentq", minimize_method="Nelder-Mead", parallel=False, n_jobs=-1, ): """Bootstrap uncertainty estimation for the TENAX model. Parameters ---------- P : np.ndarray Precipitation ordinary events data. T : np.ndarray Temperature ordinary events data. blocks_id : np.ndarray Block identifiers (e.g., years) for each event. Ts : np.ndarray Array of temperature values for the Monte Carlo integration. temp_method : str, optional Distribution used for the temperature model. Defaults to ``"norm"``. method_root_scalar : str, optional Root-finding method for model inversion. Defaults to ``"brentq"``. minimize_method : str, optional Optimisation method passed to ``magnitude_model``. Defaults to ``"Nelder-Mead"``. parallel : bool, optional Run bootstrap iterations in parallel using ``ProcessPoolExecutor``. Warm start is disabled when ``parallel=True`` because iterations run in separate processes and cannot share state. Defaults to ``False``. n_jobs : int, optional Number of worker processes. ``-1`` uses all available cores. Ignored when ``parallel=False``. Defaults to ``-1``. Notes ----- **Warm start (sequential mode only):** each iteration reuses the fitted parameters from the previous iteration as the initial guess for ``magnitude_model``. Since consecutive bootstrap samples are drawn from the same dataset, their optimal parameters are typically close to each other, so the optimiser converges in fewer steps. ``self.init_param_guess`` is restored to its original value after the loop so that subsequent calls are not affected. Returns ------- F_phat_unc : np.ndarray Magnitude model parameters from each bootstrap sample, shape ``(niter, 4)``. g_phat_unc : np.ndarray Temperature model parameters from each bootstrap sample, shape ``(niter, 2)`` for ``"norm"`` or ``(niter, 3)`` for ``"skewnorm"``. RL_unc : np.ndarray Return levels from each bootstrap sample, shape ``(niter, len(return_period))``. n_unc : np.ndarray Mean number of events per block from each bootstrap sample, shape ``(niter,)``. n_err : int Number of iterations where model fitting failed. """ perc_thres = self.left_censoring[1] niter = self.niter_tenax RP = self.return_period blocks = np.unique(blocks_id) M = len(blocks) randy = np.random.randint(0, M, size=(M, niter)) # Initialize variables F_phat_unc = np.full((niter, 4), np.nan) if temp_method == "norm": g_phat_unc = np.full((niter, 2), np.nan) elif temp_method == "skewnorm": g_phat_unc = np.full((niter, 3), np.nan) else: g_phat_unc = [] RL_unc = np.full((niter, len(RP)), np.nan) n_unc = np.full(niter, np.nan) n_err = 0 if parallel: args_list = [ (P, T, blocks_id, blocks, randy[:, ii], M, perc_thres, Ts, temp_method, method_root_scalar, minimize_method, self) for ii in range(niter) ] results = Parallel(n_jobs=n_jobs)( delayed(_bootstrap_single_iteration)(args) for args in args_list ) for ii, (F_tmp, g_tmp, RL_tmp, n_tmp, err) in enumerate(results): if err == 0: F_phat_unc[ii, :] = F_tmp g_phat_unc[ii, :] = g_tmp RL_unc[ii, :] = RL_tmp n_unc[ii] = n_tmp n_err += err else: # updated after each successful magnitude_model fit _warm_start = None # restore after loop _init_param_guess_orig = self.init_param_guess _t_resample = 0.0 _t_magnitude = 0.0 _t_temperature = 0.0 _t_inversion = 0.0 for ii in range(niter): Pr, Tr, Bid = [], [], [] _t0 = time.perf_counter() for iy in range(M): selected = blocks_id == blocks[randy[iy, ii]] Pr.append(P[selected]) Tr.append(T[selected]) Bid.append(np.full(np.sum(selected), iy + 1)) Pr = np.concatenate(Pr) Tr = np.concatenate(Tr) _t_resample += time.perf_counter() - _t0 try: thr = np.quantile(Pr, perc_thres) _t0 = time.perf_counter() if _warm_start is not None: self.init_param_guess = _warm_start F_phat_temporary, _, _, _ = self.magnitude_model( Pr, Tr, thr, minimize_method=minimize_method ) _t_magnitude += time.perf_counter() - _t0 _warm_start = F_phat_temporary _t0 = time.perf_counter() g_phat_temporary = self.temperature_model( Tr, method=temp_method ) _t_temperature += time.perf_counter() - _t0 n_temporary = len(Pr) / M _t0 = time.perf_counter() RL_temporary, _, _ = self.model_inversion( F_phat_temporary, g_phat_temporary, n_temporary, Ts, temp_method=temp_method, method_root_scalar=method_root_scalar, ) _t_inversion += time.perf_counter() - _t0 F_phat_unc[ii, :] = F_phat_temporary g_phat_unc[ii, :] = g_phat_temporary RL_unc[ii, :] = RL_temporary n_unc[ii] = n_temporary except Exception: n_err += 1 print("\n--- bootstrap_uncertainty internal timings ---") for label, elapsed in [ ("resample", _t_resample), ("magnitude_model", _t_magnitude), ("temperature_model", _t_temperature), ("model_inversion", _t_inversion), ]: print(f" {label:<20} {elapsed:7.3f} s") self.init_param_guess = _init_param_guess_orig return F_phat_unc, g_phat_unc, RL_unc, n_unc, n_err
def _bootstrap_single_iteration(args): """One bootstrap iteration, designed for ProcessPoolExecutor.""" (P, T, blocks_id, blocks, randy_ii, M, perc_thres, Ts, temp_method, method_root_scalar, minimize_method, S) = args Pr, Tr = [], [] for iy in range(M): selected = blocks_id == blocks[randy_ii[iy]] Pr.append(P[selected]) Tr.append(T[selected]) Pr = np.concatenate(Pr) Tr = np.concatenate(Tr) try: thr = np.quantile(Pr, perc_thres) F_phat_tmp, _, _, _ = S.magnitude_model( Pr, Tr, thr, minimize_method=minimize_method ) g_phat_tmp = S.temperature_model(Tr, method=temp_method) n_tmp = len(Pr) / M RL_tmp, _, _ = S.model_inversion( F_phat_tmp, g_phat_tmp, n_tmp, Ts, temp_method=temp_method, method_root_scalar=method_root_scalar, ) return F_phat_tmp, np.array(g_phat_tmp), RL_tmp, n_tmp, 0 except Exception: nan_g = np.full(2 if temp_method == "norm" else 3, np.nan) nan_rl = np.full(len(S.return_period), np.nan) return np.full(4, np.nan), nan_g, nan_rl, np.nan, 1
[docs] def wbl_leftcensor_loglik( theta, t0, x1, t1, thr ): """Compute log-likelihood for a left-censored Weibull distribution. Shape and scale parameters depend linearly on temperature. Observations below the threshold are left-censored. Parameters ---------- theta : array-like Parameter vector ``[kappa_0, b, lambda_0, a]``. t0 : np.ndarray Temperature values for censored events (precipitation below ``thr``). x1 : np.ndarray Precipitation values at or above ``thr``. t1 : np.ndarray Temperature values corresponding to ``x1``. thr : float Left-censoring threshold. Returns ------- float Log-likelihood value. """ a_w, b_w, a_C, b_C = theta shapes0 = a_w + b_w * t0 scales0 = a_C * np.exp(np.minimum(b_C * t0, 709.0)) shapes1 = a_w + b_w * t1 scales1 = a_C * np.exp(np.minimum(b_C * t1, 709.0)) return ( np.sum(_wbl_logcdf(shapes0, scales0, thr)) + np.sum(_wbl_logpdf(x1, shapes1, scales1)) )
[docs] def wbl_leftcensor_loglik_H0shape( theta, t0, x1, t1, thr ): """Compute log-likelihood for a left-censored Weibull with constant shape (H0). Same as `wbl_leftcensor_loglik` but with ``b=0``, i.e. shape does not depend on temperature. Used as the null hypothesis in the likelihood-ratio test. Parameters ---------- theta : array-like Parameter vector ``[kappa_0, b, lambda_0, a]`` (``b`` is ignored). t0 : np.ndarray Temperature values for censored events (precipitation below ``thr``). x1 : np.ndarray Precipitation values at or above ``thr``. t1 : np.ndarray Temperature values corresponding to ``x1``. thr : float Left-censoring threshold. Returns ------- float Log-likelihood value. """ a_w, _, a_C, b_C = theta scales0 = a_C * np.exp(np.minimum(b_C * t0, 709.0)) scales1 = a_C * np.exp(np.minimum(b_C * t1, 709.0)) return ( np.sum(_wbl_logcdf(a_w, scales0, thr)) + np.sum(_wbl_logpdf(x1, a_w, scales1)) )
[docs] def wbl_leftcensor_loglik_bset(theta, t0, x1, t1, thr, b_set): """Compute log-likelihood for a left-censored Weibull with fixed b. Same as `wbl_leftcensor_loglik` but ``b`` is fixed to ``b_set`` and not optimised. Parameters ---------- theta : array-like Parameter vector ``[kappa_0, b, lambda_0, a]`` (``b`` is overridden by ``b_set``). t0 : np.ndarray Temperature values for censored events (precipitation below ``thr``). x1 : np.ndarray Precipitation values at or above ``thr``. t1 : np.ndarray Temperature values corresponding to ``x1``. thr : float Left-censoring threshold. b_set : float Fixed value of ``b`` (shape temperature-dependence parameter). Returns ------- float Log-likelihood value. """ a_w, _, a_C, b_C = theta b_w = b_set shapes0 = a_w + b_w * t0 scales0 = a_C * np.exp(np.minimum(b_C * t0, 709.0)) shapes1 = a_w + b_w * t1 scales1 = a_C * np.exp(np.minimum(b_C * t1, 709.0)) return ( np.sum(_wbl_logcdf(shapes0, scales0, thr)) + np.sum(_wbl_logpdf(x1, shapes1, scales1)) )
def wbl_leftcensor_loglik_exp( theta, t0, x1, t1, thr ): """Compute log-likelihood for a left-censored Weibull with exponential shape. Like `wbl_leftcensor_loglik` but shape depends exponentially on temperature: ``shape = kappa_0 * exp(b * T)``. Parameters ---------- theta : array-like Parameter vector ``[kappa_0, b, lambda_0, a]``. t0 : np.ndarray Temperature values for censored events (precipitation below ``thr``). x1 : np.ndarray Precipitation values at or above ``thr``. t1 : np.ndarray Temperature values corresponding to ``x1``. thr : float Left-censoring threshold. Returns ------- float Log-likelihood value. """ a_w, b_w, a_C, b_C = theta shapes0 = a_w * np.exp(np.minimum(b_w * t0, 709.0)) scales0 = a_C * np.exp(np.minimum(b_C * t0, 709.0)) shapes1 = a_w * np.exp(np.minimum(b_w * t1, 709.0)) scales1 = a_C * np.exp(np.minimum(b_C * t1, 709.0)) return ( np.sum(_wbl_logcdf(shapes0, scales0, thr)) + np.sum(_wbl_logpdf(x1, shapes1, scales1)) ) def wbl_leftcensor_loglik_bset_bexp( theta, t0, x1, t1, thr, b_set ): """Compute log-likelihood for a left-censored Weibull with exponential shape, fixed b. Combines `wbl_leftcensor_loglik_exp` and `wbl_leftcensor_loglik_bset`: shape depends exponentially on temperature and ``b`` is fixed to ``b_set``. Parameters ---------- theta : array-like Parameter vector ``[kappa_0, b, lambda_0, a]`` (``b`` is overridden by ``b_set``). t0 : np.ndarray Temperature values for censored events (precipitation below ``thr``). x1 : np.ndarray Precipitation values at or above ``thr``. t1 : np.ndarray Temperature values corresponding to ``x1``. thr : float Left-censoring threshold. b_set : float Fixed value of ``b``. Returns ------- float Log-likelihood value. """ a_w, _, a_C, b_C = theta b_w = b_set shapes0 = a_w * np.exp(np.minimum(b_w * t0, 709.0)) scales0 = a_C * np.exp(np.minimum(b_C * t0, 709.0)) shapes1 = a_w * np.exp(np.minimum(b_w * t1, 709.0)) scales1 = a_C * np.exp(np.minimum(b_C * t1, 709.0)) return ( np.sum(_wbl_logcdf(shapes0, scales0, thr)) + np.sum(_wbl_logpdf(x1, shapes1, scales1)) ) def _wbl_logcdf(shapes, scales, thr): """log(CDF) of Weibull at thr: log(1 - exp(-(thr/scale)^shape)).""" shapes = np.maximum(shapes, 1e-300) scales = np.maximum(scales, 1e-300) log_z = shapes * np.log(np.maximum(thr / scales, 1e-300)) z = np.exp(np.minimum(log_z, 709.0)) z = np.maximum(z, 1e-300) return np.log(-np.expm1(-z)) def _wbl_logpdf(x, shapes, scales): """log(PDF) of Weibull: log(c/scale) + (c-1)*log(x/scale) - (x/scale)^c.""" shapes = np.maximum(shapes, 1e-300) scales = np.maximum(scales, 1e-300) z = np.maximum(x / scales, 1e-300) log_z = np.log(z) return np.maximum( np.log(shapes) - np.log(scales) + np.minimum((shapes - 1) * log_z, 709.0) - np.exp(np.minimum(shapes * log_z, 709.0)), -709.0 )
[docs] def gen_norm_pdf( x: np.ndarray, mu: float, sigma: float, beta: float ) -> np.ndarray: """Compute the Generalized normal distribution PDF. Parameters ---------- x : np.ndarray Data points. mu : float Location parameter. sigma : float Scale parameter. beta : float Shape parameter. Returns ------- np.ndarray Generalized normal distribution PDF values. """ coeff = beta / (2 * sigma * gamma(1 / beta)) exponent = -((np.abs(x - mu) / sigma) ** beta) return coeff * np.exp(exponent)
[docs] def gen_norm_loglik( x: np.ndarray, par: list, beta: float ) -> float: """Compute the log-likelihood for the Generalized normal distribution. Parameters ---------- x : np.ndarray Data points. par : list Parameters ``[mu, sigma]``. beta : float Shape parameter. Returns ------- float Log-likelihood value. """ # Compute the log-likelihood pdf = gen_norm_pdf(x, par[0], par[1], beta) n = len(pdf[pdf == 0]) if n > 5: print(f"warning: {n} zero values") pdf[pdf == 0] = 1e-10 # stops issue if zero generated loglik = np.sum(np.log(pdf)) return loglik
[docs] def randdf( size, df, flag ): """Generate random numbers from a user-defined PDF or CDF. Pythonised version of MATLAB's randdf coded by halleyhit on Aug. 15th, 2018. Email: halleyhit@sjtu.edu.cn or halleyhit@163.com Parameters ---------- size : int or tuple Output size. ``10`` → 1-D array of 10; ``(10, 2)`` → 10×2 matrix. df : np.ndarray 2-row matrix: first row is function values (PDF or CDF), second row is the corresponding sampling points. flag : str ``"pdf"`` or ``"cdf"``. Returns ------- np.ndarray Random samples drawn according to the defined distribution. """ # Determine output dimensions if isinstance(size, int): n, m = 1, size elif isinstance(size, tuple) and len(size) == 2: n, m = size else: raise ValueError("Size must be an integer or a tuple of two integers") all_samples = n * m # Validate input density function if df.shape[0] != 2: raise ValueError("Density function matrix must have 2 rows") if np.any(df[0, :] < 0): raise ValueError("Function values must be non-negative") if df.shape[1] < 2: raise ValueError("Density function must have at least two columns") # Normalize pdf or cdf if flag == "pdf": df[0, :] = np.cumsum(df[0, :]) / np.sum(df[0, :]) elif flag == "cdf": if np.any(np.diff(df[0, :]) < 0): raise ValueError("CDF values must be non-decreasing") df[0, :] = df[0, :] / df[0, -1] else: raise ValueError("Flag must be 'pdf' or 'cdf'") # Add a small epsilon to ensure no repeated values df[0, :] += np.arange(df.shape[1]) * np.finfo(float).eps # Generate random samples temp = np.random.rand(all_samples) # Interpolate to get the corresponding values try: result = np.interp(temp, df[0, :], df[1, :]) except ValueError: # Handle repeated x-values by taking unique values _, unique_indices = np.unique(df[0, :], return_index=True) df_unique = df[:, unique_indices] result = np.interp(temp, df_unique[0, :], df_unique[1, :]) return result.reshape((n, m))
[docs] def MC_tSMEV_cdf( y, wbl_phat, n ): """Evaluate the Monte Carlo SMEV CDF at given values. Parameters ---------- y : float or array-like Value(s) at which to evaluate the CDF. wbl_phat : np.ndarray Weibull parameters, shape ``(N, 2)``: columns are ``[scale, shape]``. n : float Power applied to the average probability. Returns ------- np.ndarray CDF value(s) for input ``y``. """ y = np.atleast_1d(y) # Ensure y is array scale = wbl_phat[:, 0] shape = wbl_phat[:, 1] # Reshape to broadcast over y and MC samples y_grid = y[:, None] # shape: (len(y), 1) scale_grid = scale[None, :] # shape: (1, N) shape_grid = shape[None, :] # shape: (1, N) cdf_vals = 1 - np.exp(-(y_grid / scale_grid) ** shape_grid) p_avg = np.mean(cdf_vals, axis=1) # Average over MC samples return p_avg ** n
[docs] def SMEV_Mc_inversion( wbl_phat, n, target_return_periods, vguess, method_root_scalar ): """Invert the MC-SMEV CDF to find quantiles for target return periods. Parameters ---------- wbl_phat : np.ndarray Weibull parameters, shape ``(N, 2)``: columns are ``[scale, shape]``. n : float Power applied to the average probability. target_return_periods : list or array-like Desired return periods. vguess : np.ndarray Initial value grid for root-finding. method_root_scalar : str Root-finding method passed to ``scipy.optimize.root_scalar``. Returns ------- np.ndarray Quantiles corresponding to each target return period. """ # if n is numpy or panda series, this should give u just float if not isinstance(n, float): n = float(n.values[0]) else: pass pr = 1 - 1 / np.array( target_return_periods ) # Probabilities associated with target_return_periods pv = MC_tSMEV_cdf( vguess, wbl_phat, n ) # Probabilities associated with vguess values qnt = np.full( len(target_return_periods), np.nan ) # Initialize output array with NaNs for t in range(len(target_return_periods)): # Find the first guess where pv exceeds pr first_guess_idx = np.where(pv > pr[t])[0] if len(first_guess_idx) > 0: first_guess = vguess[first_guess_idx[0]] else: # Use the last valid guess if none exceeds pr last_valid_idx = np.where(pv < 1)[0] if len(last_valid_idx) > 0: first_guess = vguess[last_valid_idx[-1]] else: first_guess = vguess[-1] # Define the function for root finding def func(y): return MC_tSMEV_cdf(y, wbl_phat, n) - pr[t] # Use root_scalar as an alternative to MATLAB's fzero result = root_scalar( func, bracket=[vguess[0], vguess[-1]], x0=first_guess, method=method_root_scalar ) if result.converged: qnt[t] = result.root return qnt
[docs] def inverse_magnitude_model( F_phat, eT, qs, b_exp=False ): """ Calculate percentiles from the Weibell magnitude model Parameters ---------- F_phat : numpy.ndarray distribution values. F_phat = [kappa_0,b,lambda_0,a]. eT : numpy.ndarray Temperature values from which to produce distribution. qs : list list of percentiles to calculate (between 0 and 1). e.g. [0.85,0.95,0.99]. b_exp : bool If True, uses the exponential rather than linear fit for b. Returns ------- percentile_lines : numpy.ndarray array with shape length(qs) by length(eT) giving the magnitudes for each eT. percentile_lines[0] are the values for qs[0]. """ percentile_lines = np.zeros((len(qs), len(eT))) if b_exp: for iq, q in enumerate(qs): scale = F_phat[2] * np.exp(F_phat[3] * eT) # b is multiplicatve shape = F_phat[0] * np.exp(F_phat[1] * eT) percentile_lines[iq, :] = scale * (-np.log(1 - q)) ** (1 / shape) else: for iq, q in enumerate(qs): scale = F_phat[2] * np.exp(F_phat[3] * eT) # b is additive shape = F_phat[0] + F_phat[1] * eT percentile_lines[iq, :] = scale * (-np.log(1 - q)) ** (1 / shape) return percentile_lines