一.Asynq介紹
Asynq 是一個 Go 庫,一個高效的分布式任務(wù)隊列。
Asynq 工作原理:
- 客戶端(生產(chǎn)者)將任務(wù)放入隊列
- 服務(wù)器(消費者)從隊列中拉出任務(wù)并為每個任務(wù)啟動一個工作 goroutine
- 多個工作人員同時處理任務(wù)
git庫:https://github.com/hibiken/asynq
二.所需工具
Asynq 使用 Redis 作為消息代理。client 和 server 都需要連接到 Redis 進行寫入和讀取。
PS:請確保所使用redis >= 5.0
三.代碼示例
以記錄操作的中間件函數(shù)向數(shù)據(jù)庫寫數(shù)據(jù)的情景為例。
- 生產(chǎn)者(客戶端)函數(shù)調(diào)用入口:
其中 map 為需向數(shù)據(jù)庫寫入的內(nèi)容文章來源:http://www.zghlxwxcb.cn/news/detail-695293.html
client.Call("audit:opera", map[string]any{
"uri": uri,
"method": method,
"params": string(paramsByte),
"headers": string(headerByte),
"code": codeInt,
"model": model,
"action": action,
"user_id": userId,
"company_id": companyId,
"user_name": userName,
"company": companyName,
})
- 生產(chǎn)者函數(shù)
func Call(t string, payload map[string]any) error {
// redis連接
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
Password: "",
DB: 1,
})
defer client.Close()
switch t {
case "audit:opera":
// 初始化新任務(wù)
task, err := server.NewOperateSendTask(payload)
if err != nil {
return err
}
// 任務(wù)入隊
_, err = client.Enqueue(task, asynq.Queue("audit"))
if err != nil {
log.Err(err).Msg(fmt.Sprintf("task: %v\n", task))
return err
}
}
return nil
}
func NewOperateSendTask(data map[string]any) (*asynq.Task, error) {
payload, err := json.Marshal(data)
if err != nil {
return nil, err
}
return asynq.NewTask(consts.TypeAuditOpera, payload), nil
}
- 消費者函數(shù)
func HandlerAuditOperateTask(ctx context.Context, t *asynq.Task) error {
var record ent.OperateRecord
// 隊列中取任務(wù)
err := json.Unmarshal(t.Payload(), &record)
if err != nil {
log.Err(err).Msg("task.json.Unmarshal")
return err
}
// 真正的數(shù)據(jù)庫操作
err = dao.OperateRecord.CreateOperateRecord(&record)
if err != nil {
log.Err(err).Msg("task.dao.OperateRecord.CreateOperateRecord")
return err
}
return nil
}
- asynq初始化(消費者啟動入口,項目初始化時自動啟動)
func InitAsynq(ip string, port int, passwd string) {
addr := fmt.Sprintf("%s:%d", ip, port)
srv := asynq.NewServer(
asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
Password: "",
DB: 1,
},
// 異步隊列
asynq.Config{
Queues: map[string]int{
"audit": 3,
},
},
)
mux := asynq.NewServeMux()
// 啟動消費者
mux.HandleFunc("audit:opera", server.HandlerAuditOperateTask)
go srv.Run(mux)
}
四.Reference
Go異步任務(wù)解決方案之Asynq庫詳解:
https://www.jb51.net/article/275392.htm文章來源地址http://www.zghlxwxcb.cn/news/detail-695293.html
到了這里,關(guān)于【go】異步任務(wù)解決方案Asynq實戰(zhàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!