国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka的安裝,用于數據庫同步數據

這篇具有很好參考價值的文章主要介紹了kafka的安裝,用于數據庫同步數據。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

1.0 背景調研

因業(yè)務需求,需要查詢其他部門的數據庫數據,不方便直連數據庫,所以要定時將他們的數據同步到我們的環(huán)境中,技術選型選中了kafka+CDC

Kafka是Apache旗下的一款分布式流媒體平臺,Kafka是一種高吞吐量、持久性、分布式的發(fā)布訂閱的消息隊列系統(tǒng)。 它最初由LinkedIn(領英)公司發(fā)布,使用Scala語言編寫,與2010年12月份開源,成為Apache的頂級子項目。 它主要用于處理消費者規(guī)模網站中的所有動作流數據。動作指(網頁瀏覽、搜索和其它用戶行動所產生的數據)。

1.1 kafka核心

Kafka 的核心概念是消息系統(tǒng),其中包含以下幾個重要組件:

Topic(主題):消息的分類或者說是邏輯上的區(qū)域。數據被發(fā)布到 Kafka 集群的 Topic 中,并且消費者對特定 Topic 進行訂閱來消費這些數據。

Producer(生產者):負責向 Kafka 集群中的 Topic 發(fā)布消息,可以是任何發(fā)送數據的應用程序。

Consumer(消費者):訂閱一個或多個 Topic,從 Broker 中拉取數據并進行處理。消費者可以以組的形式組織,每個組只能有一個 Consumer 對 Topic 的每個分區(qū)進行消費。

Broker(代理服務器):Kafka 集群中的每個節(jié)點都是一個 Broker,負責管理數據的存儲和傳輸,處理生產者和消費者的請求。

1.2 技術比對

相比于 Cattle,Kafka 具備以下優(yōu)勢:

高吞吐量和低延遲:Kafka 使用簡單而高效的數據存儲機制,具備高度可伸縮性,能夠處理大規(guī)模的數據流。它的設計目標是支持每秒數百萬條消息的處理。

分布式和可擴展:Kafka 可以在多個 Broker 節(jié)點上進行水平擴展,通過分區(qū)機制實現負載均衡和容錯性。

持久化存儲:Kafka 使用日志結構存儲消息,提供了高效的磁盤持久化功能,確保數據的可靠性和持久性。

多語言支持:Kafka 提供豐富的客戶端庫,支持多種編程語言,包括 Java、Python、Go、C++ 等,方便開發(fā)者進行集成和使用。

生態(tài)系統(tǒng)豐富:Kafka 生態(tài)系統(tǒng)提供了許多與之配套的工具和服務,例如 Kafka Connect 用于數據集成,KSQL 實現流處理,以及一些第三方工具用于監(jiān)控、管理和運維等。

總的來說,Kafka 是一個高性能、可靠性強且具備良好擴展性的分布式流處理平臺,適用于實時數據流處理、消息隊列、日志收集等各種場景。相比之下,Cattle 在數據傳輸和流處理方面的功能可能較為有限,但具體選擇還需根據業(yè)務需求和技術棧來進行評估和決策。

2.0 安裝環(huán)境

Linux服務器

Xshell工具

Jdk

Kafka

Docker

Windows服務器

消費者系統(tǒng)(我們用的是.net,可使用其他語言代替)

。。。。。。

3.0 目標源數據庫配置步驟

需要在目標源數據庫增加個賬號給maxwell使用,這里以mysql為例

新建mysql的帳號

CREATE USER ' maxwell用戶名'@'%' IDENTIFIED BY '密碼';

CREATE USER 'maxwell用戶名'@'localhost' IDENTIFIED BY '密碼';

給新賬號賦權限(這個是全部權限)

GRANT ALL ON maxwell用戶名.* TO 'maxwell用戶名'@'%';

GRANT ALL ON maxwell用戶名.* TO 'maxwell用戶名'@'localhost';

給新賬號賦權限(這個是指定權限,兩個執(zhí)行一個就行)

GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell用戶名'@'%';

GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell用戶名'@'localhost';

4.0 Linux服務器配置步驟

4.1 安裝基礎工具

4.1.1 lrzsz(文件傳輸工具)

1步:安裝lrzsz,這個工具可以直接拖拽文件上傳到服務器

yum install lrzsz –y

執(zhí)行結果示例

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

2步:新建目錄,用來存儲安裝包

mkdir /home/software

3步:把安裝包傳輸到linux服務器上,拖拽到/home/software

CD /home/software

Kafka安裝包本次使用的是2.13版本,開始拖拽文件到Xshell窗口下

kafka_2.13-3.5.0.tgz

執(zhí)行結果示例

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

執(zhí)行l(wèi)s查看目錄下是否有文件

Ls

執(zhí)行結果示例

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

4.1.2 vim(文本編輯器)

1步:安裝vim,這個是文本編輯器

yum?install?vim?–y

執(zhí)行結果示例

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

可通過vim命令打開文本,配置文件等

4.2 安裝Java

1步:安裝jdk,執(zhí)行命令,*代表安裝所有相關的包,yum install -y java-1.8.0-openjdk.x86_64如果執(zhí)行這個命令,則只安裝jdk本身,后續(xù)的軟件安裝運行會出現依賴包不全等問題,建議使用*來安裝

yum install -y java-1.8.0-openjdk*

執(zhí)行結果示例

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

2步:查詢java版本,檢驗第一步是否安裝成功,同時查出java版本號,為后面配置環(huán)境變量做準備

rpm -qa | grep java

執(zhí)行結果示例

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

3步:配置java環(huán)境變量

輸入以下命令打開配置文件

vi /etc/profile

按鍵pgdn跳到最后一行 按鍵O 新插下一行

將下面文件添加到配置文件中 注意:替換java版本號

export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.382.b05-1.el7_9.x86_64

export JRE_HOME=/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.382.b05-1.el7_9.x86_64

export CLASSPATH=.:$JRE_HOME/lib

export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin

按鍵esc切換命令模式,輸入:wq退出保存

4步:刷新環(huán)境變量,讓新加的java配置生效

source /etc/profile

5步:檢查java是否安裝成功,輸入后會提示java版本號,如果第3步配置錯誤,會提示java命令不存在

java –version

執(zhí)行結果示例

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

4.3 安裝kafka

1步:新建目錄,用來存儲kafka

mkdir /home/kafka

2步:要解壓 tgz 文件,可以使用 Linux 系統(tǒng)自帶的 tar 命令。文件在4.1.1中上傳到了服務器

tar –xzvf kafka_2.13-3.5.0.tgz -C /home/kafka

執(zhí)行結果示例

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

tar命令解釋(無特殊情況不需要第3步后續(xù)操作,了解就好):

tar -xzvf kafka_2.13-3.5.0.tgz

x:表示解壓

z:表示使用 gzip 壓縮

v:表示顯示詳細的解壓過程

f:表示指定要解壓的文件

解壓完成后,解壓出來的文件會放在當前目錄下。如果要將解壓出來的文件放在其他目錄,可以在命令中使用 -C 選項指定目錄,例如:

tar -xzvf kafka_2.13-3.5.0.tgz -C /home/kafka

這樣就可以將解壓出來的文件放在 /home/kafka 目錄下。

4步:因為解壓后,多了一層目錄【mv kafka_2.13-3.5.0】,所以要移動目錄

mv kafka_2.13-3.5.0/* ./

然后刪除空目錄

rmdir kafka_2.13-3.5.0

3步:配置kafka,打開kafka配置文件

vim?/home/kafka/config/server.properties

修改一下配置,注釋掉的要打開注釋,刪除掉#

socket服務端地址
listeners=PLAINTEXT://【本機IP自行替換】:9092

偵聽器名稱、主機名和端口代理將通知給客戶端。

advertised.listeners=PLAINTEXT:// 【本機IP自行替換】:9092

日志文件路徑
log.dirs=/home/kafka_data/logs

zookeeper地址,zookeeper是用來監(jiān)聽kafka源數據變化
zookeeper.connect=【本機IP自行替換】:2181

4步:創(chuàng)建日志的文件夾,如果有就不用創(chuàng)建了

mkdir -p /home/kafka_data/logs

-p 是創(chuàng)建多級目錄

4.4 安裝docker

sudo是一個用于在Linux系統(tǒng)上獲得超級用戶權限的命令。它允許普通用戶以超級用戶身份執(zhí)行特權命令。

1步:安裝yum-utils,yum-utils是yum的一個附加軟件包,提供了一些額外的功能和工具。

sudo yum install -y yum-utils

執(zhí)行結果示例

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

2步:添加存儲庫的地址

sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo

執(zhí)行結果示例

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

3步:安裝docker相關程序

sudo yum install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin

docker-ce是Docker的主要軟件包,包含了Docker的核心功能;

docker-ce-cli是Docker的命令行界面工具;

containerd.io是Docker的底層容器運行時,負責管理容器的生命周期;

docker-buildx-plugin是Docker的多平臺構建插件,可以跨多種平臺構建容器鏡像;

docker-compose-plugin是Docker的應用程序容器編排工具,可以快速構建、啟動和管理多個Docker容器應用。

執(zhí)行結果示例

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

輸入y繼續(xù)

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

輸入y繼續(xù)

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

安裝完成!?。?/p>

4步:配置docker

vi /etc/docker/daemon.json

添加配置項文件

{

? "registry-mirrors": ["https://5twf62k1.mirror.aliyuncs.com"],

? "insecure-registries": ["sinoeyes.io","hub.sinoeyes.com"]

}

按鍵esc切換命令模式,輸入:wq退出保存,保存后會生成文件

registry-mirrors屬性用于指定Docker鏡像的加速器地址。由于Docker鏡像可能分布在全球不同的地方,而各地的網絡環(huán)境和速度也不同,因此使用鏡像加速器可以顯著提高拉取鏡像的速度和穩(wěn)定性。例如,將registry-mirrors屬性設置為http://f1361db2.m.daocloud.io即可使用DaoCloud鏡像加速器。

insecure-registries屬性用于指定Docker容器鏡像的非安全注冊表地址。如果您使用的是私有Docker注冊表,并且此注冊表的TLS證書未經過驗證或過期,則可以添加該注冊表的URL以允許使用不安全的HTTP協(xié)議進行拉取和推送鏡像。例如,將insecure-registries屬性設置為["myregistry.example.com:5000"]即可允許使用不安全的HTTP協(xié)議訪問myregistry.example.com:5000注冊表。

5步:輸入ls查看是否有文件生成

CD /etc/docker/

Ls

執(zhí)行結果示例

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

6步:docker-compose部署,使用curl工具下載Docker Compose程序,并將其保存到/usr/local/bin/docker-compose路徑下。

curl -L https://github.com/docker/compose/releases/download/1.21.1/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose

執(zhí)行結果示例

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

這里我因網絡原因,下載失敗,如果命令正常執(zhí)行,不用進行第6步的后續(xù)。

手動上傳文件到目錄下,將docker-compose里面的文件全部上傳到這個路徑下【/usr/local/bin】。

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

7步:該命令的作用就是將/usr/local/bin/docker-compose文件的權限設置為可執(zhí)行權限,以便用戶可以直接運行這個文件并使用Docker Compose的功能。

chmod +x /usr/local/bin/docker-compose

8步:檢查Docker Compose是否安裝成功

docker-compose -v

執(zhí)行結果示例

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

4.5 運行kafka

1步:新建目錄

mkdir -p /home/app/maxwell/conf

2步:在/home/app/maxwell下創(chuàng)建文件docker-compose.yml

vim docker-compose.yml

按鍵I進入編輯模式輸入以下文件內容

version: '3.5'

services:

? maxwell:

??? restart: always

??? image: zendesk/maxwell

??? container_name: maxwell

??? network_mode: "host"

??? command: bin/maxwell --config /etc/maxwell/config.properties

??? #command: bin/maxwell-bootstrap --config /etc/maxwell/config.properties

??? volumes:

????? - ./conf:/etc/maxwell/

??? environment:

????? - "TZ=Asia/Shanghai"

按鍵esc切換命令模式,輸入:wq退出保存,保存后會生成文件

maxwell容器的編排文件,以后啟動maxwell時更方便

3步:在/home/app/maxwell/conf下創(chuàng)建文件config.properties

vim config.properties

按鍵I進入編輯模式,輸入以下內容,根據情況替換信息,關鍵信息紅色標出

daemon=true

# 第一次啟動時建議改為debug,可以開到mysql數據與kafka請求,穩(wěn)定后再改為info

#log_level=info

log_level=info

producer=kafka

kafka.bootstrap.servers=【本機IP自行替換】:9092

# 會往 kafka下主題為'test'的分區(qū)下推送數據

kafka_topic=maxwell

#producer_partition_by設置為table時,Maxwell會將生成的消息根據表名稱進行分區(qū),不同的表將會被分配到不同的分區(qū)中,默認為database

producer_partition_by=table

# client_id=maxwell_1

client_id=sddi-consumer-group-1-client-1

# mysql login info 需要先在mysql創(chuàng)建maxwell用戶

host=【目標源數據庫IP

# port=33066

port=3306

user=【數據庫賬號】

password=【數據庫密碼】

schema_database=maxwell

replication_host=【目標源數據庫IP

replication_user=【數據庫賬號】

replication_password=【數據庫密碼】

replication_port=3306

jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

# exclude_dbs=*

# 同步的數據表

include_dbs=sddi 數據庫名】

inlcude_tables=sddi.b_missfile_info,sddi.b_pharm_info, e,sddi.f_pbt_file,同步的表,逗號隔開】

#inlcude_tables=sddi.b_missfile_info,sddi.b_pharm_info

~???

按鍵esc切換命令模式,輸入:wq退出保存,保存后會生成文件????????????????????????????????????????

4步:啟動docker

systemctl start docker

5步:啟動zookeeper,以守護線程的方式,這個是kafka集成的消息隊列

bin/zookeeper-server-start.sh -daemon /home/kafka/config/zookeeper.properties

如果出現錯誤,沒有那個文件或目錄,則進行一下操作

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

先把zookeeper-server-start.sh添加到環(huán)境變量

echo?"export?PATH=$PATH:/home/kafka/bin"?>> /root/.bash_profile

source?/root/.bash_profile

然后執(zhí)行zookeeper命令

zookeeper-server-start.sh -daemon /home/kafka/config/zookeeper.properties

6步:動kafka,以守護線程的方式

kafka-server-start.sh -daemon /home/kafka/config/server.properties

7步:在 Kafka 中創(chuàng)建一個名為 maxwell 的新主題(Topic)

kafka-topics.sh?--bootstrap-server?192.168.180.31:9092?--create?--topic?maxwell

8步:啟動maxwell

cd /home/app/maxwell

因為第1步和第2步創(chuàng)建了配置文件,所以這里可以省略很多參數,直接啟動maxwell

docker-compose up -d

執(zhí)行結果示例

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

9步:查看docker的maxwell容器

列出所有容器

docker ps

查詢指定容器的日志

docker logs --tail 100 -f 51b978d72157

10步:查看maxwell的下發(fā)消息(數據量大慎用)

kafka-console-consumer.sh --topic maxwell --from-beginning --bootstrap-server 192.168.180.31:9092

5.0 后期運維

查詢所有的消費組

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --list

查看某個消費組的消費情況

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --describe --group sddi-consumer-group-1

kafka的安裝,用于數據庫同步數據,linux,kafka,數據庫,分布式

屬性介紹

列頭

實例值

描述

GROUP

sddi-consumer-group-1

消費者組的名稱

TOPIC

maxwell

所訂閱的主題。

PARTITION

0

主題的分區(qū)編號。

CURRENT-OFFSET

666520

消費者當前的偏移量(已經消費到的消息的偏移量)。

LOG-END-OFFSET

698913

當前分區(qū)日志的最新偏移量(該分區(qū)中的消息總數)。

LAG

32393

當前落后的偏移量數量(LAG = LOG-END-OFFSET - CURRENT-OFFSET),表示消費者還未消費的消息數量。

CONSUMER-ID

sddi-consumer-group-1-client-1-8cc14037-bf1c-43eb-8094-2c4e45e5cc04

消費者的唯一標識符。

HOST

/192.168.180.18

消費者所在的主機名。

CLIENT-ID

sddi-consumer-group-1-client-1

消費者的客戶端標識符。

這些信息對于監(jiān)控和管理 Kafka 消費者組以及消費狀態(tài)非常有用。可以根據需要使用不同的選項來查看和管理消費者組的狀態(tài)。

初始化maxwell 測試環(huán)境 命令是單個表初始化同步

docker run -it --network host --rm zendesk/maxwell bin/maxwell-bootstrap --user=maxwell --password=rPq60r4BUA19@ --host=192.168.17.22 -database sddi --table b_business_info --client_id=sddi-consumer-group-1-client-1

停止maxwell (只是記錄命令,可以不執(zhí)行)

cd /home/app/maxwell

docker-compose down

停止kafka

kafka-server-stop.sh

停止zookeeper

zookeeper-server-stop.sh

查看容器的配置

docker inspect maxwell

查看主題列表

kafka-topics.sh --list --bootstrap-server 192.168.180.31:9092

創(chuàng)建主題

kafka-topics.sh --bootstrap-server 192.168.180.31:9092 --create --topic maxwell --replication-factor 1 --partitions 3

查看主題信息

kafka-topics.sh --bootstrap-server 192.168.180.31:9092 --describe --topic maxwell

使用生產者發(fā)送消息

kafka-console-producer.sh --broker-list 192.168.180.31:9092 --topic maxwell

使用消息者接受消息(從起始位置開始查看)

kafka-console-consumer.sh --bootstrap-server 192.168.180.31:9092 --topic maxwell --from-beginning

使用消息者接受消息(從當前位置開始查看)

kafka-console-consumer.sh --bootstrap-server 192.168.180.31:9092 --topic maxwell

查看kafka的所有消費組

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --list

查看kafka日志

tail -10000f /home/kafka/logs/server.log

查看zookeeper日志

tail -1000f /home/kafka/logs/zookeeper.out

查看maxwell日志

cd /home/app/maxwell

docker-compose logs -f --tail 100

Earliest 策略直接指定**–to-earliest**。

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --all-topics --to-earliest –execute

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --topics maxwell --to-earliest –execute

Latest 策略直接指定**–to-latest**。

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --all-topics --to-latest --execute

Current 策略直接指定**–to-current**。

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --all-topics --to-current --execute

Specified-Offset 策略直接指定**–to-offset**。

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --all-topics --to-offset 51691 --execute

Shift-By-N 策略直接指定**–shift-by N**。

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --topic maxwell --shift-by 1 --execute

DateTime 策略直接指定**–to-datetime**。

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute

最后是實現 Duration 策略,我們直接指定**–by-duration**

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --by-duration PT0H30M0S --execute

Earliest 策略表示將位移調整到主題當前最早位移處。這個最早位移不一定就是 0,因為在生產環(huán)境中,很久遠的消息會被 Kafka 自動刪除,所以當前最早位移很可能是一個大于 0 的值。如果你想要重新消費主題的所有消息,那么可以使用 Earliest 策略。

Latest 策略表示把位移重設成最新末端位移。如果你總共向某個主題發(fā)送了 15 條消息,那么最新末端位移就是 15。如果你想跳過所有歷史消息,打算從最新的消息處開始消費的話,可以使用 Latest 策略。

Current 策略表示將位移調整成消費者當前提交的最新位移。有時候你可能會碰到這樣的場景:你修改了消費者程序代碼,并重啟了消費者,結果發(fā)現代碼有問題,你需要回滾之前的代碼變更,同時也要把位移重設到消費者重啟時的位置,那么,Current 策略就可以幫你實現這個功能。

表中第 4 行的 Specified-Offset 策略則是比較通用的策略,表示消費者把位移值調整到你指定的位移處。這個策略的典型使用場景是,消費者程序在處理某條錯誤消息時,你可以手動地“跳過”此消息的處理。在實際使用過程中,可能會出現 corrupted 消息無法被消費的情形,此時消費者程序會拋出異常,無法繼續(xù)工作。一旦碰到這個問題,你就可以嘗試使用 Specified-Offset 策略來規(guī)避。

如果說 Specified-Offset 策略要求你指定位移的絕對數值的話,那么 Shift-By-N 策略指定的就是位移的相對數值,即你給出要跳過的一段消息的距離即可。這里的“跳”是雙向的,你既可以向前“跳”,也可以向后“跳”。比如,你想把位移重設成當前位移的前 100 條位移處,此時你需要指定 N 為 -100。

剛剛講到的這幾種策略都是位移維度的,下面我們來聊聊從時間維度重設位移的 DateTime 和 Duration 策略。

DateTime 允許你指定一個時間,然后將位移重置到該時間之后的最早位移處。常見的使用場景是,你想重新消費昨天的數據,那么你可以使用該策略重設位移到昨天 0 點。

Duration 策略則是指給定相對的時間間隔,然后將位移調整到距離當前給定時間間隔的位移處,具體格式是 PnDTnHnMnS。如果你熟悉 Java 8 引入的 Duration 類的話,你應該不會對這個格式感到陌生。它就是一個符合 ISO-8601 規(guī)范的 Duration 格式,以字母 P 開頭,后面由 4 部分組成,即 D、H、M 和 S,分別表示天、小時、分鐘和秒。舉個例子,如果你想將位移調回到 15 分鐘前,那么你就可以指定 PT0H15M0S。

6.0 消費者系統(tǒng)

我們這里采用的是.net平臺的c#語言,編寫了windows server程序,來處理隊列數據

使用的類庫是Confluent.Kafka

6.1 Confluent.Kafka

Confluent.Kafka是一個流行的開源Kafka客戶端庫,它提供了在.NET應用程序中與Apache Kafka進行交互的功能。下面是對Confluent.Kafka類庫的介紹:

高級API:Confluent.Kafka為開發(fā)人員提供了一組簡單易用的高級API,用于連接到Kafka集群、讀取和寫入消息、管理消費者組等。你可以方便地使用它來發(fā)送消息到Kafka主題或從主題中消費消息。

完全支持Kafka協(xié)議:Confluent.Kafka完全支持Kafka協(xié)議,包括Kafka 1.0.0及更高版本。它與最新的Kafka版本保持同步,并提供了一致性和穩(wěn)定性。

高性能:Confluent.Kafka經過優(yōu)化,具有出色的性能表現。它采用了異步、無鎖的設計,支持高并發(fā)的消息處理。

配置靈活:Confluent.Kafka提供了豐富的配置選項,可以根據實際需求進行調整。你可以配置消息的傳遞語義、消費者的批量讀取、消息序列化和反序列化方式等。

支持消息引擎擴展:Confluent.Kafka還支持通過插件機制擴展消息引擎,例如支持Avro、Protobuf等消息格式的插件。

社區(qū)活躍:Confluent.Kafka是一個受歡迎且活躍的開源項目,擁有強大的社區(qū)支持。你可以在社區(qū)中獲得幫助、提出問題或提交改進建議。

總之,Confluent.Kafka是一個功能強大、性能優(yōu)越的Kafka客戶端庫,為.NET開發(fā)人員提供了與Apache Kafka無縫集成的能力,使他們能夠輕松地構建可靠的消息流應用程序。

6.2 相關連接

https://github.com/confluentinc/confluent-kafka-dotnet/

http://www.bilibili996.com/Course?id=1159444000368

https://zhuanlan.zhihu.com/p/139101754?utm_id=0文章來源地址http://www.zghlxwxcb.cn/news/detail-830908.html

到了這里,關于kafka的安裝,用于數據庫同步數據的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯(lián)網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 十萬字圖文詳解mysql、redis、kafka、elasticsearch(ES)多源異構不同種類數據庫集成、數據共享、數據同步、不同中間件技術實現與方案,如何構建數據倉庫、數據湖、數倉一體化?

    數據庫大數據量、高并發(fā)、高可用解決方案,十萬字圖文詳解mysql、redis、kafka、elasticsearch(ES)多源異構不同種類數據庫集成、數據共享、數據同步、不同中間件技術實現與方案,如何構建數據倉庫、數據湖、數倉一體化?Delta Lake、Apache Hudi和Apache Iceberg數倉一體化技術架構

    2024年02月07日
    瀏覽(56)
  • Linux達夢數據庫安裝

    Linux達夢數據庫安裝

    說明: 達夢官方推薦是創(chuàng)建新的用戶進行安裝,但是在公司自己私創(chuàng)用戶是違規(guī)的,也可以直接使用root用戶進行安裝,新用戶和使用root安裝不同點我會標注出來。 a、創(chuàng)建用戶組dinstall. ??groupadd dinstall ?b、創(chuàng)建安裝用戶dmdba. ??useradd -g dinstall -m -d /home/dmdba -s /bin/bash d

    2024年02月09日
    瀏覽(29)
  • linux環(huán)境安裝mysql數據庫

    linux環(huán)境安裝mysql數據庫

    一:查看是否自帶mariadb數據庫 命令:rpm -qa | grep mariadb 如果自帶數據庫則卸載掉重新安裝 命令:yum remove mariadb-connector-c-3.1.11-2.el8_3.x86_64 二:下載mysql 命令:wget -i -c http://dev.mysql.com/get/mysql57-community-release-el7-10.noarch.rpm 注意:如果出現No such file or directory,運行命令:su root 三

    2024年02月15日
    瀏覽(32)
  • 用于將Grafana默認數據庫sqlite3遷移到MySQL數據庫

    用于將Grafana默認數據庫sqlite3遷移到MySQL數據庫

    以下是一個方案,用于將Grafana數據遷移到MySQL數據庫。 背景: grafana 默認采用的是sqlite3,當我們要以集群形式部署的時使用mysql較為方便,試了很多sqlite轉mysql的方法要么收費,最后放棄。選擇自己動手風衣足食。 目標: 遷移sqlite3切換數據庫到mysql 前提條件: 確保你已經安裝了

    2024年02月20日
    瀏覽(25)
  • 第44章 SQL 用于各種數據庫的數據類型教程

    Microsoft Access、MySQL 和 SQL Server 所使用的數據類型和范圍。 數據類型 描述 存儲 Text 用于文本或文本與數字的組合。最多 255 個字符。 Memo Memo 用于更大數量的文本。最多存儲 65,536 個字符。 注釋: 無法對 memo 字段進行排序。不過它們是可搜索的。 Byte 允許 0 到 255 的數字。

    2024年02月06日
    瀏覽(26)
  • 分析型數據庫:分布式分析型數據庫

    分析型數據庫:分布式分析型數據庫

    分析型數據庫的另外一個發(fā)展方向就是以分布式技術來代替MPP的并行計算,一方面分布式技術比MPP有更好的可擴展性,對底層的異構軟硬件支持度更好,可以解決MPP數據庫的幾個關鍵架構問題。本文介紹分布式分析型數據庫。 — 背景介紹— 目前在分布式分析型數據庫領域,

    2023年04月14日
    瀏覽(52)
  • Linux系統(tǒng)安裝mysql數據庫(超詳細)

    目錄 1、準備階段???????? 2、具體步驟 2.1、卸載mariadb 2.2、上傳mysql并解壓 2.3、安裝mysql 2.4、查看版本 2.5、啟動mysql服務 2.6、登錄mysql 2.7、修改密碼 2.8、配置mysql遠程訪問 2.9、修改編碼 3、卸載mysql 3.1、查看mysql的安裝情況? 3.2、刪除安裝包? 3.3、在/根目錄下查詢mysql

    2024年02月12日
    瀏覽(21)
  • linux下hive遠程數據庫模式安裝

    linux下hive遠程數據庫模式安裝

    Apache Hive是一個分布式、容錯的數據倉庫系統(tǒng),能夠支持大規(guī)模的分析。Hive元數據倉庫(HMS)提供了一個中央的元數據存儲庫,可輕松分析數據以做出明智的數據驅動決策,因此它是許多數據湖架構的關鍵組件。Hive建立在Apache Hadoop之上,支持在S3、adls、gs等存儲上通過HDFS訪問。

    2023年04月13日
    瀏覽(29)
  • Linux 源碼安裝: PostgreSQL 15.6數據庫

    Linux 源碼安裝: PostgreSQL 15.6數據庫

    ??The Begin??點點關注,收藏不迷路?? ?? PostgreSQL 中文文檔 下載地址:https://www.postgresql.org/ftp/source/ 安裝結果: vi ~/.bashrc ,如果全局的則編輯/etc/profile。 可以執(zhí)行以下命令查看 PostgreSQL 版本信息: 1、創(chuàng)建一個名為 postgresql.service 的服務單元文件: 編輯 /etc/systemd/system/p

    2024年03月24日
    瀏覽(29)
  • Linux高級管理--安裝MySQL數據庫系統(tǒng)

    Linux高級管理--安裝MySQL數據庫系統(tǒng)

    ????????MySQL.是一個真正的多線程、多用戶的SQL數據庫服務,憑借其高性能、高可靠和易于使 用的特性,成為服務器領域中最受歡迎的開源數據庫系統(tǒng)。在2008年以前,MySOL項目由MySQL AB公司進行開發(fā),發(fā)布和支持,之后歷經Sun 公司收購MySOL AB公司,Oracle公司收購Sun公司 的

    2024年02月04日
    瀏覽(49)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包