前言
? ? ? ? 今天來學(xué)習(xí)一個(gè)新的大數(shù)據(jù)小工具 Maxwell ,它和 Sqoop 很像。Sqoop主要用于在 Hadoop (比如 HDFS、Hive、HBase 等)和關(guān)系型數(shù)據(jù)庫之間進(jìn)行數(shù)據(jù)的批量導(dǎo)入和導(dǎo)出,而 Maxwell 則主要用于監(jiān)控?cái)?shù)據(jù)庫的變化(通過監(jiān)控 binlog ),并將變化的數(shù)據(jù)以JSON格式發(fā)布到消息隊(duì)列(一般是 Kafka)或 Redis 中。
????????Sqoop 一般都是在特定時(shí)間(比如用 Azkaban 調(diào)度在每天0點(diǎn)進(jìn)行作業(yè)),可以用于離線數(shù)倉的數(shù)據(jù)同步問題;但是對(duì)于實(shí)時(shí)數(shù)據(jù)分析,Sqoop 并不方便。所以這里就引出了 Maxwell 這個(gè)工具,它的特點(diǎn)包括輕量級(jí)、能夠捕獲完整的歷史數(shù)據(jù)(通過bootstrap功能)以及支持?jǐn)帱c(diǎn)還原(即在錯(cuò)誤解決后可以從上次的位置繼續(xù)讀取數(shù)據(jù))。然而,Maxwell只支持 json 格式,并且不能直接支持HA(高可用性)。
? ? ? ? 這里說起 Sqoop 我又想到了 DataX ,它也是一款全量數(shù)據(jù)采集工具,和 Sqoop 很相似,但是還是有所區(qū)別:
- Sqoop是基于Hadoop生態(tài)系統(tǒng)的,充分利用了MapReduce計(jì)算框架進(jìn)行數(shù)據(jù)的導(dǎo)入導(dǎo)出。而DataX僅僅在運(yùn)行DataX的單臺(tái)機(jī)器上進(jìn)行數(shù)據(jù)的抽取和加載。
- Sqoop采用MapReduce框架,具有良好的并發(fā)性和容錯(cuò)性,可以在多個(gè)節(jié)點(diǎn)同時(shí)進(jìn)行import或export操作。而DataX則是單進(jìn)程的,沒有并發(fā)性和容錯(cuò)性。
- Sqoop主要支持在關(guān)系型數(shù)據(jù)庫和Hadoop組件(如HDFS、Hive、HBase)之間進(jìn)行數(shù)據(jù)遷移,但不支持在Hadoop組件之間或關(guān)系型數(shù)據(jù)庫之間進(jìn)行數(shù)據(jù)遷移。相比之下,DataX支持更廣泛的數(shù)據(jù)遷移場景,包括關(guān)系型數(shù)據(jù)庫、Hadoop組件以及其他數(shù)據(jù)源之間的數(shù)據(jù)遷移。
- Sqoop在導(dǎo)入導(dǎo)出數(shù)據(jù)時(shí),會(huì)生成一個(gè)MapReduce作業(yè)在Hadoop集群中運(yùn)行,可以處理大規(guī)模的數(shù)據(jù),但不具備統(tǒng)計(jì)和校驗(yàn)?zāi)芰?。而DataX在傳輸過程中可以進(jìn)行數(shù)據(jù)過濾,并且可以統(tǒng)計(jì)傳輸數(shù)據(jù)的信息,對(duì)于業(yè)務(wù)場景復(fù)雜、表結(jié)構(gòu)變更的情況更為適用。
- Sqoop采用命令行的方式調(diào)用,容易與現(xiàn)有的調(diào)度監(jiān)控方案相結(jié)合。而DataX采用XML配置文件的方式,開發(fā)運(yùn)維上相對(duì)不太方便。
接下來就來學(xué)習(xí) Maxwell ,內(nèi)容不多,過兩天再把 DataX 搞定。
1、MaxWell 簡介
1.1 Maxwell概述
????????Maxwell 是由美國 Zendesk 公司開源,用 Java 編寫的 MySQL 變更數(shù)據(jù)抓取軟件。它會(huì)實(shí)時(shí)監(jiān)控Mysql數(shù)據(jù)庫的數(shù)據(jù)變更操作(包括insert、update、delete),并將變更數(shù)據(jù)以 JSON 格式發(fā)送給 Kafka、Kinesi、RabbitMQ、Redis 或者其它平臺(tái)的應(yīng)用程序。
????????官網(wǎng)地址:Maxwell's Daemon
1.2 Maxwell 輸出數(shù)據(jù)格式
我們知道?Maxwell 是以 JSON 格式傳輸數(shù)據(jù)的,上面顯示了不同數(shù)據(jù)庫數(shù)據(jù)操作對(duì)應(yīng)的 json 格式,這里是這些?json 字段的說明:
字段 |
解釋 |
database |
變更數(shù)據(jù)所屬的數(shù)據(jù)庫 |
table |
表更數(shù)據(jù)所屬的表 |
type |
數(shù)據(jù)變更類型 |
ts |
數(shù)據(jù)變更發(fā)生的時(shí)間 |
xid |
事務(wù)id |
commit |
事務(wù)提交標(biāo)志,可用于重新組裝事務(wù) |
data |
對(duì)于insert類型,表示插入的數(shù)據(jù);對(duì)于update類型,標(biāo)識(shí)修改之后的數(shù)據(jù);對(duì)于delete類型,表示刪除的數(shù)據(jù) |
old |
對(duì)于update類型,表示修改之前的數(shù)據(jù),只包含變更字段 |
2、Maxwell 原理
????????Maxwell的工作原理是實(shí)時(shí)讀取MySQL數(shù)據(jù)庫的二進(jìn)制日志(Binlog),從中獲取變更數(shù)據(jù),再將變更數(shù)據(jù)以JSON格式發(fā)送至Kafka等流處理平臺(tái)。
2.1 MySQL二進(jìn)制日志
2.1.1、什么是 binlog
????????二進(jìn)制日志(Binlog)是MySQL服務(wù)端非常重要的一種日志,它記錄了所有的 DDL 和 DML (除了查詢語句)語句,以事件的形式記錄,還包含所執(zhí)行的消耗的時(shí)間,MySQL 的二進(jìn)制日志是事務(wù)安全類型的。Maxwell的工作原理和主從復(fù)制密切相關(guān)。
? ? ? ? 一般開啟二進(jìn)制日志會(huì)有 1% 的性能損耗。二進(jìn)制日志有兩個(gè)最重要的場景:
- MySQL Replication 在 Master 端開啟 binlog,Master 把它的二進(jìn)制日志傳遞給其它 salves 來達(dá)到 master-salve 數(shù)據(jù)一致的目的。
- 通過使用 musqlbinlog 工具來進(jìn)行數(shù)據(jù)恢復(fù)。
? ? ? ? MySQL二進(jìn)制日志的存儲(chǔ)機(jī)制很像我們之前學(xué)的Kafka 文件存儲(chǔ)機(jī)制,它也包含兩類文件:二進(jìn)制索引文件(存儲(chǔ)二進(jìn)制日志文件索引,后綴為 .index)和二進(jìn)制日志文件(用于記錄所有 DDL 和 DML 語句,后綴為 .00000*)。
2.1.2、binlog 的開啟
- 找到 MySQL 配置文件的位置
- linux:/etc/my.cnf (可以通過 locate my.cnf) 查找位置
- window:\my.ini
- 修改配置:在 [mysqlId] 區(qū)塊,設(shè)置添加 log-bin=mysql-bin (這個(gè)表示 binlog 的前綴是 mysql-bin,也可以換成別的,也就是說以后生成的日志文件就是 mysq-bin.000001 格式的文件,每次 mysql 重啟或者達(dá)到單個(gè)文件大小的閾值時(shí),生成一個(gè)新的日志文件、按順序編號(hào))
2.1.3、binlog 的分類設(shè)置
mysql binlog 的格式共有三種:STATEMENT,MINED,ROW。
在配置文件中可以選擇配置:binllog_format = statment|mixed|row
- statement:語句級(jí),它會(huì)記錄每一次執(zhí)行寫操作的雨具。相比較 row 模式節(jié)省空間,但是可能產(chǎn)生不一致,比如 update student enroll_time = now(); 這種 SQL 如果進(jìn)行數(shù)據(jù)恢復(fù)那一定是有問題的。
- 優(yōu)點(diǎn):節(jié)省空間
- 缺點(diǎn):有可能造成數(shù)據(jù)不一致
- row:行級(jí),binlog 記錄每次操作后每行記錄的變化,相當(dāng)于它記錄的是我們每次操作后表格的內(nèi)容。
- 優(yōu)點(diǎn):保持?jǐn)?shù)據(jù)的絕對(duì)一致性,不管什么sql,它只記錄執(zhí)行后的結(jié)果
- 缺點(diǎn):占用磁盤較大
- mixed:混合級(jí)別,statement 的升級(jí)版,一定程度上解決了 statement 模式因?yàn)橐恍┣闆r而造成的數(shù)據(jù)不一致問題。比如對(duì)于 SQL 中包含隨機(jī)數(shù)或者時(shí)間的語句,它保存的是 SQL 執(zhí)行的結(jié)果;對(duì)于 SQL 不包含隨機(jī)數(shù)或者時(shí)間的語句,它保存的是 SQL 語句本身。
- 優(yōu)點(diǎn):節(jié)省空間
- 缺點(diǎn):極個(gè)別情況下仍然存在不一致
綜上,Maxwell 做監(jiān)控分析,選擇 row 模式比較合適(必須選擇 row),因?yàn)槲覀兊南掠危ū热?Kafka 是無法執(zhí)行 sql 語句的)
2.1.4、Maxwell 和 Canal 對(duì)比
對(duì)比 | Canal | Maxwell |
語言 | java |
java |
數(shù)據(jù)格式 | 自由 | json |
采集模式 | 增量 | 全量/增量 |
數(shù)據(jù)落地 | 定制 | 支持 kafka 等多種平臺(tái) |
HA | 支持 | 支持 |
Canal 的原理和 Maxwell 都是通過監(jiān)控 binlog 實(shí)現(xiàn)的。?
2.2 MySQL主從復(fù)制
????????MySQL的主從復(fù)制,就是用來建立一個(gè)和主數(shù)據(jù)庫完全一樣的數(shù)據(jù)庫環(huán)境,這個(gè)數(shù)據(jù)庫稱為從數(shù)據(jù)庫。
1)主從復(fù)制的應(yīng)用場景如下:
(1)做數(shù)據(jù)庫的熱備:主數(shù)據(jù)庫服務(wù)器故障后,可切換到從數(shù)據(jù)庫繼續(xù)工作。
(2)讀寫分離:主數(shù)據(jù)庫只負(fù)責(zé)業(yè)務(wù)數(shù)據(jù)的寫入操作,而多個(gè)從數(shù)據(jù)庫只負(fù)責(zé)業(yè)務(wù)數(shù)據(jù)的查詢工作,在讀多寫少場景下,可以提高數(shù)據(jù)庫工作效率。
2)主從復(fù)制的工作原理如下:
(1)Master主庫將數(shù)據(jù)變更記錄,寫到二進(jìn)制日志(binary log)中
(2)Slave從庫向mysql master發(fā)送dump協(xié)議,將master主庫的binary log events拷貝到它的中繼日志(relay log)
(3)Slave從庫讀取并執(zhí)行中繼日志中的事件,將改變的數(shù)據(jù)同步到自己的數(shù)據(jù)庫。
2.3 Maxwell原理
????????Maxwell 的原理很簡單,就是將自己偽裝成slave,并遵循MySQL主從復(fù)制的協(xié)議,從master同步數(shù)據(jù)。
3、Maxwell 部署
3.1、啟用 MySQL binlog
MySQL服務(wù)器的Binlog默認(rèn)是未開啟的,如需進(jìn)行同步,需要先進(jìn)行開啟。
1)修改MySQL配置文件/etc/my.cnf
sudo vim /etc/my.cnf
2)增加如下配置
[mysqld]
#數(shù)據(jù)庫id
server-id = 1
#啟動(dòng)binlog,該參數(shù)的值會(huì)作為binlog的文件名
log-bin=mysql-bin
#binlog類型,maxwell要求為row類型
binlog_format=row
#啟用binlog的數(shù)據(jù)庫,需根據(jù)實(shí)際情況作出修改
binlog-do-db=gmall
#如果需要監(jiān)控多個(gè)數(shù)據(jù)庫 不要加逗號(hào) 直接新開一行
binlog-do-db=test
如果我們要監(jiān)控除了個(gè)別數(shù)據(jù)庫之外的其它所有數(shù)據(jù)庫,我們可以使用 binlog-ignore-db 屬性。
3)重啟MySQL服務(wù)
sudo systemctl restart mysqld
4)查看是否修改完成
show variables like '%binlog%';
我們看到 binlog_format 的 Value 現(xiàn)在是 ROW ,說明 binlog 配置成功。
5)查看 binlog 文件
進(jìn)入 /var/lib/mysql 目錄,查看 MySQL 生成的 binlog 文件:
cd /var/lib/mysql
sudo ls -l | grep mysql-bin
?注:MySQL 生成的 binlog 文件初始大小一定是 514 字節(jié)。
3.2、初始化?maxwell 元數(shù)據(jù)庫
(1)創(chuàng)建元數(shù)據(jù)庫 maxwell 用于存儲(chǔ) maxwell 的元數(shù)據(jù)
create database maxwell;
?(2)設(shè)置 mysql 用戶密碼安全級(jí)別
set global validate_password_policy=0;
set global validate_password_length=4;
如果上面的命令報(bào)錯(cuò),去 /etc/my.cnf 的 [mysqld] 下添加下面兩行:
plugin-load-add=validate_password.so
validate-password=FORCE_PLUS_PERMANENT
然后重啟 mysqld 服務(wù)
sudo systemctl restart mysqld
(3)分配一個(gè)用戶賬號(hào)可以操作該元數(shù)據(jù)庫
GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';
(4)分配這個(gè)賬號(hào)可以監(jiān)控其他數(shù)據(jù)庫的權(quán)限
GRANT SELECT ,REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO maxwell@'%';
(5)刷新 mysql 表權(quán)限
flush privileges;
3.3、Maxwell 進(jìn)程啟動(dòng)
3.3.1、使用命令行參數(shù)啟動(dòng)
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout
- user:連接 mysql 的用戶(上面我們已經(jīng)創(chuàng)建了)
- password:連接 mysql 的用戶密碼
- host: mysql 安裝的主機(jī)名
- producer:生產(chǎn)者模式(stdout:控制臺(tái),kafka:kafka 集群)
查看 maxwell 輸出:?
?我們可以看到,我們的 insert 語句被 maxwell 轉(zhuǎn)為了 json 被輸出到了控制臺(tái)。
3.3.2、定制化配置文件啟動(dòng)
?編輯配置文件 config.properties:
log_level=info
producer=stdout
kafka.bootstrap.servers=localhost:9092
# mysql login info
host=hadoop102
user=maxwell
password=123456
?啟動(dòng) Maxwell:
4、Maxwell 使用
4.1、監(jiān)控 MySQL 數(shù)據(jù)并在控制臺(tái)打印
????????上面我們已經(jīng)演示過了,也就是首先在 mysql 的配置文件中指定需要打印 binlog 的數(shù)據(jù)庫,然后通過命令或者配置文件來開啟 Maxwell 進(jìn)程:
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout
接著我們對(duì)被監(jiān)聽的數(shù)據(jù)庫的操作信息就會(huì)以 json 的格式打印出來:?
4.2、監(jiān)控 MySQL 數(shù)據(jù)并輸出到 Kafka
????????監(jiān)控 MySQL 數(shù)據(jù)并輸出到 Kafka,這種需求是我們實(shí)際工作用的最多的,這也是我們離線數(shù)倉和實(shí)時(shí)數(shù)倉所必須的。
4.2.1、實(shí)例
1)啟動(dòng) Zookeeper 和 Kafka
2)啟動(dòng) Maxwell 監(jiān)控 binlog
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=kafka --kafka.bootstrap.servers=hadoop102:9092 --kafka_topic=maxwell
注:這里我們制定了 Kafka 集群地址為 hadoop102:9092 ,主題為 maxwell,雖然主題并不存在,但是 kafka 會(huì)自動(dòng)創(chuàng)建我們指定的主題
3)在 hadoop103 消費(fèi) maxwell 主題
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell
4)在 MySQL 中增加一條數(shù)據(jù)再刪除:?
?可以看到我們的數(shù)據(jù)成功被 Kafka 消費(fèi),說明業(yè)務(wù)數(shù)據(jù)日志成功傳輸?shù)?Kafka 。
4.2.2、Kafak 主題數(shù)據(jù)的分區(qū)控制
? ? ? ? 上面的案例中,我們不論 MySQL 的那個(gè)數(shù)據(jù)庫,哪張表,都被保存到了 Kafka 的 maxwell 主題,這樣顯然很亂。所以這里我們需要解決一下:
? ? ? ? 在生產(chǎn)環(huán)境中,我們一般都會(huì)用 maxwell 監(jiān)控多個(gè) mysql 庫的數(shù)據(jù),然后將這些數(shù)據(jù)發(fā)往 Kafka 的一個(gè)主題,并且這個(gè)主題肯定是多分區(qū)的,為了提高并發(fā)度。那么如何控制這些數(shù)據(jù)的分區(qū)問題,那就變得至關(guān)重要,實(shí)現(xiàn)步驟如下:
(1)修改 maxwell 的配置文件,定制化啟動(dòng) maxwell 進(jìn)程
log_level=info
producer=kafka
kafka.bootstrap.servers=hadoop102:9092
# mysql login info
host=hadoop102
user=maxwell
password=123456
# kafka 主題
kafka_topic=maxwell3
# 默認(rèn)配好的
kafka.compression.type=snappy
kafka.retries=0
kafka.acks=1
#producer_partition_by=database # [database, table, primary_key, transaction_id, thread_id, column]
# 常用的按照庫分區(qū)、按照表分區(qū),這里我們測試按照庫分區(qū)
producer_partition_by=database
(2)手動(dòng)創(chuàng)建 Kafka 多分區(qū)主題
kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 2 --partitions 3 --topic maxwell3
?查看主題信息:
這樣,我們不同數(shù)據(jù)庫的操作記錄就會(huì)保存到 Kafka 不同的分區(qū)當(dāng)中,?分區(qū)可以使得集群負(fù)載均衡,還可以提高提高讀寫效率。
4.3、監(jiān)控 MySQL 指定表輸出控制臺(tái)
(1)運(yùn)行 maxwell 來監(jiān)控 mysql 指定表數(shù)據(jù)更新
監(jiān)控 test 庫下的 staff 表
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout --filter 'exclude: *.*,include:test.staff'
(2)向 staff 表中插入數(shù)據(jù),查看 maxwell 控制臺(tái)輸出
(3)向 ws2 表中插入一條數(shù)據(jù),查看
注:表 ws2 并不是我們指定監(jiān)控的表
結(jié)果:maxwell 控制臺(tái)并沒有任何輸出?
注:還可以設(shè)置 include.test.* 來表示監(jiān)控 test 庫下的所有表
4.4、監(jiān)控 MySQL 指定表全量輸出到控制臺(tái)
? ? ? ? Maxwell 進(jìn)程默認(rèn)只能監(jiān)控 mysql 的 binlog 日志的新增以及變化的,但是 Maxwell 是支持?jǐn)?shù)據(jù)初始化的,可以通過修改 Maxwell 的元數(shù)據(jù)來對(duì) MySQL 中的某張表進(jìn)行數(shù)據(jù)初始化,也就是我們常說的全量同步。具體操作如下:
? ? ? ? 需求:將 test 庫下的 staff 表中的數(shù)據(jù)全量導(dǎo)入到 maxwell 控制臺(tái)打印。
我們可以在數(shù)據(jù)庫 maxwell (這個(gè)數(shù)據(jù)庫是我們初始化 maxwell 元數(shù)據(jù)庫時(shí)創(chuàng)建的)中查看元數(shù)據(jù)表 positions 的內(nèi)容:?
這里的 server_id?是我們?cè)?mysql 的配置文件 /etc/my.cnf 中配置的;這里的 binlog_file 代表的是最新的 binlog 文件 mysql-bin.000003 ;這里的 binlog_position 的值為 2802 代表目前已經(jīng)讀到了第 2802 個(gè)字節(jié)的位置,意思就說我們現(xiàn)在是從這個(gè)偏移量之后開始監(jiān)聽的。
注:也可以通過 SQL:show master status; 來查看當(dāng)前的 binlog 狀態(tài)。
1. 向元數(shù)據(jù)表 maxwell.bootstrap 中插入數(shù)據(jù)
insert into maxwell.bootstrap(database_name,table_name) values('test','staff');
這就相當(dāng)于告訴 maxwell,下一次啟動(dòng) maxwell 作業(yè)時(shí),就會(huì)調(diào)用一個(gè)初始化作業(yè),把我們指定的這個(gè) test 數(shù)據(jù)庫下的 staff 表進(jìn)行一次增量同步。
我們可以看到,通過 bootstrap 表插入數(shù)據(jù)來完成初始化,這樣插入的數(shù)據(jù)是 json 中 type 字段的值是 "bootstrap-insert" 而不是 "insert"。
再次查看 bootstrap 表:?
我們可以發(fā)現(xiàn),is_complete 字段的值從默認(rèn)值 0 變成了 1,inserted_rows 從 0 變成了 6,后面的 started_at 和 completed_at 字段的值也不再為空了。
2. 使用 maxwell-bootstrap 腳本工具
除了上面的直接向 maxwell 元數(shù)據(jù)庫中的 bootstrap 表插入數(shù)據(jù),也可以使用 maxwell 提供的 maxwell-bootstrap 腳本,本質(zhì)其實(shí)是一樣的,具體看你覺得哪個(gè)省事。
bin/maxwell-bootstrap --database gmall --table user_info --config ./config.properties
這樣,這個(gè)初始化作業(yè)就算完成了,當(dāng)我們下次打開?maxwell 進(jìn)程時(shí),它并不會(huì)再次執(zhí)行這個(gè)初始化任務(wù),因?yàn)樗呀?jīng)完成過了。文章來源:http://www.zghlxwxcb.cn/news/detail-831110.html
總結(jié)
? ? ? ? 至此,Maxwell 的學(xué)習(xí)也已經(jīng)結(jié)束了,待會(huì)吧離線數(shù)倉項(xiàng)目里的 maxwell 部分完成??傮w來說,這個(gè)工具還是很好用且簡單的,希望有一天自己也可以用屎山堆砌出這么一個(gè)自己的框架!文章來源地址http://www.zghlxwxcb.cn/news/detail-831110.html
到了這里,關(guān)于Maxwell - 增量數(shù)據(jù)同步工具的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!