""" Road Network Flow Model - Functions
"""
from typing import Tuple, List, Dict
from collections import defaultdict
from functools import partial
import numpy as np
import pandas as pd
import geopandas as gpd # type: ignore
import igraph # type: ignore
import nird.constants as cons
from nird.utils import get_flow_on_edges
from multiprocessing import Pool
import warnings
import pickle
import time
warnings.simplefilter("ignore")
[docs]
def select_partial_roads_osm(
road_links: gpd.GeoDataFrame,
road_nodes: gpd.GeoDataFrame,
col_name: str,
list_of_values: List[str],
) -> Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
# road links selection
selected_links = []
for ci in list_of_values:
selected_links.append(road_links[road_links[col_name] == ci])
selected_links = pd.concat(selected_links, ignore_index=True)
selected_links = gpd.GeoDataFrame(selected_links, geometry="geometry")
selected_links["e_id"] = (
selected_links.id.str.split("_").str[-2]
+ selected_links.id.str.split("_").str[-1]
)
# delete dangling lines
nodeset = road_nodes.id.unique().tolist()
selected_links = selected_links[
(selected_links.from_id.isin(nodeset)) & (selected_links.to_id.isin(nodeset))
].reset_index(drop=True)
# road nodes selection
sel_node_idx = list(
set(selected_links.from_id.tolist() + selected_links.to_id.tolist())
)
selected_nodes = road_nodes[road_nodes.id.isin(sel_node_idx)]
selected_nodes.reset_index(drop=True, inplace=True)
selected_nodes["nd_id"] = selected_nodes["id"]
selected_nodes["lat"] = selected_nodes["geometry"].y
selected_nodes["lon"] = selected_nodes["geometry"].x
return selected_links, selected_nodes
[docs]
def create_urban_mask(etisplus_urban_roads: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""To extract urban areas across Great Britain (GB) based on ETISPLUS datasets.
Parameters
----------
etisplus_urban_roads: gpd.GeoDataFrame
D6 ETISplus Roads (2010)
Returns
-------
urban_mask: gpd.GeoDataFrame
A binary file that spatially identify the urban areas across GB
with values == 1.
"""
etisplus_urban_roads = etisplus_urban_roads[
etisplus_urban_roads["Urban"] == 1
].reset_index(drop=True)
buf_geom = etisplus_urban_roads.geometry.buffer(
500
) # create a buffer of 500 meters
uni_geom = buf_geom.unary_union # feature union within the same layer
temp = gpd.GeoDataFrame(geometry=[uni_geom])
new_geom = (
temp.explode()
) # explode multi-polygons into separate individual polygons
cvx_geom = (
new_geom.convex_hull
) # generate convex polygon for each individual polygon
urban_mask = gpd.GeoDataFrame(
geometry=cvx_geom[0], crs=etisplus_urban_roads.crs
).to_crs("27700")
return urban_mask
[docs]
def label_urban_roads(
road_links: gpd.GeoDataFrame, urban_mask: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
"""Classify road links into urban/suburban roads.
Parameters
----------
road_links: gpd.GeoDataFrame
Links of the road network.
urban_mask: gpd.GeoDataFrame
A binary file that spatially identify the urban areas across GB.
Returns
-------
road_links: gpd.GeoDataFrame
Create a column "urban" to classify road links into urban/suburban links.
"""
temp_file = road_links.sjoin(urban_mask, how="left")
temp_file["urban"] = temp_file["index_right"].apply(
lambda x: 0 if pd.isna(x) else 1
)
max_values = temp_file.groupby("e_id")["urban"].max()
road_links = road_links.merge(max_values, on="e_id", how="left")
return road_links
[docs]
def find_nearest_node(
zones: gpd.GeoDataFrame, road_nodes: gpd.GeoDataFrame
) -> Dict[str, str]:
"""Find the nearest road node for each admin unit.
Parameters
----------
zones: gpd.GeoDataFrame
Admin units.
road nodes: gpd.GeoDataFrame
Nodes of the road network.
Returns
-------
nearest_node_dict: dict
A dictionary to convert from admin units to their attached road nodes.
"""
nearest_node_dict = {}
for zidx, z in zones.iterrows():
closest_road_node = road_nodes.sindex.nearest(z.geometry, return_all=False)[1][
0
]
# for the first [x]:
# [0] represents the index of geometry;
# [1] represents the index of gdf
# the second [x] represents the No. of closest item in the returned list,
# which only return one nearest node in this case
nearest_node_dict[zidx] = closest_road_node
return nearest_node_dict # zone_idx: node_idx
[docs]
def voc_func(speed: float) -> float:
"""Calculate the Vehicle Operating Cost (VOC).
Parameters
----------
speed: float
The average flow speed: mph
Returns
-------
float
The unit vehicle operating cost: £/km
"""
s = speed * cons.CONV_MILE_TO_KM # km/hour
lpkm = 0.178 - 0.00299 * s + 0.0000205 * (s**2) # fuel consumption (liter/km)
uvoc = 140 * lpkm * cons.PENCE_TO_POUND # average petrol cost: 140 pence/liter
return uvoc
[docs]
def cost_func(
time: float,
distance: float,
voc: float,
toll: float,
) -> Tuple[float, float, float]: # time: hour, distance: mph, voc: £/km
"""Calculate the total travel cost.
Parameters
----------
time: float
Travel time: hour
distance: float
Travel distance: mile
voc:
Vehicle operating cost: £/km
Returns
-------
cost: float
The total travel costs: £
c_time: float
The time-equivalent costs: £
c_operate: float
The vehicle operating costs/fuel costs: £
"""
ave_occ = 1.06 # average car occupancy = 1.6
vot = 17.69 # value of time (VOT): 17.69 £/hour
d = distance * cons.CONV_MILE_TO_KM
c_time = time * ave_occ * vot
c_operate = d * voc
cost = time * ave_occ * vot + d * voc + toll
return cost, c_time, c_operate
[docs]
def edge_initial_speed_func(
road_links: pd.DataFrame,
free_flow_speed_dict: Dict[str, float],
urban_flow_speed_dict: Dict[str, float],
min_flow_speed_dict: Dict[str, float], # add a minimum speed cap
max_flow_speed_dict: Dict[str, float] = None,
) -> Tuple[pd.DataFrame, Dict[str, float]]:
"""Calculate the initial vehicle speed for network edges.
Parameters
----------
road_links: pd.DataFrame
network link features
free_flow_speed_dict: Dict
free-flow vehicle operating speed on roads: M, A(single/dual), B
urban_flow_speed_dict: Dict
set the maximum vehicle speed restrictions in urban areas
min_flow_speed_dict: Dict
minimum vehicle operating speeds
max_flow_speed_dict: Dict
maximum safe vehicel operatin speeds on flooded roads
Returns
-------
road_links: pd.DataFrame
with speed information
initial_speed_dict: Dict
initial vehicle operating speed of existing road links
"""
assert "urban" in road_links.columns, "urban column not exists!"
assert "asset_type" in road_links.columns, "asset_type column not exists!"
assert "tag_maxspeed" in road_links.columns, "tag_maxspeed column not exists!"
if (
"free_flow_speeds" not in road_links.columns
): # add free-flow speeds if not exist
road_links["free_flow_speeds"] = (
road_links["tag_maxspeed"] * cons.CONV_KM_TO_MILE
)
for asset_type, speed in free_flow_speed_dict.items():
road_links.loc[road_links.asset_type == asset_type, "free_flow_speeds"] = (
road_links.loc[
road_links.asset_type == asset_type, "free_flow_speeds"
].fillna(speed)
)
road_links.loc[road_links["urban"] == 1, "free_flow_speeds"] = road_links[
"asset_type"
].map(
urban_flow_speed_dict
) # urban speed restriction
road_links["min_flow_speeds"] = road_links.asset_type.map(min_flow_speed_dict)
road_links["initial_flow_speeds"] = road_links["free_flow_speeds"]
if max_flow_speed_dict is not None:
road_links["max_speeds"] = road_links["e_id"].map(max_flow_speed_dict)
# if max < min: close the roads
road_links.loc[
road_links.max_speeds < road_links.min_flow_speeds, "initial_flow_speeds"
] = 0.0
# if min < max < free
road_links.loc[
(road_links.max_speeds >= road_links.min_flow_speeds)
& (road_links.max_speeds < road_links.free_flow_speeds),
"initial_flow_speeds",
] = road_links.max_speeds
# if max > free: free flow speeds (by default)
# remove the closed/damaged roads
road_links = road_links[road_links["initial_flow_speeds"] > 0]
road_links.reset_index(drop=True, inplace=True)
initial_speed_dict = road_links.set_index("e_id")["initial_flow_speeds"]
return road_links, initial_speed_dict
[docs]
def edge_init(
road_links: gpd.GeoDataFrame,
initial_capacity_dict: Dict[str, float],
free_flow_speed_dict: Dict[str, float],
urban_flow_speed_dict: Dict[str, float],
min_flow_speed_dict: Dict[str, float],
max_flow_speed_dict: Dict[str, float],
) -> gpd.GeoDataFrame:
"""Network edges initialisation.
Parameters
----------
road_links: gpd.GeoDataFrame
initial_capacity_dict: Dict
The average daily capacity of road links (passengers per lane per day)
free_flow_speed_dict: Dict
The free-flow edge speed of road links (mph).
urban_flow_speed_dict: Dict
The maximum vehicle operating speed restriction in urban areas (mph).
min_flow_speed_dict: Dict
The minimum vehicle operating speeds (mph).
max_flow_speed_dict: Dict
The maximum flow speed of the flooded road links (mph).
Returns
-------
road_links: pd.DataFrame
with added attributes of initial edge flow, capacity, and speed.
initial_speed_dict: Dict
initial vehicle operating speed of existing road links.
"""
# initial edge flow speeds (mph)
road_links, initial_speed_dict = edge_initial_speed_func(
road_links,
free_flow_speed_dict,
urban_flow_speed_dict,
min_flow_speed_dict,
max_flow_speed_dict,
)
# key variables
# edge flows (cars/day)
road_links["acc_flow"] = 0.0
# edge capacities (cars/day)
road_links["acc_capacity"] = (
road_links.asset_type.map(initial_capacity_dict) * road_links.lanes * 2
) #!!! for doubled OD matrix
# average flow speeds (mph)
road_links["acc_speed"] = road_links["initial_flow_speeds"]
# remove edges with zero capacities
road_links = road_links[road_links.acc_capacity > 0.5].reset_index(drop=True)
return road_links, initial_speed_dict
[docs]
def speed_flow_func(
asset_type: str,
total_flow: float,
initial_flow_speed: float,
min_flow_speed: float,
flow_breakpoint_dict: Dict[str, float],
) -> float:
"""Create the speed-flow curves for different types of roads.
Parameters
----------
asset_type: str
The type or road links.
total_flow: float
The number of vehicles on road links.
initial_flow_speed: float
The initial traffic speeds of road links.
min_flow_speed: float
The minimum traffic speeds of road links.
flow_breakpoint_dict: dict
The breakpoint flows after which traffic flows starts to decrease.
Returns
-------
speed: float
The "real-time" traffic speeds on road links.
"""
vp = total_flow / 24
if asset_type == "road_motorway" and vp > flow_breakpoint_dict["road_motorway"]:
speed = initial_flow_speed - 0.033 * (
vp - flow_breakpoint_dict["road_motorway"]
)
elif asset_type == "road_trunk" and vp > flow_breakpoint_dict["road_trunk"]:
speed = initial_flow_speed - 0.033 * (vp - flow_breakpoint_dict["road_trunk"])
elif asset_type == "road_bridge" and vp > flow_breakpoint_dict["road_bridge"]:
speed = initial_flow_speed - 0.033 * (vp - flow_breakpoint_dict["road_bridge"])
elif asset_type == "road_primary" and vp > flow_breakpoint_dict["road_primary"]:
speed = initial_flow_speed - 0.05 * (vp - flow_breakpoint_dict["road_primary"])
elif asset_type == "road_secondary" and vp > flow_breakpoint_dict["road_secondary"]:
speed = initial_flow_speed - 0.05 * (
vp - flow_breakpoint_dict["road_secondary"]
)
else:
speed = initial_flow_speed
speed = max(speed, min_flow_speed)
return speed
[docs]
def create_igraph_network(
road_links: gpd.GeoDataFrame,
road_nodes: gpd.GeoDataFrame,
) -> Tuple[
igraph.Graph,
Dict[str, float],
Dict[str, float],
Dict[str, float],
Dict[str, float],
pd.DataFrame,
]:
"""Create an undirected igraph network.
Parameters
----------
road_links: gpd.GeoDataFrame
road_nodes: gpd.GeoDataFrame
initialSpeeds: dict
The free-flow speed of different types of road links.
Returns
-------
igraph.Graph
The road network.
edge_cost_dict,
Total travel cost of each edge.
edge_timecost_dict,
The time-equivalent cost of each edge.
edge_operatecost_dict,
The fuel cost of each edge.
"""
nodeList = [(node.nd_id) for _, node in road_nodes.iterrows()]
edgeNameList = road_links["e_id"].tolist()
edgeList = list(zip(road_links.from_id, road_links.to_id))
edgeLengthList = (
road_links.geometry.length * cons.CONV_METER_TO_MILE
).tolist() # mile
edgeTollList = road_links.average_toll_cost.tolist() # £
edgeSpeedList = road_links.initial_flow_speeds.tolist() # mph
# travel time
timeList = np.array(edgeLengthList) / np.array(edgeSpeedList) # hour
# total travel cost (£)
vocList = np.vectorize(voc_func, otypes=None)(edgeSpeedList) # £/km
costList, timeCostList, operateCostList = np.vectorize(cost_func, otypes=None)(
timeList, edgeLengthList, vocList, edgeTollList
)
weightList = costList.tolist()
# Node/Egde-seq objects: indices and attributes
test_net = igraph.Graph(directed=False)
test_net.add_vertices(nodeList)
test_net.add_edges(edgeList)
test_net.es["edge_name"] = edgeNameList
test_net.es["weight"] = weightList
# Cmponent costs (£)
edge_cost_dict = dict(zip(edgeNameList, weightList))
edge_timecost_dict = dict(zip(edgeNameList, timeCostList))
edge_operatecost_dict = dict(zip(edgeNameList, operateCostList))
edge_toll_dict = dict(zip(edgeNameList, edgeTollList))
edge_compc_df = pd.DataFrame(
{
"edge_name": edgeNameList,
"edge_voc": operateCostList,
"edge_vot": timeCostList,
"edge_toll": edgeTollList,
}
)
return (
test_net,
edge_cost_dict,
edge_timecost_dict,
edge_operatecost_dict,
edge_toll_dict,
edge_compc_df,
)
[docs]
def update_od_matrix(
temp_flow_matrix: pd.DataFrame,
supply_dict: Dict[str, List[float]],
destination_dict: Dict[str, List[str]],
isolated_flow_dict: Dict[Tuple[str, str], float],
) -> Tuple[
pd.DataFrame,
List[str],
Dict[str, List[float]],
Dict[str, List[str]],
]:
"""Update the OD matrix by removing unreachable desitinations from each origin;
and origins with zero supplies.
Parameters
----------
temp_flow_matrix: pd.DataFrame
A temporary flow matrix: [origins, destinations, paths, flows]
supply_dict: dict
The number of outbound trips from each origin.
destination_dict: dict
A list of destinations attached to each origin.
isolated_flow_dict: dict
The number of isolated trips between each OD pair.
Returns
-------
temp_flow_matrix: pd.DataFrame
new_list_of_origins: list
new_supply_dict: dict
new_destination: dict
"""
# drop the OD trips with no accessible route ("path = []")
# drop destinations with no accessible route from each origin
temp_df = temp_flow_matrix[temp_flow_matrix["path"].apply(lambda x: len(x) == 0)]
print(f"Non_allocated_flow: {temp_df.flow.sum()}")
for _, row in temp_df.iterrows():
origin_temp = row["origin"]
destination_temp = row["destination"]
flow_temp = row["flow"]
isolated_flow_dict[(origin_temp, destination_temp)] += flow_temp
idx_temp = destination_dict[origin_temp].index(destination_temp)
destination_dict[origin_temp].remove(destination_temp)
del supply_dict[origin_temp][idx_temp]
# drop origins of which all trips have been sent to the network
new_supply_dict = {}
new_destination_dict = {}
new_list_of_origins = []
for origin, list_of_counts in supply_dict.items():
tt_supply = sum(list_of_counts)
if tt_supply > 0:
new_list_of_origins.append(origin)
new_counts = [od_flow for od_flow in list_of_counts if od_flow != 0]
new_supply_dict[origin] = new_counts
new_destination_dict[origin] = [
dest
for idx, dest in enumerate(destination_dict[origin])
if list_of_counts[idx] != 0
]
temp_flow_matrix = temp_flow_matrix[
temp_flow_matrix["path"].apply(lambda x: len(x) != 0)
]
return (
temp_flow_matrix,
new_list_of_origins,
new_supply_dict,
new_destination_dict,
)
[docs]
def update_network_structure(
network: igraph.Graph,
length_dict: Dict[str, float],
speed_dict: Dict[str, float],
toll_dict: Dict[str, float],
temp_edge_flow: pd.DataFrame,
) -> Tuple[igraph.Graph, Dict[str, float], Dict[str, float], Dict[str, float]]:
"""Update the road network structure by removing the fully utilised edges and
updating the weights of the remaining edges.
Parameters
----------
network: igraph.Graph
A road network.
length_dict: dict
The length of road links.
speed_dict:
The average flow speed of road links.
temp_edge_flow: pd.DataFrame
A table that records edge flows.
Returns
-------
network: igraph.Graph
Th updated road network.
edge_cost_dict: dict
The updated travel cost of edges.
edge_timecost_dict: dict
The updated time-equivalent cost of edges.
edge_operatecost_dict: dict
The updated vehicle operating cost of edges.
"""
zero_capacity_edges = set(
temp_edge_flow.loc[temp_edge_flow["remaining_capacity"] < 0.5, "e_id"].tolist()
) # edge names
net_edges = network.es["edge_name"]
idx_to_remove = [
idx for idx, element in enumerate(net_edges) if element in zero_capacity_edges
]
# drop fully utilised edges
network.delete_edges(idx_to_remove)
number_of_edges = len(list(network.es))
print(f"The remaining number of edges in the network: {number_of_edges}")
# update edge weights
remaining_edges = network.es["edge_name"]
lengthList = list(
map(length_dict.get, filter(length_dict.__contains__, remaining_edges))
)
speedList = list(
map(speed_dict.get, filter(speed_dict.__contains__, remaining_edges))
)
timeList = np.where(
np.array(speedList) != 0, np.array(lengthList) / np.array(speedList), np.nan
) # hour
tollList = list(map(toll_dict.get, filter(toll_dict.__contains__, remaining_edges)))
if np.isnan(timeList).any():
idx_first_nan = np.where(np.isnan(timeList))[0][0]
length_nan = lengthList[idx_first_nan]
speed_nan = speedList[idx_first_nan]
print("ERROR: Network contains congested edges.")
print(f"The first nan item - length: {length_nan}")
print(f"The first nan item - speed: {speed_nan}")
exit()
else:
vocList = np.vectorize(voc_func, otypes=None)(speedList)
costList, timeCostList, operateCostList = np.vectorize(cost_func, otypes=None)(
timeList, lengthList, vocList, tollList
) # hour
weightList = costList.tolist() # £
network.es["weight"] = weightList
# estimate edge traveling cost (£)
edge_cost_dict = dict(
zip(
network.es["edge_name"],
weightList,
)
)
edge_timecost_dict = dict(zip(network.es["edge_name"], timeCostList))
edge_operatecost_dict = dict(zip(network.es["edge_name"], operateCostList))
edge_tollcost_dict = dict(zip(network.es["edge_name"], tollList))
edge_compc_df = pd.DataFrame(
{
"edge_name": remaining_edges,
"edge_voc": operateCostList,
"edge_vot": timeCostList,
"edge_toll": tollList,
}
)
return (
network,
edge_cost_dict,
edge_timecost_dict,
edge_operatecost_dict,
edge_tollcost_dict,
edge_compc_df,
)
[docs]
def find_least_cost_path(
params: Tuple,
) -> Tuple[int, List[str], List[int], List[float]]:
"""Find the least-cost path for each OD trip.
Parameters:
-----------
params: Tuple
The first element: the origin node (index).
The second element: a list of destination nodes (indexes).
The third element: a list of outbound trips from origin to its connected destinations.
Returns:
--------
idx_of_origin_node: int
Same as input.
list_of_idx_destination_nodes: list
Same as input.
paths: list
The least-cost paths.
flows: list
Same as input.
"""
idx_of_origin_node, list_of_idx_destination_nodes, flows = params
paths = shared_network.get_shortest_paths(
v=idx_of_origin_node,
to=list_of_idx_destination_nodes,
weights="weight",
mode="out",
output="epath",
)
edge_paths = []
for path in paths:
edge_path = []
for p in path:
edge_path.append(shared_network.es[p]["edge_name"])
edge_paths.append(edge_path)
return (
idx_of_origin_node,
list_of_idx_destination_nodes,
edge_paths,
# paths,
flows,
)
[docs]
def compute_edge_costs(
# edge_weight_df,
path: List[int],
) -> Tuple[float, float, float]:
"""Calculate the total travel cost for the path
Parameters
----------
path: List
A list of edge indexes that define the path.
Returns
-------
od_voc: float
Vehicle operating costs of each trip.
od_vot: float
Value of time of each trip.
od_toll: float
Toll costs of each trip.
"""
od_voc = edge_weight_df.loc[
edge_weight_df["edge_name"].isin(path), "edge_voc"
].sum()
od_vot = edge_weight_df.loc[
edge_weight_df["edge_name"].isin(path), "edge_vot"
].sum()
od_toll = edge_weight_df.loc[
edge_weight_df["edge_name"].isin(path), "edge_toll"
].sum()
return (od_voc, od_vot, od_toll)
[docs]
def clip_to_zero(arr: np.ndarray) -> np.ndarray:
"""Convert flows less than 0.5 to 0"""
return np.where(arr >= 0.5, arr, 0)
[docs]
def worker_init_path(shared_network_pkl: bytes) -> None:
"""Worker initialisation in multiprocesses to create a shared network
that could be used across different workers.
Parameters
----------
shared_network_pkl: bytes
The pickled file of the igraph network.
Returns
-------
None.
"""
global shared_network
shared_network = pickle.loads(shared_network_pkl)
return None
[docs]
def worker_init_edge(shared_network_pkl: bytes, shared_weight_pkl: bytes) -> None:
"""Worker initialisation in multiprocesses to create a shared network
that could be used across different workers.
Parameters
----------
shared_network_pkl: bytes
The pickled file of the igraph network.
shared_weight_pkl: bytes
The pickled flle of a dictionary of network edge weights.
Returns
-------
None.
"""
# print(os.getpid())
global shared_network
shared_network = pickle.loads(shared_network_pkl)
global edge_weight_df
edge_weight_df = pickle.loads(shared_weight_pkl)
return None
[docs]
def network_flow_model(
road_links: gpd.GeoDataFrame,
road_nodes: gpd.GeoDataFrame,
list_of_origins: List[str],
supply_dict: Dict[str, List[float]],
destination_dict: Dict[str, List[str]],
free_flow_speed_dict: Dict[str, float],
flow_breakpoint_dict: Dict[str, float],
flow_capacity_dict: Dict[str, float],
min_speed_cap: Dict[str, float],
urban_speed_cap: Dict[str, float],
max_flow_speed_dict=None,
) -> Tuple[pd.DataFrame, pd.DataFrame, Dict[Tuple[str, str], float]]:
"""Model the passenger flows on the road network.
Parameters
----------
road_links: gpd.GeoDataFrame
road_nodes: gpd.GeoDataFrame
list_of_origins: list
A list of unique origin nodes of OD flows.
supply_dict: dict
The number of outbound trips of individual origins.
destination_dict: dict
A list of destinations attached to individual origins.
free_flow_speed_dict: dict
The free-flow speed of different types of road links.
flow_breakpoint_dict: dict
The breakpoint flow of different types of road links,
beyond which the flow speeds starts to decrease according to the remaining road capacities.
flow_capacity_dict: dict
The capacity of different types of road links.
max_flow_speed_dict: dict
The maximum vehicle speed of different flooded road links.
min_speed_cap: dict
The restriction on the lowest flow rate.
urban_speed_cap: dict
The restriction on the maximum flow rate in urban areas.
max_flow_speed_dict: Dict
The maximum vehicle operating speeds.
Returns
-------
road_links: pd.DataFrame
The roak link features with simulating results of edge flows.
isolated_flow_dict: dict
The isolated trips between each OD pair.
odpfc: pd.DataFrame
The full od outputs: origins, destinations, paths, flows, costs.
"""
# initialise road links by adding columns: initial_flow_speeds,
# acc-flow, acc-capacity, acc-speed
road_links, initial_speed_dict = edge_init(
road_links,
flow_capacity_dict,
free_flow_speed_dict,
urban_speed_cap,
min_speed_cap,
max_flow_speed_dict,
)
assert "min_flow_speeds" in road_links.columns, "min_flow_speeds column not exists!"
assert (
"initial_flow_speeds" in road_links.columns
), "initial_flow_speeds column not exists!"
assert "acc_flow" in road_links.columns, "acc_flow column not exists!"
assert "acc_capacity" in road_links.columns, "acc_capacity column not exists!"
assert "acc_speed" in road_links.columns, "acc_speed column not exists!"
# network creation (igraph)
(
network,
edge_cost_dict,
edge_timeC_dict,
edge_operateC_dict,
edge_toll_dict,
edge_compc_df,
) = create_igraph_network(
road_links, road_nodes
) # this returns a network and edge weights dict(edge_name, edge_weight)
partial_speed_flow_func = partial(
speed_flow_func,
flow_breakpoint_dict=flow_breakpoint_dict,
)
# record total cost of travelling: weight * flow
total_cost = 0
time_equiv_cost = 0
operating_cost = 0
toll_cost = 0
total_remain = sum(sum(values) for values in supply_dict.values())
print(f"The initial total supply is {total_remain}")
number_of_edges = len(list(network.es))
print(f"The initial number of edges in the network: {number_of_edges}")
print(f"The initial number of origins: {len(list_of_origins)}")
number_of_destinations = sum(len(value) for value in destination_dict.values())
print(f"The initial number of destinations: {number_of_destinations}")
# road link properties
edge_cbtype_dict = road_links.set_index("e_id")["asset_type"].to_dict()
edge_length_dict = (
road_links.set_index("e_id")["geometry"].length * cons.CONV_METER_TO_MILE
).to_dict()
acc_flow_dict = road_links.set_index("e_id")["acc_flow"].to_dict()
acc_capacity_dict = road_links.set_index("e_id")["acc_capacity"].to_dict()
acc_speed_dict = road_links.set_index("e_id")["acc_speed"].to_dict()
# starts
iter_flag = 1
isolated_flow_dict = defaultdict(float)
odpfc = pd.DataFrame(
columns=[
"origin",
"destination",
"path",
"flow",
"unit_od_voc",
"unit_od_vot",
"unit_od_toll",
]
)
while total_remain > 0:
print(f"No.{iter_flag} iteration starts:")
# dump the network and edge weight for shared use in multiprocessing
shared_network_pkl = pickle.dumps(network)
shared_weight_pkl = pickle.dumps(edge_compc_df)
# find the least-cost for each OD trip
list_of_spath = []
args = []
for i in range(len(list_of_origins)):
name_of_origin_node = list_of_origins[i]
list_of_name_destination_node = destination_dict[
name_of_origin_node
] # a list of destination nodes
list_of_flows = supply_dict[name_of_origin_node]
args.append(
(
name_of_origin_node,
list_of_name_destination_node,
list_of_flows,
)
)
st = time.time()
with Pool(
processes=1, #!!! update the required number of CPU
initializer=worker_init_path,
initargs=(shared_network_pkl,),
) as pool:
list_of_spath = pool.map(find_least_cost_path, args)
# [origin(name), destinations(name), path(idx), flow(int)]
print(f"The least-cost path flow allocation time: {time.time() - st}.")
temp_flow_matrix = pd.DataFrame(
list_of_spath,
columns=[
"origin",
"destination",
"path",
"flow",
],
).explode(["destination", "path", "flow"])
# compute the total travel cost for each OD trip
st = time.time()
args = []
args = [row["path"] for _, row in temp_flow_matrix.iterrows()]
with Pool(
processes=1,
initializer=worker_init_edge,
initargs=(shared_network_pkl, shared_weight_pkl),
) as pool:
temp_flow_matrix[["unit_od_voc", "unit_od_vot", "unit_od_toll"]] = pool.map(
compute_edge_costs, args
)
print(f"The computational time for OD costs: {time.time() - st}.")
"""
# compute the total travel cost for each OD trip
tempFunc = partial(compute_edge_costs, edge_weight_df=edge_compc_df)
temp_flow_matrix[["unit_od_voc", "unit_od_vot", "unit_od_toll"]] = (
temp_flow_matrix["path"].apply(lambda x: pd.Series(tempFunc(path=x)))
)
"""
# save the mid-outputs: origin, destination, path,
odpfc = pd.concat([odpfc, temp_flow_matrix], axis=0, ignore_index=True)
odpfc.path = odpfc.path.apply(tuple)
odpfc = odpfc.groupby(["origin", "destination", "path"], as_index=False).agg(
{
"flow": "sum",
"unit_od_voc": "first", # vehicle operational cost (per trip)
"unit_od_vot": "first", # value of time (per trip)
"unit_od_toll": "first", # toll cost (per trip)
}
)
# calculate the non-allocated flows and remaining flows
(
temp_flow_matrix,
list_of_origins,
supply_dict,
destination_dict,
) = update_od_matrix(
temp_flow_matrix, supply_dict, destination_dict, isolated_flow_dict
)
number_of_destinations = sum(len(value) for value in destination_dict.values())
print(f"The remaining number of origins: {len(list_of_origins)}")
print(f"The remaining number of destinations: {number_of_destinations}")
total_remain = sum(sum(values) for values in supply_dict.values())
if total_remain == 0:
print("Iteration stops: there is no remaining flows!")
break
# calculate edge flows
temp_edge_flow = get_flow_on_edges(temp_flow_matrix, "e_id", "path", "flow")
# add/update edge attributes
temp_edge_flow["asset_type"] = temp_edge_flow["e_id"].map(edge_cbtype_dict)
temp_edge_flow["initial_flow_speeds"] = temp_edge_flow["e_id"].map(
initial_speed_dict
)
temp_edge_flow["min_flow_speeds"] = temp_edge_flow["asset_type"].map(
min_speed_cap
)
temp_edge_flow["temp_acc_flow"] = temp_edge_flow["e_id"].map(
acc_flow_dict
) # flow
temp_edge_flow["temp_acc_capacity"] = temp_edge_flow["e_id"].map(
acc_capacity_dict
) # capacity
temp_edge_flow["est_overflow"] = (
temp_edge_flow["flow"] - temp_edge_flow["temp_acc_capacity"]
) # estimated overflow: positive -> has overflow
max_overflow = temp_edge_flow["est_overflow"].max()
print(f"The maximum amount of edge overflow: {max_overflow}")
if max_overflow <= 0:
# add/update edge key variables: flow/speed/capacity
temp_edge_flow["total_flow"] = (
temp_edge_flow["flow"] + temp_edge_flow["temp_acc_flow"]
)
temp_edge_flow["speed"] = np.vectorize(partial_speed_flow_func)(
temp_edge_flow["asset_type"],
temp_edge_flow["total_flow"],
temp_edge_flow["initial_flow_speeds"],
temp_edge_flow["min_flow_speeds"],
)
temp_edge_flow["remaining_capacity"] = (
temp_edge_flow["temp_acc_capacity"] - temp_edge_flow["flow"]
)
# update dicts
# dynamic edge flows
temp_dict = temp_edge_flow.set_index("e_id")["total_flow"].to_dict()
acc_flow_dict.update(
{key: temp_dict[key] for key in acc_flow_dict.keys() & temp_dict.keys()}
)
# dynamic edge flow speeds
temp_dict = temp_edge_flow.set_index("e_id")["speed"].to_dict()
acc_speed_dict.update(
{
key: temp_dict[key]
for key in acc_speed_dict.keys() & temp_dict.keys()
}
)
# dynamic edge capacities
temp_dict = temp_edge_flow.set_index("e_id")["remaining_capacity"].to_dict()
acc_capacity_dict.update(
{
key: temp_dict[key]
for key in acc_capacity_dict.keys() & temp_dict.keys()
}
)
# update edge travel costs
temp_cost = (
temp_edge_flow["e_id"].map(edge_cost_dict) * temp_edge_flow["flow"]
)
total_cost += temp_cost.sum()
temp_cost = (
temp_edge_flow["e_id"].map(edge_timeC_dict) * temp_edge_flow["flow"]
)
time_equiv_cost += temp_cost.sum()
temp_cost = (
temp_edge_flow["e_id"].map(edge_operateC_dict) * temp_edge_flow["flow"]
)
operating_cost += temp_cost.sum()
temp_cost = (
temp_edge_flow["e_id"].map(edge_toll_dict) * temp_edge_flow["flow"]
)
toll_cost += temp_cost.sum()
print("Iteration stops: there is no edge overflow!")
break
# calculate the ratio of flow adjustment (0 < r < 1)
temp_edge_flow["r"] = np.where(
temp_edge_flow["flow"] != 0,
temp_edge_flow["temp_acc_capacity"] / temp_edge_flow["flow"],
np.nan,
)
r = temp_edge_flow.r.min()
if r < 0:
print("Error: r < 0")
break
if r == 0:
print("Error: existing network has zero-capacity links!")
break
if r >= 1:
print("Error: r >= 1!")
break
print(f"r = {r}")
# add/update edge key variables: flows/speeds/capacities
temp_edge_flow["adjusted_flow"] = temp_edge_flow["flow"] * r
temp_edge_flow["total_flow"] = (
temp_edge_flow.temp_acc_flow + temp_edge_flow.adjusted_flow
)
temp_edge_flow["speed"] = np.vectorize(partial_speed_flow_func)(
temp_edge_flow["asset_type"],
temp_edge_flow["total_flow"],
temp_edge_flow["initial_flow_speeds"],
temp_edge_flow["min_flow_speeds"],
)
temp_edge_flow["remaining_capacity"] = (
temp_edge_flow.temp_acc_capacity - temp_edge_flow.adjusted_flow
)
temp_edge_flow.loc[
temp_edge_flow.remaining_capacity < 0.5, "remaining_capacity"
] = 0.0
# update dicts
# dynamic edge flows
temp_dict = temp_edge_flow.set_index("e_id")["total_flow"].to_dict()
acc_flow_dict.update(
{key: temp_dict[key] for key in acc_flow_dict.keys() & temp_dict.keys()}
)
# dynamic edge flow speeds
temp_dict = temp_edge_flow.set_index("e_id")["speed"].to_dict()
acc_speed_dict.update(
{key: temp_dict[key] for key in acc_speed_dict.keys() & temp_dict.keys()}
)
# dynamic edge capacities
temp_dict = temp_edge_flow.set_index("e_id")["remaining_capacity"].to_dict()
acc_capacity_dict.update(
{key: temp_dict[key] for key in acc_capacity_dict.keys() & temp_dict.keys()}
)
# if remaining supply < 0.5 -> 0
supply_dict = {
k: clip_to_zero(np.array(v) * (1 - r)).tolist()
for k, v in supply_dict.items()
}
total_remain = sum(sum(values) for values in supply_dict.values())
print(f"The total remaining supply (after flow adjustment) is: {total_remain}")
# update edge travel costs
temp_cost = temp_edge_flow["e_id"].map(edge_cost_dict) * temp_edge_flow["flow"]
total_cost += temp_cost.sum()
temp_cost = temp_edge_flow["e_id"].map(edge_timeC_dict) * temp_edge_flow["flow"]
time_equiv_cost += temp_cost.sum()
temp_cost = (
temp_edge_flow["e_id"].map(edge_operateC_dict) * temp_edge_flow["flow"]
)
operating_cost += temp_cost.sum()
temp_cost = temp_edge_flow["e_id"].map(edge_toll_dict) * temp_edge_flow["flow"]
toll_cost += temp_cost.sum()
# update network structure (nodes and edges)
(
network,
edge_cost_dict,
edge_timeC_dict,
edge_operateC_dict,
edge_toll_dict,
edge_compc_df,
) = update_network_structure(
network,
edge_length_dict,
acc_speed_dict,
edge_toll_dict,
temp_edge_flow,
)
iter_flag += 1
# update the road links attributes
road_links.acc_speed = road_links.e_id.map(acc_speed_dict)
road_links.acc_flow = road_links.e_id.map(acc_flow_dict)
road_links.acc_capacity = road_links.e_id.map(acc_capacity_dict)
road_links.acc_flow = road_links.acc_flow.astype(int)
road_links.acc_capacity = road_links.acc_capacity.astype(int)
print("The flow simulation is completed!")
print(f"total travel cost is (£): {total_cost}")
print(f"total time-equiv cost is (£): {time_equiv_cost}")
print(f"total operating cost is (£): {operating_cost}")
print(f"total toll cost is (£): {toll_cost}")
return road_links, isolated_flow_dict, odpfc