分析結論
多進程可以實現(xiàn)逐行遍歷同一個文件(可以保證每一個進程遍歷的行都是完整且互不重復的),且可以提高遍歷性能。
多進程 / 多線程遍歷文件速度
- 單進程、多線程讀取同一個文件時,每個線程的運行時間并不能隨線程數(shù)的增加的降低;
- 多進程讀取同一個文件時,每個進程的運行時間隨線程數(shù)的增加而降低。
進一步優(yōu)化方法
通過統(tǒng)計讀取到的字符串長度,計算當前文件指針位置,從而避免在每次遍歷中均需使用 file.tell()
獲取當前文件指針位置。
分析過程
構造 11202 MB、691130 行的測試數(shù)據(jù)。具體測試數(shù)據(jù)特征如下:
- 文件大小:11746098941 Bytes(11202 MB)
- 行數(shù):691130
import time
1. 單進程、單線程遍歷文件
t1 = time.time()
with open(path, "r", encoding="UTF-8") as file:
for _ in file:
pass
t2 = time.time()
print(t2 - t1)
運行時間:21.79 秒(三次測試 23.55、20.84、21.00 取平均值)
2. 多線程遍歷文件
import os
from theading import Thread
定義每個線程 / 進程的遍歷函數(shù)如下:通過捕獲 UnicodeDecodeError
異常,避免出現(xiàn)剛好切分到半個字的情況;通過在遍歷前先 file.readline()
,使每一個切分點中尚未結束的行一定歸屬上一進程 / 線程而不是下一進程 / 線程,從而保證每一個進程 / 線程遍歷的每一行都是完整且互不重復的。
def read(path, start, end):
"""每個進程 / 線程的遍歷函數(shù)
Parameters
----------
path : str
文件路徑
start : int
本分塊的開始位置
end : int
本分塊的結束位置
"""
cnt = 0
with open(path, "r", encoding="UTF-8") as file:
file.seek(start) # 移動到目標位置
if start != 0:
while True:
try:
file.readline() # 跳過當前行(所有未遍歷完的行屬于上一段)
break
except UnicodeDecodeError: # 剛好切分到半個字,向后移動一個字符
file.seek(start + 1)
while file.tell() <= end:
file.readline()
cnt += 1
定義多線程遍歷函數(shù):
def multi_thread_load_csv(path: str, n_thread: int):
"""多線程遍歷函數(shù)"""
size = os.path.getsize(path) # 獲取文件大小用于分塊
thread_lst = []
for i in range(n_thread):
s = size // n_thread * i # 計算當前分塊開始位置
e = size // n_thread * (i + 1) # 計算當前分塊結束位置
thread = Thread(target=read, args=(s, e))
thread.start()
thread_lst.append(thread)
for thread in thread_lst:
thread.join()
測試線程數(shù)為 1 - 10 之間的讀取時間,每種線程數(shù)測試 10 次,測試代碼及結果如下:
import numpy as np
for k in range(1, 11):
use_time = []
for _ in range(10):
t1 = time.time()
multi_thread_load_csv("/home/txjiang/archive_analysis/gather_detail.txt", k)
t2 = time.time()
use_time.append(t2 - t1)
print(f"線程={k} 平均時間={np.average(use_time)} 標準差={np.std(use_time)}")
線程數(shù) | 用時的平均值 | 用時的標準差 |
---|---|---|
1 | 49.0841 | 0.6299 |
2 | 53.2189 | 1.4267 |
3 | 53.5290 | 1.3273 |
4 | 56.4923 | 1.4843 |
5 | 56.6679 | 3.2745 |
6 | 56.5164 | 1.7789 |
7 | 58.2352 | 1.1148 |
8 | 58.2353 | 0.6817 |
9 | 60.9896 | 1.3365 |
10 | 64.2063 | 2.3251 |
因為在每一行的遍歷中需要增加一次 file.tell()
,所以單線程時的速度相較于直接讀取會更慢。
3. 多進程遍歷文件
from multiprocessing import Process
定義多進程遍歷函數(shù):
def multi_process_load_csv(path: str, n_process: int):
"""多進程遍歷函數(shù)"""
size = os.path.getsize(path) # 獲取文件大小用于分塊
process_lst = []
for i in range(n_process):
s = size // n_process * i # 計算當前分塊開始位置
e = size // n_process * (i + 1) # 計算當前分塊結束位置
process = Process(target=read, args=(s, e))
process.start()
process_lst.append(process)
for process in process_lst:
process.join()
測試線程數(shù)為 1 - 10 之間的讀取時間,每種線程數(shù)測試 10 次,測試代碼及結果如下:文章來源:http://www.zghlxwxcb.cn/news/detail-699457.html
import numpy as np
for k in range(1, 11):
use_time = []
for _ in range(10):
t1 = time.time()
multi_process_load_csv("/home/txjiang/archive_analysis/gather_detail.txt", k)
t2 = time.time()
use_time.append(t2 - t1)
print(f"線程={k} 平均時間={np.average(use_time)} 標準差={np.std(use_time)}")
進程數(shù) | 用時的平均值 | 用時的標準差 |
---|---|---|
1 | 50.1561 | 0.8431 |
2 | 26.5089 | 0.5581 |
3 | 17.7663 | 0.2771 |
4 | 13.4338 | 0.3024 |
5 | 10.7654 | 0.2950 |
6 | 9.1950 | 0.3471 |
7 | 7.7160 | 0.1764 |
8 | 7.0321 | 0.1938 |
9 | 6.3484 | 0.2150 |
10 | 5.6354 | 0.1271 |
11 | 5.1283 | 0.2361 |
12 | 4.7841 | 0.0512 |
13 | 4.5149 | 0.2186 |
14 | 4.1525 | 0.0533 |
15 | 3.9554 | 0.1442 |
16 | 3.8481 | 0.1167 |
17 | 3.6455 | 0.0763 |
18 | 3.4030 | 0.0255 |
19 | 3.3732 | 0.2159 |
20 | 3.1933 | 0.0674 |
21 | 3.0091 | 0.0845 |
22 | 2.9235 | 0.0646 |
23 | 2.9474 | 0.2234 |
24 | 2.7500 | 0.0382 |
25 | 2.6592 | 0.0340 |
26 | 2.5687 | 0.0333 |
27 | 2.6273 | 0.3457 |
28 | 2.4343 | 0.0253 |
29 | 2.3647 | 0.0223 |
30 | 2.2572 | 0.0343 |
因為在每一行的遍歷中需要增加一次 file.tell()
,所以單進程時的速度相較于直接讀取會更慢。文章來源地址http://www.zghlxwxcb.cn/news/detail-699457.html
到了這里,關于Python 性能優(yōu)化|多線程讀取文件的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!