目錄
一、用戶均衡模型簡(jiǎn)略介紹? ?? ??
1.1 Wardrop第一原理
1.2 用戶均衡模型
1.3 BPR函數(shù)?
1.4 用戶均衡模型的積分項(xiàng)
?二、Frank Wolfe算法求解步驟
三、代碼
3.1?導(dǎo)入必要的庫(kù)
?3.2?構(gòu)建交通網(wǎng)絡(luò)
3.3 繪制交通路網(wǎng)圖
3.4?定義BPR函數(shù)
3.5?初始化路網(wǎng)流量
3.6 獲取?flow_temp
3.7?獲取下降方向descent
3.8? 定義目標(biāo)函數(shù)
3.9?一維搜索最優(yōu)步長(zhǎng),并更新流量
3.10?主函數(shù)
四、所用文件
?文章來源地址http://www.zghlxwxcb.cn/news/detail-848461.html
一、用戶均衡模型簡(jiǎn)略介紹? ?? ??
1.1 Wardrop第一原理
- ?道路的利用者,都確切知道網(wǎng)絡(luò)的交通狀態(tài),并試圖選擇最短路徑
- 當(dāng)網(wǎng)絡(luò)達(dá)到平衡狀態(tài)時(shí),每個(gè)OD對(duì)的各條被使用的路徑,行駛時(shí)間相等,且行駛時(shí)間最短
- 沒有被使用的路徑的行駛時(shí)間大于或等于最小行駛時(shí)間
1.2 用戶均衡模型
????????滿足Wardrop第一原理的交通分配模型,稱為用戶均衡模型。
????????1956年Beckmann提出了一種滿足Wardrop第一原理的數(shù)學(xué)規(guī)劃模型。模型核心是,交通網(wǎng)絡(luò)中的用戶,都試圖選擇最短路徑,而最終使得被選擇的路徑的阻抗最小且相等。該數(shù)學(xué)規(guī)劃模型為:
1.3 BPR函數(shù)?
????????在用戶均衡模型中,為路阻函數(shù),我們一般采用BPR函數(shù),即:
- ,表示最快通過路段a的時(shí)間;
- α常取0.15,β常取4.0?;
- ,表示路段a的通行能力;
- ,表示路段a的流量
1.4 用戶均衡模型的積分項(xiàng)
????????為便于后續(xù)求解,我們將BPR函數(shù)代入,進(jìn)行積分計(jì)算,過程如下:
????????因此,我們的目標(biāo)函數(shù)為:
?
?二、Frank Wolfe算法求解步驟
????????友情提醒:后面若對(duì)代碼主體邏輯有疑惑,返回看看這部分。
? ? ? ? 步驟1:初始化?;诹懔鲌D的路阻,依次搜索每一個(gè)OD對(duì) r,s 所對(duì)應(yīng)的最短路徑,并將 r,s 間的OD流量,全部分配到對(duì)應(yīng)的最短路徑上,得到初始路段流量,并令迭代次數(shù)n=1。
????????步驟2:更新路阻。根據(jù)BPR函數(shù),分別代入每個(gè)路段的初始流量,求得阻抗
? ? ? ? 步驟3:下降方向。基于阻抗,按照步驟1中的方法(最短路全有全無分配),將流量重新分配到對(duì)應(yīng)路徑上,得到臨時(shí)路段流量,進(jìn)而得到
????????步驟4:搜索最優(yōu)步長(zhǎng),并更新流量。采用二分法,搜索最優(yōu)步長(zhǎng),并令。其中,最優(yōu)步長(zhǎng)滿足:
????????步驟5:結(jié)束條件。如果,則算法結(jié)束;否則n=n+1,轉(zhuǎn)至步驟2。此處的e 表示誤差閾值,在代碼部分用max_err表示。
?
三、代碼
????????代碼基于Python的NetWorkX庫(kù)編寫,這樣將大大減少我們代碼編寫的工作量,并且更易于閱讀。我們以SiouxFalls交通網(wǎng)絡(luò)為例,進(jìn)行交通網(wǎng)絡(luò)構(gòu)建與流量分配。
3.1?導(dǎo)入必要的庫(kù)
import pandas as pd
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
from scipy.optimize import minimize_scalar
- pandas,用于讀取文件;
- numpy在計(jì)算誤差時(shí)使用;
- networkx貫穿整個(gè)代碼;
- matplotlib用于繪制交通網(wǎng)絡(luò)圖
- scipy在搜索最優(yōu)步長(zhǎng)時(shí)用到。
?3.2?構(gòu)建交通網(wǎng)絡(luò)
def build_network(Link_path, Node_path):
# 讀取點(diǎn)數(shù)據(jù)、邊數(shù)據(jù)
links_df = pd.read_csv(Link_path)
# 需要注意使用from_pandas_edge,其讀取的邊的順序和csv中邊的順序有差異
G = nx.from_pandas_edgelist(links_df, source='O', target='D', edge_attr=['FFT', 'Capacity'], create_using=nx.DiGraph())
nx.set_edge_attributes(G, 0, 'flow_temp')
nx.set_edge_attributes(G, 0, 'flow_real')
nx.set_edge_attributes(G, 0, 'descent')
nx.set_edge_attributes(G, nx.get_edge_attributes(G, "FFT"), 'weight')
# 獲取節(jié)點(diǎn)位置信息
nodes_df = pd.read_csv(Node_path)
node_positions = {}
for index, row in nodes_df.iterrows():
node_positions[row['id']] = (row['pos_x'], row['pos_y'])
# 更新圖中節(jié)點(diǎn)的位置屬性
nx.set_node_attributes(G, node_positions, 'pos')
return G
- Link_path,表示路網(wǎng)文件路徑。下載鏈接見文末,數(shù)據(jù)示例如下:
O | D | FFT | Capacity |
1 | 2 | 6 | 25900.20064 |
1 | 3 | 4 | 23403.47319 |
2 | 1 | 6 | 25900.20064 |
2 | 6 | 5 | 4958.180928 |
3 | 1 | 4 | 23403.47319 |
- Node_path,表示節(jié)點(diǎn)文件路徑。下載鏈接見文末,數(shù)據(jù)示例如下:
id | pos_x | pos_y |
1 | 2 | 2 |
2 | 13 | 2 |
3 | 2 | 5 |
4 | 5 | 5 |
5 | 9 | 5 |
- flow_real,表示每次迭代更新后的路段流量或初始化流量,所有路段的flow_real組成
- flow_temp,?所有路段的flow_temp組成
- descent,表示flow_temp與flow_real的差值,所有路段的descent組成
- weight,表示路阻。初始的路阻,由于路段流量都是0,所以直接用FFT。后續(xù)將用BPR函數(shù)計(jì)算
3.3 繪制交通路網(wǎng)圖
def draw_network(G):
pos = nx.get_node_attributes(G, "pos")
nx.draw(G, pos, with_labels=True, node_size=200, node_color='lightblue', font_size=10, font_weight='bold')
plt.show()
?????????構(gòu)建交通網(wǎng)絡(luò)后,我們來看一看這個(gè)SiouxFalls網(wǎng)絡(luò)長(zhǎng)什么樣子吧
3.4?定義BPR函數(shù)
def BPR(FFT, flow, capacity, alpha=0.15, beta=4.0):
return FFT * (1 + alpha * (flow / capacity) ** beta)
- FTT,表示最快通過時(shí)間;
- flow,表示路段流量;
- capacity,表示路段通行能力;
- alpha和beta,是BPR函數(shù)的參數(shù),在此取默認(rèn)值。
3.5?初始化路網(wǎng)流量
def all_none_initialize(G, od_df):
# 這個(gè)函數(shù)僅使用一次,用于初始化
# 在零流圖上,按最短路全有全無分配,用于更新flow_real
for _, od_data in od_df.iterrows():
source = od_data["o"]
target = od_data["d"]
demand = od_data["demand"]
# 計(jì)算最短路徑
shortest_path = nx.shortest_path(G, source=source, target=target, weight="weight")
# 更新路徑上的流量
for i in range(len(shortest_path) - 1):
u = shortest_path[i]
v = shortest_path[i + 1]
G[u][v]['flow_real'] += demand
# 初始化流量后,更新阻抗
for _, _, data in G.edges(data=True):
data['weight'] = BPR(data['FFT'], data['flow_real'], data['Capacity'])
????????這個(gè)函數(shù)僅使用一次,用于初始化。在零流圖上,按最短路全有全無分配,用于得到。
- od_df表示pd.Dataframe數(shù)據(jù)格式的OD流量信息。ODParis.csv示例數(shù)據(jù)如下:
o | d | demand |
1 | 2 | 100 |
1 | 3 | 100 |
1 | 4 | 500 |
1 | 5 | 200 |
1 | 6 | 300 |
1 | 7 | 500 |
3.6 獲取?flow_temp
????????flow_temp,即
def all_none_temp(G, od_df):
# 這個(gè)是虛擬分配,用于得到flow_temp
# 每次按最短路分配前,需要先將flow_temp歸零
nx.set_edge_attributes(G, 0, 'flow_temp')
for _, od_data in od_df.iterrows():
# 每次更新都得讀OD,后面嘗試優(yōu)化這個(gè)
source = od_data["o"]
target = od_data["d"]
demand = od_data["demand"]
# 計(jì)算最短路徑
shortest_path = nx.shortest_path(G, source=source, target=target, weight="weight")
# 更新路徑上的流量
for i in range(len(shortest_path) - 1):
u = shortest_path[i]
v = shortest_path[i + 1]
# 更新流量
G[u][v]['flow_temp'] += demand
3.7?獲取下降方向descent
????????descent,即
def get_descent(G):
for _, _, data in G.edges(data=True):
data['descent'] = data['flow_temp'] - data['flow_real']
3.8? 定義目標(biāo)函數(shù)
def objective_function(temp_step, G):
s, alpha, beta = 0, 0.15, 4.0
for _, _, data in G.edges(data=True):
x = data['flow_real'] + temp_step * data['descent']
s += data["FFT"] * (x + alpha * data["Capacity"] / (beta + 1) * (x / data["Capacity"]) ** (beta + 1))
return s
????????該部分代碼,對(duì)應(yīng)本文1.4部分的目標(biāo)函數(shù),即:
3.9?一維搜索最優(yōu)步長(zhǎng),并更新流量
def update_flow_real(G):
# 這個(gè)函數(shù)用于調(diào)整流量,即flow_real,并更新weight
best_step = get_best_step(G) # 獲取最優(yōu)步長(zhǎng)
for _, _, data in G.edges(data=True):
# 調(diào)整流量,更新路阻
data['flow_real'] += best_step * data["descent"]
data['weight'] = BPR(data['FFT'], data['flow_real'], data['Capacity'])
def get_best_step(G, tolerance=1e-4):
result = minimize_scalar(objective_function, args=(G,), bounds=(0, 1), method='bounded', tol=tolerance)
return result.x
3.10?主函數(shù)
def main():
G = build_network("Link.csv", "Node.csv") # 構(gòu)建路網(wǎng)
draw_network(G) # 繪制交通路網(wǎng)圖
od_df = pd.read_csv("ODPairs.csv") # 獲取OD需求情況
all_none_initialize(G, od_df) # 初始化路網(wǎng)流量
print("初始化流量", list(nx.get_edge_attributes(G, 'flow_real').values()))
epoch = 0 # 記錄迭代次數(shù)
err, max_err = 1, 1e-4 # 分別代表初始值、最大容許誤差
f_list_old = np.array(list(nx.get_edge_attributes(G, 'flow_real').values()))
while err > max_err:
epoch += 1
all_none_temp(G, od_df) # 全有全無分配,得到flow_temp
get_descent(G) # 計(jì)算梯度,即flow_temp-flow_real
update_flow_real(G) # 先是一維搜索獲取最優(yōu)步長(zhǎng),再調(diào)整流量,更新路阻
# 計(jì)算并更新誤差err
f_list_new = np.array(list(nx.get_edge_attributes(G, 'flow_real').values())) # 這個(gè)變量是新的路網(wǎng)流量列表
d = np.sum((f_list_new - f_list_old) ** 2)
err = np.sqrt(d) / np.sum(f_list_old)
f_list_old = f_list_new
print("均衡流量", list(nx.get_edge_attributes(G, 'flow_real').values()))
print("迭代次數(shù)", epoch)
# 導(dǎo)出網(wǎng)絡(luò)均衡流量
df = nx.to_pandas_edgelist(G)
df = df[["source", "target", "flow_real"]].sort_values(by=["source", "target"])
df.to_csv("網(wǎng)絡(luò)均衡結(jié)果.csv", index=False)
if __name__ == '__main__':
main()
- epoch,表示迭代次數(shù)
- err,表示誤差初始值
- max_err,表示最大容許誤差
- ?f_list_new, f_list_old分別表示和,用于計(jì)算誤差
四、所用文件
? ? ? ? 下載鏈接:交通分配(UE模型)文章來源:http://www.zghlxwxcb.cn/news/detail-848461.html
?
到了這里,關(guān)于基于Frank Wolfe算法,求解交通分配UE模型(Python & NetWorkX)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!