Modelling infrastructure exposure and risk
This notebook forms the basis of “Hands-On 6” in the CCG course.
It uses the road network and flood dataset extracted in the previous tutorial.
* Exposure - overlay a sample flood extent with the network and estimate the flood depth of exposure
* Vulnerability - assume a depth-damage curve (fragility curve) for the roads, show how exposure translates to damage, and create a table with probability, flood depth, length exposed, fragility, cost/km and direct damage
* Risk - show a risk calculation on the table and generate the result
* Future risk - repeat with climate projections and compare with the baseline

By the end of this tutorial you should be able to:

* Assess direct damage and indirect disruptions to infrastructure assets
* Apply the risk calculation to understand how to generate loss-probability curves
* Show how different flood hazards introduce uncertainty in risk estimations
[ ]:
# Imports from Python standard library
import os
from pathlib import Path
# see https://docs.python.org/3/library/glob.html
from glob import glob
# Imports from other Python packages
import geopandas as gpd
# numpy is used by pandas and geopandas to store data in efficient arrays
# we use it in this notebook to set up the arrays for the risk integration
# see https://numpy.org/
import numpy as np
import pandas as pd
# seaborn helps produce more complex plots
# see https://seaborn.pydata.org/
import seaborn as sns
# scipy's simpson integrates sampled values using Simpson's rule
# see https://docs.scipy.org/doc/scipy/reference/integrate.html
from scipy.integrate import simpson
# snail provides the raster-vector intersection, damage curve and
# input/output helpers used throughout this tutorial
import snail.damages
import snail.intersection
import snail.io
# pyproj's Geod is used to calculate geodesic lengths of geometries
from pyproj import Geod
# tqdm lets us show progress bars (and تقدّم means "progress" in Arabic)
# see https://tqdm.github.io/
from tqdm.notebook import tqdm
Change this to point to your data folder as in the previous tutorial:
[ ]:
# get the parent directory of the snail package, as in the previous tutorial
base_dir = Path(os.getcwd()).resolve().parents[3]
data_folder = os.path.join(base_dir, "ghana_tutorial")
# data_folder = Path("YOUR_PATH/ghana_tutorial")
1. Exposure
List all the hazard files in the `flood_layer` folder:
[ ]:
hazard_paths = sorted(
    glob(os.path.join(data_folder, "flood_layer/gha/wri_aqueduct_version_2/wri*.tif"))
)
hazard_files = pd.DataFrame({"path": hazard_paths})
hazard_files["key"] = [Path(path).stem for path in hazard_paths]
hazard_files, grids = snail.io.extend_rasters_metadata(hazard_files)
hazard_files.head(5)
[ ]:
# check that all the hazard rasters share a single grid definition
assert len(grids) == 1
grid = grids[0]
Read in roads again, then do intersections against all hazard scenarios.
[ ]:
roads_file = os.path.join(data_folder, "GHA_OSM_roads.gpkg")
roads = gpd.read_file(roads_file, layer="edges")
roads.head(2)
[ ]:
# split roads along the hazard data grid, so that each split segment
# falls within a single raster cell
prepared = snail.intersection.prepare_linestrings(roads)
flood_intersections = snail.intersection.split_linestrings(prepared, grid)
# look up the raster grid cell indices (i, j) for each split segment
flood_intersections = snail.intersection.apply_indices(
    flood_intersections, grid, index_i="i_0", index_j="j_0"
)
# read the flood depth for each split segment from each hazard file
flood_intersections = snail.io.associate_raster_files(
    flood_intersections, hazard_files
)
# calculate the geodesic length of each stretch of road
geod = Geod(ellps="WGS84")
flood_intersections["length_m"] = flood_intersections.geometry.apply(
geod.geometry_length
)
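Splitting should preserve total length, so as a quick sanity check (illustrative, not part of the original pipeline) we can compare the summed length of the split segments against the total length of the original roads:
[ ]:
# illustrative check: the summed length of split segments should be close
# to the total length of the original road network
total_split_km = flood_intersections.length_m.sum() / 1e3
total_roads_km = roads.geometry.apply(geod.geometry_length).sum() / 1e3
print(f"split: {total_split_km:.1f} km, original: {total_roads_km:.1f} km")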
[ ]:
# save to file
# ensure the results directory exists
os.makedirs(os.path.join(data_folder, "results"), exist_ok=True)
output_file = os.path.join(
    data_folder,
    "results",
    Path(roads_file).name.replace(".gpkg", "_edges___exposure.geoparquet"),
)
flood_intersections.to_parquet(output_file)
[ ]:
flood_intersections.columns
[ ]:
data_cols = [col for col in flood_intersections.columns if "wri" in col]
[ ]:
data_cols
[ ]:
# find the maximum depth over all hazard columns, and keep only exposed rows
all_intersections = flood_intersections[
flood_intersections[data_cols].max(axis=1) > 0
]
# subset columns
all_intersections = all_intersections.drop(
columns=["osm_id", "name", "from_id", "to_id", "geometry", "i_0", "j_0"]
)
# melt to long format (one row per segment and scenario), keeping positive depths
all_intersections = all_intersections.melt(
id_vars=["id", "split", "road_type", "length_m"],
value_vars=data_cols,
var_name="key",
value_name="depth_m",
).query("depth_m > 0")
all_intersections.head(5)
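As a quick, illustrative check, count how many exposed road segments fall under each hazard scenario key:
[ ]:
# number of exposed segments per hazard scenario
all_intersections.key.value_counts().head()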
[ ]:
# separate river and coastal flooding scenarios
river = all_intersections[all_intersections.key.str.contains("inunriver")]
coast = all_intersections[all_intersections.key.str.contains("inuncoast")]
# parse hazard type, RCP, subsidence scenario or GCM, epoch and return
# period out of each file key
coast_keys = coast.key.str.extract(
r"wri_aqueduct-version_2-(?P<hazard>\w+)_(?P<rcp>[^_]+)_(?P<sub>[^_]+)_(?P<epoch>[^_]+)_rp(?P<rp>[^-]+)-gha"
)
coast = pd.concat([coast, coast_keys], axis=1)
river_keys = river.key.str.extract(
r"wri_aqueduct-version_2-(?P<hazard>\w+)_(?P<rcp>[^_]+)_(?P<gcm>[^_]+)_(?P<epoch>[^_]+)_rp(?P<rp>[^-]+)-gha"
)
river = pd.concat([river, river_keys], axis=1)
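Spot-check that the metadata has been parsed as expected (an illustrative look at the distinct scenario combinations):
[ ]:
# distinct combinations of the parsed hazard metadata
river[["hazard", "rcp", "gcm", "epoch", "rp"]].drop_duplicates().head()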
[ ]:
# convert zero-padded return period strings to numeric years
coast.rp = coast.rp.apply(lambda rp: float(rp.replace("_", ".").lstrip("0")))
coast.head(5)
[ ]:
# river return periods stay as zero-padded strings for now - they are
# converted to numbers when plotting below
river.gcm = river.gcm.str.lstrip("0")  # strip the zero-padding from GCM names
river.head(5)
Summarise the total length of roads exposed to river flooding of 2m depth or greater, under different return periods and climate scenarios:
[ ]:
summary = (
river[river.depth_m >= 2.0]
.drop(columns=["id", "split", "road_type", "key"])
.groupby(["hazard", "rcp", "gcm", "epoch", "rp"])
.sum()
.drop(columns=["depth_m"])
)
summary
Plot exposure against return period, with separate plot areas for each Representative Concentration Pathway (RCP) and different colours for each Global Climate Model (GCM):
[ ]:
plot_data = summary.reset_index()
plot_data = plot_data[plot_data.epoch.isin(["1980", "2080"])]
plot_data.rp = plot_data.rp.apply(lambda rp: int(rp.lstrip("0")))
plot_data["probability"] = 1 / plot_data.rp
plot_data.head(5)
[ ]:
sns.relplot(
data=plot_data,
x="rp",
y="length_m",
hue="gcm",
col="rcp",
kind="line",
marker="o",
)
2. Vulnerability
Set up fragility curve assumptions, where the probability of damage (`pfail`) depends on whether a road is paved and the depth of flood it is exposed to.
These assumptions are derived from Koks, E.E., Rozenberg, J., Zorn, C. et al. A global multi-hazard risk analysis of road and railway infrastructure assets. Nat Commun 10, 2677 (2019). https://doi.org/10.1038/s41467-019-10442-3, Figure S3, extrapolated to 2m and 3m depths.
The analysis is likely to be highly sensitive to these assumptions. This approach is strongly limited by the availability and quality of fragility data, as well as by the assumption that fragility can be related to flood depth alone; flood water velocity would be an important factor in a more detailed vulnerability assessment.
[ ]:
# piecewise linear damage curves: "intensity" is flood depth in metres and
# "damage" is the fraction of the asset value lost at that depth; the point
# just below 1m keeps damage at zero for shallow flooding
paved = snail.damages.PiecewiseLinearDamageCurve(
    pd.DataFrame(
        {
            "intensity": [0.0, 0.999999999, 1, 2, 3],
            "damage": [0.0, 0.0, 0.1, 0.3, 0.5],
        }
    )
)
unpaved = snail.damages.PiecewiseLinearDamageCurve(
    pd.DataFrame(
        {
            "intensity": [0.0, 0.999999999, 1, 2, 3],
            "damage": [0.0, 0.0, 0.9, 1.0, 1.0],
        }
    )
)
paved, unpaved
[ ]:
paved.plot(), unpaved.plot()
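As a quick check of the interpolation, evaluate the paved curve at a few sample depths - depths below 1m cause no damage, and values between the curve points are interpolated linearly:
[ ]:
# sample damage fractions from the paved curve
example_depths = np.array([0.5, 1.0, 1.5, 2.5])
paved.damage_fraction(example_depths)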
Set up cost assumptions.
These are again taken from Koks et al (2019), Table S8; construction costs are used as an estimate of the cost of full rehabilitation after flood damage.
Again, the analysis is likely to be highly sensitive to these assumptions, which should be replaced by better estimates if available.
[ ]:
costs = pd.DataFrame(
{
"kind": ["paved_four_lane", "paved_two_lane", "unpaved"],
"cost_usd_per_km": [3_800_000, 932_740, 22_780],
}
)
costs
Set up assumptions about which roads are paved or unpaved, and number of lanes.
[ ]:
sorted(river.road_type.unique())
Assume all `tertiary` roads are unpaved, and all others are paved.
[ ]:
river["paved"] = ~(river.road_type == "tertiary")
[ ]:
def kind(road_type):
    """Map OSM road type to a cost category."""
    if road_type in ("trunk", "trunk_link", "motorway"):
        return "paved_four_lane"
    elif road_type in ("primary", "primary_link", "secondary"):
        return "paved_two_lane"
    else:
        return "unpaved"

river["kind"] = river.road_type.apply(kind)
[ ]:
river = river.merge(costs, on="kind")
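Check the resulting mapping from road type to cost category (an illustrative look at the distinct combinations after the merge):
[ ]:
# distinct road_type / kind / cost combinations after merging costs
river[["road_type", "kind", "cost_usd_per_km"]].drop_duplicates()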
Use the damage curves to estimate `proportion_damaged` for each exposed section.
[ ]:
river.head(2)
[ ]:
paved_depths = river.loc[river.paved, "depth_m"]
paved_damage = paved.damage_fraction(paved_depths)
river.loc[river.paved, "proportion_damaged"] = paved_damage
unpaved_depths = river.loc[~river.paved, "depth_m"]
unpaved_damage = unpaved.damage_fraction(unpaved_depths)
river.loc[~river.paved, "proportion_damaged"] = unpaved_damage
Finally, estimate the direct damage to each exposed section: its length, multiplied by the proportion damaged and the rehabilitation cost per km.
[ ]:
river["damage_usd"] = river.length_m * river.cost_usd_per_km * 1e-3
river.head(2)
[ ]:
river.to_csv(
os.path.join(data_folder, "results/inunriver_damages_rp.csv"), index=False
)
[ ]:
summary = (
river.drop(
columns=[
"id",
"split",
"length_m",
"key",
"depth_m",
"paved",
"kind",
"cost_usd_per_km",
"proportion_damaged",
]
)
.groupby(["road_type", "hazard", "rcp", "gcm", "epoch", "rp"])
.sum()
)
summary
3. Risk
Calculate expected annual damages for each road under historical hazard.
Start by selecting only historical intersections, and keeping only the road ID, return period, and cost of rehabilitation if damaged.
[ ]:
historical = river[river.rcp == "historical"][["id", "rp", "damage_usd"]]
Sum up the expected damage for each road, per return period, then pivot the table to create columns for each return period - now there is one row per road.
[ ]:
historical = historical.groupby(["id", "rp"]).sum().reset_index()
historical = historical.pivot(index="id", columns="rp").replace(
float("NaN"), 0
)
historical.columns = [f"rp{int(rp)}" for _, rp in historical.columns]
historical.head(2)
Calculate expected annual damages by integrating the damages over the annual probability of each return period.
[ ]:
def calculate_ead(df):
    """Integrate damages over annual exceedance probabilities using Simpson's rule."""
    # sort the return period columns so probabilities (1/rp) are ascending
    rp_cols = sorted(
        list(df.columns), key=lambda col: 1 / int(col.replace("rp", ""))
    )
    rps = np.array([int(col.replace("rp", "")) for col in rp_cols])
    probabilities = 1 / rps
    rp_damages = df[rp_cols]
    return simpson(rp_damages, x=probabilities, axis=1)
historical["ead_usd"] = calculate_ead(historical)
historical.head(2)
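As a toy sanity check of the integration (using hypothetical damage values): a constant damage of 100 across return periods of 5, 20 and 100 years should integrate to roughly 100 * (1/5 - 1/100) = 19:
[ ]:
# hypothetical damage values, constant across three return periods
toy = pd.DataFrame({"rp5": [100.0], "rp20": [100.0], "rp100": [100.0]})
calculate_ead(toy)  # expect approximately [19.0]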
[ ]:
historical.to_csv(
os.path.join(data_folder, "results/inunriver_damages_ead__historical.csv")
)
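The expected damage table also lets us draw a loss-probability curve, as mentioned in the tutorial goals. As an illustrative example, plot total historical damages for each return period against annual probability of exceedance:
[ ]:
# illustrative loss-probability curve for the historical scenario
rp_cols = [col for col in historical.columns if col.startswith("rp")]
probabilities = [1 / int(col.replace("rp", "")) for col in rp_cols]
ax = sns.lineplot(x=probabilities, y=historical[rp_cols].sum().values, marker="o")
ax.set(xlabel="Annual probability of exceedance", ylabel="Total direct damage (USD)")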
4. Future risk
Calculate expected annual damages under each future scenario (for each global climate model and representative concentration pathway).
This follows the same method as for historical flooding above, with the added variables of climate model (GCM) and RCP.
[ ]:
future = river[["id", "rp", "rcp", "gcm", "epoch", "damage_usd"]].copy()
Sum up the expected damage for each road, per return period, GCM and RCP:
[ ]:
future = (
future.groupby(["id", "rp", "rcp", "gcm", "epoch"]).sum().reset_index()
)
future.head(2)
Pivot the table to create columns for each return period - now there is one row per road, GCM and RCP.
[ ]:
future = future.pivot(
index=["id", "rcp", "gcm", "epoch"], columns="rp"
).replace(float("NaN"), 0)
future.columns = [f"rp{int(rp)}" for _, rp in future.columns]
future.head(2)
Calculate expected annual damages as before, integrating the damages over annual probabilities.
[ ]:
future["ead_usd"] = calculate_ead(future)
[ ]:
future.to_csv(os.path.join(data_folder, "results/inunriver_damages_ead.csv"))
Pick out an individual road by id, to spot-check uncertainty:
[ ]:
future.reset_index().id.unique()
[ ]:
# substitute an id from the list above, for example:
# future.loc["roade_1002"]
Summarise total expected annual (direct) damages, showing variation between climate models and representative concentration pathways.
[ ]:
summary = (
future.reset_index()[["rcp", "gcm", "epoch", "ead_usd"]]
.groupby(["rcp", "gcm", "epoch"])
.sum()
.reset_index()
)
summary.epoch = summary.epoch.astype(int)
summary.head()
[ ]:
sns.lmplot(
    data=summary,
    col="rcp",
    x="epoch",
    y="ead_usd",
    hue="gcm",
    # pass fit_reg=False to hide the regression lines
)
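Finally, compare against the baseline (an illustrative summary, expressing each scenario's total expected annual damages as a multiple of the historical total):
[ ]:
# illustrative comparison of total EAD against the historical baseline
baseline_ead = summary[summary.rcp == "historical"].ead_usd.sum()
summary["ead_relative_to_baseline"] = summary.ead_usd / baseline_ead
summary.sort_values("ead_relative_to_baseline", ascending=False).head()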