一、簡介
Celery 是使用 python 編寫的分布式任務(wù)調(diào)度框架。
它有幾個主要的概念:
celery 應(yīng)用
-
用戶編寫的代碼腳本,用來定義要執(zhí)行的任務(wù),然后通過 broker 將任務(wù)發(fā)送到消息隊(duì)列中
broker
-
代理,通過消息隊(duì)列在客戶端和 worker 之間進(jìn)行協(xié)調(diào)。
-
celery 本身并不包含消息隊(duì)列,它支持一下消息隊(duì)列
RabbitMQ
Rdis
Amazon SQS
Zookeeper
-
更多關(guān)于 Broker 見官方文檔(末尾點(diǎn)擊閱讀原文)
backend
-
數(shù)據(jù)庫,用來存儲任務(wù)返回的結(jié)果。
worker
-
工人,用來執(zhí)行 broker 分派的任務(wù)。
任務(wù)
-
任務(wù),定義的需要執(zhí)行的任務(wù)
版本要求
Celery5.1 要求:
-
python(3.6,3.7,3.8)
Celery 是一個資金最少的項(xiàng)目,所以我們不支持 Microsoft Windows。
更多更詳細(xì)的版本要求見官方文檔
安裝
使用 pip 安裝:
pip install -U Celery
?如果你想學(xué)習(xí)自動化測試,我這邊給你推薦一套視頻,這個視頻可以說是B站播放全網(wǎng)第一的接口自動化測試教程,同時在線人數(shù)到達(dá)1000人,并且還有筆記可以領(lǐng)取及各路大神技術(shù)交流:798478386? ????
【已更新】B站講的最詳細(xì)的Python接口自動化測試實(shí)戰(zhàn)教程全集(實(shí)戰(zhàn)最新版)_嗶哩嗶哩_bilibili【已更新】B站講的最詳細(xì)的Python接口自動化測試實(shí)戰(zhàn)教程全集(實(shí)戰(zhàn)最新版)共計(jì)200條視頻,包括:1.【接口自動化】目前軟件測試的市場行情以及測試人員能力標(biāo)準(zhǔn)。、2.【接口自動化】全面熟練Requests庫以及底層方法調(diào)用邏輯、3.【接口自動化】接口自動化實(shí)戰(zhàn)及正則和JsonPath提取器的應(yīng)用等,UP主更多精彩視頻,請關(guān)注UP賬號。https://www.bilibili.com/video/BV17p4y1B77x/?spm_id_from=333.337&vd_source=488d25e59e6c5b111f7a1a1a16ecbe9a?
捆綁包
Celery 還定義了一組包,用于安裝 Celery 和給定的依賴項(xiàng)。
可以在 pip 命令中實(shí)現(xiàn)中括號來指定這些依賴項(xiàng)。
pip install "celery[librabbitmq]"
pip install "celery[librabbitmq,redis,auth,msgpack]"
二、簡單使用
1. 選擇一個 broker
使用 celery 首先需要選擇一個消息隊(duì)列。安裝任意你熟悉的前面提到的 celery 支持的消息隊(duì)列。
2. 編寫一個 celery 應(yīng)用
首先我們需要編寫一個 celery 應(yīng)用,它用來創(chuàng)建任務(wù)和管理 wokers,它要能夠被其他的模塊導(dǎo)入。
創(chuàng)建一個tasks.py 文件:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
第一個參數(shù)tasks是當(dāng)前模塊的名稱,它可以省略,建議以當(dāng)前模塊名為名稱。
第二個關(guān)鍵字參數(shù) broker='redis://localhost:6379/0'指定我們使用 Redis 作為消息隊(duì)列,并指定連接地址。
3.運(yùn)行 celery 的 worker 服務(wù)
cd 到 tasks.py 所在目錄,然后運(yùn)行下面的命令來啟動 worker 服務(wù)
celery -A tasks worker --loglevel=INFO
4. 調(diào)用任務(wù)
>>> from tasks import add
>>> add.delay(4,4)
通過調(diào)用任務(wù)的 delay 來執(zhí)行對應(yīng)的任務(wù)。celery 會把執(zhí)行命令發(fā)送到 broker,broker 再將消息發(fā)送給 worker 服務(wù)來執(zhí)行,如果一切正常你將會在 worker 服務(wù)的日志中看到接收任務(wù)和執(zhí)行任務(wù)的日志。
5. 保存結(jié)果
如果你想要跟蹤任務(wù)的狀態(tài)以及保存任務(wù)的返回結(jié)果,celery 需要把它發(fā)送到某個地方。celery 提供多種結(jié)果后端。
我們這里以 reids 為例,修改 tasks.py中的代碼,添加一個 Redis 后端。
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')
更多結(jié)果后端見官方文檔。(末尾點(diǎn)擊閱讀原文)
重新啟動 worker 服務(wù),重新打開 python 解釋器
>>> from tasks import add
>>> result = add.delay(4,4)
ready()方法返回任務(wù)是否執(zhí)行完成:
>>> result.ready()
False
還可以等待結(jié)果完成,但很少使用這種方法,因?yàn)樗鼘惒秸{(diào)用轉(zhuǎn)換為同步調(diào)用???????
>>> result.get(timeout=1)
8
三、在應(yīng)用中使用 celery
創(chuàng)建項(xiàng)目
項(xiàng)目結(jié)構(gòu):
proj/__init__.py
/celery.py
/tasks.py
proj/celery.py
from celery import Celery
app = Celery('proj',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
include=['proj.tasks']
)# 配置
app.conf.update(
result_expires=3600, # 結(jié)果過期時間
)
在這個模塊中我們創(chuàng)建了一個 Celery 模塊。要在你的項(xiàng)目中使用 celery 只需要導(dǎo)入此實(shí)例。
proj/tasks.py
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.tas
kdef xsum(numbers)
return sum(numbers)
啟動 worker
celery -A proj worker -l INFO
調(diào)用任務(wù)???????
>>> from proj.tasks import add
>>> add.delay(2, 2)
四、在 django 中使用celery
要在你的 django 項(xiàng)目中使用 celery,首先需要定義一個 Celery 的實(shí)例。
如果你又 django 項(xiàng)目如下:
- proj/
- manage.py
- proj/
- __init__.py
- settings.py
- urls.py
那么推薦的方法是創(chuàng)建一個新的proj/proj/celery.py模塊來定義芹菜實(shí)例:file:proj/proj/celery.py
import os
from celery import Celery
# 為`celery`設(shè)置默認(rèn)的django設(shè)置模塊
os.environ.setdefault('DJANGO_SETTINGS_MODULE','proj.settings')
app = Celery('proj')
# 設(shè)置配置來源
app.config_from_object('django.conf:settings',namespace='CELERY')
# 加載所有的已注冊django應(yīng)用中的任務(wù)
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
然后你需要在你的 proj/proj/__init__.py模塊中導(dǎo)入這個應(yīng)用程序。這樣就可以保證 Django 啟動時加載應(yīng)用程序,以便于 @shared_task 裝飾器的使用。
proj/proj/__init__.py:
from .celery import app as celery_app
__all__ = ('celery_app',)
請注意,此示例項(xiàng)目布局適用于較大的項(xiàng)目,對于簡單的項(xiàng)目,可以使用包含定義應(yīng)用程序和任務(wù)的單個模塊。
接下來我們來解釋一下 celery.py 中的代碼,首先,我們設(shè)置celery命令行程序的環(huán)境變量DJANGO_SETTINGS_MODULE的默認(rèn)值:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
這一行的作用是加載當(dāng)前 django 項(xiàng)目的環(huán)境設(shè)置,特別是當(dāng)需要在異步任務(wù)中用到 ORM。它必須在創(chuàng)建應(yīng)用程序?qū)嵗啊?/p>
app = Celery('proj')
我們還添加了 Django 設(shè)置模塊作為 Celery 的配置源。這意味著我們不必使用多個配置文件,而是直接在 Django 的配置文件中配置 Celery。
app.config_from_object('django.conf:settings', namespace='CELERY')
大寫命名空間意味著所有Celery配置項(xiàng)必須以大寫指定,并以 CELERY_ 開頭,因此例如broker_url?設(shè)置變?yōu)?CELERY_BROKER_URL。
例如,Django 項(xiàng)目的配置文件可能包括:
settings.py???????
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30*60
接下來,可重用應(yīng)用程序的常見做法是在單獨(dú)的tasks.py模塊中定義所有任務(wù)Celery有一種方法可以自動發(fā)現(xiàn)這些模塊:
app.autodiscover_tasks()
使用上面的行,Celery 將按照tasks.py 約定自動從所有已安裝的應(yīng)用程序中發(fā)現(xiàn)任務(wù):
- app1/
- tasks.py
- models.py
- app2/
- tasks.py
- models.py
這樣就不必手動將各個模塊添加到CELERY_IMPORTS 設(shè)置中。
使用 @shared_task 裝飾器
我們編寫的任務(wù)可能會存在于可重用的應(yīng)用程序中,而可重用的應(yīng)用程序不能依賴與項(xiàng)目本身,因此無法直接導(dǎo)入 celery 應(yīng)用實(shí)例。文章來源:http://www.zghlxwxcb.cn/news/detail-647992.html
@shared_task裝飾器可以讓我們無需任何具體的 celery 實(shí)例創(chuàng)建任務(wù):demoapp/tasks.py文章來源地址http://www.zghlxwxcb.cn/news/detail-647992.html
# Create your tasks here
from demoapp.models import Widget
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
@shared_task
def xsum(numbers):
return sum(numbers)
@shared_task
def count_widgets():
return Widget.objects.count()
@shared_task
def rename_widget(widget_id, name):
w = Widget.objects.get(id=widget_id)
w.name = name
w.save()
到了這里,關(guān)于python 異步任務(wù)框架 Celery 入門,速看!的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!