Source code for nird.road

"""Road transport model functions
"""

from collections import defaultdict
from functools import partial
from typing import Dict, List, Union, Tuple

import igraph
import numpy as np
import numpy.typing as npt
import pandas as pd
import geopandas as gpd
from tqdm.auto import tqdm


import nird.constants as cons
from nird.utils import get_flow_on_edges


[docs] def select_partial_roads( road_links: gpd.GeoDataFrame, road_nodes: gpd.GeoDataFrame, col_name: str, list_of_values: List[str], ) -> Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]: """Extract major roads Parameters ---------- road_links: gpd.GeoDataFrame Edges of the road network. road_nodes: gpd.GeoDataFrame Junctions/nodes of the road network. col_name: str The name of column containing different road classifications. list_of_values: list The road types to be selected, e.g., [Motorways, A Roads, B Roads]. Returns ------- selected_links: gpd.GeoDataFrame The selected types of road links selected_nodes: gpd.GeoDataFrame The selected types of road nodes. """ mask = road_links[col_name].isin(list_of_values) selected_links = road_links[mask].copy() selected_links["e_id"] = selected_links["id"] selected_links["from_id"] = selected_links["start_node"] selected_links["to_id"] = selected_links["end_node"] # road nodes selection sel_node_idx = list( set( selected_links.start_node.unique().tolist() + selected_links.end_node.unique().tolist() ) ) selected_nodes = road_nodes[road_nodes.id.isin(sel_node_idx)].copy() selected_nodes.reset_index(drop=True, inplace=True) selected_nodes["nd_id"] = selected_nodes.id selected_nodes["lat"] = selected_nodes.geometry.y selected_nodes["lon"] = selected_nodes.geometry.x return selected_links, selected_nodes
[docs] def create_urban_mask(etisplus_urban_roads: gpd.GeoDataFrame) -> gpd.GeoDataFrame: """Urban road classification Parameters ---------- etisplus_urban_roads: gpd.GeoDataFrame ETIS road networks: https://ftp.demis.nl/outgoing/etisplus/datadeliverables/ Returns ------- gpd.GeoDataFrame Polygons indicating urban areas of the Europe. """ etisplus_urban_roads = etisplus_urban_roads[ etisplus_urban_roads["Urban"] == 1 ].reset_index( drop=True ) # extract urban road segements buf_geom = etisplus_urban_roads.geometry.buffer( 500 ) # create a buffer of 500 meters uni_geom = buf_geom.unary_union # feature union within the same layer temp = gpd.GeoDataFrame(geometry=[uni_geom]) new_geom = ( temp.explode() ) # explode multi-polygons into separate individual polygons cvx_geom = ( new_geom.convex_hull ) # generate convex polygon for each individual polygon urban_mask = gpd.GeoDataFrame( geometry=cvx_geom[0], crs=etisplus_urban_roads.crs ).to_crs("27700") return urban_mask
[docs] def label_urban_roads( road_links: gpd.GeoDataFrame, urban_mask: gpd.GeoDataFrame ) -> gpd.GeoDataFrame: """Label road segments with "urban" attribute Parameters ---------- road_links: gpd.GeoDataFrame Edges of the road network. urban_mask: gpd.GeoDataFrame Polygons indicating urban areas of the Europe. Returns ------- gpd.GeoDataFrame Edges of the road network with urban attributes. """ temp_file = road_links.sjoin(urban_mask, how="left") temp_file["urban"] = temp_file["index_right"].apply( lambda x: 0 if pd.isna(x) else 1 ) max_values = temp_file.groupby("e_id")["urban"].max() road_links = road_links.merge(max_values, on="e_id", how="left") return road_links
[docs] def voc_func(speed: float) -> float: # speed: mile/hour """Calculate value of cost for travelling Parameters ---------- speed: float The average flow rate: mile/hour Returns ------- float Unit cost: pound/km """ # d = distance * conv_mile_to_km # km s = speed * cons.CONV_MILE_TO_KM # km/hour lpkm = 0.178 - 0.00299 * s + 0.0000205 * (s**2) # fuel cost (liter/km) c = 140 * lpkm * cons.PENCE_TO_POUND # average petrol cost: 140 pence/liter return c # pound/km
[docs] def cost_func( time: float, distance: float, voc: float ) -> float: # time: hour, distance: mile/hour, voc: pound/km """Calculate the time-equivalent cost for traveling Parameters ---------- time: float The time of travel: hour distance: float The distance of travel: mile voc: Value of Cost: pound/km Returns ------- float Time-equivalent cost: hour """ ave_occ = 1.6 vot = 20 # value of time: pounds/hour d = distance * cons.CONV_MILE_TO_KM # km t = time + d * voc / (ave_occ * vot) return t # hour
[docs] def initial_speed_func( road_type: str, form_of_road: str, free_flow_speed_dict: Dict[str, float], ) -> Union[float, None]: """Append free-flow speed to each road segment Parameters ---------- road_type: str The column of road type. form_of_road: str The column of form of road. free_flow_speed_dict: dict Free-flow speeds of different combined road types. Returns ------- float or None The initial speed: miles/hour """ if road_type == "M": return free_flow_speed_dict["M"] elif road_type == "A": if form_of_road == "Single Carriageway": return free_flow_speed_dict["A_single"] else: return free_flow_speed_dict["A_dual"] elif road_type == "B": return free_flow_speed_dict["B"] else: print("Error: initial speed!") return None
[docs] def speed_flow_func( road_type: str, isUrban: int, vp: float, free_flow_speed_dict: Dict[str, float], flow_breakpoint_dict: Dict[str, int], min_speed_cap: Dict[str, float], urban_speed_cap: Dict[str, float], ) -> Union[float, None]: """Update the average flow rate of each road segment in terms of flow changes. Parameters ---------- road_type: str The column of road type. isUrban: int 1: urban, 0: suburban vp: float The average daily flow, cars/day. free_flow_speed_dict: dict Free-flow speeds of different combined road types, mile/hour. min_speed_cap: dict The restriction on the lowest flow rate, mile/hour. urban_speed_cap: dict The restriction on the maximum flow rate in urban areas, mile/hour. Returns ------- float OR None The average flow rate, mile/hour. """ vp = vp / 24 if road_type == "M": initial_speed = free_flow_speed_dict["M"] if vp > flow_breakpoint_dict["M"]: # speed starts to decrease vt = max( (initial_speed - 0.033 * (vp - flow_breakpoint_dict["M"])), min_speed_cap["M"], ) if isUrban: return min(urban_speed_cap["M"], vt) else: return vt else: if isUrban: return min(urban_speed_cap["M"], initial_speed) else: return initial_speed elif road_type == "A_single": # A_single/A_dual initial_speed = free_flow_speed_dict["A_single"] if vp > flow_breakpoint_dict["A_single"]: vt = max( (initial_speed - 0.05 * (vp - flow_breakpoint_dict["A_single"])), min_speed_cap["A_single"], ) if isUrban: return min(urban_speed_cap["A_single"], vt) else: return vt else: if isUrban: return min(urban_speed_cap["A_single"], initial_speed) else: return initial_speed elif road_type == "A_dual": initial_speed = free_flow_speed_dict["A_dual"] if vp > flow_breakpoint_dict["A_dual"]: vt = max( (initial_speed - 0.033 * (vp - flow_breakpoint_dict["A_dual"])), min_speed_cap["A_dual"], ) if isUrban: return min(urban_speed_cap["A_dual"], vt) else: return vt else: if isUrban: return min(urban_speed_cap["A_dual"], initial_speed) else: return initial_speed elif road_type == "B": initial_speed = free_flow_speed_dict["B"] if isUrban: return min(urban_speed_cap["B"], initial_speed) else: return initial_speed else: print("Please select the road type from [M, A, B]!") return None
[docs] def filter_less_than_one(arr: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: """Convert values less than one to zero""" return np.where(arr >= 1, arr, 0.0)
[docs] def find_nearest_node( zones: gpd.GeoDataFrame, road_nodes: gpd.GeoDataFrame ) -> Dict[str, str]: """Find nearest network node for each admin centroid Parameters ---------- zones: gpd.GeoDataFrame Administrative population-weighted centroids. road_nodes: gpd.GeoDataFrame Nodes of road network. Returns ------- dict Convert from centroids to road nodes. """ nearest_node_dict = {} # node_idx: zone_idx for zidx, z in zones.iterrows(): closest_road_node = road_nodes.sindex.nearest(z.geometry, return_all=False)[1][ 0 ] # for the first [x]: # [0] represents the index of geometry; # [1] represents the index of gdf # the second [x] represents the No. of closest item in the returned list, # which only return one nearest node in this case nearest_node_dict[zidx] = closest_road_node zone_to_node = {} for zidx in range(zones.shape[0]): z = zones.loc[zidx, "code"] nidx = nearest_node_dict[zidx] n = road_nodes.loc[nidx, "nd_id"] zone_to_node[z] = n return zone_to_node
[docs] def od_interpret( od_matrix: pd.DataFrame, zone_to_node: Dict[str, str], col_origin: str, col_destination: str, col_count: str, ) -> Tuple[List[str], Dict[str, List[str]], Dict[str, List[float]]]: """Generate a list of origins along with their associated destinations and outbound trips. Parameters ---------- od_matrix: pd.DataFrame Trips between Origin-Destination pairs. zone_to_node: dict Dictionary for conversion from zone centroids to road nodes. col_origin: str Column of origins. col_destination: str Column of destinations. col_count: str Column that records the number of trips of each OD pair. Returns ------- list_of_origins: list Origins of the OD matrix destination_dict: dict Destinations attached to each origin. supply_dict: dict The number of outbound trips from each origin. """ list_of_origins = [] destination_dict: Dict[str, List[str]] = defaultdict(list) supply_dict: Dict[str, List[float]] = defaultdict(list) for idx in tqdm(range(od_matrix.shape[0]), desc="Processing"): from_zone: str = od_matrix.loc[idx, col_origin] # type: ignore to_zone: str = od_matrix.loc[idx, col_destination] # type: ignore count: float = od_matrix.loc[idx, col_count] # type: ignore try: from_node = zone_to_node[from_zone] except KeyError: print(f"No accessible network node to attached to home/origin {from_zone}!") try: to_node = zone_to_node[to_zone] except KeyError: print( f"No accessible network node attached to workplace/destination {to_zone}!" ) list_of_origins.append(from_node) # origin destination_dict[from_node].append(to_node) # origin -> destinations supply_dict[from_node].append(count) # origin -> supply return list_of_origins, destination_dict, supply_dict
[docs] def create_igraph_network( node_name_to_index: Dict[str, int], road_links: gpd.GeoDataFrame, road_nodes: gpd.GeoDataFrame, initialSpeeds: Dict[str, float], ) -> igraph.Graph: """Network creation using igraph. Parameters ---------- node_name_to_index: dict The relation between node's name and node's index in a network. road_links: gpd.GeoDataFrame Edges of a road network. road_nodes: gpd.GeoDataFrame Nodes of a road network. initialSpeeds: dict Free-flow speeds of different types of roads. Returns ------- igraph.Graph A road network. """ nodeList = [ ( node_name_to_index[node.id], { "lon": node.geometry.x, "lat": node.geometry.y, }, ) for _, node in road_nodes.iterrows() ] # (id, {x, y}) edgeNameList = [] edgeList = [] edgeLengthList = [] edgeTypeList = [] edgeFormList = [] for _, link in road_links.iterrows(): edge_name = link.e_id edge_from = link.from_id edge_to = link.to_id edge_length = link.geometry.length * cons.CONV_METER_TO_MILE # miles edge_type = link.road_classification[0] edge_form = link.form_of_way edgeNameList.append(edge_name) edgeList.append((node_name_to_index[edge_from], node_name_to_index[edge_to])) edgeLengthList.append(edge_length) edgeTypeList.append(edge_type) edgeFormList.append(edge_form) edgeSpeedList = np.vectorize(initial_speed_func)( edgeTypeList, edgeFormList, initialSpeeds, ) # miles/hour # travel time timeList = np.array(edgeLengthList) / np.array(edgeSpeedList) # hour # total travel cost (time-equivalent) vocList = np.vectorize(voc_func)(edgeSpeedList) # £/km costList = np.vectorize(cost_func)(timeList, edgeLengthList, vocList) # hour weightList = (costList * 3600).tolist() # seconds test_net = igraph.Graph(directed=False) test_net.add_vertices(nodeList) test_net.vs["nd_id"] = road_nodes.id.tolist() test_net.add_edges(edgeList) test_net.es["edge_name"] = edgeNameList test_net.es["weight"] = weightList return test_net
[docs] def initialise_igraph_network( road_links: gpd.GeoDataFrame, initial_capacity_dict: Dict[str, int], initial_speed_dict: Dict[str, float], col_road_classification: str, ) -> gpd.GeoDataFrame: """Road Network Initialisation Parameters ---------- road_links: gpd.GeoDataFrame Edges of the road network. initial_capacity_dict: dict The capacity of different types of roads. initial_speed_dict: dict Free-flow speeds of different types of roads. col_road_classification: str The column of road classification. Returns ------- gpd.GeoDataFrame Road links with combined_label, flow counts, capacities, and flow rages. """ # road_types: M, A, B road_links["road_type_label"] = road_links[col_road_classification].str[0] # road_forms: M, A_dual, A_single, B road_links["combined_label"] = road_links["road_type_label"] road_links.loc[road_links.road_type_label == "A", "combined_label"] = "A_dual" road_links.loc[ ( (road_links.road_type_label == "A") & (road_links.form_of_way.str.contains("Single")) ), "combined_label", ] = "A_single" # only single carriageways of A roads # accumulated edge flows (cars/day) road_links["acc_flow"] = 0.0 # remaining edge capacities (cars/day) road_links["acc_capacity"] = road_links["combined_label"].map(initial_capacity_dict) # average edge flow rates (miles/hour) road_links["ave_flow_rate"] = road_links["combined_label"].map(initial_speed_dict) return road_links
[docs] def update_od_matrix( temp_flow_matrix: pd.DataFrame, supply_dict: Dict[str, List[int]], destination_dict: Dict[str, List[str]], ) -> Tuple[List[str], Dict[str, List[int]], Dict[str, List[str]], float]: """Drop the origin-destination pairs with no accessible link or unallocated trips Parameters ---------- temp_flow_matrix: pd.DataFrame A temporary flow matrix: [origins, destinations, paths, flows] supply_dict: dict The number of outbound trips from each origin. destination_dict: dict List of destinations attached with each origin. Returns ------- new_list_of_origins: list new_supply_dict: dict new_destination: dict non_allocated_flow: float """ # drop the origin-destination pairs ("path = []") temp_df = temp_flow_matrix[temp_flow_matrix["path"].apply(lambda x: len(x) == 0)] non_allocated_flow = temp_df.flow.sum() print(f"Non_allocated_flow: {non_allocated_flow}") for _, row in temp_df.iterrows(): origin_temp = row["origin"] destination_temp = row["destination"] idx_temp = destination_dict[origin_temp].index(destination_temp) destination_dict[origin_temp].remove(destination_temp) del supply_dict[origin_temp][idx_temp] # drop origins with zero supply new_supply_dict = {} new_destination_dict = {} new_list_of_origins = [] for origin, list_of_counts in supply_dict.items(): tt_supply = sum(list_of_counts) if tt_supply > 0: new_list_of_origins.append(origin) new_counts = [od_flow for od_flow in list_of_counts if od_flow != 0] new_supply_dict[origin] = new_counts new_destination_dict[origin] = [ dest for idx, dest in enumerate(destination_dict[origin]) if list_of_counts[idx] != 0 ] return ( new_list_of_origins, new_supply_dict, new_destination_dict, non_allocated_flow, )
[docs] def update_network_structure( network: igraph.Graph, length_dict: Dict[str, float], speed_dict: Dict[str, float], temp_edge_flow: pd.DataFrame, ) -> Tuple[igraph.Graph, Dict[int, str]]: """Update the network structure (links and nodes) Parameters ---------- network: igraph.Graph The road network. length_dict: dict Lengths of road links. speed_dict: The flow rates of road links. temp_edge_flow: pd.DataFrame A temporary file to record edge flows in the network flow model. Returns ------- network: igraph.Graph the updated network. edge_index_to_name: The updated edge_index_to_name dict. """ zero_capacity_edges = set( temp_edge_flow.loc[temp_edge_flow["remaining_capacity"] < 1, "e_id"].tolist() ) net_edges = network.es["edge_name"] idx_to_remove = [ idx for idx, element in enumerate(net_edges) if element in zero_capacity_edges ] # drop links that have reached their full capacities network.delete_edges(idx_to_remove) number_of_edges = len(list(network.es)) print(f"The remaining number of edges in the network: {number_of_edges}") # update edge weights (time: seconds) remaining_edges = network.es["edge_name"] lengthList = list( map(length_dict.get, filter(length_dict.__contains__, remaining_edges)) ) speedList = list( map(speed_dict.get, filter(speed_dict.__contains__, remaining_edges)) ) timeList = np.where( np.array(speedList) != 0, np.array(lengthList) / np.array(speedList), np.nan ) # hours if np.isnan(timeList).any(): print("ERROR: Network contains congested edges.") exit() else: vocList = np.vectorize(voc_func)(speedList) timeList2 = np.vectorize(cost_func)(timeList, lengthList, vocList) # hours weightList = (timeList2 * 3600).tolist() # seconds network.es["weight"] = weightList # update idx_to_name dict edge_index_to_name = { idx: name for idx, name in enumerate(network.es["edge_name"]) } return network, edge_index_to_name
[docs] def network_flow_model( network: igraph.Graph, road_links: gpd.GeoDataFrame, node_name_to_index: Dict[str, int], edge_index_to_name: Dict[int, str], list_of_origins: List[str], supply_dict: Dict[str, List[int]], destination_dict: Dict[str, List[str]], free_flow_speed_dict: Dict[str, float], flow_breakpoint_dict: Dict[str, int], min_speed_cap: Dict[str, float], urban_speed_cap: Dict[str, float], col_eid: str, ) -> Tuple[Dict[str, float], Dict[str, int], Dict[str, int]]: """Network flow model Parameters ---------- network: igraph.Graph The road network. road_links: gpd.GeoDataFrame Edges of the road network. node_name_to_index: dict Convert from node name to node index of the road network. edge_index_to_name: dict Convert from edge index to edge name of the road network. list_of_origins: list Origins of the OD matrix. supply_dict: dict The counts of outbound trips from each origin of the OD matrix. destination_dict: dict List of destinations attached to each origin of the OD matrix. free_flow_speed_dict: dict Free-flow speeds of different combined road types, mile/hour. min_speed_cap: dict The restriction on the lowest flow rate, mile/hour. urban_speed_cap: dict The restriction on the maximum flow rate in urban areas, mile/hour. Returns ------- acc_speed_dict: dict The updated average flow rate on each road link. acc_flow_dict The accumulated flow amount on each road link. acc_capacity_dict The remaining capacity of each road link. """ partial_speed_flow_func = partial( speed_flow_func, free_flow_speed_dict=free_flow_speed_dict, flow_breakpoint_dict=flow_breakpoint_dict, min_speed_cap=min_speed_cap, urban_speed_cap=urban_speed_cap, ) total_remain = sum(sum(values) for values in supply_dict.values()) print(f"The initial total supply is {total_remain}") number_of_edges = len(list(network.es)) print(f"The initial number of edges in the network: {number_of_edges}") print(f"The initial number of origins: {len(list_of_origins)}") number_of_destinations = sum(len(value) for value in destination_dict.values()) print(f"The initial number of destinations: {number_of_destinations}") # road link properties edge_cbtype_dict: Dict[str, str] = road_links.set_index(col_eid)[ "combined_label" ].to_dict() edge_isUrban_dict: Dict[str, int] = road_links.set_index(col_eid)["urban"].to_dict() edge_length_dict: Dict[str, float] = ( road_links.set_index(col_eid)["geometry"].length * cons.CONV_METER_TO_MILE ).to_dict() acc_flow_dict: Dict[str, int] = road_links.set_index(col_eid)["acc_flow"].to_dict() acc_capacity_dict: Dict[str, int] = road_links.set_index(col_eid)[ "acc_capacity" ].to_dict() acc_speed_dict: Dict[str, float] = road_links.set_index(col_eid)[ "ave_flow_rate" ].to_dict() # starts iter_flag = 1 total_non_allocated_flow = 0.0 while total_remain > 0.0: print(f"No.{iter_flag} iteration starts:") list_of_spath = [] # find the shortest path for each origin-destination pair for i in tqdm(range(len(list_of_origins)), desc="Processing"): name_of_origin_node = list_of_origins[i] idx_of_origin_node = node_name_to_index[name_of_origin_node] list_of_name_destination_node = destination_dict[ name_of_origin_node ] # a list of destination nodes list_of_idx_destination_node = [ node_name_to_index[i] for i in list_of_name_destination_node ] flows = supply_dict[name_of_origin_node] paths = network.get_shortest_paths( v=idx_of_origin_node, to=list_of_idx_destination_node, weights="weight", mode="out", output="epath", ) # [origin, destination list, path list, flow list] list_of_spath.append( [name_of_origin_node, list_of_name_destination_node, paths, flows] ) # calculate edge flows temp_flow_matrix = pd.DataFrame( list_of_spath, columns=[ "origin", "destination", "path", "flow", ], ).explode(["destination", "path", "flow"]) temp_edge_flow = get_flow_on_edges(temp_flow_matrix, col_eid, "path", "flow") # create a temporary table temp_edge_flow[col_eid] = temp_edge_flow[col_eid].map( edge_index_to_name ) # edge name # road form -> combined type temp_edge_flow["combined_label"] = temp_edge_flow[col_eid].map(edge_cbtype_dict) temp_edge_flow["isUrban"] = temp_edge_flow[col_eid].map( edge_isUrban_dict ) # urban/suburban temp_edge_flow["temp_acc_flow"] = temp_edge_flow[col_eid].map( acc_flow_dict ) # flow temp_edge_flow["temp_acc_capacity"] = temp_edge_flow[col_eid].map( acc_capacity_dict ) # capacity temp_edge_flow["est_overflow"] = ( temp_edge_flow["flow"] - temp_edge_flow["temp_acc_capacity"] ) # estimated overflow: positive -> has overflow max_overflow = temp_edge_flow["est_overflow"].max() print(f"The maximum amount of overflow of edges: {max_overflow}") # break if max_overflow <= 0: temp_edge_flow["total_flow"] = ( temp_edge_flow["flow"] + temp_edge_flow["temp_acc_flow"] ) temp_edge_flow["speed"] = np.vectorize(partial_speed_flow_func)( temp_edge_flow["combined_label"], temp_edge_flow["isUrban"], temp_edge_flow["total_flow"], ) temp_edge_flow["remaining_capacity"] = ( temp_edge_flow["temp_acc_capacity"] - temp_edge_flow["flow"] ) # update dicts # accumulated edge flows temp_dict = temp_edge_flow.set_index(col_eid)["total_flow"].to_dict() acc_flow_dict.update( {key: temp_dict[key] for key in acc_flow_dict.keys() & temp_dict.keys()} ) # average flow rate temp_dict = temp_edge_flow.set_index(col_eid)["speed"].to_dict() acc_speed_dict.update( { key: temp_dict[key] for key in acc_speed_dict.keys() & temp_dict.keys() } ) # accumulated remaining capacities temp_dict = temp_edge_flow.set_index(col_eid)[ "remaining_capacity" ].to_dict() acc_capacity_dict.update( { key: temp_dict[key] for key in acc_capacity_dict.keys() & temp_dict.keys() } ) print("Iteration stops: there is no edge overflow.") break # calculate the ratio of flow adjustment (0 < r < 1) temp_edge_flow["r"] = np.where( temp_edge_flow["flow"] != 0, temp_edge_flow["temp_acc_capacity"] / temp_edge_flow["flow"], np.nan, ) r = temp_edge_flow.r.min() if r < 0: print("Error: negative r!") break if r == 0: # temp_acc_capacity = 0 print("Error: (r==0) existing network has zero-capacity links!") break if r >= 1: print("Error: (r>=1) there is no edge overflow!") break print(f"r = {r}") # set as NaN when flow is zero # update flow matrix temp_flow_matrix = temp_flow_matrix[ temp_flow_matrix["path"].apply(lambda x: len(x) != 0) ] temp_flow_matrix["flow"] = temp_flow_matrix["flow"] * r # update edge flows temp_edge_flow["adjusted_flow"] = temp_edge_flow["flow"] * r temp_edge_flow["total_flow"] = ( temp_edge_flow.temp_acc_flow + temp_edge_flow.adjusted_flow ) temp_edge_flow["speed"] = np.vectorize(partial_speed_flow_func)( temp_edge_flow["combined_label"], temp_edge_flow["isUrban"], temp_edge_flow["total_flow"], ) temp_edge_flow["remaining_capacity"] = ( temp_edge_flow.temp_acc_capacity - temp_edge_flow.adjusted_flow ) temp_edge_flow.loc[ temp_edge_flow.remaining_capacity < 0, "remaining_capacity" ] = 0.0 # capacity is non-negative # update dicts # accumulated flows temp_dict = temp_edge_flow.set_index(col_eid)["total_flow"].to_dict() acc_flow_dict.update( {key: temp_dict[key] for key in acc_flow_dict.keys() & temp_dict.keys()} ) # average flow rate temp_dict = temp_edge_flow.set_index(col_eid)["speed"].to_dict() acc_speed_dict.update( {key: temp_dict[key] for key in acc_speed_dict.keys() & temp_dict.keys()} ) # accumulated remaining capacities temp_dict = temp_edge_flow.set_index(col_eid)["remaining_capacity"].to_dict() acc_capacity_dict.update( {key: temp_dict[key] for key in acc_capacity_dict.keys() & temp_dict.keys()} ) # if remaining supply < 1 -> 0 supply_dict = { k: filter_less_than_one(np.array(v) * (1 - r)).tolist() for k, v in supply_dict.items() } total_remain = sum(sum(values) for values in supply_dict.values()) print(f"The total remaining supply is: {total_remain}") # update od matrix list_of_origins, supply_dict, destination_dict, non_allocated_flow = ( update_od_matrix(temp_flow_matrix, supply_dict, destination_dict) ) total_non_allocated_flow += non_allocated_flow # record the overall flow loss number_of_destinations = sum(len(value) for value in destination_dict.values()) print(f"The remaining number of origins: {len(list_of_origins)}") print(f"The remaining number of destinations: {number_of_destinations}") # update network structure (nodes and edges) network, edge_index_to_name = update_network_structure( network, edge_length_dict, acc_speed_dict, temp_edge_flow ) iter_flag += 1 print("The flow simulation is completed!") print(f"The total non-allocated flow is {total_non_allocated_flow}") return acc_speed_dict, acc_flow_dict, acc_capacity_dict