Python 自帶的多進程庫 multiprocessing 可實現(xiàn)多進程。我想用這些短例子示范如何優(yōu)雅地用多線程。中文網(wǎng)絡(luò)上,有些人只是翻譯了舊版的 Python 官網(wǎng)的多進程文檔。而我這篇文章會額外講一講下方加粗部分的內(nèi)容。
- 創(chuàng)建進程 Process,fork 直接繼承資源,所以初始化更快,spawn 只繼承必要的資源,所以更省內(nèi)存,「程序的入口」 if name == main
- 進程池 Pool,Pool 只能接受一個參數(shù),但有辦法傳入多個
- 管道通信 Pipe,最基本的功能,運行速度快
- 隊列通信 Queue,有最常用的功能,運行速度稍慢
- 共享內(nèi)存 Manager Value,Python3.9 新特性 真正的共享內(nèi)存 shared_memory
如下所示,中文網(wǎng)絡(luò)上一些講 Python 多進程的文章,很多重要的東西沒講(畢竟只是翻譯了 Python 官網(wǎng)的多進程舊版文檔)。上方的加粗部分他們沒講,但是這是做多進程總需要知道的內(nèi)容。
- 若你無法流暢閱讀有專人更新的 Python 官網(wǎng)多進程英文文檔 ,那么姑且可看寫于 2019 不保證更新的南山南:一篇文章搞定 Python 多進程 (全)
- 知我莫言:談?wù)?python 的 GIL、多線程、多進程 ,點贊多但舊,寫于 2016,你還不如看我下方寫的「簡述何為多線程 threading 與多進程 processing」
1.多線程與多進程的區(qū)別
多線程 threading: 一個人有與異性聊天和看劇兩件事要做。單線程的她可以看完劇再去聊天,但這樣子可能就沒人陪她聊天了「哼,發(fā)消息不回」。我們把她看成一個 CPU 核心,為她開起多線程——先看一會劇,偶爾看看新消息,在兩件事(線程)間來回切換。多線程:單個 CPU 核心可以同時做幾件事,不至于卡在某一步傻等著。
用處:爬取網(wǎng)站信息(爬蟲),等待多個用戶輸入
多進程 processing: 一個人有很多磚需要搬,他領(lǐng)取手套、推車各種物資(向系統(tǒng)申請了資源)然后開始搬磚。然而他身邊有很多人,我們讓這些人去幫他?。ㄒ缓擞须y,八核圍觀)。于是他們做了分工,磚很快就搬完了。多進程讓多個 CPU 核心可以一起做事,不至于只有一人干活而其他人傻站著。
用處:進行高性能計算。只有多進程方案設(shè)計合理,才能加速計算。
2. 全局鎖與多進程
為何在 Python 里用多進程這么麻煩? 因為 Python 的線程是操作系統(tǒng)線程,因此要有 Python 全局解釋器鎖。一個 python 解釋器進程內(nèi)有一條主線程,以及多條用戶程序的執(zhí)行線程。即使在多核 CPU 平臺上,由于 GIL 的存在,所以禁止多線程的并行執(zhí)行?!獊碜园俣劝倏圃~條 全局解釋器鎖。發(fā)展歷程:
- Python 全局鎖。Python 3.2 的時候更新過 GIL。在我小時候,由于 Python GIL 的存在(全局解釋器鎖 Global Interpreter Lock) ,此時 Python 無法靠自己實現(xiàn)多進程
- 外部多進程通信。Python3.5。在 2015 年,要么用 Python 調(diào)用 C 語言(如 Numpy 此類用其他語言在底層實現(xiàn)多進程的第三方庫),要么需要在外部代碼(MPI 2015)
- 內(nèi)置多進程通信。Python 3.6 才讓 multiprocessing 逐漸發(fā)展成一個能用的 Python 內(nèi)置多進程庫,可以進行進程間的通信,以及有限的內(nèi)存共享
- 共享內(nèi)存。Python 3.8 在 2019 年增加了新特性 shared_memory
3.子進程 Process
多進程的主進程一定要寫在程序入口 if name ==‘main’: 內(nèi)部
def function1(id): # 這里是子進程
print(f'id {id}')
def run__process(): # 這里是主進程
from multiprocessing import Process
process = [mp.Process(target=function1, args=(1,)),
mp.Process(target=function1, args=(2,)), ]
[p.start() for p in process] # 開啟了兩個進程
[p.join() for p in process] # 等待兩個進程依次結(jié)束
# run__process() # 主線程不建議寫在 if外部。由于這里的例子很簡單,你強行這么做可能不會報錯
if __name__ =='__main__':
run__process() # 正確做法:主線程只能寫在 if內(nèi)部
盡管在這個簡單的例子里,把主進程 run__process() 寫在程序入口 if 外部不會有報錯。但是你最好還是按我要求去做。詳細解釋的內(nèi)容過長,我寫在→「Python 程序入口有重要功能(多線程)而非編程習慣」
上面的例子只是用 Process 開啟了多進程,不涉及進程通信。當我準備把一個串行任務(wù)編排成多進程時,我還需要多進程通信。進程池 Pool 可以讓主程序獲得子進程的計算結(jié)果(不太靈活,適合簡單任務(wù)),管道 Pipe 隊列 Queue 等等 可以讓進程之間進行通信(足夠靈活)。共享值 Value 共享數(shù)組 Array 共享內(nèi)容 shared_memory(Python 3.6 Python3.9 的新特性,還不太成熟)下面開講。
Python 多進程可以選擇兩種創(chuàng)建進程的方式,spawn 與 fork。分支創(chuàng)建:fork 會直接復制一份自己給子進程運行,并把自己所有資源的 handle 都讓子進程繼承,因而創(chuàng)建速度很快,但更占用內(nèi)存資源。分產(chǎn)創(chuàng)建:spawn 只會把必要的資源的 handle 交給子進程,因此創(chuàng)建速度稍慢。詳細解釋請看 Stack OverFlow multiprocessing fork vs spawn 。(分產(chǎn) spawn 是我自己隨便翻譯的,有更好的翻譯請推薦。我絕不把 handle 翻譯成句柄)
multiprocessing.set_start_method('spawn') # default on WinOS or MacOS
multiprocessing.set_start_method('fork') # default on Linux (UnixOS)
請注意:我說 分支 fork 在初始化創(chuàng)建多進程的時候比 分產(chǎn) spawn 快,而不是說高性能計算會比較快。通常高性能計算需要讓程序運行很久,因此為了節(jié)省內(nèi)存以及進程安全,我建議選擇 spawn。
4.進程池 Pool
幾乎 Python 多進程代碼都需要你明明白白地調(diào)用 Process。而進程池 Pool 會自動幫我們管理子進程。Python 的 Pool 不方便傳入多個參數(shù),我這里提供兩個解決思路:
思路 1:函數(shù) func2 需要傳入多個參數(shù),現(xiàn)在把它改成一個參數(shù),無論你直接讓 args 作為一個元組 tuple、詞典 dict、類 class 都可以
思路 2:使用 function.partial Passing multiple parameters to pool.map() function in Python。這個不靈活的方法固定了其他參數(shù),且需要導入 Python 的內(nèi)置庫,我不推薦
import time
def func2(args): # multiple parameters (arguments)
# x, y = args
x = args[0] # write in this way, easier to locate errors
y = args[1] # write in this way, easier to locate errors
time.sleep(1) # pretend it is a time-consuming operation
return x - y
def run__pool(): # main process
from multiprocessing import Pool
cpu_worker_num = 3
process_args = [(1, 1), (9, 9), (4, 4), (3, 3), ]
print(f'| inputs: {process_args}')
start_time = time.time()
with Pool(cpu_worker_num) as p:
outputs = p.map(func2, process_args)
print(f'| outputs: {outputs} TimeUsed: {time.time() - start_time:.1f} \n')
'''Another way (I don't recommend)
Using 'functions.partial'. See https://stackoverflow.com/a/25553970/9293137
from functools import partial
# from functools import partial
# pool.map(partial(f, a, b), iterable)
'''
if __name__ =='__main__':
run__pool()
5.管道 Pipe
顧名思義,管道 Pipe 有兩端,因而 main_conn, child_conn = Pipe() ,管道的兩端可以放在主進程或子進程內(nèi),我在實驗中沒發(fā)現(xiàn)主管道口 main_conn 和子管道口 child_conn 的區(qū)別。兩端可以同時放進去東西,放進去的對象都經(jīng)過了深拷貝:用 conn.send() 在一端放入,用 conn.recv() 另一端取出,管道的兩端可以同時給多個進程。conn 是 connect 的縮寫。
import time
def func_pipe1(conn, p_id):
print(p_id)
time.sleep(0.1)
conn.send(f'{p_id}_send1')
print(p_id, 'send1')
time.sleep(0.1)
conn.send(f'{p_id}_send2')
print(p_id, 'send2')
time.sleep(0.1)
rec = conn.recv()
print(p_id, 'recv', rec)
time.sleep(0.1)
rec = conn.recv()
print(p_id, 'recv', rec)
def func_pipe2(conn, p_id):
print(p_id)
time.sleep(0.1)
conn.send(p_id)
print(p_id, 'send')
time.sleep(0.1)
rec = conn.recv()
print(p_id, 'recv', rec)
def run__pipe():
from multiprocessing import Process, Pipe
conn1, conn2 = Pipe()
process = [Process(target=func_pipe1, args=(conn1, 'I1')),
Process(target=func_pipe2, args=(conn2, 'I2')),
Process(target=func_pipe2, args=(conn2, 'I3')), ]
[p.start() for p in process]
print('| Main', 'send')
conn1.send(None)
print('| Main', conn2.recv())
[p.join() for p in process]
if __name__ =='__main__':
run__pipe()
如果追求運行更快,那么最好使用管道 Pipe 而非下面介紹的隊列 Queue,詳細請移步 Python pipes and queues performance ↓
So yes, pipes are faster than queues - but only by 1.5 to 2 times, what did surprise me was that Python 3 is MUCH slower than Python 2 - most other tests I have done have been a bit up and down (as long as it is Python 3.4 - Python 3.2 seems to be a bit of a dog - especially for memory usage).
曾經(jīng)用到 Python 多線程隊列功能寫過一個實際例子 ↓,若追求極致性能,還可以把里面的 Queue 改為 Pipe。
Pipe 還有 duplex 參數(shù) 和 poll() 方法 需要了解。默認情況下 duplex==True,若不開啟雙向管道,那么傳數(shù)據(jù)的方向只能 conn1 ← conn2 。conn2.poll()==True 意味著可以馬上使用 conn2.recv() 拿到傳過來的數(shù)據(jù)。conn2.poll(n) 會讓它等待 n 秒鐘再進行查詢。
from multiprocessing import Pipe
conn1, conn2 = Pipe(duplex=True) # 開啟雙向管道,管道兩端都能存取數(shù)據(jù)。默認開啟
#
conn1.send('A')
print(conn1.poll()) # 會print出 False,因為沒有東西等待conn1去接收
print(conn2.poll()) # 會print出 True ,因為conn1 send 了個 'A' 等著conn2 去接收
print(conn2.recv(), conn2.poll(2)) # 會等待2秒鐘再開始查詢,然后print出 'A False'
盡管我下面的例子不會報錯,但這是因為它過于簡單,沒有真的開多線程去跑,也沒有寫在程序入口的 if 內(nèi)部。很多時候 Pipe 運行會快一點,但是它的功能太少了,得用 Queue。最明顯的一個區(qū)別是:
conn1, conn2 = multiprocessing.Pipe() # 管道有兩端,某一端放入的東西,只能在另一端拿到
queue = multiprocessing.Queue() # 隊列只有一個,放進去的東西可以在任何地方拿到。
6. 隊列 Queue
可以 import queue 調(diào)用 Python 內(nèi)置的隊列,在多線程里也有隊列 from multiprocessing import Queue。下面提及的都是多線程的隊列。
隊列 Queue 的功能與前面的管道 Pipe 非常相似:無論主進程或子進程,都能訪問到隊列,放進去的對象都經(jīng)過了深拷貝。不同的是:管道 Pipe 只有兩個斷開,而隊列 Queue 有基本的隊列屬性,更加靈活,詳細請移步 Stack Overflow Multiprocessing - Pipe vs Queue。
def func1(i):
time.sleep(1)
print(f'args {i}')
def run__queue():
from multiprocessing import Process, Queue
queue = Queue(maxsize=4) # the following attribute can call in anywhere
queue.put(True)
queue.put([0, None, object]) # you can put deepcopy thing
queue.qsize() # the length of queue
print(queue.get()) # First In First Out
print(queue.get()) # First In First Out
queue.qsize() # the length of queue
process = [Process(target=func1, args=(queue,)),
Process(target=func1, args=(queue,)), ]
[p.start() for p in process]
[p.join() for p in process]
if __name__ =='__main__':
run__queue()
除了上面提及的 Python 多線程,讀取多個 (???\ 大華) 網(wǎng)絡(luò)攝像頭的視頻流 ,我自己寫的開源的強化學習庫:小雅 ElegantRL 也使用了 Queue 進行多 CPU 多 GPU 訓練,為了提速,我已經(jīng)把 Queue 改為 Pipe。
7. 共享內(nèi)存 Manager
為了在 Python 里面實現(xiàn)多進程通信,上面提及的 Pipe Queue 把需要通信的信息從內(nèi)存里深拷貝了一份給其他線程使用(需要分發(fā)的線程越多,其占用的內(nèi)存越多)。而共享內(nèi)存會由解釋器負責維護一塊共享內(nèi)存(而不用深拷貝),這塊內(nèi)存每個進程都能讀取到,讀寫的時候遵守管理(因此不要以為用了共享內(nèi)存就一定變快)。
Manager 可以創(chuàng)建一塊共享的內(nèi)存區(qū)域,但是存入其中的數(shù)據(jù)需要按照特定的格式,Value 可以保存數(shù)值,Array 可以保存數(shù)組,如下。這里不推薦認為自己寫代碼能力弱的人嘗試。下面這里例子來自 Python 官網(wǎng)的 Document。
# https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing%20array#multiprocessing.Array
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double
class Point(Structure):
_fields_ = [('x', c_double), ('y', c_double)]
def modify(n, x, s, A):
n.value **= 2
x.value **= 2
s.value = s.value.upper()
for a in A:
a.x **= 2
a.y **= 2
if __name__ == '__main__':
lock = Lock()
n = Value('i', 7)
x = Value(c_double, 1.0/3.0, lock=False)
s = Array('c', b'hello world', lock=lock)
A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)
p = Process(target=modify, args=(n, x, s, A))
p.start()
p.join()
print(n.value)
print(x.value)
print(s.value)
print([(a.x, a.y) for a in A])
我刪掉了 Python 3.8 的 shared_momery 介紹,這部分有 Bug
下文來自 Stack Overflow,問題 Shared memory in multiprocessing 下 thuzhf 的回答 2021-01 :
For those interested in using Python3.8 's shared_memory module, it still has a bug which hasn’t been fixed and is affecting Python3.8/3.9/3.10 by now (2021-01-15). The bug is about resource tracker destroys shared memory segments when other processes should still have valid access. So take care if you use it in your code.
PyTorch 也有自帶的多進程 torch.multiprocessing
How to share a list of tensors in PyTorch multiprocessing? rozyang 的回答 ,非常簡單,核心代碼如下:
import torch.multiprocessing as mp
tensor.share_memory_()
正文已經(jīng)結(jié)束,我把部分 multiprocessing 的代碼都放在 github。希望大家能寫出讓自己滿意的多線程。我設(shè)計高性能的多進程時,會遵守以下規(guī)則:
- 盡可能少傳一點數(shù)據(jù)
- 盡可能減少主線程的負擔
- 盡可能不讓某個進程傻等著
- 盡可能減少進程間通信的頻率
開源的深度強化學習 (DRL) 算法庫 伯克利的 Ray-project Rllib 訓練快,但太復雜,OpenAI 的 SpinningUp 簡單,但不快。剛好我又懂一點多進程、Numpy、深度學習框架、深度強化學習這些雙層優(yōu)化算法,所以我覺得自己也寫一個 DRL 庫難度不大,于是開源了強化學習庫:小雅 ElegantRL。讓別人好好看看,DRL 庫挺簡單的一個東西弄那么復雜做什么?文章來源:http://www.zghlxwxcb.cn/news/detail-563394.html
盡管這個庫會一直保持框架小巧、代碼優(yōu)雅來方便入門深度強化學習的人,但 ElegantRL 卻把訓練效率放在首位(正因如此,ElegantRL 與 SpinningUp 的定位不同),所以我需要用 Python 的多進程來加速 DRL 的訓練。因而順便寫【在 Python 中優(yōu)雅地用多進程】這篇東西。文章來源地址http://www.zghlxwxcb.cn/news/detail-563394.html
到了這里,關(guān)于在Python中優(yōu)雅地用多進程:進程池 Pool、管道通信 Pipe、隊列通信 Queue、共享內(nèi)存 Manager Value的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!