由于最短路算法的完美实现需要遵循“无后效性”原则(遍历到某一节点时,不应再与前驱节点产生联系),因此在一些真实的交通网络路径搜索应用场景中,如考虑交叉口转向延误时,最短路算法的准确率可能会打折扣。对于常见的Dijkstra算法也是如此。
但值得注意的是,利用原图的线图进行最短路规划就可以规避这一后向性问题。这是因为在交通网络的线图中,原图中交叉口处不同向的转向延误都会以不同的边进行表示,而原图中的边则以节点表示,且此时各节点的代价变为了恒定的“阻抗”,和前驱、后继节点不再相关。
因此,虽然线图在实际的搜索场景中可能遇到诸多困难(如:缺乏真实地理信息、时间复杂度增大等),但作为考虑转向延误时的最短路参照,并用于计算相对误差则完全没问题。
具体的使用方法是:
- 随机给出路径的起点org_source、终点org_target,利用实验算法获取最短路长度和具体的路径;
- 将原图利用一定的方法变为线图,注意维持和原图的映射关系;
- 将具体路径中的org_source及其后继节点org_source_succ、org_target及其前驱节点org_target_prev导入线图Dijkstra中,利用映射关系获取在新图中的source和target(当然,这里新的起、终点相比原图向内缩了半个单位,因此后续求取最短路长度时需要加上首尾缺失的一段路);
- 在线图上进行考虑节点阻抗的Dijkstra路径规划,获取最短路长度,再和实验算法的结果进行对比并计算相对误差。
Python
import pandas as pd
import igraph as ig
import math
import numpy as np
from collections import defaultdict
import heapq
class DijkstraLineGraph(object):
def __init__(self, node_file, link_file, delay_dict):
self.node_file = node_file
self.link_file = link_file
self.delay_dict = delay_dict
self.G = ig.Graph(directed=True)
self.dual_G = ig.Graph(directed=True)
# 生成原图
def connect(self):
df_node = pd.read_csv(self.node_file)
df_link = pd.read_csv(self.link_file)
self.G.add_vertices(df_node['n_id'].tolist())
self.G.add_edges([(row['from_n_id'], row['to_n_id']) for _, row in df_link.iterrows()])
self.G.es['weight'] = df_link['weight'].tolist()
# 生成线图,并保留必要的映射关系
def dual_graph(self):
for edge in self.G.es:
# 这里delay已经改名叫impedance了
self.dual_G.add_vertex(name=str(edge.index), impedance=edge['weight'])
for v in self.G.vs:
for u in self.G.vs[self.G.predecessors(v.index)]:
for w in self.G.vs[self.G.successors(v.index)]:
impedance = 0
if v.index in self.delay_dict:
if (u.index, w.index) in self.delay_dict[v.index]:
impedance = self.delay_dict[v.index][(u.index, w.index)]
self.dual_G.add_edge(str(self.G.get_eid(u.index, v.index)), str(self.G.get_eid(v.index, w.index)), weight=impedance, originid=v.index)
def get_cost_sum(self, route_list):
sum_cost = 0
if len(route_list) > 2:
for i in range(1, len(route_list) - 1):
sum_cost += self.dual_G.vs.find(name=str(route_list[i]))['impedance']
for i in range(len(route_list) - 1):
sum_cost += self.dual_G.es[self.dual_G.get_eid(route_list[i], route_list[i + 1])]['weight']
return sum_cost
def dual_dijkstra_planner(self, org_source, org_source_succ, org_target_prev, org_target):
source = self.G.es[self.G.get_eid(org_source, org_source_succ)].index
target = self.G.es[self.G.get_eid(org_target_prev, org_target)].index
# 采用堆优化提高效率
priority_queue = []
heapq.heappush(priority_queue, (0, source, None))
time_cost = defaultdict(lambda: math.inf)
time_cost[source] = 0
prev_nodes = defaultdict(lambda: None)
while priority_queue:
current_cost, curr, prev = heapq.heappop(priority_queue)
if curr == target:
path = []
while curr is not None:
path.append(int(curr))
curr = prev_nodes[curr]
path.reverse()
total_cost = self.get_cost_sum(path) + self.G.es[self.G.get_eid(org_source, org_source_succ)]['weight'] + self.G.es[self.G.get_eid(org_target_prev, org_target)]['weight']
return path, total_cost # 返回path列表和time_cost值
for succ in self.dual_G.neighbors(curr, mode="OUT"):
edge_weight = self.dual_G.es[self.dual_G.get_eid(curr, succ)]['weight']
delay = self.dual_G.vs.find(name=str(curr))['impedance']
cost_through_current = current_cost + edge_weight + delay
if succ not in time_cost or cost_through_current < time_cost[succ]:
time_cost[succ] = cost_through_current
prev_nodes[succ] = curr
heapq.heappush(priority_queue, (cost_through_current, succ, curr))
return None
吐槽:对比后发现即便存在后向性问题,基于点标号的实验算法也只有0.4%的相对误差。足以证明某些文章有夸大其词的嫌疑。并且所谓的无后向性的弧段Dijkstra方法,其时间复杂度高达O(mn),且难以进行优化,完全无法适应真实路径搜索的需要……