Source code for supy._supy_module

###########################################################################
# SuPy: SUEWS for Python
#
# Authors:
# Ting Sun, ting.sun@reading.ac.uk
#
# History:
# 20 Jan 2018: first alpha release
# 01 Feb 2018: performance improvement
# 03 Feb 2018: improvement in output processing
# 08 Mar 2018: pypi packaging
# 04 Oct 2018: overhaul of structure
# 05 Oct 2018: added sample run data
# 28 Apr 2019: added support for parallel run
###########################################################################


import time
import dask.bag as db
# from multiprocessing import Pool, cpu_count, freeze_support

import os
import sys
# import functools
from pathlib import Path
from typing import Tuple

import numpy as np
import pandas
import pandas as pd

from ._env import path_supy_module
from ._load import (load_InitialCond_grid_df,
                    load_SUEWS_dict_ModConfig,
                    # load_SUEWS_Forcing_ESTM_df_raw,
                    load_SUEWS_Forcing_met_df_raw,
                    load_df_state,
                    resample_forcing_met,
                    resample_linear)
from ._post import pack_df_output, pack_df_output_array, pack_df_state
from ._run import (pack_df_state_final,
                   pack_grid_dict,
                   suews_cal_tstep,
                   suews_cal_tstep_multi)
from ._save import get_save_info, save_df_output, save_df_state, save_initcond_nml


##############################################################################
# 1. compact wrapper for loading SUEWS settings
# @functools.lru_cache(maxsize=16)
def init_supy(path_init: str, force_reload=True) -> pd.DataFrame:
    '''Initialise supy by loading initial model states.

    Parameters
    ----------
    path_init : str
        Path to a file that can initialise SuPy, which can be either of the following:

        * SUEWS :ref:`RunControl.nml<suews:RunControl.nml>`: a namelist file for SUEWS configurations
        * SuPy ``df_state.csv``: a CSV file including model states produced by a SuPy run via :py:func:`supy.save_supy`

    force_reload : boolean, optional
        Flag to force reload all initialisation files by clearing all cached states,
        with default value `True` (i.e., force reload all files).
        Note: if the number of simulation grids is large (e.g., > 100),
        `force_reload=False` is strongly recommended for better performance.

    Returns
    -------
    df_state_init : pandas.DataFrame
        Initial model states.
        See `df_state_var` for details.

    Examples
    --------
    1. Use :ref:`RunControl.nml<suews:RunControl.nml>` to initialise SuPy

    >>> path_init = "~/SUEWS_sims/RunControl.nml"
    >>> df_state_init = supy.init_supy(path_init)

    2. Use ``df_state.csv`` to initialise SuPy

    >>> path_init = "~/SuPy_res/df_state_test.csv"
    >>> df_state_init = supy.init_supy(path_init)

    '''
    try:
        path_init_x = Path(path_init).expanduser().resolve()
    except FileNotFoundError:
        print('{path} does not exist!'.format(path=path_init))
    else:
        if path_init_x.suffix == '.nml':
            # SUEWS `RunControl.nml`:
            df_state_init = load_InitialCond_grid_df(
                path_init_x,
                force_reload=force_reload)
        elif path_init_x.suffix == '.csv':
            # SuPy `df_state.csv`:
            df_state_init = load_df_state(path_init_x)
        else:
            print('{path} is NOT a valid file to initialise SuPy!'.format(
                path=path_init_x))
            sys.exit()
        return df_state_init
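
# The two initialisation routes above are round-trip compatible: a
# `df_state.csv` written by `save_supy` (defined later in this module) can
# re-initialise a later run. A minimal sketch of that workflow
# (illustrative only, not part of the SuPy API; all paths below are
# hypothetical):
def _demo_init_roundtrip():
    import supy as sp
    # route 1: initialise from a SUEWS namelist
    df_state_init = sp.init_supy('~/SUEWS_sims/RunControl.nml')
    # ... run and save (see `run_supy` and `save_supy` below) ...
    # route 2: initialise from a df_state CSV saved by a previous run
    df_state_restart = sp.init_supy('~/SuPy_res/df_state_test.csv')
    return df_state_init, df_state_restart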
# # TODO:
# def load_forcing(path_pattern: str, grid: int = 0) -> pd.DataFrame:
#     pass


# TODO: to be superseded by a more generic wrapper: load_forcing
def load_forcing_grid(path_runcontrol: str, grid: int) -> pd.DataFrame:
    '''Load forcing data for a specific grid included in the index of
    `df_state_init </data-structure/supy-io.ipynb#df_state_init:-model-initial-states>`.

    Parameters
    ----------
    path_runcontrol : str
        Path to SUEWS :ref:`RunControl.nml <suews:RunControl.nml>`
    grid : int
        Grid number

    Returns
    -------
    df_forcing : pandas.DataFrame
        Forcing data. See `df_forcing_var` for details.

    Examples
    --------
    >>> path_runcontrol = "~/SUEWS_sims/RunControl.nml"  # a valid path to `RunControl.nml`
    >>> df_state_init = supy.init_supy(path_runcontrol)  # get `df_state_init`
    >>> grid = df_state_init.index[0]  # first grid number included in `df_state_init`
    >>> df_forcing = supy.load_forcing_grid(path_runcontrol, grid)  # get df_forcing

    '''
    try:
        path_runcontrol = Path(path_runcontrol).expanduser().resolve()
    except FileNotFoundError:
        print('{path} does not exist!'.format(path=path_runcontrol))
    else:
        dict_mod_cfg = load_SUEWS_dict_ModConfig(path_runcontrol)
        df_state_init = init_supy(path_runcontrol)

        # load setting variables from dict_mod_cfg
        (
            filecode,
            kdownzen,
            tstep_met_in,
            tstep_ESTM_in,
            multiplemetfiles,
            multipleestmfiles,
            dir_input_cfg
        ) = (dict_mod_cfg[x] for x in [
            'filecode',
            'kdownzen',
            'resolutionfilesin',
            'resolutionfilesinestm',
            'multiplemetfiles',
            'multipleestmfiles',
            'fileinputpath'
        ])
        tstep_mod, lat, lon, alt, timezone = df_state_init.loc[
            grid,
            [(x, '0') for x in ['tstep', 'lat', 'lng', 'alt', 'timezone']]
        ].values

        path_site = path_runcontrol.parent
        path_input = path_site / dict_mod_cfg['fileinputpath']

        # load raw data
        # met forcing
        df_forcing_met = load_SUEWS_Forcing_met_df_raw(
            path_input, filecode, grid, tstep_met_in, multiplemetfiles)

        # resample raw data from tstep_in to tstep_mod
        df_forcing_met_tstep = resample_forcing_met(
            df_forcing_met, tstep_met_in, tstep_mod,
            lat, lon, alt, timezone, kdownzen)

        # merge forcing datasets (met and ESTM)
        df_forcing_tstep = df_forcing_met_tstep.copy()

        # disable the AnOHM and ESTM components for now and for better performance
        # |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
        # TS 28 Dec 2018
        # pack all records of `id` into `metforcingdata_grid` for AnOHM
        # df_grp = df_forcing_tstep.groupby('id')
        # dict_id_all = {xid: df_grp.get_group(xid)
        #                for xid in df_forcing_tstep['id'].unique()}
        # id_all = df_forcing_tstep['id'].apply(lambda xid: dict_id_all[xid])
        # df_forcing_tstep = df_forcing_tstep.merge(
        #     id_all.to_frame(name='metforcingdata_grid'),
        #     left_index=True,
        #     right_index=True)

        # # add Ts forcing for ESTM
        # if np.asscalar(df_state_init.iloc[0]['storageheatmethod'].values) == 4:
        #     # load ESTM forcing
        #     df_forcing_estm = load_SUEWS_Forcing_ESTM_df_raw(
        #         path_input, filecode, grid, tstep_ESTM_in, multipleestmfiles)
        #     # resample raw data from tstep_in to tstep_mod
        #     df_forcing_estm_tstep = resample_linear(
        #         df_forcing_estm, tstep_met_in, tstep_mod)
        #     df_forcing_tstep = df_forcing_tstep.merge(
        #         df_forcing_estm_tstep,
        #         left_on=['iy', 'id', 'it', 'imin'],
        #         right_on=['iy', 'id', 'it', 'imin'])
        #     # insert `ts5mindata_ir` into df_forcing_tstep
        #     ts_col = df_forcing_estm.columns[4:]
        #     df_forcing_tstep['ts5mindata_ir'] = (
        #         df_forcing_tstep.loc[:, ts_col].values.tolist())
        #     df_forcing_tstep['ts5mindata_ir'] = df_forcing_tstep[
        #         'ts5mindata_ir'].map(lambda x: np.array(x, order='F'))
        # else:
        #     # insert some placeholder values
        #     df_forcing_tstep['ts5mindata_ir'] = df_forcing_tstep['Tair']
        # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        # disable the AnOHM and ESTM components for now and for better performance

        # coerce precision here to prevent numerical errors inside Fortran
        df_forcing = df_forcing_tstep.round(10)

        # new columns for later use in main calculation
        df_forcing[['iy', 'id', 'it', 'imin']] = df_forcing[[
            'iy', 'id', 'it', 'imin']].astype(np.int64)

        return df_forcing
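
# The raw met forcing is usually at a coarser time step (e.g. 3600 s) than
# the model time step (e.g. 300 s), so `resample_forcing_met` above
# upsamples it before simulation. A minimal sketch of such upsampling with
# plain pandas (illustrative only; the actual SuPy resampler also takes
# site location and the `kdownzen` flag, so its treatment of variables
# such as kdown differs from plain interpolation):
def _demo_resample_sketch():
    import numpy as np
    import pandas as pd
    # one day of hourly air temperature (synthetic data)
    idx_hourly = pd.date_range('2012-01-01', periods=24, freq='3600s')
    ser_tair = pd.Series(
        np.sin(np.arange(24) / 24 * 2 * np.pi) * 10 + 10,
        index=idx_hourly)
    # upsample 3600 s -> 300 s with linear interpolation
    ser_tair_5min = ser_tair.resample('300s').interpolate('linear')
    return ser_tair_5min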
# load sample data for quickly starting a demo run
# TODO: deprecate this by renaming for case consistency:
# load_SampleData --> load_sample_data
def load_SampleData() -> Tuple[pandas.DataFrame, pandas.DataFrame]:
    '''Load sample data for quickly starting a demo run.

    Returns
    -------
    df_state_init, df_forcing : Tuple[pandas.DataFrame, pandas.DataFrame]
        - df_state_init: `initial model states <df_state_var>`
        - df_forcing: `forcing data <df_forcing_var>`

    Examples
    --------
    >>> df_state_init, df_forcing = supy.load_SampleData()

    '''
    path_SampleData = Path(path_supy_module) / 'sample_run'
    path_runcontrol = path_SampleData / 'RunControl.nml'
    df_state_init = init_supy(path_runcontrol, force_reload=False)
    # path_input = path_runcontrol.parent / ser_mod_cfg['fileinputpath']
    df_forcing = load_forcing_grid(
        path_runcontrol,
        df_state_init.index[0]
    )
    return df_state_init, df_forcing
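
# A quick-start sketch chaining the sample data into `run_supy` defined
# below (illustrative only, not part of the SuPy API; the forcing is
# trimmed to one day of 5-min records purely to keep the demo fast):
def _demo_quickstart():
    import supy as sp
    df_state_init, df_forcing = sp.load_SampleData()
    df_output, df_state_final = sp.run_supy(
        df_forcing.iloc[:288], df_state_init)
    return df_output, df_state_final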
# input processing code ends here
##############################################################################


##############################################################################
# 2. compact wrapper for running a whole simulation
# main calculation
# input as DataFrame
def run_supy(
        df_forcing: pandas.DataFrame,
        df_state_init: pandas.DataFrame,
        save_state=False,
        n_yr=10,
) -> Tuple[pandas.DataFrame, pandas.DataFrame]:
    '''Perform supy simulation.

    Parameters
    ----------
    df_forcing : pandas.DataFrame
        forcing data for all grids in `df_state_init`.
    df_state_init : pandas.DataFrame
        initial model states;
        or a collection of model states with multiple timestamps,
        whose last temporal record will be used as the initial model states.
    save_state : bool, optional
        flag for saving model states at each time step, which can be useful
        in diagnosing model runtime performance or performing a restart run.
        (the default is False, which instructs supy not to save runtime model states).
    n_yr : int, optional
        chunk size (`n_yr` years) to split simulation periods so memory usage can be reduced.
        (the default is 10, which implies 10-year forcing chunks used in simulations).

    Returns
    -------
    df_output, df_state_final : Tuple[pandas.DataFrame, pandas.DataFrame]
        - df_output: `output results <df_output_var>`
        - df_state_final: `final model states <df_state_var>`

    Examples
    --------
    >>> df_output, df_state_final = supy.run_supy(df_forcing, df_state_init)

    '''
    # set up a timer for simulation time
    start = time.time()

    # save df_init without changing its original data
    # df.copy() in pandas works as a standard python deepcopy
    df_init = df_state_init.copy()

    # print some diagnostic info
    print('====================')
    print('Simulation period:')
    print(f'  Start: {df_forcing.index[0]}')
    print(f'  End: {df_forcing.index[-1]}')
    print('')

    # retrieve the last temporal record as `df_init`
    # if a `datetime` level exists in the index
    if df_init.index.nlevels > 1:
        idx_dt = df_init.index.get_level_values('datetime').unique()
        dt_last = idx_dt.max()
        df_init = df_init.loc[dt_last]

    # add placeholder variables for df_forcing
    # `metforcingdata_grid` and `ts5mindata_ir` are used by AnOHM and ESTM,
    # respectively; they are now temporarily disabled in supy
    df_forcing = df_forcing\
        .assign(
            metforcingdata_grid=0,
            ts5mindata_ir=0,
        )\
        .rename(
            # rename is a workaround to resolve naming inconsistency between
            # suews fortran code interface and input forcing file headers
            columns={
                '%' + 'iy': 'iy',
                'id': 'id',
                'it': 'it',
                'imin': 'imin',
                'qn': 'qn1_obs',
                'qh': 'qh_obs',
                'qe': 'qe',
                'qs': 'qs_obs',
                'qf': 'qf_obs',
                'U': 'avu1',
                'RH': 'avrh',
                'Tair': 'temp_c',
                'pres': 'press_hpa',
                'rain': 'precip',
                'kdown': 'avkdn',
                'snow': 'snowfrac_obs',
                'ldown': 'ldown_obs',
                'fcld': 'fcld_obs',
                'Wuh': 'wu_m3',
                'xsmd': 'xsmd',
                'lai': 'lai_obs',
                'kdiff': 'kdiff',
                'kdir': 'kdir',
                'wdir': 'wdir',
            }
        )

    # reorder columns of df_forcing to comply with the SUEWS kernel
    # convention in receiving the input
    # TODO: this re-ordering can later be put into the planned input checker
    list_var_forcing = [
        'iy', 'id', 'it', 'imin',
        'qn1_obs', 'qh_obs', 'qe', 'qs_obs', 'qf_obs',
        'avu1', 'avrh', 'temp_c', 'press_hpa', 'precip',
        'avkdn', 'snowfrac_obs', 'ldown_obs', 'fcld_obs',
        'wu_m3', 'xsmd', 'lai_obs', 'kdiff', 'kdir', 'wdir',
        'isec', 'metforcingdata_grid', 'ts5mindata_ir',
    ]
    df_forcing = df_forcing.loc[:, list_var_forcing]

    # grid list determined by initial states
    list_grid = df_init.index
    print(f'No. of grids: {list_grid.size}\n')

    # initialise dicts for holding results and model states
    dict_state = {}
    dict_output = {}

    # initial and final tsteps retrieved from forcing data
    tstep_init = df_forcing.index[0]
    tstep_final = df_forcing.index[-1]
    # tstep size retrieved from forcing data
    freq = df_forcing.index.freq

    # dict_state is used to save model states for later use
    dict_state = {
        # (t_start, grid): series_state_init.to_dict()
        (tstep_init, grid): pack_grid_dict(series_state_init)
        for grid, series_state_init in df_init.iterrows()
    }

    # remove 'problems.txt'
    if Path('problems.txt').exists():
        os.remove('problems.txt')

    if save_state:
        # use the slower, more functional single-step wrapper
        # convert df to dict with `itertuples` for better performance
        dict_forcing = {row.Index: row._asdict()
                        for row in df_forcing.itertuples()}
        # temporal loop
        for tstep in df_forcing.index:
            # load met forcing at `tstep`, the same across all grids
            met_forcing_tstep = dict_forcing[tstep]
            # spatial loop
            for grid in list_grid:
                dict_state_start = dict_state[(tstep, grid)]
                # calculation at one step:
                # series_state_end, series_output_tstep = suews_cal_tstep_df(
                #     series_state_start, met_forcing_tstep)
                try:
                    dict_state_end, dict_output_tstep = suews_cal_tstep(
                        dict_state_start, met_forcing_tstep)
                except Exception:
                    raise RuntimeError('SUEWS kernel error')
                # update output & model state at tstep for the current grid
                dict_output.update({(tstep, grid): dict_output_tstep})
                dict_state.update({(tstep + 1 * freq, grid): dict_state_end})

        # pack results as easier DataFrames
        df_output = pack_df_output(dict_output).swaplevel(0, 1)
        # drop unnecessary 'datetime' as it is already included in the index
        df_output = df_output.drop(columns=['datetime'], level=0)
        df_state_final = pack_df_state(dict_state).swaplevel(0, 1)
    else:
        # for a multi-year run, reduce the whole df_forcing into
        # {n_yr}-year chunks for less memory consumption
        yr_start = df_forcing.index.min().year
        yr_all = df_forcing.index.year
        grp_forcing_yr = df_forcing.groupby((yr_all - yr_start) // n_yr)
        if len(grp_forcing_yr) > 1:
            df_state_init_yr = df_state_init.copy()
            list_df_output = []
            list_df_state = []
            for grp in grp_forcing_yr.groups:
                # get forcing of a specific chunk
                df_forcing_yr = grp_forcing_yr.get_group(grp)
                # run supy: actual execution done in the `else` clause below
                df_output_yr, df_state_final_yr = run_supy(
                    df_forcing_yr, df_state_init_yr, n_yr=n_yr)
                df_state_init_yr = df_state_final_yr.copy()
                # collect results
                list_df_output.append(df_output_yr)
                list_df_state.append(df_state_final_yr)
            # re-organise results of each chunk
            df_output = pd.concat(list_df_output).sort_index()
            df_state_final = pd.concat(
                list_df_state).sort_index().drop_duplicates()
            return df_output, df_state_final
        else:
            # for a single-chunk run (1 chunk = {n_yr} years), directly put
            # df_forcing into supy_driver for calculation;
            # use the higher-level wrapper that calculates at a `block`
            # level for better performance
            # construct input list for `dask.bag`
            list_input = [
                dict_state[(tstep_init, grid)]
                for grid in list_grid
            ]
            if len(list_input) > 1:
                # multiple grids to simulate: run simulations in parallel
                # on windows `processes` has issues when importing,
                # so set `threads` here
                method_parallel = 'threads' if os.name == 'nt' else 'processes'
                list_res = db.from_sequence(list_input)\
                    .map(suews_cal_tstep_multi, df_forcing)\
                    .compute(scheduler=method_parallel)
            else:
                # single grid: run serially
                list_res = [
                    suews_cal_tstep_multi(input_grid, df_forcing)
                    for input_grid in list_input]
            try:
                list_state_end, list_output_array = zip(*list_res)
            except Exception:
                raise RuntimeError('SUEWS kernel error')

            # collect output arrays
            dict_output = {
                grid: dict_output_array
                for grid, dict_output_array
                in zip(list_grid, list_output_array)
            }
            # collect final states
            dict_state_final_tstep = {
                (tstep_final + freq, grid): dict_state_end
                for grid, dict_state_end
                in zip(list_grid, list_state_end)
            }
            dict_state.update(dict_state_final_tstep)

            # save results as time-aware DataFrame
            df_output0 = pack_df_output_array(dict_output, df_forcing)
            df_output = df_output0.replace(-999., np.nan)
            df_state_final = pack_df_state(dict_state).swaplevel(0, 1)

    # drop ESTM for now as it is not supported yet:
    # select only those supported output groups
    list_group_use = [
        group for group in df_output.columns.levels[0]
        if group not in ['ESTM']
    ]
    df_output = df_output.loc[:, list_group_use]
    # trim multi-index based columns
    df_output.columns = df_output.columns.remove_unused_levels()

    # pack final model states into a proper dataframe
    df_state_final = pack_df_state_final(df_state_final, df_init)

    # show simulation time
    end = time.time()
    print(f'Execution time: {(end - start):.1f} s')
    print('====================\n')

    return df_output, df_state_final
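
# Two standalone sketches of the techniques used in `run_supy` above
# (illustrative only, not part of the SuPy API).
#
# 1. The integer-division trick that splits a multi-year DatetimeIndex
#    into `n_yr`-year chunks:
def _demo_chunking_sketch(n_yr=10):
    import pandas as pd
    idx = pd.date_range('2000-01-01', '2014-12-31', freq='D')
    df = pd.DataFrame({'x': range(len(idx))}, index=idx)
    yr_all = df.index.year
    chunk_id = (yr_all - yr_all.min()) // n_yr
    # years 2000-2009 -> chunk 0; years 2010-2014 -> chunk 1
    return df.groupby(chunk_id).size()


# 2. The per-grid parallelism pattern via dask.bag: extra positional
#    arguments to `Bag.map` (here `offset`, standing in for `df_forcing`)
#    are broadcast to every element of the bag:
def _demo_daskbag_sketch():
    import os
    import dask.bag as db

    def _work(x, offset):
        # toy stand-in for `suews_cal_tstep_multi`
        return x * x + offset

    # mirror the scheduler choice in `run_supy`: threads on Windows
    scheduler = 'threads' if os.name == 'nt' else 'processes'
    return (db.from_sequence([1, 2, 3])
            .map(_work, 10)
            .compute(scheduler=scheduler))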
##############################################################################
# 3. save results of a supy run
def save_supy(
        df_output: pandas.DataFrame,
        df_state_final: pandas.DataFrame,
        freq_s: int = 3600,
        site: str = '',
        path_dir_save: str = Path('.'),
        path_runcontrol: str = None,
) -> list:
    '''Save SuPy run results to files.

    Parameters
    ----------
    df_output : pandas.DataFrame
        DataFrame of output
    df_state_final : pandas.DataFrame
        DataFrame of final model states
    freq_s : int, optional
        Output frequency in seconds (the default is 3600, which indicates hourly output)
    site : str, optional
        Site identifier (the default is '', which indicates the site identifier will be left empty)
    path_dir_save : str, optional
        Path to directory for saving the files (the default is Path('.'), which indicates the current working directory)
    path_runcontrol : str, optional
        Path to SUEWS :ref:`RunControl.nml <suews:RunControl.nml>`, which, if set,
        will be preferably used to derive `freq_s`, `site` and `path_dir_save`.
        (the default is None, which is unset)

    Returns
    -------
    list
        a list of paths of saved files

    Examples
    --------
    1. save results of a supy run to the current working directory with default settings

    >>> list_path_save = supy.save_supy(df_output, df_state_final)

    2. save results according to settings in :ref:`RunControl.nml <suews:RunControl.nml>`

    >>> list_path_save = supy.save_supy(df_output, df_state_final, path_runcontrol='path/to/RunControl.nml')

    3. save results of a supy run at a resampling frequency of 1800 s (i.e., half-hourly results)
       under the site code ``Test`` to a customised location 'path/to/some/dir'

    >>> list_path_save = supy.save_supy(df_output, df_state_final, freq_s=1800, site='Test', path_dir_save='path/to/some/dir')

    '''
    # get necessary information for the saving procedure
    if path_runcontrol is not None:
        freq_s, path_dir_save, site = get_save_info(path_runcontrol)

    # save df_output to several files
    list_path_save = save_df_output(df_output, freq_s, site, path_dir_save)

    # save df_state
    if path_runcontrol is not None:
        # save as nml as SUEWS binary
        list_path_nml = save_initcond_nml(df_state_final, site, path_dir_save)
        list_path_save = list_path_save + list_path_nml
    else:
        # save as supy csv for later use
        path_state_save = save_df_state(df_state_final, site, path_dir_save)
        # update list_path_save
        list_path_save.append(path_state_save)
    return list_path_save
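
# `freq_s` controls resampling of `df_output` before writing, which is done
# inside `save_df_output`. A minimal sketch of the underlying idea with
# plain pandas, assuming mean aggregation for a flux-like variable (an
# assumption for illustration: the actual rules in `save_df_output` may
# differ per variable, e.g. totals rather than means for rainfall):
def _demo_output_resample_sketch():
    import numpy as np
    import pandas as pd
    # one day of 5-min output for a sensible-heat-flux-like series
    idx_5min = pd.date_range('2012-01-01', periods=288, freq='300s')
    ser_qh = pd.Series(np.random.rand(288), index=idx_5min, name='QH')
    # resample 300 s output to hourly (freq_s=3600) by averaging
    return ser_qh.resample('3600s').mean()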