【Consul】基于Go實現(xiàn)Consul服務的注冊、注銷、修改、監(jiān)控注冊的服務變化、實時同步服務信息機制
大家好 我是寸鐵??
總結了一篇【Consul】基于Go實現(xiàn)Consul服務的注冊、注銷、修改、監(jiān)控注冊的服務變化、實時同步服務信息機制?
這應該是目前全網最全的使用golang手搓Consul服務信息機制?
喜歡的小伙伴可以點點關注 ??
前言
consul常常被用來作服務注冊與服務發(fā)現(xiàn),而它的watch機制則可被用來監(jiān)控一些數據的更新,包括:nodes, KV pairs, health checks等。另外,在監(jiān)控到數據變化后,還可以調用外部處理程序,此處理程序可以是任何可執(zhí)行文件或HTTP調用,具體說明可見官網。
介紹
consul
中的watch
可以監(jiān)聽service
,k-v
,check
,event
等事件的變化,實時獲取最新的數據。consul
支持以下watch
類型:
key
監(jiān)聽一個consul kv中的keykeyprefix
監(jiān)聽consul kv中的key的前綴services
監(jiān)聽有效服務的變化nodes
監(jiān)聽節(jié)點的變化service
監(jiān)聽服務的變化checks
監(jiān)聽check的變化event
監(jiān)聽自定義事件的變化
從以上可以看出consul提供非常豐富的監(jiān)聽類型,通過這些類型我們可以實時觀測到consul整個集群中的變化,從而實現(xiàn)一些特別的需求,比如:服務告警,配置實時更新等功能。
啟動與后臺展示
Windows
啟動命令如下:
consul agent -dev
后臺運行結果展示如下:
UI界面展示
Consul服務
AgentService結構體字段具體信息
1.Kind:服務類型,是一個自定義類型 ServiceKind。
2.ID:服務的唯一標識符。(一個ID只能對應一個服務)
3.Service:服務的名稱(一個服務可以創(chuàng)建多個ID)
4.Tags:服務的標簽,以字符串數組形式存儲。
5.Meta:服務的元數據,以鍵值對的形式存儲。
6.Port:服務的端口號。
7.Address:服務的地址。
8.SocketPath:服務的套接字路徑,可選的,以字符串形式存儲。
9.TaggedAddresses:帶標簽的地址,以鍵值對的形式存儲,鍵是標簽,值是 ServiceAddress 類型。
10.Weights:服務的權重,類型為 AgentWeights。
11.EnableTagOverride:是否啟用標簽覆蓋。
12.CreateIndex:創(chuàng)建索引,無符號整數類型,用于 JSON 序列化時忽略。
13.ModifyIndex:修改索引,無符號整數類型,用于 JSON 序列化時忽略。
14.ContentHash:內容哈希,字符串類型,用于 JSON 序列化時忽略。
15.Proxy:代理配置,類型為 AgentServiceConnectProxyConfig,可選的。
16.Connect:連接信息,類型為 AgentServiceConnect,可選的。
17.PeerName:對等名稱,字符串類型,可選的。
18.Namespace:命名空間,字符串類型,用于 JSON 序列化時忽略。
19.Partition:分區(qū),字符串類型,用于 JSON 序列化時忽略。
20.Datacenter:數據中心,字符串類型,用于 JSON 序列化時忽略。
21.Locality:地點信息,類型為 Locality,用于 JSON 序列化時忽略。
打印的結果如下:
服務的類型Kind:
服務的ID: him-service-1
服務的名字Service: him-service
服務的標簽Tags: [tag1 tag2]
服務的元數據Meta: map[]
服務的端口Port: 8082
服務的地址Address: 127.0.0.4
服務的套接字路徑SocketPath:
服務的帶標簽的地址TaggedAddresses: map[lan_ipv4:{127.0.0.4 8082} wan_ipv4:{127.0.0.4 8082}]
服務的權重Weights: {1 1}
服務是否啟用標簽覆蓋EnableTagOverride: false
服務創(chuàng)建的索引CreateIndex: 5235
服務修改的索引ModifyIndex: 5235
服務內容哈希ContentHash:
服務代理配置Proxy: &{[] 0 <nil> map[] [] {} {false []} <nil>}
服務連接信息Connect: &{false <nil>}
服務對等名稱PeerName:
服務的命名空間Namespace:
服務的數據中心Datacenter:
服務的分區(qū)Partition:
服務的地點信息Locality: <nil>
同下:
注冊Consul服務
package main
import (
"fmt"
"github.com/hashicorp/consul/api"
)
func main() {
//寫api的配置信息
config := api.DefaultConfig()
//注冊到consul上的地址
config.Address = "127.0.0.1:8500" // Consul 服務器地址
//將config注冊到客戶端,由客戶端實現(xiàn)
client, err := api.NewClient(config)
if err != nil {
panic(err)
}
// 創(chuàng)建一個新的服務條目
registration := new(api.AgentServiceRegistration)
registration.ID = "my-service-3"
registration.Name = "my-service"
registration.Port = 8083
registration.Address = "127.0.0.1"
registration.Tags = []string{"tag1", "tag2"}
reg := &api.AgentServiceRegistration{
Name: registration.Name, // 服務名稱
ID: registration.ID, // 服務 ID,必須唯一
Address: registration.Address, //服務的地址
Port: registration.Port, // 服務端口 服務所在的監(jiān)聽端口
Tags: registration.Tags, // 可選:服務標簽
}
// 將服務注冊到 Consul
err = client.Agent().ServiceRegister(reg)
if err != nil {
panic(err)
}
fmt.Println("Service registered successfully")
}
注銷Consul服務
package main
import (
"fmt"
"log"
"github.com/hashicorp/consul/api"
)
func main() {
// 創(chuàng)建Consul客戶端
config := api.DefaultConfig()
client, err := api.NewClient(config)
if err != nil {
log.Fatal(err)
}
// 創(chuàng)建注銷的服務ID
serviceID := "my-service-2"
// 注銷服務
agent := client.Agent()
if err := agent.ServiceDeregister(serviceID); err != nil {
log.Fatal(err)
}
fmt.Println("Service deregistered successfully")
}
實時同步更新Consul服務的信息
package main
import (
"fmt"
"github.com/hashicorp/consul/api"
"time"
)
type Service struct {
Name string
ID string
Address string
Port int
Tags []string
}
var serviceMap = map[string][]map[string]Service{}
func main() {
serviceMap = make(map[string][]map[string]Service)
// 創(chuàng)建Consul客戶端
config := api.DefaultConfig()
client, err := api.NewClient(config)
if err != nil {
panic(err)
}
for {
// 查詢Consul客戶端的服務目錄,得到所有的服務名稱。
catalog := client.Catalog()
//得到consul上的所有服務services
allServices, _, err := catalog.Services(nil)
if err != nil {
panic(err)
}
fmt.Println("所有服務services的名稱:", allServices)
//創(chuàng)建當前的ServiceMap記錄當前的Service 出現(xiàn)則標記為true
var currentServiceMap map[string]bool
currentServiceMap = make(map[string]bool, 0)
//遍歷所有的Consul服務,將所有的Consul服務的信息存入map中。
//如服務: my-service first-service
for serviceName, _ := range allServices {
currentServiceMap[serviceName] = true
// 通過服務名稱查詢服務實例 如my-service下有兩個服務實例: my-service1 my-service2
// services是每個服務下的服務實例集合
services, _, err := client.Health().Service(serviceName, "", true, nil)
if err != nil {
panic(err)
}
var servcieSlice []map[string]Service
servcieSlice = make([]map[string]Service, 0)
// 遍歷服務實例集合
for _, service := range services {
fmt.Printf("Service %s:%d\n", service.Service.Service, service.Service.Port)
//map先定義
var instanceMap map[string]Service
//定義后用make進行創(chuàng)建
instanceMap = make(map[string]Service)
//每一個服務的實例ID對應該服務的信息
instanceMap[service.Service.ID] = Service{
Name: service.Service.Service,
ID: service.Service.ID,
Address: service.Service.Address,
Port: service.Service.Port,
Tags: service.Service.Tags,
}
fmt.Println(instanceMap)
servcieSlice = append(servcieSlice, instanceMap)
將serviceMap全局map之前出現(xiàn)過的service.Service.Service不存在則賦值為nil
//if _, ok := serviceMap[service.Service.Service]; !ok {
// serviceMap[service.Service.Service] = nil
//}
//service.Service.Service作為serviceMap的鍵,map[string]Service{}作為值存儲入map中。
//這里最后一個值會覆蓋掉同一個鍵前面多個值,采用一個值對應一個map的數組
serviceMap[service.Service.Service] = servcieSlice
}
fmt.Println(serviceMap)
}
//serviceMap為全局的Map,最后遍歷一遍serviceMap,看一下里面的serviceName在當前的currentServiceMap中能否找到。
//如果說找不到,則把serviceMap中這個serviceName服務給刪除掉。
for serviceName := range serviceMap {
//遍歷一遍全局的serviceMap,把不存在的服務刪除掉。
//如果說currentServiceMap不存在serviceName,則把serviceName從serviceMap中移除。
if _, ok := currentServiceMap[serviceName]; !ok {
delete(serviceMap, serviceName)
}
}
time.Sleep(10 * time.Second)
}
}
長輪詢方式監(jiān)控服務
package main
import (
"fmt"
"github.com/hashicorp/consul/api"
)
func main() {
// 創(chuàng)建Consul客戶端配置
config := api.DefaultConfig()
client, err := api.NewClient(config)
if err != nil {
panic(err)
}
// 創(chuàng)建WatchParams
params := &api.QueryOptions{
WaitIndex: 0, // 初始的索引,設置為0表示從最新的變更開始監(jiān)聽
WaitTime: 1000, // 設置長輪詢的等待時間,單位為秒
}
// 循環(huán)監(jiān)聽服務變化
for {
// 查詢Consul客戶端的服務目錄,得到所有的服務名稱
catalog := client.Catalog()
allServices, _, err := catalog.Services(params)
if err != nil {
panic(err)
}
fmt.Println("所有服務的名稱:", allServices)
// 查詢服務健康狀態(tài)
//for serviceName := range allServices {
services, _, err := client.Health().Service("my-service", "", true, params)
if err != nil {
panic(err)
}
for _, service := range services {
fmt.Printf("服務: %s, 端口號: %v\n", service.Service.Service, service.Service.Port)
}
//}
// 更新WaitIndex,以便下次長輪詢從更新后的索引開始
params.WaitIndex = 0 // 使用長輪詢時,將WaitIndex設置為0,以獲取最新的變更
}
}
Watch機制監(jiān)控注冊的服務變化
consul官方提供了Golang版的
watch
包。其實際上也是對watch機制進行了一層封裝,最終代碼實現(xiàn)的還是對consul HTTP API
的 endpoints的使用。 文章開始說過,“在監(jiān)控到數據變化后,還可以調用外部處理程序”。是了,數據變化后調用外部處理程序才是有意義的,Golang的watch包中對應的外部處理程序是一個函數handler。因為業(yè)務的關系,這里只實現(xiàn)了watch對service的變化的監(jiān)控,其主要創(chuàng)建了一個plan
來對整個服務的變化做一個監(jiān)控,以及再為每個服務創(chuàng)建一個plan
,對單個服務變化作監(jiān)控
。話不多說,上代碼:
//Watch機制同步
package main
import (
"fmt"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
"log"
)
var serviceMap map[string][]map[string]Service
type Service struct {
Name string
ID string
Address string
Port int
Tags []string
}
func main() {
serviceMap = make(map[string][]map[string]Service)
// 創(chuàng)建Consul客戶端
config := api.DefaultConfig()
client, err := api.NewClient(config)
if err != nil {
panic(err)
}
// 初始化監(jiān)視器計劃
params := map[string]interface{}{"type": "services"}
plan, err := watch.Parse(params)
if err != nil {
log.Fatal(err)
}
// 設置監(jiān)視器的處理函數
plan.Handler = func(idx uint64, data interface{}) {
services, ok := data.(map[string][]string)
if !ok {
log.Println("Error: Data format unexpected")
return
}
// 重置服務映射
serviceMap = make(map[string][]map[string]Service)
// 在這里阻塞住了
// 遍歷服務列表
for serviceName := range services {
// 查詢服務實例
instances, _, err := client.Health().Service(serviceName, "", true, nil)
if err != nil {
log.Printf("Error retrieving instances for service %s: %v\n", serviceName, err)
continue
}
// 創(chuàng)建服務實例切片
var serviceInstances []map[string]Service
// 遍歷服務實例
for _, instance := range instances {
service := Service{
Name: instance.Service.Service,
ID: instance.Service.ID,
Address: instance.Service.Address,
Port: instance.Service.Port,
Tags: instance.Service.Tags,
}
instanceMap := map[string]Service{instance.Service.ID: service}
serviceInstances = append(serviceInstances, instanceMap)
}
// 更新服務映射
serviceMap[serviceName] = serviceInstances
}
// 輸出服務映射
fmt.Println("Updated Service Map:")
for serviceName, instances := range serviceMap {
fmt.Println("Service:", serviceName)
for _, instance := range instances {
for id, service := range instance {
fmt.Printf("Instance ID: %s, Address: %s, Port: %d, Tags: %v\n", id, service.Address, service.Port, service.Tags)
}
}
}
}
// 啟動監(jiān)視器
plan.Run("http://localhost:8500")
// 保持程序運行,直到手動中斷
select {}
}
長時間沒有響應,則進程結束。
采用goroutinr消息型監(jiān)控服務
package main
import (
"context"
"fmt"
"github.com/hashicorp/consul/api"
)
// 定義服務信息
type Service struct {
Name string
ID string
Address string
Port int
Tags []string
}
// 全部服務信息的字典
var servicesMap = map[string]map[string]Service{}
// 記錄本次目錄存在的服務的字典,布爾型,用于和lastServiceMap進行判斷
var currentServicesMap = map[string]bool{}
// 記錄上次目錄存在的服務的字典,布爾型,用于和currentServicesMap進行判斷
var lastServiceMap = map[string]bool{}
// 取消服務的協(xié)程的字典,存的是服務名和上下文cancel方法的映射,用于刪除指定的goroutine
var withCancelMap = map[string]context.CancelFunc{}
func main() {
//初始化map
currentServicesMap = make(map[string]bool)
lastServiceMap = make(map[string]bool)
servicesMap = make(map[string]map[string]Service)
withCancelMap = make(map[string]context.CancelFunc)
// 創(chuàng)建Consul客戶端
config := api.DefaultConfig()
client, err := api.NewClient(config)
if err != nil {
panic(err)
}
// 創(chuàng)建WatchParams
//params := &api.QueryOptions{
// WaitIndex: 0, // 初始的索引,設置為0表示從最新的變更開始監(jiān)聽
//}
//配置參數放在外面,則達到獲取目錄信息阻塞的效果
queryOptions := &api.QueryOptions{
WaitIndex: 0, // 初始索引
}
for {
catalog := client.Catalog()
// 查詢Consul客戶端的服務目錄,得到所有的服務名稱。
//得到consul上的所有服務services
//目錄發(fā)生變化再更新,不發(fā)生變化則不更新,需要在這里阻塞。
allServices, meta, err := catalog.Services(queryOptions)
fmt.Println(err)
if err != nil {
panic(err)
}
//currentServicesMap記錄allServices出現(xiàn)過的服務
for name := range allServices {
currentServicesMap[name] = true
}
//遍歷一遍上次服務的哈希表lastServiceMap
for lastServiceName := range lastServiceMap {
//如果說上次服務的哈希表中存在這個服務,現(xiàn)在遍歷目錄沒有這個服務
if allServices[lastServiceName] == nil {
//將lastServiceName從當前的currentServicesMap移除
delete(currentServicesMap, lastServiceName)
}
}
//如果沒有一個檢查機制的話,這里相當于一直去讀取目錄,讀完目錄后再不斷啟協(xié)程
//這里就會造成不斷的for死循環(huán),所以需要一個檢查機制,控制目錄的更新。
//當目錄沒發(fā)生更新的時候則阻塞,目錄發(fā)生更新了則進行檢查。
//這樣就確保了你目錄沒更新時,我啟動的go func就一直在監(jiān)聽服務的變化即可
fmt.Println("所有服務services的名稱:", allServices)
//如果說協(xié)程中不啟動go routine則相當于每次遍歷服務時都去創(chuàng)建go routine 導致啟動的go routine數量比較多
//每次目錄發(fā)生變化后,這里就會創(chuàng)建新的協(xié)程,就會導致多了幾個協(xié)程。
//正確的話,這里應該是先阻塞,然后如果說哪個協(xié)程監(jiān)聽到變化,則這個協(xié)程發(fā)消息即可
//其他的協(xié)程不用動,需要編寫一個服務的檢查機制,讓之前啟動的goroutine去監(jiān)控對應的服務即可,其他不變化的go routine不用動。
fmt.Println("lastServiceMap:", lastServiceMap)
fmt.Println("currentServicesMap:", currentServicesMap)
for serviceName := range allServices {
if currentServicesMap[serviceName] && lastServiceMap[serviceName] {
//如果說上次和這次都存在該服務,說明之前已經創(chuàng)建過了,則跳過
fmt.Println("跳過……", serviceName)
continue
} else if currentServicesMap[serviceName] && !lastServiceMap[serviceName] {
//如果說上次不存在,這次存在則說明要進行創(chuàng)建
//如果說目錄多增加了一個服務,則啟動多一個服務。
fmt.Println("啟動協(xié)程……", serviceName)
//使用上下文進行協(xié)程的注銷
ctx, cancel := context.WithCancel(context.Background())
//存儲一個serviceName為鍵、cancel()方法為值的withCancelMap
withCancelMap[serviceName] = cancel
go syncInfo(ctx, serviceName, client, queryOptions)
//range上次的lastServiceMap
}
}
//比較上次目錄的服務信息和這次目錄的服務信息
//執(zhí)行刪除指定go routine的操作
for lastName := range lastServiceMap {
if !currentServicesMap[lastName] && lastServiceMap[lastName] {
//如果說上次該服務存在,這次該服務不存在則說明要進行銷毀
fmt.Println("進入刪除協(xié)程函數……", lastName)
//當走到這里時,執(zhí)行serviceName對應的cancel方法
cacnelService := withCancelMap[lastName]
cacnelService() //取消掉該服務的協(xié)程
fmt.Println("刪除協(xié)程……", lastName)
//由于該服務lastName已經取消了,則從serviceMap中移除掉
delete(servicesMap, lastName)
//由于該服務lastName已經取消了,則從lastServiceMap中移除掉
delete(lastServiceMap, lastName)
//由于該服務lastName已經取消了,則從withCancelMap中移除掉
delete(withCancelMap, lastName)
}
}
// 賦值 currentServicesMap 的值給 lastServiceMap,用于下一次的目錄信息檢查。
for key, value := range currentServicesMap {
lastServiceMap[key] = value
}
//fmt.Println(serviceMap)
// 更新長輪詢參數中的索引
queryOptions.WaitIndex = meta.LastIndex
}
}
/*
將服務的各個實例的信息同步更新到全部服務信息的字典中,對開啟服務的協(xié)程持續(xù)監(jiān)控服務的信息變化。
當服務不存在時,執(zhí)行取消服務所在的協(xié)程的操作。
*/
func syncInfo(ctx context.Context, serviceName string, client *api.Client, params *api.QueryOptions) {
for {
select {
//執(zhí)行ctx綁定cancel
//執(zhí)行cancel時,綁定對應的ctx執(zhí)行Done()方法,取消掉協(xié)程。
case <-ctx.Done(): //會有一點延遲
fmt.Println("協(xié)程取消了……")
return
default:
//params要作為參數傳入函數,這樣
params := &api.QueryOptions{
WaitIndex: 0, // 初始的索引,設置為0表示從最新的變更開始監(jiān)聽
}
fmt.Println("1111111111", serviceName)
services, meta, err := client.Health().Service(serviceName, "", true, params)
fmt.Println("2222222222", serviceName)
if err != nil {
panic(err)
}
//var servcieSlice []map[string]Service
//servcieSlice = make([]map[string]Service, 0)
// 遍歷服務實例集合
//map先定義
var instanceMap map[string]Service
//定義后用make進行創(chuàng)建
instanceMap = make(map[string]Service)
for _, service := range services {
//fmt.Printf("Service %s:%d\n", service.Service.Service, service.Service.Port)
//每一個服務的實例ID對應該服務的信息
instanceMap[service.Service.ID] = Service{
Name: service.Service.Service,
ID: service.Service.ID,
Address: service.Service.Address,
Port: service.Service.Port,
Tags: service.Service.Tags,
}
//fmt.Println(instanceMap)打印出服務中每個實例的信息
servicesMap[service.Service.Service] = instanceMap
}
fmt.Println(instanceMap)
params.WaitIndex = meta.LastIndex // 更新版本,根據和上次的不同進行變化。
}
}
}
參考網址
https://vearne.cc/archives/13983
https://juejin.cn/post/6984378158347157512
https://juejin.cn/post/6883095345623597064
https://zhuanlan.zhihu.com/p/111673886
https://developer.hashicorp.com/consul/api-docs/catalog
看到這里的小伙伴,恭喜你又掌握了一個技能??
希望大家能取得勝利,堅持就是勝利??
我是寸鐵!我們下期再見??
往期好文??
保姆級教程
【保姆級教程】Windows11下go-zero的etcd安裝與初步使用
【保姆級教程】Windows11安裝go-zero代碼生成工具goctl、protoc、go-zero
【Go-Zero】手把手帶你在goland中創(chuàng)建api文件并設置高亮
報錯解決
【Go-Zero】Error: user.api 27:9 syntax error: expected ‘:‘ | ‘IDENT‘ | ‘INT‘, got ‘(‘ 報錯解決方案及api路由注意事項
【Go-Zero】Error: only one service expected goctl一鍵轉換生成rpc服務錯誤解決方案
【Go-Zero】【error】 failed to initialize database, got error Error 1045 (28000):報錯解決方案
【Go-Zero】Error 1045 (28000): Access denied for user ‘root‘@‘localhost‘ (using password: YES)報錯解決方案
【Go-Zero】type mismatch for field “Auth.AccessSecret“, expect “string“, actual “number“報錯解決方案
【Go-Zero】Error: user.api 30:2 syntax error: expected ‘)‘ | ‘KEY‘, got ‘IDENT‘報錯解決方案
【Go-Zero】Windows啟動rpc服務報錯panic:context deadline exceeded解決方案
Go面試向
【Go面試向】defer與time.sleep初探
【Go面試向】defer與return的執(zhí)行順序初探
【Go面試向】Go程序的執(zhí)行順序
【Go面試向】rune和byte類型的認識與使用文章來源:http://www.zghlxwxcb.cn/news/detail-844885.html
【Go面試向】實現(xiàn)map穩(wěn)定的有序遍歷的方式文章來源地址http://www.zghlxwxcb.cn/news/detail-844885.html
到了這里,關于【Consul】基于Golang實現(xiàn)Consul服務的注冊、注銷、修改、監(jiān)控注冊的服務變化、實時同步服務信息機制的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!