国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

這篇具有很好參考價(jià)值的文章主要介紹了Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。


一、數(shù)倉分層介紹

1. 普通實(shí)時(shí)計(jì)算與實(shí)時(shí)數(shù)倉比較

普通的實(shí)時(shí)計(jì)算優(yōu)先考慮時(shí)效性,所以從數(shù)據(jù)源采集經(jīng)過實(shí)時(shí)計(jì)算直接得到結(jié)果。如此做時(shí)效性更好,但是弊端是由于計(jì)算過程中的中間結(jié)果沒有沉淀下來,所以當(dāng)面對(duì)大量實(shí)時(shí)需求的時(shí)候,計(jì)算的復(fù)用性較差,開發(fā)成本隨著需求增加直線上升。

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
實(shí)時(shí)數(shù)倉基于一定的數(shù)據(jù)倉庫理念,對(duì)數(shù)據(jù)處理流程進(jìn)行規(guī)劃、分層,目的是提高數(shù)據(jù)的復(fù)用性。

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

2. 實(shí)時(shí)電商數(shù)倉,項(xiàng)目分為以下幾層

  • ODS:原始數(shù)據(jù),日志和業(yè)務(wù)數(shù)據(jù)
  • DWD:根據(jù)數(shù)據(jù)對(duì)象為單位進(jìn)行分流,比如訂單、頁面訪問等等
  • DIM:維度數(shù)據(jù)
  • DWM:對(duì)于部分?jǐn)?shù)據(jù)對(duì)象進(jìn)行進(jìn)一步加工,比如獨(dú)立訪問、跳出行為,也可以和維度進(jìn)行關(guān)聯(lián),形成寬表,依舊是明細(xì)數(shù)據(jù)
  • DWS:根據(jù)某個(gè)主題將多個(gè)事實(shí)數(shù)據(jù)輕度聚合,形成主題寬表
  • ADS:把ClickHouse中的數(shù)據(jù)根據(jù)可視化需進(jìn)行篩選聚合

二、實(shí)時(shí)需求概覽

1. 離線計(jì)算與實(shí)時(shí)計(jì)算的比較

離線計(jì)算:就是在計(jì)算開始前已知所有輸入數(shù)據(jù),輸入數(shù)據(jù)不會(huì)產(chǎn)生變化,一般計(jì)算量
級(jí)較大,計(jì)算時(shí)間也較長(zhǎng)。例如今天早上一點(diǎn),把昨天累積的日志,計(jì)算出所需結(jié)果。最經(jīng)
典的就是 Hadoop 的 MapReduce 方式;
一般是根據(jù)前一日的數(shù)據(jù)生成報(bào)表,雖然統(tǒng)計(jì)指標(biāo)、報(bào)表繁多,但是對(duì)時(shí)效性不敏感。
從技術(shù)操作的角度,這部分屬于批處理的操作。即根據(jù)確定范圍的數(shù)據(jù)一次性計(jì)算。
實(shí)時(shí)計(jì)算:輸入數(shù)據(jù)是可以以序列化的方式一個(gè)個(gè)輸入并進(jìn)行處理的,也就是說在開始
的時(shí)候并不需要知道所有的輸入數(shù)據(jù)。與離線計(jì)算相比,運(yùn)行時(shí)間短,計(jì)算量級(jí)相對(duì)較小。
強(qiáng)調(diào)計(jì)算過程的時(shí)間要短,即所查當(dāng)下給出結(jié)果。
主要側(cè)重于對(duì)當(dāng)日數(shù)據(jù)的實(shí)時(shí)監(jiān)控,通常業(yè)務(wù)邏輯相對(duì)離線需求簡(jiǎn)單一下,統(tǒng)計(jì)指標(biāo)也
少一些,但是更注重?cái)?shù)據(jù)的時(shí)效性,以及用戶的交互性。從技術(shù)操作的角度,這部分屬于流
處理的操作。根據(jù)數(shù)據(jù)源源不斷地到達(dá)進(jìn)行實(shí)時(shí)的運(yùn)算。

2. 實(shí)時(shí)需求種類

日常統(tǒng)計(jì)報(bào)表或分析圖中需要包含當(dāng)日部分

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
對(duì)于日常企業(yè)、網(wǎng)站的運(yùn)營(yíng)管理如果僅僅依靠離線計(jì)算,數(shù)據(jù)的時(shí)效性往往無法滿足。通過實(shí)時(shí)計(jì)算獲得當(dāng)日、分鐘級(jí)、秒級(jí)甚至亞秒的數(shù)據(jù)更加便于企業(yè)對(duì)業(yè)務(wù)進(jìn)行快速反應(yīng)與調(diào)整。

所以實(shí)時(shí)計(jì)算結(jié)果往往要與離線數(shù)據(jù)進(jìn)行合并或者對(duì)比展示在 BI 或者統(tǒng)計(jì)平臺(tái)中。

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
數(shù)據(jù)大屏,相對(duì)于 BI 工具或者數(shù)據(jù)分析平臺(tái)是更加直觀的數(shù)據(jù)可視化方式。尤其是一些大促活動(dòng),已經(jīng)成為必備的一種營(yíng)銷手段。

另外還有一些特殊行業(yè),比如交通、電信的行業(yè),那么大屏監(jiān)控幾乎是必備的監(jiān)控手段。

3. 數(shù)據(jù)預(yù)警或提示

經(jīng)過大數(shù)據(jù)實(shí)時(shí)計(jì)算得到的一些風(fēng)控預(yù)警、營(yíng)銷信息提示,能夠快速讓風(fēng)控或營(yíng)銷部分得到信息,以便采取各種應(yīng)對(duì)。

比如,用戶在電商、金融平臺(tái)中正在進(jìn)行一些非法或欺詐類操作,那么大數(shù)據(jù)實(shí)時(shí)計(jì)算可以快速的將情況篩選出來發(fā)送風(fēng)控部門進(jìn)行處理,甚至自動(dòng)屏蔽。 或者檢測(cè)到用戶的行為對(duì)于某些商品具有較強(qiáng)的購買意愿,那么可以把這些“商機(jī)”推送給客服部門,讓客服進(jìn)行主動(dòng)的跟進(jìn)。

4. 實(shí)時(shí)推薦系統(tǒng)

實(shí)時(shí)推薦就是根據(jù)用戶的自身屬性結(jié)合當(dāng)前的訪問行為,經(jīng)過實(shí)時(shí)的推薦算法計(jì)算,從而將用戶可能喜歡的商品、新聞、視頻等推送給用戶。

這種系統(tǒng)一般是由一個(gè)用戶畫像批處理加一個(gè)用戶行為分析的流處理組合而成。

三、統(tǒng)計(jì)架構(gòu)分析

1. 離線架構(gòu)

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

2. 實(shí)時(shí)架構(gòu)

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

四、日志數(shù)據(jù)采集

1. 模擬日志生成器的使用

這里提供了一個(gè)模擬生成數(shù)據(jù)的 jar 包,可以將日志發(fā)送給某一個(gè)指定的端口,需要大數(shù)據(jù)程序員了解如何從指定端口接收數(shù)據(jù)并數(shù)據(jù)進(jìn)行處理的流程。

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

將數(shù)據(jù)生成腳本/行為數(shù)據(jù)的內(nèi)容到 node101 的 /opt/module/gmall-flink/rt_applog 目錄
Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
根據(jù)實(shí)際需要修改 application.yml

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
使用模擬日志生成器的 jar 運(yùn)行

java -jar gmall2020-mock-log-2020-12-18.jar

目前我們還沒有地址接收日志,所以程序運(yùn)行后的結(jié)果有如下錯(cuò)誤

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
注意:ZooKeeper 從 3.5 開始,AdminServer 的端口也是 8080,如果在本機(jī)啟動(dòng)了zk,那么可能看到 404、405 錯(cuò)誤,意思是找到請(qǐng)求地址了,但是接收的方式不對(duì)。

2. 日志采集模塊-本地測(cè)試

① SpringBoot 簡(jiǎn)介

SpringBoot 是由 Pivotal 團(tuán)隊(duì)提供的全新框架,其設(shè)計(jì)目的是用來簡(jiǎn)化新 Spring 應(yīng)用的初始搭建以及開發(fā)過程。 該框架使用了特定的方式來進(jìn)行配置,從而使開發(fā)人員不再需要定義樣板化的配置。

1)有了 springboot 我們就可以…

? 不再需要那些千篇一律,繁瑣的 xml 文件。
? 內(nèi)嵌 Tomcat,不再需要外部的 Tomcat
? 更方便的和各個(gè)第三方工具(mysql,redis,elasticsearch,dubbo,kafka 等等整合),而只要維護(hù)一個(gè)配置文件即可。

2)springboot 和 ssm 的關(guān)系

springboot 整合了 springmvc,spring 等核心功能。也就是說本質(zhì)上實(shí)現(xiàn)功能的還是原有的 spring ,springmvc 的包,但是 springboot 單獨(dú)包裝了一層,這樣用戶就不必直接對(duì) springmvc,spring 等,在 xml 中配置。

3)沒有 xml,我們要去哪配置

springboot 實(shí)際上就是把以前需要用戶手工配置的部分,全部作為默認(rèn)項(xiàng)。除非用戶需要額外更改不然不用配置。這就是所謂的:“約定大于配置” 如果需要特別配置的時(shí)候,去修改application.properties (application.yml)

② 快速搭建 SpringBoot 程序 gmall-logger,采集模擬生成的日志數(shù)據(jù)

1)在 IDEA 中安裝 lombok 插件

在 Plugins 下搜索 lombok 然后在線安裝即可,安裝后注意重啟

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
2)創(chuàng)建空的父工程 gmall2021,用于管理后續(xù)所有的模塊 module

我們這里就是為了將各個(gè)模塊放在一起,但是模塊彼此間還是獨(dú)立的,所以創(chuàng)建一個(gè) Empty Project 即可;如果要是由父 module 管理子 module,需要將父 module 的 pom.xml 文件的 <packaging> 設(shè)置為 pom

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

配置項(xiàng)目名稱為 gmall2021-logger 及 JDK 版本
Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

選擇版本以及通過勾選自動(dòng)添加 lombok、SpringWeb、Kafka 相關(guān)依賴
Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
完成之后開始下載依賴,完整的 pom.xml 文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.10</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.fancy.gmall</groupId>
    <artifactId>gmall2021-logger</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>gmall2021-logger</name>
    <description>gmall2021-logger</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

創(chuàng)建 LoggerController 輸出 SpringBoot 處理流程

package com.fancy.gmall.controller;


import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class LoggerController {

    @RequestMapping("test1")
    public String test1() {
        System.out.println("111");
        return "success";
    }


    @RequestMapping("test2") 
    public String test2(@RequestParam("name") String name,
                        @RequestParam("age") int age)  {
        System.out.println(name + ":" + age);
        return "success";
    }

}

啟動(dòng)并查看輸出

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

3. SpringBoot 整合 Kafka

修改 SpringBoot 核心配置文件 application.propeties

# 應(yīng)用名稱
spring.application.name=gmall-logger
# 應(yīng)用服務(wù) WEB 訪問端口
server.port=8081
#============== kafka ===================
# 指定 kafka 代理地址,可以多個(gè)
spring.kafka.bootstrap-servers=node101:9092
# 指定消息 key 和消息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

在 LoggerController 中添加方法,將日志落盤并發(fā)送到 Kafka 主題中

package com.fancy.gmall.controller;


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;


@RestController
@Slf4j
public class LoggerController {

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;
    

    @RequestMapping("applog")
    public String getLogger(@RequestParam("param") String jsonStr) {
        log.info(jsonStr);
        kafkaTemplate.send("ods_base_log", jsonStr);
        return "success";
    }


}

在 Resources 中添加 logback.xml 配置文件

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_HOME" value="d:/opt/module/logs" />
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>
    <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/app.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>
    <!-- 將某一個(gè)包下日志單獨(dú)打印日志 -->
    <logger name="com.fancy.gmall.controller.LoggerController"
            level="INFO" additivity="false">
        <appender-ref ref="rollingFile" />
        <appender-ref ref="console" />
    </logger>
    <root level="error" additivity="false">
        <appender-ref ref="console" />
    </root>
</configuration>

logback 配置文件說明

? appender
追加器,描述如何寫入到文件中(寫在哪,格式,文件的切分)

  • ConsoleAppender :追加到控制臺(tái)
  • RollingFileAppender :滾動(dòng)追加到文件

? logger

控制器,描述如何選擇追加器
注意:要是單獨(dú)為某個(gè)類指定的時(shí)候,別忘了修改類的全限定名

? 日志級(jí)別

TRACE [DEBUG INFO WARN ERROR] FATAL

修改 node101 上的 rt_applog 目錄下的 application.yml 配置文件

注意:mock.url 設(shè)置為自身 Windows 的 IP 地址

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

測(cè)試
? 運(yùn)行 Windows 上的 Idea 程序 LoggerApplication
? 運(yùn)行 rt_applog 下的 jar 包
? 啟動(dòng) kafka 消費(fèi)者進(jìn)行測(cè)試

bin/kafka-console-consumer.sh --bootstrap-server node:9092 --topic ods_base_log

3. 日志采集模塊-打包單機(jī)部署

修改 gmall2021-logger 中的 logback.xml 配置文件

<property name="LOG_HOME" value="/opt/module/gmall-flink/rt_applog/logs" />

注意:路徑和上面創(chuàng)建的路徑保持一致,根據(jù)自己的實(shí)際情況進(jìn)行修改

打包

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
將打好的 jar 包 上 傳 到 node101 的 /opt/module/gmall-flink/rt_applog 目錄下
Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
修改/opt/module/gmall-flink/rt_applog/application.yml

#http 模式下,發(fā)送的地址
mock.url=http://hadoop102:8081/applog

測(cè)試

? 運(yùn)行 hadoop102 上的 rt_gmall 下的日志處理 jar 包
? 運(yùn)行 rt_applog 下的 jar 包
? 啟動(dòng) kafka 消費(fèi)者進(jìn)行測(cè)試

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ods_base_log

五、業(yè)務(wù)數(shù)據(jù)庫數(shù)據(jù)采集

1. MySQL 的準(zhǔn)備

創(chuàng)建實(shí)時(shí)業(yè)務(wù)數(shù)據(jù)庫

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

導(dǎo)入建表數(shù)據(jù)

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
修改/etc/my.cnf 文件

[fancy@node101 module]$ sudo vim /etc/my.cnf
server-id = 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall2021

注意:binlog-do-db 根據(jù)自己的情況進(jìn)行修改,指定具體要同步的數(shù)據(jù)庫

重啟 MySQL 使配置生效

sudo systemctl restart mysqld

到 /var/lib/mysql 目錄下查看初始文件大小 154

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
模擬生成數(shù)據(jù)

? 把 數(shù)據(jù)生成腳本 / 業(yè)務(wù)數(shù)據(jù)里面的 jar 和 properties 文件上傳到 /opt/module/gmall-flink/rt_db 目錄下

? 修改 application.properties 中數(shù)據(jù)庫連接信息

logging.level.root=info
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://node101:3306/gmall2021?characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=000000
logging.pattern.console=%m%n
mybatis-plus.global-config.db-config.field-strategy=not_null
#業(yè)務(wù)日期
mock.date=2021-03-06
#是否重置
mock.clear=1
#是否重置用戶
mock.clear.user=0
……

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
注意:如果生成較慢,可根據(jù)配置情況適當(dāng)調(diào)整配置項(xiàng)

? 運(yùn)行 jar 包

[fancy@node101 rt_dblog]$ java -jar gmall2020-mock-db-2020-11-27.jar

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
再次到/var/lib/mysql 目錄下,查看 index 文件的大小

2. 環(huán)境搭建

在工程中新建模塊 gmall2021-realtime

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

創(chuàng)建如下包結(jié)構(gòu)

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

目錄 作用
app 產(chǎn)生各層數(shù)據(jù)的 flink 任務(wù)
bean 數(shù)據(jù)對(duì)象
common 公共常量
utils 工具類

修改配置文件

1)在 pom.xml 添加如下配置

<properties>
	<java.version>1.8</java.version>
	<maven.compiler.source>${java.version}</maven.compiler.source>
	<maven.compiler.target>${java.version}</maven.compiler.target>
	<flink.version>1.12.0</flink.version>
	<scala.version>2.12</scala.version>
	<hadoop.version>3.1.3</hadoop.version>
</properties>
<dependencies>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java_${scala.version}</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-kafka_${scala.version}</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients_${scala.version}</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>	
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-cep_${scala.version}</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-json</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>1.2.68</version>
	</dependency>
	<!--如果保存檢查點(diǎn)到 hdfs 上,需要引入此依賴-->
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-client</artifactId>
		<version>${hadoop.version}</version>
	</dependency>
	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.49</version>
	</dependency>
	<dependency>
		<groupId>com.alibaba.ververica</groupId>
		<artifactId>flink-connector-mysql-cdc</artifactId>
		<version>1.2.0</version>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.20</version>
	</dependency>
	<!--Flink 默認(rèn)使用的是 slf4j 記錄日志,相當(dāng)于一個(gè)日志的接口,我們這里使用 log4j 作為具體的日志實(shí)現(xiàn)-->
	<dependency>
		<groupId>org.slf4j</groupId>
		<artifactId>slf4j-api</artifactId>
		<version>1.7.25</version>
	</dependency>
	<dependency>
		<groupId>org.slf4j</groupId>
		<artifactId>slf4j-log4j12</artifactId>
		<version>1.7.25</version>
	</dependency>
	<dependency>
		<groupId>org.apache.logging.log4j</groupId>
		<artifactId>log4j-to-slf4j</artifactId>
		<version>2.14.0</version>
	</dependency>
</dependencies>
<build>
	<plugins>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-assembly-plugin</artifactId>
			<version>3.0.0</version>
			<configuration>
				<descriptorRefs>
					<descriptorRef>jar-with-dependencies</descriptorRef>
				</descriptorRefs>
			</configuration>
			<executions>
				<execution>
					<id>make-assembly</id>
					<phase>package</phase>
					<goals>
						<goal>single</goal>
					</goals>
				</execution>
			</executions>
		</plugin>
	</plugins>
</build>

2)在 resources 目錄下創(chuàng)建 log4j.properties 配置文件

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

3. 代碼實(shí)現(xiàn)

1. 將流數(shù)據(jù)推送下游的 Kafka 的 Topic 中

package com.fancy.gmall.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class MyKafkaUtil {
    private static String KAFKA_SERVER = "node101:9092,node102:9092,node103:9092";
    private static Properties properties = new Properties();

    static {
        properties.setProperty("bootstrap.servers", KAFKA_SERVER);
    }

    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        return new FlinkKafkaProducer<String>(topic, new SimpleStringSchema(), properties);
    }
    
}

2.編寫主程序,消費(fèi) MySQL 變化數(shù)據(jù)并將數(shù)據(jù)寫入 Kafka

package com.fancy.gmall.app;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.fancy.gmall.utils.MyKafkaUtil;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class Flink_CDCWithCustomerSchema {
    public static void main(String[] args) throws Exception{
        //1. 創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. 創(chuàng)建 Flink-MySQL-CDC 的 Source
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("node101")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall2021")
                .startupOptions(StartupOptions.latest())
                .deserializer(new DebeziumDeserializationSchema<String>() {
                    @Override
                    public TypeInformation<String> getProducedType() {
                        return null;
                    }

                    // 自定義數(shù)據(jù)解析器
                    @Override
                    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
                        // 獲取主題信息 包含著數(shù)據(jù)庫和表名
                        String topic = sourceRecord.topic();
                        String[] arr =  topic.split("\\.");
                        String db = arr[1];
                        String tableName = arr[2];
                        // 獲取操作類型
                        Envelope.Operation operation = Envelope.operationFor(sourceRecord);

                        // 獲取值信息并轉(zhuǎn)換為 Struct 類型
                        Struct value = (Struct) sourceRecord.value();

                        // 獲取變化后的數(shù)據(jù)
                        Struct after = value.getStruct("after");

                        // 創(chuàng)建 JSON 對(duì)象用于存儲(chǔ)數(shù)據(jù)信息
                        JSONObject data = new JSONObject();
                        if (after != null) {
                            Schema schema = after.schema();
                            for (Field field : schema.fields()) {
                                data.put(field.name(), after.get(field.name()));
                            }
                        }

                        // 創(chuàng)建 JSON 對(duì)象用于封裝最終返回值數(shù)據(jù)信息
                        JSONObject result = new JSONObject();
                        result.put("operation", operation.toString().toLowerCase());
                        result.put("data", data);
                        result.put("database", db);
                        result.put("table", tableName);

                        // 發(fā)送數(shù)據(jù)到下游
                        collector.collect(result.toJSONString());
                    }
                }).build();
        
        // 3. 使用 CDC Source 從 MySQL 讀取數(shù)據(jù)
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
        
        // 4. 打印數(shù)據(jù)
        mysqlDS.addSink(MyKafkaUtil.getKafkaSink("ods_base_db"));
        
        // 5. 執(zhí)行任務(wù)
        env.execute();
    }
}

測(cè)試點(diǎn)擊運(yùn)行即可

六、Nginx 安裝

在 node 上運(yùn)行 yum,安裝相關(guān)依賴包

sudo yum -y install openssl openssl-devel pcre pcre-devel zlib zlib-devel gcc gcc-c++

將 nginx-1.12.2.tar.gz 上傳到 /opt/software 下

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

在/opt/module/software 下解壓縮 nginx-1.12.2.tar.gz 包,接著進(jìn)入解壓縮目錄,執(zhí)行

./configure --prefix=/opt/module/nginx
make && make install

–prefix=要安裝到的目錄

安裝成功后,/opt/module/nginx 目錄下結(jié)構(gòu)

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

啟動(dòng) Nginx

在/opt/module/nginx/sbin 目錄下執(zhí)行 ./nginx

? 如果在普通用戶下面啟動(dòng)會(huì)報(bào)錯(cuò)

原因:nginx 占用 80 端口,默認(rèn)情況下非 root 用戶不允許使用1024 以下端口
解決:讓當(dāng)前用戶的某個(gè)應(yīng)用也可以使用 1024 以下的端口

sudo setcap cap_net_bind_service=+eip /opt/module/nginx/sbin/nginx

注意:要根據(jù)自己的實(shí)際路徑進(jìn)行配置

查看啟動(dòng)情況

ps -ef |grep nginx 

因?yàn)?nginx 不是用 java 寫的,所以不能通過 jps 查看

? 在瀏覽器中輸入 http://node101/

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

重啟 Nginx

./nginx -s reload

關(guān)閉 Nginx

./nginx -s stop

通過配置文件啟動(dòng)

./nginx -c /opt/module/nginx/conf/nginx.conf
/opt/module/nginx/sbin/nginx -c /opt/module/nginx/conf/nginx.conf

其中 -c 是指定配置文件,而且配置文件路徑必須指定絕對(duì)路徑

配置檢查

當(dāng)修改 Nginx 配置文件后,可以使用 Nginx 命令進(jìn)行配置文件語法檢查,用于檢查 Nginx 配置文件是否正確

/opt/module /nginx/sbin/nginx -c /opt/module/nginx/conf/nginx.conf –t

如果 80 端口號(hào)被占用 httpd

sudo systemctl stop httpd
sudo systemctl disable httpd

部分機(jī)器啟動(dòng)時(shí)報(bào)錯(cuò):

/usr/local/nginx/sbin/nginx: error while loading shared libraries: libpcre.so.1: cannot open shared object file: No such file or directory

解決:ln -s /usr/local/lib/libpcre.so.1 /lib64

配置負(fù)載均衡

模擬數(shù)據(jù)以后應(yīng)該發(fā)給 nginx, 然后 nginx 再轉(zhuǎn)發(fā)給我們的日志服務(wù)器。日志服務(wù)器我們會(huì)分別配置node101, node102, node103 三臺(tái)設(shè)備上。

1.打開 nginx 配置文件

cd /opt/module/nginx/conf
vim nginx.conf

2.修改如下配置

http {
 # 啟動(dòng)省略
 upstream logcluster{
	 server node101:8081 weight=1;
	 server node102:8081 weight=1;
	 server node103:8081 weight=1;
 }
 server {
	 listen 80;
	 server_name localhost;
	 #charset koi8-r;
	 #access_log logs/host.access.log main;
	 location / {
	 #root html;
	 #index index.html index.htm;
	 # 代理的服務(wù)器集群 命名隨意, 但是不能出現(xiàn)下劃線
	 proxy_pass http://logcluster;
	 proxy_connect_timeout 10;
 }
 # 其他省略
}

七、Maxwell 安裝

1. 介紹

Maxwell 是由美國 Zendesk 開源,用 Java 編寫的 MySQL 實(shí)時(shí)抓取軟件。 實(shí)時(shí)讀取MySQL 二進(jìn)制日志 Binlog,并生成 JSON 格式的消息,作為生產(chǎn)者發(fā)送給 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平臺(tái)的應(yīng)用程序。

官網(wǎng)地址:http://maxwells-daemon.io/

2. Maxwell 工作原理

MySQL 主從復(fù)制過程

? Master 主庫將改變記錄,寫到二進(jìn)制日志(binary log)中
? Slave 從庫向 mysql master 發(fā)送 dump 協(xié)議,將 master 主庫的 binary log events 拷貝到它的中繼日志(relay log);
? Slave 從庫讀取并重做中繼日志中的事件,將改變的數(shù)據(jù)同步到自己的數(shù)據(jù)庫。

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
Maxwell 的工作原理

很簡(jiǎn)單,就是把自己偽裝成 slave,假裝從 master 復(fù)制數(shù)據(jù)

MySQL 的 binlog

(1) 什么是 binlog

MySQL 的二進(jìn)制日志可以說 MySQL 最重要的日志了,它記錄了所有的 DDL 和 DML(除了數(shù)據(jù)查詢語句)語句,以事件形式記錄,還包含語句所執(zhí)行的消耗的時(shí)間,MySQL 的二進(jìn)制日志是事務(wù)安全型的。

一般來說開啟二進(jìn)制日志大概會(huì)有 1%的性能損耗。二進(jìn)制有兩個(gè)最重要的使用場(chǎng)景:

? 其一:MySQL Replication 在 Master 端開啟 binlog,Master 把它的二進(jìn)制日志傳遞給 slaves 來達(dá)到 master-slave 數(shù)據(jù)一致的目的。

? 其二:自然就是數(shù)據(jù)恢復(fù)了,通過使用 mysqlbinlog 工具來使恢復(fù)數(shù)據(jù)。

二進(jìn)制日志包括兩類文件:二進(jìn)制日志索引文件(文件名后綴為.index)用于記錄所有的二進(jìn)制文件,二進(jìn)制日志文件(文件名后綴為.00000*)記錄數(shù)據(jù)庫所有的 DDL 和 DML(除了數(shù)據(jù)查詢語句)語句事件。

(2) binlog 的開啟

? 找到 MySQL 配置文件的位置
? Linux: /etc/my.cnf
如果/etc 目錄下沒有,可以通過 locate my.cnf 查找位置
? Windows: \my.ini
? 在 mysql 的配置文件下,修改配置
在 [mysqld] 區(qū)塊,設(shè)置/添加 log-bin=mysql-bin
這個(gè)表示 binlog 日志的前綴是 mysql-bin,以后生成的日志文件就是mysql-bin.123456 的文件后面的數(shù)字按順序生成,每次 mysql 重啟或者到達(dá)單個(gè)文件大小的閾值時(shí),新生一個(gè)文件,按順序編號(hào)。

(3) binlog 的分類設(shè)置

mysql binlog 的格式有三種,分別是 STATEMENT,MIXED,ROW。
在配置文件中可以選擇配置 binlog_format= statement|mixed|row

? 三種格式的區(qū)別:
? statement
語句級(jí),binlog 會(huì)記錄每次一執(zhí)行寫操作的語句。
相對(duì) row 模式節(jié)省空間,但是可能產(chǎn)生不一致性,比如
update tt set create_date=now()
如果用 binlog 日志進(jìn)行恢復(fù),由于執(zhí)行時(shí)間不同可能產(chǎn)生的數(shù)據(jù)就不同。
優(yōu)點(diǎn): 節(jié)省空間
缺點(diǎn): 有可能造成數(shù)據(jù)不一致。
? row
行級(jí), binlog 會(huì)記錄每次操作后每行記錄的變化。
優(yōu)點(diǎn):保持?jǐn)?shù)據(jù)的絕對(duì)一致性。因?yàn)椴还?sql 是什么,引用了什函數(shù),他只記錄執(zhí)行后的效果。
缺點(diǎn):占用較大空間。
? mixed
statement 的升級(jí)版,一定程度上解決了,因?yàn)橐恍┣闆r而造成的 statement 模式不一致問題
默認(rèn)還是 statement,在某些情況下譬如:

  • 當(dāng)函數(shù)中包含 UUID() 時(shí);
  • 包含 AUTO_INCREMENT 字段的表被更新時(shí);
  • 執(zhí)行 INSERT DELAYED 語句時(shí);
  • 用 UDF 時(shí);

會(huì)按照 ROW 的方式進(jìn)行處理
優(yōu)點(diǎn):節(jié)省空間,同時(shí)兼顧了一定的一致性。
缺點(diǎn):還有些極個(gè)別情況依舊會(huì)造成不一致,另外 statement 和 mixed 對(duì)于需要對(duì) binlog 的監(jiān)控的情況都不方便。

綜合上面對(duì)比,Maxwell 想做監(jiān)控分析,選擇 row 格式比較合適

3. 安裝 Maxwell

? 將工具下的 maxwell-1.25.0.tar.gz 上傳到/opt/software 目錄下
? 解壓 maxwell-1.25.0.tar.gz 到/opt/module 目錄

[root@node101 module]$ tar -zxvf /opt/software/maxwell-1.25.0.tar.gz -C /opt/module/

4. 初始化 Maxwell 元數(shù)據(jù)庫

? 在 MySQL 中建立一個(gè) maxwell 庫用于存儲(chǔ) Maxwell 的元數(shù)據(jù)

[root@node101 module]$ mysql -uroot -p000000
mysql> CREATE DATABASE maxwell;

? 設(shè)置安全級(jí)別

mysql> set global validate_password_length=4;
mysql> set global validate_password_policy=0;

? 分配一個(gè)賬號(hào)可以操作該數(shù)據(jù)庫

mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '000000';

? 分配這個(gè)賬號(hào)可以監(jiān)控其他數(shù)據(jù)庫的權(quán)限

mysql> GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%';

5. 使用 Maxwell 監(jiān)控抓取 MySQL 數(shù)據(jù)

? 拷貝配置文件

[root@node101 maxwell-1.25.0]$ cp config.properties.example config.properties

? 修改配置文件

producer=kafka
kafka.bootstrap.servers=node101:9092,node102:9092,node103:9092
#需要添加
kafka_topic=ods_base_db_m
# mysql login info
host=node101
user=maxwell
password=000000
#需要添加 初始化會(huì)用
client_id=maxwell_1

注意:默認(rèn)還是輸出到指定 Kafka 主題的一個(gè) kafka 分區(qū),因?yàn)槎鄠€(gè)分區(qū)并行可能會(huì)打亂 binlog 的順序。

如果要提高并行度,首先設(shè)置 kafka 的分區(qū)數(shù)>1,然后設(shè)置 producer_partition_by 屬性可選值

producer_partition_by=database|table|primary_key|random| column

? 在/home/bin 目錄下編寫 maxwell.sh 啟動(dòng)腳本

[root@node101 maxwell-1.25.0]$ vim /home/bin/maxwell.sh
/opt/module/maxwell-1.25.0/bin/maxwell --config 
/opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &

? 授予執(zhí)行權(quán)限

[root@node101 maxwell-1.25.0]$ sudo chmod +x /home/bin/maxwell.sh

? 運(yùn)行啟動(dòng)程序

[root@node101 maxwell-1.25.0]$ maxwell.sh

? 啟動(dòng) Kafka 消費(fèi)客戶端,觀察結(jié)果

[root@node101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server node101:9092 --topic ods_base_db_m

? 執(zhí)行/opt/module/rt_dblog 下的 jar 生成模擬數(shù)據(jù)

[root@node101 rt_dblog]$ java -jar gmall2020-mock-db-2020-11-27.jar

八、Canal 安裝

1. Canal入門

阿里巴巴 B2B 公司,因?yàn)闃I(yè)務(wù)的特性,賣家主要集中在國內(nèi),買家主要集中在國外,所以衍生出了同步杭州和美國異地機(jī)房的需求,從 2010 年開始,阿里系公司開始逐步的嘗試基于數(shù)據(jù)庫的日志解析,獲取增量變更進(jìn)行同步,由此衍生出了增量訂閱&消費(fèi)的業(yè)務(wù)。

Canal 是用 java 開發(fā)的基于數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱&消費(fèi)的中間件。目前,Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 來處理獲得的相關(guān)數(shù)據(jù)。(數(shù)據(jù)庫同步需要阿里的 Otter 中間件,基于 Canal)

2. 使用場(chǎng)景

(1) 原始場(chǎng)景: 阿里 Otter 中間件的一部分,Otter 是阿里用于進(jìn)行異地?cái)?shù)據(jù)庫之間的同步框架,Canal 是其中一部分。

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

(2) 常見場(chǎng)景1:更新緩存

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

(3) 常見場(chǎng)景2:抓取業(yè)務(wù)數(shù)據(jù)新增變化表,用于制作拉鏈表。

(4) 常見場(chǎng)景3:抓取業(yè)務(wù)表的新增變化數(shù)據(jù),用于制作實(shí)時(shí)統(tǒng)計(jì)(我們就是這種場(chǎng)景)

3. Canal 的工作原理

(1) MySQL 主從復(fù)制過程

? Master 主庫將改變記錄,寫到二進(jìn)制日志(Binary log)中
? Slave 從庫向 mysql master 發(fā)送 dump 協(xié)議,將 master 主庫的 binary log events 拷貝到它的中繼日志(relay log);

? Slave 從庫讀取并重做中繼日志中的事件,將改變的數(shù)據(jù)同步到自己的數(shù)據(jù)庫。

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
(2) Canal 的工作原理

很簡(jiǎn)單,就是把自己偽裝成 Slave,假裝從 Master 復(fù)制數(shù)據(jù)

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

4. Canal 架構(gòu)以及安裝
Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
地址:https://github.com/alibaba/canal/releases

將 canal.deployer-1.1.4.tar.gz拷貝到 /opt/sortware 目錄下,然后解壓到 /opt/module/canal 包下

注意:canal 解壓后是散的,我們?cè)谥付ń鈮耗夸浀臅r(shí)候需要將 canal 指定上

[root@node101 software]$ mkdir /opt/module/canal
[root@node101 software]$ tar -zxvf canal.deployer-1.1.4.tar.gz -C  /opt/module/canal

5. canal 單機(jī)版

① 修改 conf/canal.properties 的配置

[root@node101 conf]$ pwd
/opt/module/canal/conf
[atguigu@hadoop102 conf]$ vim canal.properties

? 這個(gè)文件是 canal 的基本通用配置,canal 端口號(hào)默認(rèn)就是 11111

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

? 修改 canal 的輸出 mode,默認(rèn) tcp,改為輸出到 kafka

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

tcp 就是輸出到 canal 客戶端,通過編寫 Java 代碼處理

? 修改 Kafka 集群的地址

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

? 如果創(chuàng)建多個(gè)實(shí)例

通過前面 canal 架構(gòu),我們可以知道,一個(gè) canal 服務(wù)中可以有多個(gè) instance,conf/ 下的每一個(gè) example 即是一個(gè)實(shí)例,每個(gè)實(shí)例下面都有獨(dú)立的配置文件。默認(rèn)只有一個(gè)實(shí)
例 example,如果需要多個(gè)實(shí)例處理不同的 MySQL 數(shù)據(jù)的話,直接拷貝出多個(gè)example, 并對(duì)其重新命名,命名和配置文件中指定的名稱一致,然后修改 canal.properties 中的 canal.destinations=實(shí)例 1,實(shí)例 2,實(shí)例 3。

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

② 修改 instance.properties

我們這里只讀取一個(gè) MySQL 數(shù)據(jù),所以只有一個(gè)實(shí)例,這個(gè)實(shí)例的配置文件在 conf/example 目錄下

[root@node101 example]$ pwd
/opt/module/canal/conf/example
[root@node101 example]$ vim instance.properties

? 配置 MySQL 服務(wù)器地址

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

? 配置連接 MySQL 的用戶名和密碼,默認(rèn)就是我們前面授權(quán)的 canal
Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

? 修改輸出到 Kafka 的主題以及分區(qū)數(shù)

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

注意:默認(rèn)還是輸出到指定 Kafka 主題的一個(gè) kafka 分區(qū),因?yàn)槎鄠€(gè)分區(qū)并行可能會(huì)打亂
binlog 的順序。如果要提高并行度,首先設(shè)置 kafka 的分區(qū)數(shù)>1,然后設(shè)置canal.mq.partitionHash 屬性

③ 單機(jī) canal 測(cè)試

? 啟動(dòng) canal

[root@node101 example]$ cd /opt/module/canal/
[root@node101 canal]$ bin/startup.sh

看到 CanalLauncher 你表示啟動(dòng)成功,同時(shí)會(huì)創(chuàng)建 gmall2021_db_c 主題
Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

? 啟動(dòng) Kafka 消費(fèi)客戶端測(cè)試,查看消費(fèi)情況

[root@node101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server  node101:9092 --topic gmall2020_db_c

? 運(yùn)行/opt/module/rt_dblog 中生成模擬數(shù)據(jù)
Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層
6. canal 高可用

這種 zookeeper 為觀察者監(jiān)控的模式,只能實(shí)現(xiàn)高可用,而不是負(fù)載均衡,即同一時(shí)點(diǎn)只有一個(gè) canal-server 節(jié)點(diǎn)能夠監(jiān)控某個(gè)數(shù)據(jù)源,只要這個(gè)節(jié)點(diǎn)能夠正常工作,那么其他監(jiān)控這個(gè)數(shù)據(jù)源的 canal-server 只能做 stand-by,直到工作節(jié)點(diǎn)停掉,其他 canal-server 節(jié)點(diǎn)才能搶占。因?yàn)橛幸粋€(gè) stand-by 也要占用資源,同時(shí) canal 傳輸數(shù)據(jù)宕機(jī)的情況也比較少,所以好多企業(yè)是不配置 canal 的高可用的。

① 停止單機(jī) canal 進(jìn)程

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

② 在 node101 上修改 canal.properties

? 配置 zookeeper
Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

? 避免發(fā)送重復(fù)數(shù)據(jù) (否則在切換 active 的時(shí)候會(huì)重復(fù)發(fā)送數(shù)據(jù))

Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層

③ 把 canal 目錄分發(fā)給其他虛擬機(jī)

[root@node101 module]$ xsync canal/

④ 測(cè)試

? 先在 node101 啟動(dòng) canal,再在 node102 上啟動(dòng) canal

[root@node101 canal]$ bin/startup.sh

? 啟動(dòng) kafka 消費(fèi)客戶端

[root@node101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server node101:9092 --topic gmall2021_db_c

? 在數(shù)據(jù)庫 gmall2021 隨便一張表中修改一條數(shù)據(jù),查看效果
? 掛掉 102 再次修改數(shù)據(jù),查看效果

7. Maxwell 與 Canal 工具對(duì)比

? Maxwell 沒有 Canal 那種 server+client 模式,只有一個(gè) server 把數(shù)據(jù)發(fā)送到消息隊(duì)列或 redis。

? Maxwell 有一個(gè)亮點(diǎn)功能,就是 Canal 只能抓取最新數(shù)據(jù),對(duì)已存在的歷史數(shù)據(jù)沒有辦法處理。而 Maxwell 有一個(gè) bootstrap 功能,可以直接引導(dǎo)出完整的歷史數(shù)據(jù)用于初始化,非常好用。

? Maxwell 不能直接支持 HA,但是它支持?jǐn)帱c(diǎn)還原,即錯(cuò)誤解決后重啟繼續(xù)上次點(diǎn)兒讀
取數(shù)據(jù)。

? Maxwell 只支持 json 格式,而 Canal 如果用 Server+client 模式的話,可以自定義格
式。

? Maxwell 比 Canal 更加輕量級(jí)。

① 執(zhí)行不同操作,Maxwell 和 canal 數(shù)據(jù)格式對(duì)比

? 執(zhí)行 insert 測(cè)試語句

INSERT INTO z_user_info  VALUES(30,'zhang3','13810001010'),(31,'li4','1389999999');
canal maxwell
{“data”:[{“id”:“30”,“user_name”:“zhang3”,“tel”:“13810001010”},{“id”:“31”,“user_name”:“l(fā)i4”,“tel”:“1389999999”}],“database”:“gmall-2020-04”,“es”:1589385314000,“id”:2,“isDdl”:false,“mysqlType”:{“id”:“bigint(20)”,“user_name”:“varchar(20)”,“tel”:“varchar(20)”},“old”:null,“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“user_name”:12,“tel”:12},“table”:“z_user_info”,“ts”:1589385314116,“type”:“INSERT”}` {“database”:“gmall-2020-04”,“table”:“z_user_info”,“type”:“insert”,“ts”:1589385314,“xid”:82982,“xoffset”:0,“data”:{“id”:30,“user_name”:“zhang3”,“tel”:“13810001010”}}{“database”:“gmall-2020-04”,“table”:“z_user_info”,“type”:“insert”,“ts”:1589385314,“xid”:82982,“commit”:true,“data”:{“id”:31,“user_name”:“l(fā)i4”,“tel”:“1389999999”}}

? 執(zhí)行 update 操作

UPDATE z_user_info SET `user_name`='wang55' WHERE id IN(30,31)
canal maxwell
{“data”:[{“id”:“30”,“user_name”:“wang55”,“tel”:“13810001010”},{“id”:“31”,“user_name”:“wang55”,“tel”:“1389999999”}],“database”:“gmall-2020-04”,“es”:1589385508000,“id”:3,“isDdl”:false,“mysqlType”:{“id”:“bigint(20)”,“user_name”:“varchar(20)”,“tel”:“varchar(20)”},“old”:[{“user_name”:“zhang3”},{“user_name”:“l(fā)i4”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“user_name”:12,“tel”:12},“table”:“z_user_info”,“ts”:1589385508676,“type”:“UPDATE”}` {“database”:“gmall-2020-04”,“table”:“z_user_info”,“type”:“update”,“ts”:1589385508,“xid”:83206,“xoffset”:0,“data”:{“id”:30,“user_name”:“wang55”,“tel”:“13810001010”},“old”:{“user_name”:“zhang3”}}{“database”:“gmall-2020-04”,“table”:“z_user_info”,“type”:“update”,“ts”:1589385508,“xid”:83206,“commit”:true,“data”:{“id”:31,“user_name”:“wang55”,“tel”:“1389999999”},“old”:{“user_name”:“l(fā)i4”}}

? delete 操作

DELETE FROM z_user_info WHERE id IN(30,31)
canal maxwell
`{“data”:[{“id”:“30”,“user_name”:“wang55”,“tel”:“13810001010”},{“id”:“31”,“user_name”:“wang55”,“tel”:“1389999999”}],“database”:“gmall-2020-04”,“es”:1589385644000,“id”:4,“isDdl”:false,“mysqlType”:{“id”:“bigint(20)”,“user_name”:“varchar(20)”,“tel”:“varchar(20)”},“old”:null,“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“user_name”:12,“tel”:12},“table”:“z_user_info”,“ts”:1589385644829,“type”:“DELETE”} {“database”:“gmall-2020-04”,“table”:“z_user_info”,“type”:“delete”,“ts”:1589385644,“xid”:83367,“xoffset”:0,“data”:{“id”:30,“user_name”:“wang55”,“tel”:“13810001010”}}{“database”:“gmall-2020-04”,“table”:“z_user_info”,“type”:“delete”,“ts”:1589385644,“xid”:83367,“commit”:true,“data”:{“id”:31,“user_name”:“wang55”,“tel”:“1389999999”}}

② 總結(jié)數(shù)據(jù)特點(diǎn)

? 日志結(jié)構(gòu)

canal 每一條 SQL 會(huì)產(chǎn)生一條日志,如果該條 Sql 影響了多行數(shù)據(jù),則已經(jīng)會(huì)通過集合的方式歸集在這條日志中。(即使是一條數(shù)據(jù)也會(huì)是數(shù)組結(jié)構(gòu))maxwell 以影響的數(shù)據(jù)為單位產(chǎn)生日志,即每影響一條數(shù)據(jù)就會(huì)產(chǎn)生一條日志。如果想知道這些日志是否是通過某一條 sql 產(chǎn)生的可以通過 xid 進(jìn)行判斷,相同的 xid 的日志來自同一 sql。

? 數(shù)字類型
當(dāng)原始數(shù)據(jù)是數(shù)字類型時(shí),maxwell 會(huì)尊重原始數(shù)據(jù)的類型不增加雙引,變?yōu)樽址?。canal 一律轉(zhuǎn)換為字符串。

? 帶原始數(shù)據(jù)字段定義
canal 數(shù)據(jù)中會(huì)帶入表結(jié)構(gòu)。maxwell 更簡(jiǎn)潔。

8. Maxwell 的初始化數(shù)據(jù)功能

例如:初始化用戶表

bin/maxwell-bootstrap --user maxwell --password 000000 --host node101--database gmall2021 --table user_info --client_id maxwell_1

? --user maxwell
數(shù)據(jù)庫分配的操作 maxwell 數(shù)據(jù)庫的用戶名

? --password 000000
數(shù)據(jù)庫分配的操作 maxwell 數(shù)據(jù)庫的密碼

? --host
數(shù)據(jù)庫主機(jī)名

? --database
數(shù)據(jù)庫名

? --table
表名

? --client_id
maxwell-bootstrap 不具備將數(shù)據(jù)直接導(dǎo)入 kafka或者 hbase 的能力,通過–client_id指定將數(shù)據(jù)交給哪個(gè) maxwell 進(jìn)程處理,在 maxwell 的 conf.properties 中配置文章來源地址http://www.zghlxwxcb.cn/news/detail-463381.html

到了這里,關(guān)于Flink 實(shí)時(shí)數(shù)倉 (一) --------- 數(shù)據(jù)采集層的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【數(shù)倉】通過Flume+kafka采集日志數(shù)據(jù)存儲(chǔ)到Hadoop

    【數(shù)倉】通過Flume+kafka采集日志數(shù)據(jù)存儲(chǔ)到Hadoop

    【數(shù)倉】基本概念、知識(shí)普及、核心技術(shù) 【數(shù)倉】數(shù)據(jù)分層概念以及相關(guān)邏輯 【數(shù)倉】Hadoop軟件安裝及使用(集群配置) 【數(shù)倉】Hadoop集群配置常用參數(shù)說明 【數(shù)倉】zookeeper軟件安裝及集群配置 【數(shù)倉】kafka軟件安裝及集群配置 【數(shù)倉】flume軟件安裝及配置 【數(shù)倉】flum

    2024年03月17日
    瀏覽(28)
  • Flink系列之:使用Flink CDC從數(shù)據(jù)庫采集數(shù)據(jù),設(shè)置checkpoint支持?jǐn)?shù)據(jù)采集中斷恢復(fù),保證數(shù)據(jù)不丟失

    博主相關(guān)技術(shù)博客: Flink系列之:Debezium采集Mysql數(shù)據(jù)庫表數(shù)據(jù)到Kafka Topic,同步kafka topic數(shù)據(jù)到StarRocks數(shù)據(jù)庫 Flink系列之:使用Flink Mysql CDC基于Flink SQL同步mysql數(shù)據(jù)到StarRocks數(shù)據(jù)庫

    2024年02月11日
    瀏覽(31)
  • hdfs元數(shù)據(jù)實(shí)時(shí)采集

    hdfs元數(shù)據(jù)實(shí)時(shí)采集

    一、背景及問題 0.Hdfs元數(shù)據(jù)管理 ? 1.背景介紹 當(dāng)前在數(shù)據(jù)資產(chǎn)管理平臺(tái)上,需要展示每張hive表及分區(qū)的熱力情況(文件數(shù)、存儲(chǔ)量、更新時(shí)間等信息)。目前熱力數(shù)據(jù)包含兩部分內(nèi)容:熱力元數(shù)據(jù)和審計(jì)日志,其中審計(jì)日志可以直接消費(fèi)kafka得到,而熱力元數(shù)據(jù)暫時(shí)沒有可以直

    2024年02月09日
    瀏覽(18)
  • 水庫安全監(jiān)測(cè)方案(實(shí)時(shí)數(shù)據(jù)采集、高速數(shù)據(jù)傳輸)

    水庫安全監(jiān)測(cè)方案(實(shí)時(shí)數(shù)據(jù)采集、高速數(shù)據(jù)傳輸)

    ? 一、引言 水庫的安全監(jiān)測(cè)對(duì)于防止水災(zāi)和保障人民生命財(cái)產(chǎn)安全至關(guān)重要。為了提高水庫安全監(jiān)測(cè)的效率和準(zhǔn)確性,本文將介紹一種使用星創(chuàng)易聯(lián)DTU200和SG800 5g工業(yè)路由器部署的水庫安全監(jiān)測(cè)方案。 二、方案概述 本方案主要通過使用星創(chuàng)易聯(lián)DTU200和SG800 5g工業(yè)路由器實(shí)現(xiàn)

    2024年02月08日
    瀏覽(28)
  • 一百八十二、大數(shù)據(jù)離線數(shù)倉完整流程——步驟一、用Kettle從Kafka、MySQL等數(shù)據(jù)源采集數(shù)據(jù)然后寫入HDFS

    一百八十二、大數(shù)據(jù)離線數(shù)倉完整流程——步驟一、用Kettle從Kafka、MySQL等數(shù)據(jù)源采集數(shù)據(jù)然后寫入HDFS

    經(jīng)過6個(gè)月的奮斗,項(xiàng)目的離線數(shù)倉部分終于可以上線了,因此整理一下離線數(shù)倉的整個(gè)流程,既是大家提供一個(gè)案例經(jīng)驗(yàn),也是對(duì)自己近半年的工作進(jìn)行一個(gè)總結(jié)。 項(xiàng)目行業(yè)屬于交通行業(yè),因此數(shù)據(jù)具有很多交通行業(yè)的特征,比如轉(zhuǎn)向比數(shù)據(jù)就是統(tǒng)計(jì)車輛左轉(zhuǎn)、右轉(zhuǎn)、直行

    2024年02月07日
    瀏覽(20)
  • Sqoop與Flume的集成:實(shí)時(shí)數(shù)據(jù)采集

    Sqoop與Flume的集成:實(shí)時(shí)數(shù)據(jù)采集

    將Sqoop與Flume集成是實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)采集和傳輸?shù)闹匾襟E之一。Sqoop用于將數(shù)據(jù)從關(guān)系型數(shù)據(jù)庫導(dǎo)入到Hadoop生態(tài)系統(tǒng)中,而Flume用于數(shù)據(jù)流的實(shí)時(shí)采集、傳輸和處理。本文將深入探討如何使用Sqoop與Flume集成,提供詳細(xì)的步驟、示例代碼和最佳實(shí)踐,以確保能夠成功實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)

    2024年01月23日
    瀏覽(22)
  • 定時(shí)音頻數(shù)據(jù)采集并發(fā)送websocket實(shí)時(shí)播放

    一 定時(shí)音頻數(shù)據(jù)采集并發(fā)送websocket實(shí)時(shí)播放 Recorder.js

    2024年02月02日
    瀏覽(26)
  • 【flume實(shí)時(shí)采集mysql數(shù)據(jù)庫的數(shù)據(jù)到kafka】

    【flume實(shí)時(shí)采集mysql數(shù)據(jù)庫的數(shù)據(jù)到kafka】

    最近做了flume實(shí)時(shí)采集mysql數(shù)據(jù)到kafka的實(shí)驗(yàn),做個(gè)筆記,防止忘記 ?。?!建議從頭看到尾,因?yàn)橐恍┖?jiǎn)單的東西我在前面提了,后面沒提。 Kafka搭建:https://blog.csdn.net/cjwfinal/article/details/120803013 flume搭建:https://blog.csdn.net/cjwfinal/article/details/120441503?spm=1001.2014.3001.5502 編寫配置

    2024年02月03日
    瀏覽(24)
  • Kafka數(shù)據(jù)流的實(shí)時(shí)采集與統(tǒng)計(jì)機(jī)制

    隨著大數(shù)據(jù)時(shí)代的到來,實(shí)時(shí)數(shù)據(jù)處理成為了眾多企業(yè)和組織的關(guān)注焦點(diǎn)。為了滿足這一需求,Apache Kafka成為了一個(gè)廣泛采用的分布式流處理平臺(tái)。Kafka以其高吞吐量、可擴(kuò)展性和容錯(cuò)性而聞名,被廣泛應(yīng)用于日志收集、事件驅(qū)動(dòng)架構(gòu)和實(shí)時(shí)分析等場(chǎng)景。 在本文中,我們將探

    2024年02月07日
    瀏覽(28)
  • SoloX - Android/iOS性能數(shù)據(jù)實(shí)時(shí)采集工具

    SoloX - Android/iOS性能數(shù)據(jù)實(shí)時(shí)采集工具

    ??SoloX是一個(gè)可以實(shí)時(shí)收集Android/iOS性能數(shù)據(jù)的web工具。 快速定位分析性能問題,提升應(yīng)用的性能和品質(zhì)。 無需ROOT/越獄,即插即用。 主要特點(diǎn): 無需ROOT/越獄 : Android設(shè)備無需Root,iOS設(shè)備無需越獄。有效解決Android和iOS性能的測(cè)試和分析挑戰(zhàn)。 數(shù)據(jù)完整性 :我們提供CP

    2024年04月11日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包