簡介
異步或非阻塞處理是一種將某些任務(wù)的執(zhí)行與程序的主要流程分離的方法。這為您提供了幾個優(yōu)勢,包括允許用戶界面代碼在沒有中斷的情況下運行。
消息傳遞是程序組件用來通信和交換信息的一種方法。它可以同步或異步實現(xiàn),并且可以允許離散進程進行無問題的通信。消息傳遞通常作為傳統(tǒng)數(shù)據(jù)庫的替代實現(xiàn),因為消息隊列通常實現(xiàn)了額外的功能,提供了增加的性能,并且可以完全駐留在內(nèi)存中。
Celery 是建立在異步消息傳遞系統(tǒng)上的任務(wù)隊列。它可以用作編程任務(wù)可以被傾倒的桶。傳遞任務(wù)的程序可以繼續(xù)執(zhí)行和響應(yīng)功能,然后稍后它可以輪詢 celery 來查看計算是否完成并檢索數(shù)據(jù)。
雖然 celery 是用 Python 編寫的,但它的協(xié)議可以在任何語言中實現(xiàn)。它甚至可以通過 webhooks 與其他語言一起使用。
通過在程序環(huán)境中實現(xiàn)作業(yè)隊列,您可以輕松卸載任務(wù)并繼續(xù)處理用戶的交互。這是增加應(yīng)用程序響應(yīng)性的簡單方法,并且在執(zhí)行長時間運行的計算時不會被鎖定。
在本指南中,我們將在 Ubuntu 12.04 VPS 上安裝和實現(xiàn)使用 RabbitMQ 作為消息系統(tǒng)的 celery 作業(yè)隊列。
安裝組件
安裝 Celery
Celery 是用 Python 編寫的,因此可以像處理常規(guī) Python 包一樣輕松安裝。
我們將按照處理 Python 包的推薦程序,通過創(chuàng)建虛擬環(huán)境來安裝我們的消息系統(tǒng)。這有助于我們保持環(huán)境穩(wěn)定,不會影響更大的系統(tǒng)。
從 Ubuntu 的默認存儲庫安裝 Python 虛擬環(huán)境包:
sudo apt-get update
sudo apt-get install python-virtualenv
我們將創(chuàng)建一個消息目錄,在這里我們將實現(xiàn)我們的系統(tǒng):
mkdir ~/messaging
cd ~/messaging
現(xiàn)在我們可以創(chuàng)建一個虛擬環(huán)境,通過以下命令安裝 celery:
virtualenv --no-site-packages venv
配置好虛擬環(huán)境后,可以通過輸入以下命令激活它:
source venv/bin/activate
您的提示符將更改以反映您現(xiàn)在正在使用我們上面創(chuàng)建的虛擬環(huán)境。這將確保我們的 Python 包安裝在本地而不是全局。
如果在任何時候我們需要停用環(huán)境(現(xiàn)在不需要),可以輸入:
deactivate
現(xiàn)在我們已經(jīng)激活了環(huán)境,可以使用 pip 安裝 celery:
pip install celery
安裝 RabbitMQ
Celery 需要一個消息代理來處理來自外部源的請求。這個代理被稱為“broker”。
有很多可供選擇的代理選項,包括關(guān)系型數(shù)據(jù)庫、NoSQL 數(shù)據(jù)庫、鍵值存儲和實際消息系統(tǒng)。
我們將配置 celery 使用 RabbitMQ 消息系統(tǒng),因為它提供了強大、穩(wěn)定的性能,并且與 celery 交互良好。這是一個很好的解決方案,因為它包含了與我們預(yù)期使用的功能很好契合的特性。
我們可以通過 Ubuntu 的存儲庫安裝 RabbitMQ:
sudo apt-get install rabbitmq-server
RabbitMQ 服務(wù)在安裝后會自動啟動在我們的服務(wù)器上。
創(chuàng)建 Celery 實例
為了使用 celery 的任務(wù)排隊功能,安裝后的第一步是創(chuàng)建一個 celery 實例。這是一個簡單的過程,導(dǎo)入包,創(chuàng)建一個“app”,然后設(shè)置 celery 能夠在后臺執(zhí)行的任務(wù)。
讓我們在我們的消息目錄內(nèi)創(chuàng)建一個名為 tasks.py
的 Python 腳本,我們可以在其中定義我們的工作人員可以執(zhí)行的任務(wù)。
sudo nano ~/messaging/tasks.py
我們應(yīng)該做的第一件事是從 celery 包中導(dǎo)入 Celery 函數(shù):
from celery import Celery
之后,我們可以創(chuàng)建一個連接到默認 RabbitMQ 服務(wù)的 celery 應(yīng)用程序?qū)嵗?/p>
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')
Celery
函數(shù)的第一個參數(shù)是將用于標識任務(wù)的前綴名稱。
backend
參數(shù)是一個可選參數(shù),如果您希望查詢后臺任務(wù)的狀態(tài)或檢索其結(jié)果,則是必需的。
如果您的任務(wù)只是執(zhí)行一些工作然后退出,而不返回在程序中使用的有用值,您可以將此參數(shù)省略。如果只有一些任務(wù)需要此功能,請在此啟用它,我們可以在后面逐個案例地禁用它。
broker
參數(shù)指定連接到我們代理所需的 URL。在我們的情況下,這是運行在我們服務(wù)器上的 RabbitMQ 服務(wù)。RabbitMQ 使用一種稱為“amqp”的協(xié)議運行。如果 RabbitMQ 在其默認配置下運行,celery 可以連接而無需其他信息,只需 amqp://
方案。
構(gòu)建 Celery 任務(wù)
在這個文件中,我們現(xiàn)在需要添加我們的任務(wù)。
每個 Celery 任務(wù)都必須使用裝飾器 @app.task
來引入。這允許 Celery 識別可以添加其排隊功能的函數(shù)。在每個裝飾器之后,我們只需創(chuàng)建一個我們的工作進程可以運行的函數(shù)。
我們的第一個任務(wù)將是一個簡單的函數(shù),它將一個字符串打印到控制臺。
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')
@app.task
def print_hello():
print 'hello there'
因為這個函數(shù)不返回任何有用的信息(它將其打印到控制臺),我們可以告訴 Celery 不使用后端來存儲關(guān)于此任務(wù)的狀態(tài)信息。這在內(nèi)部更簡單,需要更少的資源。
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')
@app.task(ignore_result=True)
def print_hello():
print 'hello there'
接下來,我們將添加另一個函數(shù),它將生成素數(shù)(取自 RosettaCode)。這可能是一個長時間運行的過程,因此這是一個很好的例子,說明我們在等待結(jié)果時如何處理異步工作進程。
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')
@app.task(ignore_result=True)
def print_hello():
print 'hello there'
@app.task
def gen_prime(x):
multiples = []
results = []
for i in xrange(2, x+1):
if i not in multiples:
results.append(i)
for j in xrange(i*i, x+1, i):
multiples.append(j)
return results
因為我們關(guān)心這個函數(shù)的返回值,并且我們想知道它何時完成(以便我們可以使用結(jié)果等),所以我們不會向這個第二個任務(wù)添加 ignore_result
參數(shù)。
保存并關(guān)閉文件。
啟動 Celery 工作進程
我們現(xiàn)在可以啟動一個工作進程,它將能夠接受來自應(yīng)用程序的連接。它將使用我們剛剛創(chuàng)建的文件來了解它可以執(zhí)行的任務(wù)。
啟動一個工作實例就像使用 celery 命令調(diào)用應(yīng)用程序名稱一樣簡單。我們將在字符串末尾包含一個 “&” 字符,將我們的工作進程放入后臺:
celery worker -A tasks &
這將啟動一個應(yīng)用程序,然后將其從終端分離,允許您繼續(xù)使用它進行其他任務(wù)。
如果您想要啟動多個工作進程,可以使用 -n
參數(shù)為每個工作進程命名:
celery worker -A tasks -n one.%h &
celery worker -A tasks -n two.%h &
當工作進程被命名時,%h
將被主機名替換。
要停止工作進程,您可以使用 kill 命令。我們可以查詢進程 ID,然后根據(jù)這些信息消除工作進程。
ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill
這將允許工作進程在退出之前完成其當前任務(wù)。
如果您希望關(guān)閉所有工作進程而不等待它們完成任務(wù),可以執(zhí)行:
ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
使用隊列處理工作
我們可以使用我們生成的工作進程在后臺為我們的程序完成工作。
我們將在 Python 解釋器中探索不同的選項,而不是創(chuàng)建一個完整的程序來演示它是如何工作的:
python
在提示符下,我們可以將我們的函數(shù)導(dǎo)入到環(huán)境中:
from tasks import print_hello
from tasks import gen_prime
如果您測試這些函數(shù),它們似乎沒有任何特殊功能。第一個函數(shù)按預(yù)期打印一行:
print_hello()
hello there
第二個函數(shù)返回一個素數(shù)列表:
primes = gen_prime(1000)
print primes
如果我們給第二個函數(shù)一個更大的數(shù)字范圍來檢查,執(zhí)行將掛起,因為它在計算中:
primes = gen_prime(50000)
通過輸入 “CTRL-C” 來停止執(zhí)行。這個過程顯然沒有在后臺計算。
要訪問后臺工作進程,我們需要使用 .delay
方法。Celery 為我們的函數(shù)添加了額外的功能。這個方法用于將函數(shù)傳遞給工作進程執(zhí)行。它應(yīng)該立即返回:
primes = gen_prime.delay(50000)
這個任務(wù)現(xiàn)在正在由我們之前啟動的工作進程執(zhí)行。因為我們?yōu)閼?yīng)用程序配置了 backend
參數(shù),我們可以檢查計算的狀態(tài)并訪問結(jié)果。
要檢查任務(wù)是否完成,我們可以使用 .ready
方法:
primes.ready()
False
“False” 的值意味著任務(wù)仍在運行,結(jié)果尚不可用。當我們得到 “True” 的值時,我們可以對答案做些什么。
primes.ready()
True
我們可以使用 .get
方法獲取值。
如果我們已經(jīng)使用 .ready
方法驗證了值是否計算出來,那么我們可以像這樣使用該方法:
print primes.get()
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523,
. . .
然而,如果您在調(diào)用 .get
之前沒有使用 .ready
方法,您很可能希望添加一個 “timeout” 選項,以便您的程序不必等待結(jié)果,這將違反我們的實現(xiàn)目的:
print primes.get(timeout=2)
如果超時,這將引發(fā)異常,您可以在程序中處理它。
結(jié)論
雖然這些信息足以讓你開始在程序中使用 celery,但這只是揭開了這個庫的全部功能的一角。Celery允許你將后臺任務(wù)串聯(lián)在一起,對任務(wù)進行分組,并以有趣的方式組合函數(shù)。文章來源:http://www.zghlxwxcb.cn/news/detail-830220.html
雖然 celery 是用 Python 編寫的,但可以通過 Webhooks 與其他語言一起使用。這使得它非常靈活,可以將任務(wù)移到后臺,而不受所選擇的語言的限制。文章來源地址http://www.zghlxwxcb.cn/news/detail-830220.html
到了這里,關(guān)于如何在 Ubuntu VPS 上使用 Celery 與 RabbitMQ 來做隊列的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!