国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Linux部署Kafka及常見問題記錄

這篇具有很好參考價(jià)值的文章主要介紹了Linux部署Kafka及常見問題記錄。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

kafka 使用場景

  • 監(jiān)控 Metrics
  • 網(wǎng)站活動(dòng)追蹤 Website Activity Tracking
  • 日志收集 Log Aggregation
  • 流處理 Stream Processing
  • 事件溯源 Event Sourcing
  • 提交日志 Commit Log

Kafka 基本概念

Broker

和AMQP里協(xié)議的概念一樣, 就是消息中間件所在的服務(wù)器

Topic(主題)

每條發(fā)布到Kafka集群的消息都有一個(gè)類別,這個(gè)類別被稱為Topic。(物理上不同Topic的消息分開存儲(chǔ),邏輯上一個(gè)Topic的消息雖然保存于一個(gè)或多個(gè)broker上但用戶只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)

Partition(分區(qū))

Partition是物理上的概念,體現(xiàn)在磁盤上面,每個(gè)Topic包含一個(gè)或多個(gè)Partition.

Producer

負(fù)責(zé)發(fā)布消息到Kafka broker

Consumer

消息消費(fèi)者,向Kafka broker讀取消息的客戶端。

Consumer Group(消費(fèi)者群組)

每個(gè)Consumer屬于一個(gè)特定的Consumer Group(可為每個(gè)Consumer指定group name,若不指定group name則屬于默認(rèn)的group)。

offset 偏移量

是kafka用來確定消息是否被消費(fèi)過的標(biāo)識(shí),在kafka內(nèi)部體現(xiàn)就是一個(gè)遞增的數(shù)字
kafka消息發(fā)送的時(shí)候 ,考慮到性能 可以采用打包方式發(fā)送, 也就是說 傳統(tǒng)的消息是一條一條發(fā)送, 現(xiàn)在可以先把需要發(fā)送的消息緩存在客戶端, 等到達(dá)一定數(shù)值時(shí), 再一起打包發(fā)送, 而且還可以對(duì)發(fā)送的數(shù)據(jù)進(jìn)行壓縮處理,減少在數(shù)據(jù)傳輸時(shí)的開銷

Linux 安裝&啟動(dòng) kafka

截止 2022.08.22 最新版是 3.3.1 官網(wǎng)下載 : https://kafka.apache.org/downloads
linux 部署kafka,MQ消息隊(duì)列,kafka,消息隊(duì)列,laravel

上傳到服務(wù)器,解壓文件

tar -zxvf kafka_2.13-3.3.1.tgz -C /usr/local/

linux 部署kafka,MQ消息隊(duì)列,kafka,消息隊(duì)列,laravel

修改核心配置文件

vim /usr/local/kafka_2.13-3.3.1/config/server.properties
  • broker.id??
    集群環(huán)境中每臺(tái) kafka 都有唯一的 broker.id??, 不能重復(fù)
    broker.id?? 和 log日志中 meta.properties 中的 broker.id?? 一致
  • log.dirs (消息存儲(chǔ)路徑)
log.dirs=/home/servers-kafka/logs/kafka
  • zookeeper.connect (zookeeper連接池地址信息)
  • host.name?? (主機(jī)名稱,為本機(jī)IP地址)
  • 修改 : ??listeners=PLATNTEXT://kafka服務(wù)器IP:9092??

創(chuàng)建數(shù)據(jù)存放目錄

mkdir -p /usr/local/kafka/kafka-logs
mkdir -p /usr/local/kafka/zookeeper-data
mkdir -p /usr/local/kafka/zookeeper-logs

最終配置

broker.id=1
log.dirs=/usr/local/kafka/kafka-logs
zookeeper.connect=139.55.22.212:2181
# 配置文件尾部追加以下內(nèi)容
delete.topic.enable=true
host.name=139.55.22.21

啟動(dòng)

cd /usr/local/kafka/bin

# zookeeper守護(hù)線程啟動(dòng) zookeeper
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties 

# kafka 守護(hù)線程啟動(dòng) kafka
./kafka-server-start.sh -daemon ../config/server.properties 

# 停止 kafka
./kafka-server-stop.sh

# 查看日志
cat /usr/local/kafka/logs/server.log

# kafka 根目錄創(chuàng)建 Topic
cd /usr/local/kafka
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic campaign-logs --replication-factor 1 --partitions 4 

# 查看 topic
# bin/kafka-configs.sh --bootstrap-server IP:端口 --describe --topic 主題名稱
bin/kafka-topics.sh --bootstrap-server localhost:9015 --describe --topic campaign-logs

# 生產(chǎn)者發(fā)布消息
bin/kafka-console-producer.sh --bootstrap-server localhost:9015 --topic campaign-logs

# 消費(fèi)者接受消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9015 --from-beginning --topic campaign-logs

驗(yàn)證 kafk 是否啟動(dòng)成功

jps

linux 部署kafka,MQ消息隊(duì)列,kafka,消息隊(duì)列,laravel

ps -ef | grep kafka

linux 部署kafka,MQ消息隊(duì)列,kafka,消息隊(duì)列,laravel

進(jìn)程存在則啟動(dòng)成功

Topic (主題)

創(chuàng)建

topic命令時(shí)的警告 創(chuàng)建topic的時(shí)候,如果名稱中包含. 或者_(dá),kafka會(huì)拋出警告。原因是:

  1. 在Kafka的內(nèi)部做埋點(diǎn)時(shí)會(huì)根據(jù) topic 的名稱來命名 metrics 的名稱,并且會(huì)將句點(diǎn)號(hào) . 改成下劃線_。假設(shè)遇到一個(gè)topic 的名稱為 topic.1_2,還有一個(gè)topic的名稱為
    topic_1.2,那么最后的metrics的名稱都為topic_1_2,所以就會(huì)發(fā)生名稱沖突。
命名規(guī)則

topic的命名不推薦(雖然可以這樣做)使用雙下劃線__開頭,
因?yàn)橐噪p下劃線開頭的topic一般看作是kafka的內(nèi)部topic,比如__consumer_offsets和__transaction_state。
topic的名稱必須滿足如下規(guī)則:

  1. 由大小寫字母、數(shù)字、. 、- 、_組成
  2. 不能為空、不能為. 、不能為…
  3. 長度不能超過249
# kafka 根目錄創(chuàng)建
cd /usr/local/kafka
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic campaign-logs --replication-factor 1 --partitions 4 

# kafka bin目錄創(chuàng)建
cd /usr/local/kafka/bin
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic campaign-logs --replication-factor 1 --partitions 4 
注意事項(xiàng)

創(chuàng)建topic時(shí),設(shè)置 replication-factor(topic副本)的數(shù)量不能多余啟動(dòng)的broker數(shù)量 kafka
版本不同,創(chuàng)建命令不同,可參考官網(wǎng)文檔
官方文檔:https://kafka.apache.org/documentation/#topicconfigs

查看
# bin/kafka-configs.sh --bootstrap-server IP:端口 --describe --topic 主題名稱
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic campaign-logs

linux 部署kafka,MQ消息隊(duì)列,kafka,消息隊(duì)列,laravel

生產(chǎn)者 (producer)

# 控制臺(tái)啟動(dòng)生產(chǎn)者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic campaign-logs

消費(fèi)者 (consumer)

# 控制臺(tái)啟動(dòng)消費(fèi)者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic campaign-logs
# 拓展配置
# 1. 獲取全部消息 
--from-beginning

異常記錄

啟動(dòng)不成功時(shí),查看服務(wù)日志:

cat /usr/local/kafka/logs/server.log

啟動(dòng)報(bào)錯(cuò):Socket server failed to bind to ip:端口: Cannot assign requested address.

linux 部署kafka,MQ消息隊(duì)列,kafka,消息隊(duì)列,laravel

解決方案

修改 server.properties 調(diào)整監(jiān)聽地址配置

# 示例 listeners=PLAINTEXT://127.0.0.1:端口
listeners=PLAINTEXT://127.0.0.1:555
advertised.listeners=PLAINTEXT://127.0.0.1:666

composer 更新失敗,依賴沖突

linux 部署kafka,MQ消息隊(duì)列,kafka,消息隊(duì)列,laravel

解決方案

conposer >>> require 新增配置, composer update --ignore-platform-reqs 安裝組件
# 1. 單獨(dú)配置后執(zhí)行命令 composer update --ignore-platform-reqs 更新
"symfony/http-kernel": "v4.1.13",
# 2. 再配置 "laravel/lumen-framework":"5.7.*", 后行命令 composer update --ignore-platform-reqs 更新
"laravel/lumen-framework":"5.7.*",

linux 部署kafka,MQ消息隊(duì)列,kafka,消息隊(duì)列,laravel
linux 部署kafka,MQ消息隊(duì)列,kafka,消息隊(duì)列,laravel


Not has broker can connection metadataBrokerList 【沒有broker可以連接metadataBrokerList】

linux 部署kafka,MQ消息隊(duì)列,kafka,消息隊(duì)列,laravel

原因解析

server.properties中有兩個(gè)listeners。

  1. listeners:啟動(dòng)kafka服務(wù)監(jiān)聽的ip和端口,可以監(jiān)聽內(nèi)網(wǎng)ip和0.0.0.0(不能為外網(wǎng)ip),默認(rèn)為java.net.InetAddress.getCanonicalHostName()獲取的ip。
  2. advertised.listeners:生產(chǎn)者和消費(fèi)者連接的地址,kafka會(huì)把該地址注冊(cè)到zookeeper中,所以只能為除0.0.0.0之外的合法ip或域名,默認(rèn)和listeners的配置一致。
解決方案

修改server.properties

# ip可以內(nèi)網(wǎng)、外網(wǎng)ip、127.0.0.1 或域名
advertised.listeners=PLAINTEXT://{ip}:9092  

創(chuàng)建 Topic 時(shí)拋異常:zookeeper is not a recognized option
執(zhí)行創(chuàng)建 topic 命令

# kafka 根路徑創(chuàng)建
cd /usr/local/kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic campaign_logs --replication-factor 2 --partitions 4

linux 部署kafka,MQ消息隊(duì)列,kafka,消息隊(duì)列,laravel

解決方案

執(zhí)行 kafka 命令失敗可能是kafka 版本問題,3.0版本的命令中無需指定 --zookeeper,創(chuàng)建 topic 命令:

# 通過 kafka-topics.sh 腳本創(chuàng)建一個(gè)名為 campaign-logs 并且副本數(shù)為2、分區(qū)數(shù)為4的topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic campaign-logs --replication-factor 2 --partitions 4 

Replication factor: 2 larger than available brokers: 1.

linux 部署kafka,MQ消息隊(duì)列,kafka,消息隊(duì)列,laravel

原因分析

??replication-factor(topic副本)個(gè)數(shù)不能超過broker(服務(wù)器)的個(gè)數(shù)。 如果 kafka 的
broker只有1個(gè),而replication-factor 設(shè)置為2,所以會(huì)報(bào)錯(cuò)。

解決方案

創(chuàng)建topic時(shí),設(shè)置 replication-factor(topic副本)的數(shù)量不能多余啟動(dòng)的broker數(shù)量。此時(shí)正確的創(chuàng)建命令為:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic campaign-logs --replication-factor 1 --partitions 4 
參考鏈接

Error while executing topic command : Replication factor: 2 larger than available brokers: 1.

Laravel 使用 kafka

composer.json >>> require 新增配置

"require": {
    "laravel/lumen-framework": "5.7.*",
    "symfony/http-kernel": "^4.1.13",
    "nmred/kafka-php": "v0.2.0.8",
},

linux 部署kafka,MQ消息隊(duì)列,kafka,消息隊(duì)列,laravel

執(zhí)行忽略版本匹配更新命令

composer update --ignore-platform-reqs

linux 部署kafka,MQ消息隊(duì)列,kafka,消息隊(duì)列,laravel

.env 文件新增kafka 配置

# kafka主題名稱
KAFKA_TOPIC=campaign-logs
# kafka 地址  KAFKA_URL=IP:端口
KAFKA_URL=135.25.23.56:9092

kafka 服務(wù)類

<?php
/**
 * Kafka 服務(wù)類
 *
 * @author Lee
 * @date 2023.01.18 17:11
 */

namespace App\Services;

use Kafka\Producer;
use Kafka\ProducerConfig;

class KafkaService
{
    public function __construct()
    {
        date_default_timezone_set('PRC');
    }

    /**
     * 生產(chǎn)者
     *
     * @date 2023.01.18 17:13
     * @param $topic
     * @param $value
     * @param $url
     * @return void
     */
    public function Producer($topic, $value , $url)
    {
        $config = ProducerConfig::getInstance();
        $config->setMetadataRefreshIntervalMs(10000);
        $config->setMetadataBrokerList($url);
        $config->setBrokerVersion('1.0.0');
        $config->setRequiredAck(1);
        $config->setIsAsyn(false);
        $config->setProduceInterval(500);
        $producer = new Producer(function () use($value,$topic){
            return [
                [
                    'topic' => $topic,
                    'value' => $value,
                    'key' => '',
                ],
            ];
        });
        $producer->success(function ($result){
            return "success";
        });
        $producer->error(function ($errorCode){
            var_dump($errorCode);
        });
        $producer->send(true);
    }
}

生產(chǎn)消息

/**
 * 生產(chǎn)消息
 *
 * @date 2023.01.18 17:28
 * @return void
 */
public static function produceMsg($data)
{
    /**
     * 配置在env中
     */
    $topic = env('KAFKA_TOPIC');
    $url = env('KAFKA_URL');

    try {
        $value = json_encode($data, JSON_FORCE_OBJECT);
        KafkaService::producer($topic, $value, $url);
    } catch (\Exception $e) {
        dump($e);
    }
}

計(jì)劃任務(wù)消費(fèi)消息

執(zhí)行命令:php artisan consumeMsg

<?php
/**
 * Kafka 消費(fèi)消息計(jì)劃任務(wù)
 *
 * @author Lee
 * @date 2023.01.20 12:17:36
 */

namespace App\Console\Commands;

use Illuminate\Console\Command;

class ConsumeMsg extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'consumeMsg';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        $this->log('開始監(jiān)聽消息...');
        app('kafkaService')->consumer(
            $topics = env('KAFKA_TOPIC'),
            $url = env('KAFKA_URL')
        );
        return $this;
    }

    private function log($msg = '')
    {
        if (!$msg) {
            return $this;
        }
        if (php_sapi_name() == 'cli') {
            echo $msg, PHP_EOL;
        }
        file_put_contents("kafka.log", $msg);
        return $this;
    }
}

config/app.php 中注冊(cè) kafka 服務(wù)文章來源地址http://www.zghlxwxcb.cn/news/detail-784823.html

 'aliases' => [
    'kafkaService' => App\Http\Service\KafkaService::class,
    'consumerKafka' => App\Http\Service\ConsumerService::class
]

配置拓展(可選)

server.properties
# broker的全局唯一編號(hào),不能重復(fù)
broker.id=1

# 監(jiān)聽鏈接的端口, producer或consumer將在此端口建立連接
port=9092

# kafka消息存放的路徑
log.dirs=/home/servers-kafka/logs/kafka

# 處理網(wǎng)絡(luò)請(qǐng)求的線程數(shù)量
num.network.threads=3

# 處理磁盤IO的線程數(shù)量
num.io.threads=8

# 發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400

# 接受套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400

# 請(qǐng)求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600

# topic在當(dāng)前broker上的分片個(gè)數(shù)
num.partitions=2

# 恢復(fù)和清理data下數(shù)據(jù)的線程數(shù)量
num.recovery.threads.per.data.dir=1


# segment文件保留的最長時(shí)間,超時(shí)將被刪除
log.retention.hours=168

# 滾動(dòng)生成新的segment文件的最大時(shí)間
log.roll.hours=168

# 日志文件中每個(gè)segment的大小,默認(rèn)為1G
log.segment.bytes=1073741824

# 周期性檢查文件大小的時(shí)間
log.retention.check.interval.ms=300000

# 日志清理是否打開
log.cleaner.enable=true

# broker需要使用zookeeper保存meta數(shù)據(jù)
zookeeper.connect=hadoop02:2181,hadoop03:2181,hadoop04:2181

# zookeeper 連接超時(shí)時(shí)間
zookeeper.connection.timeout.ms=6000

# partion buffer中,消息的條數(shù)達(dá)到閾值,將觸發(fā)flush到磁盤
log.flush.interval.messages=10000

# 消息buffer的時(shí)間,達(dá)到閾值,將觸發(fā)flush到磁盤
log.flush.interval.ms=3000

# 刪除topic需要server.properties中設(shè)置delete.topic.enable=true否則只是標(biāo)記刪除
delete.topic.enable=true

# host.name為本機(jī)IP(重要),
# 如果不改,則客戶端會(huì)拋出:Producerconnection to localhost:9092 unsuccessful 錯(cuò)誤!
host.name=本機(jī)IP

producer.properties

# 指定kafka節(jié)點(diǎn)列表, 用于獲取metadata,不必全部指定
metadata.broker.list=hadoop02:9092,hadoop03:9092

# 指定分區(qū)處理類。默認(rèn)kafka.producer.DefaultPartitioner,表通過key哈希到對(duì)應(yīng)分區(qū)
# partitioner.class=kafka.producer.DefaultPartitioner

# 是否壓縮,默認(rèn)0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮,壓縮后消息中會(huì)有頭來指明消息壓縮類型,故在消費(fèi)者端消息解壓是透明的無需指定
compression.codec=none

# 指定序列化處理類
serializer.class=kafka.serializer.DefaultEncoder

# 如果要壓縮消息,這里指定哪些topic要壓縮消息,默認(rèn)empty,表示不壓縮。
#compressed.topics=

# 設(shè)置發(fā)送數(shù)據(jù)是否需要服務(wù)端的反饋,有三個(gè)值0,1,-1
# 0: producer不會(huì)等待broker發(fā)送ack
# 1: 當(dāng)leader接收到消息之后發(fā)送ack
# -1: 當(dāng)所有的follower都同步消息成功后發(fā)送ack.
request.required.acks=0

# 在向producer發(fā)送ack之前,broker允許等待的最大時(shí)間
# 如果超時(shí),broker將會(huì)向producer發(fā)送一個(gè)error ACK.意味著上一次消息因?yàn)槟撤N原因未能成功(比如follower未能同步成功)
request.timeout.ms=10000

# 同步還是異步發(fā)送消息,默認(rèn)“sync”表同步,"async"表異步。
# 異步可以提高發(fā)送吞吐量,也意味著消息將會(huì)在本地buffer中,并適時(shí)批量發(fā)送,但是也可能導(dǎo)致丟失未發(fā)送過去的消息
producer.type=sync

# 在async模式下,當(dāng)message被緩存的時(shí)間超過此值后,將會(huì)批量發(fā)送給broker,默認(rèn)為5000ms
# 此值和batch.num.messages協(xié)同工作.
queue.buffering.max.ms = 5000

# 在async模式下,producer端允許buffer的最大消息量
# 無論如何,producer都無法盡快的將消息發(fā)送給broker,從而導(dǎo)致消息在producer端大量沉積
# 此時(shí),如果消息的條數(shù)達(dá)到閥值,將會(huì)導(dǎo)致producer端阻塞或者消息被拋棄,默認(rèn)為10000
queue.buffering.max.messages=20000

# 如果是異步,指定每次批量發(fā)送數(shù)據(jù)量,默認(rèn)為200
batch.num.messages=500

# 當(dāng)消息在producer端沉積的條數(shù)達(dá)到"queue.buffering.max.meesages"后
# 阻塞一定時(shí)間后,隊(duì)列仍然沒有enqueue(producer仍然沒有發(fā)送出任何消息)
# 此時(shí)producer可以繼續(xù)阻塞或者將消息拋棄,此timeout值用于控制"阻塞"的時(shí)間
# -1: 無阻塞超時(shí)限制,消息不會(huì)被拋棄
# 0:立即清空隊(duì)列,消息被拋棄
queue.enqueue.timeout.ms=-1

# 當(dāng)producer接收到error ACK,或者沒有接收到ACK時(shí),允許消息重發(fā)的次數(shù)
# 因?yàn)閎roker并沒有完整的機(jī)制來避免消息重復(fù),所以當(dāng)網(wǎng)絡(luò)異常時(shí)(比如ACK丟失)
# 有可能導(dǎo)致broker接收到重復(fù)的消息,默認(rèn)值為3.
message.send.max.retries=3

# producer刷新topicmetada的時(shí)間間隔,producer需要知道partitionleader的位置,以及當(dāng)前topic的情況
# 因此producer需要一個(gè)機(jī)制來獲取最新的metadata,當(dāng)producer遇到特定錯(cuò)誤時(shí),將會(huì)立即刷新
#(比如topic失效,partition丟失,leader失效等),此外也可以通過此參數(shù)來配置額外的刷新機(jī)制,默認(rèn)值600000
topic.metadata.refresh.interval.ms=60000

consumer.properties

# zookeeper連接服務(wù)器地址
zookeeper.connect=hadoop02:2181,hadoop03:2181,hadoop04:2181

# zookeeper的session過期時(shí)間,默認(rèn)5000ms,用于檢測消費(fèi)者是否掛掉
zookeeper.session.timeout.ms=5000

#當(dāng)消費(fèi)者掛掉,其他消費(fèi)者要等該指定時(shí)間才能檢查到并且觸發(fā)重新負(fù)載均衡
zookeeper.connection.timeout.ms=10000

# 指定多久消費(fèi)者更新offset到zookeeper中。注意offset更新時(shí)基于time而不是每次獲得的消息。一旦在更新zookeeper發(fā)生異常并重啟,將可能拿到已拿到過的消息
zookeeper.sync.time.ms=2000

#指定消費(fèi)組
group.id=xxx

# 當(dāng)consumer消費(fèi)一定量的消息之后,將會(huì)自動(dòng)向zookeeper提交offset信息
# 注意offset信息并不是每消費(fèi)一次消息就向zk提交一次,而是現(xiàn)在本地保存(內(nèi)存),并定期提交,默認(rèn)為true
auto.commit.enable=true

# 自動(dòng)更新時(shí)間。默認(rèn)60 * 1000
auto.commit.interval.ms=1000

# 當(dāng)前consumer的標(biāo)識(shí),可以設(shè)定,也可以有系統(tǒng)生成,主要用來跟蹤消息消費(fèi)情況,便于觀察
conusmer.id=xxx

# 消費(fèi)者客戶端編號(hào),用于區(qū)分不同客戶端,默認(rèn)客戶端程序自動(dòng)產(chǎn)生
client.id=xxxx

# 最大取多少塊緩存到消費(fèi)者(默認(rèn)10)
queued.max.message.chunks=50

# 當(dāng)有新的consumer加入到group時(shí),將會(huì)reblance,此后將會(huì)有partitions的消費(fèi)端遷移到新  的consumer上,
# 如果一個(gè)consumer獲得了某個(gè)partition的消費(fèi)權(quán)限,那么它將會(huì)向zk注冊(cè)"Partition Owner registry"節(jié)點(diǎn)信息,
# 但是有可能此時(shí)舊的consumer尚沒有釋放此節(jié)點(diǎn), 此值用于控制,注冊(cè)節(jié)點(diǎn)的重試次數(shù).
rebalance.max.retries=5

# 獲取消息的最大尺寸,broker不會(huì)像consumer輸出大于此值的消息chunk 每次feth將得到多條消息,此值為總大小,提升此值,將會(huì)消耗更多的consumer端內(nèi)存
fetch.min.bytes=6553600

# 當(dāng)消息的尺寸不足時(shí),server阻塞的時(shí)間,如果超時(shí),消息將立即發(fā)送給consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360

# 如果zookeeper沒有offset值或offset值超出范圍。那么就給個(gè)初始的offset。有smallest、largest、anything可選,分別表示給當(dāng)前最小的offset、當(dāng)前最大的offset、拋異常。默認(rèn)largest
auto.offset.reset=smallest

# 指定序列化處理類
derializer.class=kafka.serializer.DefaultDecoder

參考文獻(xiàn)

  • 這是最詳細(xì)的Kafka應(yīng)用教程
  • 一文讀懂|Kafka 開發(fā)基礎(chǔ)
  • 四種消息中間件分析介紹(ActiveMQ、RabbitMQ、RocketMQ、Kafka
  • kafka無法啟動(dòng),Cannot assign requested address.
  • 字節(jié)面試官: 讓你設(shè)計(jì)一個(gè)MQ每秒要抗幾十萬并發(fā),怎么做?
  • Laravel 中 Kafka 的使用詳解
  • Laravel 實(shí)現(xiàn) Kafka 消息推送與接收處理
  • 基于 Laravel 構(gòu)建的速度最快的微框架(micro-framework)
  • 詳解Laravel中Kafka的使用實(shí)例是什么樣的
  • Lumen 中文文檔
  • 30個(gè)Kafka常見錯(cuò)誤小集合

到了這里,關(guān)于Linux部署Kafka及常見問題記錄的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • flink開發(fā)常見問題 —— flink-kafka 依賴版本沖突問題

    flink開發(fā)常見問題 —— flink-kafka 依賴版本沖突問題

    由于 flink / kafka 的版本不斷更新,創(chuàng)建項(xiàng)目的時(shí)候就應(yīng)當(dāng)考慮清楚這幾個(gè)依賴庫的版本問題,盡可能地與實(shí)際場景保持一致,比如服務(wù)器上部署的 kafka 是哪個(gè)版本,flink 是哪個(gè)版本,從而確定我們需要開發(fā)的是哪個(gè)版本,并且在真正的開發(fā)工作開始之前,應(yīng)當(dāng)先測試一下保證

    2024年02月07日
    瀏覽(30)
  • Kafka如何保證消息的消費(fèi)順序【全局有序、局部有序】、Kafka如何保證消息不被重復(fù)消費(fèi)、Kafka為什么這么快?【重點(diǎn)】、Kafka常見問題匯總【史上最全】

    Kafka如何保證消息的消費(fèi)順序【全局有序、局部有序】、Kafka如何保證消息不被重復(fù)消費(fèi)、Kafka為什么這么快?【重點(diǎn)】、Kafka常見問題匯總【史上最全】

    目錄 Kafka消息生產(chǎn) 一個(gè)Topic對(duì)應(yīng)一個(gè)Partition 一個(gè)Topic對(duì)應(yīng)多個(gè)Partition Kafka消息的順序性保證(Producer、Consumer) 全局有序 局部有序? max.in.flight.requests.per.connection參數(shù)詳解 Kafka的多副本機(jī)制 Kafka的follower從leader同步數(shù)據(jù)的流程 Kafka的follower為什么不能用于消息消費(fèi) Kafka的多分區(qū)

    2024年04月11日
    瀏覽(24)
  • 麒麟Linux常見問題

    1)報(bào)錯(cuò)信息 2)原因 可能是下載地址不維護(hù)失效了。 3)解決 (1)更換下載源 對(duì)于麒麟V10 x86,對(duì)文件 /etc/yum.repos.d/kylin_x86_64.repo 備份后修改內(nèi)容如下: 對(duì)于麒麟V10 arm 64,對(duì)文件 /etc/yum.repos.d/kylin_aarch64.repo 備份后修改內(nèi)容如下: (2)緩存處理 4)驗(yàn)證

    2024年02月05日
    瀏覽(25)
  • linux下常見編譯問題

    CMakeLists.txt 添加

    2024年02月12日
    瀏覽(21)
  • Linux中常見的權(quán)限問題

    Linux中常見的權(quán)限問題

    在了解完上一篇文章 Linux權(quán)限的理解與操作 之后,還有一些比較常見的權(quán)限問題需要我們?nèi)チ私?。其中包括目錄的?quán)限,umask 以及 粘滯位的使用。 問題一:進(jìn)入一個(gè)目錄,需要什么權(quán)限? —— 可執(zhí)行權(quán)限(x) 問題二:查看目錄下的文件列表,需要什么權(quán)限? —— 讀權(quán)限

    2024年02月07日
    瀏覽(21)
  • Linux常見問題-打deb包流程

    Deb打包目的:將程序打包成.deb格式是為了在Debian和Ubuntu等基于Debian的Linux發(fā)行版上進(jìn)行方便的安裝和管理。以下是一個(gè)簡要的流程,以一個(gè)輸出 \\\"Hello World\\\" 的C++程序?yàn)槔?確保你的系統(tǒng)安裝了構(gòu)建工具,如g++(用于編譯C++程序)和dpkg-deb(用于創(chuàng)建.deb文件)。在項(xiàng)目目錄中,

    2024年02月14日
    瀏覽(19)
  • LINUX常見問題之oom kill

    OOM(Out Of Memory)機(jī)制為Linux內(nèi)核中一種自我保護(hù)機(jī)制,當(dāng)系統(tǒng)分配不出內(nèi)存時(shí)(觸發(fā)條件)會(huì)觸發(fā)這個(gè)機(jī)制,由系統(tǒng)在已有進(jìn)程中挑選一個(gè)占用內(nèi)存較多,回收內(nèi)存收益最大的進(jìn)程殺掉來釋放內(nèi)存。 Linux下允許程序申請(qǐng)比系統(tǒng)可用內(nèi)存更多的內(nèi)存(如malloc函數(shù)),這個(gè)特性叫Overcommi

    2024年02月02日
    瀏覽(16)
  • linux查看WWN號(hào)及常見問題解決

    linux查看WWN號(hào)及常見問題解決

    要查看CentOS 6.7版本的WWN號(hào),可以執(zhí)行以下步驟: 1.確保已經(jīng)連接了存儲(chǔ)設(shè)備。 2.在終端中輸入命令:lsscsi,然后按 Enter 鍵。該命令會(huì)顯示已連接的存儲(chǔ)設(shè)備的信息。 3.找到你想查看WWN號(hào)的存儲(chǔ)設(shè)備,并查看其 WWN 號(hào)。WWN 號(hào)通常在類似于[X:X:X:X]格式的信息中給出,其中 X 是一

    2024年02月05日
    瀏覽(17)
  • 常見Linux 命令,可以解決日常99%的問題

    常見Linux 命令,可以解決日常99%的問題

    1、基本命令 2、關(guān)機(jī) 3、文件和目錄 4、文件搜索 5、掛載一個(gè)文件系統(tǒng) 6、磁盤空間 7、用戶和群組 8、文件的權(quán)限 9、文件的特殊屬性 使用 “+” 設(shè)置權(quán)限,使用 “-” 用于取消 10、打包和壓縮文件 11、RPM 包 (Fedora, Redhat及類似系統(tǒng)) 12、YUM 軟件包升級(jí)器 (Fedora, RedHat及類

    2024年01月22日
    瀏覽(18)
  • Linux 服務(wù)器文件名亂碼常見問題

    在 Linux 服務(wù)器中,文件名亂碼是一個(gè)常見的問題,特別是當(dāng)涉及到多語言字符集時(shí)。這可能導(dǎo)致文件名顯示異常,無法正確識(shí)別和處理文件。本文將介紹一些常見的文件名亂碼問題以及相應(yīng)的解決方法。 字符集問題 文件名亂碼的一個(gè)常見原因是字符集不匹配。當(dāng)文件名包含

    2024年02月05日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包