說(shuō)明
在Python - 深度學(xué)習(xí)系列32 - glm2接口部署實(shí)踐提到,通過(guò)部署本地化大模型來(lái)完成特定的任務(wù)。
由于大模型的部署依賴顯卡,且常規(guī)量級(jí)的任務(wù)需要大量的worker支持,從成本考慮,租用算力機(jī)是比較經(jīng)濟(jì)的。由于任務(wù)是屬于超高計(jì)算傳輸比的類型,且算力機(jī)隨時(shí)可能出現(xiàn)不穩(wěn)定的情況。
所以,使用消息隊(duì)列完成此項(xiàng)任務(wù)是比較合適的。本次目標(biāo):
- 1 回顧并快速搭建RabbitMQ和RabbitAgent服務(wù)的方法
- 2 在無(wú)端口算力租用商(AutoDL)下部署chatglm2服務(wù),并啟動(dòng)Worker處理數(shù)據(jù)
- 3 在有端口算力租用商(仙宮云)下部署chatglm2服務(wù),并用nginx反向代理,然后在異地啟動(dòng)worker測(cè)試
內(nèi)容
1 構(gòu)建消息隊(duì)列(Server)
1.1 RabbitMQ鏡像
先采用之前的命令啟動(dòng)
在算力機(jī)使用阿里云鏡像倉(cāng)庫(kù)拉取,分鐘級(jí)完成啟動(dòng)
1.2 RabbitAgent服務(wù)
理論上,應(yīng)該封裝為鏡像后,以容器方式啟動(dòng)。不過(guò)租用的算力機(jī)系統(tǒng)盤太小(50G),裝完CUDA之后只剩下10G多的空間,所以這次就把項(xiàng)目文件搬過(guò)去,在宿主機(jī)啟動(dòng)。
以后這類輕量級(jí)的服務(wù),可以用一個(gè)很小的python環(huán)境鏡像封裝。
res = req.post('http://IP:24098/send_workq_message/', json = para_dict)
<Response [200]>
# 6 永久啟動(dòng)服務(wù)
nohup python3 server.py >/dev/null 2>&1 &
消費(fèi)者(手動(dòng)確認(rèn)消息模式)
import pika
import json
credentials = pika.PlainCredentials('user', 'passwd')
connection = pika.BlockingConnection(pika.ConnectionParameters('IP', port, '/', credentials))
channel = connection.channel()
def callback(ch, method, properties, body):
input_data = json.loads(body.decode())
print(f" [x] Received ",input_data)
# time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
# channel.queue_declare(queue='hello1',durable=True)
# 消費(fèi)者預(yù)取消息數(shù)
channel.basic_qos(prefetch_count=3)
# 1 消費(fèi)持久化的隊(duì)列
#channel.basic_consume(queue='hello1',
# on_message_callback=callback, auto_ack =False)
# 2 消費(fèi)非持久化隊(duì)列
channel.basic_consume(queue='hello2',
on_message_callback=callback, auto_ack =False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
[*] Waiting for messages. To exit press CTRL+C
[x] Received {'msg_id': 1, 'msg': 'first msg'}
[x] Done
[x] Received {'msg_id': 2, 'msg': 'second msg'}
[x] Done
[x] Received {'msg_id': 1, 'msg': 'first msg'}
[x] Done
[x] Received {'msg_id': 2, 'msg': 'second msg'}
[x] Done
1.3 將任務(wù)數(shù)據(jù)通過(guò)RabbitAgent寫入
寫入2.8萬(wàn)條,耗時(shí)5秒。
2 無(wú)端口算力租用商Worker測(cè)試
除了一定要返回的結(jié)果數(shù)據(jù),還應(yīng)該加上機(jī)器名稱,顯卡配置與處理時(shí)長(zhǎng)。
2.1 啟動(dòng)服務(wù)
無(wú)端口算力機(jī)的代表就是AutoDL了,他們家機(jī)器也偏貴,4090一小時(shí)2.5~2.6元,比仙宮云高不少(我還是比較prefer后者的)。目前暫時(shí)沒(méi)發(fā)現(xiàn)AutoDL有什么特別的優(yōu)點(diǎn),中規(guī)中矩。
發(fā)送文件
rsync -rvltz -e 'ssh -p 44620' --progress /data/implement_container_file/chatglm2_6b_int4_api/workspace root@connect.westb.seetacloud.com:/root/autodl-tmp/
然后修改api.py中模型加載的位置和端口號(hào),啟動(dòng)3個(gè)服務(wù)。
以下是獲取單條數(shù)據(jù)并進(jìn)行調(diào)試的方法
import pika
import json
credentials = pika.PlainCredentials('x', 'xxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('xxxx', 24091, '/', credentials,heartbeat=600))
channel = connection.channel()
# 聲明一個(gè)隊(duì)列
channel.queue_declare(queue='ent_intro_task', durable=True)
# 從隊(duì)列中獲取一條消息
method_frame, header_frame, body = channel.basic_get(queue='ent_intro_task')
data = body.decode('utf-8')
data1 = json.loads(data)
...
connection.close()
完成測(cè)試之后,打包為ent_intro_worker.py
,該腳本接受一個(gè)端口輸入,以便將worker和server匹配起來(lái),充分利用資源。
import pika
import json
import time
import sys
import requests as req
# 獲取命令行參數(shù)
if len(sys.argv) > 1:
parameter_value = sys.argv[1]
print("傳入的參數(shù)值為:", parameter_value)
else:
print("未傳入?yún)?shù)")
def send_resp(a_message):
message_list = [a_message]
para_dict = {}
para_dict['rabbit'] = 'rabbit01'
para_dict['routing_key'] = 'ent_intro_result'
para_dict['durable'] = True
para_dict['message_list'] = message_list
para_dict['queue'] = 'ent_intro_result'
resp = req.post('http://IP:PORT/send_workq_message/', json = para_dict)
return True
tmp ='''
成立日期:%s
注冊(cè)地址:%s
%s簡(jiǎn)介,字?jǐn)?shù)在100-200字之間
'''
credentials = pika.PlainCredentials('andy', 'andy123')
connection = pika.BlockingConnection(pika.ConnectionParameters('IP', PORT, '/', credentials, heartbeat=600))
channel = connection.channel()
# 手動(dòng)確認(rèn)
def callback(ch, method, properties, body):
input_data = json.loads(body.decode())
print(f" [x] Received ",input_data)
tick1 = time.time()
prompt_content = {'prompt': tmp % (input_data['reg_dt'], input_data['addr'], input_data['ent_table_name'] )}
res = req.post('http://127.0.0.1:%s/' % parameter_value, json =prompt_content).json()
tick2 = time.time()
a_message = {}
a_message['company'] = input_data['ent_table_name']
a_message['intro'] = res['response']
a_message['spends'] = tick2-tick1
send_resp(a_message)
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.queue_declare(queue='ent_intro_task',durable=True)
channel.basic_qos(prefetch_count=3)
channel.basic_consume(queue='ent_intro_task',
on_message_callback=callback, auto_ack =False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
該worker獲取數(shù)據(jù),然后向本地大模型服務(wù)請(qǐng)求結(jié)果,然后將結(jié)果寫到結(jié)果隊(duì)列。啟動(dòng)worker進(jìn)行測(cè)試,python3 ent_intro_worker.py 24096
。
沒(méi)問(wèn)題后就轉(zhuǎn)入后臺(tái)運(yùn)行:nohup python3 ent_intro_worker.py 24096 >/dev/null 2>&1 &
3 有端口算力租用商Worker測(cè)試
3.1 負(fù)載均衡
由于單個(gè)的量化模型不足以充分利用顯卡的性能,所以就要啟動(dòng)多個(gè)同樣的服務(wù)。調(diào)用時(shí)需要進(jìn)行多個(gè)服務(wù)的端口指定,這樣就比較麻煩。
用nginx進(jìn)行負(fù)載均衡,然后只暴露一個(gè)端口作為服務(wù)接口。然后接下來(lái)在遠(yuǎn)程主機(jī)調(diào)用這個(gè)服務(wù)接口(worker)。
租用一臺(tái)仙宮云主機(jī)。數(shù)據(jù)上傳有點(diǎn)問(wèn)題,感覺(jué)它的云盤是外掛的,而且不穩(wěn)定。最終我把數(shù)據(jù)先傳到系統(tǒng)盤,再?gòu)南到y(tǒng)盤傳到云盤才成功。另外在啟動(dòng)服務(wù)時(shí),模型的加載時(shí)間明顯太長(zhǎng)了。感覺(jué)云盤是機(jī)械盤。
rsync -rvltz -e 'ssh -p 111' --progress /data/implement_container_file/chatglm2_6b_int4_api/workspace root@m1ehp5n70rxvg81b.ssh.x-gpu.com:/root/
==> /root/cloud/
安裝包
pip3 install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
啟動(dòng)三個(gè)服務(wù)。
安裝、配置并啟動(dòng)nginx。
events {
#設(shè)置工作模式為epoll,除此之外還有select,poll,kqueue,rtsig和/dev/poll模式
use epoll;
#定義每個(gè)進(jìn)程的最大連接數(shù),受系統(tǒng)進(jìn)程的最大打開文件數(shù)量限制
worker_connections 1024;
}
http{
# 配置nginx上傳文件最大限制
client_max_body_size 50000m;
upstream multi_ma {
# fair;
server 172.17.0.1:10000 ;
server 172.17.0.1:10001 ;
server 172.17.0.1:10002 ;
}
server {
listen 80;
location / {
proxy_pass http://multi_ma;
}
}
}
遠(yuǎn)端使用worker調(diào)用。
實(shí)操時(shí)發(fā)現(xiàn),雖然仙宮云可以給一個(gè)80端口,但是似乎也是容器里的虛擬環(huán)境,不讓再安裝包了,所以也沒(méi)法安裝nginx。不過(guò)理論上應(yīng)該可以實(shí)現(xiàn)。
最后,還是用類似AutoDL的方式啟動(dòng)3個(gè)worker。
兩塊4090之后,速度明顯快多了。
3.2 獲取結(jié)果并入庫(kù)
建立對(duì)應(yīng)的表
# 2 導(dǎo)入包
from Basefuncs import *
# 快速載入連接
def make_local_wmongo_connect(server_name):
try:
tem_w = from_pickle(server_name)
print('【Loading cur_w】from pickle')
except:
w = WMongo('w')
tem_w = w.TryConnectionOnceAndForever(server_name =server_name)
to_pickle(tem_w, server_name)
return tem_w
m8_cur_w = make_local_wmongo_connect('m8.24003')
# 建立索引
m8_cur_w.set_a_index(tier1='llm', tier2 = 'company_intro', idx_var='pid')
m8_cur_w.set_a_index(tier1='llm', tier2 = 'company_intro', idx_var='company')
m8_cur_w.set_a_index(tier1='llm', tier2 = 'company_intro', idx_var='model_name')
從結(jié)果隊(duì)列里取數(shù),然后入庫(kù)
# 封裝函數(shù)
def get_some_batch_updated():
credentials = pika.PlainCredentials('xxx', 'xxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('IP', port, '/', credentials, heartbeat=600))
# 2 迭代的獲取數(shù)據(jù)
res_list = []
with connection.channel() as channel:
for i in range(100):
# 聲明一個(gè)隊(duì)列
channel.queue_declare(queue='ent_intro_result', durable=True)
# 從隊(duì)列中獲取一條消息
method_frame, header_frame, body = channel.basic_get(queue='ent_intro_result')
res_dict = json.loads(body.decode())
res_list.append(res_dict)
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
# 3 拼湊為標(biāo)準(zhǔn)數(shù)據(jù)框
res_df = pd.DataFrame(res_list)
# 增加必要的模型字段
res_df['model_name'] = 'chatglm2_6b_int4'
res_df['pid'] = (res_df['company'] + res_df['model_name']).apply(md5_trans)
m8_cur_w.insert_or_update_with_key(tier1 = 'llm', tier2 = 'company_intro', data_listofdict= res_df.to_dict(orient='records') , key_name='pid')
connection.close()
# 獲取并存儲(chǔ)100條
get_some_batch_updated()
4 結(jié)語(yǔ)
本次完成了:
- 1 RabbitMQ 和 RabbitAgent的建立。這使得其他機(jī)器可以不必要使用端口,非常適合超高計(jì)算傳輸比的任務(wù)。
- 2 將原始數(shù)據(jù)通過(guò)rabbit agent 發(fā)布到任務(wù)隊(duì)列
- 3 將chatglm2-6b部署到算力租用機(jī):測(cè)試了主流的三家autodl, anygpu和仙宮云,都是ok的
- 4 在各算力機(jī)上啟動(dòng)worker進(jìn)行處理
- 5 將結(jié)果獲取,然后存在本地的mongo
沒(méi)能成功完成的實(shí)踐是在仙宮云使用nginx做負(fù)載均衡,簡(jiǎn)化worker的請(qǐng)求。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-849709.html
結(jié)論:用llm來(lái)做任務(wù)成本還是比較高的。價(jià)格折算下來(lái),大約 ¥1/千條。所以,要把大模型用在高價(jià)值領(lǐng)域,例如替代人工打標(biāo),寫函數(shù)這些。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-849709.html
到了這里,關(guān)于Python 全棧系列239 使用消息隊(duì)列完成分布式任務(wù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!