# suppress pandas warnings
from supy_driver import meteo
from supy_driver import atmmoiststab_module as stab
import os
from .._env import logger_supy
from numpy import cos, deg2rad, sin, sqrt
import pandas as pd
import numpy as np
from pathlib import Path
import time
import warnings
warnings.simplefilter(action="ignore", category=FutureWarning)
################################################
# more ERA-5 related functions
################################################
# utility functions
def roundPartial(value, resolution):
return round(value / resolution) * resolution
"""Geopotential Functions on WGS84 Reference Ellipsoid
This module contains code for converting Geopotential to Geometric and vice-versa on the WGS84 reference ellipsoid
ERA-5 utility functions from Chris Roth
# https://pypi.org/project/eratools/
"""
Rmax_WGS84 = 6378137
Rmin_WGS84 = Rmax_WGS84 * (1 - 1 / 298.257223563)
def _geoid_radius(latitude: float) -> float:
"""Calculates the GEOID radius at a given latitude
Parameters
----------
latitude : float
Latitude (degrees)
Returns
-------
R : float
GEOID Radius (meters)
"""
lat = deg2rad(latitude)
return sqrt(1 / (cos(lat) ** 2 / Rmax_WGS84 ** 2 + sin(lat) ** 2 / Rmin_WGS84 ** 2))
def geometric2geopotential(z: float, latitude: float) -> float:
"""Converts geometric height to geopotential height
Parameters
----------
z : float
Geometric height (meters)
latitude : float
Latitude (degrees)
Returns
-------
h : float
Geopotential Height (meters) above the reference ellipsoid
"""
twolat = deg2rad(2 * latitude)
g = 9.80616 * (1 - 0.002637 * cos(twolat) + 0.0000059 * cos(twolat) ** 2)
re = _geoid_radius(latitude)
return z * g * re / (re + z)
def geopotential2geometric(h: float, latitude: float) -> float:
"""Converts geopotential height to geometric height
Parameters
----------
h : float
Geopotential height (meters)
latitude : float
Latitude (degrees)
Returns
-------
z : float
Geometric Height (meters) above the reference ellipsoid
"""
twolat = deg2rad(2 * latitude)
g = 9.80616 * (1 - 0.002637 * cos(twolat) + 0.0000059 * cos(twolat) ** 2)
re = _geoid_radius(latitude)
return h * re / (g * re - h)
# functions to interpolate the atmospheric variables to a specified height/altitude
def get_ser_val_alt(lat: float, lon: float, da_alt_x, da_alt, da_val,) -> pd.Series:
"""interpolate atmospheric variable to a specified altitude
Parameters
----------
lat : float
latitude of specified site
lon : float
longitude of specified site
da_alt_x
desired altitude to interpolate variable at
da_alt
altitude associated with `da_val`: variable array to interpolate
da_val
atmospheric variable to interpolate
Returns
-------
pd.Series
interpolated values at the specified altitude of site positioned by [`lat`, `lon`]
"""
from scipy.interpolate import interp1d
alt_t_1d = da_alt.sel(latitude=lat, longitude=lon, method="nearest")
val_t_1d = da_val.sel(latitude=lat, longitude=lon, method="nearest")
alt_x = da_alt_x.sel(latitude=lat, longitude=lon, method="nearest")[0]
val_alt = np.array(
[interp1d(alt_1d, val_1d)(alt_x) for alt_1d, val_1d in zip(alt_t_1d, val_t_1d)]
)
ser_alt = pd.Series(val_alt, index=da_val.time.values, name=da_val.name,)
return ser_alt
def get_df_val_alt(lat: float, lon: float, da_alt_meas, ds_val):
"""interpolate atmospheric variables to a specified altitude
Parameters
----------
lat : float
latitude of specified site
lon : float
longitude of specified site
da_alt_x
desired altitude to interpolate variable at
da_alt
altitude associated with `da_val`: variable array to interpolate
da_val
atmospheric varialble to interpolate
Returns
-------
pd.DataFrame
interpolated values at the specified altitude of site positioned by [`lat`, `lon`]
"""
from scipy.interpolate import interp1d
da_alt = geopotential2geometric(ds_val.z, ds_val.latitude)
# generate pressure series for grid x
da_alt_x = da_alt.sel(latitude=lat, longitude=lon, method="nearest")
alt_meas_x = da_alt_meas.sel(latitude=lat, longitude=lon, method="nearest")[0]
val_pres = np.array([interp1d(alt, da_alt_x.level)(alt_meas_x) for alt in da_alt_x])
df_val_alt = pd.concat(
[
get_ser_val_alt(lat, lon, da_alt_meas, da_alt, ds_val[var])
for var in ds_val.data_vars
],
axis=1,
)
# add pressure
df_val_alt["p"] = val_pres
df_val_alt.index = df_val_alt.index.set_names("time")
df_val_alt.columns = df_val_alt.columns.set_names("var")
return df_val_alt
# cds download related functions
def gen_dict_dt(dt_index):
list_key = ["year", "month", "day", "time"]
list_fmt = ["%Y", "%m", "%d", "%H:%M"]
dict_dt = {
k: dt_index.dt.strftime(fmt).unique().tolist()
for k, fmt in zip(list_key, list_fmt)
}
return dict_dt
def gen_dict_dt_sub(dt_index):
# divide by [year, month] for surface level data
ser_dict_sub = dt_index.groupby(dt_index.dt.strftime("%Y%m")).apply(gen_dict_dt)
dict_sub = ser_dict_sub.unstack().T.to_dict()
return dict_sub
# generate filename
def gen_fn(dict_x):
lat_x, lon_x = dict_x["area"][:2]
yr_x = dict_x["year"][0]
mon_x = dict_x["month"][0]
type_x = "sfc" if "orography" in dict_x["variable"] else "ml"
lat_x = f"{lat_x}N" if lat_x > 0 else f"{-lat_x}S"
lon_x = f"{lon_x}E" if lon_x > 0 else f"{-lon_x}W"
fn_x = f"{lat_x}{lon_x}-{yr_x}{mon_x}-{type_x}.nc"
return fn_x
# dict_x: a dict describing download elements
def gen_dict_proc(dict_x):
type_x = "sfc" if "orography" in dict_x["variable"] else "ml"
dict_feed = {
"sfc": "reanalysis-era5-single-levels",
"ml": "reanalysis-era5-pressure-levels",
}
feed_x = dict_feed[type_x]
dict_proc = dict(name=feed_x, request=dict_x, target=gen_fn(dict_x),)
return dict_proc
list_var_sfc = [
"10m_u_component_of_wind",
"10m_v_component_of_wind",
"2m_dewpoint_temperature",
"2m_temperature",
"orography",
"surface_pressure",
"surface_solar_radiation_downwards",
"surface_thermal_radiation_downwards",
"surface_sensible_heat_flux",
"surface_latent_heat_flux",
"surface_net_solar_radiation",
"surface_net_thermal_radiation",
"total_precipitation",
"forecast_albedo",
"forecast_surface_roughness",
"friction_velocity",
]
list_var_ml = [
"geopotential",
# 'relative_humidity',
"specific_humidity",
"temperature",
"u_component_of_wind",
"v_component_of_wind",
]
list_pres_level = [
"1",
"2",
"3",
"5",
"7",
"10",
"20",
"30",
"50",
"70",
"100",
"125",
"150",
"175",
"200",
"225",
"250",
"300",
"350",
"400",
"450",
"500",
"550",
"600",
"650",
"700",
"750",
"775",
"800",
"825",
"850",
"875",
"900",
"925",
"950",
"975",
"1000",
]
# generate a dict of reqs kwargs for (lat_x,lon_x) spanning [start, end]
def gen_req_sfc(lat_x, lon_x, start, end, grid=[0.125, 0.125], scale=0):
"""generate a dict of reqs kwargs for (lat_x,lon_x) spanning [start, end]
Parameters
----------
lat_x : [type]
[description]
lon_x : [type]
[description]
start : [type]
[description]
end : [type]
[description]
grid : list, optional
[description] (the default is [0.125, 0.125], which [default_description])
scale : int, optional
[description] (the default is 0, which [default_description])
Returns
-------
[type]
[description]
Examples
--------
>>> gen_req_sfc(28, 116, '2015-01', '2015-01-31 23', grid=[0.125, 0.125], scale=0)
"""
# scale is a factor to rescale grid size
size = grid[0] * scale
# generate pd.Series for timestamps
ser_datetime = pd.date_range(start, end, freq="1h").to_series()
# surface requests
lat_c, lon_c = (roundPartial(x, grid[0]) for x in [lat_x, lon_x])
area = [lat_c + size, lon_c - size, lat_c - size, lon_c + size]
dict_req_sfc = {
"variable": list_var_sfc,
"product_type": "reanalysis",
"area": area,
"grid": grid,
"format": "netcdf",
}
list_dict_req_sfc = [
{**dict_req_sfc, **dict_dt}
for dict_dt in list(gen_dict_dt_sub(ser_datetime).values())
]
dict_req_sfc = {
gen_fn(dict_req): gen_dict_proc(dict_req) for dict_req in list_dict_req_sfc
}
return dict_req_sfc
def sel_list_pres(ds_sfc_x):
"""
select proper levels for model level data download
"""
p_min, p_max = ds_sfc_x.sp.min().values, ds_sfc_x.sp.max().values
# adjust p_max (p_min) if level for p_max (p_min) is already below (above) that of 1000 (975) hPa
p_max = p_max if p_max < 1000e2 else 1000e2 - 1
p_min = p_min if p_min < 900e2 else 900e2 + 1
list_pres_level = [
"1",
"2",
"3",
"5",
"7",
"10",
"20",
"30",
"50",
"70",
"100",
"125",
"150",
"175",
"200",
"225",
"250",
"300",
"350",
"400",
"450",
"500",
"550",
"600",
"650",
"700",
"750",
"775",
"800",
"825",
"850",
"875",
"900",
"925",
"950",
"975",
"1000",
]
ser_pres_level = pd.Series(list_pres_level).map(int) * 100
pos_lev_max, pos_lev_min = (
ser_pres_level[ser_pres_level > p_max].idxmin(),
ser_pres_level[ser_pres_level < p_min].idxmax(),
)
list_pres_sel = ser_pres_level.loc[pos_lev_min:pos_lev_max] / 100
list_pres_sel = list_pres_sel.map(int).map(str).to_list()
return list_pres_sel
# for each sfc data file, determine the necessary vertical levels to model level data download
def gen_req_ml(fn_sfc, grid=[0.125, 0.125], scale=0):
import xarray as xr
ds_sfc_x = xr.load_dataset(fn_sfc)
list_pres_sel = sel_list_pres(ds_sfc_x)
size = grid[0] * scale
lat_x, lon_x = ds_sfc_x.latitude.values[0], ds_sfc_x.longitude.values[0]
lat_c, lon_c = (roundPartial(x, grid[0]) for x in [lat_x, lon_x])
area = [lat_c + size, lon_c - size, lat_c - size, lon_c + size]
idx_time = ds_sfc_x.time.to_pandas()
dict_dt = list(gen_dict_dt_sub(idx_time).values())[0]
# model level requests
dict_req_ml = {
"variable": list_var_ml,
"product_type": "reanalysis",
"area": area,
"grid": grid,
"format": "netcdf",
}
dict_req_ml.update({"level": list_pres_sel})
dict_req_ml.update(dict_dt)
dict_req_ml = {gen_fn(dict_req_ml): gen_dict_proc(dict_req_ml)}
# close nc files
# ds_sfc_x.close()
return dict_req_ml
def download_cds(fn, dict_req):
import cdsapi
import tempfile
c = cdsapi.Client()
path_fn = Path(fn)
if path_fn.exists():
logger_supy.warning(f"{fn} exists!")
else:
logger_supy.info(f"To download: {fn}")
# this will download the file to a secure temporary directory without requirement for extra permission
td=tempfile.gettempdir()
os.chdir(td)
c.retrieve(**dict_req)
# move the downloaded file to desired location
Path(path_fn.name).replace(fn)
# hold on a bit for the next request
time.sleep(0.0100)
[docs]def download_era5(
lat_x: float,
lon_x: float,
start: str,
end: str,
dir_save=Path("."),
grid=[0.125, 0.125],
scale=0,
) -> dict:
"""Generate ERA-5 cdsapi-based requests and download data for area of interests.
Parameters
----------
lat_x : float
Latitude of centre at the area of interest.
lon_x : float
Longitude of centre at the area of interest.
start : str
Any datetime-like string that can be parsed by `pandas.daterange()`.
end : str
Any datetime-like string that can be parsed by `pandas.daterange()`.
grid : list, optional
grid size used in CDS request API, by default [0.125, 0.125].
scale : int, optional
scaling factor that determines the area of interest (i.e., `area=grid[0]*scale`), by default 0.
dir_save: Path or path-like string
path to directory for saving downloaded ERA5 netCDF files.
Returns
-------
dict
key: name of downloaded file.
value: CDS API request used for downloading the file named by the corresponding key.
Note
----
This function uses CDS API to download ERA5 data; follow this for configuration first: https://cds.climate.copernicus.eu/api-how-to
"""
# generate requests for surface level data
dict_req_sfc = gen_req_sfc(lat_x, lon_x, start, end, grid=[0.125, 0.125], scale=0,)
# parse and create (if needed) the saving directory
path_dir_save = Path(dir_save).expanduser().resolve()
if not path_dir_save.exists():
path_dir_save.mkdir(parents=True)
for fn_sfc, dict_req in dict_req_sfc.items():
download_cds(path_dir_save / fn_sfc, dict_req)
dict_req_ml = {}
for fn_sfc in dict_req_sfc.keys():
dict_req = gen_req_ml(path_dir_save / fn_sfc, grid, scale)
dict_req_ml.update(dict_req)
for fn_ml, dict_req in dict_req_ml.items():
download_cds(path_dir_save / fn_ml, dict_req)
dict_req_all = {**dict_req_sfc, **dict_req_ml}
dict_req_all = {
str(path_dir_save / fn): dict_req for fn, dict_req in dict_req_all.items()
}
return dict_req_all
# generate requests
def gen_req_era5(
lat_x: float,
lon_x: float,
start: str,
end: str,
grid=[0.125, 0.125],
scale=0,
dir_save=Path("."),
) -> dict:
"""Generate ERA-5 cdsapi-based requests and download data for area of interests.
Parameters
----------
lat_x : float
Latitude of centre at the area of interest.
lon_x : float
Longitude of centre at the area of interest.
start : str
Any datetime-like string that can be parsed by `pandas.daterange()`
end : str
Any datetime-like string that can be parsed by `pandas.daterange()`
grid : list, optional
grid size used in CDS request API, by default [0.125, 0.125]
scale : int, optional
scaling factor that determines the area of interest (i.e., `area=grid[0]*scale`), by default 0
Returns
-------
dict
key: name of downloaded file
value: CDS API request used for downloading the file named by the corresponding key
Note
----
1. This function uses CDS API to download ERA5 data; follow this for configuration first: https://cds.climate.copernicus.eu/api-how-to
"""
# path to directory for saving results
path_dir_save = Path(dir_save).expanduser().resolve()
# generate requests for surface level data
dict_req_sfc = gen_req_sfc(lat_x, lon_x, start, end, grid=[0.125, 0.125], scale=0,)
# generate requests for atmospheric level data
dict_req_ml = {}
for fn_sfc in dict_req_sfc.keys():
dict_req = gen_req_ml(path_dir_save / fn_sfc, grid, scale)
dict_req_ml.update(dict_req)
# collect all requests
dict_req_all = {**dict_req_sfc, **dict_req_ml}
dict_req_all = {
str(path_dir_save / fn): dict_req for fn, dict_req in dict_req_all.items()
}
return dict_req_all
# load downloaded files
def load_filelist_era5(
lat_x: float,
lon_x: float,
start: str,
end: str,
grid=[0.125, 0.125],
scale=0,
dir_save=Path("."),
):
# download data: existing files will be excluded from the downloading list
download_era5(lat_x, lon_x, start, end, dir_save, grid, scale)
# attempt to generate requests
dict_req_all = gen_req_era5(lat_x, lon_x, start, end, grid, scale, dir_save)
# downloaded files
list_fn_sfc = [fn for fn in dict_req_all.keys() if fn.endswith("sfc.nc")]
list_fn_ml = [fn for fn in dict_req_all.keys() if fn.endswith("ml.nc")]
return list_fn_sfc, list_fn_ml
# generate supy forcing using ERA-5 data
[docs]def gen_forcing_era5(
lat_x: float,
lon_x: float,
start: str,
end: str,
dir_save=Path("."),
grid=[0.125, 0.125],
hgt_agl_diag=100.0,
scale=0,
simple_mode=True,
) -> list:
"""Generate SUEWS forcing files using ERA-5 data.
Parameters
----------
lat_x : float
Latitude of centre at the area of interest.
lon_x : float
Longitude of centre at the area of interest.
start : str
Any datetime-like string that can be parsed by `pandas.daterange()`.
end : str
Any datetime-like string that can be parsed by `pandas.daterange()`.
dir_save: Path or path-like string
path to directory for saving downloaded ERA5 netCDF files.
grid : list, optional
grid size used in CDS request API, by default [0.125, 0.125].
hgt_agl_diag: float
height above ground level to diagnose forcing variables, by default 0; the ground level is taken from ERA5 grid altitude.
scale : int, optional
scaling factor that determines the area of interest (i.e., `area=grid[0]*scale`),
by default 0
simple_mode: boolean
if use the *simple* mode for diagnosing the forcing variables, by default `True`.
In the simple mode, temperature is diagnosed using environmental lapse rate 6.5 K/km and wind speed using MOST under neutral condition.
If `False`, MOST with consideration of stability conditions will be used to diagnose forcing variables.
Returns
-------
List
A list of files in SUEWS forcing input format.
Note
----
1. This function uses CDS API to download ERA5 data; follow this for configuration first: https://cds.climate.copernicus.eu/api-how-to
2. The generated forcing files can be imported using `supy.util.read_forcing` to get simulation-ready `pandas.DataFrame`s.
3. See Section 3.10.2 and 3.10.3 in the reference for details of diagnostics calculation.
Reference
---------
ECMWF, S. P. (2016). In IFS documentation CY41R2 Part IV: Physical Processes. ECMWF: Reading, UK, 111-113. https://www.ecmwf.int/en/elibrary/16648-part-iv-physical-processes
"""
import xarray as xr
# download data
list_fn_sfc, list_fn_ml = load_filelist_era5(
lat_x, lon_x, start, end, grid, scale, dir_save
)
# load data from from `sfc` files
ds_sfc = xr.open_mfdataset(list_fn_sfc, concat_dim="time").load()
# close dangling handlers
ds_sfc.close()
# generate diagnostics at a higher level
ds_diag = gen_ds_diag_era5(list_fn_sfc, list_fn_ml, hgt_agl_diag, simple_mode)
# merge diagnostics above with surface variables
ds_forcing_era5 = ds_sfc.merge(ds_diag)
# convert to dataframe for further processing
df_forcing_raw = ds_forcing_era5[
[
"ssrd",
"strd",
"sshf",
"slhf",
"tp",
"uv_z",
"theta_z",
"q_z",
"p_z",
"alt_z",
]
].to_dataframe()
# split based on grid coordinates
grp_grid = df_forcing_raw.groupby(level=["latitude", "longitude"])
# generate dataframe acceptable by supy
df_forcing = grp_grid.apply(
lambda df: format_df_forcing(
df.reset_index(["latitude", "longitude"], drop=True)
)
)
# save results as SUEWS met input files
list_fn = save_forcing_era5(df_forcing, dir_save)
return list_fn
# format dataframe to SUEWS convention
def format_df_forcing(df_forcing_raw):
from atmosp import calculate as ac
df_forcing_grid = df_forcing_raw.copy().round(3)
# convert energy fluxes: [J m-2] to [W m-2]
df_forcing_grid.loc[:, ["ssrd", "strd", "sshf", "slhf"]] /= 3600
# reverse the sign of qh and qe
df_forcing_grid.loc[:, ["sshf", "slhf"]] *= -1
# convert rainfall: from [m] to [mm]
df_forcing_grid.loc[:, "tp"] *= 1000
# get dry bulb temperature and relative humidity
df_forcing_grid = df_forcing_grid.assign(
Tair=ac(
"T",
qv=df_forcing_grid.q_z,
theta=df_forcing_grid.theta_z,
p=df_forcing_grid.p_z,
)
- 273.15
)
df_forcing_grid = df_forcing_grid.assign(
RH=ac(
"RH",
qv=df_forcing_grid.q_z,
theta=df_forcing_grid.theta_z,
p=df_forcing_grid.p_z,
)
)
# convert atmospheric pressure: [Pa] to [kPa]
df_forcing_grid.loc[:, "p_z"] /= 1000
# renaming for consistency with SUEWS
df_forcing_grid = df_forcing_grid.rename(
{
"ssrd": "kdown",
"strd": "ldown",
"sshf": "qh",
"slhf": "qe",
"tp": "rain",
"uv_z": "U",
"p_z": "pres",
},
axis=1,
)
col_suews = [
"iy",
"id",
"it",
"imin",
"qn",
"qh",
"qe",
"qs",
"qf",
"U",
"RH",
"Tair",
"pres",
"rain",
"kdown",
"snow",
"ldown",
"fcld",
"Wuh",
"xsmd",
"lai",
"kdiff",
"kdir",
"wdir",
"alt_z",
]
df_forcing_grid=df_forcing_grid.reindex(col_suews,axis=1)
df_forcing_grid = df_forcing_grid.assign(
iy=df_forcing_grid.index.year,
id=df_forcing_grid.index.dayofyear,
it=df_forcing_grid.index.hour,
imin=df_forcing_grid.index.minute,
)
# corrections
df_forcing_grid.loc[:, "RH"] = df_forcing_grid.loc[:, "RH"].where(
df_forcing_grid.loc[:, "RH"].between(0.001, 105), 105
)
df_forcing_grid.loc[:, "kdown"] = df_forcing_grid.loc[:, "kdown"].where(
df_forcing_grid.loc[:, "kdown"] > 0, 0
)
# trim decimals
df_forcing_grid.iloc[:, 4:] = df_forcing_grid.iloc[:, 4:].round(2)
df_forcing_grid = df_forcing_grid.replace(np.nan, -999).asfreq("1h")
return df_forcing_grid
# generate supy forcing using ERA-5 data
def gen_ds_diag_era5(list_fn_sfc, list_fn_ml, hgt_agl_diag=100, simple_mode=True):
import xarray as xr
from atmosp import calculate as ac
# list_fn_sfc, list_fn_ml = load_download_era5(
# lat_x, lon_x, start, end, grid, scale, dir_save)
# load data from from `sfc` files
ds_sfc = xr.open_mfdataset(list_fn_sfc, concat_dim="time").load()
# close dangling handlers
ds_sfc.close()
# load data from from `ml` files
ds_ml = xr.open_mfdataset(list_fn_ml, concat_dim="time").load()
# close dangling handlers
ds_ml.close()
# surface level atmospheric pressure
pres_z0 = ds_sfc.sp
# hgt_agl_diag: height where to calculate diagnostics
# hgt_agl_diag = 100
# determine a lowest level higher than surface at all times
level_sel = get_level_diag(ds_sfc, ds_ml, hgt_agl_diag)
# retrieve variables from the identified lowest level
ds_ll = ds_ml.sel(
time=ds_ml.time, level=xr.DataArray(level_sel.values, dims="time")
)
# altitude
alt_z0 = geopotential2geometric(ds_sfc.z, ds_sfc.latitude)
alt_za = geopotential2geometric(ds_ll.z, ds_ll.latitude)
# atmospheric pressure [Pa]
pres_za = pres_z0 * 0 + ds_ll.level * 100
# u-wind [m s-1]
u_za = ds_ll.u
# u-wind [m s-1]
v_za = ds_ll.v
# wind speed [m s-1]
uv_za = np.sqrt(u_za ** 2 + v_za ** 2)
# potential temperature [K]
theta_za = ds_ll.t
# specific humidity [kg kg-1]
q_za = ds_ll.q
# ------------------------
# retrieve surface data
# wind speed
u10 = ds_sfc.u10
v10 = ds_sfc.v10
uv10 = np.sqrt(u10 ** 2 + v10 ** 2)
# sensible/latent heat flux [W m-2]
# conversion from cumulative value to hourly average
qh = -ds_sfc.sshf / 3600
qe = -ds_sfc.slhf / 3600
# surface roughness [m]
z0m = ds_sfc.fsr
# friction velocity [m s-1]
ustar = ds_sfc.zust
# air temperature
t2 = ds_sfc.t2m
# dew point
d2 = ds_sfc.d2m
# specific humidity
q2 = ac("qv", Td=d2, T=t2, p=pres_z0)
# diagnose wind, temperature and humidity at 100 m agl or `hgt_agl_max` (see below)
# conform dimensionality using an existing variable
za = alt_za - alt_z0
z = za * 0 + hgt_agl_diag
da_alt_z = (alt_z0 + z).rename("alt_z")
ds_alt_z = da_alt_z.to_dataset()
# get dataset of diagnostics
if simple_mode:
ds_diag = diag_era5_simple(z0m, ustar, pres_z0, uv10, t2, q2, z)
else:
ds_diag = diag_era5(
za,
uv_za,
theta_za,
q_za,
pres_za,
qh,
qe,
z0m,
ustar,
pres_z0,
uv10,
t2,
q2,
z,
)
# merge altitude
ds_diag = ds_diag.merge(ds_alt_z).drop("level")
return ds_diag
# a simple way to diagnose variables at a higher level
def diag_era5_simple(z0m, ustar, pres_z0, uv10, t2, q2, z):
from atmosp import calculate as ac
import xarray as xr
from ._atm import cal_lat_vap, cal_cp, cal_psi_mom, cal_psi_heat
# constants
# environmental lapse rate [K m^-1]
env_lapse = 6.5 / 1000.0
# gravity [m s^-2]
grav = 9.80616
# Gas constant for dry air [J K^-1 kg^-1]
rd = 287.04
# correct temperature using lapse rate
t_z = t2 - (z - 2) * env_lapse
# barometric equation with varying temperature:
# (https://en.wikipedia.org/wiki/Barometric_formula)
# p_z = pres_z0 * np.exp((grav * (0 - z)) / (rd * t2))
p_z = pres_z0 * (t2 / (t2 + env_lapse * (z - 2))) ** (grav / (rd * env_lapse))
# correct humidity assuming invariable relative humidity
RH_z = ac("RH", qv=q2, p=pres_z0, theta=t2) + 0 * t_z
q_z = ac("qv", RH=RH_z, p=p_z, theta=t_z) + 0 * t_z
# correct wind speed using log law; assuming neutral condition (without stability correction)
uv_z = uv10 * (np.log((z + z0m) / z0m) / np.log((10 + z0m) / z0m))
# generate dataset
ds_diag = xr.merge(
[
uv_z.rename("uv_z"),
t_z.rename("theta_z"),
q_z.rename("q_z"),
RH_z.rename("RH_z"),
p_z.rename("p_z"),
]
)
return ds_diag
# diagnose ISL variable using MOST
def diag_era5(
za, uv_za, theta_za, q_za, pres_za, qh, qe, z0m, ustar, pres_z0, uv10, t2, q2, z
):
# reference:
# Section 3.10.2 and 3.10.3 in
# IFS Documentation CY41R2: Part IV: Physical Processes
# https://www.ecmwf.int/en/elibrary/16648-part-iv-physical-processes
from atmosp import calculate as ac
import xarray as xr
from ._atm import cal_lat_vap, cal_cp, cal_psi_mom, cal_psi_heat
# von Karman constant
kappa = 0.4
# gravity acceleration
g = 9.8
# note the roughness correction: see EC technical report
z0m = np.where(z0m < 0.03, z0m, 0.03)
# air density
avdens = ac("rho", qv=q2, p=pres_z0, theta=t2)
# vapour pressure
lv_j_kg = cal_lat_vap(q2, t2, pres_z0)
# heat capacity
avcp = cal_cp(q2, t2, pres_z0)
# temperature/humidity scales
tstar = -qh / (avcp * avdens) / ustar
# qstar = -qe / (lv_j_kg * avdens) / ustar
l_mod = ustar ** 2 / (g / t2 * kappa * tstar)
zoL = np.where(
np.abs((z + z0m) / l_mod) < 5,
(z + z0m) / l_mod,
np.sign((z + z0m) / l_mod) * 5,
)
# l_mod = np.where(np.abs(l_mod) < 5, l_mod, np.sign(l_mod)*5)
# `stab_psi_mom`, `stab_psi_heat`
# stability correction for momentum
psim_z = cal_psi_mom(zoL)
psim_z0 = cal_psi_mom(z0m / l_mod)
psim_10 = cal_psi_mom((10 + z0m) / l_mod)
# wind speed
# uv_z = uv_za * (
# (np.log(z / z0m) - psim_z + psim_z0) / (np.log(za / z0m) - psim_za + psim_z0)
# )
uv_z = uv10 * (
(np.log((z + z0m) / z0m) - psim_z + psim_z0)
/ (np.log((10 + z0m) / z0m) - psim_10 + psim_z0)
)
# uv_z = ustar / kappa * (np.log(z / z0m) - psim_z + psim_z0)
# stability correction for heat
psih_z = cal_psi_heat(zoL)
psih_2 = cal_psi_heat(2 / l_mod)
psih_z0 = cal_psi_heat(z0m / l_mod)
psih_za = cal_psi_heat(za / l_mod)
# atmospheric pressure: assuming same air density at `za`
# using iteration to get `p_z`
p_z = pres_z0 + (pres_za - pres_z0) * z / za
# specific humidity
# q_z = q_za + qstar / kappa * (np.log(z / za) - psih_z + psih_za)
q_z = q2 + (q_za - q2) * (
(np.log(z / z0m) - psih_z + psih_z0) / (np.log(za / z0m) - psih_za + psih_z0)
)
# potential temperature
# theta_z = theta_za + tstar / kappa * (np.log(z / za) - psih_z + psih_za)
# theta_z = t2 + (theta_za - t2) * ((np.log(z / z0m) - psih_z + psih_z0) /
# (np.log(za / z0m) - psih_za + psih_z0))
# dry static energy: eq 3.5 in EC tech report;
# also AMS ref: http://glossary.ametsoc.org/wiki/Dry_static_energy
# 2 m agl:
cp2 = cal_cp(q2, t2, pres_z0 / 100)
cp_za = cal_cp(q_za, theta_za, pres_za / 100)
s2 = g * 2 + cp2 * t2
# za:
t_za = ac("T", qv=q_za, p=pres_za, theta=theta_za)
s_za = g * za + cp_za * t_za
# s_z = s2 + (s_za - s2) * (
# (np.log(z / z0m) - psih_z + psih_z0) / (np.log(za / z0m) - psih_za + psih_z0)
# )
s_z = s2 + (s_za - s2) * (
(np.log(z / 2) - psih_z + psih_2) / (np.log(za / 2) - psih_za + psih_2)
)
# calculate potential temperature at z
theta_z_x = theta_za
dif = 10
while dif > 0.1:
cp_z = cal_cp(q_z, theta_z_x, p_z / 100)
t_z = (s_z - g * z) / cp_z
theta_z = ac("theta", T=t_z, qv=q_z, p=p_z)
dif = np.mean(np.abs(theta_z_x - theta_z))
theta_z_x = theta_z
theta_z = theta_z + theta_za * 0
RH_z = ac("RH", qv=q_z, p=p_z, theta=theta_z) + 0 * q_z
RH_z = RH_z.where(RH_z < 105, 105)
# generate dataset
ds_diag = xr.merge(
[
uv_z.rename("uv_z"),
theta_z.rename("theta_z"),
q_z.rename("q_z"),
RH_z.rename("RH_z"),
p_z.rename("p_z"),
]
)
return ds_diag
# save ERA5 forcing dataframe to SUEWS-simulation ready txt files
def save_forcing_era5(df_forcing_era5, dir_save):
gpb = df_forcing_era5.groupby(["latitude", "longitude"])
list_grid = list(gpb.groups.keys())
list_fn = []
path_dir_save = Path(dir_save)
# split into grids
for lat, lon in list_grid:
df_grid = df_forcing_era5.loc[lat, lon]
s_lat = f"{lat}N" if lat >= 0 else f"{lat}S"
s_lon = f"{lon}E" if lon >= 0 else f"{lon}W"
alt_z = df_grid.alt_z[0]
df_grid = df_grid.drop("alt_z", axis=1)
s_alt = f"{alt_z:.1f}A"
idx_grid = df_grid.index
# split into years
grp_year = df_grid.groupby(idx_grid.year)
for year in grp_year.groups:
df_year = grp_year.get_group(year)
idx_year = df_year.index
s_year = idx_year[0].year
s_freq = idx_year.freq / pd.Timedelta("1T")
s_fn = f"ERA5_UTC-{s_lat}-{s_lon}-{s_alt}_{s_year}_data_{s_freq:.0f}.txt"
path_fn = path_dir_save / s_fn
df_year.to_csv(path_fn, sep=" ", index=False)
# collect file names
list_fn.append(str(path_fn))
return list_fn
def get_level_diag(ds_sfc, ds_ml, hgt_agl_diag):
# get altitude from `sfc` files
da_gph_sfc = ds_sfc.z
da_lat_sfc = da_gph_sfc.latitude
da_alt_sfc = geopotential2geometric(da_gph_sfc, da_lat_sfc)
# get altitude from `ml` files
da_gph_ml = ds_ml.z
da_lat_ml = da_gph_ml.latitude
da_alt_ml = geopotential2geometric(da_gph_ml, da_lat_ml)
# determine a lowest level higher than surface at all times
# hgt_agl_diag = 100
ind_alt = ((da_alt_sfc + hgt_agl_diag) < da_alt_ml).compute()
level_sel = (ind_alt.sum(dim="level") - 1).values.flatten()
level_sel = da_alt_ml.level[level_sel]
return level_sel