1.0 背景調研
因業(yè)務需求,需要查詢其他部門的數據庫數據,不方便直連數據庫,所以要定時將他們的數據同步到我們的環(huán)境中,技術選型選中了kafka+CDC
Kafka是Apache旗下的一款分布式流媒體平臺,Kafka是一種高吞吐量、持久性、分布式的發(fā)布訂閱的消息隊列系統(tǒng)。 它最初由LinkedIn(領英)公司發(fā)布,使用Scala語言編寫,與2010年12月份開源,成為Apache的頂級子項目。 它主要用于處理消費者規(guī)模網站中的所有動作流數據。動作指(網頁瀏覽、搜索和其它用戶行動所產生的數據)。
1.1 kafka核心
Kafka 的核心概念是消息系統(tǒng),其中包含以下幾個重要組件:
Topic(主題):消息的分類或者說是邏輯上的區(qū)域。數據被發(fā)布到 Kafka 集群的 Topic 中,并且消費者對特定 Topic 進行訂閱來消費這些數據。
Producer(生產者):負責向 Kafka 集群中的 Topic 發(fā)布消息,可以是任何發(fā)送數據的應用程序。
Consumer(消費者):訂閱一個或多個 Topic,從 Broker 中拉取數據并進行處理。消費者可以以組的形式組織,每個組只能有一個 Consumer 對 Topic 的每個分區(qū)進行消費。
Broker(代理服務器):Kafka 集群中的每個節(jié)點都是一個 Broker,負責管理數據的存儲和傳輸,處理生產者和消費者的請求。
1.2 技術比對
相比于 Cattle,Kafka 具備以下優(yōu)勢:
高吞吐量和低延遲:Kafka 使用簡單而高效的數據存儲機制,具備高度可伸縮性,能夠處理大規(guī)模的數據流。它的設計目標是支持每秒數百萬條消息的處理。
分布式和可擴展:Kafka 可以在多個 Broker 節(jié)點上進行水平擴展,通過分區(qū)機制實現負載均衡和容錯性。
持久化存儲:Kafka 使用日志結構存儲消息,提供了高效的磁盤持久化功能,確保數據的可靠性和持久性。
多語言支持:Kafka 提供豐富的客戶端庫,支持多種編程語言,包括 Java、Python、Go、C++ 等,方便開發(fā)者進行集成和使用。
生態(tài)系統(tǒng)豐富:Kafka 生態(tài)系統(tǒng)提供了許多與之配套的工具和服務,例如 Kafka Connect 用于數據集成,KSQL 實現流處理,以及一些第三方工具用于監(jiān)控、管理和運維等。
總的來說,Kafka 是一個高性能、可靠性強且具備良好擴展性的分布式流處理平臺,適用于實時數據流處理、消息隊列、日志收集等各種場景。相比之下,Cattle 在數據傳輸和流處理方面的功能可能較為有限,但具體選擇還需根據業(yè)務需求和技術棧來進行評估和決策。
2.0 安裝環(huán)境
Linux服務器
Xshell工具
Jdk
Kafka
Docker
Windows服務器
消費者系統(tǒng)(我們用的是.net,可使用其他語言代替)
。。。。。。
3.0 目標源數據庫配置步驟
需要在目標源數據庫增加個賬號給maxwell使用,這里以mysql為例
新建mysql的帳號
CREATE USER ' maxwell用戶名'@'%' IDENTIFIED BY '密碼';
CREATE USER 'maxwell用戶名'@'localhost' IDENTIFIED BY '密碼';
給新賬號賦權限(這個是全部權限)
GRANT ALL ON maxwell用戶名.* TO 'maxwell用戶名'@'%';
GRANT ALL ON maxwell用戶名.* TO 'maxwell用戶名'@'localhost';
給新賬號賦權限(這個是指定權限,兩個執(zhí)行一個就行)
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell用戶名'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell用戶名'@'localhost';
4.0 Linux服務器配置步驟
4.1 安裝基礎工具
4.1.1 lrzsz(文件傳輸工具)
第1步:安裝lrzsz,這個工具可以直接拖拽文件上傳到服務器
yum install lrzsz –y
執(zhí)行結果示例
第2步:新建目錄,用來存儲安裝包
mkdir /home/software
第3步:把安裝包傳輸到linux服務器上,拖拽到/home/software
CD /home/software
Kafka安裝包本次使用的是2.13版本,開始拖拽文件到Xshell窗口下
kafka_2.13-3.5.0.tgz
執(zhí)行結果示例
執(zhí)行l(wèi)s查看目錄下是否有文件
Ls
執(zhí)行結果示例
4.1.2 vim(文本編輯器)
第1步:安裝vim,這個是文本編輯器
yum?install?vim?–y
執(zhí)行結果示例
可通過vim命令打開文本,配置文件等
4.2 安裝Java
第1步:安裝jdk,執(zhí)行命令,*代表安裝所有相關的包,yum install -y java-1.8.0-openjdk.x86_64如果執(zhí)行這個命令,則只安裝jdk本身,后續(xù)的軟件安裝運行會出現依賴包不全等問題,建議使用*來安裝
yum install -y java-1.8.0-openjdk*
執(zhí)行結果示例
第2步:查詢java版本,檢驗第一步是否安裝成功,同時查出java版本號,為后面配置環(huán)境變量做準備
rpm -qa | grep java
執(zhí)行結果示例
第3步:配置java環(huán)境變量
輸入以下命令打開配置文件
vi /etc/profile
按鍵pgdn跳到最后一行 按鍵O 新插下一行
將下面文件添加到配置文件中 注意:替換java版本號
export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.382.b05-1.el7_9.x86_64
export JRE_HOME=/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.382.b05-1.el7_9.x86_64
export CLASSPATH=.:$JRE_HOME/lib
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
按鍵esc切換命令模式,輸入:wq退出保存
第4步:刷新環(huán)境變量,讓新加的java配置生效
source /etc/profile
第5步:檢查java是否安裝成功,輸入后會提示java版本號,如果第3步配置錯誤,會提示java命令不存在
java –version
執(zhí)行結果示例
4.3 安裝kafka
第1步:新建目錄,用來存儲kafka
mkdir /home/kafka
第2步:要解壓 tgz 文件,可以使用 Linux 系統(tǒng)自帶的 tar 命令。文件在4.1.1中上傳到了服務器
tar –xzvf kafka_2.13-3.5.0.tgz -C /home/kafka
執(zhí)行結果示例
tar命令解釋(無特殊情況不需要第3步后續(xù)操作,了解就好):
tar -xzvf kafka_2.13-3.5.0.tgz
x:表示解壓
z:表示使用 gzip 壓縮
v:表示顯示詳細的解壓過程
f:表示指定要解壓的文件
解壓完成后,解壓出來的文件會放在當前目錄下。如果要將解壓出來的文件放在其他目錄,可以在命令中使用 -C 選項指定目錄,例如:
tar -xzvf kafka_2.13-3.5.0.tgz -C /home/kafka
這樣就可以將解壓出來的文件放在 /home/kafka 目錄下。
第4步:因為解壓后,多了一層目錄【mv kafka_2.13-3.5.0】,所以要移動目錄
mv kafka_2.13-3.5.0/* ./
然后刪除空目錄
rmdir kafka_2.13-3.5.0
第3步:配置kafka,打開kafka配置文件
vim?/home/kafka/config/server.properties
修改一下配置,注釋掉的要打開注釋,刪除掉#
socket服務端地址
listeners=PLAINTEXT://【本機IP自行替換】:9092
偵聽器名稱、主機名和端口代理將通知給客戶端。
advertised.listeners=PLAINTEXT:// 【本機IP自行替換】:9092
日志文件路徑
log.dirs=/home/kafka_data/logs
zookeeper地址,zookeeper是用來監(jiān)聽kafka源數據變化
zookeeper.connect=【本機IP自行替換】:2181
第4步:創(chuàng)建日志的文件夾,如果有就不用創(chuàng)建了
mkdir -p /home/kafka_data/logs
-p 是創(chuàng)建多級目錄
4.4 安裝docker
sudo是一個用于在Linux系統(tǒng)上獲得超級用戶權限的命令。它允許普通用戶以超級用戶身份執(zhí)行特權命令。
第1步:安裝yum-utils,yum-utils是yum的一個附加軟件包,提供了一些額外的功能和工具。
sudo yum install -y yum-utils
執(zhí)行結果示例
第2步:添加存儲庫的地址
sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
執(zhí)行結果示例
第3步:安裝docker相關程序
sudo yum install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
docker-ce是Docker的主要軟件包,包含了Docker的核心功能;
docker-ce-cli是Docker的命令行界面工具;
containerd.io是Docker的底層容器運行時,負責管理容器的生命周期;
docker-buildx-plugin是Docker的多平臺構建插件,可以跨多種平臺構建容器鏡像;
docker-compose-plugin是Docker的應用程序容器編排工具,可以快速構建、啟動和管理多個Docker容器應用。
執(zhí)行結果示例
輸入y繼續(xù)
輸入y繼續(xù)
安裝完成!?。?/p>
第4步:配置docker
vi /etc/docker/daemon.json
添加配置項文件
{
? "registry-mirrors": ["https://5twf62k1.mirror.aliyuncs.com"],
? "insecure-registries": ["sinoeyes.io","hub.sinoeyes.com"]
}
按鍵esc切換命令模式,輸入:wq退出保存,保存后會生成文件
registry-mirrors屬性用于指定Docker鏡像的加速器地址。由于Docker鏡像可能分布在全球不同的地方,而各地的網絡環(huán)境和速度也不同,因此使用鏡像加速器可以顯著提高拉取鏡像的速度和穩(wěn)定性。例如,將registry-mirrors屬性設置為http://f1361db2.m.daocloud.io即可使用DaoCloud鏡像加速器。
insecure-registries屬性用于指定Docker容器鏡像的非安全注冊表地址。如果您使用的是私有Docker注冊表,并且此注冊表的TLS證書未經過驗證或過期,則可以添加該注冊表的URL以允許使用不安全的HTTP協(xié)議進行拉取和推送鏡像。例如,將insecure-registries屬性設置為["myregistry.example.com:5000"]即可允許使用不安全的HTTP協(xié)議訪問myregistry.example.com:5000注冊表。
第5步:輸入ls查看是否有文件生成
CD /etc/docker/
Ls
執(zhí)行結果示例
第6步:docker-compose部署,使用curl工具下載Docker Compose程序,并將其保存到/usr/local/bin/docker-compose路徑下。
curl -L https://github.com/docker/compose/releases/download/1.21.1/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
執(zhí)行結果示例
這里我因網絡原因,下載失敗,如果命令正常執(zhí)行,不用進行第6步的后續(xù)。
手動上傳文件到目錄下,將docker-compose里面的文件全部上傳到這個路徑下【/usr/local/bin】。
第7步:該命令的作用就是將/usr/local/bin/docker-compose文件的權限設置為可執(zhí)行權限,以便用戶可以直接運行這個文件并使用Docker Compose的功能。
chmod +x /usr/local/bin/docker-compose
第8步:檢查Docker Compose是否安裝成功
docker-compose -v
執(zhí)行結果示例
4.5 運行kafka
第1步:新建目錄
mkdir -p /home/app/maxwell/conf
第2步:在/home/app/maxwell下創(chuàng)建文件docker-compose.yml
vim docker-compose.yml
按鍵I進入編輯模式輸入以下文件內容
version: '3.5'
services:
? maxwell:
??? restart: always
??? image: zendesk/maxwell
??? container_name: maxwell
??? network_mode: "host"
??? command: bin/maxwell --config /etc/maxwell/config.properties
??? #command: bin/maxwell-bootstrap --config /etc/maxwell/config.properties
??? volumes:
????? - ./conf:/etc/maxwell/
??? environment:
????? - "TZ=Asia/Shanghai"
按鍵esc切換命令模式,輸入:wq退出保存,保存后會生成文件
maxwell容器的編排文件,以后啟動maxwell時更方便
第3步:在/home/app/maxwell/conf下創(chuàng)建文件config.properties
vim config.properties
按鍵I進入編輯模式,輸入以下內容,根據情況替換信息,關鍵信息紅色標出
daemon=true
# 第一次啟動時建議改為debug,可以開到mysql數據與kafka請求,穩(wěn)定后再改為info
#log_level=info
log_level=info
producer=kafka
kafka.bootstrap.servers=【本機IP自行替換】:9092
# 會往 kafka下主題為'test'的分區(qū)下推送數據
kafka_topic=maxwell
#當producer_partition_by設置為table時,Maxwell會將生成的消息根據表名稱進行分區(qū),不同的表將會被分配到不同的分區(qū)中,默認為database
producer_partition_by=table
# client_id=maxwell_1
client_id=sddi-consumer-group-1-client-1
# mysql login info 需要先在mysql創(chuàng)建maxwell用戶
host=【目標源數據庫IP】
# port=33066
port=3306
user=【數據庫賬號】
password=【數據庫密碼】
schema_database=maxwell
replication_host=【目標源數據庫IP】
replication_user=【數據庫賬號】
replication_password=【數據庫密碼】
replication_port=3306
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
# exclude_dbs=*
# 同步的數據表
include_dbs=【sddi 數據庫名】
inlcude_tables=【sddi.b_missfile_info,sddi.b_pharm_info, e,sddi.f_pbt_file,同步的表,逗號隔開】
#inlcude_tables=sddi.b_missfile_info,sddi.b_pharm_info
~???
按鍵esc切換命令模式,輸入:wq退出保存,保存后會生成文件????????????????????????????????????????
第4步:啟動docker
systemctl start docker
第5步:啟動zookeeper,以守護線程的方式,這個是kafka集成的消息隊列
bin/zookeeper-server-start.sh -daemon /home/kafka/config/zookeeper.properties
如果出現錯誤,沒有那個文件或目錄,則進行一下操作
先把zookeeper-server-start.sh添加到環(huán)境變量
echo?"export?PATH=$PATH:/home/kafka/bin"?>> /root/.bash_profile
source?/root/.bash_profile
然后執(zhí)行zookeeper命令
zookeeper-server-start.sh -daemon /home/kafka/config/zookeeper.properties
第6步:啟動kafka,以守護線程的方式
kafka-server-start.sh -daemon /home/kafka/config/server.properties
第7步:在 Kafka 中創(chuàng)建一個名為 maxwell 的新主題(Topic)
kafka-topics.sh?--bootstrap-server?192.168.180.31:9092?--create?--topic?maxwell
第8步:啟動maxwell
cd /home/app/maxwell
因為第1步和第2步創(chuàng)建了配置文件,所以這里可以省略很多參數,直接啟動maxwell
docker-compose up -d
執(zhí)行結果示例
第9步:查看docker的maxwell容器
列出所有容器
docker ps
查詢指定容器的日志
docker logs --tail 100 -f 51b978d72157
第10步:查看maxwell的下發(fā)消息(數據量大慎用)
kafka-console-consumer.sh --topic maxwell --from-beginning --bootstrap-server 192.168.180.31:9092
5.0 后期運維
查詢所有的消費組
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --list
查看某個消費組的消費情況
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --describe --group sddi-consumer-group-1
屬性介紹
列頭 |
實例值 |
描述 |
GROUP |
sddi-consumer-group-1 |
消費者組的名稱 |
TOPIC |
maxwell |
所訂閱的主題。 |
PARTITION |
0 |
主題的分區(qū)編號。 |
CURRENT-OFFSET |
666520 |
消費者當前的偏移量(已經消費到的消息的偏移量)。 |
LOG-END-OFFSET |
698913 |
當前分區(qū)日志的最新偏移量(該分區(qū)中的消息總數)。 |
LAG |
32393 |
當前落后的偏移量數量(LAG = LOG-END-OFFSET - CURRENT-OFFSET),表示消費者還未消費的消息數量。 |
CONSUMER-ID |
sddi-consumer-group-1-client-1-8cc14037-bf1c-43eb-8094-2c4e45e5cc04 |
消費者的唯一標識符。 |
HOST |
/192.168.180.18 |
消費者所在的主機名。 |
CLIENT-ID |
sddi-consumer-group-1-client-1 |
消費者的客戶端標識符。 |
這些信息對于監(jiān)控和管理 Kafka 消費者組以及消費狀態(tài)非常有用。可以根據需要使用不同的選項來查看和管理消費者組的狀態(tài)。
初始化maxwell 測試環(huán)境 命令是單個表初始化同步
docker run -it --network host --rm zendesk/maxwell bin/maxwell-bootstrap --user=maxwell --password=rPq60r4BUA19@ --host=192.168.17.22 -database sddi --table b_business_info --client_id=sddi-consumer-group-1-client-1
停止maxwell (只是記錄命令,可以不執(zhí)行)
cd /home/app/maxwell
docker-compose down
停止kafka
kafka-server-stop.sh
停止zookeeper
zookeeper-server-stop.sh
查看容器的配置
docker inspect maxwell
查看主題列表
kafka-topics.sh --list --bootstrap-server 192.168.180.31:9092
創(chuàng)建主題
kafka-topics.sh --bootstrap-server 192.168.180.31:9092 --create --topic maxwell --replication-factor 1 --partitions 3
查看主題信息
kafka-topics.sh --bootstrap-server 192.168.180.31:9092 --describe --topic maxwell
使用生產者發(fā)送消息
kafka-console-producer.sh --broker-list 192.168.180.31:9092 --topic maxwell
使用消息者接受消息(從起始位置開始查看)
kafka-console-consumer.sh --bootstrap-server 192.168.180.31:9092 --topic maxwell --from-beginning
使用消息者接受消息(從當前位置開始查看)
kafka-console-consumer.sh --bootstrap-server 192.168.180.31:9092 --topic maxwell
查看kafka的所有消費組
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --list
查看kafka日志
tail -10000f /home/kafka/logs/server.log
查看zookeeper日志
tail -1000f /home/kafka/logs/zookeeper.out
查看maxwell日志
cd /home/app/maxwell
docker-compose logs -f --tail 100
Earliest 策略直接指定**–to-earliest**。
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --all-topics --to-earliest –execute
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --topics maxwell --to-earliest –execute
Latest 策略直接指定**–to-latest**。
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --all-topics --to-latest --execute
Current 策略直接指定**–to-current**。
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --all-topics --to-current --execute
Specified-Offset 策略直接指定**–to-offset**。
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --all-topics --to-offset 51691 --execute
Shift-By-N 策略直接指定**–shift-by N**。
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --topic maxwell --shift-by 1 --execute
DateTime 策略直接指定**–to-datetime**。
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
最后是實現 Duration 策略,我們直接指定**–by-duration**
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --by-duration PT0H30M0S --execute
Earliest 策略表示將位移調整到主題當前最早位移處。這個最早位移不一定就是 0,因為在生產環(huán)境中,很久遠的消息會被 Kafka 自動刪除,所以當前最早位移很可能是一個大于 0 的值。如果你想要重新消費主題的所有消息,那么可以使用 Earliest 策略。
Latest 策略表示把位移重設成最新末端位移。如果你總共向某個主題發(fā)送了 15 條消息,那么最新末端位移就是 15。如果你想跳過所有歷史消息,打算從最新的消息處開始消費的話,可以使用 Latest 策略。
Current 策略表示將位移調整成消費者當前提交的最新位移。有時候你可能會碰到這樣的場景:你修改了消費者程序代碼,并重啟了消費者,結果發(fā)現代碼有問題,你需要回滾之前的代碼變更,同時也要把位移重設到消費者重啟時的位置,那么,Current 策略就可以幫你實現這個功能。
表中第 4 行的 Specified-Offset 策略則是比較通用的策略,表示消費者把位移值調整到你指定的位移處。這個策略的典型使用場景是,消費者程序在處理某條錯誤消息時,你可以手動地“跳過”此消息的處理。在實際使用過程中,可能會出現 corrupted 消息無法被消費的情形,此時消費者程序會拋出異常,無法繼續(xù)工作。一旦碰到這個問題,你就可以嘗試使用 Specified-Offset 策略來規(guī)避。
如果說 Specified-Offset 策略要求你指定位移的絕對數值的話,那么 Shift-By-N 策略指定的就是位移的相對數值,即你給出要跳過的一段消息的距離即可。這里的“跳”是雙向的,你既可以向前“跳”,也可以向后“跳”。比如,你想把位移重設成當前位移的前 100 條位移處,此時你需要指定 N 為 -100。
剛剛講到的這幾種策略都是位移維度的,下面我們來聊聊從時間維度重設位移的 DateTime 和 Duration 策略。
DateTime 允許你指定一個時間,然后將位移重置到該時間之后的最早位移處。常見的使用場景是,你想重新消費昨天的數據,那么你可以使用該策略重設位移到昨天 0 點。
Duration 策略則是指給定相對的時間間隔,然后將位移調整到距離當前給定時間間隔的位移處,具體格式是 PnDTnHnMnS。如果你熟悉 Java 8 引入的 Duration 類的話,你應該不會對這個格式感到陌生。它就是一個符合 ISO-8601 規(guī)范的 Duration 格式,以字母 P 開頭,后面由 4 部分組成,即 D、H、M 和 S,分別表示天、小時、分鐘和秒。舉個例子,如果你想將位移調回到 15 分鐘前,那么你就可以指定 PT0H15M0S。
6.0 消費者系統(tǒng)
我們這里采用的是.net平臺的c#語言,編寫了windows server程序,來處理隊列數據
使用的類庫是Confluent.Kafka
6.1 Confluent.Kafka
Confluent.Kafka是一個流行的開源Kafka客戶端庫,它提供了在.NET應用程序中與Apache Kafka進行交互的功能。下面是對Confluent.Kafka類庫的介紹:
高級API:Confluent.Kafka為開發(fā)人員提供了一組簡單易用的高級API,用于連接到Kafka集群、讀取和寫入消息、管理消費者組等。你可以方便地使用它來發(fā)送消息到Kafka主題或從主題中消費消息。
完全支持Kafka協(xié)議:Confluent.Kafka完全支持Kafka協(xié)議,包括Kafka 1.0.0及更高版本。它與最新的Kafka版本保持同步,并提供了一致性和穩(wěn)定性。
高性能:Confluent.Kafka經過優(yōu)化,具有出色的性能表現。它采用了異步、無鎖的設計,支持高并發(fā)的消息處理。
配置靈活:Confluent.Kafka提供了豐富的配置選項,可以根據實際需求進行調整。你可以配置消息的傳遞語義、消費者的批量讀取、消息序列化和反序列化方式等。
支持消息引擎擴展:Confluent.Kafka還支持通過插件機制擴展消息引擎,例如支持Avro、Protobuf等消息格式的插件。
社區(qū)活躍:Confluent.Kafka是一個受歡迎且活躍的開源項目,擁有強大的社區(qū)支持。你可以在社區(qū)中獲得幫助、提出問題或提交改進建議。
總之,Confluent.Kafka是一個功能強大、性能優(yōu)越的Kafka客戶端庫,為.NET開發(fā)人員提供了與Apache Kafka無縫集成的能力,使他們能夠輕松地構建可靠的消息流應用程序。
6.2 相關連接
https://github.com/confluentinc/confluent-kafka-dotnet/
http://www.bilibili996.com/Course?id=1159444000368文章來源:http://www.zghlxwxcb.cn/news/detail-830908.html
https://zhuanlan.zhihu.com/p/139101754?utm_id=0文章來源地址http://www.zghlxwxcb.cn/news/detail-830908.html
到了這里,關于kafka的安裝,用于數據庫同步數據的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!