Source code for nird.road_recovery

"""Road Network Flow Model - Functions

This module provides a set of functions to model and analyze road network flows.
It includes utilities for road network initialization, traffic flow simulation,
cost calculations, and network updates. The functions are designed to handle
geospatial data and integrate with igraph for network analysis.

Key Features:
- Road network extraction and classification.
- Traffic speed and flow modeling.
- Cost estimation for vehicle operations and travel time.
- Network flow simulation with iterative updates.
- Integration with multiprocessing for performance optimization.
"""

from typing import Tuple, List, Dict
from collections import defaultdict
from functools import partial

import numpy as np
import pandas as pd
import geopandas as gpd  # type: ignore
import igraph  # type: ignore

import nird.constants as cons
from nird.utils import get_flow_on_edges

from multiprocessing import Pool
import warnings
import pickle
import time

warnings.simplefilter("ignore")


def select_partial_roads(
    road_links: gpd.GeoDataFrame,
    road_nodes: gpd.GeoDataFrame,
    col_name: str,
    list_of_values: List[str],
) -> Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """Extract a subset of the road network based on specified road types.

    Parameters
    ----------
    road_links: gpd.GeoDataFrame
        Geospatial data representing the links (edges) of the road network.
    road_nodes: gpd.GeoDataFrame
        Geospatial data representing the nodes (vertices) of the road network.
    col_name: str
        Column name in `road_links` that specifies the road type.
    list_of_values: List[str]
        List of road types to filter and extract from the network.

    Returns
    -------
    selected_links: gpd.GeoDataFrame
        Filtered road links corresponding to the specified road types.
    selected_nodes: gpd.GeoDataFrame
        Filtered road nodes connected to the selected links.
    """
    # road links selection
    selected_links = road_links[road_links[col_name].isin(list_of_values)].reset_index(
        drop=True
    )
    selected_links.rename(
        columns={"id": "e_id", "start_node": "from_id", "end_node": "to_id"},
        inplace=True,
    )
    # road nodes selection
    sel_node_idx = list(
        set(selected_links.from_id.tolist() + selected_links.to_id.tolist())
    )
    selected_nodes = road_nodes[road_nodes.id.isin(sel_node_idx)]
    selected_nodes.reset_index(drop=True, inplace=True)
    selected_nodes.rename(columns={"id": "nd_id"}, inplace=True)
    selected_nodes["lat"] = selected_nodes["geometry"].y
    selected_nodes["lon"] = selected_nodes["geometry"].x
    return selected_links, selected_nodes

def create_urban_mask(
    etisplus_urban_roads: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """Generate a spatial mask for urban areas in Great Britain.

    Parameters
    ----------
    etisplus_urban_roads: gpd.GeoDataFrame
        Geospatial data of ETISPLUS urban roads (2010 dataset).

    Returns
    -------
    urban_mask: gpd.GeoDataFrame
        A geospatial mask identifying urban areas with a binary value (1 for urban).
    """
    etisplus_urban_roads = etisplus_urban_roads[
        etisplus_urban_roads["Urban"] == 1
    ].reset_index(drop=True)
    buf_geom = etisplus_urban_roads.geometry.buffer(
        500
    )  # create a buffer of 500 meters
    uni_geom = buf_geom.unary_union  # feature union within the same layer
    temp = gpd.GeoDataFrame(geometry=[uni_geom])
    new_geom = (
        temp.explode()
    )  # explode multi-polygons into separate individual polygons
    cvx_geom = (
        new_geom.convex_hull
    )  # generate a convex polygon for each individual polygon
    urban_mask = gpd.GeoDataFrame(
        geometry=cvx_geom[0], crs=etisplus_urban_roads.crs
    ).to_crs("27700")
    return urban_mask

def label_urban_roads(
    road_links: gpd.GeoDataFrame, urban_mask: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """Label road links as urban or suburban based on spatial intersection.

    Parameters
    ----------
    road_links: gpd.GeoDataFrame
        Geospatial data representing the links of the road network.
    urban_mask: gpd.GeoDataFrame
        Geospatial mask identifying urban areas.

    Returns
    -------
    road_links: gpd.GeoDataFrame
        Updated road links with a new column "urban" (1 for urban, 0 for suburban).
    """
    temp_file = road_links.sjoin(urban_mask, how="left")
    temp_file["urban"] = temp_file["index_right"].apply(
        lambda x: 0 if pd.isna(x) else 1
    )
    max_values = temp_file.groupby("e_id")["urban"].max()
    road_links = road_links.merge(max_values, on="e_id", how="left")
    return road_links

def find_nearest_node(
    zones: gpd.GeoDataFrame,
    road_nodes: gpd.GeoDataFrame,
    zone_id_column: str,
    node_id_column: str,
) -> Dict[str, str]:
    """Find the nearest road node for each admin unit.

    Parameters
    ----------
    zones: gpd.GeoDataFrame
        Admin units.
    road_nodes: gpd.GeoDataFrame
        Nodes of the road network.
    zone_id_column: str
        Name of the zone identifier column in `zones`.
    node_id_column: str
        Name of the node identifier column in `road_nodes`.

    Returns
    -------
    nearest_node_dict: dict
        A dictionary mapping each admin unit to its attached road node.
    """
    nearest_nodes = gpd.sjoin_nearest(zones, road_nodes)
    nearest_nodes = nearest_nodes.drop_duplicates(subset=[zone_id_column], keep="first")
    nearest_node_dict = dict(
        zip(nearest_nodes[zone_id_column], nearest_nodes[node_id_column])
    )
    return nearest_node_dict  # zone_idx: node_idx

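# Illustrative call of find_nearest_node (not part of the module), assuming a
# zones layer with an "LAD21CD" identifier column and road nodes produced by
# select_partial_roads (identifier column "nd_id"). Both column names are
# assumptions for demonstration only.
def _example_find_nearest_node(
    zones: gpd.GeoDataFrame, nodes: gpd.GeoDataFrame
) -> Dict[str, str]:
    # returns e.g. {"E06000001": "node_123", ...}
    return find_nearest_node(zones, nodes, "LAD21CD", "nd_id")
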
def voc_func(
    speed: float,
) -> float:
    """Calculate the Vehicle Operating Cost (VOC).

    Parameters
    ----------
    speed: float
        The average flow speed: mph

    Returns
    -------
    float
        The unit vehicle operating cost: £/km
    """
    s = speed * cons.CONV_MILE_TO_KM  # km/hour
    lpkm = 0.178 - 0.00299 * s + 0.0000205 * (s**2)  # fuel consumption (liter/km)
    voc_per_km = (
        140 * lpkm * cons.PENCE_TO_POUND
    )  # average petrol cost: 140 pence/liter
    return voc_per_km  # £/km

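# Illustrative check of voc_func (not part of the module): at 30 mph (~48 km/h)
# the fuel-consumption polynomial gives ~0.081 liters/km, i.e. a VOC of roughly
# £0.11/km, assuming cons.CONV_MILE_TO_KM ~ 1.609 and cons.PENCE_TO_POUND == 0.01.
def _example_voc() -> float:
    return voc_func(30.0)  # ~0.11 £/km under the assumptions above
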
def cost_func(
    time: float,
    distance: float,
    voc_per_km: float,
    toll: float,
) -> Tuple[float, float, float]:  # time: hour, distance: mile, voc: £/km
    """Calculate the total travel cost.

    Parameters
    ----------
    time: float
        Travel time: hour
    distance: float
        Travel distance: mile
    voc_per_km: float
        Vehicle operating cost: £/km
    toll: float
        Toll cost: £

    Returns
    -------
    cost: float
        The total travel cost: £
    c_time: float
        The time-equivalent cost: £
    c_operate: float
        The vehicle operating/fuel cost: £
    """
    ave_occ = 1.06  # average car occupancy = 1.06
    vot = 17.69  # value of time (VOT): 17.69 £/hour
    d = distance * cons.CONV_MILE_TO_KM
    c_time = time * ave_occ * vot
    c_operate = d * voc_per_km
    cost = c_time + c_operate + toll
    return cost, c_time, c_operate

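# Minimal usage sketch combining voc_func and cost_func (not part of the
# module): a hypothetical toll-free 10-mile trip at 40 mph. The numbers are
# assumptions chosen only to show how the units flow through the two functions.
def _example_trip_cost() -> Tuple[float, float, float]:
    speed_mph = 40.0
    distance_mile = 10.0
    travel_time_hr = distance_mile / speed_mph  # 0.25 hours
    voc_per_km = voc_func(speed_mph)  # £/km
    # returns (total cost, time-equivalent cost, operating cost) in £
    return cost_func(travel_time_hr, distance_mile, voc_per_km, toll=0.0)
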
def edge_reclassification_func(
    road_links: pd.DataFrame,
) -> pd.DataFrame:
    """Reclassify network edges to "M, A_dual, A_single, B"."""
    road_links["combined_label"] = "A_dual"
    road_links.loc[road_links.road_classification == "Motorway", "combined_label"] = "M"
    road_links.loc[road_links.road_classification == "B Road", "combined_label"] = "B"
    road_links.loc[
        (road_links.road_classification == "A Road")
        & (road_links.form_of_way == "Single Carriageway"),
        "combined_label",
    ] = "A_single"
    return road_links

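# Toy example for edge_reclassification_func (not part of the module); the
# column values mirror the road_classification / form_of_way strings the
# function checks for.
def _example_reclassification() -> pd.DataFrame:
    toy_links = pd.DataFrame(
        {
            "road_classification": ["Motorway", "A Road", "A Road", "B Road"],
            "form_of_way": [
                "Dual Carriageway",
                "Single Carriageway",
                "Dual Carriageway",
                "Single Carriageway",
            ],
        }
    )
    # expected combined_label: ["M", "A_single", "A_dual", "B"]
    return edge_reclassification_func(toy_links)
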
def edge_initial_speed_func(
    road_links: pd.DataFrame,
    free_flow_speed_dict: Dict[str, float],
    urban_flow_speed_dict: Dict[str, float],
    min_flow_speed_dict: Dict[str, float],
    max_flow_speed_dict: Dict[str, float] = None,
) -> pd.DataFrame:
    """Calculate the initial vehicle speed for network edges.

    Parameters
    ----------
    road_links: pd.DataFrame
        Network link features.
    free_flow_speed_dict: Dict
        Free-flow vehicle operating speeds on roads: M, A (single/dual), B.
    urban_flow_speed_dict: Dict
        Maximum vehicle speed restrictions in urban areas.
    min_flow_speed_dict: Dict
        Minimum vehicle operating speeds.
    max_flow_speed_dict: Dict
        Maximum safe vehicle operating speeds on flooded roads.

    Returns
    -------
    road_links: pd.DataFrame
        Road links with added speed information.
    """
    assert "combined_label" in road_links.columns, "combined_label column does not exist!"
    assert "urban" in road_links.columns, "urban column does not exist!"

    if (
        "free_flow_speeds" not in road_links.columns
    ):  # add free-flow speeds if they do not exist
        road_links["free_flow_speeds"] = road_links.combined_label.map(
            free_flow_speed_dict
        )
    road_links.loc[road_links["urban"] == 1, "free_flow_speeds"] = road_links[
        "combined_label"
    ].map(
        urban_flow_speed_dict
    )  # urban speed restriction
    road_links["min_flow_speeds"] = road_links.combined_label.map(min_flow_speed_dict)
    road_links["initial_flow_speeds"] = road_links["free_flow_speeds"]

    if max_flow_speed_dict is not None:
        road_links["max_speeds"] = road_links["e_id"].map(max_flow_speed_dict)
        # if max < min: close the roads
        road_links.loc[
            road_links.max_speeds < road_links.min_flow_speeds, "initial_flow_speeds"
        ] = 0.0
        # if min <= max < free: cap the initial speeds at the maximum safe speeds
        road_links.loc[
            (road_links.max_speeds >= road_links.min_flow_speeds)
            & (road_links.max_speeds < road_links.free_flow_speeds),
            "initial_flow_speeds",
        ] = road_links.max_speeds
        # if max >= free: keep the free-flow speeds (by default)
        # remove the closed/damaged roads
        road_links = road_links[road_links["initial_flow_speeds"] > 0]
        road_links.reset_index(drop=True, inplace=True)
    return road_links

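# Hypothetical speed dictionaries (mph) keyed by combined_label, shown only to
# illustrate the input shapes expected by edge_initial_speed_func and edge_init;
# the real values used by the model come from the project configuration.
_EXAMPLE_FREE_FLOW_SPEEDS = {"M": 67.0, "A_dual": 50.0, "A_single": 40.0, "B": 30.0}
_EXAMPLE_URBAN_SPEEDS = {"M": 40.0, "A_dual": 30.0, "A_single": 30.0, "B": 20.0}
_EXAMPLE_MIN_SPEEDS = {"M": 10.0, "A_dual": 10.0, "A_single": 10.0, "B": 5.0}
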
def edge_init(
    road_links: gpd.GeoDataFrame,
    capacity_plph_dict: Dict[str, float],
    free_flow_speed_dict: Dict[str, float],
    urban_flow_speed_dict: Dict[str, float],
    min_flow_speed_dict: Dict[str, float],
    max_flow_speed_dict: Dict[str, float],
) -> gpd.GeoDataFrame:
    """Network edges initialisation.

    Parameters
    ----------
    road_links: gpd.GeoDataFrame
        The road links.
    capacity_plph_dict: Dict
        The designed edge capacity (per lane per hour).
    free_flow_speed_dict: Dict
        The free-flow edge speed of different types of road links.
    urban_flow_speed_dict: Dict
        The maximum vehicle operating speed restriction in urban areas.
    min_flow_speed_dict: Dict
        The minimum vehicle operating speeds.
    max_flow_speed_dict: Dict
        The maximum flow speed of the flooded road links.

    Returns
    -------
    road_links: gpd.GeoDataFrame
        Road links with added attributes.
    """
    # reclassification
    road_links = edge_reclassification_func(road_links)
    road_links = edge_initial_speed_func(
        road_links,
        free_flow_speed_dict,
        urban_flow_speed_dict,
        min_flow_speed_dict,
        max_flow_speed_dict,
    )
    assert "combined_label" in road_links.columns, "combined_label column does not exist!"
    assert (
        "initial_flow_speeds" in road_links.columns
    ), "initial_flow_speeds column does not exist!"

    # initialise key variables
    road_links["acc_flow"] = 0.0
    road_links["acc_capacity"] = (
        road_links["combined_label"].map(capacity_plph_dict) * road_links["lanes"] * 24
    )
    road_links["acc_speed"] = road_links["initial_flow_speeds"]

    # remove invalid road links
    road_links = road_links[road_links.acc_capacity > 0.5].reset_index(drop=True)

    return road_links

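# Minimal usage sketch for edge_init (not part of the module), reusing the
# hypothetical speed dictionaries above plus an assumed per-lane-per-hour
# capacity table. `road_links` is assumed to already carry the columns that
# edge_reclassification_func and edge_initial_speed_func expect (including the
# "urban" label added by label_urban_roads and a "lanes" column).
def _example_edge_init(road_links: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    capacity_plph = {"M": 2000.0, "A_dual": 1800.0, "A_single": 1300.0, "B": 1000.0}
    return edge_init(
        road_links,
        capacity_plph,
        _EXAMPLE_FREE_FLOW_SPEEDS,
        _EXAMPLE_URBAN_SPEEDS,
        _EXAMPLE_MIN_SPEEDS,
        None,  # no flooded-road speed limits in this sketch
    )
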
def speed_flow_func(
    combined_label: str,
    total_flow: float,
    initial_flow_speed: float,
    min_flow_speed: float,
    flow_breakpoint_dict: Dict[str, float],
) -> float:
    """Calculate traffic speeds from the speed-flow curves of different road types.

    Parameters
    ----------
    combined_label: str
        The type of road link.
    total_flow: float
        The number of vehicles on the road link.
    initial_flow_speed: float
        The initial traffic speed of the road link.
    min_flow_speed: float
        The minimum traffic speed of the road link.
    flow_breakpoint_dict: dict
        The breakpoint flows after which traffic speeds start to decrease.

    Returns
    -------
    speed: float
        The "real-time" traffic speed on the road link.
    """
    vp = total_flow / 24
    if combined_label == "M" and vp > flow_breakpoint_dict["M"]:
        speed = initial_flow_speed - 0.033 * (vp - flow_breakpoint_dict["M"])
    elif combined_label == "A_single" and vp > flow_breakpoint_dict["A_single"]:
        speed = initial_flow_speed - 0.05 * (vp - flow_breakpoint_dict["A_single"])
    elif combined_label == "A_dual" and vp > flow_breakpoint_dict["A_dual"]:
        speed = initial_flow_speed - 0.033 * (vp - flow_breakpoint_dict["A_dual"])
    elif combined_label == "B" and vp > flow_breakpoint_dict["B"]:
        speed = initial_flow_speed - 0.05 * (vp - flow_breakpoint_dict["B"])
    else:
        speed = initial_flow_speed
    speed = max(speed, min_flow_speed)
    return speed

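# Worked example for the speed-flow curve (not part of the module). The
# breakpoint values are assumptions: with an A_single breakpoint of 1,200
# vehicles/hour, a daily flow of 36,000 vehicles gives vp = 1,500 and a speed
# drop of 0.05 * (1500 - 1200) = 15 mph.
def _example_speed_flow() -> float:
    breakpoints = {"M": 2000.0, "A_dual": 1600.0, "A_single": 1200.0, "B": 1000.0}
    return speed_flow_func(
        combined_label="A_single",
        total_flow=36_000.0,  # vehicles/day -> vp = 1,500 vehicles/hour
        initial_flow_speed=40.0,  # mph
        min_flow_speed=10.0,  # mph
        flow_breakpoint_dict=breakpoints,
    )  # 40 - 15 = 25 mph
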
def create_igraph_network(
    road_links: gpd.GeoDataFrame,
) -> igraph.Graph:
    """Create an undirected igraph network."""
    road_links["edge_length_mile"] = (
        road_links.geometry.length * cons.CONV_METER_TO_MILE
    )
    road_links["time_hr"] = (
        1.0 * road_links.edge_length_mile / road_links.acc_speed
    )  # returns inf if acc_speed is 0
    road_links["voc_per_km"] = np.vectorize(voc_func, otypes=None)(road_links.acc_speed)
    (
        road_links["weight"],
        road_links["time_cost"],
        road_links["operating_cost"],
    ) = np.vectorize(cost_func, otypes=None)(
        road_links["time_hr"],
        road_links["edge_length_mile"],
        road_links["voc_per_km"],
        road_links["average_toll_cost"],
    )
    graph_df = road_links[
        [
            "from_id",
            "to_id",
            "e_id",
            "weight",
            "time_cost",
            "operating_cost",
            "average_toll_cost",
        ]
    ]
    network = igraph.Graph.TupleList(
        graph_df.itertuples(index=False),
        edge_attrs=list(graph_df.columns)[2:],
        directed=False,
    )
    return network

def update_network_structure(
    network: igraph.Graph,
    temp_edge_flow: pd.DataFrame,
    road_links: gpd.GeoDataFrame,
) -> igraph.Graph:
    """Drop fully utilised edges and update edge weights.

    Parameters
    ----------
    network: igraph.Graph
        The igraph network.
    temp_edge_flow: pd.DataFrame
        The remaining edge capacity at the current iteration.
    road_links: gpd.GeoDataFrame
        The road links.

    Returns
    -------
    network: igraph.Graph
        The updated igraph network.
    """
    # drop fully utilised edges from the network
    zero_capacity_edges = set(
        temp_edge_flow.loc[temp_edge_flow["acc_capacity"] < 0.5, "e_id"].tolist()
    )
    edges_to_remove = [e.index for e in network.es if e["e_id"] in zero_capacity_edges]
    network.delete_edges(edges_to_remove)
    print(f"The remaining number of edges in the network: {len(list(network.es))}")

    # update edge weights
    remaining_edges = network.es["e_id"]
    graph_df = road_links[road_links.e_id.isin(remaining_edges)][
        [
            "from_id",
            "to_id",
            "e_id",
            "weight",
            "time_cost",
            "operating_cost",
            "average_toll_cost",
        ]
    ]
    network = igraph.Graph.TupleList(
        graph_df.itertuples(index=False),
        edge_attrs=list(graph_df.columns)[2:],
        directed=False,
    )
    return network

def extract_od_pairs(
    od: pd.DataFrame,
) -> Tuple[List[str], Dict[str, List[str]], Dict[str, List[float]]]:
    """Prepare the OD matrix.

    Parameters
    ----------
    od: pd.DataFrame
        Table of origin-destination passenger flows.

    Returns
    -------
    list_of_origin_nodes: list
        A list of origin nodes.
    dict_of_destination_nodes: dict[str, list[str]]
        A dictionary recording a list of destination nodes for each origin node.
    dict_of_origin_supplies: dict[str, list[float]]
        A dictionary recording a list of flows for each origin-destination pair.
    """
    list_of_origin_nodes = []
    dict_of_destination_nodes: Dict[str, List[str]] = defaultdict(list)
    dict_of_origin_supplies: Dict[str, List[float]] = defaultdict(list)
    for row in od.itertuples():
        from_node = row.origin_node
        to_node = row.destination_node
        count: float = row.Car21
        list_of_origin_nodes.append(from_node)  # [nd_id, ...]
        dict_of_destination_nodes[from_node].append(to_node)  # {nd_id: [nd_id, ...]}
        dict_of_origin_supplies[from_node].append(count)  # {nd_id: [car21, ...]}

    # extract the unique origin nodes
    list_of_origin_nodes = list(set(list_of_origin_nodes))
    list_of_origin_nodes.sort()

    return (
        list_of_origin_nodes,
        dict_of_destination_nodes,
        dict_of_origin_supplies,
    )

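# Toy OD table (not part of the module) showing the columns extract_od_pairs
# reads: origin_node, destination_node, and Car21 (daily car trips).
def _example_extract_od_pairs():
    od = pd.DataFrame(
        {
            "origin_node": ["n1", "n1", "n2"],
            "destination_node": ["n2", "n3", "n3"],
            "Car21": [120.0, 45.0, 80.0],
        }
    )
    # -> (["n1", "n2"],
    #     {"n1": ["n2", "n3"], "n2": ["n3"]},
    #     {"n1": [120.0, 45.0], "n2": [80.0]})
    return extract_od_pairs(od)
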
def update_od_matrix(
    temp_flow_matrix: pd.DataFrame,
    remain_od: pd.DataFrame,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Divide the OD matrix into allocated and unallocated sections.

    Parameters
    ----------
    temp_flow_matrix: pd.DataFrame
        Flow matrix with columns: origin, destination, path, flow,
        operating_cost_per_flow, time_cost_per_flow, toll_cost_per_flow.
    remain_od: pd.DataFrame
        OD matrix with columns: origin, destination, flow.

    Returns
    -------
    temp_flow_matrix: pd.DataFrame
        The flow matrix of OD pairs with a valid path.
    remain_od: pd.DataFrame
        The remaining OD matrix.
    isolated_flow_matrix: pd.DataFrame
        The flow matrix of OD pairs without a path.
    """
    mask = temp_flow_matrix["path"].apply(lambda x: len(x) == 0)
    isolated_flow_matrix = temp_flow_matrix.loc[
        mask, ["origin", "destination", "flow"]
    ]  # drop the cost columns
    # print(f"Non_allocated_flow: {isolated_flow_matrix.flow.sum()}")
    temp_flow_matrix = temp_flow_matrix[~mask]
    remain_origins = temp_flow_matrix.origin.unique().tolist()
    remain_destinations = temp_flow_matrix.destination.unique().tolist()
    remain_od = remain_od[
        (
            remain_od.origin_node.isin(remain_origins)
            & (remain_od.destination_node.isin(remain_destinations))
        )
    ].reset_index(drop=True)
    return temp_flow_matrix, remain_od, isolated_flow_matrix

def find_least_cost_path(
    params: Tuple,
) -> Tuple[
    str, List[str], List[List[str]], List[float], List[float], List[float], List[float]
]:
    """Find the least-cost path for each OD trip.

    Parameters
    ----------
    params: Tuple
        The first element: the origin node (name).
        The second element: a list of destination nodes (names).
        The third element: a list of outbound trips from the origin to its
        connected destinations.

    Returns
    -------
    origin_node: str
        Same as the input.
    destination_nodes: list
        Same as the input.
    edge_paths: list
        The least-cost paths (lists of edge ids).
    flows: list
        Same as the input.
    operating_costs: list
        The vehicle operating cost of each path.
    time_costs: list
        The time-equivalent cost of each path.
    toll_costs: list
        The toll cost of each path.
    """
    origin_node, destination_nodes, flows = params
    paths = shared_network.get_shortest_paths(
        v=origin_node,
        to=destination_nodes,
        weights="weight",
        mode="out",
        output="epath",
    )  # paths: o - d(s)
    edge_paths = []
    operating_costs = []
    time_costs = []
    toll_costs = []
    for path in paths:  # path: o - d
        edge_path = []
        operating_cost = []
        time_cost = []
        toll_cost = []
        for p in path:  # p: each line segment
            edge_path.append(shared_network.es[p]["e_id"])
            operating_cost.append(shared_network.es[p]["operating_cost"])
            time_cost.append(shared_network.es[p]["time_cost"])
            toll_cost.append(shared_network.es[p]["average_toll_cost"])
        edge_paths.append(edge_path)  # a list of lists
        operating_costs.append(sum(operating_cost))  # a list of values
        time_costs.append(sum(time_cost))  # a list of values
        toll_costs.append(sum(toll_cost))  # a list of values
    return (
        origin_node,
        destination_nodes,
        edge_paths,
        flows,
        operating_costs,
        time_costs,
        toll_costs,
    )

def compute_edge_costs(
    # edge_weight_df,
    path: List[int],
) -> Tuple[float, float, float]:
    """Calculate the total travel cost for the path.

    Parameters
    ----------
    path: List
        A list of edge indexes that define the path.

    Returns
    -------
    od_voc: float
        Vehicle operating costs of each trip.
    od_vot: float
        Value of time of each trip.
    od_toll: float
        Toll costs of each trip.
    """
    od_voc = edge_weight_df.loc[edge_weight_df["e_id"].isin(path), "voc"].sum()
    od_vot = edge_weight_df.loc[edge_weight_df["e_id"].isin(path), "time_cost"].sum()
    od_toll = edge_weight_df.loc[
        edge_weight_df["e_id"].isin(path), "average_toll_cost"
    ].sum()
    return (od_voc, od_vot, od_toll)

def worker_init_path(
    shared_network_pkl: bytes,
) -> None:
    """Worker initialisation for multiprocessing, creating a shared network
    that can be used across different workers.

    Parameters
    ----------
    shared_network_pkl: bytes
        The pickled igraph network.

    Returns
    -------
    None.
    """
    global shared_network
    shared_network = pickle.loads(shared_network_pkl)
    return None

def worker_init_edge(
    shared_network_pkl: bytes,
    shared_weight_pkl: bytes,
) -> None:
    """Worker initialisation for multiprocessing, creating a shared network and
    a shared edge-weight table that can be used across different workers.

    Parameters
    ----------
    shared_network_pkl: bytes
        The pickled igraph network.
    shared_weight_pkl: bytes
        The pickled file of a dictionary of network edge weights.

    Returns
    -------
    None.
    """
    # print(os.getpid())
    global shared_network
    shared_network = pickle.loads(shared_network_pkl)
    global edge_weight_df
    edge_weight_df = pickle.loads(shared_weight_pkl)
    return None

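# Sketch of how the initialisers above are wired into a Pool (not part of the
# module); it mirrors the pattern used in network_flow_model, where `network`
# is an igraph.Graph and `args` is a list of (origin, destinations, flows)
# tuples built from the OD matrix.
def _example_parallel_paths(
    network: igraph.Graph, args: list, num_of_cpu: int
) -> list:
    shared_network_pkl = pickle.dumps(network)
    with Pool(
        processes=num_of_cpu,
        initializer=worker_init_path,
        initargs=(shared_network_pkl,),
    ) as pool:
        return pool.map(find_least_cost_path, args)
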
def network_flow_model(
    road_links: gpd.GeoDataFrame,
    network: igraph.Graph,
    remain_od: pd.DataFrame,
    flow_breakpoint_dict: Dict[str, float],
    num_of_cpu,
) -> Tuple[gpd.GeoDataFrame, List, List, List]:
    """Process-based network flow simulation.

    Parameters
    ----------
    road_links: gpd.GeoDataFrame
        The road links.
    network: igraph.Graph
        The road network.
    remain_od: pd.DataFrame
        The OD matrix.
    flow_breakpoint_dict: Dict
        Flow breakpoints of different types of road links.
    num_of_cpu: int
        The number of processes used for multiprocessing.

    Returns
    -------
    road_links: gpd.GeoDataFrame
        With added attributes (acc_flow, acc_capacity, acc_speed).
    isolation: list
        The non-allocated OD matrix.
    odpfc: list
        The allocated OD matrix.
    cList: list
        The accumulated costs: [time_equiv_cost, operating_cost, toll_cost, total_cost].
    """
    partial_speed_flow_func = partial(
        speed_flow_func,
        flow_breakpoint_dict=flow_breakpoint_dict,
    )

    total_remain = remain_od.Car21.sum()
    print(f"The initial supply is {total_remain}")
    number_of_edges = len(list(network.es))
    print(f"The initial number of edges in the network: {number_of_edges}")
    number_of_origins = remain_od.origin_node.unique().shape[0]
    print(f"The initial number of origins: {number_of_origins}")
    number_of_destinations = remain_od.destination_node.unique().shape[0]
    print(f"The initial number of destinations: {number_of_destinations}")

    # starts
    total_cost = time_equiv_cost = operating_cost = toll_cost = 0
    odpfc, isolation = [], []
    initial_sumod = remain_od["Car21"].sum()
    assigned_sumod = 0
    iter_flag = 1
    while total_remain > 0:
        print(f"No.{iter_flag} iteration starts:")
        # check isolations
        mask = remain_od.origin_node.isin(network.vs["name"]) & (
            remain_od.destination_node.isin(network.vs["name"])
        )
        isolation.extend(
            remain_od.loc[
                ~mask, ["origin_node", "destination_node", "Car21"]
            ].values.tolist()
        )
        remain_od = remain_od[mask].reset_index(drop=True)

        # dump the network for shared use in multiprocessing
        shared_network_pkl = pickle.dumps(network)

        # find the least-cost path for each OD trip
        list_of_spath = []
        args = []
        list_of_origin_nodes = list(set(remain_od["origin_node"].tolist()))
        for origin_node in list_of_origin_nodes:
            destination_nodes = remain_od.loc[
                remain_od["origin_node"] == origin_node, "destination_node"
            ].tolist()
            flows = remain_od.loc[
                remain_od["origin_node"] == origin_node, "Car21"
            ].tolist()
            args.append((origin_node, destination_nodes, flows))

        st = time.time()
        with Pool(
            processes=num_of_cpu,
            initializer=worker_init_path,
            initargs=(shared_network_pkl,),
        ) as pool:
            list_of_spath = pool.map(find_least_cost_path, args)
            # [origin(name), destinations(name), path(idx), flow(int)]
        print(f"The least-cost path flow allocation time: {time.time() - st}.")

        temp_flow_matrix = pd.DataFrame(
            list_of_spath,
            columns=[
                "origin",
                "destination",
                "path",
                "flow",
                "operating_cost_per_flow",
                "time_cost_per_flow",
                "toll_cost_per_flow",
            ],
        ).explode(
            [
                "destination",
                "path",
                "flow",
                "operating_cost_per_flow",
                "time_cost_per_flow",
                "toll_cost_per_flow",
            ]
        )

        # calculate the non-allocated flows and remaining flows
        (
            temp_flow_matrix,
            remain_od,
            isolated_flow_matrix,
        ) = update_od_matrix(temp_flow_matrix, remain_od)
        assigned_sumod += temp_flow_matrix.flow.sum()
        percentage_sumod = assigned_sumod / initial_sumod

        # update the isolated flows
        isolation.extend(isolated_flow_matrix.to_numpy().tolist())

        # calculate the remaining flows
        number_of_origins = remain_od.origin_node.unique().shape[0]
        number_of_destinations = remain_od.destination_node.unique().shape[0]
        print(f"The remaining number of origins: {number_of_origins}")
        print(f"The remaining number of destinations: {number_of_destinations}")
        total_remain = remain_od.Car21.sum()
        if total_remain == 0:
            print("Iteration stops: there are no remaining flows!")
            break

        # calculate edge flows: e_id, flow
        temp_edge_flow = get_flow_on_edges(
            temp_flow_matrix,
            "e_id",
            "path",
            "flow",
        )
        # add/update edge attributes
        temp_edge_flow = temp_edge_flow.merge(
            road_links[
                [
                    "e_id",
                    "combined_label",
                    "min_flow_speeds",
                    "initial_flow_speeds",
                    "acc_flow",
                    "acc_capacity",
                    "acc_speed",
                ]
            ],
            on="e_id",
            how="left",
        )
        temp_edge_flow["est_overflow"] = (
            temp_edge_flow["flow"] - temp_edge_flow["acc_capacity"]
        )  # estimated overflow: positive -> has overflow
        max_overflow = temp_edge_flow["est_overflow"].max()
        print(f"The maximum amount of edge overflow: {max_overflow}")

        if max_overflow <= 0 or percentage_sumod > 0.9:
            # update the origin-destination-path-cost matrix
            odpfc.extend(temp_flow_matrix.to_numpy().tolist())
            # add/update edge key variables: flow/speed/capacity
            temp_edge_flow["acc_flow"] = (
                temp_edge_flow["flow"] + temp_edge_flow["acc_flow"]
            )
            temp_edge_flow["acc_capacity"] = (
                temp_edge_flow["acc_capacity"] - temp_edge_flow["flow"]
            )
            temp_edge_flow["acc_speed"] = np.vectorize(partial_speed_flow_func)(
                temp_edge_flow["combined_label"],
                temp_edge_flow["acc_flow"],
                temp_edge_flow["initial_flow_speeds"],
                temp_edge_flow["min_flow_speeds"],
            )
            # update road_links (flows, capacities, and speeds) based on temp_edge_flow
            road_links = road_links.set_index("e_id")
            road_links.update(
                temp_edge_flow.set_index("e_id")[
                    ["acc_flow", "acc_capacity", "acc_speed"]
                ]
            )
            road_links = road_links.reset_index()

            # update travel costs based on temp_flow_matrix
            # operating costs
            temp_cost = (
                temp_flow_matrix.operating_cost_per_flow * temp_flow_matrix.flow
            ).sum()
            operating_cost += temp_cost
            # time costs
            temp_cost = (
                temp_flow_matrix.time_cost_per_flow * temp_flow_matrix.flow
            ).sum()
            time_equiv_cost += temp_cost
            # toll costs
            temp_cost = (
                temp_flow_matrix.toll_cost_per_flow * temp_flow_matrix.flow
            ).sum()
            toll_cost += temp_cost
            # total cost
            total_cost += time_equiv_cost + operating_cost + toll_cost

            if max_overflow > 0:
                print(
                    "Iteration stops: "
                    f"with {percentage_sumod * 100}% flows sent to the network!"
                )
            else:
                print(
                    "Iteration stops: there is no edge overflow! "
                    f"with {percentage_sumod * 100}% flows sent to the network!"
                )
            break

        # calculate the ratio of flow adjustment (0 < r < 1)
        temp_edge_flow["r"] = np.where(
            temp_edge_flow["flow"] != 0,
            temp_edge_flow["acc_capacity"] / temp_edge_flow["flow"],
            np.nan,
        )
        r = temp_edge_flow.r.min()
        if r < 0:
            print("Error: r < 0")
            break
        if r == 0:
            print("Error: existing network has zero-capacity links!")
            break
        if r >= 1:
            print("Error: r >= 1")
            break
        print(f"r = {r}")

        # update the origin-destination-path-cost matrix
        odpfc.extend(
            (temp_flow_matrix.assign(flow=temp_flow_matrix["flow"] * r))
            .to_numpy()
            .tolist()
        )

        # add/update edge key variables: flows/speeds/capacities
        temp_edge_flow["adjusted_flow"] = temp_edge_flow["flow"] * r
        temp_edge_flow["acc_flow"] = (
            temp_edge_flow.acc_flow + temp_edge_flow.adjusted_flow
        )
        temp_edge_flow["acc_capacity"] = (
            temp_edge_flow.acc_capacity - temp_edge_flow.adjusted_flow
        )
        temp_edge_flow["acc_speed"] = np.vectorize(partial_speed_flow_func)(
            temp_edge_flow["combined_label"],
            temp_edge_flow["acc_flow"],
            temp_edge_flow["initial_flow_speeds"],
            temp_edge_flow["min_flow_speeds"],
        )

        # update road links (flows, capacities, speeds) based on temp_edge_flow
        road_links = road_links.set_index("e_id")
        road_links.update(
            temp_edge_flow.set_index("e_id")[["acc_flow", "acc_capacity", "acc_speed"]]
        )
        road_links = road_links.reset_index()

        # update travel costs based on temp_flow_matrix
        # operating costs
        temp_cost = (
            temp_flow_matrix.operating_cost_per_flow * temp_flow_matrix.flow * r
        ).sum()
        operating_cost += temp_cost
        # time costs
        temp_cost = (
            temp_flow_matrix.time_cost_per_flow * temp_flow_matrix.flow * r
        ).sum()
        time_equiv_cost += temp_cost
        # toll costs
        temp_cost = (
            temp_flow_matrix.toll_cost_per_flow * temp_flow_matrix.flow * r
        ).sum()
        toll_cost += temp_cost
        # total cost
        total_cost += time_equiv_cost + operating_cost + toll_cost

        # update remaining od flows
        remain_od["Car21"] = remain_od["Car21"] * (1 - r)
        remain_od.loc[remain_od.Car21 < 0.5, "Car21"] = 0
        total_remain = remain_od.Car21.sum()
        print(f"The total remaining supply (after flow adjustment) is: {total_remain}")

        # update network structure (nodes and edges)
        network = update_network_structure(network, temp_edge_flow, road_links)

        iter_flag += 1

    cList = [time_equiv_cost, operating_cost, toll_cost, total_cost]

    # update the road links attributes
    road_links.acc_flow = road_links.acc_flow.astype(int)
    road_links.acc_capacity = road_links.acc_capacity.astype(int)
    road_links = road_links.iloc[:, :-6]  # drop cost-related columns

    print("The flow simulation is completed!")
    print(f"total travel cost is (£): {total_cost}")
    print(f"total time-equiv cost is (£): {time_equiv_cost}")
    print(f"total operating cost is (£): {operating_cost}")
    print(f"total toll cost is (£): {toll_cost}")

    return road_links, isolation, odpfc, cList

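# End-to-end usage sketch (not part of the module). The configuration keys and
# dictionary values are assumptions; it also assumes that road_links already
# carry "urban" labels (via create_urban_mask/label_urban_roads), "lanes" and
# "average_toll_cost" columns, and that the OD table already maps zones to road
# nodes (e.g. via find_nearest_node) with origin_node/destination_node/Car21.
def _example_pipeline(
    road_links: gpd.GeoDataFrame,
    road_nodes: gpd.GeoDataFrame,
    od: pd.DataFrame,
    config: dict,
    num_of_cpu: int = 4,
):
    links, nodes = select_partial_roads(
        road_links,
        road_nodes,
        "road_classification",
        ["Motorway", "A Road", "B Road"],
    )
    links = edge_init(
        links,
        config["capacity_plph"],
        config["free_flow_speeds"],
        config["urban_flow_speeds"],
        config["min_flow_speeds"],
        config.get("max_flow_speeds"),  # None when no flood scenario is applied
    )
    network = create_igraph_network(links)
    return network_flow_model(
        links, network, od, config["flow_breakpoints"], num_of_cpu
    )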