由于最短路算法的完美实现需要遵循“无后效性”原则(遍历到某一节点时,不应再与前驱节点产生联系),因此在一些真实的交通网络路径搜索应用场景中,如考虑交叉口转向延误时,最短路算法的准确率可能会打折扣。对于常见的Dijkstra算法也是如此。

但值得注意的是,利用原图的线图进行最短路规划就可以规避这一后向性问题。这是因为在交通网络的线图中,原图中交叉口处不同向的转向延误都会以不同的边进行表示,而原图中的边则以节点表示,且此时各节点的代价变为了恒定的“阻抗”,和前驱、后继节点不再相关。

因此,虽然线图在实际的搜索场景中可能遇到诸多困难(如:缺乏真实地理信息、时间复杂度增大等),但作为考虑转向延误时的最短路参照,并用于计算相对误差则完全没问题。

具体的使用方法是:

  • 随机给出路径的起点org_source、终点org_target,利用实验算法获取最短路长度和具体的路径;
  • 将原图利用一定的方法变为线图,注意维持和原图的映射关系;
  • 将具体路径中的org_source及其后继节点org_source_succ、org_target及其前驱节点org_target_prev导入线图Dijkstra中,利用映射关系获取在新图中的source和target(当然,这里新的起、终点相比原图向内缩了半个单位,因此后续求取最短路长度时需要加上首尾缺失的一段路);
  • 在线图上进行考虑节点阻抗的Dijkstra路径规划,获取最短路长度,再和实验算法的结果进行对比并计算相对误差。
Python
import pandas as pd
import igraph as ig
import math
import numpy as np
from collections import defaultdict
import heapq


class DijkstraLineGraph(object):
    def __init__(self, node_file, link_file, delay_dict):
        self.node_file = node_file
        self.link_file = link_file
        self.delay_dict = delay_dict
        self.G = ig.Graph(directed=True)
        self.dual_G = ig.Graph(directed=True)
    
    # 生成原图
    def connect(self):
        df_node = pd.read_csv(self.node_file)
        df_link = pd.read_csv(self.link_file)
        self.G.add_vertices(df_node['n_id'].tolist())
        self.G.add_edges([(row['from_n_id'], row['to_n_id']) for _, row in df_link.iterrows()])
        self.G.es['weight'] = df_link['weight'].tolist()

    # 生成线图,并保留必要的映射关系
    def dual_graph(self):
        for edge in self.G.es:
            # 这里delay已经改名叫impedance了
            self.dual_G.add_vertex(name=str(edge.index), impedance=edge['weight'])
        for v in self.G.vs:
            for u in self.G.vs[self.G.predecessors(v.index)]:
                for w in self.G.vs[self.G.successors(v.index)]:
                    impedance = 0
                    if v.index in self.delay_dict:
                        if (u.index, w.index) in self.delay_dict[v.index]:
                            impedance = self.delay_dict[v.index][(u.index, w.index)]

                    self.dual_G.add_edge(str(self.G.get_eid(u.index, v.index)), str(self.G.get_eid(v.index, w.index)), weight=impedance, originid=v.index)

    def get_cost_sum(self, route_list):
        sum_cost = 0
        if len(route_list) > 2:
            for i in range(1, len(route_list) - 1):
                sum_cost += self.dual_G.vs.find(name=str(route_list[i]))['impedance']
        for i in range(len(route_list) - 1):
            sum_cost += self.dual_G.es[self.dual_G.get_eid(route_list[i], route_list[i + 1])]['weight']
        return sum_cost

    def dual_dijkstra_planner(self, org_source, org_source_succ, org_target_prev, org_target):
        source = self.G.es[self.G.get_eid(org_source, org_source_succ)].index
        target = self.G.es[self.G.get_eid(org_target_prev, org_target)].index
        # 采用堆优化提高效率
        priority_queue = []
        heapq.heappush(priority_queue, (0, source, None))
        time_cost = defaultdict(lambda: math.inf)
        time_cost[source] = 0
        prev_nodes = defaultdict(lambda: None)
        while priority_queue:
            current_cost, curr, prev = heapq.heappop(priority_queue)
            if curr == target:
                path = []
                while curr is not None:
                    path.append(int(curr))
                    curr = prev_nodes[curr]
                path.reverse()
                total_cost = self.get_cost_sum(path) + self.G.es[self.G.get_eid(org_source, org_source_succ)]['weight'] + self.G.es[self.G.get_eid(org_target_prev, org_target)]['weight']
                return path, total_cost  # 返回path列表和time_cost值
            for succ in self.dual_G.neighbors(curr, mode="OUT"):
                edge_weight = self.dual_G.es[self.dual_G.get_eid(curr, succ)]['weight']
                delay = self.dual_G.vs.find(name=str(curr))['impedance']
                cost_through_current = current_cost + edge_weight + delay
                if succ not in time_cost or cost_through_current < time_cost[succ]:
                    time_cost[succ] = cost_through_current
                    prev_nodes[succ] = curr
                    heapq.heappush(priority_queue, (cost_through_current, succ, curr))
        return None

吐槽:对比后发现即便存在后向性问题,基于点标号的实验算法也只有0.4%的相对误差。足以证明某些文章有夸大其词的嫌疑。并且所谓的无后向性的弧段Dijkstra方法,其时间复杂度高达O(mn),且难以进行优化,完全无法适应真实路径搜索的需要……