国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

1、Kafka急速入門(mén)

這篇具有很好參考價(jià)值的文章主要介紹了1、Kafka急速入門(mén)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

1、Kafka實(shí)戰(zhàn)應(yīng)用場(chǎng)景

1、Kafka急速入門(mén)

1、Kafka急速入門(mén)

?1、Kafka急速入門(mén)

?1、Kafka急速入門(mén)

?1、Kafka急速入門(mén)

?1、Kafka急速入門(mén)

2、?Kafka基本概念

1、Kafka急速入門(mén)

?Kafka broker。kafka服務(wù)端,Consumer消費(fèi)者? ?Producer生產(chǎn)者

1、Kafka急速入門(mén)

Topic與分區(qū)是一對(duì)多的關(guān)系;offset是消息分區(qū)中的唯一標(biāo)識(shí),通過(guò)offset定位具體的分區(qū)找到消息所在。

分區(qū):可看成是一個(gè)可追加的日志文件。

分區(qū)是有序的,Topic是無(wú)序的。

分區(qū)指定好了,后期也是能夠修改的,擴(kuò)展性

1、Kafka急速入門(mén)

ISR(In Sync Replicas)詳解

1、Kafka急速入門(mén)

?1、Kafka急速入門(mén)

?1、Kafka急速入門(mén)

?1、Kafka急速入門(mén)

?1、Kafka急速入門(mén)

?1、Kafka急速入門(mén)

3、zookeeper集群環(huán)境搭建

3.1修改/etc/hostname

vim /etc/hostname

1、Kafka急速入門(mén)

?3.2修改/etc/hosts

vim /etc/hosts

1、Kafka急速入門(mén)

?3.3 注意關(guān)閉防火墻狀態(tài)

  • 啟動(dòng)防火墻:systemctl start firewalld
  • 關(guān)閉防火墻:systemctl stop firewalld
  • 重啟防火墻:systemctl restart firewalld
  • 查看防火墻狀態(tài):systemctl status firewalld
  • 開(kāi)機(jī)禁用防火墻:systemctl disable firewalld

3.4 上傳apache-zookeeper-3.8.0.tar.gz壓縮包到服務(wù)器的/home/software/8-apache-zookeeper-3.8.0/目錄下。

壓縮包下載地址:Index of /zookeeper/zookeeper-3.8.0

1、Kafka急速入門(mén)

注意:要下載apache-zookeeper-3.8.0-bin.tar.gz這個(gè)。

cd /home/software/8-apache-zookeeper-3.8.0/

解壓:

tar -zxvf apache-zookeeper-3.8.0-bin.tar.gz

重命名:

mv apache-zookeeper-3.8.0-bin zookeeper

3.5 修改環(huán)境變量:

vim /etc/profile

添加zookeeper的全局變量

# zookeeper environment
export ZOOKEEPER_HOME=/home/software/8-apache-zookeeper-3.8.0/zookeeper
export PATH=.:$ZOOKEEPER_HOME/bin

刷新環(huán)境變量:

source /etc/profile

/etc/profile內(nèi)容:

# /etc/profile

# System wide environment and startup programs, for login setup
# Functions and aliases go in /etc/bashrc

# It's NOT a good idea to change this file unless you know what you
# are doing. It's much better to create a custom.sh shell script in
# /etc/profile.d/ to make custom changes to your environment, as this
# will prevent the need for merging in future updates.

pathmunge () {
    case ":${PATH}:" in
        *:"$1":*)
            ;;
        *)
            if [ "$2" = "after" ] ; then
                PATH=$PATH:$1
            else
                PATH=$1:$PATH
            fi
    esac
}


if [ -x /usr/bin/id ]; then
    if [ -z "$EUID" ]; then
        # ksh workaround
        EUID=`/usr/bin/id -u`
        UID=`/usr/bin/id -ru`
    fi
    USER="`/usr/bin/id -un`"
    LOGNAME=$USER
    MAIL="/var/spool/mail/$USER"
fi

# Path manipulation
if [ "$EUID" = "0" ]; then
    pathmunge /usr/sbin
    pathmunge /usr/local/sbin
else
    pathmunge /usr/local/sbin after
    pathmunge /usr/sbin after
fi

HOSTNAME=`/usr/bin/hostname 2>/dev/null`
HISTSIZE=1000
if [ "$HISTCONTROL" = "ignorespace" ] ; then
    export HISTCONTROL=ignoreboth
else
    export HISTCONTROL=ignoredups
fi

export PATH USER LOGNAME MAIL HOSTNAME HISTSIZE HISTCONTROL

# By default, we want umask to get set. This sets it for login shell
# Current threshold for system reserved uid/gids is 200
# You could check uidgid reservation validity in
# /usr/share/doc/setup-*/uidgid file
if [ $UID -gt 199 ] && [ "`/usr/bin/id -gn`" = "`/usr/bin/id -un`" ]; then
    umask 002
else
    umask 022
fi

for i in /etc/profile.d/*.sh /etc/profile.d/sh.local ; do
    if [ -r "$i" ]; then
        if [ "${-#*i}" != "$-" ]; then 
            . "$i"
        else
            . "$i" >/dev/null
        fi
    fi
done

unset i
unset -f pathmunge



# java environment
export JAVA_HOME=/home/software/1-jdk/jdk1.8.0_341
export CLASSPATH=.:$JAVA_HOME/lib
# export PATH=.:$JAVA_HOME/bin:$JAVA_HOME/lib:$PATH

# Erlang environment
ERLANG_HOME=/usr/local/erlang/
#export PATH=$PATH:$ERLANG_HOME/bin
 export ERLANG_HOME

# RabbitMQ environment
# export PATH=$PATH:$RABBITMQ_HOME/sbin/
export RABBITMQ_HOME=/usr/local/rabbitmq/rabbitmq_server-3.10.7

# zookeeper environment
export ZOOKEEPER_HOME=/home/software/8-apache-zookeeper-3.8.0/zookeeper
export PATH=.:$JAVA_HOME/bin:$JAVA_HOME/lib:$ZOOKEEPER_HOME/bin:$PATH:$ERLANG_HOME/bin:$RABBITMQ_HOME/sbin/

3.6 修改zookeeper配置文件:

3.6.1 首先到指定目錄:

cd?/home/software/8-apache-zookeeper-3.8.0/zookeeper/conf/

3.6.2 然后重命名zoo_sample.cfg文件,重命名后為zoo.cfg。

mv zoo_sample.cfg zoo.cfg

3.6.3 修改兩處地方,然后保存退出:

vim /home/software/8-apache-zookeeper-3.8.0/zookeeper/conf/zoo.cfg

  • 修改數(shù)據(jù)的Dir

dataDir=/home/software/8-apache-zookeeper-3.8.0/zookeeper/data

  • 修改集群地址:

server.0=centos130:2888:3888

3.6.4 增加服務(wù)器表示配置,需要2個(gè)步驟,第一步是創(chuàng)建文件夾和文件,第二是添加配置內(nèi)容:

  • 創(chuàng)建文件夾:

mkdir?/home/software/8-apache-zookeeper-3.8.0/zookeeper/data/

  • 創(chuàng)建文件myid,路徑應(yīng)該創(chuàng)建在/home/software/8-apache-zookeeper-3.8.0/zookeeper/data/下面,如下:

vim /home/software/8-apache-zookeeper-3.8.0/zookeeper/data/myid

注意這里每一臺(tái)服務(wù)器的myid文件內(nèi)容不同,分別修改里面的值為0,1,2;與我們之前的zoo.cfg配置文件里:server.0,server.1,server.2順序相對(duì)應(yīng),然后保存退出

3.7 到此為止,Zookeeper集群環(huán)境大功告成(我只弄一臺(tái)虛擬主機(jī),生產(chǎn)環(huán)境可以按照這個(gè)文檔,配置多個(gè)虛擬主機(jī))!啟動(dòng)Zookeeper命令:

啟動(dòng)路徑:/home/software/8-apache-zookeeper-3.8.0/zookeeper/bin/(也可以在任意目錄,因?yàn)榕渲昧谁h(huán)境變量)

執(zhí)行命令:zkServer.sh start(注意這里3臺(tái)機(jī)器都要進(jìn)行啟動(dòng),啟動(dòng)之后可以查看狀態(tài)。)

查看狀態(tài):zkServer.sh status(在三個(gè)節(jié)點(diǎn)上校驗(yàn)zk的mode,會(huì)看到一個(gè)leader和兩個(gè)follower)

zkCli.sh進(jìn)入zookeeper客戶(hù)端

根據(jù)提示命令進(jìn)行操作:

查找:ls /? ? ? ls zookeeper

創(chuàng)建并賦值:create? /test zookeeoer

獲?。篻et /test

設(shè)值:set /test zookeeper1234

PS1:任意節(jié)點(diǎn)都可以看到zookeeper集群的數(shù)據(jù)一致性

PS2:創(chuàng)建節(jié)點(diǎn)有兩種類(lèi)型:短暫(ephemeral)和持久(persistent)。

3.8 zookeeper開(kāi)機(jī)啟動(dòng)

cd /etc/rc.d/init.d

touch zookeeper

chmod -X 777 zookeeper

vim zookeeper

#! /bin/bash
#chkconfig:2345 20 90
#description:zookeeper
#processname:zookeeper
export JAVA_HOME=/home/software/1-jdk/jdk1.8.0_341
export PATH=$JAVA_HOME/bin:$PATH
case $1 in
                 start) /home/software/8-apache-zookeeper-3.8.0/zookeeper/bin/zkServer.sh start;;
                 stop) /home/software/8-apache-zookeeper-3.8.0/zookeeper/bin/zkServer.sh stop;;
                 status) /home/software/8-apache-zookeeper-3.8.0/zookeeper/bin/zkServer.sh status;;
                 restart /home/software/8-apache-zookeeper-3.8.0/zookeeper/bin/zkServer.sh restart ;;
                 *) echo "require start|stop|status|restart"  ;;
esac

開(kāi)機(jī)啟動(dòng)配置:

chkconfig zookeeper on

驗(yàn)證:

chkconfig --add zookeeper

chkconfig --list?zookeeper

1、Kafka急速入門(mén)

?執(zhí)行reboot重啟命令后,執(zhí)行zkServer.sh status命令查看zookeeper狀態(tài)。

Zookeeper數(shù)據(jù)查看工具ZooInspector

下載地址:https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip

下載后解壓,在解壓目錄/build目錄下,可以看到編譯后的??zookeeper-dev-ZooInspector.jar??包。

1、Kafka急速入門(mén)

?直接雙擊jar包就可以打開(kāi)頁(yè)面。

1、Kafka急速入門(mén)

?連接Zookeeper

1、Kafka急速入門(mén)

1、Kafka急速入門(mén)

?4、kafka集群環(huán)境搭建

下載地址:https://kafka.apache.org/downloads

1、Kafka急速入門(mén)

?kafka環(huán)境搭建準(zhǔn)備:

  • 準(zhǔn)備zookeeper環(huán)境(zookeeper-3.8.0)
  • 下載kafka安裝包:https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz
  • 上傳到虛擬機(jī):192.168.110.130

4.1?將kafka安裝包上傳服務(wù)器的/home/software/9-kafka/目錄下

4.2?進(jìn)入/home/software/9-kafka/,解壓復(fù)制到/usr/local目錄下

tar -zxvf kafka_2.13-3.2.1.tgz -C /usr/local/

4.3?進(jìn)入/usr/local 進(jìn)入重命名

mv kafka_2.13-3.2.1 kafka_2.13

4.4?進(jìn)入/usr/local/kafka_2.13/config目錄下,修改server.properties配置文件

vim /usr/local/kafka_2.13/config/server.properties

##4.4.1 集群參數(shù)
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

##4.4.2 修改log.dirs
log.dirs=/usr/local/kafka_2.13/kafka-logs

##4.4.3修改num.partition
num.partitions=5

##4.4.4 新增參數(shù) 192.168.110.130 服務(wù)器ip 9092 ?#默認(rèn)端口
host.name=192.168.110.130
advertised.host.name=192.168.110.130
port=9092

##4.4.5 增加zookeeper地址,以下三臺(tái)zk的地址,是我提前搭建了.
##zookeeper.connect=192.168.33.111:2181,192.168.33.113:2181,192.168.33.114:2181
zookeeper.connect=192.168.110.130:2181

##4.4.6 打開(kāi)advertised.listeners配置,修改里面的IP

advertised.listeners=PLAINTEXT://真實(shí)服務(wù)器IP:9092

##4.4.7 修改完成,退出創(chuàng)建/usr/local/kafka_2.13/kafka-logs 目錄下
mkdir /usr/local/kafka_2.13/kafka-logs

#4.4.8 進(jìn)入bin目錄下 執(zhí)行啟動(dòng)腳本,看到輸出 KafkaServer id=0] started (kafka.server.KafkaServer) 完成了

啟動(dòng)命令:

/usr/local/kafka_2.13/bin/kafka-server-start.sh /usr/local/kafka_2.13/config/server.properties &

kafka控制臺(tái)管理工具安裝

kafka-manager 工具目前改名為cmak,下載地址為:

CMAK(kafka manager)安裝包下載 | Wolfogre's Blog

4.5? 將kafka-manager-2.0.0.2.zip 上傳到/home/software/10-kafka-manager/目錄下 ,然后解壓到 /usr/local/
unzip kafka-manager-2.0.0.2.zip -d /usr/local/

4.6 進(jìn)入/usr/local/kafka-manager-2.0.0.2/conf 配置文件修改參數(shù)application.conf,kafka-manager.zkhosts
#kafka-manager.zkhosts="192.168.33.111:2181,192.168.33.113:2181,192.168.33.114:2181"

kafka-manager.zkhosts="192.168.110.130:2181"

4.7 啟動(dòng)控制臺(tái)
/usr/local/kafka-manager-2.0.0.2/bin/kafka-manager &

4.8 瀏覽器訪問(wèn)控制臺(tái),默認(rèn)端口9000
http://192.168.110.130:9000/

4.9 在頁(yè)面上添加集群
#填充
#192.168.33.111:2181,192.168.33.113:2181,192.168.33.114:2181

192.168.110.130:2181

4.10 集群驗(yàn)證:

4.10.1創(chuàng)建tpoic

  • 通過(guò)控制臺(tái)創(chuàng)建了一個(gè)topic為"test"? 2個(gè)分區(qū)? 1個(gè)副本
  • 在kafka服務(wù)器上執(zhí)行創(chuàng)建topic命令
    kafka-topics.sh --create --bootstrap-server 192.168.110.130:9092 --replication-factor 1 --partitions 2 --topic test

4.10.2 消息發(fā)送與接收驗(yàn)證

cd?/usr/local/kafka_2.13/bin/


4.11 啟動(dòng)發(fā)送消息的腳本
kafka-console-producer.sh --broker-list 192.168.110.130:9092 --topic test

注:

  • --broker-list?192.168.110.130:9092 指的是kafka? broker的地址列表
  • --topic test 指的是把消息發(fā)送到test主題

4.12 另開(kāi)一個(gè)窗口,啟動(dòng)接收消息的腳本
kafka-console-consumer.sh --bootstrap-server 192.168.110.130:9092 --topic test

#啟動(dòng)消費(fèi)者之后,可以在生產(chǎn)者中輸出消費(fèi),消費(fèi)者窗口就能收到了

5、Kafka入門(mén)編碼

1、Kafka急速入門(mén)

?1、Kafka急速入門(mén)

演示代碼:

public interface Const {
	
	String TOPIC_QUICKSTART = "topic-quickstart";
	
	String TOPIC_NORMAL = "topic-normal";
	
	String TOPIC_INTERCEPTOR = "topic-interceptor";
	
	String TOPIC_SERIAL = "topic-serial";
	
	String TOPIC_PARTITION = "topic-partition";
	
	String TOPIC_MODULE = "topic-module";
	
	String TOPIC_CORE = "topic-core";
	
	String TOPIC_REBALANCE = "topic-rebalance";
	
	String TOPIC_MT1 = "topic-mt1";
	
	String TOPIC_MT2 = "topic-mt2";
	
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {

	private String id;
	
	private String name;
	
}
import com.alibaba.fastjson.JSON;
import com.lvxiaosha.kafka.api.Const;
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class QuickStartProducer {

	public static void main(String[] args) {
		Properties properties = new Properties();
		//	1.配置生產(chǎn)者啟動(dòng)的關(guān)鍵屬性參數(shù)
		
		//	1.1	BOOTSTRAP_SERVERS_CONFIG:連接kafka集群的服務(wù)列表,如果有多個(gè),使用"逗號(hào)"進(jìn)行分隔
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
		//	1.2	CLIENT_ID_CONFIG:這個(gè)屬性的目的是標(biāo)記kafkaclient的ID
		properties.put(ProducerConfig.CLIENT_ID_CONFIG, "quickstart-producer");
		//	1.3 KEY_SERIALIZER_CLASS_CONFIG VALUE_SERIALIZER_CLASS_CONFIG
		//	Q: 對(duì) kafka的 key 和 value 做序列化,為什么需要序列化?
		//	A: 因?yàn)镵AFKA Broker 在接收消息的時(shí)候,必須要以二進(jìn)制的方式接收,所以必須要對(duì)KEY和VALUE進(jìn)行序列化
		//	字符串序列化類(lèi):org.apache.kafka.common.serialization.StringSerializer
		//	KEY: 是kafka用于做消息投遞計(jì)算具體投遞到對(duì)應(yīng)的主題的哪一個(gè)partition而需要的
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		//	VALUE: 實(shí)際發(fā)送消息的內(nèi)容
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		
		//	2.創(chuàng)建kafka生產(chǎn)者對(duì)象 傳遞properties屬性參數(shù)集合
		KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
		
		for(int i = 0; i <10; i ++) {
			//	3.構(gòu)造消息內(nèi)容
			User user = new User("00" + i, "張三");
			ProducerRecord<String, String> record = 
					//	arg1:topic , arg2:實(shí)際的消息體內(nèi)容
					new ProducerRecord<String, String>(Const.TOPIC_QUICKSTART,
							JSON.toJSONString(user));
			
			//	4.發(fā)送消息
			producer.send(record);			
		}

		
		//	5.關(guān)閉生產(chǎn)者
		producer.close();
		
	}
}
import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class QuickStartConsumer {

	public static void main(String[] args) {
		
		//	1. 配置屬性參數(shù)
		Properties properties = new Properties();
		
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
		
		//	org.apache.kafka.common.serialization.StringDeserializer
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		
		//	非常重要的屬性配置:與我們消費(fèi)者訂閱組有關(guān)系
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "quickstart-group");
		//	常規(guī)屬性:會(huì)話(huà)連接超時(shí)時(shí)間
		properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
		//	消費(fèi)者提交offset: 自動(dòng)提交 & 手工提交,默認(rèn)是自動(dòng)提交
		properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
		properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

		//	2. 創(chuàng)建消費(fèi)者對(duì)象
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
		
		//	3. 訂閱你感興趣的主題:Const.TOPIC_QUICKSTART
		consumer.subscribe(Collections.singletonList(Const.TOPIC_QUICKSTART));
		
		System.err.println("quickstart consumer started...");
		
		try {
			//	4.采用拉取消息的方式消費(fèi)數(shù)據(jù)
			while(true) {
				//	等待多久拉取一次消息
				//	拉取TOPIC_QUICKSTART主題里面所有的消息
				//	topic 和 partition是 一對(duì)多的關(guān)系,一個(gè)topic可以有多個(gè)partition
				ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
				//	因?yàn)橄⑹窃趐artition中存儲(chǔ)的,所以需要遍歷partition集合
				for(TopicPartition topicPartition : records.partitions()) {
					//	通過(guò)TopicPartition獲取指定的消息集合,獲取到的就是當(dāng)前topicPartition下面所有的消息
					List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
					//	獲取TopicPartition對(duì)應(yīng)的主題名稱(chēng)
					String topic = topicPartition.topic();
					//	獲取當(dāng)前topicPartition下的消息條數(shù)
					int size = partitionRecords.size();
					
					System.err.println(String.format("--- 獲取topic: %s, 分區(qū)位置:%s, 消息總數(shù): %s", 
							topic, 
							topicPartition.partition(),
							size));
					
					for(int i = 0; i < size; i++) {
						ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
						//	實(shí)際的數(shù)據(jù)內(nèi)容
						String value = consumerRecord.value();
						//	當(dāng)前獲取的消息偏移量
						long offset = consumerRecord.offset();
						//	ISR : High Watermark, 如果要提交的話(huà),比如提交當(dāng)前消息的offset+1 
						//	表示下一次從什么位置(offset)拉取消息
						long commitOffser = offset + 1;
						System.err.println(String.format("獲取實(shí)際消息 value:%s, 消息offset: %s, 提交offset: %s",
								value, offset, commitOffser));
					}
				}
			}			
		} finally {
			consumer.close();
		}
	}
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>Kafka</artifactId>
        <groupId>com.lvxiaosha</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>learn-kafka</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <!-- 	排除spring-boot-starter-logging -->
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- log4j2 -->
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-log4j2 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
            <version>2.5.6</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.1.1</version>
        </dependency>

    </dependencies>

    <build>
        <finalName>learn-kafka</finalName>
        <!-- 打包時(shí)包含properties、xml -->
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <!-- 是否替換資源中的屬性-->
                <filtering>true</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>com.lvxiaosha.kafka.api.LearnKafkaApplication</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>


</project>

6、Kafka基本配置參數(shù)詳解

1、Kafka急速入門(mén)

zookeeper.connect 必填項(xiàng)ip:port, 多個(gè)zk節(jié)點(diǎn)用逗號(hào)隔開(kāi)。

listeners 用的比較少。指明Kafka監(jiān)聽(tīng)的客戶(hù)端地址列表。

broker.id 比較重要,必須不同

log.dir 和 log.dirs 用來(lái)存儲(chǔ)Kafka文件的目錄

message.max.bytes:用來(lái)指定broker能夠接受的單個(gè)消息最大值,默認(rèn)1M左右。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-478058.html

group.initial.rebalance.delay.ms,默認(rèn)是3秒鐘。用戶(hù)需要在server.properties文件中自行修改為想要配置的值。這個(gè)參數(shù)的主要效果就是讓coordinator推遲空消費(fèi)組接收到成員加入請(qǐng)求后本應(yīng)立即開(kāi)啟的rebalance。在實(shí)際使用時(shí),假設(shè)你預(yù)估你的所有consumer組成員加入需要在10s內(nèi)完成,那么你就可以設(shè)置該參數(shù)=10000。目前來(lái)看,這個(gè)參數(shù)的使用還是很方便的。

到了這里,關(guān)于1、Kafka急速入門(mén)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Kafka入門(mén)基本概念(詳細(xì))

    Kafka入門(mén)基本概念(詳細(xì))

    Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)(消息引擎系統(tǒng)),它可以處理消費(fèi)者在網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)。 這種動(dòng)作(網(wǎng)頁(yè)瀏覽,搜索和其他用戶(hù)的行動(dòng))是在現(xiàn)代網(wǎng)絡(luò)上的許多社會(huì)功能的一個(gè)關(guān)鍵因素。 這些數(shù)據(jù)通常是由于吞吐量的要求而通過(guò)處理日志和日志聚合

    2024年01月16日
    瀏覽(21)
  • Java 代理模式的基本概念、使用場(chǎng)景、應(yīng)用示例和實(shí)現(xiàn)方法

    代理模式是一種常見(jiàn)的設(shè)計(jì)模式,在 Java 開(kāi)發(fā)中被廣泛應(yīng)用。它允許我們通過(guò)添加一個(gè)代理對(duì)象來(lái)控制對(duì)另一個(gè)對(duì)象的訪問(wèn),從而提供了一種間接訪問(wèn)實(shí)際對(duì)象的方法。本文將詳細(xì)介紹 Java 代理模式的基本概念、使用場(chǎng)景、應(yīng)用示例和實(shí)現(xiàn)方法等相關(guān)內(nèi)容。 代理模式是一種結(jié)

    2024年02月05日
    瀏覽(30)
  • Java SPI概念、實(shí)現(xiàn)原理、優(yōu)缺點(diǎn)、應(yīng)用場(chǎng)景、使用步驟、實(shí)戰(zhàn)SPI案例

    Java SPI概念、實(shí)現(xiàn)原理、優(yōu)缺點(diǎn)、應(yīng)用場(chǎng)景、使用步驟、實(shí)戰(zhàn)SPI案例

    在當(dāng)今互聯(lián)網(wǎng)時(shí)代,應(yīng)用程序越來(lái)越復(fù)雜,對(duì)于我們開(kāi)發(fā)人員來(lái)說(shuō),如何實(shí)現(xiàn)高效的組件化和模塊化已經(jīng)成為了一個(gè)重要的問(wèn)題。而 Java SPI (Service Provider Interface)機(jī)制,作為一種基于接口的服務(wù)發(fā)現(xiàn)機(jī)制,可以幫助我們更好地解決這個(gè)問(wèn)題。這樣會(huì)程序具有高度的 靈活性、

    2024年02月13日
    瀏覽(22)
  • layui的基本使用-日期控件的業(yè)務(wù)場(chǎng)景使用入門(mén)實(shí)戰(zhàn)案例一

    layui的基本使用-日期控件的業(yè)務(wù)場(chǎng)景使用入門(mén)實(shí)戰(zhàn)案例一

    效果鎮(zhèn)樓;? ? ? ?1 前端UI層面; ? 茍日新一刻鐘總結(jié)反觀:? 2? 那么業(yè)務(wù)場(chǎng)景的話(huà),就沒(méi)有那么簡(jiǎn)單了。首先就是解決方案里面可不止一個(gè)文件夾,是多個(gè)項(xiàng)目組成的解決方案。比如倉(cāng)儲(chǔ)層,Repository項(xiàng)目,業(yè)務(wù)層項(xiàng)目,Services,同時(shí)各自對(duì)應(yīng)了各自的接口項(xiàng)目;Model 實(shí)體

    2024年02月13日
    瀏覽(25)
  • 【React 入門(mén)實(shí)戰(zhàn)篇】從零開(kāi)始搭建與理解React應(yīng)用-三、React核心概念與基礎(chǔ)語(yǔ)法

    三、React核心概念與基礎(chǔ)語(yǔ)法 3.1 JSX語(yǔ)法詳解 JSX是React中的一個(gè)語(yǔ)法糖,它允許開(kāi)發(fā)者在JavaScript代碼中編寫(xiě)類(lèi)似HTML的標(biāo)記。這種語(yǔ)法使得開(kāi)發(fā)者能夠以一種聲明式的方式描述界面,提高了代碼的可讀性和可維護(hù)性。 JSX的語(yǔ)法規(guī)則: 元素創(chuàng)建 :使用尖括號(hào) 來(lái)創(chuàng)建元素,就像在

    2024年04月08日
    瀏覽(43)
  • kafka--kafka的基本概念-副本概念replica

    kafka--kafka的基本概念-副本概念replica

    Broker 表示實(shí)際的物理機(jī)器節(jié)點(diǎn) Broker1中的綠色P1表示主分片Broker2中的藍(lán)色P1表示副本分片,其余類(lèi)似,就是主從的概念,如果一個(gè)Broker掛掉了,還有其它的節(jié)點(diǎn)來(lái)保證數(shù)據(jù)的完整性 P可以看做分區(qū) 同一時(shí)間點(diǎn),綠色P1 和紫色P1 不會(huì)完全一致,存在一個(gè)同步的過(guò)程 綠色部分處理

    2024年02月12日
    瀏覽(24)
  • Kubernetes(K8s)從入門(mén)到精通系列之五:K8s的基本概念和術(shù)語(yǔ)之應(yīng)用類(lèi)

    Service: Service指的是無(wú)狀態(tài)服務(wù),通常多個(gè)程序副本提供服務(wù),在特殊情況下也可以是有狀態(tài)的單實(shí)例服務(wù),比如MySQL這種數(shù)據(jù)存儲(chǔ)類(lèi)的服務(wù)。 K8s里的Service具有一個(gè)全局唯一的虛擬ClusterIP地址,客戶(hù)端可以通過(guò)這個(gè)虛擬IP地址+服務(wù)的端口直接訪問(wèn)該服務(wù),再通過(guò)部署K8s集群的

    2024年02月14日
    瀏覽(92)
  • 【TensorFlow 的基本概念和使用場(chǎng)景。】

    TensorFlow 是一個(gè)開(kāi)源的深度學(xué)習(xí)框架,由 Google 開(kāi)發(fā)和維護(hù)。它提供了一個(gè)靈活且高效的方式來(lái)進(jìn)行機(jī)器學(xué)習(xí)和人工智能任務(wù)的開(kāi)發(fā)和部署。TensorFlow 的基本概念包括: 圖(Graph):TensorFlow 使用圖來(lái)表示計(jì)算任務(wù)。圖是由節(jié)點(diǎn)(Nodes)和邊(Edges)組成的,節(jié)點(diǎn)表示操作(或稱(chēng)

    2024年02月22日
    瀏覽(35)
  • TensorFlow 的基本概念和使用場(chǎng)景

    TensorFlow 是 Google 開(kāi)源的機(jī)器學(xué)習(xí)框架,它支持使用數(shù)據(jù)流圖(Data Flow Graph)的方式進(jìn)行計(jì)算,以實(shí)現(xiàn)大規(guī)模分布式機(jī)器學(xué)習(xí)應(yīng)用。TensorFlow 在深度學(xué)習(xí)、自然語(yǔ)言處理、計(jì)算機(jī)視覺(jué)等領(lǐng)域有廣泛應(yīng)用。 TensorFlow 中的重要概念包括: 張量(Tensor):TensorFlow 中的基本數(shù)據(jù)類(lèi)型,

    2024年02月11日
    瀏覽(20)
  • 介紹Tensorflow的基本概念和場(chǎng)景

    TensorFlow是一種開(kāi)源的機(jī)器學(xué)習(xí)框架,由Google開(kāi)發(fā),用于構(gòu)建和訓(xùn)練人工神經(jīng)網(wǎng)絡(luò)。它使用圖形表示來(lái)表示數(shù)學(xué)計(jì)算,其中節(jié)點(diǎn)表示操作,邊表示數(shù)據(jù)流。以下是TensorFlow的基本概念: Tensor:TensorFlow的計(jì)算單位是張量,可以被看作是多維數(shù)組。TensorFlow中的數(shù)據(jù)存儲(chǔ)在張量中。

    2024年02月15日
    瀏覽(20)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包