大家好,今天我們開始學(xué)習(xí)kafka中間件,今天我們改變一下策略,不刷視頻學(xué)習(xí),改為實(shí)踐學(xué)習(xí),在網(wǎng)上找一些案例功能去做,來達(dá)到學(xué)習(xí)實(shí)踐的目的。
首先,是安裝相關(guān)組件。
1. docker安裝安裝
1.1 yum-utils軟件包
yum install -y yum-utils
1.2?設(shè)置阿里云鏡像
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
1.3?安裝docker
yum install docker-ce docker-ce-cli containerd.io
1.4?啟動docker
systemctl start docker
1.5?測試
docker version
docker run hello-world
docker images
至此,docker就安裝完畢了。接下來就是安裝zookeeper和kafka了,我這里用的是kafka2.x的版本,因此需要結(jié)合zookeeper去是使用。現(xiàn)在最新的kafka3.x已經(jīng)可以拋棄zookeeper去單獨(dú)使用了,小伙伴們有興趣的話可以自己去動手安裝實(shí)踐下。
2.?安裝zookeeper和kafka
2.1?docker安裝zookeeper
docker pull wurstmeister/zookeeper
2.2?啟動zookeeper
docker run -d --name zookeeper -p 2181:2181 -e TZ="Asia/Shanghai" --restart always wurstmeister/zookeeper
2.3?docker查看zookeeper容器是否啟動
docker ps
?出現(xiàn)以上信息,就代表zookeeper已經(jīng)安裝并啟動成功。
2.4?安裝kafka
docker pull wurstmeister/kafka
2.5?啟動kafka
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=124.223.205.125:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://124.223.205.125:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" -e TZ="Asia/Shanghai" wurstmeister/kafka
2.6?用docker?ps查看kafka是否啟動
出現(xiàn)以上信息,就代表kafka啟動成功了。
下來就測試一下
3. 發(fā)送消息和消費(fèi)消息
3.1?進(jìn)入kafka容器
docker exec -it 容器id /bin/bash
cd /opt/kafka_2.13-2.8.1/bin/
?3.2?連接生產(chǎn)者
./kafka-console-producer.sh --broker-list localhost:9092 --topic shopping
接下來就可以發(fā)送消息了。
?3.3?另起一個(gè)窗口,重復(fù)3.1的動作進(jìn)入kafka容器,然后連接消費(fèi)者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic shopping --from-beginning
這是就能就收消息了。
?到達(dá)這里,我們的kafka就安裝并測試成功了。
4. 接下來我們就創(chuàng)建Springboot工程來連接kafka進(jìn)行消息的生產(chǎn)和消費(fèi)
4.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.volga</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 阿里巴巴 fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
4.2 我們創(chuàng)建一個(gè)訂單的實(shí)體類
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {
/**
* 訂單id
*/
private long orderId;
/**
* 訂單號
*/
private String orderNum;
/**
* 訂單創(chuàng)建時(shí)間
*/
private LocalDateTime createTime;
}
4.3?創(chuàng)建生產(chǎn)者
@Component
@Slf4j
public class KafkaProvider {
/**
* 消息 TOPIC
*/
private static final String TOPIC = "shopping";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
// 構(gòu)建一個(gè)訂單類
Order order = Order.builder()
.orderId(orderId)
.orderNum(orderNum)
.createTime(createTime)
.build();
// 發(fā)送消息,訂單類的 json 作為消息體
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order));
// 監(jiān)聽回調(diào)
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
log.info("生產(chǎn)者產(chǎn)生消息 失敗 ## Send message fail ...");
}
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("生產(chǎn)者產(chǎn)生消息 成功 ## Send message success ...");
}
});
}
}
4.4?創(chuàng)建消費(fèi)者
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "shopping", groupId = "group_id") //這個(gè)groupId是在yml中配置的
public void consumer(String message) {
log.info("消費(fèi)者消費(fèi)信息 ## consumer message: {}", message);
}
}
4.5?創(chuàng)建測試類
@SpringBootTest
public class SpringBootKafakaApplicationTests {
@Autowired
private KafkaProvider kafkaProvider;
@Test
public void sendMessage() throws InterruptedException {
System.out.println("是否為空??+"+kafkaProvider);
// 發(fā)送 10 個(gè)消息
for (int i = 0; i < 10; i++) {
long orderId = i+1;
String orderNum = UUID.randomUUID().toString();
kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
}
TimeUnit.MINUTES.sleep(1);
}
}
4.6?要創(chuàng)建一個(gè)Application方法,不然項(xiàng)目會啟動報(bào)錯
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class,args);
}
}
4.7?配置application.yml
spring:
kafka:
# 指定 kafka 地址,我這里部署在的虛擬機(jī),開發(fā)環(huán)境是Windows,kafkahost是虛擬機(jī)的地址, 若外網(wǎng)地址,注意修改為外網(wǎng)的IP( 集群部署需用逗號分隔)
bootstrap-servers: 服務(wù)器ip:9092
consumer:
# 指定 group_id
group-id: group_id
auto-offset-reset: earliest
# 指定消息key和消息體的序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# 發(fā)生錯誤后,消息重發(fā)的次數(shù)。
retries: 0
#當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會把它們放在同一個(gè)批次里。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算。
batch-size: 16384
# 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小。
buffer-memory: 33554432
# 指定消息key和消息體的序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringSerializer
value-deserializer: org.apache.kafka.common.serialization.StringSerializer
listener:
# 在偵聽器容器中運(yùn)行的線程數(shù)。
concurrency: 5
#listner負(fù)責(zé)ack,每調(diào)用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
以上就創(chuàng)建項(xiàng)目成功了,我們運(yùn)行測試方法,就能獲取kafka中的消息了。
###?生產(chǎn)消息
?###?消費(fèi)消息
這里就是簡單實(shí)現(xiàn)了kafka的消息生產(chǎn)和消費(fèi),后續(xù)的kafka復(fù)雜場景的實(shí)現(xiàn)會持續(xù)更新。文章來源:http://www.zghlxwxcb.cn/news/detail-424508.html
我是空谷有來人,謝謝支持。?文章來源地址http://www.zghlxwxcb.cn/news/detail-424508.html
到了這里,關(guān)于docker安裝kafka,并集成springboot進(jìn)行測試的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!