協(xié)程 & asyncio & 異步
1. 協(xié)程 (coroutine)
協(xié)程不是計(jì)算機(jī)提供,而是程序員人為創(chuàng)造。
協(xié)程(coroutine),也可以被稱為微線程,是一種用戶態(tài)內(nèi)的上下文切換技術(shù)。簡(jiǎn)而言之,其實(shí)就是通過一個(gè)線程實(shí)現(xiàn)代碼塊互相切換運(yùn)行。例如:
def func1():
print(1)
...
print(2)
def func2():
print(3)
...
print(4)
func1()
func2()
實(shí)現(xiàn)協(xié)程有這么幾種方法:
-
greenlet
,早期模塊。 -
yield
關(guān)鍵字。 -
asyncio
裝飾器(python 3.4) -
async
、await
關(guān)鍵字(python 3.5)
1.1 greenlet 實(shí)現(xiàn)協(xié)程
pip3 install greenlet
from greenlet import greenlet
def func1():
print(1) # 第 2 步:輸出 1
gr2.switch() # 第 3 步:切換到 func2 函數(shù)
print(2) # 第 6 步:輸出 2
gr2.switch() # 第 7 步 切換到 func2 函數(shù),從上一次執(zhí)行的位置繼續(xù)向后執(zhí)行
def func2():
print(3) # 第 4 步:輸出 3
gr1.switch() # 第 5 步:切換到 func1 函數(shù),從上一次執(zhí)行的位置繼續(xù)向后執(zhí)行
print(4) # 第 8 步:輸出 4
gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() # 第 1 步:去執(zhí)行 func1 函數(shù)
1.2 yield 關(guān)鍵字
def func1():
yield 1
yield from func2()
yield 2
def func2():
yield 3
yield 4
f1 = func1()
for item in f1:
print(item)
偽實(shí)現(xiàn),僅能實(shí)現(xiàn)協(xié)程的功能。
1.3 asyncio
在 python 3.4 及之后的版本。
import asyncio
@asyncio.coroutine
def func1():
print(1)
yield from asyncio.sleep(2) # 遇到 IO 耗時(shí)操作,自動(dòng)化切換到 tasks 中其它任務(wù)
print(2)
@asyncio.coroutine
def func2():
print(3)
yield from asyncio.sleep(2) # 遇到 IO 耗時(shí)操作,自動(dòng)化切換到 tasks 中其它任務(wù)
print(4)
tasks = [
asyncio.ensure_future(func1())
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
注意:遇到 IO 阻塞自動(dòng)切換。
1.4 aynsc & await 關(guān)鍵字
在 python 3.5 及之后的版本。
import asyncio
async def func1():
print(1)
# 網(wǎng)絡(luò) IO 請(qǐng)求:下載一張圖片
await asyncio.sleep(2) # 遇到 IO 耗時(shí)操作,自動(dòng)化切換到 tasks 中的其它任務(wù)。
print(2)
async def func2():
print(3)
# 網(wǎng)絡(luò) IO 請(qǐng)求:下載一張圖片
await asyncio.sleep(2) # 遇到 IO 耗時(shí)操作,自動(dòng)化切換到 tasks 中的其它任務(wù)。
print(4)
tasks = [
asyncio.ensure_future(func1())
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
2. 協(xié)程的意義
在一個(gè)線程中如果遇到 IO 等待時(shí)間,線程不會(huì)傻等,而是利用空閑時(shí)間再去干點(diǎn)其它事情。
案例:下載三張圖片(網(wǎng)絡(luò) IO):
-
普通方式(同步)
pip3 install requests
import requests def download_image(url): print("開始下載:", url) response = requests.get(url) print("下載完成") file_name = url.rsplit("_")[-1] with open(file_name, mode="wb") as file_object: file_object.write(response.content) url_list = [ "https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg", "https://www3.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg", "https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vk0sGY913.jpg", ] for item in url_list: download_image(item)
-
協(xié)程方式(異步)
pip3 install aiohttp
import aiohttp import asyncio async def fetch(session, url): print("發(fā)送請(qǐng)求:", url) async with session.get(url, verify_ssl=False) as response: content = await response.content.read() file_name = url.rsplit("_")[-1] with open(file_name, mode="wb") as file_object: file_object.write(content) async def main(): async with aiohttp.ClientSession() as session: url_list = [ "https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg", "https://www3.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg", "https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vk0sGY913.jpg", ] tasks = [asyncio.create_task(fetch(session, url)) for url in url_list] await asyncio.wait(tasks) if __name__ == "__main__": aynscio.run(main())
3. 異步編程
3.1 事件循環(huán)(Event Loop)
理解成一個(gè)死循環(huán),去檢測(cè)并執(zhí)行某些代碼。
task_list = [task1, task2, task3, ...]
while True:
executables, completes = [...], [...] # 在 task_list 中檢查所有任務(wù),將可執(zhí)行和已完成返回
for executable in executables:
execute executable
for complete in completes:
remove complete from task_list
if task_list == []: # 如果 task_list 中的任務(wù)都已完成,則終止循環(huán)
break
import asyncio
# 去生成或獲取一個(gè)事件循環(huán)
loop = asyncio.get_event_loop()
# 將任務(wù)放到任務(wù)列表
loop.run_until_complete(asyncio.wait(tasks))
3.2 快速上手
協(xié)程函數(shù)(coroutine function):定義函數(shù)時(shí) async def
(加上 async
關(guān)鍵字)。
協(xié)程對(duì)象(coroutine object):執(zhí)行協(xié)程函數(shù)得到的協(xié)程對(duì)象。
async def func():
pass
result = func()
注意到 result = func()
中 call 了 func()
,但并不會(huì)執(zhí)行 func()
內(nèi)部代碼,只是得到了 func()
的協(xié)程對(duì)象。
若要執(zhí)行協(xié)程函數(shù)內(nèi)部代碼,需要事件循環(huán)去處理協(xié)程函數(shù)得到的協(xié)程對(duì)象。
async def func():
print("come here.")
result = func()
loop = async.get_event_loop()
loop.run_until_complete(result)
到了 python 3.7 之后,還有更簡(jiǎn)便的寫法:
async def func():
print("come here.")
result = func()
# loop = async.get_event_loop()
# loop.run_until_complete(result)
async.run(result) # python 3.7
3.3 await 關(guān)鍵字
await 一般要加上 可等待的對(duì)象(協(xié)程對(duì)象、Future 對(duì)象、Task 對(duì)象),可以簡(jiǎn)單理解為 IO 等待(但實(shí)際上并沒有這么簡(jiǎn)單)。
示例 1:
import asyncio
async def func():
print("come here.")
response = await asyncio.sleep(2) # 沒有什么意義,假設(shè)這是一個(gè) IO 等待(例如網(wǎng)絡(luò)請(qǐng)求)
print("terminate", response)
asyncio.run(func())
在事件循環(huán)內(nèi)部,執(zhí)行協(xié)程對(duì)象 func()
時(shí)會(huì)先執(zhí)行 print("come here.")
,接下來會(huì)進(jìn)入 IO 等待,此時(shí)事件循環(huán)會(huì)跳出 func()
函數(shù)去執(zhí)行其它任務(wù),一旦 response
得到返回值(即結(jié)束 IO 等待),事件循環(huán)會(huì)在下一次循環(huán)中檢測(cè)到 IO 等待已經(jīng)結(jié)束,此刻才會(huì)繼續(xù)執(zhí)行 func()
后面的代碼(即 print("terminate", response)
)。
示例 2(協(xié)程對(duì)象之間可以嵌套):
async def others():
print("start")
await asyncio.sleep(2)
print("end")
return "返回值"
async def func():
print("執(zhí)行協(xié)程函數(shù)內(nèi)部代碼")
# 遇到 IO 操作掛起當(dāng)前協(xié)程(任務(wù)),等 IO 操作完成之后再繼續(xù)往下執(zhí)行。當(dāng)前協(xié)程掛起時(shí),事件循環(huán)可以去執(zhí)行其它協(xié)程(任務(wù))。
response = await others()
print("IO 請(qǐng)求結(jié)束,結(jié)果為:", response)
asyncio.run(func())
示例 3:
async def others():
print("start")
await asyncio.sleep(2)
print("end")
return "返回值"
async def func():
print("執(zhí)行協(xié)程函數(shù)內(nèi)部代碼")
# 遇到 IO 操作掛起當(dāng)前協(xié)程(任務(wù)),等 IO 操作完成之后再繼續(xù)往下執(zhí)行。當(dāng)前協(xié)程掛起時(shí),事件循環(huán)可以去執(zhí)行其它協(xié)程(任務(wù))。
response_1 = await others()
print("IO 請(qǐng)求結(jié)束,結(jié)果為:", response_1)
response_2 = await others()
print("IO 請(qǐng)求結(jié)束,結(jié)果為:", response_2)
asyncio.run(func())
await
關(guān)鍵字的含義就是,等待對(duì)象的值得到返回結(jié)果之后再繼續(xù)向下運(yùn)行。
3.4 Task 對(duì)象
Tasks are used to schedule coroutines concurrently.
When a coroutine is wrapped into a Task with functions like
asyncio.create_task()
the coroutine is automatically scheduled to run soon.
簡(jiǎn)單來說,它可以在事件循環(huán)中添加多個(gè)任務(wù)。
Tasks 用于并發(fā)調(diào)度協(xié)程,通過
asyncio.create_task(協(xié)程對(duì)象)
的方式創(chuàng)建 Task 對(duì)象,這樣可以讓協(xié)程加入事件循環(huán)中等待被調(diào)度執(zhí)行。除了使用asyncio.create_task()
函數(shù)以外的,還可以用低層級(jí)的loop.create_task()
或ensure_future()
函數(shù)。不建議手動(dòng)實(shí)例化 Task 對(duì)象。
示例 1(這種代碼寫得比較少):
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
async def main():
print("main 開始")
# 創(chuàng)建協(xié)程,將協(xié)程封裝到一個(gè) Task 對(duì)象中并立即添加到事件循環(huán)的任務(wù)列表中,等待事件循環(huán)去執(zhí)行(默認(rèn)是就緒狀態(tài))。
task1 = asyncio.create_task(func())
task2 = asyncio.create_task(fucn())
# 當(dāng)執(zhí)行某協(xié)程遇到 IO 操作時(shí),會(huì)自動(dòng)化切換執(zhí)行其它任務(wù)。
# 此處的 await 時(shí)等待相對(duì)應(yīng)的協(xié)程全都執(zhí)行完畢并獲取結(jié)果。
result_1 = await task1
result_2 = await task2
print(result_1, result_2)
asyncio.run(main())
示例 2(這種代碼應(yīng)用得比較多):
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
async def main():
print("main 開始")
# 創(chuàng)建協(xié)程任務(wù)列表
task_list = [
asyncio.create_task(func(), name="n1"), # 給 task 命名,會(huì)在返回集中顯示
asyncio.create_task(func(), name="n2")
]
# 不能直接把 task_list 以列表的形式加在 await 之后
# 注意 await 關(guān)鍵字只接受 coroutine object, task object, future object
# 此處 done 是一個(gè)集合,為 task_list 的返回值
# pending 在 timeout 不為 None 時(shí)有意義,timeout 規(guī)定了最長(zhǎng)等待時(shí)間,
# 如果超過 timeout,那么還未完成的任務(wù)將添加到 pending 中。
done, pending = await asyncio.wait(task_list, timeout=1)
print(done)
asyncio.run(main())
示例 3:
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
# 創(chuàng)建協(xié)程任務(wù)列表
task_list = [
asyncio.create_task(func(), name="n1"), # 給 task 命名,會(huì)在返回集中顯示
asyncio.create_task(func(), name="n2")
]
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)
注意到以上代碼會(huì)導(dǎo)致程序報(bào)錯(cuò)。原因是:asyncio.create_task()
會(huì)將協(xié)程對(duì)象立即添加到事件循環(huán)中,但是,事件循環(huán)是在 asyncio.run()
中被創(chuàng)造,因此此時(shí)并不存在事件循環(huán)。應(yīng)該如此修改:
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
# 創(chuàng)建協(xié)程對(duì)象列表
task_list = [
func(),
func()
]
# 此時(shí) asyncio 會(huì)在創(chuàng)建事件循環(huán)之后,在內(nèi)部將 task_list 中的協(xié)程對(duì)象添加到事件循環(huán)中
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)
3.5 Future 對(duì)象
Future
類是 Task
類的父類,即 Task
類繼承自 Future
類,Task
對(duì)象內(nèi)部 await 結(jié)果的處理基于 Future
對(duì)象而來。
A
Future
is a special low-level awaitable object that represents an eventual result of an asynchronous operation.
示例 1:
import asyncio
async def main():
# 獲取當(dāng)前事件循環(huán)
loop = asyncio.get_running_loop()
# 創(chuàng)建一個(gè)任務(wù)(Future 對(duì)象),這個(gè)任務(wù)什么都不干。
future = loop.create_future()
# 等待任務(wù)最終結(jié)果(Future 對(duì)象),沒有結(jié)果則會(huì)一直等下去。
await future
asyncio.run(main())
在上述代碼中,由于創(chuàng)建的 Future
對(duì)象什么也不干,因此 await future
將一直卡住,無法獲得返回結(jié)果,所以上述代碼是沒有實(shí)際意義的。但注意,如果某一個(gè)時(shí)刻突然給 future
賦值,那么 future
立刻可以獲得返回結(jié)果,并且跳出 await
。
示例 2(沒什么意義,用于理解 Future
對(duì)象的作用,即幫助我們等待結(jié)果):
async def set_after(future):
await asyncio.sleep(2)
future.set_result("666")
async def main():
# 獲取當(dāng)前事件循環(huán)
loop = asyncio.get_running_loop()
# 創(chuàng)建一個(gè)任務(wù)(Future 對(duì)象),沒有綁定任何行為,則這個(gè)任務(wù)永遠(yuǎn)不知道什么時(shí)候結(jié)束。
future = loop.create_future()
# 創(chuàng)建一個(gè)任務(wù)(Task 對(duì)象),綁定了 set_after 函數(shù),函數(shù)內(nèi)部在 2s 之后會(huì)給 future 賦值。
# 即手動(dòng)設(shè)置 future 任務(wù)的最終結(jié)果,那么 future 就可以結(jié)束了。
await loop.create_task(set_after(future))
# 等待 Future 對(duì)象獲取最終結(jié)果,否則一直等待下去。
data = await future
print(data)
asyncio.run(main())
3.6 concurrent 中的 Future 對(duì)象
首先注意到,concurrent
中的 Future
對(duì)象(concurrent.futures.Future
)和 asyncio
中的 Future
對(duì)象沒有關(guān)系。concurrent
中的 Future
對(duì)象是當(dāng)使用線程池、進(jìn)程池實(shí)現(xiàn)異步操作時(shí)使用到的對(duì)象。
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
def func(value):
time.sleep(1)
return value
# 創(chuàng)建線程池
pool = ThreadPoolExecutor(max_workers=5)
# 或創(chuàng)建進(jìn)程池
# pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
# 讓 pool 拿出一個(gè)線程去執(zhí)行 func 函數(shù)
future = pool.submit(func, i)
print(future)
實(shí)際中可能會(huì)存在兩種 Future
對(duì)象交叉使用。例如:crm
項(xiàng)目中 80% 都基于協(xié)程異步編程 + MySQL,但 MySQL 不支持異步,因此在 MySQL 中使用進(jìn)程池、線程池做異步編程。
示例 1:
import time
import asyncio
import concurrent.futures
def func1():
# 某個(gè)耗時(shí)操作
time.sleep(2)
return "complete"
async def main():
loop = asyncio.get_running_loop()
# 1. Run in the default loop's executor (default to ThreadPoolExecutor)
# 第一步:內(nèi)部會(huì)先調(diào)用 ThreadPoolExecutor 的 submit 方法去線程池中申請(qǐng)一個(gè)線程去
# 執(zhí)行 func1 函數(shù),并返回一個(gè) concurrent.futures.Future 對(duì)象
# 第二步:調(diào)用 asyncio.wrap_future 將 concurrent.future.Future 對(duì)象
# 包裝為 asyncio.Future 對(duì)象。
# 因?yàn)?concurrent.futures.Future 對(duì)象不支持 await 語(yǔ)法,所以需要包裝為
# asyncio.Future 對(duì)象才能使用。
future = loop.run_in_executor(None, func1) # 返回一個(gè) Future
# 上面這一步內(nèi)部會(huì)調(diào)用 asyncio.wrap_future 將返回的 concurrent.futures.Future
# 對(duì)象轉(zhuǎn)換為 asyncio.Future 對(duì)象
# 默認(rèn) None 意味著創(chuàng)建線程池,若想使用進(jìn)程池請(qǐng)參考以下注釋代碼
result = await future
print("default thread pool", result)
# 2. Run in a custom thread pool:
# with concurrent.futures.ThreadPoolExecutor() as pool:
# result = await loop.run_in_executor(pool, func1)
# print("custom thread pool", result)
# 3. Run in a custom process pool:
# with concurrent.futures.ProcessPoolExecutor() as pool:
# result = await loop.run_in_executor(pool, func1)
# print("custom process pool", result)
asyncio.run(main())
案例:asyncio
+ 不支持異步的模塊(爬蟲)
import asyncio
import requests
async def download_image(url):
# 發(fā)送網(wǎng)絡(luò)請(qǐng)求,下載圖片(遇到網(wǎng)絡(luò)下載圖片的 IO 請(qǐng)求,自動(dòng)化切換到其它任務(wù))
print("開始下載:", url)
loop = asyncio.get_event_loop()
# requests 模塊默認(rèn)不支持異步操作,所以就用線程池配合實(shí)現(xiàn)了
future = loop.run_in_executor(None, requests.get, url)
response = await future
print("下載完成")
# 圖片保存到本地文件
file_name = url.rsplit("_")[-1]
with open(file_name, mode="wb") as file_object:
file_object.write(response.content)
if __name__ == "__main__":
url_list = [ "https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg",
"https://www3.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg",
"https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vk0sGY913.jpg",
]
tasks = [download_image(url) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
耗費(fèi)資源更大,不得已而為之。
3.7 異步迭代器
-
什么是異步迭代器?
實(shí)現(xiàn)了
__aiter__()
和__anext__()
方法的對(duì)象。__anext__()
必須返回一個(gè)awaitable
對(duì)象。async for
會(huì)處理異步迭代器的__anext__()
方法所返回的可等待對(duì)象,直到其引發(fā)一個(gè)StopAsyncIteration
異常。由PEP 492
引入。 -
什么是異步可迭代對(duì)象?
可在
async for
語(yǔ)句中被使用的對(duì)象。必須通過它的__aiter__()
方法返回一個(gè)asynchronous iterator
。由PEP 492
引入。
示例:
import asyncio
class Reader(object):
"""
自定義異步迭代器(同時(shí)也是異步可迭代對(duì)象)
"""
def __init__(self):
self.count = 0
async def readline(self):
# await asyncio.sleep(1)
self.count += 1
if self.count == 100:
return None
return self.count
def __aiter__(self):
return self
async def __anext__(self):
val = await self.readline()
if val is None:
raise StopAsyncIteration
return val
# 以下代碼會(huì)報(bào)錯(cuò),因?yàn)?async for 必須寫在協(xié)程函數(shù)內(nèi)。
# obj = Reader()
# async for item in obj:
# print(item)
async def func():
obj = Reader()
async for item in obj:
print(item)
asyncio.run(func())
3.8 異步上下文管理器
-
什么是異步上下文管理器?
此種對(duì)象通過定義
__aenter__()
和__aexit__()
方法來對(duì)async with
語(yǔ)句中的環(huán)境進(jìn)行控制。由PEP 492
引入。
示例:
import asyncio
class AsyncContextManager(object):
def __init__(self):
self.conn = conn
async def do_something(self):
# 異步操作數(shù)據(jù)庫(kù)
return 666
async def __aenter__(self):
# 異步連接數(shù)據(jù)庫(kù)
self.conn = await asyncio.sleep(1) # 可以換成連接數(shù)據(jù)庫(kù)代碼
return self
async def __aexit__(self, exc_type, exc, tb):
# 異步關(guān)閉數(shù)據(jù)庫(kù)鏈接
await asyncio.sleep(1)
# 以下代碼會(huì)報(bào)錯(cuò),因?yàn)?async with 必須寫在協(xié)程函數(shù)內(nèi)。
# obj = AsyncContextManager()
# async with obj:
# result = await obj.do_something()
# print(result)
# 或者
# async with AsyncContextManager() as f:
# result = await f.do_something()
# print(result)
async def func():
async with AsyncContextManager() as f:
result = await f.do_something()
print(result)
asyncio.run(func())
4. uvloop
uvloop
是 asyncio
事件循環(huán)的替代方案,可以提高事件循環(huán)效率,性能接近于 go
語(yǔ)言。文章來源:http://www.zghlxwxcb.cn/news/detail-710085.html
pip3 install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Your asyncio code here.
# 內(nèi)部的事件循環(huán)自動(dòng)會(huì)變?yōu)?uvloop
asyncio.run()
注意:asgi
是支持異步的 wsgi
網(wǎng)關(guān)接口(e.g. uvicorn
,內(nèi)部使用的就是 uvloop
)。文章來源地址http://www.zghlxwxcb.cn/news/detail-710085.html
到了這里,關(guān)于Asyncio 協(xié)程異步筆記的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!