Source code for supy.util._gap_filler

import pandas as pd
import numpy as np


# locate the first position of period with in-between gaps
def loc_gap(ser_test, freq="1D", pattern="010"):
    rsmp = ser_test.resample(freq)
    ser_TF_10 = rsmp.apply(lambda ser: ser.isna().any()) * 1
    str_TF_10 = ser_TF_10.astype(str).str.cat()
    pos_gap = str_TF_10.find(pattern)
    loc_ser = ser_TF_10.iloc[pos_gap : pos_gap + len(pattern)].index
    return loc_ser


# fill gap with neighbouring days
def fill_gap_one(ser_test, freq="1D", pattern="010"):
    # resample into daily periods
    rsmp = ser_test.resample(freq)
    # locate the gaps according to gap pattern: 0 for NO gap, 1 for gapped
    loc_ser = loc_gap(ser_test, freq, pattern)

    # generator groups
    ser_find = (rsmp.get_group(x) for x in loc_ser)
    if len(loc_ser) == 0:
        return ser_test

    # assign series:
    # ser_prev: series prior to gapped period
    # ser_gap: series with gaps
    # ser_post: series after gapped period
    if pattern == "010":
        ser_prev, ser_gap, ser_post = ser_find
    elif pattern == "01":
        ser_prev, ser_gap = ser_find
        ser_post = pd.Series([])
    elif pattern == "10":
        ser_gap, ser_post = ser_find
        ser_prev = pd.Series([])

    # base series for gap filling
    ser_fill_base = pd.concat([ser_prev, ser_post])
    ser_fill = (
        ser_fill_base.groupby(
            [
                ser_fill_base.index.hour.rename("hr"),
                ser_fill_base.index.minute.rename("min"),
            ]
        )
        .median()
        .reset_index(drop=True)
    )
    ser_fill.index = ser_gap.index

    # calculate rescaling factor with enough values to robustly rescale
    if (pattern == "010") and (ser_gap.count() > len(ser_gap) / 2):
        scale_fill = (ser_fill / ser_gap).median()
        # correct scale_fill for edge cases
        scale_fill = 1 if abs(scale_fill) > 10 else scale_fill
        scale_fill = 1 if abs(scale_fill) < 0.1 else scale_fill
        scale_fill = 1 if np.isnan(scale_fill) else scale_fill
    else:
        scale_fill = 1
    # rescale fill based on median ratio of fill:orig at available timesteps
    ser_fill_gap = ser_fill / scale_fill

    # fill in gaps with rescaled values of the filling data
    ser_gap.loc[ser_gap.isna()] = ser_fill_gap.loc[ser_gap.isna()]
    ser_filled = pd.concat([ser_prev, ser_gap, ser_post])

    # fill the original gapped series
    ser_test_filled = ser_test.copy()
    ser_test_filled.loc[ser_filled.index] = ser_filled
    return ser_test_filled


# fill gaps iteratively
[docs]def fill_gap_all(ser_to_fill: pd.Series, freq="1D") -> pd.Series: """Fill all gaps in a time series using data from neighbouring divisions of 'freq' Parameters ---------- ser_to_fill : pd.Series Time series to gap-fill freq : str, optional Frequency to identify gapped divisions, by default '1D' Returns ------- ser_test_filled: pd.Series Gap-filled time series. Patterns -------- 010: missing data in division between others with no missing data 01: missing data in division after one with no missing data 10: division with missing data before one with no missing data """ ser_test_filled = ser_to_fill.copy() ptn_list = ["010", "01", "10"] while ser_test_filled.isna().any(): # try to different gap patterns and fill gaps try: ptn_gap = next( ptn for ptn in ptn_list if len(loc_gap(ser_test_filled, freq, ptn)) > 0 ) ser_test_filled = fill_gap_one(ser_test_filled, freq, ptn_gap) except StopIteration: pass return ser_test_filled