Celery是一個簡單、靈活、可靠的分布式系統(tǒng),用于處理大量消息,同時為操作提供維護此類系統(tǒng)所需的工具。是一個專注于實時處理的任務隊列,同時還支持任務調度。
目錄
應用場景
問題
解決
celery架構圖
安裝
配置celery
Settings.py配置
創(chuàng)建celery
修改__init__
開啟celery
異步執(zhí)行
創(chuàng)建任務文件
視圖
路由
演示
啟動django
重啟celery
瀏覽器訪問
獲取執(zhí)行結果
AsyncResult屬性和方法
演示
視圖
路由
瀏覽器訪問
Celery報錯
kombu.async.timer
cannot import name 'current_app'
AttributeError
定時執(zhí)行
增加配置
多個任務執(zhí)行
創(chuàng)建執(zhí)行方法
啟動celery
綁定任務
實現(xiàn)綁定
實現(xiàn)錯誤重試
celery管理和監(jiān)控
安裝
運行命令
運行flower
瀏覽器訪問
總結
參考文章
應用場景
問題
1.用戶發(fā)起請求,需要等待響應返回;但是在視圖中如果有一些耗時操作,可能導致用戶會等待很長時間,這樣用戶體驗非常不好。
2.網站隔一段時間要同步一次數(shù)據,但是http請求是需要觸發(fā)的。
解決
使用celery來解決:耗時的操作方法在celery中異步執(zhí)行;還可以使用celery定時執(zhí)行。
?
celery架構圖
celery由以下四部分構成:
任務(Task)、代理(Broker)、任務執(zhí)行(Worker)、結果存儲(Backend)。
?
安裝
命令如下:
pip install celery
pip install redis
# window下安裝,linux下不需要
pip install eventlet
?
配置celery
Settings.py配置
Settings.py中在最下面配置
# celery配置
# Broker配置,使用Redis作為消息中間件
BROKER_URL = 'redis://127.0.0.1:6379/1'
# 若有密碼這樣配置
# BROKER_URL = 'redis://:pwd@127.0.0.1:6379/1'
# BACKEND配置,這里使用redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
# 若有密碼這樣配置
# CELERY_RESULT_BACKEND = 'redis://:pwd@127.0.0.1:6379/1'
# 結果序列化方案
CELERY_RESULT_SERIALIZER = 'json'
# 任務結果過期時間,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
# 時區(qū)配置
CELERY_TIMEZONE = 'Asia/Shanghai'
# 指定導入的任務模塊,可以指定多個
# CELERY_IMPORTS = (
# 'other_dir.tasks',
# )
?
創(chuàng)建celery
在工程目錄下的project目錄下創(chuàng)建celery.py文件。
內容如下:
import os
from celery import Celery
from django.conf import settings
# 設置系統(tǒng)環(huán)境變量,安裝django,必須設置,否則在啟動celery時會報錯
# project 是當前工程目錄名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')
celery_app = Celery('project')
celery_app.config_from_object('django.conf:settings')
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
修改__init__
在工程目錄下project的__init__文件中添加
from .celery import celery_app
__all__ = ['celery_app']
開啟celery
Window命令如下:
celery -A project worker -l debug -P eventlet
pip 安裝eventlet,在啟動celery的參數(shù)加上eventlet,
原理是windows不支持celery的進程執(zhí)行。
Linux 命令如下:
celery -A project worker -l
成功截圖如下:
Redis數(shù)據庫?
?
異步執(zhí)行
創(chuàng)建任務文件
在子應用目錄下創(chuàng)建tasks.py來執(zhí)行任務(寫耗時異步執(zhí)行)
內容如下:
from celery import shared_task
import time
@shared_task
def add(x, y):
print('添加一個事件')
time.sleep(10)
print(x, y)
print('事件執(zhí)行完成')
視圖
應用視圖views.py中增加引入和視圖方法
from .tasks import *
def task_add(request):
add.delay(100, 200)
return HttpResponse(f'調用函數(shù)結果')
路由
urlpatterns = [
# celery
path(r'task_add', views.task_add, name='task_add'),
]
演示
啟動django
python manage.py runserver
重啟celery
celery -A project worker -l debug -P eventlet
瀏覽器訪問
然后瀏覽器訪問task路由。
?通過worker的控制臺,可以看到任務被worker處理。
獲取執(zhí)行結果
可通過AsyncResult對象通過返回的事件id來獲取事件信息。
AsyncResult屬性和方法
state: 返回任務狀態(tài),等同status;
task_id: 返回任務id;
result: 返回任務結果,同get()方法;
ready(): 判斷任務是否執(zhí)行以及有結果,有結果為True,否則False;
info(): 獲取任務信息,默認為結果;
wait(t): 等待t秒后獲取結果,若任務執(zhí)行完畢,則不等待直接獲取結果,若任務在執(zhí)行中,則wait期間一直阻塞,直到超時報錯;
successful(): 判斷任務是否成功,成功為True,否則為False;
演示
視圖
from celery import result
from django.http import JsonResponse
def task_info(request):
task_id = request.GET.get('task_id')
res = result.AsyncResult(task_id)
if res.ready():
return JsonResponse({'status': res.state, 'result': res.get()})
else:
return JsonResponse({'status': res.state, 'result': ''})
路由
path(r'task_info/<str:id>', views.task_info, name='task_info'),
瀏覽器訪問
地址欄傳入新增事件時,返回的事件id
?
Celery報錯
kombu.async.timer
開啟celery,提示錯誤Kombu.async.timer import Entry......
解決方法:
卸載celery,重裝固定版本
命令如下:
pip install celery==4.3
?
cannot import name 'current_app'
cannot import name 'current_app' from 'celery'
解決方法:
打開D:\python3.7\lib\site-packages\celery\local.py"
原方法
def getappattr(path):
"""Get attribute from current_app recursively.
Example: ``getappattr('amqp.get_task_consumer')``.
"""
from celery import current_app
return current_app._rgetattr(path)
修改為:
def getappattr(path):
"""Get attribute from current_app recursively.
Example: ``getappattr('amqp.get_task_consumer')``.
"""
# from celery import current_app
from celery._state import current_app
return current_app._rgetattr(path)
AttributeError
AttributeError: 'EntryPoints' object has no attribute 'get'
解決方法:
安裝固定版本的importlib-metadata
命令如下:
pip install importlib-metadata==4.13.0
?
定時執(zhí)行
增加配置
工程目錄下/settings.py
CELERYBEAT_SCHEDULE = {
'every_5_seconds': {
# 任務路徑 應用目錄/任務文件/方法
'task': 'myapp.tasks.schedule_execute',
# 每5秒執(zhí)行一次
'schedule': 5,
# 設置參數(shù)
'args': (14, 6, 5)
}
}
多個任務執(zhí)行
可在原來基礎上繼續(xù)增加
CELERYBEAT_SCHEDULE = {
'every_5_seconds': {
# 任務路徑 應用目錄/任務文件/方法
'task': 'myapp.tasks.schedule_execute',
# 每5秒執(zhí)行一次
'schedule': 5,
# 設置參數(shù)
'args': (14, 6, 5)
},
'every_10_seconds': {
# 任務路徑 應用目錄/任務文件/方法
'task': 'myapp.tasks.schedule_execute',
# 每10秒執(zhí)行一次
'schedule': 10,
# 設置參數(shù)
'args': (18, 10, 10)
},
}
創(chuàng)建執(zhí)行方法
在task.py中設置執(zhí)行方法并記錄日志
from celery import shared_task
import logging
logger = logging.getLogger(__name__)
@shared_task
def schedule_execute(x, y, s):
logger.info('_____' * 20)
logger.info('每%d秒執(zhí)行一次' % s)
logger.info('%d + %d = %d' % (x, y, (x+y)))
logger.info('執(zhí)行結束')
logger.info('_____' * 20)
啟動celery
(兩個cmd)分別啟動worker和beat
celery -A project worker -l debug -P eventlet
celery beat -A project -l debug
可通過控制臺查看執(zhí)行結果。
綁定任務
Celery可通過task綁定到實例獲取到task的上下文,這樣我們可以在task運行時候獲取到task的狀態(tài),記錄相關日志等
實現(xiàn)綁定
修改應用任務文件內容:在裝飾器中加入參數(shù) bind=True,在add函數(shù)中的第一個參數(shù)設置為self。
內容如下:
@shared_task(bind=True)
def add(self, x, y):
print('添加一個事件')
time.sleep(2)
print(x, y)
logger.info(self.name)
logger.info(dir(self))
print('事件執(zhí)行完成')
實現(xiàn)錯誤重試
self對象是myapp.tasks.add實例,可用于實現(xiàn)重試。
@shared_task(bind=True)
def add(self, x, y):
try:
print('添加一個事件')
time.sleep(2)
print(x, y)
logger.info(self.name)
# 沒有state屬性 只是為了驗證重試
logger.info(self.state)
logger.info(dir(self))
print('事件執(zhí)行完成')
except Exception as e:
# 出錯每5秒嘗試一次,嘗試3次
self.retry(exc=e, countdown=5, max_retries=3)
?
celery管理和監(jiān)控
celery通過flower組件實現(xiàn)管理和監(jiān)控功能。
Flower官網
Getting started — Flower 2.0.0 documentation
安裝
pip install flower
運行命令
celery -A project flower
Or
celery -A project flower --port=5001
參數(shù)說明
-A 項目名稱
--port 端口號,默認5555
運行flower
運行命令,顯示如下:
瀏覽器訪問
http://127.0.0.1:port
?
總結
本文主要介紹了celery的應用場景;
如何安裝及安裝哪些類庫;
異步和定時執(zhí)行實現(xiàn)以及任務可視化管理。
參考文章
https://www.cnblogs.com/chunyouqudongwuyuan/p/16892475.html
Django 中celery的使用_django celery_寵乖儀的博客-CSDN博客
在django中使用celery_嗶哩嗶哩_bilibili文章來源:http://www.zghlxwxcb.cn/news/detail-502629.html
一文讀懂 Python 分布式任務隊列 celery文章來源地址http://www.zghlxwxcb.cn/news/detail-502629.html
到了這里,關于Django高級擴展之celery使用的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!