国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Spring Kafka生產(chǎn)者實現(xiàn)

這篇具有很好參考價值的文章主要介紹了Spring Kafka生產(chǎn)者實現(xiàn)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

需求

我們需要通過Spring Kafka庫,將消息推送給Kafka的topic中。這里假設Kafka的集群和用戶我們都有了。這里Kafka認證采取SASL_PLAINTEXT方式接入,SASL 采用 SCRAM-SHA-256 方式加解密。

pom.xml

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

我這里不需要寫版本號,應為我使用的Spring Boot。Spring Boot會自動幫我挑選spring-kafka應該使用哪個版本合適。

application.yml

spring:
  kafka:
    producer:
	  # kafka集群地址
      bootstrap-servers: xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092
	  client-id: producer-dev
      # SASL_PLAINTEXT 接入方式
      security:
        protocol: SASL_PLAINTEXT
      # 序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        # SASL 采用 SCRAM-SHA-256 方式
        sasl:
          mechanism: SCRAM-SHA-256
    # jaas配置
    jaas:
      options:
        username: kafkauser
        password: kafkapwd
      enabled: true
      login-module: org.apache.kafka.common.security.scram.ScramLoginModule
      control-flag: required

以上,是關于Spring Kafka的全部配置。下面摘要出來的配置,是可以單獨配置在配置中心的:

topic:
  # 接收消息的主題配置
  save: hello_kafka_topic
spring:
  kafka:
    producer:
      client-id: producer-dev
      # kafka集群地址
      bootstrap-servers: xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092
    # jaas配置
    jaas:
      options:
        username: kafkauser
        password: kafkapwd

Java

KafkaProducerService.java


public interface KafkaProducerService {

    /**
     * 轉(zhuǎn)發(fā)消息到kafka
     */
    void sendToKafka(String msg);

}

KafkaProducerServiceImpl.java



import cn.com.xxx.service.KafkaProducerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;

/**
 * 轉(zhuǎn)發(fā)消息到kafka
 */
@RefreshScope
@Slf4j
@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * kafka接收消息的主題
     */
    @Value("${topic.save}")
    private String topic;


    @Override
    public void sendToKafka(String msg) {
        log.info(String.format("$$$$ => Producing message: %s", msg));

        ProducerRecord<String, String> recordKafka = new ProducerRecord<>(topic, msg);

        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(recordKafka);
        future.addCallback(new KafkaSendCallback<String, String>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("成功發(fā)消息:{}給Kafka:{}", msg, result);
            }

            @Override
            public void onFailure(KafkaProducerException ex) {
                log.error("發(fā)消息:{}給Kafka:{}", msg, recordKafka, ex);
            }
        });
    }
}

到這里為止Spring Kafka生產(chǎn)者所有配置就都可以了。這里使用的異步監(jiān)聽kafka回調(diào)的方式發(fā)送消息。

總結(jié)

這里使用Spring Kafka庫異回調(diào)步給Kafka消息。這里使用的Spring Kafka庫是老版本,所以,這里的使用的回調(diào)類是ListenableFuture類。文章來源地址http://www.zghlxwxcb.cn/news/detail-714816.html

參考:

  • Spring for Apache Kafka2.8.3
  • Spring for Apache Kafka

到了這里,關于Spring Kafka生產(chǎn)者實現(xiàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Kafka生產(chǎn)者原理 kafka生產(chǎn)者發(fā)送流程 kafka消息發(fā)送到集群步驟 kafka如何發(fā)送消息 kafka詳解

    Kafka生產(chǎn)者原理 kafka生產(chǎn)者發(fā)送流程 kafka消息發(fā)送到集群步驟 kafka如何發(fā)送消息 kafka詳解

    kafka尚硅谷視頻: 10_尚硅谷_Kafka_生產(chǎn)者_原理_嗶哩嗶哩_bilibili ? ???? 1. producer初始化:加載默認配置,以及配置的參數(shù),開啟網(wǎng)絡線程 ???? 2. 攔截器攔截 ???? 3. 序列化器進行消息key, value序列化 ???? 4. 進行分區(qū) ???? 5. kafka broker集群 獲取metaData ???? 6. 消息緩存到

    2024年02月11日
    瀏覽(21)
  • 三、Kafka生產(chǎn)者1---Kafka生產(chǎn)者初始化-new KafkaProducer

    概述 本文主要是分享Kafka初始化生產(chǎn)者的 大體過程 初始化過程中會新建很多對象,本文暫先分享部分對象 1.分區(qū)器---Partitioner partitioner 2.重試時間---long retryBackoffMs 3.序列化器---SerializerK keySerializer,SerializerV valueSerializer 4.攔截器--- ListProducerInterceptorK, V interceptorList 5.累加器-

    2024年03月14日
    瀏覽(37)
  • Apache Kafka - 重識Kafka生產(chǎn)者

    Apache Kafka - 重識Kafka生產(chǎn)者

    Kafka 生產(chǎn)者是 Apache Kafka 中的一個重要組件,它負責將數(shù)據(jù)發(fā)送到 Kafka 集群中。在實時數(shù)據(jù)處理和流式處理應用程序中,Kafka 生產(chǎn)者扮演著非常重要的角色。 這里我們將介紹 Kafka 生產(chǎn)者的概念、工作原理以及如何使用 Kafka 生產(chǎn)者。 Kafka 生產(chǎn)者是一種用于將數(shù)據(jù)發(fā)送到 Kafk

    2024年02月05日
    瀏覽(25)
  • [kafka消息生產(chǎn)被阻塞] - 如何解決Kafka生產(chǎn)者阻塞的問題

    [kafka消息生產(chǎn)被阻塞] - 如何解決Kafka生產(chǎn)者阻塞的問題 Kafka是一個高度可擴展的分布式流平臺,用于構(gòu)建實時數(shù)據(jù)管道和流處理應用程序。作為一個廣泛使用的消息代理系統(tǒng),Kafka在數(shù)據(jù)傳輸方面表現(xiàn)出色,但是在極端情況下,它可能會出現(xiàn)生產(chǎn)者阻塞的問題。這可能會導致

    2024年02月11日
    瀏覽(21)
  • kafka入門(五):kafka生產(chǎn)者發(fā)送消息

    構(gòu)建消息,即創(chuàng)建 ProduceRecord 對象。 (1) kafka發(fā)送消息,最常見的構(gòu)造方法是: topic 表示主題, value 表示值。 (2) kafka發(fā)送消息指定key,ProducerRecord 的 key ,既可以作為消息的唯一id,也可以用來決定消息該被寫到主題的哪個分區(qū)。擁有相同key 的消息,將被寫到同一個分區(qū)。

    2024年01月17日
    瀏覽(41)
  • kafka學習-生產(chǎn)者

    kafka學習-生產(chǎn)者

    目錄 1、消息生產(chǎn)流程 2、生產(chǎn)者常見參數(shù)配置 3、序列化器 基本概念 自定義序列化器 4、分區(qū)器 默認分區(qū)規(guī)則 自定義分區(qū)器 5、生產(chǎn)者攔截器 作用 自定義攔截器 6、生產(chǎn)者原理解析 在Kafka中保存的數(shù)據(jù)都是字節(jié)數(shù)組。 消息發(fā)送前,需要將消息序列化為字節(jié)數(shù)組進行發(fā)送。

    2024年02月09日
    瀏覽(26)
  • Kafka-生產(chǎn)者

    Kafka-生產(chǎn)者

    Kafka在實際應用中,經(jīng)常被用作高性能、可擴展的消息中間件。 Kafka自定義了一套網(wǎng)絡協(xié)議,只要遵守這套協(xié)議的格式,就可以向Kafka發(fā)送消息,也可以從Kafka中拉取消息。 在實踐生產(chǎn)過程中,一套API封裝良好、靈活易用的客戶端可以避免開發(fā)人員重復勞動,提高開發(fā)效率,也

    2024年01月20日
    瀏覽(22)
  • (三)Kafka 生產(chǎn)者

    (三)Kafka 生產(chǎn)者

    創(chuàng)建一個 ProducerRecord 對象,需要包含目標主題和要發(fā)送的內(nèi)容,還可以指定鍵、分區(qū)、時間戳或標頭。 在發(fā)送 ProducerRecord 對象時,生產(chǎn)者需要先把鍵和值對象序列化成字節(jié)數(shù)組,這樣才能在網(wǎng)絡上傳輸。 如果沒有顯式地指定分區(qū),那么數(shù)據(jù)將被傳給分區(qū)器。分區(qū)器通常會基

    2024年02月09日
    瀏覽(21)
  • 三、Kafka生產(chǎn)者

    三、Kafka生產(chǎn)者

    1 發(fā)送原理 在消息發(fā)送的過程中,涉及到了兩個線程——main 線程和 Sender 線程。在 main 線程中創(chuàng)建了一個雙端隊列 RecordAccumulator。main 線程將消息發(fā)送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka Broker 【RecordAccumulator緩沖的結(jié)構(gòu): 每一個分區(qū)對應一

    2024年02月12日
    瀏覽(21)
  • Kafka(生產(chǎn)者)

    Kafka(生產(chǎn)者)

    目 前 企 業(yè) 中 比 較 常 見 的 消 息 隊 列 產(chǎn) 品 主 要 有 Kafka(在大數(shù)據(jù)場景主要采用 Kafka 作為消息隊列。) ActiveMQ RabbitMQ RocketMQ 1.1.1 傳統(tǒng)消息隊列的應用場景 傳統(tǒng)的消息隊列的主要應用場景包括: 緩存/消峰 、 解耦 和 異步通信 。 緩沖/消峰: 有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過

    2024年02月11日
    瀏覽(27)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包