国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

docker安裝kafka,并集成springboot進(jìn)行測試

這篇具有很好參考價(jià)值的文章主要介紹了docker安裝kafka,并集成springboot進(jìn)行測試。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

大家好,今天我們開始學(xué)習(xí)kafka中間件,今天我們改變一下策略,不刷視頻學(xué)習(xí),改為實(shí)踐學(xué)習(xí),在網(wǎng)上找一些案例功能去做,來達(dá)到學(xué)習(xí)實(shí)踐的目的。

首先,是安裝相關(guān)組件。

1. docker安裝安裝

1.1 yum-utils軟件包

yum install -y yum-utils

1.2?設(shè)置阿里云鏡像


yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

1.3?安裝docker

yum install docker-ce docker-ce-cli containerd.io 

1.4?啟動docker

systemctl start docker

1.5?測試

docker version
docker run hello-world
docker images

至此,docker就安裝完畢了。接下來就是安裝zookeeper和kafka了,我這里用的是kafka2.x的版本,因此需要結(jié)合zookeeper去是使用。現(xiàn)在最新的kafka3.x已經(jīng)可以拋棄zookeeper去單獨(dú)使用了,小伙伴們有興趣的話可以自己去動手安裝實(shí)踐下。

2.?安裝zookeeper和kafka

2.1?docker安裝zookeeper

docker pull wurstmeister/zookeeper

2.2?啟動zookeeper

docker run -d --name zookeeper -p 2181:2181 -e TZ="Asia/Shanghai" --restart always wurstmeister/zookeeper 

2.3?docker查看zookeeper容器是否啟動

docker ps

docker安裝kafka,并集成springboot進(jìn)行測試

?出現(xiàn)以上信息,就代表zookeeper已經(jīng)安裝并啟動成功。

2.4?安裝kafka

docker pull wurstmeister/kafka

2.5?啟動kafka

docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=124.223.205.125:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://124.223.205.125:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" -e TZ="Asia/Shanghai" wurstmeister/kafka 

2.6?用docker?ps查看kafka是否啟動

docker安裝kafka,并集成springboot進(jìn)行測試

出現(xiàn)以上信息,就代表kafka啟動成功了。

下來就測試一下

3. 發(fā)送消息和消費(fèi)消息

3.1?進(jìn)入kafka容器

docker exec -it 容器id /bin/bash

cd /opt/kafka_2.13-2.8.1/bin/

?3.2?連接生產(chǎn)者

./kafka-console-producer.sh --broker-list localhost:9092 --topic shopping

接下來就可以發(fā)送消息了。

docker安裝kafka,并集成springboot進(jìn)行測試?3.3?另起一個(gè)窗口,重復(fù)3.1的動作進(jìn)入kafka容器,然后連接消費(fèi)者

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic shopping --from-beginning

這是就能就收消息了。

docker安裝kafka,并集成springboot進(jìn)行測試

?到達(dá)這里,我們的kafka就安裝并測試成功了。

4. 接下來我們就創(chuàng)建Springboot工程來連接kafka進(jìn)行消息的生產(chǎn)和消費(fèi)

4.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.volga</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- 阿里巴巴 fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

4.2 我們創(chuàng)建一個(gè)訂單的實(shí)體類

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    /**
     * 訂單id
     */
    private long orderId;
    /**
     * 訂單號
     */
    private String orderNum;
    /**
     * 訂單創(chuàng)建時(shí)間
     */
    private LocalDateTime createTime;
}

4.3?創(chuàng)建生產(chǎn)者

@Component
@Slf4j
public class KafkaProvider {
    /**
     * 消息 TOPIC
     */
    private static final String TOPIC = "shopping";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
        // 構(gòu)建一個(gè)訂單類
        Order order = Order.builder()
                .orderId(orderId)
                .orderNum(orderNum)
                .createTime(createTime)
                .build();

        // 發(fā)送消息,訂單類的 json 作為消息體
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order));

        // 監(jiān)聽回調(diào)
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("生產(chǎn)者產(chǎn)生消息 失敗 ## Send message fail ...");
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("生產(chǎn)者產(chǎn)生消息 成功 ## Send message success ...");
            }
        });
    }
}

4.4?創(chuàng)建消費(fèi)者

@Component
@Slf4j
public class KafkaConsumer {
    @KafkaListener(topics = "shopping", groupId = "group_id") //這個(gè)groupId是在yml中配置的
    public void consumer(String message) {
        log.info("消費(fèi)者消費(fèi)信息 ## consumer message: {}", message);
    }
}

4.5?創(chuàng)建測試類

@SpringBootTest
public class SpringBootKafakaApplicationTests {
    @Autowired
    private KafkaProvider kafkaProvider;

    @Test
    public void sendMessage() throws InterruptedException {
        System.out.println("是否為空??+"+kafkaProvider);
        // 發(fā)送 10 個(gè)消息
        for (int i = 0; i < 10; i++) {
            long orderId = i+1;
            String orderNum = UUID.randomUUID().toString();
            kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
        }
        TimeUnit.MINUTES.sleep(1);
    }
}

4.6?要創(chuàng)建一個(gè)Application方法,不然項(xiàng)目會啟動報(bào)錯

@SpringBootApplication
public class KafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class,args);
    }
}

4.7?配置application.yml

spring:
  kafka:
    # 指定 kafka 地址,我這里部署在的虛擬機(jī),開發(fā)環(huán)境是Windows,kafkahost是虛擬機(jī)的地址, 若外網(wǎng)地址,注意修改為外網(wǎng)的IP( 集群部署需用逗號分隔)
    bootstrap-servers: 服務(wù)器ip:9092
    consumer:
      # 指定 group_id
      group-id: group_id
      auto-offset-reset: earliest
      # 指定消息key和消息體的序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # 發(fā)生錯誤后,消息重發(fā)的次數(shù)。
      retries: 0
      #當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會把它們放在同一個(gè)批次里。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算。
      batch-size: 16384
      # 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小。
      buffer-memory: 33554432
      # 指定消息key和消息體的序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringSerializer
      value-deserializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      # 在偵聽器容器中運(yùn)行的線程數(shù)。
      concurrency: 5
      #listner負(fù)責(zé)ack,每調(diào)用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false

以上就創(chuàng)建項(xiàng)目成功了,我們運(yùn)行測試方法,就能獲取kafka中的消息了。

###?生產(chǎn)消息

docker安裝kafka,并集成springboot進(jìn)行測試

?###?消費(fèi)消息

docker安裝kafka,并集成springboot進(jìn)行測試

這里就是簡單實(shí)現(xiàn)了kafka的消息生產(chǎn)和消費(fèi),后續(xù)的kafka復(fù)雜場景的實(shí)現(xiàn)會持續(xù)更新。

我是空谷有來人,謝謝支持。?文章來源地址http://www.zghlxwxcb.cn/news/detail-424508.html

到了這里,關(guān)于docker安裝kafka,并集成springboot進(jìn)行測試的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • SpringBoot3.1.7集成Kafka和Kafka安裝

    SpringBoot3.1.7集成Kafka和Kafka安裝

    我們在很多系統(tǒng)開發(fā)都需要用到消息中間件,目前來說Kafka憑借其優(yōu)秀的性能,使得它的使用率已經(jīng)是名列前茅了,所以今天我們將它應(yīng)用到我們的系統(tǒng) 在使用一個(gè)中間件一定要考慮版本的兼容性,否則后面會遇到很多問題,首先我們打開Spring的官網(wǎng):Spring for Apache Kafka Spr

    2024年01月23日
    瀏覽(29)
  • 使用TestContainers在Docker中進(jìn)行集成測試

    現(xiàn)代軟件應(yīng)用很少獨(dú)立工作。典型的應(yīng)用程序會與幾個(gè)外部系統(tǒng)進(jìn)行通信,如: 數(shù)據(jù)庫、 消息系統(tǒng)、 緩存提供商 其他第三方服務(wù)。 你應(yīng)該編寫測試確保一切正常運(yùn)行。 單元測試 有助于隔離地測試業(yè)務(wù)邏輯,不涉及任何外部服務(wù)。它們易于編寫并提供幾乎即時(shí)的反饋。 有了

    2024年02月08日
    瀏覽(13)
  • SpringBoot 如何使用 EmbeddedDatabaseBuilder 進(jìn)行數(shù)據(jù)庫集成測試

    SpringBoot 如何使用 EmbeddedDatabaseBuilder 進(jìn)行數(shù)據(jù)庫集成測試

    在開發(fā) SpringBoot 應(yīng)用程序時(shí),我們通常需要與數(shù)據(jù)庫進(jìn)行交互。為了確保我們的應(yīng)用程序在生產(chǎn)環(huán)境中可以正常工作,我們需要進(jìn)行數(shù)據(jù)庫集成測試,以測試我們的應(yīng)用程序是否能夠正確地與數(shù)據(jù)庫交互。在本文中,我們將介紹如何使用 SpringBoot 中的 EmbeddedDatabaseBuilder 來進(jìn)行

    2024年02月16日
    瀏覽(30)
  • SpringBoot 如何使用 TestRestTemplate 進(jìn)行 RESTful API 集成測試

    SpringBoot 如何使用 TestRestTemplate 進(jìn)行 RESTful API 集成測試

    在使用 SpringBoot 開發(fā) RESTful API 的過程中,我們需要進(jìn)行集成測試,以確保 API 的正確性和可用性。而 TestRestTemplate 是 Spring Framework 提供的一個(gè)工具類,可以用來進(jìn)行 RESTful API 的集成測試。在本文中,我們將介紹如何使用 TestRestTemplate 進(jìn)行 RESTful API 集成測試。 TestRestTemplate 是

    2024年02月13日
    瀏覽(27)
  • SpringBoot 如何使用 TestEntityManager 進(jìn)行 JPA 集成測試, 如何使用

    SpringBoot 如何使用 TestEntityManager 進(jìn)行 JPA 集成測試, 如何使用

    Spring Boot 是一個(gè)非常流行的 Java Web 開發(fā)框架,它簡化了開發(fā)過程,提高了開發(fā)效率。在開發(fā)過程中,我們通常需要使用 JPA 操作數(shù)據(jù)庫,為了保證代碼的質(zhì)量和正確性,我們需要進(jìn)行集成測試。TestEntityManager 是 Spring Boot 提供的用于 JPA 集成測試的工具,它可以模擬 EntityManag

    2024年02月13日
    瀏覽(41)
  • Docker 安裝kafka 并創(chuàng)建topic 進(jìn)行消息通信

    Docker 安裝kafka 并創(chuàng)建topic 進(jìn)行消息通信

    ????????Apache Kafka是一個(gè)分布式流處理平臺,用于構(gòu)建高性能、可擴(kuò)展的實(shí)時(shí)數(shù)據(jù)流應(yīng)用程序。本文將介紹如何使用Docker容器化技術(shù)來安裝和配置Apache Kafka。 1、kafka安裝必須先安裝Zookpper 2、下載鏡像 3、查看下載好的鏡像 4、啟動Kafka 5、查看是否創(chuàng)建好Kafka容器 6、進(jìn)入到

    2024年03月15日
    瀏覽(18)
  • MinIO【部署 01】MinIO安裝及SpringBoot集成簡單測試

    MinIO【部署 01】MinIO安裝及SpringBoot集成簡單測試

    下載 https://min.io/download#/linux; 安裝文檔 https://min.io/docs/minio/linux/index.html。 工作臺詳細(xì)使用文檔 https://min.io/docs/minio/linux/administration/minio-console.html#minio-console 登錄頁面: 登錄成功: Java Quickstart Guide https://min.io/docs/minio/linux/developers/java/minio-java.html#minio-java-quickstart Java SDK htt

    2024年02月11日
    瀏覽(48)
  • Docker安裝ClickHouse22.6.9.11并與SpringBoot、MyBatisPlus集成

    Docker安裝ClickHouse22.6.9.11并與SpringBoot、MyBatisPlus集成

    上一篇文章CentOS6.10上離線安裝ClickHouse19.9.5.36并修改默認(rèn)數(shù)據(jù)存儲目錄記錄了在舊版的操作系統(tǒng)上直接安裝低版本 ClickHouse (脫胎于俄羅斯頭號搜索引擎的技術(shù))的過程,開啟遠(yuǎn)程訪問并配置密碼; 其實(shí)通過 Docker 運(yùn)行 ClickHouse 是我在2022年10月左右在虛擬機(jī)上實(shí)驗(yàn)的,當(dāng)時(shí)

    2024年02月09日
    瀏覽(17)
  • kafka:java集成 kafka(springboot集成、客戶端集成)

    kafka:java集成 kafka(springboot集成、客戶端集成)

    摘要 對于java的kafka集成,一般選用springboot集成kafka,但可能由于對接方kafka老舊、kafka不安全等問題導(dǎo)致kafak版本與spring版本不兼容,這個(gè)時(shí)候就得自己根據(jù)kafka客戶端api集成了。 一、springboot集成kafka 具體官方文檔地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    瀏覽(94)
  • 經(jīng)實(shí)測利用POSTMAN根本無法進(jìn)行并發(fā)測試,大家不要再被一些搬運(yùn)工給誤導(dǎo)了

    經(jīng)實(shí)測利用POSTMAN根本無法進(jìn)行并發(fā)測試,大家不要再被一些搬運(yùn)工給誤導(dǎo)了

    以下為我的實(shí)測記錄 一、先上我測試的接口代碼,就是一個(gè)redis的tryLock分布式鎖的獲取,接口在獲取到鎖后,線程sleep了5秒,此時(shí)線程是不釋放鎖的,那按道理第二個(gè)請求在這個(gè)時(shí)間進(jìn)來,是獲取不到鎖的,但結(jié)果卻不是這樣的 二、按照網(wǎng)上的那些博文,postman操作步驟如下

    2024年02月11日
    瀏覽(35)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包