目錄
一、協(xié)程
1.1、greenlet實現(xiàn)協(xié)程
1.2、yield關(guān)鍵字
1.3、asyncio
1.4、async & await關(guān)鍵字
二、協(xié)程意義
三、異步編程
3.1、事件循環(huán)
3.2、快速上手
3.3、await
3.4、Task對象
3.5、asyncio.Future對象
3.5、concurrent.futures.Future對象
3.7、異步迭代器
3.8、異步上下文管理器
四、uvloop
五、實戰(zhàn)案例
5.1、異步redis
5.2、異步MySQL
5.3、FastAPI框架
六、總結(jié)
一、協(xié)程
協(xié)程不是計算機(jī)提供,程序員人為創(chuàng)造。
協(xié)程(Coroutine),也可以被稱為微線程,是一種用戶態(tài)內(nèi)的上下文切換技術(shù)。簡而言之,其實就是通過一個線程實現(xiàn)代碼塊相互切換執(zhí)行。例如:
def func1():
print(1)
...
print(2)
def func2():
print(3)
...
print(4)
func1()
func2()
實現(xiàn)協(xié)程有這么幾種方法:
-
greenlet,早期模塊。
-
yield關(guān)鍵字。
-
asyncio裝飾器(py3.4)
-
async、await關(guān)鍵字(py3.5)【推薦】
1.1、greenlet實現(xiàn)協(xié)程
pip3 install greenlet
from greenlet import greenlet
def func1():
print(1) # 第2步:輸出 1
gr2.switch() # 第3步:切換到 func2 函數(shù)
print(2) # 第6步:輸出 2
gr2.switch() # 第7步:切換到 func2 函數(shù),從上一次執(zhí)行的位置繼續(xù)向后執(zhí)行
def func2():
print(3) # 第4步:輸出 3
gr1.switch() # 第5步:切換到 func1 函數(shù),從上一次執(zhí)行的位置繼續(xù)向后執(zhí)行
print(4) # 第8步:輸出 4
gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() # 第1步:去執(zhí)行 func1 函數(shù)
1.2、yield關(guān)鍵字
def func1():
yield 1
yield from func2()
yield 2
def func2():
yield 3
yield 4
f1 = func1()
for item in f1:
print(item)
1.3、asyncio
在python3.4及之后的版本。
import asyncio
@asyncio.coroutine
def func1():
print(1)
# 網(wǎng)絡(luò)IO請求:下載一張圖片
yield from asyncio.sleep(2) # 遇到IO耗時操作,自動化切換到tasks中的其他任務(wù)
print(2)
@asyncio.coroutine
def func2():
print(3)
# 網(wǎng)絡(luò)IO請求:下載一張圖片
yield from asyncio.sleep(2) # 遇到IO耗時操作,自動化切換到tasks中的其他任務(wù)
print(4)
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
我們將兩個函數(shù)放到tasks中,啟動時會隨機(jī)選一個函數(shù)執(zhí)行,遇到IO阻塞自動切換。
1.4、async & await關(guān)鍵字
在python3.5及之后的版本。
import asyncio
async def func1():
print(1)
# 網(wǎng)絡(luò)IO請求:下載一張圖片
await asyncio.sleep(2) # 遇到IO耗時操作,自動化切換到tasks中的其他任務(wù)
print(2)
async def func2():
print(3)
# 網(wǎng)絡(luò)IO請求:下載一張圖片
await asyncio.sleep(2) # 遇到IO耗時操作,自動化切換到tasks中的其他任務(wù)
print(4)
tasks = [
asyncio.ensure_future( func1() ),
asyncio.ensure_future( func2() )
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
二、協(xié)程意義
在一個線程中如果遇到IO等待時間,線程不會傻傻等,利用空閑的時候再去干點其他事。
案例:去下載三張圖片(網(wǎng)絡(luò)IO)。
(1)普通方式(同步)
pip install requests
import requests
def download_image(url):
print("開始下載:",url)
# 發(fā)送網(wǎng)絡(luò)請求,下載圖片
response = requests.get(url)
print("下載完成")
# 圖片保存到本地文件
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(response.content)
if __name__ == '__main__':
url_list = [
'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
]
for item in url_list:
download_image(item)
(2)協(xié)程方式(異步)
pip install aiohttp
import aiohttp
import asyncio
INSTALL_AIOHTTP = """pip3 install aiohttp"""
async def fetch(session, url):
print("發(fā)送請求:", url)
async with session.get(url, verify_ssl=False) as response:
content = await response.content.read()
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(content)
print('下載完成', url)
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
]
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(main())
三、異步編程
3.1、事件循環(huán)
理解成為一個死循環(huán) ,去檢測并執(zhí)行某些代碼。
# 偽代碼
任務(wù)列表 = [ 任務(wù)1, 任務(wù)2, 任務(wù)3,... ]
while True:
可執(zhí)行的任務(wù)列表,已完成的任務(wù)列表 = 去任務(wù)列表中檢查所有的任務(wù),將'可執(zhí)行'和'已完成'的任務(wù)返回
for 就緒任務(wù) in 可執(zhí)行的任務(wù)列表:
執(zhí)行已就緒的任務(wù)
for 已完成的任務(wù) in 已完成的任務(wù)列表:
在任務(wù)列表中移除 已完成的任務(wù)
如果 任務(wù)列表 中的任務(wù)都已完成,則終止循環(huán)
import asyncio
# 去生成或獲取一個事件循環(huán)
loop = asyncio.get_event_loop()
# 將任務(wù)放到`任務(wù)列表`
loop.run_until_complete(任務(wù))
3.2、快速上手
協(xié)程函數(shù),定義函數(shù)時候 async def 函數(shù)名
。
協(xié)程對象,執(zhí)行 協(xié)程函數(shù)() 得到的協(xié)程對象。
async def func():
pass
result = func() # 協(xié)程對象
注意:執(zhí)行協(xié)程函數(shù)創(chuàng)建協(xié)程對象,函數(shù)內(nèi)部代碼不會執(zhí)行。
如果想要運行協(xié)程函數(shù)內(nèi)部代碼,必須要將協(xié)程對象交給事件循環(huán)來處理。
import asyncio
async def func():
print("快來搞我吧!")
result = func()
# loop = asyncio.get_event_loop()
# loop.run_until_complete( result )
asyncio.run(result) # python3.7
3.3、await
await + 可等待的對象(協(xié)程對象、Future、Task對象 -> IO等待)
示例1:
import asyncio
async def func():
print("來玩呀")
response = await asyncio.sleep(2)
print("結(jié)束", response)
asyncio.run(func())
示例2:
import asyncio
async def others():
print("start")
await asyncio.sleep(2)
print('end')
return '返回值'
async def func():
print("執(zhí)行協(xié)程函數(shù)內(nèi)部代碼")
# 遇到IO操作掛起當(dāng)前協(xié)程(任務(wù)),等IO操作完成之后再繼續(xù)往下執(zhí)行。當(dāng)前協(xié)程掛起時,事件循環(huán)可以去執(zhí)行其他協(xié)程(任務(wù))。
response = await others()
print("IO請求結(jié)束,結(jié)果為:", response)
asyncio.run(func())
示例3:
import asyncio
async def others():
print("start")
await asyncio.sleep(2)
print('end')
return '返回值'
async def func():
print("執(zhí)行協(xié)程函數(shù)內(nèi)部代碼")
# 遇到IO操作掛起當(dāng)前協(xié)程(任務(wù)),等IO操作完成之后再繼續(xù)往下執(zhí)行。當(dāng)前協(xié)程掛起時,事件循環(huán)可以去執(zhí)行其他協(xié)程(任務(wù))。
response1 = await others()
print("IO請求結(jié)束,結(jié)果為:", response1)
response2 = await others()
print("IO請求結(jié)束,結(jié)果為:", response2)
asyncio.run(func())
await就是等待對象的值得到結(jié)果之后再繼續(xù)向下走,其實就是同步操作。
3.4、Task對象
Tasks用于并發(fā)調(diào)度協(xié)程,通過asyncio.create_task(協(xié)程對象)
的方式創(chuàng)建Task對象,這樣可以讓協(xié)程加入事件循環(huán)中等待被調(diào)度執(zhí)行。除了使用 asyncio.create_task()
函數(shù)以外,還可以用低層級的 loop.create_task()
或 ensure_future()
函數(shù)。不建議手動實例化 Task 對象。
注意:asyncio.create_task()
函數(shù)在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低層級的 asyncio.ensure_future()
函數(shù)。
示例1:
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
async def main():
print("main開始")
# 創(chuàng)建Task對象,將當(dāng)前執(zhí)行func函數(shù)任務(wù)添加到事件循環(huán)。
task1 = asyncio.create_task(func())
# 創(chuàng)建Task對象,將當(dāng)前執(zhí)行func函數(shù)任務(wù)添加到事件循環(huán)。
task2 = asyncio.create_task(func())
print("main結(jié)束")
# 當(dāng)執(zhí)行某協(xié)程遇到IO操作時,會自動化切換執(zhí)行其他任務(wù)。
# 此處的await是等待相對應(yīng)的協(xié)程全都執(zhí)行完畢并獲取結(jié)果
ret1 = await task1
ret2 = await task2
print(ret1, ret2)
asyncio.run(main())
示例2:
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
async def main():
print("main開始")
task_list = [
asyncio.create_task(func(), name='n1'),
asyncio.create_task(func(), name='n2')
]
print("main結(jié)束")
done, pending = await asyncio.wait(task_list, timeout=None)
print(done)
asyncio.run(main())
示例3:
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
task_list = [
func(),
func(),
]
done,pending = asyncio.run(asyncio.wait(task_list))
print(done)
3.5、asyncio.Future對象
Task繼承Future,Task對象內(nèi)部await結(jié)果的處理基于Future對象來的。
示例1:
import asyncio
async def main():
# 獲取當(dāng)前事件循環(huán)
loop = asyncio.get_running_loop()
# 創(chuàng)建一個任務(wù)(Future對象),這個任務(wù)什么都不干。
fut = loop.create_future()
# 等待任務(wù)最終結(jié)果(Future對象),沒有結(jié)果則會一直等下去。
await fut
asyncio.run(main())
示例2:
import asyncio
async def set_after(fut):
await asyncio.sleep(2)
fut.set_result("666")
async def main():
# 獲取當(dāng)前事件循環(huán)
loop = asyncio.get_running_loop()
# 創(chuàng)建一個任務(wù)(Future對象),沒綁定任何行為,則這個任務(wù)永遠(yuǎn)不知道什么時候結(jié)束。
fut = loop.create_future()
# 創(chuàng)建一個任務(wù)(Task對象),綁定了set_after函數(shù),函數(shù)內(nèi)部在2s之后,會給fut賦值。
# 即手動設(shè)置future任務(wù)的最終結(jié)果,那么fut就可以結(jié)束了。
await loop.create_task( set_after(fut) )
# 等待 Future對象獲取 最終結(jié)果,否則一直等下去
data = await fut
print(data)
asyncio.run(main())
3.5、concurrent.futures.Future對象
使用線程池、進(jìn)程池實現(xiàn)異步操作時用到的對象。
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
def func(value):
time.sleep(1)
print(value)
return 123
# 創(chuàng)建線程池
pool = ThreadPoolExecutor(max_workers=5)
# 創(chuàng)建進(jìn)程池
# pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
fut = pool.submit(func, i)
print(fut)
案例:asyncio + 不支持異步的模塊
import asyncio
import requests
async def download_image(url):
# 發(fā)送網(wǎng)絡(luò)請求,下載圖片(遇到網(wǎng)絡(luò)下載圖片的IO請求,自動化切換到其他任務(wù))
print("開始下載:", url)
loop = asyncio.get_event_loop()
# requests模塊默認(rèn)不支持異步操作,所以就使用線程池來配合實現(xiàn)了。
future = loop.run_in_executor(None, requests.get, url)
response = await future
print('下載完成')
# 圖片保存到本地文件
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(response.content)
if __name__ == '__main__':
url_list = [
'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
]
tasks = [download_image(url) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
3.7、異步迭代器
什么是異步迭代器
實現(xiàn)了 __aiter__() 和 __anext__() 方法的對象。__anext__
必須返回一個 awaitable 對象。async for 會處理異步迭代器的 __anext__() 方法所返回的可等待對象,直到其引發(fā)一個 StopAsyncIteration 異常。由 PEP 492 引入。
什么是異步可迭代對象?
可在 async for 語句中被使用的對象。必須通過它的 __aiter__() 方法返回一個 asynchronous iterator。由 PEP 492 引入。
import asyncio
class Reader(object):
""" 自定義異步迭代器(同時也是異步可迭代對象) """
def __init__(self):
self.count = 0
async def readline(self):
# await asyncio.sleep(1)
self.count += 1
if self.count == 100:
return None
return self.count
def __aiter__(self):
return self
async def __anext__(self):
val = await self.readline()
if val == None:
raise StopAsyncIteration
return val
async def func():
obj = Reader()
async for item in obj:
print(item)
asyncio.run( func() )
3.8、異步上下文管理器
此種對象通過定義 __aenter__() 和 __aexit__() 方法來對 async with 語句中的環(huán)境進(jìn)行控制。由 PEP 492 引入。
import asyncio
class AsyncContextManager:
def __init__(self):
self.conn = conn
async def do_something(self):
# 異步操作數(shù)據(jù)庫
return 666
async def __aenter__(self):
# 異步鏈接數(shù)據(jù)庫
self.conn = await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc, tb):
# 異步關(guān)閉數(shù)據(jù)庫鏈接
await asyncio.sleep(1)
async def func():
async with AsyncContextManager() as f:
result = await f.do_something()
print(result)
asyncio.run( func() )
四、uvloop
是asyncio的事件循環(huán)的替代方案。事件循環(huán) > 默認(rèn)asyncio的事件循環(huán)。
pip install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 編寫asyncio的代碼,與之前寫的代碼一致。
# 內(nèi)部的事件循環(huán)自動化會變?yōu)閡vloop
asyncio.run(...)
注意:一個asgi -> uvicorn
內(nèi)部使用的就是uvloop
五、實戰(zhàn)案例
5.1、異步redis
在使用python代碼操作redis時,鏈接/操作/斷開都是網(wǎng)絡(luò)IO。
案例1:
pip install aioredis
import asyncio
import aioredis
async def execute(address, password):
print("開始執(zhí)行", address)
# 網(wǎng)絡(luò)IO操作:創(chuàng)建redis連接
redis = await aioredis.create_redis(address, password=password)
# 網(wǎng)絡(luò)IO操作:在redis中設(shè)置哈希值car,內(nèi)部在設(shè)三個鍵值對,即: redis = { car:{key1:1,key2:2,key3:3}}
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 網(wǎng)絡(luò)IO操作:去redis中獲取值
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close()
# 網(wǎng)絡(luò)IO操作:關(guān)閉redis連接
await redis.wait_closed()
print("結(jié)束", address)
asyncio.run(execute('redis://127.0.0.1:6379', "123456"))
示例2:
import asyncio
import aioredis
async def execute(address, password):
print("開始執(zhí)行", address)
# 網(wǎng)絡(luò)IO操作:先去連接 47.93.4.197:6379,遇到IO則自動切換任務(wù),去連接47.93.4.198:6379
redis = await aioredis.create_redis_pool(address, password=password)
# 網(wǎng)絡(luò)IO操作:遇到IO會自動切換任務(wù)
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 網(wǎng)絡(luò)IO操作:遇到IO會自動切換任務(wù)
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close()
# 網(wǎng)絡(luò)IO操作:遇到IO會自動切換任務(wù)
await redis.wait_closed()
print("結(jié)束", address)
task_list = [
execute('redis://47.93.4.197:6379', "root!2345"),
execute('redis://47.93.4.198:6379', "root!2345")
]
asyncio.run(asyncio.wait(task_list))
5.2、異步MySQL
pip install aiomysql
示例1:
import asyncio
import aiomysql
async def execute():
# 網(wǎng)絡(luò)IO操作:連接MySQL
conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123', db='mysql', )
# 網(wǎng)絡(luò)IO操作:創(chuàng)建CURSOR
cur = await conn.cursor()
# 網(wǎng)絡(luò)IO操作:執(zhí)行SQL
await cur.execute("SELECT Host,User FROM user")
# 網(wǎng)絡(luò)IO操作:獲取SQL結(jié)果
result = await cur.fetchall()
print(result)
# 網(wǎng)絡(luò)IO操作:關(guān)閉鏈接
await cur.close()
conn.close()
asyncio.run(execute())
示例2:
import asyncio
import aiomysql
async def execute(host, password):
print("開始", host)
# 網(wǎng)絡(luò)IO操作:先去連接 47.93.40.197,遇到IO則自動切換任務(wù),去連接47.93.40.198:6379
conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
# 網(wǎng)絡(luò)IO操作:遇到IO會自動切換任務(wù)
cur = await conn.cursor()
# 網(wǎng)絡(luò)IO操作:遇到IO會自動切換任務(wù)
await cur.execute("SELECT Host,User FROM user")
# 網(wǎng)絡(luò)IO操作:遇到IO會自動切換任務(wù)
result = await cur.fetchall()
print(result)
# 網(wǎng)絡(luò)IO操作:遇到IO會自動切換任務(wù)
await cur.close()
conn.close()
print("結(jié)束", host)
task_list = [
execute('47.93.41.197', "root!2345"),
execute('47.93.40.197', "root!2345")
]
asyncio.run(asyncio.wait(task_list))
5.3、FastAPI框架
安裝
pip install fastapi
pip install uvicorn (asgi內(nèi)部基于uvloop)
示例: luffy.py文章來源:http://www.zghlxwxcb.cn/news/detail-829601.html
import asyncio
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI
app = FastAPI()
# 創(chuàng)建一個redis連接池
REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379', password="root123", minsize=1, maxsize=10)
@app.get("/")
def index():
""" 普通操作接口 """
# 如果有兩個用戶并發(fā)訪問此接口,用戶A先執(zhí)行結(jié)束并返回才能用戶B執(zhí)行,同步操作
return {"message": "Hello World"}
@app.get("/red")
async def red():
""" 異步操作接口 """
print("請求來了")
await asyncio.sleep(3)
# 連接池獲取一個連接
conn = await REDIS_POOL.acquire()
redis = Redis(conn)
# 設(shè)置值
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 讀取值
result = await redis.hgetall('car', encoding='utf-8')
print(result)
# 連接歸還連接池
REDIS_POOL.release(conn)
return result
if __name__ == '__main__':
uvicorn.run("luffy:app", host="127.0.0.1", port=5000, log_level="info")
六、總結(jié)
最大的意義:通過一個線程利用其IO等待時間去做一些其他事情。文章來源地址http://www.zghlxwxcb.cn/news/detail-829601.html
到了這里,關(guān)于Python asyncio高性能異步編程 詳解的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!