Impact Studies Using SuPy¶
Aim¶
In this tutorial, we aim to perform sensitivity analysis using supy
in a parallel mode to investigate the impacts on urban climate of
- surface properties: the physical attributes of land covers (e.g., albedo, water holding capacity, etc.)
- background climate: longterm meteorological conditions (e.g., air temperature, precipitation, etc.)
Prepare supy
for the parallel mode¶
load supy
and sample dataset¶
[1]:
from dask import delayed
from dask import dataframe as dd
import os
import supy as sp
import seaborn as sns
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from time import time
import logging
logging.basicConfig()
logging.getLogger().setLevel(logging.WARNING)
get_ipython().run_line_magic('matplotlib', 'inline')
# produce high-quality figures, which can also be set as one of ['svg', 'pdf', 'retina', 'png']
# 'svg' produces high quality vector figures
%config InlineBackend.figure_format = 'svg'
# show supy version info
sp.show_version()
supy: 2019.8.30dev
supy_driver: 2019a4
[2]:
# load sample datasets
df_state_init, df_forcing = sp.load_SampleData()
# perform an example run to get output samples for later use
df_output, df_state_final = sp.run_supy(df_forcing, df_state_init)
Paralell setup for supy
using dask
¶
Given the nature of impact studies that requires multiple independent models with selected parameters/variables varying across the setups, such simulations well fall into the scope of so-called *embarrassingly parallel computation* that is fully supported by dask
. Also, as supy
is readily built on the data structure pandas.DataFrame
, we can fairly easily transfer it to the dask
framework for
parallel operations thanks to `dask.dataframe
<http://docs.dask.org/en/latest/dataframe.html>`__, a specialized dataframe
extending pandas.DataFrame
’s ability in parallel operations.
Prior to version 2019.5, for a given forcing dataset df_forcing
, supy
would loop over the grids in a df_state_init
to conduct simulations. Since version 2019.5, supy
has been using the dask.dataframe
to gain the parallel benefits through its parallelized apply
method.
dask.dataframe
essentially divides the work into pieces for parallel operations. As such, depending on the number of processors in your computer, it would be more efficient to set the partition number as the multipliers of CPU numbers.
[3]:
import platform
import psutil
list_info=['machine','system','mac_ver','processor']
for info in list_info:
info_x=getattr(platform,info)()
print(info,':',info_x)
cpu_count=psutil.cpu_count()
print('number of CPU processors:',cpu_count)
mem_size=psutil.virtual_memory().total/1024**3
print('memory size (GB):',mem_size)
machine : x86_64
system : Darwin
mac_ver : ('10.14.6', ('', '', ''), 'x86_64')
processor : i386
number of CPU processors: 12
memory size (GB): 32.0
To demonstrate the parallelization, we simply duplicate the contents in df_state_init
to make it seemingly large. Note we intentionally choose 24
as the number for copies to accompany the power of CPU.
Before we move on to the parallel mode, we perform a simulation in the traditional serial way to see the baseline performance.
Baseline serial run¶
[4]:
# just run for 30 days
df_forcing_part = df_forcing.iloc[:288*30]
df_state_init_mgrids = df_state_init.copy()
# construct a multi-grid `df_state_init`
for i in range(24-1):
df_state_init_mgrids = df_state_init_mgrids.append(
df_state_init, ignore_index=True)
# perform a serial run
t0 = time()
for i in range(24-1):
xx = sp.run_supy(df_forcing_part, df_state_init_mgrids.iloc[[i]])
t1 = time()
t_ser = t1-t0
logging.warning(f'Execution time: {t_ser:.2f} s')
WARNING:root:Execution time: 7.61 s
Parallel run¶
[5]:
# parallel run is enabled in supy by default
t0 = time()
xx = sp.run_supy(df_forcing_part, df_state_init_mgrids)
t1 = time()
t_par = t1-t0
logging.warning(f'Execution time: {t_par:.2f} s')
WARNING:root:Execution time: 4.13 s
Benchmark test¶
Note: this test may take a considerably long time depending on the machine performance
[6]:
# different running length
list_sim_len = [
day * 288 for day in [30, 90, 120, 150, 180, 270, 365, 365 * 2, 365 * 3]
]
# number of test grids
n_grid = 12
# construct a multi-grid `df_state_init`
df_state_init_m = df_state_init.copy()
for i in range(n_grid - 1):
df_state_init_m = df_state_init_m.append(df_state_init, ignore_index=True)
# construct a longer`df_forcing` for three years
df_forcing_m = pd.concat([df_forcing for i in range(3)])
df_forcing_m.index = pd.date_range(df_forcing.index[0],
freq=df_forcing.index.freq,
periods=df_forcing_m.index.size)
dict_time_ser = dict()
dict_time_par = dict()
for sim_len in list_sim_len:
df_forcing_part = df_forcing_m.iloc[:sim_len]
logging.warning(f'Sim days: {sim_len / 288}')
logging.warning(f'No. of grids: {df_state_init_m.shape[0]}')
# serial run
logging.warning('serial:')
t0 = time()
for i in range(df_state_init_m.shape[0]):
sp.run_supy(df_forcing_part, df_state_init_m.iloc[[i]])
t1 = time()
t_test = t1 - t0
logging.warning(f'Execution time: {t_test:.2f} s')
dict_time_ser.update({sim_len: t_test})
# parallel run
logging.warning('parallel:')
t0 = time()
sp.run_supy(df_forcing_part, df_state_init_m)
t1 = time()
t_test = t1 - t0
logging.warning(f'Execution time: {t_test:.2f} s\n')
dict_time_par.update({sim_len: t_test})
WARNING:root:Sim days: 30.0
WARNING:root:No. of grids: 12
WARNING:root:serial:
WARNING:root:Execution time: 4.07 s
WARNING:root:parallel:
WARNING:root:Execution time: 2.17 s
WARNING:root:Sim days: 90.0
WARNING:root:No. of grids: 12
WARNING:root:serial:
WARNING:root:Execution time: 8.99 s
WARNING:root:parallel:
WARNING:root:Execution time: 4.07 s
WARNING:root:Sim days: 120.0
WARNING:root:No. of grids: 12
WARNING:root:serial:
WARNING:root:Execution time: 10.88 s
WARNING:root:parallel:
WARNING:root:Execution time: 5.08 s
WARNING:root:Sim days: 150.0
WARNING:root:No. of grids: 12
WARNING:root:serial:
WARNING:root:Execution time: 13.29 s
WARNING:root:parallel:
WARNING:root:Execution time: 5.80 s
WARNING:root:Sim days: 180.0
WARNING:root:No. of grids: 12
WARNING:root:serial:
WARNING:root:Execution time: 15.47 s
WARNING:root:parallel:
WARNING:root:Execution time: 6.70 s
WARNING:root:Sim days: 270.0
WARNING:root:No. of grids: 12
WARNING:root:serial:
WARNING:root:Execution time: 22.23 s
WARNING:root:parallel:
WARNING:root:Execution time: 9.84 s
WARNING:root:Sim days: 365.0
WARNING:root:No. of grids: 12
WARNING:root:serial:
WARNING:root:Execution time: 29.06 s
WARNING:root:parallel:
WARNING:root:Execution time: 13.65 s
WARNING:root:Sim days: 730.0
WARNING:root:No. of grids: 12
WARNING:root:serial:
WARNING:root:Execution time: 67.05 s
WARNING:root:parallel:
WARNING:root:Execution time: 28.66 s
WARNING:root:Sim days: 1095.0
WARNING:root:No. of grids: 12
WARNING:root:serial:
WARNING:root:Execution time: 98.66 s
WARNING:root:parallel:
WARNING:root:Execution time: 48.68 s
[7]:
df_benchmark = pd.DataFrame([
dict_time_par,
dict_time_ser])\
.transpose()\
.rename(columns={0: 'parallel', 1: 'serial'})
idx_bmk = (df_benchmark.index / 288).astype(int)
df_benchmark.index = idx_bmk.set_names('Length of Simulation Period (day)')
# calculate execution time ratio between parallel and serial runs
ser_ratio = df_benchmark['parallel'] / df_benchmark['serial']
df_benchmark = df_benchmark.assign(ratio=ser_ratio)\
.rename(columns={'ratio': 'ratio (=p/s, right)'})
# show executation times and ratio on plot
ax = df_benchmark.plot(secondary_y='ratio (=p/s, right)',
marker='o',
fillstyle='none')
ax.set_ylabel('Execution Time (s)')
lines = ax.get_lines() + ax.right_ax.get_lines()
ax.legend(lines, [l.get_label() for l in lines], loc='best')
ax.right_ax.set_ylabel('Execution Ratio (=p/s)', color='C2')
ax.right_ax.spines['right'].set_color('C2')
ax.right_ax.tick_params(axis='y', colors='C2')
Surface properties: surface albedo¶
Examine the default albedo values loaded from the sample dataset¶
[8]:
df_state_init.alb
[8]:
ind_dim | (0,) | (1,) | (2,) | (3,) | (4,) | (5,) | (6,) |
---|---|---|---|---|---|---|---|
grid | |||||||
98 | 0.12 | 0.15 | 0.12 | 0.18 | 0.21 | 0.21 | 0.1 |
Copy the initial condition DataFrame
to have a clean slate for our study¶
Note: DataFrame.copy()
defaults to deepcopy
[9]:
df_state_init_test = df_state_init.copy()
Set the Bldg
land cover to 100% for this study¶
[10]:
df_state_init_test.sfr = 0
df_state_init_test.loc[:, ('sfr', '(1,)')] = 1
df_state_init_test.sfr
[10]:
ind_dim | (0,) | (1,) | (2,) | (3,) | (4,) | (5,) | (6,) |
---|---|---|---|---|---|---|---|
grid | |||||||
98 | 0 | 1 | 0 | 0 | 0 | 0 | 0 |
Construct a df_state_init_x
dataframe to perform supy
simulation with specified albedo¶
[11]:
# create a `df_state_init_x` with different surface properties
n_test = 48
list_alb_test = np.linspace(0.1, 0.8, n_test).round(2)
df_state_init_x = df_state_init_test.append(
[df_state_init_test]*(n_test-1), ignore_index=True)
# here we modify surface albedo
df_state_init_x.loc[:, ('alb', '(1,)')] = list_alb_test
Conduct simulations with supy
¶
[12]:
df_forcing_part = df_forcing.loc['2012 01':'2012 07']
df_res_alb_test,df_state_final_x = sp.run_supy(df_forcing_part, df_state_init_x)
Examine the simulation results¶
[13]:
# choose results of July 2012 for analysis
df_res_alb_test_july=df_res_alb_test.SUEWS.unstack(0).loc['2012 7']
df_res_alb_T2_stat = df_res_alb_test_july.T2.describe()
df_res_alb_T2_diff = df_res_alb_T2_stat.transform(
lambda x: x - df_res_alb_T2_stat.iloc[:, 0])
df_res_alb_T2_diff.columns = list_alb_test-list_alb_test[0]
[14]:
ax_temp_diff = df_res_alb_T2_diff.loc[['max', 'mean', 'min']].T.plot()
ax_temp_diff.set_ylabel('$\Delta T_2$ ($^{\circ}}$C)')
ax_temp_diff.set_xlabel(r'$\Delta\alpha$')
ax_temp_diff.margins(x=0.2, y=0.2)
Why a bi-linear \(\Delta \alpha-\Delta T_{2,max}\) relationship?¶
Although the relations for mean and minimum \(T_2\) demonstrate single linear patterns, the one for maximum \(T_2\), interestingly, consists of two linear sections.
[15]:
df_t2=df_res_alb_test_july.T2
df_t2.columns=list_alb_test
df_t2.idxmax().unique()
[15]:
array(['2012-07-25T13:35:00.000000000', '2012-07-25T15:30:00.000000000'],
dtype='datetime64[ns]')
By looking into the peaking times of \(T_{2,max}\), we see a shift in the peaking times from 13:35 to 15:30 on 2012-07-25 as albedo increases. Taking the two ending cases, \(\alpha=0.1\) and \(\alpha=0.8\), we see diurnal cycles of \(T_2\) evolves according to the albedo: peak is delayed as albedo increases.
[16]:
df_t2.loc['2012-07-25'].iloc[:,[0,-1]].plot()
[16]:
<matplotlib.axes._subplots.AxesSubplot at 0x7f9630da73c8>
Furthermore, when the \(\Delta \alpha-\Delta T_{2}\) relations at the two peaking times are shown below, we can see the bi-linear relation based on the \(T_{2,max}\) values for the July 2012 is actually composed of two linear relations at different times under different peaking scenarios.
[17]:
ax_t2_max=df_t2.loc['2012-07-25 13:35':'2012-07-25 15:30'].iloc[[0,-1]].T.plot()
ax_t2_max.set_xlabel(r'$\alpha$')
ax_t2_max.set_ylabel('$T_{2,max}$ ($^{\circ}}$C)')
[17]:
Text(0, 0.5, '$T_{2,max}$ ($^{\\circ}}$C)')
Background climate: air temperature¶
Examine the monthly climatology of air temperature loaded from the sample dataset¶
[18]:
df_plot = df_forcing.Tair.iloc[:-1].resample('1m').mean()
ax_temp = df_plot.plot.bar(color='tab:blue')
ax_temp.set_xticklabels(df_plot.index.strftime('%b'))
ax_temp.set_ylabel('Mean Air Temperature ($^\degree$C)')
ax_temp.set_xlabel('Month')
ax_temp
[18]:
<matplotlib.axes._subplots.AxesSubplot at 0x7f9630d7f668>
Construct a function to perform parallel supy
simulation with specified diff_airtemp_test
: the difference in air temperature between the one used in simulation and loaded from sample dataset.¶
Note: forcing data ``df_forcing`` has different data structure from ``df_state_init``; so we need to modify ``run_supy_mgrids`` to implement a ``run_supy_mclims`` for different climate scenarios
Let’s start the implementation of run_supy_mclims
with a small problem of four forcing groups (i.e., climate scenarios), where the air temperatures differ from the baseline scenario with a constant bias.
[19]:
# save loaded sample datasets
df_forcing_part_test = df_forcing.loc['2012 1':'2012 7'].copy()
df_state_init_test = df_state_init.copy()
[20]:
# create a dict with four forcing conditions as a test
n_test = 4
list_TairDiff_test = np.linspace(0., 2, n_test).round(2)
dict_df_forcing_x = {
tairdiff: df_forcing_part_test.copy()
for tairdiff in list_TairDiff_test}
for tairdiff in dict_df_forcing_x:
dict_df_forcing_x[tairdiff].loc[:, 'Tair'] += tairdiff
dd_forcing_x = {
k: delayed(sp.run_supy)(df, df_state_init_test)[0]
for k, df in dict_df_forcing_x.items()}
df_res_tairdiff_test0 = delayed(pd.concat)(
dd_forcing_x,
keys=list_TairDiff_test,
names=['tairdiff'],
)
[21]:
# test the performance of a parallel run
t0 = time()
df_res_tairdiff_test = df_res_tairdiff_test0\
.compute(scheduler='threads')\
.reset_index('grid', drop=True)
t1 = time()
t_par = t1 - t0
print(f'Execution time: {t_par:.2f} s')
Execution time: 6.83 s
[22]:
# function for multi-climate `run_supy`
# wrapping the above code into one
def run_supy_mclims(df_state_init, dict_df_forcing_mclims):
dd_forcing_x = {
k: delayed(sp.run_supy)(df, df_state_init_test)[0]
for k, df in dict_df_forcing_x.items()}
df_output_mclims0 = delayed(pd.concat)(
dd_forcing_x,
keys=list(dict_df_forcing_x.keys()),
names=['clm'],
).compute(scheduler='threads')
df_output_mclims = df_output_mclims0.reset_index('grid', drop=True)
return df_output_mclims
Construct dict_df_forcing_x
with multiple forcing DataFrame
s¶
[23]:
# save loaded sample datasets
df_forcing_part_test = df_forcing.loc['2012 1':'2012 7'].copy()
df_state_init_test = df_state_init.copy()
# create a dict with a number of forcing conditions
n_test = 24 # can be set with a smaller value to save simulation time
list_TairDiff_test = np.linspace(0., 2, n_test).round(2)
dict_df_forcing_x = {
tairdiff: df_forcing_part_test.copy()
for tairdiff in list_TairDiff_test}
for tairdiff in dict_df_forcing_x:
dict_df_forcing_x[tairdiff].loc[:, 'Tair'] += tairdiff
Perform simulations¶
[24]:
# run parallel simulations using `run_supy_mclims`
t0 = time()
df_airtemp_test_x = run_supy_mclims(df_state_init_test, dict_df_forcing_x)
t1 = time()
t_par = t1-t0
print(f'Execution time: {t_par:.2f} s')
Execution time: 38.52 s
Examine the results¶
[25]:
df_airtemp_test = df_airtemp_test_x.SUEWS.unstack(0)
df_temp_diff=df_airtemp_test.T2.transform(lambda x: x - df_airtemp_test.T2[0.0])
df_temp_diff_ana=df_temp_diff.loc['2012 7']
df_temp_diff_stat=df_temp_diff_ana.describe().loc[['max', 'mean', 'min']].T
[26]:
ax_temp_diff_stat=df_temp_diff_stat.plot()
ax_temp_diff_stat.set_ylabel('$\\Delta T_2$ ($^{\\circ}}$C)')
ax_temp_diff_stat.set_xlabel('$\\Delta T_{a}$ ($^{\\circ}}$C)')
ax_temp_diff_stat.set_aspect('equal')
The \(T_{2}\) results indicate the increased \(T_{a}\) has different impacts on the \(T_{2}\) metrics (minimum, mean and maximum) but all increase linearly with \(T_{a}.\) The maximum \(T_{2}\) has the stronger response compared to the other metrics.