Source code for pyTENAX.smev

import math
import numpy as np
import pandas as pd
from typing import Union, Tuple, Dict
import statsmodels.api as sm

try:
    from numba import njit as _njit, prange as _prange

    @_njit
    def _smev_inner_loop_numba_seq(
        data,
        start_indices,
        end_indices,
        window_size,
        n_events,
    ):
        """Single-threaded JIT kernel: sliding-window maximum over each event.

        For every ordinary event (defined by its start/end index into `data`),
        computes the maximum rolling sum over a window of `window_size` steps.
        Replicates np.convolve(..., 'same') but avoids Python overhead so
        numba can JIT-compile the entire loop.

        Parameters
        ----------
        data : np.ndarray[int64]
            Full precipitation series scaled by 10000 (integer arithmetic
            avoids floating-point ties).
        start_indices : np.ndarray[int64]
            Index into `data` where each event starts.
        end_indices : np.ndarray[int64]
            Index into `data` where each event ends.
        window_size : int
            Number of timesteps in the aggregation window.
        n_events : int
            Total number of ordinary events.

        Returns
        -------
        max_vals : np.ndarray[int64]
            Maximum rolling sum (scaled) for each event.
        max_global_idx : np.ndarray[int64]
            Index into `data` of the window centre that achieves the maximum.
        """
        max_vals = np.empty(n_events, dtype=np.int64)
        max_global_idx = np.empty(n_events, dtype=np.int64)
        for i in range(n_events):
            si = start_indices[i]
            ei = end_indices[i]
            if si == ei:
                # Single-timestep event — no window needed
                max_vals[i] = data[si]
                max_global_idx[i] = si
            else:
                slice_len = ei - si + 1
                # 'same' convolution output length = max(slice, window)
                output_len = max(slice_len, window_size)
                min_len = min(slice_len, window_size)
                # Centre offset matches numpy's 'same' padding convention
                offset = (min_len - 1) // 2
                best_val = np.int64(-9223372036854775807)  # int64 minimum
                best_idx = 0
                # Slide the window across the event
                for j in range(output_len):
                    full_idx = j + offset
                    # Clamp window bounds to the actual slice
                    start_k = max(full_idx - (window_size - 1), 0)
                    end_k = min(full_idx + 1, slice_len)
                    # Sum values inside the current window position
                    s = np.int64(0)
                    for k in range(start_k, end_k):
                        s += data[si + k]
                    if s > best_val:
                        best_val = s
                        best_idx = j
                max_vals[i] = best_val
                max_global_idx[i] = si + best_idx
        return max_vals, max_global_idx

    @_njit(parallel=True)
    def _smev_inner_loop_numba(
        data,
        start_indices,
        end_indices,
        window_size,
        n_events,
    ):
        """Parallelised JIT kernel: sliding-window maximum over each event.

        Identical logic to `_smev_inner_loop_numba_seq` but events are
        processed in parallel using numba's `prange`. Safe because each
        iteration writes to a unique output index.

        Parameters
        ----------
        data : np.ndarray[int64]
            Full precipitation series scaled by 10000 (integer arithmetic
            avoids floating-point ties).
        start_indices : np.ndarray[int64]
            Index into `data` where each event starts.
        end_indices : np.ndarray[int64]
            Index into `data` where each event ends.
        window_size : int
            Number of timesteps in the aggregation window.
        n_events : int
            Total number of ordinary events.

        Returns
        -------
        max_vals : np.ndarray[int64]
            Maximum rolling sum (scaled) for each event.
        max_global_idx : np.ndarray[int64]
            Index into `data` of the window centre that achieves the maximum.
        """
        # data must be int64 (scaled by 10000) so sums are exact —
        # no floating-point ties
        max_vals = np.empty(n_events, dtype=np.int64)
        max_global_idx = np.empty(n_events, dtype=np.int64)
        for i in _prange(n_events):  # parallel loop — each i is independent
            si = start_indices[i]
            ei = end_indices[i]
            if si == ei:
                # Single-timestep event — no window needed
                max_vals[i] = data[si]
                max_global_idx[i] = si
            else:
                slice_len = ei - si + 1
                # 'same' convolution output length = max(slice, window);
                # numpy's start offset is (min(n,m)-1)//2
                output_len = max(slice_len, window_size)
                min_len = min(slice_len, window_size)
                # Centre offset matches numpy's 'same' padding convention
                offset = (min_len - 1) // 2
                best_val = np.int64(-9223372036854775807)  # int64 minimum
                best_idx = 0
                # Slide the window across the event
                for j in range(output_len):
                    full_idx = j + offset
                    # Clamp window bounds to the actual slice
                    start_k = max(full_idx - (window_size - 1), 0)
                    end_k = min(full_idx + 1, slice_len)
                    # Sum values inside the current window position
                    s = np.int64(0)
                    for k in range(start_k, end_k):
                        s += data[si + k]
                    if s > best_val:
                        best_val = s
                        best_idx = j
                max_vals[i] = best_val
                max_global_idx[i] = si + best_idx
        return max_vals, max_global_idx

    _NUMBA_AVAILABLE = True

except ImportError:
    _NUMBA_AVAILABLE = False


[docs] class SMEV: def __init__( self, return_period: list[Union[int, float]], durations: list[int], time_resolution: int, tolerance: float = 0.1, min_event_duration: int = 30, storm_separation_time: int = 24, left_censoring: list = [0, 1], min_rain: Union[float, int] = 0, ): """Initiates SMEV class. Parameters ---------- return_period : list[Union[int, float]] List of return periods of interest [years]. durations : list[int] List of durations of interest [min]. time_resolution : int Temporal resolution of the precipitation data [min]. tolerance : float, optional Maximum allowed fraction of missing data in one year. If exceeded, year will be disregarded from samples. Defaults to 0.1. min_event_duration : int, optional Minimum event duration [min]. Defaults to 30. storm_separation_time : int, optional Separation time between independent storms [hours]. Defaults to 24. left_censoring : list, optional 2-elements list with the limits in probability of the data to be used for the parameters estimation. Defaults to [0, 1]. min_rain : Union[float, int], optional Minimum rainfall value. Defaults to 0. """ self.return_period = return_period self.durations = durations self.time_resolution = time_resolution self.tolerance = tolerance self.min_event_duration = min_event_duration self.storm_separation_time = storm_separation_time self.left_censoring = left_censoring self.min_rain = min_rain self.__incomplete_years_removed__ = False
[docs] def remove_incomplete_years( self, data_pr: pd.DataFrame, name_col="value", nan_to_zero=True, ) -> pd.DataFrame: """Delete incomplete years in precipitation data. An incomplete year is defined as a year where observations are missing above a given threshold. Parameters ---------- data_pr : pd.DataFrame Dataframe containing (hourly) precipitation values. name_col : str, optional Column name in `data_pr` with precipitation values. Defaults to "value". nan_to_zero : bool, optional Set `nan` to zero. Defaults to True. Returns ------- pd.DataFrame Dataframe containing (hourly) precipitation values with incomplete years removed. """ # Step 1: get resolution of dataset (MUST BE SAME in whole dataset!!!) time_res = ( (data_pr.index[-1] - data_pr.index[-2]).total_seconds() / 60 ) # Validate: if user provided time_resolution, it must match the data if ( self.time_resolution is not None and self.time_resolution != time_res ): raise ValueError( "time_resolution provided " f"({self.time_resolution} min) does not match " f"the resolution detected from data ({time_res} min)." ) # Step 2: Resample by year and count total and NaN values yearly_valid = data_pr.resample("YE").apply( lambda x: x.notna().sum() ) # Count not NaNs per year # Step 3: Estimate expected length of yearly timeseries expected = pd.DataFrame(index=yearly_valid.index) # 1440 = number of minutes in a day expected["Total"] = 1440 / time_res * 365 # Step 4: Calculate percentage of missing data per year valid_percentage = yearly_valid[name_col] / expected["Total"] # Step 5: Filter out years where more than tolerance% of values are NaN years_to_remove = valid_percentage[ valid_percentage < (1 - self.tolerance) ].index # Step 6: Remove data for those years from the original DataFrame data_cleanded = data_pr[ ~data_pr.index.year.isin(years_to_remove.year) ] # Replace NaN values with 0 in the specific column if nan_to_zero: data_cleanded.loc[:, name_col] = ( data_cleanded[name_col].fillna(0) ) self.time_resolution = time_res self.__incomplete_years_removed__ = True return data_cleanded
[docs] def get_ordinary_events( self, data: Union[pd.DataFrame, np.ndarray], dates: np.ndarray, name_col: str = "value", check_gaps=True, ) -> list: """Extract ordinary precipitation events from a time series. Groups timesteps at or above ``self.min_rain`` into independent storm events separated by at least ``self.storm_separation_time`` hours. Optionally removes events too close to dataset boundaries or data gaps. Parameters ---------- data : Union[pd.DataFrame, np.ndarray] Precipitation values. dates : np.ndarray Timestamps of the precipitation data. name_col : str, optional Column name to use when ``data`` is a DataFrame. Defaults to "value". check_gaps : bool, optional Remove events that fall within ``storm_separation_time`` of the dataset boundaries or internal data gaps. Defaults to True. Returns ------- list List of np.ndarray, each containing the timestamps of one ordinary event (values >= ``self.min_rain`` separated by more than ``self.storm_separation_time`` hours). """ if not self.__incomplete_years_removed__: raise ValueError( "You must run 'remove_incomplete_years' " "before running this function. " "If you are sure your data is complete, set " "self.__incomplete_years_removed__ = True " "to bypass this check." ) if isinstance(data, pd.DataFrame): data = np.array(data[name_col]) dates = dates.astype("datetime64[ns]") if self.min_rain == 0: above_threshold_indices = np.where(data > self.min_rain)[0] else: above_threshold_indices = np.where(data >= self.min_rain)[0] if len(above_threshold_indices) == 0: return [] # Get dates at above-threshold positions above_dates = dates[above_threshold_indices] # Compute time differences between consecutive above-threshold # timesteps (in nanoseconds) time_diffs_above = np.diff(above_dates).astype(np.int64) # Find where gaps exceed separation time (hours to nanoseconds) separation_ns = int(self.storm_separation_time * 3.6e12) gap_mask = time_diffs_above > separation_ns # Split indices at gap locations split_points = np.where(gap_mask)[0] + 1 # Split into groups of indices, then map back to dates index_groups = np.split(above_threshold_indices, split_points) # Convert to list of date arrays (same format as original) consecutive_values = [dates[group] for group in index_groups] if check_gaps: # Remove event too close to the start of the dataset if (consecutive_values[0][0] - dates[0]).item() < ( self.storm_separation_time * 3.6e12 ): # numpy dt is in nanoseconds consecutive_values.pop(0) else: pass # Remove event too close to the end of the dataset if (dates[-1] - consecutive_values[-1][-1]).item() < ( self.storm_separation_time * 3.6e12 ): # numpy dt is in nanoseconds consecutive_values.pop() else: pass # Locate OE that ends before gaps in data starts. # Calculate the differences between consecutive elements time_diffs = np.diff(dates) # difference of first element is time resolution time_res = time_diffs[0] # Identify gaps larger than separation time sep_td = np.timedelta64( int(self.storm_separation_time * 3.6e12), "ns" ) gap_indices_end = np.where(time_diffs > sep_td)[0] # Extend by one index to also check OE near gap start gap_indices_start = gap_indices_end + 1 match_info = [] for gap_idx in gap_indices_end: end_date = dates[gap_idx] start_date = end_date - np.timedelta64( int(self.storm_separation_time * 3.6e12), "ns" ) temp_date_array = np.arange(start_date, end_date, time_res) for i, sub_array in enumerate(consecutive_values): match_indices = np.where( np.isin(sub_array, temp_date_array) )[0] if match_indices.size > 0: match_info.append(i) for gap_idx in gap_indices_start: start_date = dates[gap_idx] end_date = start_date + np.timedelta64( int(self.storm_separation_time * 3.6e12), "ns" ) temp_date_array = np.arange(start_date, end_date, time_res) for i, sub_array in enumerate(consecutive_values): match_indices = np.where( np.isin(sub_array, temp_date_array) )[0] if match_indices.size > 0: match_info.append(i) for del_index in sorted(match_info, reverse=True): del consecutive_values[del_index] return consecutive_values
[docs] def remove_short( self, list_ordinary: list, ) -> Tuple[np.ndarray, np.ndarray, pd.DataFrame]: """Remove ordinary events that are too short. Parameters ---------- list_ordinary : list List of ordinary events as returned by `get_ordinary_events()`. Each event may contain pd.Timestamp or np.datetime64 values. Returns ------- arr_vals : np.ndarray Boolean array (all True) of length equal to the number of kept events, one entry per event that passed the duration filter. arr_dates : np.ndarray Array of (end, start) date tuples for each kept event. n_ordinary_per_year : pd.DataFrame DataFrame with the count of ordinary events per year. """ if not self.__incomplete_years_removed__: raise ValueError( "You must run 'remove_incomplete_years' " "before running this function. " "If you are sure your data is complete, set " "self.__incomplete_years_removed__ = True " "to bypass this check." ) # Convert pd.Timestamp events to np.datetime64 if needed if isinstance(list_ordinary[0][0], pd.Timestamp): list_ordinary = [ np.array([t.to_datetime64() for t in ev]) for ev in list_ordinary ] min_duration = np.timedelta64(int(self.min_event_duration), "m") time_res = np.timedelta64(int(self.time_resolution), "m") ll_short = [ (ev[-1] - ev[0]).astype("timedelta64[m]") + time_res >= min_duration for ev in list_ordinary ] ll_dates = [ (ev[-1], ev[0]) if keep else (np.nan, np.nan) for ev, keep in zip(list_ordinary, ll_short) ] arr_vals = np.array(ll_short)[ll_short] arr_dates = np.array(ll_dates)[ll_short] filtered_list = [ ev for ev, keep in zip(list_ordinary, ll_short) if keep ] list_year = pd.DataFrame( [ev[0].astype("datetime64[Y]").item().year for ev in filtered_list], columns=["year"], ) n_ordinary_per_year = list_year.reset_index().groupby(["year"]).count() return arr_vals, arr_dates, n_ordinary_per_year
[docs] def get_ordinary_events_values( self, data: np.ndarray, dates: np.ndarray, arr_dates_oe: np.ndarray, method: str = "vectorized", ) -> Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]: """Extract ordinary events and annual maxima from precipitation data. Parameters ---------- data : np.ndarray Full precipitation time series. dates : np.ndarray Timestamps of the full precipitation dataset. arr_dates_oe : np.ndarray End and start times of ordinary events as returned by remove_short. method : str, optional Backend used for the sliding-window maximum search. Defaults to ``"vectorized"``. One of: - ``"vectorized"`` — pure numpy, ``np.convolve`` per event. - ``"njit"`` — numba JIT-compiled loop, single-threaded. - ``"njit_parallel"`` — numba JIT-compiled loop, parallelised over events. Requires ``numba`` to be installed. Notes ----- When using ``method="njit"`` or ``method="njit_parallel"``, the first call in a Python session triggers JIT compilation and can take several seconds. Run a warmup call before any timed or production code:: # Warmup — compile both kernels once at session start S_SMEV.get_ordinary_events_values( data=df_arr, dates=df_dates, arr_dates_oe=arr_dates, method="njit" ) S_SMEV.get_ordinary_events_values( data=df_arr, dates=df_dates, arr_dates_oe=arr_dates, method="njit_parallel" ) # Subsequent calls are fast dict_ordinary, dict_AMS = S_SMEV.get_ordinary_events_values( data=df_arr, dates=df_dates, arr_dates_oe=arr_dates, method="njit_parallel" ) Returns ------- dict_ordinary : dict Key is duration (str), value is a ``pd.DataFrame`` with columns ``year``, ``oe_time``, ``ordinary`` (event depth/intensity). Example: ``{"10": pd.DataFrame( columns=['year', 'oe_time', 'ordinary'])}``. dict_AMS : dict Key is duration (str), value is a ``pd.DataFrame`` with columns ``year`` and ``AMS`` (annual maximum value). """ if method in ("njit", "njit_parallel") and not _NUMBA_AVAILABLE: raise ImportError( "numba is required for method='njit'/'njit_parallel'. " "Install with: pip install numba" ) dict_ordinary = {} dict_AMS = {} time_index = dates.reshape(-1) n_events = arr_dates_oe.shape[0] oe_end = arr_dates_oe[:, 0].astype("datetime64[ns]") oe_start = arr_dates_oe[:, 1].astype("datetime64[ns]") start_indices = np.searchsorted(time_index, oe_start).astype(np.int64) end_indices = np.searchsorted(time_index, oe_end).astype(np.int64) ll_yrs = np.array([ oe_end[i].astype("datetime64[Y]").item().year for i in range(n_events) ], dtype=np.int64) unique_years = np.unique(ll_yrs) year_masks = {yr: ll_yrs == yr for yr in unique_years} data_int = np.round(data * 10000).astype(np.int64) for d in range(len(self.durations)): window_size = int(self.durations[d] / self.time_resolution) if method == "vectorized": ones_kernel = np.ones(window_size, dtype=np.int64) max_vals = np.empty(n_events, dtype=np.float64) max_global_idx = np.empty(n_events, dtype=np.int64) for i in range(n_events): si = start_indices[i] ei = end_indices[i] if si == ei: max_vals[i] = data_int[si] / 10000.0 max_global_idx[i] = si else: arr_conv = np.convolve( data_int[si:ei + 1], ones_kernel, "same" ) ll_idx = np.nanargmax(arr_conv) max_vals[i] = arr_conv[ll_idx] / 10000.0 max_global_idx[i] = si + ll_idx elif method == "njit": max_vals_int, max_global_idx = _smev_inner_loop_numba_seq( data_int, start_indices, end_indices, window_size, n_events ) max_vals = max_vals_int / 10000.0 else: # njit_parallel max_vals_int, max_global_idx = _smev_inner_loop_numba( data_int, start_indices, end_indices, window_size, n_events ) max_vals = max_vals_int / 10000.0 ll_dates_arr = time_index[max_global_idx] ams_vals = np.array([ np.max(max_vals[mask]) for _, mask in year_masks.items() ]) df_ams = pd.DataFrame({"year": unique_years, "AMS": ams_vals}) df_oe = pd.DataFrame({ "year": ll_yrs, "oe_time": ll_dates_arr, "ordinary": max_vals, }) dict_AMS[f"{self.durations[d]}"] = df_ams dict_ordinary[f"{self.durations[d]}"] = df_oe return dict_ordinary, dict_AMS
[docs] def estimate_smev_parameters( self, ordinary_events: Union[np.ndarray, pd.Series, list], data_portion: list[Tuple[int, float]], ) -> list[float]: """Estimate shape and scale parameters of the Weibull distribution. Parameters ---------- ordinary_events : np.ndarray or pd.Series or list Values of ordinary events. data_portion : list Lower and upper limits of the probabilities of data to be used for the parameters estimation. Returns ------- list[float] Shape and scale parameters of the Weibull distribution. """ sorted_df = np.sort(ordinary_events) ecdf = np.arange(1, 1 + len(sorted_df)) / (1 + len(sorted_df)) # fidx: first index of data to keep fidx = max(1, math.floor((len(sorted_df)) * data_portion[0])) # tidx: last index of data to keep tidx = math.ceil(len(sorted_df) * data_portion[1]) # if censoring set to [0,1], take all values if fidx == 1: to_use = np.arange(fidx - 1, tidx) else: # e.g. [0.5,1] out of 1000 samples takes indexes 500-999 (top 500) to_use = np.arange(fidx, tidx) # Select only the subset of sorted values for the chosen quantile range to_use_array = sorted_df[to_use] X = np.log(np.log(1 / (1 - ecdf[to_use]))) Y = np.log(to_use_array) X = sm.add_constant(X) model = sm.OLS(Y, X) results = model.fit() param = results.params slope = float(param[1]) intercept = float(param[0]) shape = 1 / slope scale = np.exp(intercept) weibull_param = [shape, scale] return weibull_param
[docs] def smev_return_values( self, return_period: Union[int, float, list, np.ndarray], shape: float, scale: float, n: float, ) -> Union[float, np.ndarray]: """Calculate return values (rainfall intensity) from Weibull parameters. Parameters ---------- return_period : Union[int, float, list, np.ndarray] Return period(s) of interest. Scalar returns a float, array-like returns np.ndarray. shape : float Shape parameter value. scale : float Scale parameter value. n : float SMEV parameter `n`. Returns ------- Union[float, np.ndarray] Rainfall intensity value(s). """ return_period = np.asarray(return_period) quantile = 1 - (1 / return_period) if shape == 0 or n == 0: intensity = 0 else: intensity = scale * ( (-1) * (np.log(1 - quantile ** (1 / n))) ) ** (1 / shape) return intensity
[docs] def do_smev_all( self, dict_ordinary: Dict[str, pd.DataFrame], n: float, ) -> Dict[str, dict]: """Run SMEV parameter estimation and return level computation for all durations. Parameters ---------- dict_ordinary : Dict[str, pd.DataFrame] Dictionary of ordinary events per duration, as returned by `get_ordinary_events_values`. n : float Mean number of ordinary events per year. Returns ------- Dict[str, dict] Keys are duration strings (e.g. ``"10"``). Each value is a dict with keys ``'SMEV_phat'`` (``list[float]`` of length 2: ``[shape, scale]``) and ``'RLs'`` (``float`` or ``np.ndarray`` of return levels, one per return period). """ dict_smev_outputs = {} for d in range(len(self.durations)): P = dict_ordinary[f"{self.durations[d]}"]["ordinary"] # Estimate shape and scale parameters of weibull distribution smev_shape, smev_scale = self.estimate_smev_parameters( P, self.left_censoring ) # Estimate return period (quantiles) with SMEV smev_rl = self.smev_return_values( self.return_period, smev_shape, smev_scale, n ) dict_smev_outputs[f"{self.durations[d]}"] = { "SMEV_phat": [smev_shape, smev_scale], "RLs": smev_rl, } return dict_smev_outputs
def _run_smev_all_durations( self, dict_ordinary: Dict[str, pd.DataFrame], n: float, ) -> pd.DataFrame: """Estimate SMEV parameters and return levels for all durations. Convenience wrapper around :meth:`estimate_smev_parameters` and :meth:`smev_return_values` that iterates over all configured durations and collects results into a single summary DataFrame. Parameters ---------- dict_ordinary : Dict[str, pd.DataFrame] Ordinary events per duration as returned by :meth:`get_ordinary_events_values`. n : float Mean number of ordinary events per year (from :meth:`remove_short`). Returns ------- pd.DataFrame One row per duration (index e.g. ``"10 min"``), columns: ``N_oe``, ``n_mean``, ``shape``, ``scale``, and one column per return period (``"RP 2yr"``, ``"RP 5yr"``, …). """ rows = {} for dur in [str(d) for d in self.durations]: P = dict_ordinary[dur]["ordinary"].to_numpy() shape, scale = self.estimate_smev_parameters( P, self.left_censoring ) rl = self.smev_return_values(self.return_period, shape, scale, n) rows[f"{dur} min"] = ( [len(P), round(n, 2), round(shape, 4), round(scale, 4)] + [round(v, 2) for v in rl] ) col_names = ["N_oe", "n_mean", "shape", "scale"] + [ f"RP {rp}yr" for rp in self.return_period ] return pd.DataFrame(rows, index=col_names).T
[docs] @staticmethod def get_stats( df: pd.DataFrame, ) -> Tuple[pd.Series, pd.Series, pd.Series, pd.Series]: """Compute statistics of precipitation values. Statistics are total precipitation per year, mean precipitation per year, standard deviation of precipitation per year, and count of precipitation events per year. Parameters ---------- df : pd.DataFrame Dataframe with precipitation values. Returns ------- total_prec : pd.Series Total precipitation per year. mean_prec : pd.Series Mean precipitation per year. sd_prec : pd.Series Standard deviation of precipitation per year. count_prec : pd.Series Count of precipitation events per year. """ if not isinstance(df, pd.DataFrame): raise TypeError("df is not a pandas dataframe") total_prec = df.groupby(df.index.year)["value"].sum() mean_prec = ( df[df.value > 0] .groupby(df[df.value > 0].index.year)["value"] .mean() ) sd_prec = ( df[df.value > 0] .groupby(df[df.value > 0].index.year)["value"] .std() ) count_prec = ( df[df.value > 0] .groupby(df[df.value > 0].index.year)["value"] .count() ) return total_prec, mean_prec, sd_prec, count_prec
[docs] def smev_bootstrap_uncertainty( self, P: np.ndarray, blocks_id: np.ndarray, niter: int, n: float, ): """Bootstrap uncertainty of SMEV return values. Parameters ---------- P : np.ndarray Array of precipitation data. blocks_id : np.ndarray Array of block identifiers (e.g., years). niter : int Number of bootstrap iterations. n : float SMEV parameter `n`. Returns ------- np.ndarray Array with bootstrapped return value uncertainty. """ rp = self.return_period blocks = np.unique(blocks_id) n_blocks = len(blocks) randy = np.random.randint(0, n_blocks, size=(n_blocks, niter)) # Initialize variables rl_unc = np.full((niter, len(rp)), np.nan) n_err = 0 # Random sampling iterations for ii in range(niter): pr = [] bid = [] # Create bootstrapped data sample and 'fake' block ids for iy in range(n_blocks): selected = blocks_id == blocks[randy[iy, ii]] pr.append(P[selected]) bid.append( np.full(np.sum(selected), iy + 1) ) # MATLAB indexing starts at 1 # Concatenate the resampled data pr = np.concatenate(pr) bid = np.concatenate(bid) try: # estimate shape and scale parameters of weibull distribution smev_shape, smev_scale = self.estimate_smev_parameters( pr, self.left_censoring ) # estimate return period (quantiles) with SMEV smev_rp = self.smev_return_values( self.return_period, smev_shape, smev_scale, n ) # Store results rl_unc[ii, :] = smev_rp except Exception: n_err += 1 return rl_unc