一、數(shù)倉分層介紹
1. 普通實(shí)時(shí)計(jì)算與實(shí)時(shí)數(shù)倉比較
普通的實(shí)時(shí)計(jì)算優(yōu)先考慮時(shí)效性,所以從數(shù)據(jù)源采集經(jīng)過實(shí)時(shí)計(jì)算直接得到結(jié)果。如此做時(shí)效性更好,但是弊端是由于計(jì)算過程中的中間結(jié)果沒有沉淀下來,所以當(dāng)面對(duì)大量實(shí)時(shí)需求的時(shí)候,計(jì)算的復(fù)用性較差,開發(fā)成本隨著需求增加直線上升。
實(shí)時(shí)數(shù)倉基于一定的數(shù)據(jù)倉庫理念,對(duì)數(shù)據(jù)處理流程進(jìn)行規(guī)劃、分層,目的是提高數(shù)據(jù)的復(fù)用性。
2. 實(shí)時(shí)電商數(shù)倉,項(xiàng)目分為以下幾層
- ODS:原始數(shù)據(jù),日志和業(yè)務(wù)數(shù)據(jù)
- DWD:根據(jù)數(shù)據(jù)對(duì)象為單位進(jìn)行分流,比如訂單、頁面訪問等等
- DIM:維度數(shù)據(jù)
- DWM:對(duì)于部分?jǐn)?shù)據(jù)對(duì)象進(jìn)行進(jìn)一步加工,比如獨(dú)立訪問、跳出行為,也可以和維度進(jìn)行關(guān)聯(lián),形成寬表,依舊是明細(xì)數(shù)據(jù)
- DWS:根據(jù)某個(gè)主題將多個(gè)事實(shí)數(shù)據(jù)輕度聚合,形成主題寬表
- ADS:把ClickHouse中的數(shù)據(jù)根據(jù)可視化需進(jìn)行篩選聚合
二、實(shí)時(shí)需求概覽
1. 離線計(jì)算與實(shí)時(shí)計(jì)算的比較
離線計(jì)算:就是在計(jì)算開始前已知所有輸入數(shù)據(jù),輸入數(shù)據(jù)不會(huì)產(chǎn)生變化,一般計(jì)算量
級(jí)較大,計(jì)算時(shí)間也較長(zhǎng)。例如今天早上一點(diǎn),把昨天累積的日志,計(jì)算出所需結(jié)果。最經(jīng)
典的就是 Hadoop 的 MapReduce 方式;
一般是根據(jù)前一日的數(shù)據(jù)生成報(bào)表,雖然統(tǒng)計(jì)指標(biāo)、報(bào)表繁多,但是對(duì)時(shí)效性不敏感。
從技術(shù)操作的角度,這部分屬于批處理的操作。即根據(jù)確定范圍的數(shù)據(jù)一次性計(jì)算。
實(shí)時(shí)計(jì)算:輸入數(shù)據(jù)是可以以序列化的方式一個(gè)個(gè)輸入并進(jìn)行處理的,也就是說在開始
的時(shí)候并不需要知道所有的輸入數(shù)據(jù)。與離線計(jì)算相比,運(yùn)行時(shí)間短,計(jì)算量級(jí)相對(duì)較小。
強(qiáng)調(diào)計(jì)算過程的時(shí)間要短,即所查當(dāng)下給出結(jié)果。
主要側(cè)重于對(duì)當(dāng)日數(shù)據(jù)的實(shí)時(shí)監(jiān)控,通常業(yè)務(wù)邏輯相對(duì)離線需求簡(jiǎn)單一下,統(tǒng)計(jì)指標(biāo)也
少一些,但是更注重?cái)?shù)據(jù)的時(shí)效性,以及用戶的交互性。從技術(shù)操作的角度,這部分屬于流
處理的操作。根據(jù)數(shù)據(jù)源源不斷地到達(dá)進(jìn)行實(shí)時(shí)的運(yùn)算。
2. 實(shí)時(shí)需求種類
日常統(tǒng)計(jì)報(bào)表或分析圖中需要包含當(dāng)日部分
對(duì)于日常企業(yè)、網(wǎng)站的運(yùn)營(yíng)管理如果僅僅依靠離線計(jì)算,數(shù)據(jù)的時(shí)效性往往無法滿足。通過實(shí)時(shí)計(jì)算獲得當(dāng)日、分鐘級(jí)、秒級(jí)甚至亞秒的數(shù)據(jù)更加便于企業(yè)對(duì)業(yè)務(wù)進(jìn)行快速反應(yīng)與調(diào)整。
所以實(shí)時(shí)計(jì)算結(jié)果往往要與離線數(shù)據(jù)進(jìn)行合并或者對(duì)比展示在 BI 或者統(tǒng)計(jì)平臺(tái)中。
數(shù)據(jù)大屏,相對(duì)于 BI 工具或者數(shù)據(jù)分析平臺(tái)是更加直觀的數(shù)據(jù)可視化方式。尤其是一些大促活動(dòng),已經(jīng)成為必備的一種營(yíng)銷手段。
另外還有一些特殊行業(yè),比如交通、電信的行業(yè),那么大屏監(jiān)控幾乎是必備的監(jiān)控手段。
3. 數(shù)據(jù)預(yù)警或提示
經(jīng)過大數(shù)據(jù)實(shí)時(shí)計(jì)算得到的一些風(fēng)控預(yù)警、營(yíng)銷信息提示,能夠快速讓風(fēng)控或營(yíng)銷部分得到信息,以便采取各種應(yīng)對(duì)。
比如,用戶在電商、金融平臺(tái)中正在進(jìn)行一些非法或欺詐類操作,那么大數(shù)據(jù)實(shí)時(shí)計(jì)算可以快速的將情況篩選出來發(fā)送風(fēng)控部門進(jìn)行處理,甚至自動(dòng)屏蔽。 或者檢測(cè)到用戶的行為對(duì)于某些商品具有較強(qiáng)的購買意愿,那么可以把這些“商機(jī)”推送給客服部門,讓客服進(jìn)行主動(dòng)的跟進(jìn)。
4. 實(shí)時(shí)推薦系統(tǒng)
實(shí)時(shí)推薦就是根據(jù)用戶的自身屬性結(jié)合當(dāng)前的訪問行為,經(jīng)過實(shí)時(shí)的推薦算法計(jì)算,從而將用戶可能喜歡的商品、新聞、視頻等推送給用戶。
這種系統(tǒng)一般是由一個(gè)用戶畫像批處理加一個(gè)用戶行為分析的流處理組合而成。
三、統(tǒng)計(jì)架構(gòu)分析
1. 離線架構(gòu)
2. 實(shí)時(shí)架構(gòu)
四、日志數(shù)據(jù)采集
1. 模擬日志生成器的使用
這里提供了一個(gè)模擬生成數(shù)據(jù)的 jar 包,可以將日志發(fā)送給某一個(gè)指定的端口,需要大數(shù)據(jù)程序員了解如何從指定端口接收數(shù)據(jù)并數(shù)據(jù)進(jìn)行處理的流程。
將數(shù)據(jù)生成腳本/行為數(shù)據(jù)的內(nèi)容到 node101 的 /opt/module/gmall-flink/rt_applog 目錄
根據(jù)實(shí)際需要修改 application.yml
使用模擬日志生成器的 jar 運(yùn)行
java -jar gmall2020-mock-log-2020-12-18.jar
目前我們還沒有地址接收日志,所以程序運(yùn)行后的結(jié)果有如下錯(cuò)誤
注意:ZooKeeper 從 3.5 開始,AdminServer 的端口也是 8080,如果在本機(jī)啟動(dòng)了zk,那么可能看到 404、405 錯(cuò)誤,意思是找到請(qǐng)求地址了,但是接收的方式不對(duì)。
2. 日志采集模塊-本地測(cè)試
① SpringBoot 簡(jiǎn)介
SpringBoot 是由 Pivotal 團(tuán)隊(duì)提供的全新框架,其設(shè)計(jì)目的是用來簡(jiǎn)化新 Spring 應(yīng)用的初始搭建以及開發(fā)過程。 該框架使用了特定的方式來進(jìn)行配置,從而使開發(fā)人員不再需要定義樣板化的配置。
1)有了 springboot 我們就可以…
? 不再需要那些千篇一律,繁瑣的 xml 文件。
? 內(nèi)嵌 Tomcat,不再需要外部的 Tomcat
? 更方便的和各個(gè)第三方工具(mysql,redis,elasticsearch,dubbo,kafka 等等整合),而只要維護(hù)一個(gè)配置文件即可。
2)springboot 和 ssm 的關(guān)系
springboot 整合了 springmvc,spring 等核心功能。也就是說本質(zhì)上實(shí)現(xiàn)功能的還是原有的 spring ,springmvc 的包,但是 springboot 單獨(dú)包裝了一層,這樣用戶就不必直接對(duì) springmvc,spring 等,在 xml 中配置。
3)沒有 xml,我們要去哪配置
springboot 實(shí)際上就是把以前需要用戶手工配置的部分,全部作為默認(rèn)項(xiàng)。除非用戶需要額外更改不然不用配置。這就是所謂的:“約定大于配置” 如果需要特別配置的時(shí)候,去修改application.properties (application.yml)
② 快速搭建 SpringBoot 程序 gmall-logger,采集模擬生成的日志數(shù)據(jù)
1)在 IDEA 中安裝 lombok 插件
在 Plugins 下搜索 lombok 然后在線安裝即可,安裝后注意重啟
2)創(chuàng)建空的父工程 gmall2021,用于管理后續(xù)所有的模塊 module
我們這里就是為了將各個(gè)模塊放在一起,但是模塊彼此間還是獨(dú)立的,所以創(chuàng)建一個(gè) Empty Project 即可;如果要是由父 module 管理子 module,需要將父 module 的 pom.xml 文件的 <packaging>
設(shè)置為 pom
配置項(xiàng)目名稱為 gmall2021-logger 及 JDK 版本
選擇版本以及通過勾選自動(dòng)添加 lombok、SpringWeb、Kafka 相關(guān)依賴
完成之后開始下載依賴,完整的 pom.xml 文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.10</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.fancy.gmall</groupId>
<artifactId>gmall2021-logger</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>gmall2021-logger</name>
<description>gmall2021-logger</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
創(chuàng)建 LoggerController 輸出 SpringBoot 處理流程
package com.fancy.gmall.controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class LoggerController {
@RequestMapping("test1")
public String test1() {
System.out.println("111");
return "success";
}
@RequestMapping("test2")
public String test2(@RequestParam("name") String name,
@RequestParam("age") int age) {
System.out.println(name + ":" + age);
return "success";
}
}
啟動(dòng)并查看輸出
3. SpringBoot 整合 Kafka
修改 SpringBoot 核心配置文件 application.propeties
# 應(yīng)用名稱
spring.application.name=gmall-logger
# 應(yīng)用服務(wù) WEB 訪問端口
server.port=8081
#============== kafka ===================
# 指定 kafka 代理地址,可以多個(gè)
spring.kafka.bootstrap-servers=node101:9092
# 指定消息 key 和消息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
在 LoggerController 中添加方法,將日志落盤并發(fā)送到 Kafka 主題中
package com.fancy.gmall.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class LoggerController {
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("applog")
public String getLogger(@RequestParam("param") String jsonStr) {
log.info(jsonStr);
kafkaTemplate.send("ods_base_log", jsonStr);
return "success";
}
}
在 Resources 中添加 logback.xml 配置文件
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOG_HOME" value="d:/opt/module/logs" />
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%msg%n</pattern>
</encoder>
</appender>
<appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/app.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder>
<pattern>%msg%n</pattern>
</encoder>
</appender>
<!-- 將某一個(gè)包下日志單獨(dú)打印日志 -->
<logger name="com.fancy.gmall.controller.LoggerController"
level="INFO" additivity="false">
<appender-ref ref="rollingFile" />
<appender-ref ref="console" />
</logger>
<root level="error" additivity="false">
<appender-ref ref="console" />
</root>
</configuration>
logback 配置文件說明
? appender
追加器,描述如何寫入到文件中(寫在哪,格式,文件的切分)
- ConsoleAppender :追加到控制臺(tái)
- RollingFileAppender :滾動(dòng)追加到文件
? logger
控制器,描述如何選擇追加器
注意:要是單獨(dú)為某個(gè)類指定的時(shí)候,別忘了修改類的全限定名
? 日志級(jí)別
TRACE [DEBUG INFO WARN ERROR] FATAL
修改 node101 上的 rt_applog 目錄下的 application.yml 配置文件
注意:mock.url 設(shè)置為自身 Windows 的 IP 地址
測(cè)試
? 運(yùn)行 Windows 上的 Idea 程序 LoggerApplication
? 運(yùn)行 rt_applog 下的 jar 包
? 啟動(dòng) kafka 消費(fèi)者進(jìn)行測(cè)試
bin/kafka-console-consumer.sh --bootstrap-server node:9092 --topic ods_base_log
3. 日志采集模塊-打包單機(jī)部署
修改 gmall2021-logger 中的 logback.xml 配置文件
<property name="LOG_HOME" value="/opt/module/gmall-flink/rt_applog/logs" />
注意:路徑和上面創(chuàng)建的路徑保持一致,根據(jù)自己的實(shí)際情況進(jìn)行修改
打包
將打好的 jar 包 上 傳 到 node101 的 /opt/module/gmall-flink/rt_applog 目錄下
修改/opt/module/gmall-flink/rt_applog/application.yml
#http 模式下,發(fā)送的地址
mock.url=http://hadoop102:8081/applog
測(cè)試
? 運(yùn)行 hadoop102 上的 rt_gmall 下的日志處理 jar 包
? 運(yùn)行 rt_applog 下的 jar 包
? 啟動(dòng) kafka 消費(fèi)者進(jìn)行測(cè)試
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ods_base_log
五、業(yè)務(wù)數(shù)據(jù)庫數(shù)據(jù)采集
1. MySQL 的準(zhǔn)備
創(chuàng)建實(shí)時(shí)業(yè)務(wù)數(shù)據(jù)庫
導(dǎo)入建表數(shù)據(jù)
修改/etc/my.cnf 文件
[fancy@node101 module]$ sudo vim /etc/my.cnf
server-id = 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall2021
注意:binlog-do-db 根據(jù)自己的情況進(jìn)行修改,指定具體要同步的數(shù)據(jù)庫
重啟 MySQL 使配置生效
sudo systemctl restart mysqld
到 /var/lib/mysql 目錄下查看初始文件大小 154
模擬生成數(shù)據(jù)
? 把 數(shù)據(jù)生成腳本 / 業(yè)務(wù)數(shù)據(jù)里面的 jar 和 properties 文件上傳到 /opt/module/gmall-flink/rt_db 目錄下
? 修改 application.properties 中數(shù)據(jù)庫連接信息
logging.level.root=info
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://node101:3306/gmall2021?characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=000000
logging.pattern.console=%m%n
mybatis-plus.global-config.db-config.field-strategy=not_null
#業(yè)務(wù)日期
mock.date=2021-03-06
#是否重置
mock.clear=1
#是否重置用戶
mock.clear.user=0
……
注意:如果生成較慢,可根據(jù)配置情況適當(dāng)調(diào)整配置項(xiàng)
? 運(yùn)行 jar 包
[fancy@node101 rt_dblog]$ java -jar gmall2020-mock-db-2020-11-27.jar
再次到/var/lib/mysql 目錄下,查看 index 文件的大小
2. 環(huán)境搭建
在工程中新建模塊 gmall2021-realtime
創(chuàng)建如下包結(jié)構(gòu)
目錄 | 作用 |
---|---|
app | 產(chǎn)生各層數(shù)據(jù)的 flink 任務(wù) |
bean | 數(shù)據(jù)對(duì)象 |
common | 公共常量 |
utils | 工具類 |
修改配置文件
1)在 pom.xml 添加如下配置
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.12.0</flink.version>
<scala.version>2.12</scala.version>
<hadoop.version>3.1.3</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<!--如果保存檢查點(diǎn)到 hdfs 上,需要引入此依賴-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<!--Flink 默認(rèn)使用的是 slf4j 記錄日志,相當(dāng)于一個(gè)日志的接口,我們這里使用 log4j 作為具體的日志實(shí)現(xiàn)-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
2)在 resources 目錄下創(chuàng)建 log4j.properties 配置文件
log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
3. 代碼實(shí)現(xiàn)
1. 將流數(shù)據(jù)推送下游的 Kafka 的 Topic 中
package com.fancy.gmall.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
public class MyKafkaUtil {
private static String KAFKA_SERVER = "node101:9092,node102:9092,node103:9092";
private static Properties properties = new Properties();
static {
properties.setProperty("bootstrap.servers", KAFKA_SERVER);
}
public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
return new FlinkKafkaProducer<String>(topic, new SimpleStringSchema(), properties);
}
}
2.編寫主程序,消費(fèi) MySQL 變化數(shù)據(jù)并將數(shù)據(jù)寫入 Kafka
package com.fancy.gmall.app;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.fancy.gmall.utils.MyKafkaUtil;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
public class Flink_CDCWithCustomerSchema {
public static void main(String[] args) throws Exception{
//1. 創(chuàng)建執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2. 創(chuàng)建 Flink-MySQL-CDC 的 Source
DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
.hostname("node101")
.port(3306)
.username("root")
.password("000000")
.databaseList("gmall2021")
.startupOptions(StartupOptions.latest())
.deserializer(new DebeziumDeserializationSchema<String>() {
@Override
public TypeInformation<String> getProducedType() {
return null;
}
// 自定義數(shù)據(jù)解析器
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
// 獲取主題信息 包含著數(shù)據(jù)庫和表名
String topic = sourceRecord.topic();
String[] arr = topic.split("\\.");
String db = arr[1];
String tableName = arr[2];
// 獲取操作類型
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
// 獲取值信息并轉(zhuǎn)換為 Struct 類型
Struct value = (Struct) sourceRecord.value();
// 獲取變化后的數(shù)據(jù)
Struct after = value.getStruct("after");
// 創(chuàng)建 JSON 對(duì)象用于存儲(chǔ)數(shù)據(jù)信息
JSONObject data = new JSONObject();
if (after != null) {
Schema schema = after.schema();
for (Field field : schema.fields()) {
data.put(field.name(), after.get(field.name()));
}
}
// 創(chuàng)建 JSON 對(duì)象用于封裝最終返回值數(shù)據(jù)信息
JSONObject result = new JSONObject();
result.put("operation", operation.toString().toLowerCase());
result.put("data", data);
result.put("database", db);
result.put("table", tableName);
// 發(fā)送數(shù)據(jù)到下游
collector.collect(result.toJSONString());
}
}).build();
// 3. 使用 CDC Source 從 MySQL 讀取數(shù)據(jù)
DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
// 4. 打印數(shù)據(jù)
mysqlDS.addSink(MyKafkaUtil.getKafkaSink("ods_base_db"));
// 5. 執(zhí)行任務(wù)
env.execute();
}
}
測(cè)試點(diǎn)擊運(yùn)行即可
六、Nginx 安裝
在 node 上運(yùn)行 yum,安裝相關(guān)依賴包
sudo yum -y install openssl openssl-devel pcre pcre-devel zlib zlib-devel gcc gcc-c++
將 nginx-1.12.2.tar.gz 上傳到 /opt/software 下
在/opt/module/software 下解壓縮 nginx-1.12.2.tar.gz 包,接著進(jìn)入解壓縮目錄,執(zhí)行
./configure --prefix=/opt/module/nginx
make && make install
–prefix=要安裝到的目錄
安裝成功后,/opt/module/nginx 目錄下結(jié)構(gòu)
啟動(dòng) Nginx
在/opt/module/nginx/sbin 目錄下執(zhí)行 ./nginx
? 如果在普通用戶下面啟動(dòng)會(huì)報(bào)錯(cuò)
原因:nginx 占用 80 端口,默認(rèn)情況下非 root 用戶不允許使用1024 以下端口
解決:讓當(dāng)前用戶的某個(gè)應(yīng)用也可以使用 1024 以下的端口
sudo setcap cap_net_bind_service=+eip /opt/module/nginx/sbin/nginx
注意:要根據(jù)自己的實(shí)際路徑進(jìn)行配置
查看啟動(dòng)情況
ps -ef |grep nginx
因?yàn)?nginx 不是用 java 寫的,所以不能通過 jps 查看
? 在瀏覽器中輸入 http://node101/
重啟 Nginx
./nginx -s reload
關(guān)閉 Nginx
./nginx -s stop
通過配置文件啟動(dòng)
./nginx -c /opt/module/nginx/conf/nginx.conf
/opt/module/nginx/sbin/nginx -c /opt/module/nginx/conf/nginx.conf
其中 -c 是指定配置文件,而且配置文件路徑必須指定絕對(duì)路徑
配置檢查
當(dāng)修改 Nginx 配置文件后,可以使用 Nginx 命令進(jìn)行配置文件語法檢查,用于檢查 Nginx 配置文件是否正確
/opt/module /nginx/sbin/nginx -c /opt/module/nginx/conf/nginx.conf –t
如果 80 端口號(hào)被占用 httpd
sudo systemctl stop httpd
sudo systemctl disable httpd
部分機(jī)器啟動(dòng)時(shí)報(bào)錯(cuò):
/usr/local/nginx/sbin/nginx: error while loading shared libraries: libpcre.so.1: cannot open shared object file: No such file or directory
解決:ln -s /usr/local/lib/libpcre.so.1 /lib64
配置負(fù)載均衡
模擬數(shù)據(jù)以后應(yīng)該發(fā)給 nginx, 然后 nginx 再轉(zhuǎn)發(fā)給我們的日志服務(wù)器。日志服務(wù)器我們會(huì)分別配置node101, node102, node103 三臺(tái)設(shè)備上。
1.打開 nginx 配置文件
cd /opt/module/nginx/conf
vim nginx.conf
2.修改如下配置
http {
# 啟動(dòng)省略
upstream logcluster{
server node101:8081 weight=1;
server node102:8081 weight=1;
server node103:8081 weight=1;
}
server {
listen 80;
server_name localhost;
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
#root html;
#index index.html index.htm;
# 代理的服務(wù)器集群 命名隨意, 但是不能出現(xiàn)下劃線
proxy_pass http://logcluster;
proxy_connect_timeout 10;
}
# 其他省略
}
七、Maxwell 安裝
1. 介紹
Maxwell 是由美國 Zendesk 開源,用 Java 編寫的 MySQL 實(shí)時(shí)抓取軟件。 實(shí)時(shí)讀取MySQL 二進(jìn)制日志 Binlog,并生成 JSON 格式的消息,作為生產(chǎn)者發(fā)送給 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平臺(tái)的應(yīng)用程序。
官網(wǎng)地址:http://maxwells-daemon.io/
2. Maxwell 工作原理
MySQL 主從復(fù)制過程
? Master 主庫將改變記錄,寫到二進(jìn)制日志(binary log)中
? Slave 從庫向 mysql master 發(fā)送 dump 協(xié)議,將 master 主庫的 binary log events 拷貝到它的中繼日志(relay log);
? Slave 從庫讀取并重做中繼日志中的事件,將改變的數(shù)據(jù)同步到自己的數(shù)據(jù)庫。
Maxwell 的工作原理
很簡(jiǎn)單,就是把自己偽裝成 slave,假裝從 master 復(fù)制數(shù)據(jù)
MySQL 的 binlog
(1) 什么是 binlog
MySQL 的二進(jìn)制日志可以說 MySQL 最重要的日志了,它記錄了所有的 DDL 和 DML(除了數(shù)據(jù)查詢語句)語句,以事件形式記錄,還包含語句所執(zhí)行的消耗的時(shí)間,MySQL 的二進(jìn)制日志是事務(wù)安全型的。
一般來說開啟二進(jìn)制日志大概會(huì)有 1%的性能損耗。二進(jìn)制有兩個(gè)最重要的使用場(chǎng)景:
? 其一:MySQL Replication 在 Master 端開啟 binlog,Master 把它的二進(jìn)制日志傳遞給 slaves 來達(dá)到 master-slave 數(shù)據(jù)一致的目的。
? 其二:自然就是數(shù)據(jù)恢復(fù)了,通過使用 mysqlbinlog 工具來使恢復(fù)數(shù)據(jù)。
二進(jìn)制日志包括兩類文件:二進(jìn)制日志索引文件(文件名后綴為.index)用于記錄所有的二進(jìn)制文件,二進(jìn)制日志文件(文件名后綴為.00000*)記錄數(shù)據(jù)庫所有的 DDL 和 DML(除了數(shù)據(jù)查詢語句)語句事件。
(2) binlog 的開啟
? 找到 MySQL 配置文件的位置
? Linux: /etc/my.cnf
如果/etc 目錄下沒有,可以通過 locate my.cnf 查找位置
? Windows: \my.ini
? 在 mysql 的配置文件下,修改配置
在 [mysqld] 區(qū)塊,設(shè)置/添加 log-bin=mysql-bin
這個(gè)表示 binlog 日志的前綴是 mysql-bin,以后生成的日志文件就是mysql-bin.123456 的文件后面的數(shù)字按順序生成,每次 mysql 重啟或者到達(dá)單個(gè)文件大小的閾值時(shí),新生一個(gè)文件,按順序編號(hào)。
(3) binlog 的分類設(shè)置
mysql binlog 的格式有三種,分別是 STATEMENT,MIXED,ROW。
在配置文件中可以選擇配置 binlog_format= statement|mixed|row
? 三種格式的區(qū)別:
? statement
語句級(jí),binlog 會(huì)記錄每次一執(zhí)行寫操作的語句。
相對(duì) row 模式節(jié)省空間,但是可能產(chǎn)生不一致性,比如
update tt set create_date=now()
如果用 binlog 日志進(jìn)行恢復(fù),由于執(zhí)行時(shí)間不同可能產(chǎn)生的數(shù)據(jù)就不同。
優(yōu)點(diǎn): 節(jié)省空間
缺點(diǎn): 有可能造成數(shù)據(jù)不一致。
? row
行級(jí), binlog 會(huì)記錄每次操作后每行記錄的變化。
優(yōu)點(diǎn):保持?jǐn)?shù)據(jù)的絕對(duì)一致性。因?yàn)椴还?sql 是什么,引用了什函數(shù),他只記錄執(zhí)行后的效果。
缺點(diǎn):占用較大空間。
? mixed
statement 的升級(jí)版,一定程度上解決了,因?yàn)橐恍┣闆r而造成的 statement 模式不一致問題
默認(rèn)還是 statement,在某些情況下譬如:
- 當(dāng)函數(shù)中包含 UUID() 時(shí);
- 包含 AUTO_INCREMENT 字段的表被更新時(shí);
- 執(zhí)行 INSERT DELAYED 語句時(shí);
- 用 UDF 時(shí);
會(huì)按照 ROW 的方式進(jìn)行處理
優(yōu)點(diǎn):節(jié)省空間,同時(shí)兼顧了一定的一致性。
缺點(diǎn):還有些極個(gè)別情況依舊會(huì)造成不一致,另外 statement 和 mixed 對(duì)于需要對(duì) binlog 的監(jiān)控的情況都不方便。
綜合上面對(duì)比,Maxwell 想做監(jiān)控分析,選擇 row 格式比較合適
3. 安裝 Maxwell
? 將工具下的 maxwell-1.25.0.tar.gz 上傳到/opt/software 目錄下
? 解壓 maxwell-1.25.0.tar.gz 到/opt/module 目錄
[root@node101 module]$ tar -zxvf /opt/software/maxwell-1.25.0.tar.gz -C /opt/module/
4. 初始化 Maxwell 元數(shù)據(jù)庫
? 在 MySQL 中建立一個(gè) maxwell 庫用于存儲(chǔ) Maxwell 的元數(shù)據(jù)
[root@node101 module]$ mysql -uroot -p000000
mysql> CREATE DATABASE maxwell;
? 設(shè)置安全級(jí)別
mysql> set global validate_password_length=4;
mysql> set global validate_password_policy=0;
? 分配一個(gè)賬號(hào)可以操作該數(shù)據(jù)庫
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '000000';
? 分配這個(gè)賬號(hào)可以監(jiān)控其他數(shù)據(jù)庫的權(quán)限
mysql> GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%';
5. 使用 Maxwell 監(jiān)控抓取 MySQL 數(shù)據(jù)
? 拷貝配置文件
[root@node101 maxwell-1.25.0]$ cp config.properties.example config.properties
? 修改配置文件
producer=kafka
kafka.bootstrap.servers=node101:9092,node102:9092,node103:9092
#需要添加
kafka_topic=ods_base_db_m
# mysql login info
host=node101
user=maxwell
password=000000
#需要添加 初始化會(huì)用
client_id=maxwell_1
注意:默認(rèn)還是輸出到指定 Kafka 主題的一個(gè) kafka 分區(qū),因?yàn)槎鄠€(gè)分區(qū)并行可能會(huì)打亂 binlog 的順序。
如果要提高并行度,首先設(shè)置 kafka 的分區(qū)數(shù)>1,然后設(shè)置 producer_partition_by 屬性可選值
producer_partition_by=database|table|primary_key|random| column
? 在/home/bin 目錄下編寫 maxwell.sh 啟動(dòng)腳本
[root@node101 maxwell-1.25.0]$ vim /home/bin/maxwell.sh
/opt/module/maxwell-1.25.0/bin/maxwell --config
/opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &
? 授予執(zhí)行權(quán)限
[root@node101 maxwell-1.25.0]$ sudo chmod +x /home/bin/maxwell.sh
? 運(yùn)行啟動(dòng)程序
[root@node101 maxwell-1.25.0]$ maxwell.sh
? 啟動(dòng) Kafka 消費(fèi)客戶端,觀察結(jié)果
[root@node101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server node101:9092 --topic ods_base_db_m
? 執(zhí)行/opt/module/rt_dblog 下的 jar 生成模擬數(shù)據(jù)
[root@node101 rt_dblog]$ java -jar gmall2020-mock-db-2020-11-27.jar
八、Canal 安裝
1. Canal入門
阿里巴巴 B2B 公司,因?yàn)闃I(yè)務(wù)的特性,賣家主要集中在國內(nèi),買家主要集中在國外,所以衍生出了同步杭州和美國異地機(jī)房的需求,從 2010 年開始,阿里系公司開始逐步的嘗試基于數(shù)據(jù)庫的日志解析,獲取增量變更進(jìn)行同步,由此衍生出了增量訂閱&消費(fèi)的業(yè)務(wù)。
Canal 是用 java 開發(fā)的基于數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱&消費(fèi)的中間件。目前,Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 來處理獲得的相關(guān)數(shù)據(jù)。(數(shù)據(jù)庫同步需要阿里的 Otter 中間件,基于 Canal)
2. 使用場(chǎng)景
(1) 原始場(chǎng)景: 阿里 Otter 中間件的一部分,Otter 是阿里用于進(jìn)行異地?cái)?shù)據(jù)庫之間的同步框架,Canal 是其中一部分。
(2) 常見場(chǎng)景1:更新緩存
(3) 常見場(chǎng)景2:抓取業(yè)務(wù)數(shù)據(jù)新增變化表,用于制作拉鏈表。
(4) 常見場(chǎng)景3:抓取業(yè)務(wù)表的新增變化數(shù)據(jù),用于制作實(shí)時(shí)統(tǒng)計(jì)(我們就是這種場(chǎng)景)
…
3. Canal 的工作原理
(1) MySQL 主從復(fù)制過程
? Master 主庫將改變記錄,寫到二進(jìn)制日志(Binary log)中
? Slave 從庫向 mysql master 發(fā)送 dump 協(xié)議,將 master 主庫的 binary log events 拷貝到它的中繼日志(relay log);
? Slave 從庫讀取并重做中繼日志中的事件,將改變的數(shù)據(jù)同步到自己的數(shù)據(jù)庫。
(2) Canal 的工作原理
很簡(jiǎn)單,就是把自己偽裝成 Slave,假裝從 Master 復(fù)制數(shù)據(jù)
4. Canal 架構(gòu)以及安裝
地址:https://github.com/alibaba/canal/releases
將 canal.deployer-1.1.4.tar.gz拷貝到 /opt/sortware 目錄下,然后解壓到 /opt/module/canal 包下
注意:canal 解壓后是散的,我們?cè)谥付ń鈮耗夸浀臅r(shí)候需要將 canal 指定上
[root@node101 software]$ mkdir /opt/module/canal
[root@node101 software]$ tar -zxvf canal.deployer-1.1.4.tar.gz -C /opt/module/canal
5. canal 單機(jī)版
① 修改 conf/canal.properties 的配置
[root@node101 conf]$ pwd
/opt/module/canal/conf
[atguigu@hadoop102 conf]$ vim canal.properties
? 這個(gè)文件是 canal 的基本通用配置,canal 端口號(hào)默認(rèn)就是 11111
? 修改 canal 的輸出 mode,默認(rèn) tcp,改為輸出到 kafka
tcp 就是輸出到 canal 客戶端,通過編寫 Java 代碼處理
? 修改 Kafka 集群的地址
? 如果創(chuàng)建多個(gè)實(shí)例
通過前面 canal 架構(gòu),我們可以知道,一個(gè) canal 服務(wù)中可以有多個(gè) instance,conf/ 下的每一個(gè) example 即是一個(gè)實(shí)例,每個(gè)實(shí)例下面都有獨(dú)立的配置文件。默認(rèn)只有一個(gè)實(shí)
例 example,如果需要多個(gè)實(shí)例處理不同的 MySQL 數(shù)據(jù)的話,直接拷貝出多個(gè)example, 并對(duì)其重新命名,命名和配置文件中指定的名稱一致,然后修改 canal.properties 中的 canal.destinations=實(shí)例 1,實(shí)例 2,實(shí)例 3。
② 修改 instance.properties
我們這里只讀取一個(gè) MySQL 數(shù)據(jù),所以只有一個(gè)實(shí)例,這個(gè)實(shí)例的配置文件在 conf/example 目錄下
[root@node101 example]$ pwd
/opt/module/canal/conf/example
[root@node101 example]$ vim instance.properties
? 配置 MySQL 服務(wù)器地址
? 配置連接 MySQL 的用戶名和密碼,默認(rèn)就是我們前面授權(quán)的 canal
? 修改輸出到 Kafka 的主題以及分區(qū)數(shù)
注意:默認(rèn)還是輸出到指定 Kafka 主題的一個(gè) kafka 分區(qū),因?yàn)槎鄠€(gè)分區(qū)并行可能會(huì)打亂
binlog 的順序。如果要提高并行度,首先設(shè)置 kafka 的分區(qū)數(shù)>1,然后設(shè)置canal.mq.partitionHash 屬性
③ 單機(jī) canal 測(cè)試
? 啟動(dòng) canal
[root@node101 example]$ cd /opt/module/canal/
[root@node101 canal]$ bin/startup.sh
看到 CanalLauncher 你表示啟動(dòng)成功,同時(shí)會(huì)創(chuàng)建 gmall2021_db_c 主題
? 啟動(dòng) Kafka 消費(fèi)客戶端測(cè)試,查看消費(fèi)情況
[root@node101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server node101:9092 --topic gmall2020_db_c
? 運(yùn)行/opt/module/rt_dblog 中生成模擬數(shù)據(jù)
6. canal 高可用
這種 zookeeper 為觀察者監(jiān)控的模式,只能實(shí)現(xiàn)高可用,而不是負(fù)載均衡,即同一時(shí)點(diǎn)只有一個(gè) canal-server 節(jié)點(diǎn)能夠監(jiān)控某個(gè)數(shù)據(jù)源,只要這個(gè)節(jié)點(diǎn)能夠正常工作,那么其他監(jiān)控這個(gè)數(shù)據(jù)源的 canal-server 只能做 stand-by,直到工作節(jié)點(diǎn)停掉,其他 canal-server 節(jié)點(diǎn)才能搶占。因?yàn)橛幸粋€(gè) stand-by 也要占用資源,同時(shí) canal 傳輸數(shù)據(jù)宕機(jī)的情況也比較少,所以好多企業(yè)是不配置 canal 的高可用的。
① 停止單機(jī) canal 進(jìn)程
② 在 node101 上修改 canal.properties
? 配置 zookeeper
? 避免發(fā)送重復(fù)數(shù)據(jù) (否則在切換 active 的時(shí)候會(huì)重復(fù)發(fā)送數(shù)據(jù))
③ 把 canal 目錄分發(fā)給其他虛擬機(jī)
[root@node101 module]$ xsync canal/
④ 測(cè)試
? 先在 node101 啟動(dòng) canal,再在 node102 上啟動(dòng) canal
[root@node101 canal]$ bin/startup.sh
? 啟動(dòng) kafka 消費(fèi)客戶端
[root@node101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server node101:9092 --topic gmall2021_db_c
? 在數(shù)據(jù)庫 gmall2021 隨便一張表中修改一條數(shù)據(jù),查看效果
? 掛掉 102 再次修改數(shù)據(jù),查看效果
7. Maxwell 與 Canal 工具對(duì)比
? Maxwell 沒有 Canal 那種 server+client 模式,只有一個(gè) server 把數(shù)據(jù)發(fā)送到消息隊(duì)列或 redis。
? Maxwell 有一個(gè)亮點(diǎn)功能,就是 Canal 只能抓取最新數(shù)據(jù),對(duì)已存在的歷史數(shù)據(jù)沒有辦法處理。而 Maxwell 有一個(gè) bootstrap 功能,可以直接引導(dǎo)出完整的歷史數(shù)據(jù)用于初始化,非常好用。
? Maxwell 不能直接支持 HA,但是它支持?jǐn)帱c(diǎn)還原,即錯(cuò)誤解決后重啟繼續(xù)上次點(diǎn)兒讀
取數(shù)據(jù)。
? Maxwell 只支持 json 格式,而 Canal 如果用 Server+client 模式的話,可以自定義格
式。
? Maxwell 比 Canal 更加輕量級(jí)。
① 執(zhí)行不同操作,Maxwell 和 canal 數(shù)據(jù)格式對(duì)比
? 執(zhí)行 insert 測(cè)試語句
INSERT INTO z_user_info VALUES(30,'zhang3','13810001010'),(31,'li4','1389999999');
canal | maxwell |
---|---|
{“data”:[{“id”:“30”,“user_name”:“zhang3”,“tel”:“13810001010”},{“id”:“31”,“user_name”:“l(fā)i4”,“tel”:“1389999999”}],“database”:“gmall-2020-04”,“es”:1589385314000,“id”:2,“isDdl”:false,“mysqlType”:{“id”:“bigint(20)”,“user_name”:“varchar(20)”,“tel”:“varchar(20)”},“old”:null,“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“user_name”:12,“tel”:12},“table”:“z_user_info”,“ts”:1589385314116,“type”:“INSERT”}` | {“database”:“gmall-2020-04”,“table”:“z_user_info”,“type”:“insert”,“ts”:1589385314,“xid”:82982,“xoffset”:0,“data”:{“id”:30,“user_name”:“zhang3”,“tel”:“13810001010”}}{“database”:“gmall-2020-04”,“table”:“z_user_info”,“type”:“insert”,“ts”:1589385314,“xid”:82982,“commit”:true,“data”:{“id”:31,“user_name”:“l(fā)i4”,“tel”:“1389999999”}} |
? 執(zhí)行 update 操作
UPDATE z_user_info SET `user_name`='wang55' WHERE id IN(30,31)
canal | maxwell |
---|---|
{“data”:[{“id”:“30”,“user_name”:“wang55”,“tel”:“13810001010”},{“id”:“31”,“user_name”:“wang55”,“tel”:“1389999999”}],“database”:“gmall-2020-04”,“es”:1589385508000,“id”:3,“isDdl”:false,“mysqlType”:{“id”:“bigint(20)”,“user_name”:“varchar(20)”,“tel”:“varchar(20)”},“old”:[{“user_name”:“zhang3”},{“user_name”:“l(fā)i4”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“user_name”:12,“tel”:12},“table”:“z_user_info”,“ts”:1589385508676,“type”:“UPDATE”}` | {“database”:“gmall-2020-04”,“table”:“z_user_info”,“type”:“update”,“ts”:1589385508,“xid”:83206,“xoffset”:0,“data”:{“id”:30,“user_name”:“wang55”,“tel”:“13810001010”},“old”:{“user_name”:“zhang3”}}{“database”:“gmall-2020-04”,“table”:“z_user_info”,“type”:“update”,“ts”:1589385508,“xid”:83206,“commit”:true,“data”:{“id”:31,“user_name”:“wang55”,“tel”:“1389999999”},“old”:{“user_name”:“l(fā)i4”}} |
? delete 操作
DELETE FROM z_user_info WHERE id IN(30,31)
canal | maxwell |
---|---|
`{“data”:[{“id”:“30”,“user_name”:“wang55”,“tel”:“13810001010”},{“id”:“31”,“user_name”:“wang55”,“tel”:“1389999999”}],“database”:“gmall-2020-04”,“es”:1589385644000,“id”:4,“isDdl”:false,“mysqlType”:{“id”:“bigint(20)”,“user_name”:“varchar(20)”,“tel”:“varchar(20)”},“old”:null,“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“user_name”:12,“tel”:12},“table”:“z_user_info”,“ts”:1589385644829,“type”:“DELETE”} | {“database”:“gmall-2020-04”,“table”:“z_user_info”,“type”:“delete”,“ts”:1589385644,“xid”:83367,“xoffset”:0,“data”:{“id”:30,“user_name”:“wang55”,“tel”:“13810001010”}}{“database”:“gmall-2020-04”,“table”:“z_user_info”,“type”:“delete”,“ts”:1589385644,“xid”:83367,“commit”:true,“data”:{“id”:31,“user_name”:“wang55”,“tel”:“1389999999”}} |
② 總結(jié)數(shù)據(jù)特點(diǎn)
? 日志結(jié)構(gòu)
canal 每一條 SQL 會(huì)產(chǎn)生一條日志,如果該條 Sql 影響了多行數(shù)據(jù),則已經(jīng)會(huì)通過集合的方式歸集在這條日志中。(即使是一條數(shù)據(jù)也會(huì)是數(shù)組結(jié)構(gòu))maxwell 以影響的數(shù)據(jù)為單位產(chǎn)生日志,即每影響一條數(shù)據(jù)就會(huì)產(chǎn)生一條日志。如果想知道這些日志是否是通過某一條 sql 產(chǎn)生的可以通過 xid 進(jìn)行判斷,相同的 xid 的日志來自同一 sql。
? 數(shù)字類型
當(dāng)原始數(shù)據(jù)是數(shù)字類型時(shí),maxwell 會(huì)尊重原始數(shù)據(jù)的類型不增加雙引,變?yōu)樽址?。canal 一律轉(zhuǎn)換為字符串。
? 帶原始數(shù)據(jù)字段定義
canal 數(shù)據(jù)中會(huì)帶入表結(jié)構(gòu)。maxwell 更簡(jiǎn)潔。
8. Maxwell 的初始化數(shù)據(jù)功能
例如:初始化用戶表
bin/maxwell-bootstrap --user maxwell --password 000000 --host node101--database gmall2021 --table user_info --client_id maxwell_1
? --user maxwell
數(shù)據(jù)庫分配的操作 maxwell 數(shù)據(jù)庫的用戶名
? --password 000000
數(shù)據(jù)庫分配的操作 maxwell 數(shù)據(jù)庫的密碼
? --host
數(shù)據(jù)庫主機(jī)名
? --database
數(shù)據(jù)庫名
? --table
表名文章來源:http://www.zghlxwxcb.cn/news/detail-463381.html
? --client_id
maxwell-bootstrap 不具備將數(shù)據(jù)直接導(dǎo)入 kafka或者 hbase 的能力,通過–client_id指定將數(shù)據(jù)交給哪個(gè) maxwell 進(jìn)程處理,在 maxwell 的 conf.properties 中配置文章來源地址http://www.zghlxwxcb.cn/news/detail-463381.html
到了這里,關(guān)于Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!