国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Strimzi Kafka Bridge(橋接)實戰(zhàn)之二:生產(chǎn)和發(fā)送消息

這篇具有很好參考價值的文章主要介紹了Strimzi Kafka Bridge(橋接)實戰(zhàn)之二:生產(chǎn)和發(fā)送消息。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

歡迎訪問我的GitHub

這里分類和匯總了欣宸的全部原創(chuàng)(含配套源碼):https://github.com/zq2599/blog_demos

本篇概覽

  • 本文是《Strimzi Kafka Bridge(橋接)實戰(zhàn)之》系列的第二篇,咱們直奔bridge的重點:常用接口,用實際操作體驗如何用bridge完成常用的消息收發(fā)業(yè)務(wù)

  • 官方的openapi接口文檔地址 : https://strimzi.io/docs/bridge/in-development/#_openapi

  • 整篇文章由以下內(nèi)容構(gòu)成:

  1. 準(zhǔn)備工作:創(chuàng)建topic
  2. 生產(chǎn)消息
  3. 消費消息,strimzi bridge消費消息的邏輯略有些特殊,就是要提前創(chuàng)建strimzi bridge consumer,再通過consumer來調(diào)用拉取消息的接口
  • 完成本篇實戰(zhàn)后,相信您已經(jīng)可以數(shù)量的通過http來使用kafka的服務(wù)了

準(zhǔn)備工作:創(chuàng)建topic

  • 遺憾的是,bridge未提供創(chuàng)建topic的API,所以咱們還是用命令來創(chuàng)建吧
  • ssh登錄kubernetes的宿主機(jī)
  • 執(zhí)行創(chuàng)建名為bridge-quickstart-topic的topic,共四個分區(qū)
kubectl -n aabbcc \
run kafka-producer \
-ti \
--image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 \
--rm=true \
--restart=Never \
-- bin/kafka-topics.sh \
--bootstrap-server my-cluster-kafka-bootstrap:9092 \
--create \
--topic bridge-quickstart-topic \
--partitions 4 \
--replication-factor 1
  • 檢查topic創(chuàng)建是否成功
kubectl -n aabbcc \
run kafka-producer \
-ti \
--image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 \
--rm=true \
--restart=Never \
-- bin/kafka-topics.sh \
--bootstrap-server my-cluster-kafka-bootstrap:9092 \
--describe \
--topic bridge-quickstart-topic
  • 如下圖,可見topic的創(chuàng)建符合預(yù)期
    Strimzi Kafka Bridge(橋接)實戰(zhàn)之二:生產(chǎn)和發(fā)送消息
  • 接下來的操作都是向bridge發(fā)送http請求完成的,我這邊宿主機(jī)的IP地址是192.168.0.1,bridge的NodePort端口號31331

查看指定topic的詳情

  • 如下請求,可以取得topicbridge-quickstart-topic的詳情
curl -X GET \
  http://192.168.0.1:31331/topics/bridge-quickstart-topic
  • 收到響應(yīng)如下,是這個topic的詳細(xì)信息
{
	"name": "bridge-quickstart-topic",
	"configs": {
		"compression.type": "producer",
		"leader.replication.throttled.replicas": "",
		"message.downconversion.enable": "true",
		"min.insync.replicas": "1",
		"segment.jitter.ms": "0",
		"cleanup.policy": "delete",
		"flush.ms": "9223372036854775807",
		"follower.replication.throttled.replicas": "",
		"segment.bytes": "1073741824",
		"retention.ms": "604800000",
		"flush.messages": "9223372036854775807",
		"message.format.version": "3.0-IV1",
		"max.compaction.lag.ms": "9223372036854775807",
		"file.delete.delay.ms": "60000",
		"max.message.bytes": "1048588",
		"min.compaction.lag.ms": "0",
		"message.timestamp.type": "CreateTime",
		"preallocate": "false",
		"min.cleanable.dirty.ratio": "0.5",
		"index.interval.bytes": "4096",
		"unclean.leader.election.enable": "false",
		"retention.bytes": "-1",
		"delete.retention.ms": "86400000",
		"segment.ms": "604800000",
		"message.timestamp.difference.max.ms": "9223372036854775807",
		"segment.index.bytes": "10485760"
	},
	"partitions": [
		{
			"partition": 0,
			"leader": 0,
			"replicas": [
				{
					"broker": 0,
					"leader": true,
					"in_sync": true
				}
			]
		},
		{
			"partition": 1,
			"leader": 0,
			"replicas": [
				{
					"broker": 0,
					"leader": true,
					"in_sync": true
				}
			]
		},
		{
			"partition": 2,
			"leader": 0,
			"replicas": [
				{
					"broker": 0,
					"leader": true,
					"in_sync": true
				}
			]
		},
		{
			"partition": 3,
			"leader": 0,
			"replicas": [
				{
					"broker": 0,
					"leader": true,
					"in_sync": true
				}
			]
		}
	]
}

批量生產(chǎn)消息(同步)

  • 試試bridge提供的批量生產(chǎn)消息的API,以下命令會生產(chǎn)了三條消息,第一條通過key的hash值確定分區(qū),第二條用partition參數(shù)明確指定了分區(qū)是2,第三條的分區(qū)是按照輪詢策略更新的
curl -X POST \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "key": "my-key",
            "value": "sales-lead-0001"
        },
        {
            "value": "sales-lead-0002",
            "partition": 2
        },
        {
            "value": "sales-lead-0003"
        }
    ]
}'
  • bridge響應(yīng)如下,會返回每一條消息的partition和offset,這就是同步消息的特點,等到meta信息更新完畢后才會返回
{
	"offsets": [{
		"partition": 0,
		"offset": 0
	}, {
		"partition": 2,
		"offset": 0
	}, {
		"partition": 3,
		"offset": 0
	}]
}

批量生產(chǎn)消息(異步)

  • 有的場景下,例如追求高QPS并且對返回的meta信息不關(guān)注,可以考慮異步的方式發(fā)送消息,也就是說bridge收到響應(yīng)后立即返回200,這種異步模式和前面的同步模式只有一個參數(shù)的差別:在請求url中增加async=true即可
curl -X POST \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic?async=true \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "key": "my-key",
            "value": "sales-lead-0001"
        },
        {
            "value": "sales-lead-0002",
            "partition": 2
        },
        {
            "value": "sales-lead-0003"
        }
    ]
}'
  • 沒有響應(yīng)body,請您自行請求感受一下,響應(yīng)明顯比同步模式快

查看partition

  • 查看tipic的parition情況
curl -X GET \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic/partitions
  • 響應(yīng)
[{
	"partition": 0,
	"leader": 0,
	"replicas": [{
		"broker": 0,
		"leader": true,
		"in_sync": true
	}]
}, {
	"partition": 1,
	"leader": 0,
	"replicas": [{
		"broker": 0,
		"leader": true,
		"in_sync": true
	}]
}, {
	"partition": 2,
	"leader": 0,
	"replicas": [{
		"broker": 0,
		"leader": true,
		"in_sync": true
	}]
}, {
	"partition": 3,
	"leader": 0,
	"replicas": [{
		"broker": 0,
		"leader": true,
		"in_sync": true
	}]
}]
  • 查看指定partition
curl -X GET \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic/partitions/0
  • 響應(yīng)
{
	"partition": 0,
	"leader": 0,
	"replicas": [{
		"broker": 0,
		"leader": true,
		"in_sync": true
	}]
}
  • 查看指定partition的offset情況
curl -X GET \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic/partitions/0/offsets
  • 響應(yīng)
{
	"beginning_offset": 0,
	"end_offset": 5
}

創(chuàng)建bridge consumer

  • 通過bridge消費消息,有個特別且重要的前提:創(chuàng)建bridge consumer,只有先創(chuàng)建了bridge consumer,才能順利從kafka的broker取到消息
  • 以下命令創(chuàng)建了一個bridge consumer,各參數(shù)的含義稍后會說明
curl -X POST http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "name": "bridge-quickstart-consumer",
    "auto.offset.reset": "earliest",
    "format": "json",
    "enable.auto.commit": false,
    "fetch.min.bytes": 16,
    "consumer.request.timeout.ms": 300000
  }'
  • 上述請求的參數(shù)解釋:
  1. 對應(yīng)kafka的group為bridge-quickstart-consumer-group
  2. 此bridge consumer的name等于bridge-quickstart-consumer
  3. 參數(shù)enable.auto.commit表示是否自動提交offset,這里設(shè)置成false,表示無需自動提交,后面的操作中會調(diào)用API請求來更新offset
  4. 參數(shù)fetch.min.bytes要特別注意,其值等于16,表示唯有消息內(nèi)容攢夠了16字節(jié),拉取消息的請求才能獲取到消息,如果消息內(nèi)容長度不到16字節(jié),收到的響應(yīng)body就是空
  5. 參數(shù)consumer.request.timeout.ms也要注意,這里我設(shè)置了300秒,如果超過300秒沒有去拉取消息,這個消費者就會被kafka移除(被移除后如果再去拉取消息,kafka會報錯:Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the grou)
  • 收到響應(yīng)如下,instance_id表示這個bridge consumer的身份id,base_uri則是訂閱消息時必須使用的請求地址
{
	"instance_id": "bridge-quickstart-consumer",
	"base_uri": "http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer"
}

如何刪除bridge consumer

  • 以下命令可以刪除consumer,重點是將身份id放入path中
curl -X DELETE http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer

訂閱指定topic的消息

  • 創(chuàng)建bridge consumer成功后,接下來就能以這個consumer的身份去訂閱kafka消息了
  • 執(zhí)行以下命令可以訂閱topic為bridge-quickstart-topic的kafka消息,注意請求地址就是前面創(chuàng)建bridge consumer時返回的base_uri字段
curl -X POST http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "topics": [
        "bridge-quickstart-topic"
    ]
}'
  • 從上述請求body可以看出,此請求可以一次訂閱多個topic,而且還可以使用topic_pattern(正則表達(dá)式)的形式來一次訂閱多個topic
  • 訂閱完成后,接下來就能主動拉取消息了

拉取消息

  • 在拉取消息之前,請確保已經(jīng)提前生產(chǎn)了消息
  • 執(zhí)行以下命令拉取一條消息
curl -X GET http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
  -H 'accept: application/vnd.kafka.json.v2+json'
  • 然而,當(dāng)您執(zhí)行了上述命令后,會發(fā)現(xiàn)返回body為空,別擔(dān)心,這是正常的現(xiàn)象,按照官方的說法,拉取到的第一條消息就是空的,這是因為拉取操作出觸發(fā)了rebalancing邏輯(rebalancing是kafka的概覽,是處理多個partition消費的操作),再次執(zhí)行上述命令去拉取消息,這下正常了,body如下
[
	{
		"topic": "bridge-quickstart-topic",
		"key": "my-key",
		"value": "sales-lead-0001",
		"partition": 0,
		"offset": 0
	}, {
		"topic": "bridge-quickstart-topic",
		"key": "my-key",
		"value": "sales-lead-0001",
		"partition": 0,
		"offset": 1
	}
]

提交offset

  • 前面在創(chuàng)建bridge consumer的時候,參數(shù)enable.auto.commit的值等于fasle,表示由調(diào)用方主動提交offset到kafka,因此在拉取到消息之后,需要手動更新kafka consumer的offset
curl -X POST http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets
  • 該請求無返回body,只要返回碼是204就表示成功

設(shè)定offset

  • 試想這樣的場景:共生產(chǎn)了100條消息,消費者也已經(jīng)將這100條全部消費完畢,現(xiàn)在由于某種原因,需要從91條開始,重新消費91-100這10條消息(例如需要重新計算),此時可以主動設(shè)定offset
  • 先執(zhí)行以下命令,生產(chǎn)一條消息
curl -X POST \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "value": "sales-lead-a002-01234567890123456789",
            "partition": 2
        }
    ]
}'
  • 如下圖紅色箭頭,可見當(dāng)前partition已經(jīng)生產(chǎn)了75條消息了
    Strimzi Kafka Bridge(橋接)實戰(zhàn)之二:生產(chǎn)和發(fā)送消息
  • 咱們先拉取消息,將消息都消費掉
    Strimzi Kafka Bridge(橋接)實戰(zhàn)之二:生產(chǎn)和發(fā)送消息
  • 由于沒有新生產(chǎn)消息,此時再拉去應(yīng)該拉取不到了
  • 現(xiàn)在執(zhí)行以下請求,就可以將offset設(shè)置到74
curl -X POST http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "offsets": [
        {
            "topic": "bridge-quickstart-topic",
            "partition": 2,
            "offset": 74
        }
    ]
}'
  • 再次拉取消息,發(fā)現(xiàn)74和之后的所有消息都可以拉去到了(注意,包含了74)
    Strimzi Kafka Bridge(橋接)實戰(zhàn)之二:生產(chǎn)和發(fā)送消息
  • 至此,咱們對生產(chǎn)和發(fā)送消息的常用接口都已經(jīng)操作了一遍,對于常規(guī)的業(yè)務(wù)場景已經(jīng)夠用,接下來的文章,咱們以此為基礎(chǔ),玩出更多花樣來

歡迎關(guān)注博客園:程序員欣宸

學(xué)習(xí)路上,你不孤單,欣宸原創(chuàng)一路相伴...文章來源地址http://www.zghlxwxcb.cn/news/detail-712033.html

到了這里,關(guān)于Strimzi Kafka Bridge(橋接)實戰(zhàn)之二:生產(chǎn)和發(fā)送消息的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • kafka入門(五):kafka生產(chǎn)者發(fā)送消息

    構(gòu)建消息,即創(chuàng)建 ProduceRecord 對象。 (1) kafka發(fā)送消息,最常見的構(gòu)造方法是: topic 表示主題, value 表示值。 (2) kafka發(fā)送消息指定key,ProducerRecord 的 key ,既可以作為消息的唯一id,也可以用來決定消息該被寫到主題的哪個分區(qū)。擁有相同key 的消息,將被寫到同一個分區(qū)。

    2024年01月17日
    瀏覽(41)
  • 分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    不管是把Kafka作為消息隊列、消息總線還是數(shù)據(jù)存儲平臺,總是需要一個可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個可以從Kafka讀取數(shù)據(jù)的消費者,或者一個兼具兩種角色的應(yīng)用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應(yīng)用程序,它們負(fù)責(zé)將消息發(fā)送到 Kafka 集群中的一個或多

    2024年02月13日
    瀏覽(29)
  • 分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    01. Kafka 分區(qū)的作用 分區(qū)的作用就是提供負(fù)載均衡的能力,或者說對數(shù)據(jù)進(jìn)行分區(qū)的主要原因,就是為了實現(xiàn)系統(tǒng)的高伸縮性。不同的分區(qū)能夠被放置到不同節(jié)點的機(jī)器上,而數(shù)據(jù)的讀寫操作也都是針對分區(qū)這個粒度而進(jìn)行的,這樣每個節(jié)點的機(jī)器都能獨立地執(zhí)行各自分區(qū)的

    2024年02月13日
    瀏覽(32)
  • 多圖詳解 kafka 生產(chǎn)者消息發(fā)送過程

    多圖詳解 kafka 生產(chǎn)者消息發(fā)送過程

    生產(chǎn)者客戶端代碼 KafkaProducer 通過解析 producer.propeties 文件里面的屬性來構(gòu)造自己。例如 :分區(qū)器、Key 和 Value 序列化器、攔截器、 RecordAccumulator消息累加器 、 元信息更新器 、啟動發(fā)送請求的后臺線程 生產(chǎn)者元信息更新器 我們之前有講過. 客戶端都會保存集群的元信息,例如

    2023年04月09日
    瀏覽(31)
  • 分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    不管是把Kafka作為消息隊列、消息總線還是數(shù)據(jù)存儲平臺,總是需要一個可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個可以從Kafka讀取數(shù)據(jù)的消費者,或者一個兼具兩種角色的應(yīng)用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應(yīng)用程序,它們負(fù)責(zé)將消息發(fā)送到 Kafka 集群中的一個或多

    2024年02月13日
    瀏覽(28)
  • 07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費者(演示 監(jiān)聽消息)

    07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費者(演示 監(jiān)聽消息)

    簡單來說,就是一個數(shù)據(jù)項。 ▲ 消息就是 Kafka 所記錄的數(shù)據(jù)節(jié)點,消息在 Kafka 中又被稱為記錄(record)或事件(event)。 從存儲上來看,消息就是存儲在分區(qū)文件(有點類似于List)中的一個數(shù)據(jù)項,消息具有 key、value、時間戳 和 可選的元數(shù)據(jù)頭。 ▲ 下面是一個示例事件

    2024年01月20日
    瀏覽(46)
  • Kafka 入門到起飛系列 - 生產(chǎn)者發(fā)送消息流程解析

    Kafka 入門到起飛系列 - 生產(chǎn)者發(fā)送消息流程解析

    生產(chǎn)者通過 producerRecord 對象封裝消息主題、消息的value(內(nèi)容)、timestamp(時間戳)等 生產(chǎn)者通過 send() 方法發(fā)送消息,send()方法會經(jīng)過如下幾步 1. 首先將消息交給 攔截器(Interceptor) 處理, 攔截器對生產(chǎn)者而言,對所有消息都是生效的,攔截器也支持鏈?zhǔn)骄幊蹋ㄘ?zé)任器鏈)的

    2024年02月16日
    瀏覽(24)
  • kafka服務(wù)端允許生產(chǎn)者發(fā)送最大消息體大小

    ????????server.properties中加上的message.max.bytes配置,我目前設(shè)置為5242880,即5MB,可以根據(jù)實際情況增大。 ????????在生產(chǎn)者端配置max.request.size,這是單個消息最大字節(jié)數(shù),根據(jù)實際調(diào)整,max.request.size 必須小于 message.max.bytes 以及消費者的 max.partition.fetch.bytes。這樣消息

    2024年02月15日
    瀏覽(24)
  • 【注意】Kafka生產(chǎn)者異步發(fā)送消息仍有可能阻塞

    Kafka是常用的消息中間件。在Spring Boot項目中,使用KafkaTemplate作為生產(chǎn)者發(fā)送消息。有時,為了不影響主業(yè)務(wù)流程,會采用 異步 發(fā)送的方式,如下所示。 本以為采用異步發(fā)送,必然不會影響到主業(yè)務(wù)流程。但實際使用時發(fā)現(xiàn),在第一次發(fā)送消息時,如果Kafka Broker連接失敗,

    2023年04月13日
    瀏覽(26)
  • Kafka中的生產(chǎn)者如何處理消息發(fā)送失敗的情況?

    在Kafka中,生產(chǎn)者可以通過以下方式處理消息發(fā)送失敗的情況: 同步發(fā)送模式(Sync Mode):在同步發(fā)送模式下,生產(chǎn)者發(fā)送消息后會阻塞等待服務(wù)器的響應(yīng)。如果發(fā)送失敗,生產(chǎn)者會拋出異常(例如 ProducerRecord 發(fā)送異常)或返回錯誤信息。開發(fā)者可以捕獲異常并根據(jù)需要進(jìn)行

    2024年02月06日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包