使用典型的異步 Python 庫處理數(shù)百個 HTTP 請求、磁盤寫入和其他 I/O 密集型任務(wù)。
當在單線程同步語言的范圍內(nèi)構(gòu)建應(yīng)用程序時,局限性很快就會變得非常明顯。我首先想到的是writes:I/O 密集型任務(wù)的定義。將數(shù)據(jù)寫入文件(或數(shù)據(jù)庫)時,每個“寫入”操作都會故意占用一個線程,直到寫入完成。這對于確保大多數(shù)系統(tǒng)中的數(shù)據(jù)完整性非常有意義。例如,如果兩個操作同時嘗試更新數(shù)據(jù)庫記錄,哪一個是正確的?或者,如果腳本需要 HTTP 請求成功才能繼續(xù),那么我們?nèi)绾卫^續(xù)操作,直到我們知道請求成功?
HTTP 請求是最常見的線程阻塞操作之一。當我們編寫期望來自外部第三方的數(shù)據(jù)的腳本時,我們引入了無數(shù)的不確定性,這些不確定性只能由請求本身來回答,例如響應(yīng)時間延遲、我們期望接收的數(shù)據(jù)的性質(zhì),或者請求是否會成功。即使使用我們有信心的 API,任何操作在完成之前也不一定會成功。因此,我們被“封鎖”了。
隨著應(yīng)用程序的復雜性增加以支持更多的同時用戶交互,軟件正在遠離線性執(zhí)行的范例。因此,雖然我們可能不確定特定請求是否成功或數(shù)據(jù)庫寫入是否完成,但只要我們有辦法優(yōu)雅地處理和緩解這些問題,這是可以接受的。
一個值得異步執(zhí)行的問題
您認為 Python 腳本執(zhí)行數(shù)百個 HTTP 請求、解析每個響應(yīng)并將輸出寫入單個文件需要多長時間?如果要在簡單的 for 循環(huán)中使用請求,則需要等待相當長的時間讓 Python 執(zhí)行每個請求、打開文件、寫入文件、關(guān)閉文件,然后繼續(xù)執(zhí)行下一個請求。
我們把asyncio提高腳本效率的能力放到實際測試中。我們將為數(shù)百個 URL 的每個任務(wù)執(zhí)行兩個 I/O 阻塞操作:執(zhí)行和解析 HTTP 請求并將所需結(jié)果寫入單個文件。我們實驗的輸入將是大量 URL,預期輸出是從這些 URL 解析的元數(shù)據(jù)。讓我們看看對數(shù)百個 URL 執(zhí)行此操作需要多長時間。
該網(wǎng)站大約有 2000 篇自己發(fā)布的帖子,這使其成為這個小實驗的絕佳實驗對象。我創(chuàng)建了一個 CSV,其中包含這些帖子的 URL,這將是我們的輸入。下面先睹為快:
輸入樣本
輸入 CSV
樣本輸出
對于輸入 CSV 中找到的每個 URL,我們的腳本將獲取 URL、解析頁面并將一些選擇數(shù)據(jù)寫入單個 CSV。結(jié)果將類似于以下示例:
我們的腳本將輸出的示例
工作工具
我們需要三個核心 Python 庫來實現(xiàn)這一目標:
Asyncio:Python 的基礎(chǔ)庫,用于運行異步 IO 綁定任務(wù)。該庫在某種程度上已內(nèi)置到 Python 核心語言中,引入了async/await關(guān)鍵字,分別表示函數(shù)何時異步運行以及何時等待此類函數(shù)。
Aiohttp:在客戶端使用時,類似于Python的requests庫,用于發(fā)出異步請求。或者, aiohttp可以反向使用:作為應(yīng)用程序 Web 服務(wù)器 來處理傳入請求和提供響應(yīng),但這是另一個故事了。
Aiofiles:使寫入磁盤(例如創(chuàng)建字節(jié)并將字節(jié)寫入文件)成為一項非阻塞任務(wù),這樣即使多個任務(wù)綁定到同一個文件,多個寫入也可以在同一線程上發(fā)生而不會相互阻塞。
#安裝必要的庫 $ pip install asyncio aiohttp aiofiles
獎勵:優(yōu)化速度的依賴項
只需安裝一些補充庫,aiohttp就可以更快地執(zhí)行請求。這些庫是cchardet(字符編碼檢測)、aiodns(異步 DNS 解析)和brotlipy(無損壓縮)。我強烈建議使用下面方便提供的快捷方式安裝它們(從我這里獲取,我是互聯(lián)網(wǎng)上的陌生人):
#安裝補充依賴項以加快請求速度 $ pip install aiohttp[speedups]
準備異步腳本/應(yīng)用程序
我們將像任何其他 Python 腳本一樣構(gòu)造該腳本。我們的主模塊aiohttp_aiofiles_tutorial將處理我們的所有邏輯。config.py和main.py都位于主模塊之外,并分別為我們的腳本提供一些基本配置和入口點:
#我們的異步獲取器/編寫器的項目結(jié)構(gòu) /aiohttp-aiofiles-tutorial ├── /aiohttp_aiofiles_tutorial │ ├── __init__.py │ ├── fetcher.py │ ├── loops.py │ ├── tasks.py │ ├── parser.py │ └── /data # Source data │ ├── __init__.py │ ├── parser.py │ ├── tests │ └── urls.csv ├── /export # Destination for exported data ├── config.py ├── logger.py ├── main.py ├── pyproject.toml ├── Makefile ├── README.md └── requirements.txt
/export只是一個空目錄,我們將在其中寫入輸出文件。
/data子模塊包含上面提到的輸入 CSV,以及解析它的一些基本邏輯。沒什么值得打電話回家的,但如果你好奇的話,可以在Github 存儲庫上找到源代碼。最后附上本文代碼。
開始事情
我們卷起袖子,從強制性腳本“入口點” main.py開始。這將啟動/aiohttp_aiofiles_tutorial中的核心函數(shù),稱為init_script():
#main.py """腳本入口點。""" import asyncio from aiohttp_aiofiles_tutorial import init_script if __name__ == "__main__": asyncio.run(init_script())
init_script()這看起來像是我們正在通過運行單個函數(shù)/協(xié)程asyncio.run(),乍一看這似乎違反直覺。您可能會問, asyncio 的目的不就是同時運行多個協(xié)程嗎?
它的確是!init_script()是一個調(diào)用其他協(xié)程的協(xié)程。其中一些協(xié)程從其他協(xié)程創(chuàng)建任務(wù),其他協(xié)程執(zhí)行任務(wù),等等。asyncio.run()創(chuàng)建一個事件循環(huán),直到目標協(xié)程完成為止(包括父協(xié)程調(diào)用的所有協(xié)程),該循環(huán)不會停止運行。因此,如果我們保持干凈,asyncio.run()就是一次性調(diào)用來初始化腳本。
初始化我們的腳本
這就是樂趣的開始。我們已經(jīng)確定腳本的目的是輸出單個 CSV 文件,這就是我們開始的地方:通過在整個腳本運行的上下文中創(chuàng)建并打開一個輸出文件:
#aiohttp_aiofiles_tutorial/__init__.py """同時發(fā)出數(shù)百個請求并將響應(yīng)保存到磁盤。""" import aiofiles from config import EXPORT_FILEPATH async def init_script(): """準備輸出文件并啟動任務(wù)創(chuàng)建/執(zhí)行。""" async with aiofiles.open(EXPORT_FILEPATH, mode="w+") as outfile: await outfile.write( "title,description,primary_tag,url,published_at\n" ) # (我們其余的腳本邏輯將在這里執(zhí)行). # ...
我們的腳本首先打開一個帶有aiofiles. 只要我們的腳本通過 在打開的異步文件的上下文中運行async with aiofiles.open() as outfile:,我們就可以不斷寫入該文件,而不必擔心打開和關(guān)閉文件。
將此與 Python 中處理文件 I/O 的同步with open() as outfile:(默認)實現(xiàn)進行比較。通過使用,我們幾乎aiofiles可以同時從多個源將數(shù)據(jù)寫入同一文件。
EXPORT_FILEPATH碰巧以 CSV ( /export/hackers_pages_metadata.csv ) 為目標。每個 CSV 都需要一行標題;因此,我們await outfile.write()在打開 CSV 后立即寫入標題:
#將一行寫入由列標題組成的 CSV ... await outfile.write( "title,description,primary_tag,url,published_at\n" )
前進
下面是__init__.py的完整版本,最終將使我們的腳本付諸實踐。最值得注意的補充是協(xié)程的引入execute_fetcher_tasks();我們將一次剖析這一塊:
#aiohttp_aiofiles_tutorial/__init__.py """同時發(fā)出數(shù)百個請求并將響應(yīng)保存到磁盤。""" import asyncio import time import aiofiles from aiofiles.threadpool.text import AsyncTextIOWrapper as AsyncIOFile from aiohttp import ClientSession from config import EXPORT_FILEPATH, HTTP_HEADERS from .data import urls_to_fetch # URLs parsed from a CSV from .tasks import create_tasks # Creates one task per URL async def init_script(): """準備輸出文件并啟動任務(wù)創(chuàng)建/執(zhí)行。""" async with aiofiles.open(EXPORT_FILEPATH, mode="w+") as outfile: await outfile.write( "title,description,primary_tag,url,published_at\n" ) await execute_fetcher_tasks(outfile) await outfile.close() async def execute_fetcher_tasks(outfile: AsyncIOFile): """ 打開異步 HTTP 會話并執(zhí)行創(chuàng)建的任務(wù)。 :param AsyncIOFile outfile:要寫入的本地文件的路徑。 """ async with ClientSession(headers=HTTP_HEADERS) as session: task_list = await create_tasks(session, urls_to_fetch, outfile) await asyncio.gather(*task_list)
execute_fetcher_tasks()分解主要是為了組織我們的代碼。該協(xié)程接受outfile一個參數(shù),該參數(shù)將作為我們最終解析的數(shù)據(jù)的目的地。逐行來看:
async with ClientSession(headers=HTTP_HEADERS) as session:與 Python請求庫不同,aiohttp使我們能夠打開一個客戶端會話,該會話創(chuàng)建一個連接池,該連接池一次 最多允許 100 個活動連接。因為我們將發(fā)出 200 個以下的請求,所以獲取所有這些 URL 所需的時間將與 Python 在正常情況下獲取兩個 URL 所需的時間相當。
create_tasks():我們要定義的這個函數(shù)接受三個參數(shù)。第一個是ClientSession我們之前剛剛打開一行的異步。接下來,我們有urls_to_fetch變量(之前在腳本中導入)。這是一個簡單的 Python 字符串列表,其中每個字符串都是從我們之前的“輸入”CSV 解析而來的 URL。該邏輯通過一個簡單的函數(shù)在其他地方處理(對于本教程來說并不重要)。 最后,outfile被傳遞,因為我們稍后將寫入該文件。使用這些參數(shù),create_tasks()將為 174 個 URL 中的每一個創(chuàng)建一個任務(wù)。其中每個都會將給定 URL 的內(nèi)容下載到目標目錄。該函數(shù)返回任務(wù),但在我們發(fā)出指令之前不會執(zhí)行它們,這是通過...
asyncio.gather(*task_list):Asyncio 的gather()方法在當前運行的事件循環(huán)內(nèi)執(zhí)行一組任務(wù)。一旦開始,異步 I/O 的速度優(yōu)勢將立即顯現(xiàn)出來。
創(chuàng)建異步任務(wù)
如果您還記得的話,PythonTask包裝了我們將來要執(zhí)行的函數(shù)(協(xié)程)。此外,每個任務(wù)都可以暫時擱置以執(zhí)行其他任務(wù)。在執(zhí)行創(chuàng)建任務(wù)之前,必須傳遞預定義的協(xié)程以及適當?shù)膮?shù)。
我分開create_tasks()返回一個 Python 任務(wù)列表,其中每個“任務(wù)”將執(zhí)行獲取我們的 URL 之一:
#aiohttp_aiofiles_tutorial/tasks.py """準備要執(zhí)行的任務(wù)。""" import asyncio from asyncio import Task from typing import List from aiofiles.threadpool.text import AsyncTextIOWrapper as AsyncIOFile from aiohttp import ClientSession from .fetcher import fetch_url_and_save_data async def create_tasks( session: ClientSession, urls: List[str], outfile: AsyncIOFile ) -> List[Task]: """ 創(chuàng)建 asyncio 任務(wù)來解析 HTTP 請求響應(yīng)。 :param ClientSession session: 異步 HTTP 請求會話。 :param List[str] urls:要獲取的資源 URL。 :param AsyncIOFile outfile:要寫入的本地文件的路徑。 :returns: List[Task] """ task_list = [] for i, url in enumerate(urls): task = asyncio.create_task( fetch_url_and_save_data( session, url, outfile, len(urls), i ) ) task_list.append(task) return task_list
關(guān)于 asyncio 任務(wù)的一些值得注意的事情:
我們預先定義“工作要做” 。a 的創(chuàng)建Task并不執(zhí)行代碼。我們的腳本將使用不同的參數(shù)同時運行同一函數(shù) 174 次。我們希望預先定義這些任務(wù)是有道理的。
定義任務(wù)既快速又簡單。瞬間,CSV 中的每個 URL 都會創(chuàng)建一個相應(yīng)的任務(wù)并將其添加到task_list.
任務(wù)準備就緒后,只剩下一件事情要做了,那就是把它們?nèi)繂硬㈤_始聚會。這就是asyncio.gather(*task_list)__ init __ .py中的行發(fā)揮作用的地方。
Asyncio 的 Task 對象本身就是一個類,具有其屬性和方法,本質(zhì)上提供了一個包裝器,其中包含檢查任務(wù)狀態(tài)、取消任務(wù)等的方法。
執(zhí)行我們的任務(wù)
回到 之前create_tasks(),我們創(chuàng)建了每個任務(wù),每個任務(wù)單獨執(zhí)行一個稱為每個任務(wù)的方法fetch_url_and_save_data()。這個函數(shù)做了三件事:
通過aiohttp的會話上下文(由 處理async with session.get(url) as resp:)向給定任務(wù)的 URL 發(fā)出異步請求
將響應(yīng)正文作為字符串讀取。
html通過傳遞給我們的最后一個函數(shù),將響應(yīng)正文的內(nèi)容寫入文件parse_html_page_metadata():
#aiohttp_aiofiles_tutorial/fetcher.py """獲取 URL、提取其內(nèi)容并將解析后的數(shù)據(jù)寫入文件。""" from aiofiles.threadpool.text import AsyncTextIOWrapper as AsyncIOFile from aiohttp import ClientError, ClientSession, InvalidURL from logger import LOGGER from .parser import parse_html_page_metadata async def fetch_url_and_save_data( session: ClientSession, url: str, outfile: AsyncIOFile, total_count: int, i: int, ): """ 在解析之前從 URL 獲取原始 HTML。 :param ClientSession session: 異步 HTTP 請求會話。 :param str url: 要獲取的目標 URL。 :param AsyncIOFile outfile:要寫入的本地文件的路徑。 :param int Total_count:要獲取的 URL 總數(shù)。 :param int i: URL 總數(shù)中當前迭代的 URL。 """ try: async with session.get(url) as resp: if resp.status != 200: pass html = await resp.text() page_metadata = await parse_html_page_metadata(html, url) await outfile.write(f"{page_metadata}\n") LOGGER.info( f"Fetched URL {i} of {total_count}: {page_metadata}" ) except InvalidURL as e: LOGGER.error(f"Unable to fetch invalid URL `{url}`: {e}") except ClientError as e: LOGGER.error(f"ClientError while fetching URL `{url}`: {e}") except Exception as e: LOGGER.error( f"Unexpected error while fetching URL `{url}`: {e}" )
當通過aiohttp獲取URL 時ClientSession,調(diào)用.text()response() 上的方法將以字符串a(chǎn)wait resp.text()形式返回請求的響應(yīng)。不要與 混淆,后者返回一個字節(jié)對象(對于拉取媒體文件或字符串以外的任何內(nèi)容很有用)。.body()
如果您繼續(xù)跟蹤,我們現(xiàn)在已經(jīng)深入了三個“上下文”:
我們通過打開一個aiofiles.open()上下文來開始我們的腳本,該上下文將保持打開狀態(tài),直到我們的腳本完成。outfile這允許我們在腳本運行期間從任何任務(wù)寫入數(shù)據(jù)。
將標頭寫入 CSV 文件后,我們使用 打開了一個持久的客戶端請求會話async with ClientSession() as session,這允許我們在會話打開時批量發(fā)出請求。
在上面的代碼片段中,我們輸入了第三個也是最后一個上下文:單個 URL 的響應(yīng)上下文(通過async with session.get(url) as resp)。與其他兩個上下文不同,我們將進入和離開此上下文 174 次(每個 URL 一次)。
在每個 URL 響應(yīng)上下文中,我們最終開始生成一些輸出。這給我們留下了最后的邏輯(await parse_html_page_metadata(html, url)),它解析每個URL響應(yīng)并從頁面返回一些抓取的元數(shù)據(jù),然后將所述元數(shù)據(jù)寫入我們的outfile下一行await outfile.write(f"{page_metadata}\n")。
將解析的元數(shù)據(jù)寫入 CSV
您可能會問,我們計劃如何從 HTML 頁面中提取元數(shù)據(jù)?當然是使用BeautifulSoup !有了 HTTP 響應(yīng)的 HTML,我們就可以bs4解析每個 URL 響應(yīng)并返回以下各列的值outfile:title、description、Primary_tag、published at和url。
這五個值以逗號分隔的字符串形式返回,然后outfile作為單行寫入 CSV。
#aiohttp_aiofiles_tutorial/parser.py """從原始 HTML 中解析元數(shù)據(jù)。""" from bs4 import BeautifulSoup from bs4.builder import ParserRejectedMarkup from logger import LOGGER async def parse_html_page_metadata(html: str, url: str) -> str: """ 將頁面元數(shù)據(jù)從原始 HTML 提取到 CSV 行中。 :param str html: 給定獲取的 URL 的原始 HTML 源。 :param str url:與提取的 HTML 關(guān)聯(lián)的 URL。 :returns: str """ try: soup = BeautifulSoup(html, "html.parser") title = soup.title.string.replace(",", ";") description = ( soup.head.select_one("meta[name=description]") .get("content") .replace(",", ";") .replace('"', "`") .replace("'", "`") ) primary_tag = ( soup.head .select_one("meta[property='article:tag']") .get("content") ) published_at = ( soup.head .select_one("meta[property='article:published_time']") .get("content") .split("T")[0] ) if primary_tag is None: primary_tag = "" return f"{title}, {description}, {primary_tag}, {url}, {published_at}" except ParserRejectedMarkup as e: LOGGER.error( f"Failed to parse invalid html for {url}: {e}" ) except ValueError as e: LOGGER.error( f"ValueError occurred when parsing html for {url}: {e}" ) except Exception as e: LOGGER.error( f"Parsing failed when parsing html for {url}: {e}" )
運行珠寶,運行腳本
讓我們帶這個壞男孩去兜風吧。我在__init__.py中添加了一個計時器來記錄腳本持續(xù)時間所經(jīng)過的秒數(shù):
#aiohttp_aiofiles_tutorial/__init__.py
"""同時發(fā)出數(shù)百個請求并將響應(yīng)保存到磁盤。""" import time from time import perf_counter as timer ... async def init_script(): """Prepare output file & kickoff task creation/execution.""" start_time = timer() # Add timer to function async with aiofiles.open(EXPORT_FILEPATH, mode="w+") as outfile: await outfile.write( "title,description,primary_tag,url,published_at\n" ) await execute_fetcher_tasks(outfile) await outfile.close() LOGGER.success( f"Executed {__name__} in {time.perf_counter() - start_time:0.2f} seconds." ) # Log time of execution ...
make run如果您按照存儲庫進行操作(或者只是輸入) ,請混合該 mfing命令python3 main.py。系好安全帶:
#在約3 秒內(nèi)獲取174頁后日志的尾部 ... 16:12:34 PM | INFO: Fetched URL 165 of 173: Setting up a MySQL Database on Ubuntu, Setting up MySQL the old-fashioned way: on a linux server, DevOps, https://hackersandslackers.com/set-up-mysql-database/, 2018-04-17 16:12:34 PM | INFO: Fetched URL 164 of 173: Dropping Rows of Data Using Pandas, Square one of cleaning your Pandas Dataframes: dropping empty or problematic data., Data Analysis, https://hackersandslackers.com/pandas-dataframe-drop/, 2018-04-18 16:12:34 PM | INFO: Fetched URL 167 of 173: Installing Django CMS on Ubuntu, Get the play-by-play on how to install DjangoCMS: the largest of three major CMS products for Python`s Django framework., Software, https://hackersandslackers.com/installing-django-cms/, 2017-11-19 16:12:34 PM | INFO: Fetched URL 166 of 173: Starting a Python Web App with Flask & Heroku, Pairing Flask with zero-effort container deployments is a deadly path to addiction., Architecture, https://hackersandslackers.com/flask-app-heroku/, 2018-02-13 16:12:34 PM | INFO: Fetched URL 171 of 173: Another 'Intro to Data Analysis in Python Using Pandas' Post, An introduction to Python`s quintessential data analysis library., Data Analysis, https://hackersandslackers.com/intro-python-pandas/, 2017-11-16 16:12:34 PM | INFO: Fetched URL 172 of 173: Managing Python Environments With Virtualenv, Embrace core best-practices in Python by managing your Python packages using virtualenv and virtualenvwrapper., Software, https://hackersandslackers.com/python-virtualenv-virtualenvwrapper/, 2017-11-15 16:12:34 PM | INFO: Fetched URL 170 of 173: Visualize Folder Structures with Python’s Treelib, Using Python`s treelib library to output the contents of local directories as visual tree representations., Data Engineering, https://hackersandslackers.com/python-tree-hierachies-treelib/, 2017-11-17 16:12:34 PM | INFO: Fetched URL 169 of 173: Merge Sets of Data in Python Using Pandas, Perform SQL-like merges of data using Python`s Pandas., Data Analysis, https://hackersandslackers.com/merge-dataframes-with-pandas/, 2017-11-17 16:12:34 PM | INFO: Fetched URL 168 of 173: Starting an ExpressJS App, Installation guide for ExpressJS with popular customization options., JavaScript, https://hackersandslackers.com/create-an-expressjs-app/, 2017-11-18 16:12:34 PM | SUCCESS: Executed aiohttp_aiofiles_tutorial in 2.96 seconds.
用 Python 編寫異步腳本肯定需要更多的努力,但不會增加數(shù)百或數(shù)千倍的努力。即使您追求的不是速度,處理大型應(yīng)用程序的數(shù)量也使 asyncio 變得絕對至關(guān)重要。例如,如果您的聊天機器人或網(wǎng)絡(luò)服務(wù)器正在處理用戶的請求,那么當?shù)诙€用戶同時嘗試與您的應(yīng)用程序交互時會發(fā)生什么?通常答案是什么:用戶 1得到了他們想要的東西,而用戶 2則被困在阻塞的線程中。文章來源:http://www.zghlxwxcb.cn/article/582.html
源代碼:github.com/hackersandslackers/aiohttp-aiofiles-tutorial/tree/master/aiohttp_aiofiles_tutorial/data文章來源地址http://www.zghlxwxcb.cn/article/582.html
到此這篇關(guān)于使用AIOHTTP和AIOFiles進行異步Python HTTP請求的文章就介紹到這了,更多相關(guān)內(nèi)容可以在右上角搜索或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!