前言:
收到留言: "我的爬取的數(shù)據(jù)處理有點(diǎn)大,scrapy抓網(wǎng)頁挺快,處理數(shù)據(jù)慢了!"
-----針對這位粉絲留言,我只想說:'你那培訓(xùn)班老師可能給你漏了課程! 大概你們上課講的案例屬于demo,他教了你一些基本操作,但他沒有對相關(guān)業(yè)務(wù)對你講透! 你研究一下pipelines,或者看我現(xiàn)在給你講的.
正文
首先,你要清楚,當(dāng)在Scrapy框架中,pipelines是順序執(zhí)行的,對item的處理通常是同步進(jìn)行。
這時(shí)候,你要分析2件事:
1.我的數(shù)據(jù)要不要清洗
2.我的數(shù)據(jù)準(zhǔn)備怎么存儲
分開講:
1.我的數(shù)據(jù)要不要清洗:
如果需要清洗,item的數(shù)據(jù)里比較多,我建議你轉(zhuǎn)一下pd.dataframe;這樣,會比正常運(yùn)算要快得多;然后,給你3條建議:
- 避免在循環(huán)內(nèi)使用 df.apply():--->? ? apply() 是行或列級別的操作函數(shù),效率相對較低。如果可以,嘗試用更高效的Pandas內(nèi)建函數(shù)代替,比如使用邏輯運(yùn)算與 numpy 的向量化操作。
- 對于字符串處理,如果數(shù)據(jù)量很大,應(yīng)當(dāng)盡量使用向量化方法,例如 .str 方法或其他Pandas字符串操作代替 lambda 函數(shù)。
- 當(dāng)創(chuàng)建新的列時(shí),用條件表達(dá)式替代 .apply(lambda) 可以獲得更好的性能,條件表達(dá)式在Pandas中是向量化的。
如果pandas處理之后,不滿足:
分離繁重操作:
如果有些操作很繁重,可以將它們移動(dòng)到Scrapy的middleware或者擴(kuò)展來進(jìn)行,這樣可能有助于提高item pipeline的處理速度。這時(shí)候,你就可以通過外部自己寫一個(gè)多線程/多進(jìn)程來處理你的數(shù)據(jù)工作!
當(dāng)然,處理item的數(shù)據(jù)清理工作,我建議你用:
ItemAdapter
什么是ItemAdapter?
-它是一個(gè)包裝類,允許我們以一致的方式處理不同種類的數(shù)據(jù)結(jié)構(gòu),例如dict、scrapy.Item以及自定義的數(shù)據(jù)類。無論內(nèi)部的數(shù)據(jù)存儲格式如何,ItemAdapter都能讓我們同等的獲取和設(shè)置Item中的字段值。
ItemAdapter的使用場景
ItemAdapter特別適用于編寫更通用的Pipeline代碼。無論傳入的Item是Scrapy的Item實(shí)例還是普通的dict,甚至是自定義的類實(shí)例,你都可以使用相同的方法來處理它們。這樣的設(shè)計(jì)大大提升了代碼的復(fù)用性和可維護(hù)性。
案例:
import scrapy
from itemadapter import ItemAdapter
import pandas as pd
import numpy
class JihaiPipeline:
def open_spider(self, spider):
# 初始化工作,例如連接數(shù)據(jù)庫
pass
def close_spider(self, spider):
# 清理工作,例如關(guān)閉數(shù)據(jù)庫連接
pass
def process_item(self, item, spider):
# 使用ItemAdapter包裝item
adapter = ItemAdapter(item)
# 進(jìn)行數(shù)據(jù)處理...
# 例如,假設(shè)我們需要給所有Item添加一個(gè)新字段
adapter['new_field'] = '丟一個(gè)新的字段進(jìn)去'
# 處理完后,返回item
return item
在上面的代碼中,我們沒有直接操作原始的item對象,而是將其通過ItemAdapter(item)包裝起來。然后就可以像操作字典一樣,通過adapter['new_field']來設(shè)置新字段。在管道中修改完數(shù)據(jù)后,可以直接將Item傳遞到下一個(gè)管道。
ItemAdapter中的向量化操作
對于爬蟲項(xiàng)目,可能需要對數(shù)據(jù)進(jìn)行更復(fù)雜的清洗和轉(zhuǎn)換操作。在Pandas的幫助下,我們可以執(zhí)行向量化的數(shù)據(jù)處理工作,這是一種高效處理數(shù)據(jù)的方式。通過Pandas,利用DataFrame進(jìn)行復(fù)雜的數(shù)據(jù)清洗和分析變得相當(dāng)簡便
案例:
class JihaiPipeline:
# ...之前的方法...
def process_item(self, item, spider):
adapter = ItemAdapter(item)
# 假設(shè)我們的item有一個(gè)成績的列表需要處理
grades = adapter.get('grades', [])
# 使用Pandas創(chuàng)建DataFrame
df = pd.DataFrame(grades)
# 執(zhí)行一些復(fù)雜的計(jì)算操作,例如計(jì)算平均分
adapter['average_grade'] = df['score'].mean()
# 返回處理后的item
return item
在這個(gè)例子中,我們先獲取了成績列表,然后使用這個(gè)列表創(chuàng)建了一個(gè)Pandas DataFrame。之后我們就可以利用DataFrame提供的方法進(jìn)行各種操作,比如這里計(jì)算了一個(gè)平均分成績,然后將其添加到了item中。
小總結(jié):
ItemAdapter提供了一個(gè)透明的方式來處理項(xiàng),幫助你更簡單地編寫與項(xiàng)結(jié)構(gòu)無關(guān)的代碼。與Pandas結(jié)合使用,它也使得在Scrapy中進(jìn)行復(fù)雜數(shù)據(jù)處理成為可能。記住,一致性、可讀性和可維護(hù)性是編寫高質(zhì)量爬蟲代碼時(shí)的關(guān)鍵點(diǎn)。
2.我的數(shù)據(jù)準(zhǔn)備怎么存儲?
?
如果你的數(shù)據(jù)比較單一,你直接存(就跟你老師教你的那樣!) 如果你的數(shù)據(jù)已經(jīng)到達(dá)了你的瓶頸,你最好做個(gè)分離;然后看我之前的文章,例如:存入sql--->你首先要想到的就是異步!
在Scrapy中,最佳實(shí)踐通常是將數(shù)據(jù)處理(清洗、轉(zhuǎn)換等)與數(shù)據(jù)存儲(寫入數(shù)據(jù)庫等)分離。這為你的數(shù)據(jù)處理流水線提供了更好的組織結(jié)構(gòu)和可擴(kuò)展性。每個(gè)Pipeline應(yīng)該只負(fù)責(zé)一個(gè)操作或一組相關(guān)操作。這樣做的好處是:
1. 職責(zé)分離:這使得每個(gè)pipeline的職責(zé)更清晰。如果以后需要更改存儲邏輯,只需要更改保存到SQL的pipeline,而不需要觸及數(shù)據(jù)處理的pipeline。
2. 模塊化:如果在將來需要將數(shù)據(jù)存儲到不同的后端(例如不同的數(shù)據(jù)庫,或者文件系統(tǒng)等),你可以簡單地添加一個(gè)新的pipeline來處理這種情況,而不是更改現(xiàn)有代碼。
3. 可維護(hù)性:代碼維護(hù)更簡單,因?yàn)閿?shù)據(jù)清洗和存儲是分開的,錯(cuò)誤更容易追蹤,代碼更容易調(diào)試。
4. 可測試性:獨(dú)立的pipeline更容易進(jìn)行單元測試。
既然已經(jīng)完成了數(shù)據(jù)處理,并且將結(jié)果整理成了待存儲的格式,接下來的邏輯步驟是將這些數(shù)據(jù)保存到SQL數(shù)據(jù)庫。創(chuàng)建一個(gè)新的Pipeline類專門用于與SQL數(shù)據(jù)庫的交互,這樣,你的 `XXXPipeline` 負(fù)責(zé)處理數(shù)據(jù),并將處理后的數(shù)據(jù)傳遞給稍后在settings.py文件中定義優(yōu)先級更低的SQL存儲pipeline。
下面是創(chuàng)建一個(gè)專門用于存儲數(shù)據(jù)到SQL數(shù)據(jù)庫的pipeline的簡單例子(要異步,往前看我文章有介紹):
# sql_pipeline.py
import scrapy
from scrapy import Item
from itemadapter import ItemAdapter
class SQLStorePipeline:
def open_spider(self, spider):
# 這里設(shè)置數(shù)據(jù)庫連接
self.connection = create_connection_to_database()
def close_spider(self, spider):
# 關(guān)閉數(shù)據(jù)庫連接
self.connection.close()
def process_item(self, item, spider):
# 提取ItemAdapter
adapter = ItemAdapter(item)
# 保存到數(shù)據(jù)庫的邏輯
save_to_database(self.connection, adapter.as_dict())
return item # 注意,返回item是為了允許多個(gè)pipeline
def create_connection_to_database():
# 創(chuàng)建數(shù)據(jù)庫鏈接邏輯
pass
def save_to_database(connection, item_data):
# 將item數(shù)據(jù)保存到數(shù)據(jù)庫的邏輯
pass
在`settings.py`文件中,您需要確保新的`SQLStorePipeline`在`XXXPipeline`之后執(zhí)行。這可以通過為它們分配不同的`ITEM_PIPELINES`值來實(shí)現(xiàn):
# settings.py
ITEM_PIPELINES = {
'myproject.pipelines.XXXPipeline': 300, #處理數(shù)據(jù)清理的
'myproject.pipelines.SQLStorePipeline': 800, #存儲的
}
這樣,每個(gè)item首先通過`JihaiPipeline`進(jìn)行處理,然后再通過`SQLStorePipeline`進(jìn)行存儲。
通過這種方式,您既保持了pipeline的職責(zé)分割,又為后續(xù)的維護(hù)和可能的擴(kuò)展性打下了良好的基礎(chǔ)。如果有多個(gè)數(shù)據(jù)存儲或處理需求,遵循這種模式是非常有好處的。
總結(jié):
你就記住,如果你的item數(shù)據(jù)量比較大,一定要分離! 分完了,很多都能處理了! 另外,你記得itemAdapter的用法~ 他應(yīng)該算是一個(gè)引子,透過他~你寫著寫著就會冒出很多怪招出來~ 然后,再不行,你就進(jìn)行分布式! 反正你的業(yè)務(wù)已經(jīng)模塊化了,拿一個(gè)機(jī)器專門清理,拿一個(gè)機(jī)器專門存儲~或者,丟到中間件,甩到外部去做多線程處理!這樣,在爬蟲過程中,對數(shù)據(jù)的清理和存儲的工作量,就能被劃分掉,不就輕了么...? ?
請你看到這文章,給我點(diǎn)個(gè)贊!!文章來源:http://www.zghlxwxcb.cn/news/detail-767396.html
(讓我知道你來了)文章來源地址http://www.zghlxwxcb.cn/news/detail-767396.html
到了這里,關(guān)于爬蟲工作量由小到大的思維轉(zhuǎn)變---<第十三章 Scrapy之pipelines分離的思考>的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!