Reference:
Technical Analysis | Doris Connector with Flink CDC for Exactly-Once Ingestion of MySQL Sharded Databases and Tables - Alibaba Cloud Developer Community
Logic diagram: (image not included)
1. Flink environment:
https://flink.apache.org/zh/
- Download flink-1.15.1
wget https://dlcdn.apache.org/flink/flink-1.15.1/flink-1.15.1-bin-scala_2.12.tgz
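- Extract the archive
tar -zxvf flink-1.15.1-bin-scala_2.12.tgz
cd flink-1.15.1
- Edit the configuration
Open conf/flink-conf.yaml and set rest.bind-address to 0.0.0.0 so that the Web UI is reachable from other hosts.
vi conf/flink-conf.yaml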
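The same change can be scripted instead of made by hand in vi. The following is a minimal sketch, assuming the configuration file uses flat `key: value` lines; `set_conf` is a hypothetical helper name, not part of Flink.

```shell
# set_conf FILE KEY VALUE
# Replace an existing uncommented "KEY: ..." line, or append one if none exists.
# Hypothetical helper for illustration; it leaves a FILE.bak backup when it edits in place.
set_conf() {
  file=$1 key=$2 value=$3
  # Escape dots so the key is matched literally in the regular expressions below.
  key_re=$(printf '%s' "$key" | sed 's/\./\\./g')
  if grep -Eq "^${key_re}:" "$file"; then
    sed -i.bak -E "s|^${key_re}:.*|${key}: ${value}|" "$file"
  else
    printf '%s: %s\n' "$key" "$value" >> "$file"
  fi
}
```

From the flink-1.15.1 directory it would be called as `set_conf conf/flink-conf.yaml rest.bind-address 0.0.0.0`.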
- Download the dependency JARs into the lib directory of the Flink installation
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar
wget https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.14_2.12/1.0.3/flink-doris-connector-1.14_2.12-1.0.3.jar
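Before starting the cluster, it can help to confirm that both connector JARs actually landed in lib, since a missing JAR typically only surfaces later as a "Could not find any factory for identifier" error in the SQL client. A small sketch of such a check follows; `require_jars` is a hypothetical helper name.

```shell
# require_jars LIB_DIR PREFIX...
# Succeeds only if LIB_DIR contains at least one *.jar file for every given name prefix.
# Hypothetical helper for illustration.
require_jars() {
  lib_dir=$1
  shift
  status=0
  for prefix in "$@"; do
    # ls exits non-zero when the glob matches no file.
    if ! ls "$lib_dir"/"$prefix"*.jar >/dev/null 2>&1; then
      echo "no JAR starting with '$prefix' in $lib_dir" >&2
      status=1
    fi
  done
  return "$status"
}
```

From the Flink installation directory: `require_jars lib flink-sql-connector-mysql-cdc flink-doris-connector`.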
- Start Flink
./bin/start-cluster.sh
- Open the Web UI
http://192.168.0.158:8081
2. MySQL table and data
- Enable the binlog: inside the container, edit /etc/mysql/mysql.cnf as follows, then restart MySQL
[mysqld]
log_bin=mysql_bin
binlog-format=Row
server-id=1
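A quick way to catch a typo in these settings before restarting MySQL is to grep the file for them. This is only a sketch: `check_binlog_cnf` is a hypothetical helper, it does not check which section the settings appear in, and it relies on MySQL accepting both `-` and `_` in option names.

```shell
# check_binlog_cnf FILE
# Succeeds only if FILE contains the three binlog-related settings that CDC needs.
# Hypothetical helper for illustration; matching is case-insensitive.
check_binlog_cnf() {
  cnf=$1
  status=0
  for pattern in \
    '^[[:space:]]*log[-_]bin *(=|$)' \
    '^[[:space:]]*binlog[-_]format *= *row *$' \
    '^[[:space:]]*server[-_]id *= *[0-9]+'
  do
    if ! grep -Eiq "$pattern" "$cnf"; then
      echo "missing or unexpected setting (pattern: $pattern)" >&2
      status=1
    fi
  done
  return "$status"
}
```

Run it as `check_binlog_cnf /etc/mysql/mysql.cnf` inside the container. After the restart, `SHOW VARIABLES LIKE 'log_bin';` in the MySQL command line should report ON.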
- In the MySQL command line, create the database emp and the table employee:
CREATE DATABASE emp;
USE emp;
CREATE TABLE employee (
emp_no INT NOT NULL,
birth_date DATE NOT NULL,
first_name VARCHAR(14) NOT NULL,
last_name VARCHAR(16) NOT NULL,
gender ENUM ('M','F') NOT NULL,
hire_date DATE NOT NULL,
PRIMARY KEY (emp_no)
);
INSERT INTO `employee` VALUES
(10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
(10003,'1959-12-03','Parto','Bamford','M','1986-08-28'),
(10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01'),
(10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12'),
(10006,'1953-04-20','Anneke','Preusig','F','1989-06-02'),
(10007,'1957-05-23','Tzvetan','Zielinski','F','1989-02-10'),
(10008,'1958-02-19','Saniya','Kalloufi','M','1994-09-15'),
(10009,'1952-04-19','Sumant','Peac','F','1985-02-18'),
(10010,'1963-06-01','Duangkaew','Piveteau','F','1989-08-24'),
(10011,'1953-11-07','Mary','Sluis','F','1990-01-22'),
(10012,'1960-10-04','Patricio','Bridgland','M','1992-12-18'),
(10013,'1963-06-07','Eberhardt','Terkki','M','1985-10-20'),
(10014,'1956-02-12','Berni','Genin','M','1987-03-11'),
(10015,'1959-08-19','Guoxiang','Nooteboom','M','1987-07-02'),
(10016,'1961-05-02','Kazuhito','Cappelletti','M','1995-01-27'),
(10017,'1958-07-06','Cristinel','Bouloucos','F','1993-08-03'),
(10018,'1954-06-19','Kazuhide','Peha','F','1987-04-03'),
(10019,'1953-01-23','Lillian','Haddadi','M','1999-04-30'),
(10020,'1952-12-24','Mayuko','Warwick','M','1991-01-26');
3. Doris table
- Connect to Doris with the MySQL command-line client, then create the database demo and the table employee_info
CREATE DATABASE demo;
USE demo;
CREATE TABLE employee_info (
emp_no int NOT NULL,
birth_date date,
first_name varchar(20),
last_name varchar(20),
gender char(2),
hire_date date,
database_name varchar(50),
table_name varchar(200)
)
UNIQUE KEY(`emp_no`, `birth_date`)
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1
PROPERTIES ( "replication_allocation" = "tag.location.default: 1" );
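4. Flink tables and data
- Start the Flink SQL client
./bin/sql-client.sh embedded
- Enable checkpointing
The Flink job runs checkpoints periodically and records the binlog position in each one. When the job fails over, it resumes processing from the most recently recorded binlog position.
The example below uses 10 seconds; for production, an interval of 60 seconds is recommended.
Flink SQL> SET execution.checkpointing.interval = 10s;
- Create the MySQL CDC table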
Flink SQL> CREATE TABLE employee_source (
database_name STRING METADATA VIRTUAL,
table_name STRING METADATA VIRTUAL,
emp_no int NOT NULL,
birth_date date,
first_name STRING,
last_name STRING,
gender STRING,
hire_date date,
PRIMARY KEY (`emp_no`) NOT ENFORCED
)
WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3336',
'username' = 'root',
'password' = '1234.abcd',
'database-name' = 'emp',
'table-name' = 'employee'
);
Query the data:
Flink SQL> select * from employee_source limit 10;
- Create the Doris sink table
Flink SQL> CREATE TABLE cdc_doris_sink (
emp_no int ,
birth_date STRING,
first_name STRING,
last_name STRING,
gender STRING,
hire_date STRING,
database_name STRING,
table_name STRING
)
WITH (
'connector' = 'doris',
'fenodes' = 'localhost:8030',
'table.identifier' = 'demo.employee_info',
'username' = 'root',
'password' = '1234.abcd'
);
Parameter description:
connector: specifies that the connector is doris
fenodes: IP address and HTTP port of the Doris FE node
table.identifier: the target Doris database and table name
username: Doris user name
password: Doris user password
Query the data:
Flink SQL> select * from cdc_doris_sink;
- Add the data synchronization job
Flink SQL> insert into cdc_doris_sink (emp_no,birth_date,first_name,last_name,gender,hire_date,database_name,table_name)
select emp_no,cast(birth_date as string) as birth_date ,first_name,last_name,gender,cast(hire_date as string) as hire_date ,database_name,table_name from employee_source;
The Web UI shows the job as running, which confirms that it was submitted successfully.
Check the data in the Doris table
mysql> select * from employee_info;
5. Troubleshooting:
NoResourceAvailableException: Could not acquire the minimum required resources
In the Flink directory, edit conf/flink-conf.yaml and set taskmanager.numberOfTaskSlots: 4 (this is generally set to the number of CPU cores), then restart the cluster so the change takes effect.
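To pick a value that matches the machine rather than hard-coding 4, the CPU count can be read from the system. A minimal sketch, assuming either `nproc` or `getconf` is available; `suggest_task_slots` is a hypothetical helper name.

```shell
# suggest_task_slots
# Print the number of online CPUs, to be used as a starting value for
# taskmanager.numberOfTaskSlots. Hypothetical helper for illustration.
suggest_task_slots() {
  if command -v nproc >/dev/null 2>&1; then
    nproc
  else
    getconf _NPROCESSORS_ONLN
  fi
}

# Print the line to put into conf/flink-conf.yaml.
echo "taskmanager.numberOfTaskSlots: $(suggest_task_slots)"
```

The printed line is only a suggestion; the right slot count also depends on how much memory each TaskManager has.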