目錄
一、依賴包情況
二、項(xiàng)目目錄結(jié)構(gòu)
?? 2.1、怎么將django的應(yīng)用創(chuàng)建到apps包
三、celery的配置
2.1、celery_task/celery.py
2.2、celery_task/async_task.py
2.3、celery_task/scheduler_task.py
2.4、utils/check_task.py
四、apps/user中配置相關(guān)處理視圖
4.1、基本配置
4.2、user的models
4.3、user的視圖函數(shù)
五、調(diào)用函數(shù)測(cè)試
5.1、啟動(dòng)項(xiàng)目
5.2、測(cè)試項(xiàng)目:Postman接口工具
六、報(bào)錯(cuò)
celery集成Django的官方文檔:
First steps with Django — Celery 5.3.4 documentationhttps://docs.celeryq.dev/en/stable/django/first-steps-with-django.html#using-celery-with-django
一、依賴包情況
python==3.9.0
django==3.2.0
celery==5.3.1
django-redis==5.3.0
eventlet==0.33.3? #windows系統(tǒng)需要使用到
注意:還需要在系統(tǒng)中安裝好redis數(shù)據(jù)庫(kù),不然無(wú)法使用。
二、項(xiàng)目目錄結(jié)構(gòu)
?????????????????
?? 2.1、怎么將django的應(yīng)用創(chuàng)建到apps包
·1、創(chuàng)建user應(yīng)用
cd apps
python ../manage.py startapp user
?? 2、修改user包下的apps.py模塊
from django.apps import AppConfig
class UserConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
#原來(lái)是 name = 'user',改成下面的
name = 'apps.user'
3、注冊(cè)到settings.py文件中
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'apps.user.apps.UserConfig', #注冊(cè)u(píng)ser應(yīng)用,from apps.user.apps import UserConfig
]
三、celery的配置
概述:celery應(yīng)用程序和任務(wù)都放到celery_task包中,其中celery.py是創(chuàng)建Celery的實(shí)例對(duì)象的,async_task.py用來(lái)寫異步任務(wù),scheduler_task.py用來(lái)寫定時(shí)任務(wù)的。utils/check_task.py用來(lái)檢測(cè)任務(wù)id是否結(jié)束并獲取任務(wù)的返回值的。
2.1、celery_task/celery.py
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
# 消息中間件,密碼是你redis的密碼
# broker='redis://:123456@127.0.0.1:6379/2' 密碼123456
broker = 'redis://127.0.0.1:6379/0' # 無(wú)密碼
# 任務(wù)結(jié)果存儲(chǔ)
backend = 'redis://127.0.0.1:6379/1'
#包含任務(wù)的所有模塊的導(dǎo)入路徑:
task_module = [
'celery_task.async_task', #寫任務(wù)模塊導(dǎo)入路徑,該模塊主要寫異步任務(wù)的方法
'celery_task.scheduler_task', #寫任務(wù)模塊導(dǎo)入路徑,該模塊主要寫定時(shí)任務(wù)的方法
]
# 生成celery對(duì)象,'task'相當(dāng)于key,用于區(qū)分celery對(duì)象
# broker是指定消息處理,backend是指定結(jié)果后端的存儲(chǔ)位置 include參數(shù)需要指定任務(wù)模塊
app = Celery('task', broker=broker, backend=backend, include=task_module)
# 配置
# 時(shí)區(qū)
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 定時(shí)任務(wù)配置
app.conf.beat_schedule = {
# 名字隨意命名
'add-func-30-seconds': {
# 執(zhí)行add_task下的addy函數(shù)
'task': 'celery_task.scheduler_task.add_func', # 任務(wù)函數(shù)的導(dǎo)入路徑,from celery_task.scheduler_task import add_func
# 每10秒執(zhí)行一次
'schedule': timedelta(seconds=30),
# add函數(shù)傳遞的參數(shù)
'args': (10, 21)
},
#名字隨意起
'add-func-5-minutes': {
'task': 'celery_task.scheduler_task.add_func',# 任務(wù)函數(shù)的導(dǎo)入路徑,from celery_task.scheduler_task import add_func
# crontab不傳的參數(shù)默認(rèn)就是每的意思,比如這里是每年每月每日每天每小時(shí)的5分執(zhí)行該任務(wù)
'schedule': crontab(minute='5'), # 之前時(shí)間點(diǎn)執(zhí)行,每小時(shí)的第5分鐘執(zhí)行任務(wù), 改成小時(shí),分鐘,秒 就是每天的哪個(gè)小時(shí)哪分鐘哪秒鐘執(zhí)行
'args': (19, 22) #定時(shí)任務(wù)需要的參數(shù)
},
#緩存用戶數(shù)據(jù)到cache中
'cache-user-func':{
'task':'celery_task.scheduler_task.cache_user_func',#導(dǎo)入任務(wù)函數(shù):from celery_task.scheduler_task import cache_user_func
'schedule':timedelta(minutes=1),#每1分鐘執(zhí)行一次,將用戶消息緩存到cache中
}
}
'''
配置:也可以使用下面這種方式:
app.conf.update(
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json',
timezone='Asia/Shanghai',
enable_utc=False,
)
'''
2.2、celery_task/async_task.py
# 因?yàn)樾枰玫絛jango中的內(nèi)容,所以需要配置django環(huán)境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")
import django
django.setup()
# 導(dǎo)入celery對(duì)象app
from celery_task.celery import app
# 導(dǎo)入django自帶的發(fā)送郵件模塊
from django.core.mail import send_mail
import threading
from study_celery import settings
'''
1、沒(méi)有返回值的,@app.task(ignore_result=True)
2、有返回值的任務(wù),@app.task
'''
#沒(méi)有返回值,禁用掉結(jié)果后端
@app.task
def send_email_task(email,code): # 此時(shí)可以直接傳郵箱,還能減少一次數(shù)據(jù)庫(kù)的IO操作
'''
:param email: 接收消息的郵箱,用戶的郵箱
:return:
'''
# 啟用線程發(fā)送郵件,此處最好加線程池
t = threading.Thread(
target=send_mail,
args=(
"登錄前獲取的驗(yàn)證碼", # 郵件標(biāo)題
'點(diǎn)擊該郵件激活你的賬號(hào),否則無(wú)法登陸', # 給html_message參數(shù)傳值后,該參數(shù)信息失效
settings.EMAIL_HOST_USER, # 用于發(fā)送郵件的郵箱地址
[email], # 接收郵件的郵件地址,可以寫多個(gè)
),
# html_message中定義的字符串即HTML格式的信息,可以在一個(gè)html文件中寫好復(fù)制出來(lái)放在該字符串中
kwargs={
'html_message': f"<p></p> <p>驗(yàn)證碼:{code}</p>"
}
)
t.start()
return {'email':email,'code':code}
2.3、celery_task/scheduler_task.py
# 因?yàn)樾枰玫絛jango中的內(nèi)容,所以需要配置django環(huán)境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")
import django
django.setup()
from celery_task.celery import app
from apps.user.views import models as user_models
from django.core.cache import cache
import time
from django.forms import model_to_dict
#有返回值,返回值可以從結(jié)果后端中獲取
@app.task
def add_func(a,b):
print('執(zhí)行了加法函數(shù)')
cache.set('add_ret',{'time':time.strftime('%Y-%m-%d %H:%M:%S'),'ret':a+b})
return a+b
#不需要返回值,禁用掉結(jié)果后端
@app.task(ignore_result=True)
def cache_user_func():
user = user_models.UserModel.objects.all()
user_dict = {}
for obj in user:
user_dict[obj.account] = model_to_dict(obj)
cache.set('all-user-data',user_dict,timeout=35*60)
2.4、utils/check_task.py
from celery.result import AsyncResult
from celery_task.celery import app
'''驗(yàn)證任務(wù)的執(zhí)行狀態(tài)的'''
def check_task_status(task_id):
'''
任務(wù)的執(zhí)行狀態(tài):
PENDING :等待執(zhí)行
STARTED :開(kāi)始執(zhí)行
RETRY :重新嘗試執(zhí)行
SUCCESS :執(zhí)行成功
FAILURE :執(zhí)行失敗
:param task_id:
:return:
'''
result = AsyncResult(id=task_id, app=app)
dic = {
'type':result.status,
'msg':'',
'data':'',
'code':400
}
if result.status == 'PENDING':
dic['msg'] = '任務(wù)等待中'
elif result.status == 'STARTED':
dic['msg'] = '任務(wù)開(kāi)始執(zhí)行'
elif result.status == 'RETRY':
dic['msg']='任務(wù)重新嘗試執(zhí)行'
elif result.status =='FAILURE':
dic['msg'] = '任務(wù)執(zhí)行失敗了'
elif result.status == 'SUCCESS':
result = result.get()
dic['msg'] = '任務(wù)執(zhí)行成功'
dic['data'] = result
dic['code'] = 200
# result.forget() # 將結(jié)果刪除
# async.revoke(terminate=True) # 無(wú)論現(xiàn)在是什么時(shí)候,都要終止
# async.revoke(terminate=False) # 如果任務(wù)還沒(méi)有開(kāi)始執(zhí)行呢,那么就可以終止。
return dic
四、apps/user中配置相關(guān)處理視圖
4.1、基本配置
1、study_celery/settings.py
#cache緩存
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://127.0.0.1:6379",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
"CONNECTION_POOL_KWARGS": {"max_connections": 100}
# "PASSWORD": "123",
},
'TIMEOUT':30*60 #緩存過(guò)期時(shí)間
}
}
#郵件配置
# EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.qq.com' # 如果是 163 改成 smtp.163.com
EMAIL_PORT = 465
EMAIL_HOST_USER = '2414155342@qq.com' # 發(fā)送郵件的郵箱帳號(hào)
EMAIL_HOST_PASSWORD = 'qq郵箱的授權(quán)碼' # 授權(quán)碼,各郵箱的設(shè)置中啟用smtp服務(wù)時(shí)獲取
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# 這樣收到的郵件,收件人處就會(huì)這樣顯示
# DEFAULT_FROM_EMAIL = '<'xxxxx@qq.com>'
EMAIL_USE_SSL = True # 使用ssl
# EMAIL_USE_TLS = False # 使用tls
# EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的,即只能有一個(gè)為 True
2、study_celery/urls.py
from django.contrib import admin
from django.urls import path,include
urlpatterns = [
path('admin/', admin.site.urls),
path('user/',include('apps.user.urls'))
]
3、apps/user/urls.py
from django.urls import path
from . import views
urlpatterns = [
]
4.2、user的models
apps/user/models.py
from django.db import models
# Create your models here.
class UserModel(models.Model):
id = models.AutoField(primary_key=True)
name = models.CharField(max_length=32)
account = models.CharField(max_length=64)
password = models.CharField(max_length=256)
email = models.EmailField()
執(zhí)行數(shù)據(jù)庫(kù)遷移命令
python manage.py makemigrations
python manage.py migrate
4.3、user的視圖函數(shù)
1、apps/user/views.py
from django.contrib.auth.hashers import make_password, check_password
from django.views import View
from django.http import JsonResponse
from . import models
from django.core.cache import cache
import time
from celery_task.async_task import send_email_task
# Create your views here.
class ResgiterView(View):
#注冊(cè)用戶
def post(self,request):
name = request.POST.get('name')
account = request.POST.get('account')
password = request.POST.get('password')
email = request.POST.get('email')
obj = models.UserModel.objects.filter(account=account).first()
if obj:
return JsonResponse({'code':400,'msg':'賬戶已經(jīng)存在了'})
password = make_password(password)
instance = models.UserModel.objects.create(name=name,account=account,password=password,email=email)
return JsonResponse({'code':200,'msg':'注冊(cè)用戶成功'})
class LoginView(View):
#用戶登錄
def post(self,request):
account = request.POST.get('account')
password = request.POST.get('password')
code = request.POST.get('code') #驗(yàn)證碼
email_code = cache.get(f'email_{account}')#發(fā)給郵箱的驗(yàn)證碼.get(f'email_{account})
print(code,email_code)
if code and email_code:
if code != email_code:
return JsonResponse({'code':400,'msg':'驗(yàn)證碼錯(cuò)誤'})
else:
return JsonResponse({'code':400,'msg':'請(qǐng)先點(diǎn)擊發(fā)送郵件獲取驗(yàn)證碼'})
obj = models.UserModel.objects.filter(account=account).first()
if not obj:
return JsonResponse({'code':400,'msg':'當(dāng)前用戶不存在'})
pwd_true = check_password(password,obj.password)
response = JsonResponse({'code':200,'msg':'登錄成功'})
if pwd_true:
return response
else:
return JsonResponse({'code':400,'msg':'用戶名或密碼錯(cuò)誤'})
class LoginSendEmailView(View):
#用戶登錄前,需要驗(yàn)證碼,發(fā)送驗(yàn)證碼給用戶的郵箱
def post(self,request):
account = request.POST.get('account')
email = request.POST.get('email')
code = str(time.time())[-5:]
cache.set(f'email_{account}',code)
print(cache.get(f'email_{account}'))
res = send_email_task.delay(email,code)
task_id = res.id
print('驗(yàn)證碼是',code)
return JsonResponse({'code':200,'msg':f'請(qǐng)查看{email}郵箱中是否收到郵件','task_id':task_id})
class AllUserDataView(View):
#查詢cache_user_func定時(shí)任務(wù)執(zhí)行時(shí)存到cache中的用戶數(shù)據(jù)
def get(self,request):
key = 'all-user-data'
data = cache.get(key)
if data:
return JsonResponse({'code':200,'data':data})
else:
return JsonResponse({'code':400,'msg':'沒(méi)有相關(guān)數(shù)據(jù)'})
class AddFuncDataView(View):
#查詢add_func 定時(shí)任務(wù)執(zhí)行時(shí)存到cache中的數(shù)據(jù)
def get(self, request):
data = cache.get('add_ret')
print(data, type(data))
return JsonResponse({'code': 200, 'data': data})
class UserTaskIdGetDataView(View):
def get(self,request):
from utils.check_task import check_task_status
#檢查任務(wù)是否成功了,獲取任務(wù)的返回值
task_id = request.GET.get('task_id')
if not task_id:
return JsonResponse({'code':400,'msg':'沒(méi)有攜帶任務(wù)id'})
ret = check_task_status(task_id)
return JsonResponse(ret)
2、apps/user/urls.py
from django.urls import path
from . import views
urlpatterns = [
path('login/',views.LoginView.as_view(),name='user-login'),#登錄
path('register/', views.ResgiterView.as_view(), name='user-register'),#注冊(cè)
path('login/code/',views.LoginSendEmailView.as_view(),name='user-login-send-email'),#登錄驗(yàn)證碼
path('all/user/data/',views.AllUserDataView.as_view(),name='user-all-user-data'),#獲取定時(shí)任務(wù)cache_user_func緩存到cache中的用戶數(shù)據(jù)
path('add/result/',views.AddFuncDataView.as_view(),name='user-add-result'),#獲取定時(shí)任務(wù)add_func緩存到cache的計(jì)算結(jié)果
path('task-id/result/',views.UserTaskIdGetDataView.as_view(),name='user-task-id-data'),#通過(guò)任務(wù)的task-id獲取到任務(wù)返回值
]
五、調(diào)用函數(shù)測(cè)試
5.1、啟動(dòng)項(xiàng)目
1、啟動(dòng)django項(xiàng)目
python manage.py runserver
2、啟動(dòng)celery異步
#windows系統(tǒng)
celery -A celery_task worker -l info? -P? eventlet
#linux系統(tǒng)
celery -A celery_task worker -l info?
3、啟動(dòng)celery定時(shí)(定時(shí)任務(wù)也是提交給異步的)
celery -A celery_task beat -l info
5.2、測(cè)試項(xiàng)目:Postman接口工具
1、注冊(cè):url= /user/register/
2、登錄前點(diǎn)擊獲取驗(yàn)證碼:返回任務(wù)id, url=/user/login/code/
?把task_id=17ee8389-14ab-4da1-b88a-56446afb4493 復(fù)制下來(lái),4中可以用到
3、登錄:code是發(fā)送步驟2中發(fā)送給郵箱的驗(yàn)證碼 ,url=/user/login/
4、通過(guò)任務(wù)id,獲取到發(fā)送郵箱異步任務(wù)的返回值,url=/user/task-id/result/
復(fù)制2返回值中的task_id,獲取該任務(wù)的返回值
5、獲取定時(shí)任務(wù)中,add_func緩存在cache中的計(jì)算數(shù)據(jù)
6、獲取定時(shí)任務(wù)中,cache_user_func緩存到cache中的用戶數(shù)據(jù)
?總結(jié):
1、異步任務(wù),一般是保存文件、發(fā)送郵件、耗時(shí)操作等
2、定時(shí)任務(wù),定時(shí)處理某些數(shù)據(jù)。
六、報(bào)錯(cuò)
1、先去celery.py中查看定時(shí)任務(wù)的配置,模塊的路徑是不是有問(wèn)題,千萬(wàn)不要多了一個(gè)空格啥的文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-643298.html
2、如果通過(guò)任務(wù)id獲取任務(wù)的返回值不成功,看看是不是添加@app.task(ignore_result=True),如果是就去掉這個(gè)參數(shù)配置文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-643298.html
到了這里,關(guān)于Django框架-使用celery(一):django使用celery的通用配置,不受版本影響的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!