国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink實時同步MySQL與Doris數(shù)據(jù)

這篇具有很好參考價值的文章主要介紹了Flink實時同步MySQL與Doris數(shù)據(jù)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

參考:

技術(shù)解析|Doris Connector 結(jié)合 Flink CDC 實現(xiàn) MySQL 分庫分表 Exactly Once 精準(zhǔn)接入-阿里云開發(fā)者社區(qū)

邏輯圖:

mysql 同步doris,Flink-cdc,數(shù)據(jù)庫,mysql,java

1. Flink環(huán)境:

https://flink.apache.org/zh/

  • 下載flink-1.15.1
wget https://dlcdn.apache.org/flink/flink-1.15.1/flink-1.15.1-bin-scala_2.12.tgz
  • 解壓,修改配置
tar -zxvf flink-1.15.1-bin-scala_2.12.tgz cd flink-1.15.1
  • 修改配置
修改rest.bind-address為 0.0.0.0
vi conf/flink-conf.yaml
  • 下載依賴jar包 至 flink安裝目錄lib下
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar 

wget https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.14_2.12/1.0.3/flink-doris-connector-1.14_2.12-1.0.3.jar
  • 啟動flink
./bin/start-cluster.sh

mysql 同步doris,Flink-cdc,數(shù)據(jù)庫,mysql,java

  • 訪問WebUI

http://192.168.0.158:8081

2、MySQL數(shù)據(jù)表及數(shù)據(jù)

  1. 開啟Binlog,進入容器修改/etc/mysql/mysql.cnf,然后重啟mysql
[mysqld] 
log_bin=mysql_bin 
binlog-format=Row 
server-id=1
  1. 進入MySQL命令行:創(chuàng)建數(shù)據(jù)庫emp,數(shù)據(jù)表employee:
CREATE DATABASE emp; 

USE emp; 

CREATE TABLE employee ( 
emp_no INT NOT NULL, 
birth_date DATE NOT NULL, 
first_name VARCHAR(14) NOT NULL, 
last_name VARCHAR(16) NOT NULL, 
gender ENUM ('M','F') NOT NULL, 
hire_date DATE NOT NULL, PRIMARY KEY (emp_no) 
); ? 

INSERT INTO `employee` VALUES 
(10001,'1953-09-02','Georgi','Facello','M','1986-06-26'), 
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'), 
(10003,'1959-12-03','Parto','Bamford','M','1986-08-28'), 
(10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01'), 
(10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12'), 
(10006,'1953-04-20','Anneke','Preusig','F','1989-06-02'), 
(10007,'1957-05-23','Tzvetan','Zielinski','F','1989-02-10'), 
(10008,'1958-02-19','Saniya','Kalloufi','M','1994-09-15'), 
(10009,'1952-04-19','Sumant','Peac','F','1985-02-18'), 
(10010,'1963-06-01','Duangkaew','Piveteau','F','1989-08-24'), 
(10011,'1953-11-07','Mary','Sluis','F','1990-01-22'), 
(10012,'1960-10-04','Patricio','Bridgland','M','1992-12-18'), 
(10013,'1963-06-07','Eberhardt','Terkki','M','1985-10-20'), 
(10014,'1956-02-12','Berni','Genin','M','1987-03-11'), 
(10015,'1959-08-19','Guoxiang','Nooteboom','M','1987-07-02'), 
(10016,'1961-05-02','Kazuhito','Cappelletti','M','1995-01-27'), 
(10017,'1958-07-06','Cristinel','Bouloucos','F','1993-08-03'), 
(10018,'1954-06-19','Kazuhide','Peha','F','1987-04-03'), 
(10019,'1953-01-23','Lillian','Haddadi','M','1999-04-30'), 
(10020,'1952-12-24','Mayuko','Warwick','M','1991-01-26');

3. Doris數(shù)據(jù)表

  1. 進入MySQL命令行:創(chuàng)建Doris數(shù)據(jù)庫demo,數(shù)據(jù)表employee_info
CREATE DATABASE demo; 

USE demo; 

CREATE TABLE employee_info ( 
emp_no int NOT NULL, 
birth_date date, 
first_name varchar(20), 
last_name varchar(20), 
gender char(2), 
hire_date date, 
database_name varchar(50), 
table_name varchar(200) 
) 
UNIQUE KEY(`emp_no`, `birth_date`) 
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1 
PROPERTIES ( "replication_allocation" = "tag.location.default: 1" );

4. Flink數(shù)據(jù)表及數(shù)據(jù)

  • 啟動fink-sql-client
./bin/sql-client.sh embedded

mysql 同步doris,Flink-cdc,數(shù)據(jù)庫,mysql,java

  • 開啟Checkpoint
Flink作業(yè)周期性執(zhí)行checkpoint,記錄Binlog位點,當(dāng)作業(yè)發(fā)生Failover時,便會從之前記錄的Binlog位點繼續(xù)處理。
生產(chǎn)環(huán)境建議設(shè)置為60秒。
Flink SQL> SET execution.checkpointing.interval = 10s

mysql 同步doris,Flink-cdc,數(shù)據(jù)庫,mysql,java

  • 創(chuàng)建MySQL CDC表
Flink SQL> CREATE TABLE employee_source ( 
database_name STRING METADATA VIRTUAL, 
table_name STRING METADATA VIRTUAL, 
emp_no int NOT NULL, 
birth_date date, 
first_name STRING, 
last_name STRING, 
gender STRING, 
hire_date date, 
PRIMARY KEY (`emp_no`) NOT ENFORCED 
) 
WITH ( 
'connector' = 'mysql-cdc', 
'hostname' = 'localhost', 
'port' = '3336', 
'username' = 'root', 
'password' = '1234.abcd', 
'database-name' = 'emp', 
'table-name' = 'employee' 
);

查詢數(shù)據(jù):

Flink SQL> select * from employee_source limit 10;

mysql 同步doris,Flink-cdc,數(shù)據(jù)庫,mysql,java

  • 創(chuàng)建Doris Sink表
Flink SQL> CREATE TABLE cdc_doris_sink ( 
emp_no int , 
birth_date STRING, 
first_name STRING, 
last_name STRING, 
gender STRING, 
hire_date STRING, 
database_name STRING, 
table_name STRING 
) 
WITH ( 
'connector' = 'doris', 
'fenodes' = 'localhost:8030', 
'table.identifier' = 'demo.employee_info', 
'username' = 'root', 
'password' = '1234.abcd' 
);
參數(shù)說明:
connector : 指定連接器是doris
fenodes:doris FE節(jié)點IP地址及http port
table.identifier : Doris對應(yīng)的數(shù)據(jù)庫及表名
username:doris用戶名
password:doris用戶密碼

查詢數(shù)據(jù):

Flink SQL> select * from cdc_doris_sink;

mysql 同步doris,Flink-cdc,數(shù)據(jù)庫,mysql,java

  • 添加數(shù)據(jù)同步任務(wù)
Flink SQL> insert into cdc_doris_sink (emp_no,birth_date,first_name,last_name,gender,hire_date,database_name,table_name) 
select emp_no,cast(birth_date as string) as birth_date ,first_name,last_name,gender,cast(hire_date as string) as hire_date ,database_name,table_name from employee_source;

WebUI可以看到正在執(zhí)行中的任務(wù),說明添加完成

mysql 同步doris,Flink-cdc,數(shù)據(jù)庫,mysql,java

查看Doris數(shù)據(jù)表中數(shù)據(jù)

mysql> select * from employee_info;

5. 問題說明:

NoResourceAvailableException: Could not acquire the minimum required resources

進入flink目錄,修改conf/conf/flink-conf.yaml:taskmanager.numberOfTaskSlots: 4 , 一般配置為cpu的個數(shù)。文章來源地址http://www.zghlxwxcb.cn/news/detail-640043.html

到了這里,關(guān)于Flink實時同步MySQL與Doris數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • flink-cdc同步mysql數(shù)據(jù)到elasticsearch

    flink-cdc同步mysql數(shù)據(jù)到elasticsearch

    CDC是(Change Data Capture 變更數(shù)據(jù)獲?。┑暮喎Q。核心思想是,監(jiān)測并捕獲數(shù)據(jù)庫的變動(包括數(shù)據(jù) 或 數(shù)據(jù)表的插入INSERT、更新UPDATE、刪除DELETE等),將這些變更按發(fā)生的順序完整記錄下來,寫入到消息中間件中以供其他服務(wù)進行訂閱及消費。 cdc項目地址:https://github.com/ver

    2024年02月13日
    瀏覽(23)
  • 使用Apache Doris自動同步整個 MySQL/Oracle 數(shù)據(jù)庫進行數(shù)據(jù)分析

    使用Apache Doris自動同步整個 MySQL/Oracle 數(shù)據(jù)庫進行數(shù)據(jù)分析

    Flink-Doris-Connector 1.4.0 允許用戶一步將包含數(shù)千個表的整個數(shù)據(jù)庫(MySQL或Oracle )攝取到Apache Doris(一種實時分析數(shù)據(jù)庫)中。 通過內(nèi)置的Flink CDC,連接器可以直接將上游源的表模式和數(shù)據(jù)同步到Apache Doris,這意味著用戶不再需要編寫DataStream程序或在Doris中預(yù)先創(chuàng)建映射表。

    2024年02月09日
    瀏覽(21)
  • 【FLINK】Kafka數(shù)據(jù)源通過Flink-cdc進行實時數(shù)據(jù)同步

    【FLINK】Kafka數(shù)據(jù)源通過Flink-cdc進行實時數(shù)據(jù)同步

    CDC是Change Data Capture的縮寫,中文意思是 變更數(shù)據(jù)獲取 ,flink-cdc的作用是,通過flink捕獲數(shù)據(jù)源的事務(wù)變動操作記錄,包括數(shù)據(jù)的增刪改操作等,根據(jù)這些記錄可作用于對目標(biāo)端進行實時數(shù)據(jù)同步。 下圖是flink-cdc最新支持的數(shù)據(jù)源類型: kafka的數(shù)據(jù)源要通過flink-cdc進行實時數(shù)

    2024年02月12日
    瀏覽(36)
  • ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

    ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

    ApacheStreamPark是流處理極速開發(fā)框架,流批一體 湖倉一體的云原生平臺,一站式流處理計算平臺。 ??特性中的簡單易用和文檔詳盡這兩點我也是深有體會的,部署一點都不簡單,照著官方文檔都不一定能搞出來,下面部署環(huán)節(jié)慢慢來吐槽吧。 ??之前我們寫 Flink SQL 基本上

    2024年02月11日
    瀏覽(28)
  • Doris通過Flink CDC接入MySQL實戰(zhàn)

    1. 創(chuàng)建MySQL庫表,寫入demo數(shù)據(jù) 登錄測試MySQL 創(chuàng)建MySQL庫表,寫入demo數(shù)據(jù) 注意:MySQL需要開通bin-log log_bin=mysql_bin binlog-format=Row server-id=1 2. 創(chuàng)建Doris庫表 創(chuàng)建Doris表 3. 啟動Flink 啟動flink 創(chuàng)建Flink 任務(wù): 輸入如下地址,查看flink任務(wù) http://localhost:8081/#/job/running 數(shù)據(jù)驗證:啟動后可

    2023年04月10日
    瀏覽(23)
  • flink cdc同步Oracle數(shù)據(jù)庫資料到Doris問題集錦

    java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder at com.ververica.cdc.debezium.DebeziumSourceFunction.open(DebeziumSourceFunction.java:218) ~[flink-connector-debezium-2.2.0.jar:2.2.0] at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-co

    2024年02月16日
    瀏覽(22)
  • 基于Flink CDC實時同步數(shù)據(jù)(MySQL到MySQL)

    基于Flink CDC實時同步數(shù)據(jù)(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在遠程服務(wù)器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安裝在本地:192.168.3.31) (安裝部署過程略) 準(zhǔn)備三個數(shù)據(jù)庫:flink_source、flink_sink、flink_sink_second。 將flink_source.source_test表實時同步到flink_sink和flink_sink_second的sink_test表。 (建庫建表過程略) 開發(fā)過程

    2024年02月06日
    瀏覽(27)
  • Flink+Doris 實時數(shù)倉

    Flink+Doris 實時數(shù)倉

    Doris基本原理 Doris基本架構(gòu)非常簡單,只有FE(Frontend)、BE(Backend)兩種角色,不依賴任何外部組件,對部署和運維非常友好。架構(gòu)圖如下 可以 看到Doris 的數(shù)倉架構(gòu)十分簡潔,不依賴 Hadoop 生態(tài)組件,構(gòu)建及運維成本較低。 FE(Frontend)以 Java 語言為主,主要功能職責(zé): 接收用戶

    2024年02月07日
    瀏覽(20)
  • 基于 Flink CDC 構(gòu)建 MySQL 到 Databend 的 實時數(shù)據(jù)同步

    基于 Flink CDC 構(gòu)建 MySQL 到 Databend 的 實時數(shù)據(jù)同步

    這篇教程將展示如何基于 Flink CDC 快速構(gòu)建 MySQL 到 Databend 的實時數(shù)據(jù)同步。本教程的演示都將在 Flink SQL CLI 中進行,只涉及 SQL,無需一行 Java/Scala 代碼,也無需安裝 IDE。 假設(shè)我們有電子商務(wù)業(yè)務(wù),商品的數(shù)據(jù)存儲在 MySQL ,我們需要實時把它同步到 Databend 中。 接下來的內(nèi)容

    2024年02月10日
    瀏覽(29)
  • 使用Flink CDC將Mysql中的數(shù)據(jù)實時同步到ES

    最近公司要搞搜索,需要把mysql中的數(shù)據(jù)同步到es中來進行搜索,由于公司已經(jīng)搭建了flink集群,就打算用flink來做這個同步。本來以為很簡單,跟著官網(wǎng)文檔走就好了,結(jié)果沒想到折騰了將近一周的時間…… 我也是沒想到,這玩意網(wǎng)上資源竟然這么少,找到的全部都是通過

    2024年02月11日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包