国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka3.1部署和Topic主題數(shù)據(jù)生產(chǎn)與消費(fèi)

這篇具有很好參考價(jià)值的文章主要介紹了Kafka3.1部署和Topic主題數(shù)據(jù)生產(chǎn)與消費(fèi)。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。


前言

本章節(jié)主要講述Kafka3.1X版本在Windows11主機(jī)下部署以及JAVA對Kafka應(yīng)用:

一、Kafka3.1X版本在Windows11主機(jī)部署

1.安裝JDK配置環(huán)境變量

2.Zookeeper(zookeeper-3.7.1)
zk
部署后的目錄位置:D:\setup\apache-zookeeper-3.7.1

3.安裝Kafka3.1X
3.1 下載包(kafka_2.12-3.1.2.tgz)
Kafka
Kafka3.1部署和Topic主題數(shù)據(jù)生產(chǎn)與消費(fèi),消息隊(duì)列,kafka,kafka-clients,kafka部署,Window環(huán)境,kafka-clients讀取
3.2、 解壓并進(jìn)入Kafka目錄:
根目錄:D:\setup\kafka3.1.2

3、 編輯config/server.properties文件
注意 log.dirs=D:\setup\kafka3.1.2\logs 為根目錄下的\logs

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=D:\\setup\\kafka3.1.2\\logs

4.運(yùn)行Zookeeper
Zookeeper安裝目錄D:\setup\apache-zookeeper-3.7.1\bin,按下Shift+右鍵,選擇“打開命令窗口”選項(xiàng),打開命令行

  .\zkServer.cmd;

Kafka3.1部署和Topic主題數(shù)據(jù)生產(chǎn)與消費(fèi),消息隊(duì)列,kafka,kafka-clients,kafka部署,Window環(huán)境,kafka-clients讀取
5.運(yùn)行Kafka
Kafka安裝目錄D:\setup\kafka3.1.2,按下Shift+右鍵,選擇“打開命令窗口”選項(xiàng),打開命令行

.\bin\windows\kafka-server-start.bat .\config\server.properties

Kafka3.1部署和Topic主題數(shù)據(jù)生產(chǎn)與消費(fèi),消息隊(duì)列,kafka,kafka-clients,kafka部署,Window環(huán)境,kafka-clients讀取

二、Kafk生產(chǎn)Topic主題數(shù)據(jù)

1.kafka生產(chǎn)數(shù)據(jù)

創(chuàng)建Topic主題heima

.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --topic heima --partitions 2 --replication-factor 1
Created topic heima.

查看Topic主題heima

.\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092  --topic heima

Kafka3.1部署和Topic主題數(shù)據(jù)生產(chǎn)與消費(fèi),消息隊(duì)列,kafka,kafka-clients,kafka部署,Window環(huán)境,kafka-clients讀取
Topic主題heima生產(chǎn)數(shù)據(jù)

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic heima

在 > 符號(hào)后輸入數(shù)據(jù):

{"mobilePhone":"186xxxx1234","roleCode":"super_admin_xxx"}

Kafka3.1部署和Topic主題數(shù)據(jù)生產(chǎn)與消費(fèi),消息隊(duì)列,kafka,kafka-clients,kafka部署,Window環(huán)境,kafka-clients讀取

2.JAVA kafka客戶端消費(fèi)數(shù)據(jù)

2.1 pom.xml文件配置kafka客戶端-kafka-clients-2.0.1版本

        <!-- kafka客戶端 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.1</version>
        </dependency>

2.2 JAVA數(shù)據(jù)讀取文件

package com.ems.mgr.web.controller.thirdparty;
import com.alibaba.fastjson.JSONObject;
import com.ems.mgr.common.utils.spring.SpringUtils;
import com.ems.mgr.system.service.ISysUserService;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka服務(wù)器操作與數(shù)據(jù)讀取
 */
public class KafkaUtilDemo {
    public static final Logger log = LoggerFactory.getLogger(KafkaUtilDemo.class);
    public static final Properties props = new Properties();
//    protected ISysUserService userService = SpringUtils.getBean(ISysUserService.class);

    public static void init(String kafakservers) {
        // 配置Kafka消費(fèi)者屬性
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakservers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }

    /**
     * 持續(xù)監(jiān)聽并處理kafa消息,當(dāng)手機(jī)號(hào)mobilePhone非空時(shí)進(jìn)入數(shù)據(jù)同步操作
     * @param kafaktopic
     * @return
     */
    public static String poll(String kafaktopic) {
        String msg = "";
        try {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(kafaktopic));
            log.info("Kafka消費(fèi)者訂閱指定主題,持續(xù)監(jiān)聽并處理消息");
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(60000));
                for (ConsumerRecord<String, String> record : records) {
                    log.info("offset = " + record.offset() + ",key = " + record.key() + ",value = " + record.value());
                    msg = record.value();
                    if (!StringUtils.isBlank(record.value())) {
                        JSONObject jsonObject = JSONObject.parseObject(record.value());
                        String mobilePhone = jsonObject.getString("mobilePhone");
                        if (StringUtils.isBlank(mobilePhone)) {
                            log.error("Kafka消費(fèi)者手機(jī)號(hào)mobilePhone為空");
                        } else {
                            KafkaUtilDemo kafkaUtil = new KafkaUtilDemo();
                            kafkaUtil.syncSystemInfoTask(jsonObject);
                        }
                    }
                }
            }
        } catch (Exception e) {
            log.error("Kafka消費(fèi)者訂閱指定主題,持續(xù)監(jiān)聽并處理消息 error msg=" + e.getMessage());
        }
        return msg;
    }

    public boolean syncSystemInfoTask(JSONObject jsonObject) {
        boolean repsBln = true;
        try {
            String mobilePhone = jsonObject.getString("mobilePhone");
            String roleType = jsonObject.getString("roleType");
            String roleCode = jsonObject.getString("roleCode");
            log.info("業(yè)務(wù)數(shù)據(jù)同步操作................");
        } catch (Exception e) {
            repsBln = false;
            log.error("Kafka消費(fèi)者同步入庫異常,error msg=" + e.getMessage());
        }
        return repsBln;
    }

    public static void main(String[] args) {
        try {
            String kafakservers = "localhost:9092";
            String kafaktopic = "heima";
            init(kafakservers);
            poll(kafaktopic);
        } catch (Exception e) {
            log.error("error msg=" + e.getMessage());
        }
    }

}

3 執(zhí)行KafkaUtilDemo 文件,查看消費(fèi)數(shù)據(jù)。
Kafka3.1部署和Topic主題數(shù)據(jù)生產(chǎn)與消費(fèi),消息隊(duì)列,kafka,kafka-clients,kafka部署,Window環(huán)境,kafka-clients讀取

總結(jié)

pom.xml文件在引入spring-kafka 會(huì)由于版本問題出現(xiàn)


org.apache.kafka
kafka-clients
2.0.1
文章來源地址http://www.zghlxwxcb.cn/news/detail-701679.html

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.8.RELEASE</version>
    </dependency>

到了這里,關(guān)于Kafka3.1部署和Topic主題數(shù)據(jù)生產(chǎn)與消費(fèi)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • kafka如何動(dòng)態(tài)消費(fèi)新增topic主題

    kafka如何動(dòng)態(tài)消費(fèi)新增topic主題

    一、解決痛點(diǎn) 使用spring-kafka客戶端,每次新增topic主題,都需要硬編碼客戶端并重新發(fā)布服務(wù),操作麻煩耗時(shí)長。kafkaListener雖可以支持通配符消費(fèi)topic,缺點(diǎn)是并發(fā)數(shù)需要手動(dòng)改并且重啟服務(wù) 。對于業(yè)務(wù)邏輯相似場景,創(chuàng)建新主題動(dòng)態(tài)監(jiān)聽可以用kafka-batch-starter組件 二、組件

    2023年04月21日
    瀏覽(34)
  • SpringBoot 2.2.5 整合RabbitMQ,實(shí)現(xiàn)Topic主題模式的消息發(fā)送及消費(fèi)

    1、simple簡單模式 消息產(chǎn)生著§將消息放入隊(duì)列 消息的消費(fèi)者(consumer) 監(jiān)聽(while) 消息隊(duì)列,如果隊(duì)列中有消息,就消費(fèi)掉,消息被拿走后,自動(dòng)從隊(duì)列中刪除(隱患 消息可能沒有被消費(fèi)者正確處理,已經(jīng)從隊(duì)列中消失了,造成消息的丟失)應(yīng)用場景:聊天(中間有一個(gè)過度的服務(wù)器;p端,c端

    2024年02月02日
    瀏覽(26)
  • Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費(fèi)者

    Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費(fèi)者

    1.創(chuàng)建安裝目錄/usr/local/kafka mkdir /usr/local/kafka 2.進(jìn)入安裝包目錄 cd?/usr/local/kafka? 3.下載安裝包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解壓安裝包 tar -zxvf kafka_2.12-3.3.1.tgz 5.進(jìn)入cd kafka_2.12-3.3.1目錄 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    瀏覽(29)
  • 消息隊(duì)列Pulsar入門(一) 生產(chǎn)者/消費(fèi)者/Topic詳解,附源碼演示

    消息隊(duì)列Pulsar入門(一) 生產(chǎn)者/消費(fèi)者/Topic詳解,附源碼演示

    部署問題 連接Pulsar 創(chuàng)建方式 簡單方法創(chuàng)建 loadConf自定義配置創(chuàng)建 Pulsar官網(wǎng) 發(fā)送模式 同步發(fā)送 異步發(fā)送 訪問方式/發(fā)送方式 Share模式(默認(rèn)情況) 請注意: Exclusive WaitForExclusive 創(chuàng)建方式 簡單方法創(chuàng)建 監(jiān)聽器方法創(chuàng)建 loadConf自定義配置創(chuàng)建 多主題訂閱 傳入List數(shù)組的多主題訂閱

    2024年02月08日
    瀏覽(64)
  • kafka 基礎(chǔ)概念、命令行操作(查看所有topic、創(chuàng)建topic、刪除topic、查看某個(gè)Topic的詳情、修改分區(qū)數(shù)、發(fā)送消息、消費(fèi)消息、 查看消費(fèi)者組 、更新消費(fèi)者的偏移位置)

    kafka 基礎(chǔ)概念、命令行操作(查看所有topic、創(chuàng)建topic、刪除topic、查看某個(gè)Topic的詳情、修改分區(qū)數(shù)、發(fā)送消息、消費(fèi)消息、 查看消費(fèi)者組 、更新消費(fèi)者的偏移位置)

    kafka官網(wǎng) Broker ??一臺(tái)kafka服務(wù)器就是一個(gè)broker,可容納多個(gè)topic。一個(gè)集群由多個(gè)broker組成; Producer ??生產(chǎn)者,即向kafka的broker-list發(fā)送消息的客戶端; Consumer ??消費(fèi)者,即向kafka的broker-list訂閱消息的客戶端; Consumer Group ??消費(fèi)者組是 邏輯上的一個(gè)訂閱者 ,由多個(gè)

    2024年02月01日
    瀏覽(121)
  • kafka配置大全broker、topic、生產(chǎn)者和消費(fèi)者等配置介紹

    每個(gè)kafka broker中配置文件 server.properties 默認(rèn)必須配置的屬性如下: **bootstrap.servers** - 指定生產(chǎn)者客戶端連接kafka集群所需的broker地址列表,格式為host1:port1,host2:port2,可以設(shè)置一個(gè)或多個(gè)。這里并非需要所有的broker地址,因?yàn)樯a(chǎn)者會(huì)從給定的broker里尋找其它的broker。 **key

    2024年02月05日
    瀏覽(40)
  • Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費(fèi)者

    Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費(fèi)者

    1.創(chuàng)建主題 2.查看所有主題 3.查看詳細(xì)主題 序號(hào)從0開始計(jì)算 Partition:分區(qū)數(shù),該主題有3個(gè)分區(qū) Replica:副本數(shù),該主題有3個(gè)副本 Leader:副本數(shù)中的主的序號(hào),生產(chǎn)消費(fèi)的對象 1.修改分區(qū)數(shù) 修改的分區(qū)數(shù)量不可以小于或者等于當(dāng)前主題分區(qū)的數(shù)量,否則會(huì)報(bào)錯(cuò) 在根目錄kaf

    2024年02月11日
    瀏覽(32)
  • JAVA實(shí)時(shí)獲取kafka各個(gè)主題下分區(qū)消息的消費(fèi)情況

    通過指定 主題 和 消費(fèi)者組 調(diào)用方法,實(shí)時(shí)查看主題下分區(qū)消息的消費(fèi)情況(消息總數(shù)量、消費(fèi)消息數(shù)量、未消費(fèi)的消息數(shù)量)。

    2024年02月13日
    瀏覽(27)
  • 07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費(fèi)者(演示 監(jiān)聽消息)

    07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費(fèi)者(演示 監(jiān)聽消息)

    簡單來說,就是一個(gè)數(shù)據(jù)項(xiàng)。 ▲ 消息就是 Kafka 所記錄的數(shù)據(jù)節(jié)點(diǎn),消息在 Kafka 中又被稱為記錄(record)或事件(event)。 從存儲(chǔ)上來看,消息就是存儲(chǔ)在分區(qū)文件(有點(diǎn)類似于List)中的一個(gè)數(shù)據(jù)項(xiàng),消息具有 key、value、時(shí)間戳 和 可選的元數(shù)據(jù)頭。 ▲ 下面是一個(gè)示例事件

    2024年01月20日
    瀏覽(46)
  • Kafka3.0.0版本——生產(chǎn)者 數(shù)據(jù)去重

    Kafka3.0.0版本——生產(chǎn)者 數(shù)據(jù)去重

    1.1、至少一次 至少一次(At Least Once )的含義 生產(chǎn)者發(fā)送數(shù)據(jù)到kafka集群,kafka集群至少接收到一次數(shù)據(jù)。 至少一次的條件: ACK級(jí)別設(shè)置為-1 + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)量大于等于2 1.2、最多一次 最多一次(At Most Once )的含義 生產(chǎn)者發(fā)送數(shù)據(jù)到kafka集群,

    2024年02月01日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包