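1. What Is Flink
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink runs in all common cluster environments and performs computations at in-memory speed and at any scale.
The following paragraphs cover the important aspects of Flink's architecture.
Processing unbounded and bounded data
Any kind of data can form a stream of events. Credit card transactions, sensor measurements, machine logs, and user interactions on a website or mobile application all form streams.
Data can be processed as unbounded or bounded streams.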
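- Unbounded streams have a defined start but no defined end. They produce data endlessly. Unbounded streams must be processed continuously, that is, events must be handled promptly after they are ingested. It is not possible to wait for all data to arrive, because the input is unbounded and will never be complete at any point in time. Processing unbounded data often requires that events are ingested in a specific order, such as the order in which they occurred, so that the completeness of results can be reasoned about.
- Bounded streams have a defined start and a defined end. A bounded stream can be processed by ingesting all of its data before performing any computation. Ordered ingestion is not required, because a bounded data set can always be sorted. Processing of bounded streams is commonly known as batch processing.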
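Apache Flink excels at processing both unbounded and bounded data sets. Precise control of time and state enables Flink's runtime to run any kind of application on unbounded streams. Bounded streams are processed internally by algorithms and data structures designed specifically for fixed-size data sets, which yields excellent performance.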
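The difference between the two kinds of streams can be illustrated with a small plain-Java sketch. It does not use any Flink API; the class and method names (`StreamKindsSketch`, `medianOfBounded`, `RunningSum`) are made up for illustration. The bounded case sorts everything before computing, while the unbounded case can only keep running state and update it per event.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

/** Plain-Java illustration of bounded vs. unbounded processing (no Flink APIs involved). */
class StreamKindsSketch {

    /**
     * Bounded input: every element is available, so we may sort first and then compute.
     * Assumes a non-empty list; returns the upper median for even sizes.
     */
    static int medianOfBounded(List<Integer> allEvents) {
        List<Integer> sorted = new ArrayList<>(allEvents);
        Collections.sort(sorted);
        return sorted.get(sorted.size() / 2);
    }

    /** Unbounded input: keep only running state and update it as each event arrives. */
    static final class RunningSum {
        private long sum;

        long onEvent(int value) {
            sum += value;
            return sum; // a result is available after every single event
        }
    }

    public static void main(String[] args) {
        System.out.println(medianOfBounded(Arrays.asList(5, 1, 3))); // prints 3

        // An endless source: 1, 2, 3, ... It can never be "collected completely" first.
        Iterator<Integer> endless = Stream.iterate(1, n -> n + 1).iterator();
        RunningSum running = new RunningSum();
        for (int i = 0; i < 3; i++) {
            System.out.println(running.onEvent(endless.next())); // prints 1, 3, 6
        }
    }
}
```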
Appendix: official documentation:
Apache Flink: What is Apache Flink?
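2. Basic Concepts
- Checkpoint mechanism
Checkpointing is the core of Flink's fault tolerance. Based on the configured interval, Flink periodically snapshots the state of every operator/task in the stream and persists those snapshots. If the Flink program crashes unexpectedly, it can be restarted and selectively restored from one of these snapshots, which corrects the data inconsistencies caused by the failure. To guarantee exactly-once or at-least-once semantics, the data source must also support replaying data. Flink provides different fault-tolerance semantics for different fault-tolerance and message-processing requirements.
exactly-once: Flink's fault-tolerance mechanism can restore a streaming application to a consistent state, ensuring that even when a failure occurs, each record affects the state exactly once.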
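The interplay of snapshots and a replayable source can be sketched as a toy simulation in plain Java. This is not Flink's checkpoint API; `CheckpointSketch`, `Snapshot`, and `sumWithOneFailure` are hypothetical names, and the "source" is just a list indexed by offset. The point it illustrates: because the offset and the state are saved together, replaying after a crash does not double-count any record.

```java
import java.util.Arrays;
import java.util.List;

/** Toy simulation of checkpoint-and-replay; it does not use Flink's checkpoint API. */
class CheckpointSketch {

    /** A snapshot pairs the source offset with the operator state at that offset. */
    static final class Snapshot {
        final int offset;
        final long sum;

        Snapshot(int offset, long sum) {
            this.offset = offset;
            this.sum = sum;
        }
    }

    /**
     * Sums all events, taking a snapshot every checkpointEvery events (must be > 0)
     * and simulating one crash just before the event at index failAt.
     */
    static long sumWithOneFailure(List<Integer> events, int checkpointEvery, int failAt) {
        Snapshot lastCheckpoint = new Snapshot(0, 0L);
        int offset = 0;
        long sum = 0L;
        boolean alreadyFailed = false;

        while (offset < events.size()) {
            if (!alreadyFailed && offset == failAt) {
                // Crash: in-memory progress is lost. Restore the state and rewind the
                // replayable source to the offset stored in the same snapshot.
                alreadyFailed = true;
                offset = lastCheckpoint.offset;
                sum = lastCheckpoint.sum;
                continue;
            }
            sum += events.get(offset);
            offset++;
            if (offset % checkpointEvery == 0) {
                lastCheckpoint = new Snapshot(offset, sum);
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> events = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        // Offsets 3 and 4 are processed twice, yet each counts once in the final state.
        System.out.println(sumWithOneFailure(events, 3, 5)); // prints 28
    }
}
```

If the source could not be rewound to the snapshot's offset, the records processed between the last checkpoint and the crash would be lost, which is why the text requires a replayable source.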
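- Flink CDC
The Flink CDC connector captures all changes made to one or more tables. A change event usually carries a before record and an after record. The Flink CDC connector can be used directly in Flink in unbounded (streaming) mode, without middleware such as Kafka to relay the data.
- Flink submission flow
Reference document:
[Flink] Introduction to Flink CDC and an overview of its principles (CSDN blog by 一个写湿的程序猿)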
How Flink CDC works, in brief:
According to a recent CDC survey, Debezium and Canal are currently the most widely used CDC tools. Their core principle is to extract changes from the database log.
After evaluation, Debezium is the more widely used of the two. It supports both full and incremental synchronization and works with MySQL, PostgreSQL, Oracle, and other databases.
Flink SQL CDC embeds the Debezium engine and uses its log-extraction capability to convert the changelog into RowData, the row format that Flink SQL understands. (In the original figure, Debezium's data format is on the right and Flink's RowData format is on the left.)
RowData represents one row of data. Each RowData carries a piece of metadata called RowKind, which is one of insert (+I), update-before (-U), update-after (+U), or delete (-D). This closely resembles the binlog concept in databases.
Data captured through Debezium contains the old row (before), the new row (after), and metadata (source). An op value of u marks an update (the op values c, u, d, and r stand for create, update, delete, and read respectively), and ts_ms is the synchronization timestamp.
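The relationship between Debezium's op codes and the RowKind markers can be sketched as a lookup. This is plain Java with a hypothetical helper (`DebeziumOpSketch.rowKindsFor`), not Flink's own conversion code; treating a snapshot read (r) as an insert is an assumption of this sketch, in line with how changelog formats commonly handle it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical mapping from a Debezium "op" code to RowKind short strings. */
class DebeziumOpSketch {

    static List<String> rowKindsFor(String op) {
        switch (op) {
            case "c": // create
            case "r": // snapshot read, treated here as an insert (assumption)
                return Collections.singletonList("+I");
            case "u": // update: one row for the old image, one for the new image
                return Arrays.asList("-U", "+U");
            case "d": // delete
                return Collections.singletonList("-D");
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
    }

    public static void main(String[] args) {
        System.out.println(rowKindsFor("u")); // prints [-U, +U]
    }
}
```

An update produces two rows because the before image retracts the old value and the after image emits the new one.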
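3. Real-Time Synchronization Code Walkthrough
Source code:
Using the myrs_fc_data_sync branch as an example:
- Configure the basic flinkcdc properties
- Configure the data sources to monitor
- Configure the flinkcdc environment
- Configure the data output
DBConf.java configures the SQL Server source tables to monitor.
The JSON for each data format is shown below:
sqlservercdc format: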
{"op":"u","before":{"area_partition_code":"1zVqkZ9AidRUsrikNu6868","latitude":"33.580258","upd_host":"172.20.15.192:8080","upd_name":"周波","upd_user":"3lw3kDQFGUXQoxwvkKT4NB","crt_job_no":"200786171","crt_time":1634923665000,"upd_time":1634923669187,"id":"1zVqkZ9AidRUsrikNu6868","crt_user":"3lw3kDQFGUXQoxwvkKT4NB","crt_name":"周波","longitude":"114.016521","area_partition_name":"漯河服務(wù)站","data_type":"my_base_sale_area_partition","crt_host":"172.20.15.192:8080","status":1,"exchange_status":1},"after":{"area_partition_code":"1zVqkZ9AidRUsrikNu6868","latitude":"33.580258","upd_host":"172.20.15.192:8080","upd_name":"周波","upd_user":"3lw3kDQFGUXQoxwvkKT4NB","crt_job_no":"200786171","crt_time":1634923666000,"upd_time":1634923669187,"id":"1zVqkZ9AidRUsrikNu6868","crt_user":"3lw3kDQFGUXQoxwvkKT4NB","crt_name":"周波","longitude":"114.016521","area_partition_name":"漯河服務(wù)站","data_type":"my_base_sale_area_partition","crt_host":"172.20.15.192:8080","status":1,"exchange_status":1},"source":{"schema":"dbo","event_serial_no":2,"connector":"sqlserver","name":"order_center_10.106.215.90","commit_lsn":"00027124:011d4308:0006","change_lsn":"00027124:011d4308:0005","version":"1.7.0.Final","ts_ms":1656569539697,"snapshot":"false","db":"MY_SlaughterProduct","table":"my_base_sale_area_partition"},"ts_ms":1656569542562}
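In the update event above, before and after differ only in crt_time (1634923665000 versus 1634923666000). A consumer that wants to know which columns changed can compare the two images. The following is a minimal sketch in plain Java, assuming the two images have already been parsed into maps; `ChangedColumnsSketch` and `changedColumns` are hypothetical names, and only a few fields from the sample are reproduced.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical helper that diffs the before and after images of an update event. */
class ChangedColumnsSketch {

    /** Returns the columns of the after image whose value differs from the before image. */
    static List<String> changedColumns(Map<String, Object> before, Map<String, Object> after) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, Object> entry : after.entrySet()) {
            if (!Objects.equals(before.get(entry.getKey()), entry.getValue())) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        // A subset of the fields from the sample event.
        Map<String, Object> before = new LinkedHashMap<>();
        before.put("id", "1zVqkZ9AidRUsrikNu6868");
        before.put("crt_time", 1634923665000L);
        before.put("upd_time", 1634923669187L);
        before.put("status", 1);

        Map<String, Object> after = new LinkedHashMap<>(before);
        after.put("crt_time", 1634923666000L);

        System.out.println(changedColumns(before, after)); // prints [crt_time]
    }
}
```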
mysqlbinlog format:
{"data":[{"id":"5VVel5vwVcuJGb40BwBTZX","order_no":"XS-202206210916005325095828","cust_code":"MYRS00604735","cust_name":"浙江杭泰食品股份有限公司","company_code":"103","company_name":"內(nèi)鄉(xiāng)屠宰廠","cust_area_code":"2FqYTboHpr4xCvNut5XiBh","cust_area":"杭州服務(wù)站","arrival_addr":"內(nèi)鄉(xiāng)屠宰廠","arrival_date":null,"delivery_mode":"客戶自提","delivery_mode_code":"1105001","order_weight":null,"order_amount":null,"service_manager":"余志軍","service_manager_job_no":"210124517","note":null,"status":"8","crt_time":"2022-06-20 09:16:05.000","crt_user":"my-job-executor","crt_name":"余志軍","crt_job_no":null,"upd_time":"2022-06-20 11:13:01.958","upd_name":"","upd_job_no":null,"delivery_addr":null,"service_tel":null,"generate":null,"generate_people":null,"generate_date":null,"father_order_no":null,"split_order_status":null,"prepare_status":null,"prepare_date":"2022-06-20 10:49:01.000","distribution_status":null,"distribution_people":null,"distribution_date":null,"data_type":null,"exchange_status":null,"pig_form_id":"103","financial_status":null,"financial_job_no":null,"financial_name":null,"financial_time":null,"order_type":"0","freight_deduction_amount":null,"arrival_time":null,"order_change_type":null,"order_date":"2022-06-21","province":"河南省","city":"南陽(yáng)市","county":null,"two_category":"農(nóng)貿(mào)","two_category_code":"1107002","delivery_branch":null,"branch_code":null,"branch_name":null,"area_status":null,"crt_dunning":null,"approval_status":null,"latest_delivery_date":null,"order_attributes":"1","retail_status":null,"retail_flag":"0"}],"database":"my_slaughter_finance","es":1655979696000,"id":87051,"isDdl":false,"mysqlType":{"id":"varchar(32)","order_no":"varchar(64)","cust_code":"varchar(64)","cust_name":"varchar(255)","company_code":"varchar(64)","company_name":"varchar(255)","cust_area_code":"varchar(64)","cust_area":"varchar(128)","arrival_addr":"varchar(1024)","arrival_date":"datetime(3)","delivery_mode":"varchar(64)","delivery_mode_code":"varchar(64)","order_weight":"deci
mal(18,2)","order_amount":"decimal(18,2)","service_manager":"varchar(64)","service_manager_job_no":"varchar(64)","note":"varchar(1024)","status":"int(11)","crt_time":"datetime(3)","crt_user":"varchar(64)","crt_name":"varchar(64)","crt_job_no":"varchar(64)","upd_time":"datetime(3)","upd_name":"varchar(255)","upd_job_no":"varchar(255)","delivery_addr":"varchar(255)","service_tel":"varchar(255)","generate":"int(11)","generate_people":"varchar(255)","generate_date":"datetime(3)","father_order_no":"varchar(2":null,"delivery_mode":null,"delivery_mode_code":"1105001","order_weight":"310.0","order_amount":"6416.0","service_manager":"熊巍東","service_manager_job_no":"200706045","note":null,"status":"7","crt_time":"2020-11-03 00:00:02.000","crt_user":null,"crt_name":"付宇","crt_job_no":"170704993","upd_time":null,"upd_name":null,"upd_job_no":null,"delivery_addr":null,"service_tel":null,"generate":null,"generate_people":null,"generate_date":null,"father_order_no":null,"split_order_status":null,"prepare_status":"0","prepare_date":"2020-11-04 
00:00:00.000","distribution_status":null,"distribution_people":null,"distribution_date":null,"data_type":"my_sale_order","exchange_status":"1","pig_form_id":"103","financial_status":null,"financial_job_no":"200713613","financial_name":"王歡","financial_time":null,"order_type":"3","freight_deduction_amount":null,"arrival_time":null,"order_change_type":null,"order_date":"2020-11-03","province":null,"city":null,"county":null,"two_category":null,"two_category_code":null,"delivery_branch":null,"branch_code":null,"branch_name":null,"area_status":"1","crt_dunning":"0","approval_status":null,"latest_delivery_date":null,"order_attributes":null,"retail_status":null,"retail_flag":"0"}],"database":"my_slaughter_finance","es":1656043006000,"id":18387,"isDdl":false,"mysqlType":{"id":"varchar(32)","order_no":"varchar(64)","cust_code":"varchar(64)","cust_name":"varchar(255)","company_code":"varchar(64)","company_name":"varchar(255)","cust_area_code":"varchar(64)","cust_area":"varchar(128)","arrival_addr":"varchar(1024)","arrival_date":"datetime(3)","delivery_mode":"varchar(64)","delivery_mode_code":"varchar(64)","order_weight":"decimal(18,2)","order_amount":"decimal(18,2)","service_manager":"varchar(64)","service_manager_job_no":"varchar(64)","note":"varchar(1024)","status":"int(11)","crt_time":"datetime(3)","crt_user":"varchar(64)","crt_name":"varchar(64)","crt_job_no":"varchar(64)","upd_time":"datetime(3)","upd_name":"varchar(255)","upd_job_no":"varchar(255)","delivery_addr":"varchar(255)","service_tel":"varchar(255)","generate":"int(11)","generate_people":"varchar(255)","generate_date":"datetime(3)","father_order_no":"varchar(255)","split_order_status":"int(11)","prepare_status":"int(11)","prepare_date":"datetime(3)","distribution_status":"int(11)","distribution_people":"varchar(255)","distribution_date":"datetime(3)","data_type":"varchar(64)","exchange_status":"int(11)","pig_form_id":"varchar(64)","financial_status":"int(11)","financial_job_no":"varchar(255)","financial_name":"va
rchar(255)","financial_time":"datetime(3)","order_type":"int(11)","freight_deduction_amount":"decimal(18,2)","arrival_time":"datetime(3)","order_change_type":"int(11)","order_date":"varchar(32)","province":"varchar(64)","city":"varchar(64)","county":"varchar(64)","two_category":"varchar(32)","two_category_code":"varchar(64)","delivery_branch":"int(11)","branch_code":"varchar(255)","branch_name":"varchar(255)","area_status":"int(11)","crt_dunning":"int(11)","approval_status":"int(11)","latest_delivery_date":"varchar(64)","order_attributes":"int(11)","retail_status":"int(11)","retail_flag":"int(11)"},"old":[{"crt_time":"2020-11-03 00:00:00.000"}],"pkNames":["id"],"sql":"","sqlType":{"id":12,"order_no":12,"cust_code":12,"cust_name":12,"company_code":12,"company_name":12,"cust_area_code":12,"cust_area":12,"arrival_addr":12,"arrival_date":93,"delivery_mode":12,"delivery_mode_code":12,"order_weight":3,"order_amount":3,"service_manager":12,"service_manager_job_no":12,"note":12,"status":4,"crt_time":93,"crt_user":12,"crt_name":12,"crt_job_no":12,"upd_time":93,"upd_name":12,"upd_job_no":12,"delivery_addr":12,"service_tel":12,"generate":4,"generate_people":12,"generate_date":93,"father_order_no":12,"split_order_status":4,"prepare_status":4,"prepare_date":93,"distribution_status":4,"distribution_people":12,"distribution_date":93,"data_type":12,"exchange_status":4,"pig_form_id":12,"financial_status":4,"financial_job_no":12,"financial_name":12,"financial_time":93,"order_type":4,"freight_deduction_amount":3,"arrival_time":93,"order_change_type":4,"order_date":12,"province":12,"city":12,"county":12,"two_category":12,"two_category_code":12,"delivery_branch":4,"branch_code":12,"branch_name":12,"area_status":4,"crt_dunning":4,"approval_status":4,"latest_delivery_date":12,"order_attributes":4,"retail_status":4,"retail_flag":4},"table":"my_finance_sale_order","ts":1656043006603,"type":"UPDATE"}
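The payload above resembles Canal's JSON layout, in which data holds the current row and old holds only the previous values of the columns that changed (here, crt_time). Under that assumption, the full before image of an UPDATE can be rebuilt by overlaying old onto data. The sketch below is plain Java with hypothetical names (`CanalBeforeImageSketch`, `beforeImage`) and reproduces only a few fields from the sample.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that rebuilds the before image from "data" and "old". */
class CanalBeforeImageSketch {

    /** Copies the current row, then overwrites the changed columns with their old values. */
    static Map<String, Object> beforeImage(Map<String, Object> data, Map<String, Object> old) {
        Map<String, Object> before = new LinkedHashMap<>(data);
        before.putAll(old);
        return before;
    }

    public static void main(String[] args) {
        // A subset of the fields from the sample UPDATE event.
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("status", "7");
        data.put("crt_time", "2020-11-03 00:00:02.000");
        data.put("order_date", "2020-11-03");

        Map<String, Object> old = new LinkedHashMap<>();
        old.put("crt_time", "2020-11-03 00:00:00.000");

        Map<String, Object> before = beforeImage(data, old);
        System.out.println(before.get("crt_time")); // prints 2020-11-03 00:00:00.000
    }
}
```

Unlike the sqlservercdc format, which ships both complete row images, this layout needs the overlay step whenever the old row as a whole is required.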
Appendix: learning materials