"""Road Network Flow Model - Functions
This module provides a set of functions to model and analyze road network flows.
It includes utilities for road network initialization, traffic flow simulation,
cost calculations, and network updates. The functions are designed to handle
geospatial data and integrate with igraph for network analysis.
Key Features:
- Road network extraction and classification.
- Traffic speed and flow modeling.
- Cost estimation for vehicle operations and travel time.
- Network flow simulation with iterative updates.
- Integration with multiprocessing for performance optimization.
"""
from typing import Tuple, List, Dict
from collections import defaultdict
from functools import partial
import numpy as np
import pandas as pd
import geopandas as gpd # type: ignore
import igraph # type: ignore
import nird.constants as cons
from nird.utils import get_flow_on_edges
from multiprocessing import Pool
import warnings
import pickle
import time
warnings.simplefilter("ignore")
[docs]
def select_partial_roads(
road_links: gpd.GeoDataFrame,
road_nodes: gpd.GeoDataFrame,
col_name: str,
list_of_values: List[str],
) -> Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
"""Extract a subset of the road network based on specified road types.
Parameters
----------
road_links: gpd.GeoDataFrame
Geospatial data representing the links (edges) of the road network.
road_nodes: gpd.GeoDataFrame
Geospatial data representing the nodes (vertices) of the road network.
col_name: str
Column name in `road_links` that specifies the road type.
list_of_values: List[str]
List of road types to filter and extract from the network.
Returns
-------
selected_links: gpd.GeoDataFrame
Filtered road links corresponding to the specified road types.
selected_nodes: gpd.GeoDataFrame
Filtered road nodes connected to the selected links.
"""
# road links selection
selected_links = road_links[road_links[col_name].isin(list_of_values)].reset_index(
drop=True
)
selected_links.rename(
columns={"id": "e_id", "start_node": "from_id", "end_node": "to_id"},
inplace=True,
)
# road nodes selection
sel_node_idx = list(
set(selected_links.from_id.tolist() + selected_links.to_id.tolist())
)
selected_nodes = road_nodes[road_nodes.id.isin(sel_node_idx)]
selected_nodes.reset_index(drop=True, inplace=True)
selected_nodes.rename(columns={"id": "nd_id"}, inplace=True)
selected_nodes["lat"] = selected_nodes["geometry"].y
selected_nodes["lon"] = selected_nodes["geometry"].x
return selected_links, selected_nodes
[docs]
def create_urban_mask(
etisplus_urban_roads: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
"""Generate a spatial mask for urban areas in Great Britain.
Parameters
----------
etisplus_urban_roads: gpd.GeoDataFrame
Geospatial data of ETISPLUS urban roads (2010 dataset).
Returns
-------
urban_mask: gpd.GeoDataFrame
A geospatial mask identifying urban areas with a binary value (1 for urban).
"""
etisplus_urban_roads = etisplus_urban_roads[
etisplus_urban_roads["Urban"] == 1
].reset_index(drop=True)
buf_geom = etisplus_urban_roads.geometry.buffer(
500
) # create a buffer of 500 meters
uni_geom = buf_geom.unary_union # feature union within the same layer
temp = gpd.GeoDataFrame(geometry=[uni_geom])
new_geom = (
temp.explode()
) # explode multi-polygons into separate individual polygons
cvx_geom = (
new_geom.convex_hull
) # generate convex polygon for each individual polygon
urban_mask = gpd.GeoDataFrame(
geometry=cvx_geom[0], crs=etisplus_urban_roads.crs
).to_crs("27700")
return urban_mask
[docs]
def label_urban_roads(
road_links: gpd.GeoDataFrame, urban_mask: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
"""Label road links as urban or suburban based on spatial intersection.
Parameters
----------
road_links: gpd.GeoDataFrame
Geospatial data representing the links of the road network.
urban_mask: gpd.GeoDataFrame
Geospatial mask identifying urban areas.
Returns
-------
road_links: gpd.GeoDataFrame
Updated road links with a new column "urban" (1 for urban, 0 for suburban).
"""
temp_file = road_links.sjoin(urban_mask, how="left")
temp_file["urban"] = temp_file["index_right"].apply(
lambda x: 0 if pd.isna(x) else 1
)
max_values = temp_file.groupby("e_id")["urban"].max()
road_links = road_links.merge(max_values, on="e_id", how="left")
return road_links
[docs]
def find_nearest_node(
zones: gpd.GeoDataFrame,
road_nodes: gpd.GeoDataFrame,
zone_id_column: str,
node_id_column: str,
) -> Dict[str, str]:
"""Find the nearest road node for each admin unit.
Parameters
----------
zones: gpd.GeoDataFrame
Admin units.
road nodes: gpd.GeoDataFrame
Nodes of the road network.
Returns
-------
nearest_node_dict: dict
A dictionary to convert from admin units to their attached road nodes.
"""
nearest_nodes = gpd.sjoin_nearest(zones, road_nodes)
nearest_nodes = nearest_nodes.drop_duplicates(subset=[zone_id_column], keep="first")
nearest_node_dict = dict(
zip(nearest_nodes[zone_id_column], nearest_nodes[node_id_column])
)
return nearest_node_dict # zone_idx: node_idx
[docs]
def voc_func(
speed: float,
) -> float:
"""Calculate the Vehicle Operating Cost (VOC).
Parameters
----------
speed: float
The average flow speed: mph
Returns
-------
float
The unit vehicle operating cost: £/km
"""
s = speed * cons.CONV_MILE_TO_KM # km/hour
lpkm = 0.178 - 0.00299 * s + 0.0000205 * (s**2) # fuel consumption (liter/km)
voc_per_km = (
140 * lpkm * cons.PENCE_TO_POUND
) # average petrol cost: 140 pence/liter
return voc_per_km # £/km
[docs]
def cost_func(
time: float,
distance: float,
voc_per_km: float,
toll: float,
) -> Tuple[float, float, float]: # time: hour, distance: mph, voc: £/km
"""Calculate the total travel cost.
Parameters
----------
time: float
Travel time: hour
distance: float
Travel distance: mile
voc:
Vehicle operating cost: £/km
Returns
-------
cost: float
The total travel costs: £
c_time: float
The time-equivalent costs: £
c_operate: float
The vehicle operating costs/fuel costs: £
"""
ave_occ = 1.06 # average car occupancy = 1.06
vot = 17.69 # value of time (VOT): 17.69 £/hour
d = distance * cons.CONV_MILE_TO_KM
c_time = time * ave_occ * vot
c_operate = d * voc_per_km
cost = time * ave_occ * vot + d * voc_per_km + toll
return cost, c_time, c_operate
[docs]
def edge_reclassification_func(
road_links: pd.DataFrame,
) -> pd.DataFrame:
"""Reclassify network edges to "M, A_dual, A_single, B"."""
road_links["combined_label"] = "A_dual"
road_links.loc[road_links.road_classification == "Motorway", "combined_label"] = "M"
road_links.loc[road_links.road_classification == "B Road", "combined_label"] = "B"
road_links.loc[
(road_links.road_classification == "A Road")
& (road_links.form_of_way == "Single Carriageway"),
"combined_label",
] = "A_single"
return road_links
[docs]
def edge_initial_speed_func(
road_links: pd.DataFrame,
free_flow_speed_dict: Dict[str, float],
urban_flow_speed_dict: Dict[str, float],
min_flow_speed_dict: Dict[str, float],
max_flow_speed_dict: Dict[str, float] = None,
) -> Tuple[pd.DataFrame, Dict[str, float]]:
"""Calculate the initial vehicle speed for network edges.
Parameters
----------
road_links: pd.DataFrame
network link features
free_flow_speed_dict: Dict
free-flow vehicle operating speed on roads: M, A(single/dual), B
urban_flow_speed_dict: Dict
set the maximum vehicle speed restrictions in urban areas
min_flow_speed_dict: Dict
minimum vehicle operating speeds
max_flow_speed_dict: Dict
maximum safe vehicel operatin speeds on flooded roads
Returns
-------
road_links: pd.DataFrame
with speed information
initial_speed_dict: Dict
initial vehicle operating speed of existing road links
"""
assert "combined_label" in road_links.columns, "combined_label column not exists!"
assert "urban" in road_links.columns, "urban column not exists!"
if (
"free_flow_speeds" not in road_links.columns
): # add free-flow speeds if not exist
road_links["free_flow_speeds"] = road_links.combined_label.map(
free_flow_speed_dict
)
road_links.loc[road_links["urban"] == 1, "free_flow_speeds"] = road_links[
"combined_label"
].map(
urban_flow_speed_dict
) # urban speed restriction
road_links["min_flow_speeds"] = road_links.combined_label.map(min_flow_speed_dict)
road_links["initial_flow_speeds"] = road_links["free_flow_speeds"]
if max_flow_speed_dict is not None:
road_links["max_speeds"] = road_links["e_id"].map(max_flow_speed_dict)
# if max < min: close the roads
road_links.loc[
road_links.max_speeds < road_links.min_flow_speeds, "initial_flow_speeds"
] = 0.0
# if min < max < free
road_links.loc[
(road_links.max_speeds >= road_links.min_flow_speeds)
& (road_links.max_speeds < road_links.free_flow_speeds),
"initial_flow_speeds",
] = road_links.max_speeds
# if max > free: free flow speeds (by default)
# remove the closed/damaged roads
road_links = road_links[road_links["initial_flow_speeds"] > 0]
road_links.reset_index(drop=True, inplace=True)
return road_links
[docs]
def edge_init(
road_links: gpd.GeoDataFrame,
capacity_plph_dict: Dict[str, float],
free_flow_speed_dict: Dict[str, float],
urban_flow_speed_dict: Dict[str, float],
min_flow_speed_dict: Dict[str, float],
max_flow_speed_dict: Dict[str, float],
) -> gpd.GeoDataFrame:
"""Network edges initialisation.
Parameters
----------
road_links: gpd.GeoDataFrame
capacity_plph_dict: Dict
The designed edge capacity (per lane per hour).
free_flow_speed_dict: Dict
The free-flow edge speed of different types of road links.
urban_flow_speed_dict: Dict
The maximum vehicle operating speed restriction in urban areas.
min_flow_speed_dict: Dict
The minimum vehicle operating speeds.
max_flow_speed_dict: Dict
The maximum flow speed of the flooded road links.
Returns
-------
road_links: gpd.GeoDataFrame
Road links with added attributes.
"""
# reclassification
road_links = edge_reclassification_func(road_links)
road_links = edge_initial_speed_func(
road_links,
free_flow_speed_dict,
urban_flow_speed_dict,
min_flow_speed_dict,
max_flow_speed_dict,
)
assert "combined_label" in road_links.columns, "combined_label column not exists!"
assert (
"initial_flow_speeds" in road_links.columns
), "initial_flow_speeds column not exists!"
# initialise key variables
road_links["acc_flow"] = 0.0
road_links["acc_capacity"] = (
road_links["combined_label"].map(capacity_plph_dict) * road_links["lanes"] * 24
)
road_links["acc_speed"] = road_links["initial_flow_speeds"]
# remove invalid road links
road_links = road_links[road_links.acc_capacity > 0.5].reset_index(drop=True)
return road_links
[docs]
def speed_flow_func(
combined_label: str,
total_flow: float,
initial_flow_speed: float,
min_flow_speed: float,
flow_breakpoint_dict: Dict[str, float],
) -> float:
"""Create the speed-flow curves for different types of roads.
Parameters
----------
combined_label: str
The type or road links.
total_flow: float
The number of vehicles on road links.
initial_flow_speed: float
The initial traffic speeds of road links.
min_flow_speed: float
The minimum traffic speeds of road links.
flow_breakpoint_dict: dict
The breakpoint flows after which traffic flows starts to decrease.
Returns
-------
speed: float
The "real-time" traffic speeds on road links.
"""
vp = total_flow / 24
if combined_label == "M" and vp > flow_breakpoint_dict["M"]:
speed = initial_flow_speed - 0.033 * (vp - flow_breakpoint_dict["M"])
elif combined_label == "A_single" and vp > flow_breakpoint_dict["A_single"]:
speed = initial_flow_speed - 0.05 * (vp - flow_breakpoint_dict["A_single"])
elif combined_label == "A_dual" and vp > flow_breakpoint_dict["A_dual"]:
speed = initial_flow_speed - 0.033 * (vp - flow_breakpoint_dict["A_dual"])
elif combined_label == "B" and vp > flow_breakpoint_dict["B"]:
speed = initial_flow_speed - 0.05 * (vp - flow_breakpoint_dict["B"])
else:
speed = initial_flow_speed
speed = max(speed, min_flow_speed)
return speed
[docs]
def create_igraph_network(
road_links: gpd.GeoDataFrame,
) -> igraph.Graph:
"""Create an undirected igraph network."""
road_links["edge_length_mile"] = (
road_links.geometry.length * cons.CONV_METER_TO_MILE
)
road_links["time_hr"] = (
1.0 * road_links.edge_length_mile / road_links.acc_speed
) # return inf -> acc_speed is 0?
road_links["voc_per_km"] = np.vectorize(voc_func, otypes=None)(road_links.acc_speed)
(
road_links["weight"],
road_links["time_cost"],
road_links["operating_cost"],
) = np.vectorize(cost_func, otypes=None)(
road_links["time_hr"],
road_links["edge_length_mile"],
road_links["voc_per_km"],
road_links["average_toll_cost"],
)
graph_df = road_links[
[
"from_id",
"to_id",
"e_id",
"weight",
"time_cost",
"operating_cost",
"average_toll_cost",
]
]
network = igraph.Graph.TupleList(
graph_df.itertuples(index=False),
edge_attrs=list(graph_df.columns)[2:],
directed=False,
)
return network
[docs]
def update_network_structure(
network: igraph.Graph,
temp_edge_flow: pd.DataFrame,
road_links: gpd.GeoDataFrame,
) -> igraph.Graph:
"""Drop fully utilised edges and Update edge weights.
Parameters
----------
network: igraph network
temp_edge_flow: the remaining edge capacity at the current iteration
road_links: road links
Returns
-------
The updated igraph network
"""
# drop fully utilised edges from the network
zero_capacity_edges = set(
temp_edge_flow.loc[temp_edge_flow["acc_capacity"] < 0.5, "e_id"].tolist()
)
edges_to_remove = [e.index for e in network.es if e["e_id"] in zero_capacity_edges]
network.delete_edges(edges_to_remove)
print(f"The remaining number of edges in the network: {len(list(network.es))}")
# update edges' weights
remaining_edges = network.es["e_id"]
graph_df = road_links[road_links.e_id.isin(remaining_edges)][
[
"from_id",
"to_id",
"e_id",
"weight",
"time_cost",
"operating_cost",
"average_toll_cost",
]
]
network = igraph.Graph.TupleList(
graph_df.itertuples(index=False),
edge_attrs=list(graph_df.columns)[2:],
directed=False,
)
return network
[docs]
def update_od_matrix(
temp_flow_matrix: pd.DataFrame,
remain_od: pd.DataFrame,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""Divide the OD matrix into allocated and unallocated sections.
Parameters
----------
temp_flow_matrix: origin, destination, path, flow, operating_cost_per_flow,
time_cost_per_flow, toll_cost_per_flow
remain_od: origin, destination, flow
Returns
-------
temp_flow_matrix: flow matrix with valid path
isolated_flow_matrix: flow matrix with no path
remain_od: the remaining od matrix
"""
mask = temp_flow_matrix["path"].apply(lambda x: len(x) == 0)
isolated_flow_matrix = temp_flow_matrix.loc[
mask, ["origin", "destination", "flow"]
] # drop the cost columns
# print(f"Non_allocated_flow: {isolated_flow_matrix.flow.sum()}")
temp_flow_matrix = temp_flow_matrix[~mask]
remain_origins = temp_flow_matrix.origin.unique().tolist()
remain_destinations = temp_flow_matrix.destination.unique().tolist()
remain_od = remain_od[
(
remain_od.origin_node.isin(remain_origins)
& (remain_od.destination_node.isin(remain_destinations))
)
].reset_index(drop=True)
return temp_flow_matrix, remain_od, isolated_flow_matrix
[docs]
def find_least_cost_path(
params: Tuple,
) -> Tuple[int, List[str], List[int], List[float]]:
"""Find the least-cost path for each OD trip.
Parameters
-----------
params: Tuple
The first element: the origin node (index).
The second element: a list of destination nodes (indexes).
The third element: a list of outbound trips from origin to its connected destinations.
Returns
--------
idx_of_origin_node: int
Same as input.
list_of_idx_destination_nodes: list
Same as input.
paths: list
The least-cost paths.
flows: list
Same as input.
"""
origin_node, destination_nodes, flows = params
paths = shared_network.get_shortest_paths(
v=origin_node,
to=destination_nodes,
weights="weight",
mode="out",
output="epath",
) # paths: o - d(s)
edge_paths = []
operating_costs = []
time_costs = []
toll_costs = []
for path in paths: # path: o - d
edge_path = []
operating_cost = []
time_cost = []
toll_cost = []
for p in path: # p: each line segment
edge_path.append(shared_network.es[p]["e_id"])
operating_cost.append(shared_network.es[p]["operating_cost"])
time_cost.append(shared_network.es[p]["time_cost"])
toll_cost.append(shared_network.es[p]["average_toll_cost"])
edge_paths.append(edge_path) # a list of lists
operating_costs.append(sum(operating_cost)) # a list of values
time_costs.append(sum(time_cost)) # a list of values
toll_costs.append(sum(toll_cost)) # a list of values
return (
origin_node,
destination_nodes,
edge_paths,
flows,
operating_costs,
time_costs,
toll_costs,
)
[docs]
def compute_edge_costs(
# edge_weight_df,
path: List[int],
) -> Tuple[float, float, float]:
"""Calculate the total travel cost for the path
Parameters
----------
path: List
A list of edge indexes that define the path.
Returns
-------
od_voc: float
Vehicle operating costs of each trip.
od_vot: float
Value of time of each trip.
od_toll: float
Toll costs of each trip.
"""
od_voc = edge_weight_df.loc[edge_weight_df["e_id"].isin(path), "voc"].sum()
od_vot = edge_weight_df.loc[edge_weight_df["e_id"].isin(path), "time_cost"].sum()
od_toll = edge_weight_df.loc[
edge_weight_df["e_id"].isin(path), "average_toll_cost"
].sum()
return (od_voc, od_vot, od_toll)
[docs]
def worker_init_path(
shared_network_pkl: bytes,
) -> None:
"""Worker initialisation in multiprocesses to create a shared network
that could be used across different workers.
Parameters
----------
shared_network_pkl: bytes
The pickled file of the igraph network.
Returns
-------
None.
"""
global shared_network
shared_network = pickle.loads(shared_network_pkl)
return None
[docs]
def worker_init_edge(
shared_network_pkl: bytes,
shared_weight_pkl: bytes,
) -> None:
"""Worker initialisation in multiprocesses to create a shared network
that could be used across different workers.
Parameters
----------
shared_network_pkl: bytes
The pickled file of the igraph network.
shared_weight_pkl: bytes
The pickled flle of a dictionary of network edge weights.
Returns
-------
None.
"""
# print(os.getpid())
global shared_network
shared_network = pickle.loads(shared_network_pkl)
global edge_weight_df
edge_weight_df = pickle.loads(shared_weight_pkl)
return None
[docs]
def network_flow_model(
road_links: gpd.GeoDataFrame,
network: igraph.Graph,
remain_od: pd.DataFrame,
flow_breakpoint_dict: Dict[str, float],
num_of_cpu,
) -> Tuple[gpd.GeoDataFrame, List, List]:
"""Process-based Network Flow Simulation.
Parameters
----------
road_links: road links
network: igraph network
remain_od: od matrix
flow_breakpoint_dict: flow breakpoints of different types of road links
Returns
-------
road_links: with added attributes (acc_flow, acc_capacity, acc_speed)
isolation: non-allocated od matrix
odpfc: allocated od matrix
"""
partial_speed_flow_func = partial(
speed_flow_func,
flow_breakpoint_dict=flow_breakpoint_dict,
)
total_remain = remain_od.Car21.sum()
print(f"The initial supply is {total_remain}")
number_of_edges = len(list(network.es))
print(f"The initial number of edges in the network: {number_of_edges}")
number_of_origins = remain_od.origin_node.unique().shape[0]
print(f"The initial number of origins: {number_of_origins}")
number_of_destinations = remain_od.destination_node.unique().shape[0]
print(f"The initial number of destinations: {number_of_destinations}")
# starts
total_cost = time_equiv_cost = operating_cost = toll_cost = 0
odpfc, isolation = [], []
initial_sumod = remain_od["Car21"].sum()
assigned_sumod = 0
iter_flag = 1
while total_remain > 0:
print(f"No.{iter_flag} iteration starts:")
# check isolations
mask = remain_od.origin_node.isin(network.vs["name"]) & (
remain_od.destination_node.isin(network.vs["name"])
)
isolation.extend(
remain_od.loc[
~mask, ["origin_node", "destination_node", "Car21"]
].values.tolist()
)
remain_od = remain_od[mask].reset_index(drop=True)
# dump the network and edge weight for shared use in multiprocessing
shared_network_pkl = pickle.dumps(network)
# find the least-cost path for each OD trip
list_of_spath = []
args = []
list_of_origin_nodes = list(set(remain_od["origin_node"].tolist()))
for origin_node in list_of_origin_nodes:
destination_nodes = remain_od.loc[
remain_od["origin_node"] == origin_node, "destination_node"
].tolist()
flows = remain_od.loc[
remain_od["origin_node"] == origin_node, "Car21"
].tolist()
args.append((origin_node, destination_nodes, flows))
st = time.time()
with Pool(
processes=num_of_cpu,
initializer=worker_init_path,
initargs=(shared_network_pkl,),
) as pool:
list_of_spath = pool.map(find_least_cost_path, args)
# [origin(name), destinations(name), path(idx), flow(int)]
print(f"The least-cost path flow allocation time: {time.time() - st}.")
temp_flow_matrix = pd.DataFrame(
list_of_spath,
columns=[
"origin",
"destination",
"path",
"flow",
"operating_cost_per_flow",
"time_cost_per_flow",
"toll_cost_per_flow",
],
).explode(
[
"destination",
"path",
"flow",
"operating_cost_per_flow",
"time_cost_per_flow",
"toll_cost_per_flow",
]
)
# calculate the non-allocated flows and remaining flows
(
temp_flow_matrix,
remain_od,
isolated_flow_matrix,
) = update_od_matrix(temp_flow_matrix, remain_od)
assigned_sumod += temp_flow_matrix.flow.sum()
percentage_sumod = assigned_sumod / initial_sumod
# update the isolated flows
isolation.extend(isolated_flow_matrix.to_numpy().tolist())
# calculate the remaining flows
number_of_origins = remain_od.origin_node.unique().shape[0]
number_of_destinations = remain_od.destination_node.unique().shape[0]
print(f"The remaining number of origins: {number_of_origins}")
print(f"The remaining number of destinations: {number_of_destinations}")
total_remain = remain_od.Car21.sum()
if total_remain == 0:
print("Iteration stops: there is no remaining flows!")
break
# calculate edge flows: e_id, flow
temp_edge_flow = get_flow_on_edges(
temp_flow_matrix,
"e_id",
"path",
"flow",
)
# add/update edge attributes
temp_edge_flow = temp_edge_flow.merge(
road_links[
[
"e_id",
"combined_label",
"min_flow_speeds",
"initial_flow_speeds",
"acc_flow",
"acc_capacity",
"acc_speed",
]
],
on="e_id",
how="left",
)
temp_edge_flow["est_overflow"] = (
temp_edge_flow["flow"] - temp_edge_flow["acc_capacity"]
) # estimated overflow: positive -> has overflow
max_overflow = temp_edge_flow["est_overflow"].max()
print(f"The maximum amount of edge overflow: {max_overflow}")
if max_overflow <= 0 or percentage_sumod > 0.9:
# update the origin-destination-path-cost matrix
odpfc.extend(temp_flow_matrix.to_numpy().tolist())
# add/update edge key variables: flow/speed/capacity
temp_edge_flow["acc_flow"] = (
temp_edge_flow["flow"] + temp_edge_flow["acc_flow"]
)
temp_edge_flow["acc_capacity"] = (
temp_edge_flow["acc_capacity"] - temp_edge_flow["flow"]
)
temp_edge_flow["acc_speed"] = np.vectorize(partial_speed_flow_func)(
temp_edge_flow["combined_label"],
temp_edge_flow["acc_flow"],
temp_edge_flow["initial_flow_speeds"],
temp_edge_flow["min_flow_speeds"],
)
# update road_links (flows, capacities, and speeds) based on temp_edge_flow
road_links = road_links.set_index("e_id")
road_links.update(
temp_edge_flow.set_index("e_id")[
["acc_flow", "acc_capacity", "acc_speed"]
]
)
road_links = road_links.reset_index()
# update travel costs: based on temp_flow_matrix
# operating costs
temp_cost = (
temp_flow_matrix.operating_cost_per_flow * temp_flow_matrix.flow
).sum()
operating_cost += temp_cost
# time costs
temp_cost = (
temp_flow_matrix.time_cost_per_flow * temp_flow_matrix.flow
).sum()
time_equiv_cost += temp_cost
# toll costs
temp_cost = (
temp_flow_matrix.toll_cost_per_flow * temp_flow_matrix.flow
).sum()
toll_cost += temp_cost
# total cost
total_cost += time_equiv_cost + operating_cost + toll_cost
if max_overflow > 0:
print(
"Iteration stops: "
f"with {percentage_sumod * 100}% flows sent to the network!"
)
else:
print(
"Iteration stops: there is no edge overflow! "
f"with {percentage_sumod * 100}% flows sent to the network!"
)
break
# calculate the ratio of flow adjustment (0 < r < 1)
temp_edge_flow["r"] = np.where(
temp_edge_flow["flow"] != 0,
temp_edge_flow["acc_capacity"] / temp_edge_flow["flow"],
np.nan,
)
r = temp_edge_flow.r.min()
if r < 0:
print("Error: r < 0")
break
if r == 0:
print("Error: existing network has zero-capacity links!")
break
if r >= 1:
print("Error: r >= 1")
break
print(f"r = {r}")
# update the origin-destination-path-cost matrix
odpfc.extend(
(temp_flow_matrix.assign(flow=temp_flow_matrix["flow"] * r))
.to_numpy()
.tolist()
)
# add/update edge key variables: flows/speeds/capacities
temp_edge_flow["adjusted_flow"] = temp_edge_flow["flow"] * r
temp_edge_flow["acc_flow"] = (
temp_edge_flow.acc_flow + temp_edge_flow.adjusted_flow
)
temp_edge_flow["acc_capacity"] = (
temp_edge_flow.acc_capacity - temp_edge_flow.adjusted_flow
)
temp_edge_flow["acc_speed"] = np.vectorize(partial_speed_flow_func)(
temp_edge_flow["combined_label"],
temp_edge_flow["acc_flow"],
temp_edge_flow["initial_flow_speeds"],
temp_edge_flow["min_flow_speeds"],
)
# update road links (flows, capacities, speeds) based on temp_edge_flow
road_links = road_links.set_index("e_id")
road_links.update(
temp_edge_flow.set_index("e_id")[["acc_flow", "acc_capacity", "acc_speed"]]
)
road_links = road_links.reset_index()
# update travel costs based on temp_flow_matrix
# operating costs
temp_cost = (
temp_flow_matrix.operating_cost_per_flow * temp_flow_matrix.flow * r
).sum()
operating_cost += temp_cost
# time costs
temp_cost = (
temp_flow_matrix.time_cost_per_flow * temp_flow_matrix.flow * r
).sum()
time_equiv_cost += temp_cost
# toll costs
temp_cost = (
temp_flow_matrix.toll_cost_per_flow * temp_flow_matrix.flow * r
).sum()
toll_cost += temp_cost
# total cost
total_cost += time_equiv_cost + operating_cost + toll_cost
# update remaining od flows
remain_od["Car21"] = remain_od["Car21"] * (1 - r)
remain_od.loc[remain_od.Car21 < 0.5, "Car21"] = 0
total_remain = remain_od.Car21.sum()
print(f"The total remaining supply (after flow adjustment) is: {total_remain}")
# update network structure (nodes and edges)
network = update_network_structure(network, temp_edge_flow, road_links)
iter_flag += 1
cList = [time_equiv_cost, operating_cost, toll_cost, total_cost]
# update the road links attributes
road_links.acc_flow = road_links.acc_flow.astype(int)
road_links.acc_capacity = road_links.acc_capacity.astype(int)
road_links = road_links.iloc[:, :-6] # drop cost-related columns
print("The flow simulation is completed!")
print(f"total travel cost is (£): {total_cost}")
print(f"total time-equiv cost is (£): {time_equiv_cost}")
print(f"total operating cost is (£): {operating_cost}")
print(f"total toll cost is (£): {toll_cost}")
return road_links, isolation, odpfc, cList