第 1 章:數(shù)據(jù)倉庫
1.1 數(shù)據(jù)倉庫概述
1.1.1 數(shù)據(jù)倉庫概念
1、數(shù)據(jù)倉庫概念:
為企業(yè)制定決策,提供數(shù)據(jù)支持的集合。通過對數(shù)據(jù)倉庫中數(shù)據(jù)的分析,可以幫助企業(yè),改進業(yè)務流程、控制成本,提高產(chǎn)品質量。
數(shù)據(jù)倉庫并不是數(shù)據(jù)的最終目的地,而是為數(shù)據(jù)最終的目的地做好準備,這些準備包括對數(shù)據(jù)的:清洗、轉義、分類、重組、合并、拆分、統(tǒng)計等。
2、數(shù)據(jù)倉庫的數(shù)據(jù)通常包括:業(yè)務數(shù)據(jù)、用戶行為數(shù)據(jù)和爬蟲數(shù)據(jù)等
3、業(yè)務系統(tǒng)數(shù)據(jù)庫(關系型數(shù)據(jù)庫中)
1)業(yè)務數(shù)據(jù):主要指的是各行業(yè)在處理事務過程中產(chǎn)生的業(yè)務數(shù)據(jù)
2)產(chǎn)生:用戶在電商網(wǎng)站中登錄、下單、支付等過程中,需要和網(wǎng)站后臺數(shù)據(jù)庫進行增刪改查交互,產(chǎn)生的數(shù)據(jù)
3)存儲:都是存儲到關系型數(shù)據(jù)庫(如:mysql、oracle)中。
4、用戶行為數(shù)據(jù)(日志文件log)
1)用戶行為數(shù)據(jù):用戶在使用產(chǎn)品過程中,通過埋點與客戶端產(chǎn)品交互所產(chǎn)生的數(shù)據(jù),并發(fā)往日志服務器進行保存。
2)存儲:用戶數(shù)據(jù)通常存儲在日志文件中。
5、爬蟲數(shù)據(jù):通過技術手段獲取其它公司網(wǎng)站的數(shù)據(jù)。
1.1.2 數(shù)據(jù)倉庫示意圖
數(shù)據(jù)倉庫(data warehouse),為企業(yè)指定決策,提供數(shù)據(jù)支持的??梢詭椭髽I(yè),改進業(yè)務流程、提高產(chǎn)品質量等。
數(shù)據(jù)倉庫,并不是數(shù)據(jù)的最終目的地,而是為數(shù)據(jù)最終的目的地做好準備。這些準備包括對數(shù)據(jù)的:備份、清洗、聚合、統(tǒng)計等。
1、報表系統(tǒng):對存儲的數(shù)據(jù)做數(shù)據(jù)統(tǒng)計分析
2、用戶畫像:即用戶信息標簽化,是基于數(shù)據(jù)挖掘的用戶特征提取即需求深度挖掘,是大數(shù)據(jù)時代圍繞“以用戶為中心”開展的個性化服務。標簽化的模型是從用戶社交屬性、生活習慣、消費行為等信息中抽象出來的產(chǎn)物,是用戶“特征標簽”的幾個。
3、推薦系統(tǒng):通過對用戶的歷史行為、用戶興趣偏好來經(jīng)過推薦算法計算分析,然后產(chǎn)生用戶可能感興趣的項目列表。推薦系統(tǒng)可以更精準的理解用戶需求,對用戶進行聚類、打標簽,推薦用戶感興趣的商品,幫助用戶快速找到需要的商品,同時放大需求、增加流量入口、提高商品銷售的有效轉化率。
4、機器學習:利用機器學習算法模型基于大數(shù)據(jù)集進行數(shù)據(jù)挖掘,發(fā)現(xiàn)和利用數(shù)據(jù)價值。
1.2 數(shù)倉項目搭建概述
1.2.1 項目需求分析
1、數(shù)據(jù)需求:用戶分析日志log、業(yè)務數(shù)據(jù)db
2、采集需求:日志采集系統(tǒng)(flume)、業(yè)務數(shù)據(jù)同步系統(tǒng)(Maxwell,datax)
3、數(shù)據(jù)倉庫建模:維度建模
4、數(shù)據(jù)分析:對設備、會員、商品、地區(qū)、活動等電商核心主題進行統(tǒng)計,統(tǒng)計的報表指標接近100個。
5、即席查詢:用戶在使用系統(tǒng)時,根據(jù)自己當時的需求定義的查詢,通常使用即席查詢工具。
6、集群監(jiān)控:對集群性能進行監(jiān)控,發(fā)生異常及時報警。
7、元數(shù)據(jù)管理:存儲所有表對象的詳細信息,通過元數(shù)據(jù)管理有助于開發(fā)人員理解管理數(shù)據(jù)。
8、數(shù)據(jù)質量監(jiān)控:數(shù)據(jù)質量是數(shù)據(jù)分析和數(shù)據(jù)挖掘結果有效性和準確性的基礎。數(shù)據(jù)的導入導出是否完整、一致等問題。一般使用數(shù)據(jù)質量監(jiān)控工具完成。
1.2.2 項目框架
思考:項目技術如何選型?
1、技術選型
考慮的因素:數(shù)據(jù)量的大小、業(yè)務需求、行業(yè)經(jīng)驗、技術成熟度、開發(fā)維護成本、總成本預算
技術選型
數(shù)據(jù)采集傳輸:Flume、kafka、datax,maxwell,sqoop,logstash
數(shù)據(jù)存儲:mysql、hdfs、hbase、redis、mongodb
數(shù)據(jù)計算:hive、spark、flink、storm、tez
數(shù)據(jù)查詢:presto、kylin、impala、druid、clickhouse、doris
數(shù)據(jù)可視化:superset、echarts、quickbi、datav
任務調度、dolphinscheduler、azkabanoozie、airflow
集群監(jiān)控:zabbix、prometheus
元數(shù)據(jù)管理:atlas
權限管理:ranger、sentry
2、系統(tǒng)流程設計
思考:框架版本如何選擇?
3、框架版本選型
1)框架選型
(1)如何選擇apache/cdh/hdp版本?
apache:運維麻煩,組件間兼容性需要自己調研。(大廠使用)
cdh:國內使用最多的版本,開始收費
hdp:開源,可以進行二次開發(fā),但沒cdh穩(wěn)定,國內很少使用
(2)云服務選擇
阿里云的emr、maxcompute、dataworks
亞馬遜云emr
騰訊云emr
華為云emr
2)apache框架版本選型
版本選擇基本原則:
大版本:框架版本選擇盡量不要選擇最新的框架,選擇最新版本半年前的穩(wěn)定版本。
小版本:選大不選小。
1.3 基礎設施
1.3.1 服務器選型
思考:服務器選擇物理機還是云主機呢?主要看成本
不同類型主機的成本投入:
如何選擇?
1、有錢并且和阿里有業(yè)務沖突的 -> 物理機
2、中小公司,為了快速拉到投資 -> 阿里云
3、資金充足,有長期打算的公司 -> 物理機
1.3.2 集群資源規(guī)劃
在企業(yè)中通常會搭建一套生產(chǎn)集群和一套測試集群。
生產(chǎn)集群運行生產(chǎn)任務。
測試集群用于上線前代碼編寫和測試。
1、集群規(guī)模
數(shù)據(jù)量有關
1)如何確認集群規(guī)模?(假設:每臺服務器8T磁盤,128G內存)
(1)每天日活用戶100萬,每人一天平均100條:100萬100條=1億條
(2)每條日志1K左右,每天1億條:100000000/1024/1024=約100G
(3)半年內不擴容服務器來算:100G180天=約18T
(4)保存3副本:18T3=54T
(5)預留20%~30%Buf=54T/0.7=77T
(6)算到這:約8T10臺服務器
2)如果考慮數(shù)倉分層?數(shù)據(jù)采用壓縮?需要重新再計算
2、部署原則:
1)消耗內存的分開
2)傳輸數(shù)據(jù)比較緊密的放在一起(kafka、zookeeper)
3)客戶端盡量放在1到2臺服務器上,方便外部訪問
4)有依賴關系的盡量放在同一臺服務器上
(1)生產(chǎn)集群部署規(guī)劃
(2)測試集群服務部署規(guī)劃
第 2 章:用戶行為日志
2.1 用戶行為日志概述
1、用戶行為日志:包括用戶的各項行為信息以及行為所處的環(huán)境信息
2、目的:優(yōu)化產(chǎn)品和為各項分析統(tǒng)計指標提供數(shù)據(jù)支撐
3、收集手段:埋點
2.2 用戶行為日志內容
本項目中收集和分析目標數(shù)據(jù)主要有:頁面數(shù)據(jù)、事件數(shù)據(jù)、曝光數(shù)據(jù)、啟動數(shù)據(jù)和錯誤數(shù)據(jù)
2.2.1 頁面數(shù)據(jù)
1、頁面數(shù)據(jù):主要記錄一個頁面的用戶訪問情況,包括訪問時間、停留時間、頁面路徑等信息。
2.2.2 事件數(shù)據(jù)
事件數(shù)據(jù)主要記錄應用內一個具體的操作行為,包括操作類型、操作對象、操作對象描述等信息
2.2.3 曝光數(shù)據(jù)
曝光數(shù)據(jù)主要記錄頁面所曝光的內容,包括曝光對象,曝光類型等信息
2.2.4 啟動數(shù)據(jù)
啟動數(shù)據(jù)記錄應用的啟動信息
2.2.5 錯誤數(shù)據(jù)
應用使用過程中的錯誤信息,包括錯誤編號和錯誤信息
2.3 用戶行為日志格式
埋點日志數(shù)據(jù)可分為兩大類:普通頁面埋點日志、啟動日志
普通頁面埋點日志包括:一個頁面瀏覽記錄、若干個用戶在該頁面所做的動作記錄、若干個該頁面的曝光記錄、以及一個在該頁面發(fā)生的報錯記錄。除上述行為信息,頁面日志還包括了這些行為所處的各種環(huán)境信息,包括用戶信息、事件信息、地理位置信息、設備信息、應用信息、渠道信息等。
1、普通頁面埋點日志
啟動日志:以啟動為單位,及一次啟動行為,生成一條啟動日志。一條完整的啟動日志包括一個啟動記錄,一個本次啟動時的報錯記錄,以及啟動時所處的環(huán)境信息,包括用戶信息、時間信息、地理位置信息、設備信息、應用信息、渠道信息等。
第 1 章:電商業(yè)務介紹
1.1 電商的業(yè)務流程
1、我們以一個普通用戶的瀏覽足跡為例進行說明:
1)用戶點開電商首頁開始瀏覽,可能會通過分類查詢或者通過全文搜索找到自己中意的商品,這些商品無疑都是存儲在后臺的管理系統(tǒng)中的
2)當用戶尋址到自己中意的商品,可能會想要購買,將商品加入到購物車中,發(fā)現(xiàn)需要登錄,登錄后,對商品進行結算,這時候購物車的管理和商品訂單信息的生成都會對業(yè)務數(shù)據(jù)庫產(chǎn)生影響,會生成相應的訂單數(shù)據(jù)和支付數(shù)據(jù)。
3)訂單數(shù)據(jù)生成之后,還會對訂單進行跟蹤處理,直到訂單全部完成。
2、主要業(yè)務流程包括:
1)用戶前臺瀏覽商品時的商品詳情的管理
2)用戶商品加入購物車進行支付時用戶個人中心和支付服務的管理
3)用戶支付完成后訂單后臺服務的管理
這些流程涉及到了十幾個甚至幾十個業(yè)務數(shù)據(jù)表,甚至更多
1.2 電商常識
1.2.1 sku和spu
sku:庫存量基本單位。產(chǎn)品統(tǒng)一編號的簡稱,每種商品均對應有唯一的sku號。sku表示一個商品
spu:商品信息集合的最小單位。一組可復用、易檢索的標準化信息集合。spu表示一類商品,同一spu的商品可以共用商品圖片、海報、銷售屬性等
1.2.2 平臺屬性和銷售屬性
1.2.3 電商系統(tǒng)表結構
上面展示的就是本電商數(shù)倉系統(tǒng)涉及到的業(yè)務數(shù)據(jù)表結構關系。
1、這一共34張表,以訂單表、用戶表、sku商品表、活動表和優(yōu)惠卷表為中心。
2、延申出了優(yōu)惠卷領用表,支付流水表、活動訂單表、訂單詳情表、訂單狀態(tài)表、商品品論表、編碼字典表、退單表、spu商品表等。
3、其中的用戶表提供用戶的詳細信息:支付流水表提供訂單的支付詳情;訂單詳情表提供訂單的商品數(shù)量情況;商品表給訂單詳情表提供商品的詳細信息。
本次講解以此34各表為例,實際生產(chǎn)項目中,業(yè)務數(shù)據(jù)庫中的表遠遠不止這些。
第 1 章:實時數(shù)倉同步數(shù)據(jù)
實時數(shù)倉用flink源源不斷地從kafka中讀取數(shù)據(jù)進行計算,所以不需要手動同步數(shù)據(jù)到實時數(shù)倉。
第 2 章:離線數(shù)倉同步數(shù)據(jù)
2.1 用戶行為數(shù)據(jù)同步
2.1.1 數(shù)據(jù)通道
用戶行為數(shù)據(jù)由flume從kafka直接同步到hdfs上,由于離線數(shù)倉采用hive地分區(qū)表按天統(tǒng)計,所以目標路徑要包括一層日期。具體數(shù)據(jù)流向如圖:
2.1.2 日志消費flume概述
1、日志消費flume在架構中的定位
日志消費flume主要用于消費kafka集群中topic_log的數(shù)據(jù)寫入到hdfs中。
flume的集群規(guī)劃如下:
日志消費flume我們將其安裝部署在flume04上。
1)背景:安裝規(guī)劃,該flume需要將kafka中的topic_log的數(shù)據(jù)采集并發(fā)送到hdfs,并且需要對每天產(chǎn)生的用戶行為數(shù)據(jù)進行分區(qū)存儲,將不同日期的數(shù)據(jù)發(fā)送到hdfs中以不同日期命名的路徑下。
2)flume插件選擇:kafkasource、fliechannel、hdfssink
3)關鍵配置如下:
kafkasource:
#訂閱kafka中的topic_log
a1.source.r1.kafka.topics=topic_log
#使用時間戳攔截器為event增加一個header,key為timestamp,value為json字符串中ts字段的值
interceptors=i1
interceptors.i1.type=timestampinterceptor.builder
hdfssink
#path中包括時間轉移序列,用于將不同日期的數(shù)據(jù)放在不同的路徑
path=/orgin_data/gmall/log/topic_log/%Y-%m-%d
2.1.3 日志消費flume配置分析
#訂閱kafka中topic
a1.sources.r1.kafka.topics=topic_log
#path包括時間轉義序列,將不同日期的數(shù)據(jù)放到不同的目錄下
a1.sinks.k1.hdfs.path=/orgin_data/gmall/log/topic_log/%Y-%m-%d
#使用時間攔截器為event增加一個header,其中key是timestamp,value是json字符串中的ts字段的值
interceptors=i1
interceptors.i1.type=timestampinterceptor$builder
2.1.4 自定義flume攔截器
1、日志消費flume使用攔截器的目的-處理時間漂移問題
我們知道在使用hdfs sink時需要在event的header上設置時間戳屬性。但是使用默認的timestrampinterceptor攔截器會默認使用linux系統(tǒng)時間,作為輸出到hdfs路徑的時間。
但是如果數(shù)據(jù)時23:59:59分鐘產(chǎn)生的,flume消費kafka中數(shù)據(jù)時,有可能已經(jīng)到了第二天了,那么這部分數(shù)據(jù)就會發(fā)送到第二天的hdfs路徑。
我們希望根據(jù)日志里面的實際時間,發(fā)往hdfs的路徑,所以我們需要自定義攔截器實現(xiàn)將日志里面的實際時間,提取出來,配置到event的header中。
注意:想要復現(xiàn)時間飄逸現(xiàn)象時,需要保證數(shù)據(jù)產(chǎn)生時間是在時間節(jié)點重新計算附件,如:按天的就需要在00:00前的一分鐘以內;按分鐘的就要在每分鐘的前5秒以內。
2、自定義攔截器
1)創(chuàng)建類timestampinterceptor類
package com.atguigu.flume.interceptor;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* @author leon
* @ClassName TimeStampInterceptor.java
* @createTime 2022年01月23日 13:30:00
*/
public class TimeStampInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
// 1. 獲取Event的Body
String log = new String(event.getBody(), StandardCharsets.UTF_8);
// 2. 解析log為json對象
JSONObject jsonObject = JSONObject.parseObject(log);
// 3. 獲取log中的時間戳
String ts = jsonObject.getString("ts");
// 4. 將時間戳屬性配置到header中
event.getHeaders().put("timestamp",ts);
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return events;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new TimeStampInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
2)重新打包后,上傳到hadoop104的flume根目錄下lib文件夾下
[atguigu@hadoop104 lib]$ ls -al | grep flume-interceptor*
-rw-rw-r--. 1 atguigu atguigu 662479 1月 23 13:40 flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
2.1.5 編寫日志消費flume的配置文件
1、編寫配置日志消費flume的配置文件
在hadoop104的/opt/module/flume/job目錄下創(chuàng)建flume-kafka-hdfs.conf
[atguigu@hadoop104 conf]$ vim flume-kafka-hdfs.conf
# 組件
a1.sources=r1
a1.channels=c1
a1.sinks=k1
# source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder
# channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
# sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
# 控制輸出文件DataStream格式。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
2、配置優(yōu)化
1)FIlechannel優(yōu)化
(1)通過配置datadirs指向多個路徑,每個路徑對應不同的硬盤,增大flume吞吐量
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
(2)checkpointdir和backupcheckpointdir也盡量配置到不同的硬盤對應的目錄中,保證checkpoint壞掉后,可以快速使用backupcheckpointdir恢復數(shù)據(jù)
2)hdfs sink優(yōu)化
(1)hdfs存入大量小文件的影響
(2)hdfs小文件處理:配置三個參數(shù)hdfs.rollinterval=3600,hdfs.rollsize=134217728,hdfs.rollcount=0效果;當文件達到128m時會產(chǎn)生新的文件;當創(chuàng)建超過3600秒時會滾動產(chǎn)生新的文件。
2.1.6 編寫日志消費flume啟動停止腳本
1、在hadoop102下的atguima用戶根目錄/home/atguigu/bin下,創(chuàng)建f2.sh文件
[atguigu@hadoop102 bin]$ vim f2.sh
#! /bin/bash
# 1. 判斷是否存在參數(shù)
if [ $# == 0 ];then
echo -e "請輸入?yún)?shù):\nstart 啟動日志消費flume;\nstop 關閉日志消費flume;"&&exit
fi
case $1 in
"start"){
echo " --------啟動 hadoop104 消費flume-------"
ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/job/flume-kafka-hdfs.conf --conf /opt/module/flume/conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/logs/flume.log 2>&1 &"
};;
"stop"){
echo "---------- 停止 hadoop104 上的 日志消費flume ----------"
flume_count=$(xcall jps -ml | grep flume-kafka-hdfs|wc -l);
if [ $flume_count != 0 ];then
ssh hadoop104 "ps -ef | grep flume-kafka-hdfs | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
else
echo " hadoop104 當前沒有日志采集flume在運行"
fi
};;
esac
2、設置f2.sh文件的執(zhí)行權限
[atguigu@hadoop102 bin]$ chmod +x f2.sh
2.1.7 用戶行為數(shù)據(jù)同步測試
1、首先執(zhí)行腳本f2.sh啟動日志消費flume,消費kafka中topic_log的數(shù)據(jù)
[atguigu@hadoop102 module]$ f2.sh start
2、執(zhí)行腳本f1.sh啟動日志采集flume,采集日志文件到kafka中的topic_log
[atguigu@hadoop102 module]$ f1.sh start
3、執(zhí)行腳本lg.sh啟動日志數(shù)據(jù)模擬程序,生產(chǎn)模擬數(shù)據(jù)(需要修改配置文件)
[atguigu@hadoop102 module]$ lg.sh
4、查看各節(jié)點的運行程序
[atguigu@hadoop102 ~]$ xcall "jps -ml"
=============== hadoop102 ===============
11584 org.apache.hadoop.hdfs.server.namenode.NameNode
12256 org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer
6113 kafka.Kafka /opt/module/kafka/config/server.properties
11747 org.apache.hadoop.hdfs.server.datanode.DataNode
12420 gmall2020-mock-log-2021-01-22.jar
12453 sun.tools.jps.Jps -ml
5705 org.apache.zookeeper.server.quorum.QuorumPeerMain /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
10764 org.apache.flume.node.Application --conf-file /opt/module/flume/conf/flume-tailDir-kafka.conf --name a1
12031 org.apache.hadoop.yarn.server.nodemanager.NodeManager
=============== hadoop103 ===============
5584 kafka.Kafka /opt/module/kafka/config/server.properties
8355 org.apache.hadoop.yarn.server.nodemanager.NodeManager
7589 org.apache.flume.node.Application --conf-file /opt/module/flume/conf/flume-tailDir-kafka.conf --name a1
8213 org.apache.hadoop.yarn.server.resourcemanager.ResourceManager
5174 org.apache.zookeeper.server.quorum.QuorumPeerMain /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
8843 sun.tools.jps.Jps -ml
8046 org.apache.hadoop.hdfs.server.datanode.DataNode
8814 gmall2020-mock-log-2021-01-22.jar
=============== hadoop104 ===============
5651 org.apache.zookeeper.server.quorum.QuorumPeerMain /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
8627 sun.tools.jps.Jps -ml
8084 org.apache.hadoop.hdfs.server.datanode.DataNode
8265 org.apache.hadoop.yarn.server.nodemanager.NodeManager
6059 kafka.Kafka /opt/module/kafka/config/server.properties
8427 org.apache.flume.node.Application --conf-file /opt/module/flume/conf/flume-kafka-hdfs.conf --name a1
8173 org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode
5、查看對應hdfs上的目錄下是否生成了新的數(shù)據(jù)文件
[atguigu@hadoop102 module]$ hdfs dfs -ls /origin_data/gmall/log/topic_log
Found 2 items
drwxr-xr-x - atguigu supergroup 0 2022-01-23 16:21 /origin_data/gmall/log/topic_log/2020-06-14
[atguigu@hadoop102 module]$ hdfs dfs -ls /origin_data/gmall/log/topic_log/2020-06-14
-rw-r--r-- 3 atguigu supergroup 544097 2022-01-23 16:20 /origin_data/gmall/log/topic_log/2020-06-14/log-.1642926024093
-rw-r--r-- 3 atguigu supergroup 1075832 2022-01-23 16:21 /origin_data/gmall/log/topic_log/2020-06-14/log-.1642926030114
[atguigu@hadoop102 module]$ hdfs dfs -cat /origin_data/gmall/log/topic_log/2020-06-14/log-.1642926024093 |zcat
……
{"common":{"ar":"110000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone Xs Max","mid":"mid_125455","os":"iOS 13.2.3","uid":"65","vc":"v2.1.134"},"page":{"during_time":19258,"last_page_id":"home","page_id":"mine"},"ts":1592122835000}
……
2.2 業(yè)務數(shù)據(jù)同步策略
業(yè)務數(shù)據(jù)是數(shù)據(jù)倉庫的重要數(shù)據(jù)來源,我們需要每日定時從業(yè)務數(shù)據(jù)庫中抽取數(shù)據(jù),傳輸?shù)綌?shù)據(jù)倉庫中,之后再對數(shù)據(jù)進行分析統(tǒng)計。為保證統(tǒng)計結果的正確性,需要保證數(shù)據(jù)倉庫中的數(shù)據(jù)與業(yè)務數(shù)據(jù)庫是同步的,離線數(shù)倉的計算周期通常為天,所以數(shù)據(jù)同步周期也通常為天,即每天同步一次即可。
在同步業(yè)務數(shù)據(jù)時有兩種同步策略:全量同步和增量同步
2.2.1 全量同步策略
1、解釋:每日全量,就是每天都將業(yè)務數(shù)據(jù)庫中的全部數(shù)據(jù)同步一份到數(shù)據(jù)倉庫,是保證兩側數(shù)據(jù)同步的最簡單的方式
2、適用:表數(shù)據(jù)量不大,且每天即會有新數(shù)據(jù)加入,也會有舊的數(shù)據(jù)修改
3、編碼字典表、品牌表、商品三級分類表、商品二級分類表、商品一級分類表、優(yōu)惠規(guī)則表、活動表、活動參與商品表、加購表、商品收藏表、優(yōu)惠卷表、sku商品表、spu商品表
2.2.2 增量同步策略
解釋:每日增量,就是每天只將業(yè)務數(shù)據(jù)中的新增及變化的數(shù)據(jù)同步到數(shù)據(jù)倉庫中。
適用:表數(shù)據(jù)量大,且每天只會有新的數(shù)據(jù)插入的場景。
特點:采用每日增量的表,通常會在首日先進行一個全量同步。
例如:退單表、訂單狀態(tài)表、支付流水表、訂單詳情表、活動與訂單關聯(lián)表、商品評論表
2.2.3 數(shù)據(jù)同步策略的選擇
兩種策略都能保證數(shù)據(jù)倉庫和業(yè)務數(shù)據(jù)庫的數(shù)據(jù)同步,那應該選擇哪個呢?
結論:若業(yè)務數(shù)據(jù)量比較大,且每天的數(shù)據(jù)變化比例還比較低,這時應該選擇增量同步,否則采用全量同步。
2.2.4 同步工具概述
1、種類繁多的數(shù)據(jù)同步工具中,大致可以分為兩大類
1)基于Select查詢的離線、批量同步工具,代表:datax、sqoop
2)基于數(shù)據(jù)庫表述變更日志(mysql的binlog)的實時流式同步工具,代表:maxwell、canal
2、上述同步工具的全量或增量同步適用如下
3、同步工具之間對增量同步不同方案的對比
本項目中,全量同步采用datax,增量同步采用maxwell
注:由于后續(xù)數(shù)倉建模需要,cart_inso需進行全量同步和增量同步
2.3 全量表數(shù)據(jù)同步
2.3.1 數(shù)據(jù)同步工具datax部署
2.3.2 數(shù)據(jù)通道
全量表數(shù)據(jù)有datax從mysql業(yè)務數(shù)據(jù)庫中直接同步到hdfs,具體數(shù)據(jù)流向如下表
注:
1、目標路徑中表名需包含后綴full,表示該表為全量同步
2、目標路徑中包含一層日期,用以對不同天的數(shù)據(jù)進行區(qū)分
2.3.3 編寫datax配置文件
我們需要為每張全量表編寫一個datax的json配置文件,此處為activity_Info為例,編輯配置文件如下:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"activity_name",
"activity_type",
"activity_desc",
"start_time",
"end_time",
"create_time"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop102:3306/gmall"
],
"table": [
"activity_info"
]
}
],
"password": "jianglai",
"splitPk": "",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "activity_name",
"type": "string"
},
{
"name": "activity_type",
"type": "string"
},
{
"name": "activity_desc",
"type": "string"
},
{
"name": "start_time",
"type": "string"
},
{
"name": "end_time",
"type": "string"
},
{
"name": "create_time",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "hdfs://hadoop102:8020",
"fieldDelimiter": "\t",
"fileName": "activity_info",
"fileType": "text",
"path": "${targetdir}",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
這個配置文件定義了一個從 MySQL 數(shù)據(jù)庫讀取特定表和列的數(shù)據(jù),并將其寫入到 HDFS 的過程,同時涵蓋了數(shù)據(jù)類型、連接信息、輸出格式和壓縮方式等詳細信息。這種配置通常用于數(shù)據(jù)倉庫的數(shù)據(jù)抽取、轉換和加載(ETL)過程。
注:我們需要對不同天的數(shù)據(jù)加以分區(qū),故path參數(shù)的值配置為動態(tài)傳入?yún)?shù),名為targetdir
2、測試配置文件
[atguigu@hadoop102 datax]$ python bin/datax.py job/activity_info.json -p"-DtargetDir=/origin_data/gmall/db/activity_info_full/2020-06-14"
3、執(zhí)行時如果報錯如下:
經(jīng)DataX智能分析,該任務最可能的錯誤原因是:
com.alibaba.datax.common.exception.DataXException: Code:[HdfsWriter-02], Description:[您填寫的參數(shù)值不合法.]. - 您配置的path: [/origin_data/gmall/db/activity_info/2020-06-14] 不存在, 請先在hive端創(chuàng)建對應的數(shù)據(jù)庫和表.
4、這文件一個個寫太麻煩了,每天的日期都不一樣,怎么辦呢?
2.3.4 datax配置文件生成腳本
1、為了方便起見,我們適用腳本gen_import_config.py批量生成datax的配置文件,腳本內容如下:
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb
#MySQL相關配置,需根據(jù)實際情況作出修改
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "你的密碼"
#HDFS NameNode相關配置,需根據(jù)實際情況作出修改
hdfs_nn_host = "hadoop102"
hdfs_nn_port = "8020"
#生成配置文件的目標路徑,可根據(jù)實際情況作出修改
output_path = "/opt/module/datax/job/import"
def get_connection():
return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)
def get_mysql_meta(database, table):
connection = get_connection()
cursor = connection.cursor()
sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
cursor.execute(sql, [database, table])
fetchall = cursor.fetchall()
cursor.close()
connection.close()
return fetchall
def get_mysql_columns(database, table):
return map(lambda x: x[0], get_mysql_meta(database, table))
def get_hive_columns(database, table):
def type_mapping(mysql_type):
mappings = {
"bigint": "bigint",
"int": "bigint",
"smallint": "bigint",
"tinyint": "bigint",
"decimal": "string",
"double": "double",
"float": "float",
"binary": "string",
"char": "string",
"varchar": "string",
"datetime": "string",
"time": "string",
"timestamp": "string",
"date": "string",
"text": "string"
}
return mappings[mysql_type]
meta = get_mysql_meta(database, table)
return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)
def generate_json(source_database, source_table):
job = {
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": mysql_user,
"password": mysql_passwd,
"column": get_mysql_columns(source_database, source_table),
"splitPk": "",
"connection": [{
"table": [source_table],
"jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
}]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
"fileType": "text",
"path": "${targetdir}",
"fileName": source_table,
"column": get_hive_columns(source_database, source_table),
"writeMode": "append",
"fieldDelimiter": "\t",
"compress": "gzip"
}
}
}]
}
}
if not os.path.exists(output_path):
os.makedirs(output_path)
with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
json.dump(job, f)
def main(args):
source_database = ""
source_table = ""
options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])
for opt_name, opt_value in options:
if opt_name in ('-d', '--sourcedb'):
source_database = opt_value
if opt_name in ('-t', '--sourcetbl'):
source_table = opt_value
generate_json(source_database, source_table)
if __name__ == '__main__':
main(sys.argv[1:])
這個腳本是為了簡化 DataX 數(shù)據(jù)遷移任務的配置過程。它自動從 MySQL 數(shù)據(jù)庫中獲取表的元數(shù)據(jù),然后生成相應的 DataX 配置文件,用于將數(shù)據(jù)從 MySQL 遷移到 HDFS。
2、安裝python mysql驅動
由于需要適用python訪問mysql數(shù)據(jù)庫,故需要安裝驅動,命令如下
[atguigu@hadoop102 bin]$ sudo yum install -y MySQL-python
3、python腳本使用說明
python gen_import_config.py -d database -t table
這樣雖然能調用python腳本生成指定表的datax的json配置文件,但是我的表很多,總不能每個表都執(zhí)行吧
4、創(chuàng)建gen_import_config.sh腳本
#!/bin/bash
python ~/bin/gen_import_config.py -d gmall -t activity_info
python ~/bin/gen_import_config.py -d gmall -t activity_rule
python ~/bin/gen_import_config.py -d gmall -t base_category1
python ~/bin/gen_import_config.py -d gmall -t base_category2
python ~/bin/gen_import_config.py -d gmall -t base_category3
python ~/bin/gen_import_config.py -d gmall -t base_dic
python ~/bin/gen_import_config.py -d gmall -t base_province
python ~/bin/gen_import_config.py -d gmall -t base_region
python ~/bin/gen_import_config.py -d gmall -t base_trademark
python ~/bin/gen_import_config.py -d gmall -t cart_info
python ~/bin/gen_import_config.py -d gmall -t coupon_info
python ~/bin/gen_import_config.py -d gmall -t sku_attr_value
python ~/bin/gen_import_config.py -d gmall -t sku_info
python ~/bin/gen_import_config.py -d gmall -t sku_sale_attr_value
python ~/bin/gen_import_config.py -d gmall -t spu_info
5、為gen_import_config.sh腳本賦予執(zhí)行權限
[atguigu@hadoop102 bin]$ chmod +x gen_import_config.sh
6、執(zhí)行gen_import_config.sh腳本生成配置文件
[atguigu@hadoop102 bin]$ gen_import_config.sh
7、觀察配置文件
[atguigu@hadoop102 bin]$ ll /opt/module/datax/job/import/
總用量 60
-rw-rw-r-- 1 atguigu atguigu 957 10月 15 22:17 gmall.activity_info.json
-rw-rw-r-- 1 atguigu atguigu 1049 10月 15 22:17 gmall.activity_rule.json
-rw-rw-r-- 1 atguigu atguigu 651 10月 15 22:17 gmall.base_category1.json
-rw-rw-r-- 1 atguigu atguigu 711 10月 15 22:17 gmall.base_category2.json
-rw-rw-r-- 1 atguigu atguigu 711 10月 15 22:17 gmall.base_category3.json
-rw-rw-r-- 1 atguigu atguigu 835 10月 15 22:17 gmall.base_dic.json
-rw-rw-r-- 1 atguigu atguigu 865 10月 15 22:17 gmall.base_province.json
-rw-rw-r-- 1 atguigu atguigu 659 10月 15 22:17 gmall.base_region.json
-rw-rw-r-- 1 atguigu atguigu 709 10月 15 22:17 gmall.base_trademark.json
-rw-rw-r-- 1 atguigu atguigu 1301 10月 15 22:17 gmall.cart_info.json
-rw-rw-r-- 1 atguigu atguigu 1545 10月 15 22:17 gmall.coupon_info.json
-rw-rw-r-- 1 atguigu atguigu 867 10月 15 22:17 gmall.sku_attr_value.json
-rw-rw-r-- 1 atguigu atguigu 1121 10月 15 22:17 gmall.sku_info.json
-rw-rw-r-- 1 atguigu atguigu 985 10月 15 22:17 gmall.sku_sale_attr_value.json
-rw-rw-r-- 1 atguigu atguigu 811 10月 15 22:17 gmall.spu_info.json
8、測試腳本生成的datax配置文件
我們以activity_Info為例,測試用腳本生成的配置文件是否可用
1)在hdfs上創(chuàng)建目標路徑
由于datax同步任務要求目標路徑提前存在,故需要手動創(chuàng)建路徑,當前activity_info表的目標路徑應為/origin_data/gmall/db/activity_info_full/2020-06-14
[atguigu@hadoop102 bin]$ hadoop fs -mkdir -f /origin_data/gmall/db/activity_info_full/2020-06-14
2)執(zhí)行datax同步命令
[atguigu@hadoop102 bin]$ python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/gmall/db/activity_info_full/2020-06-14" /opt/module/datax/job/import/gmall.activity_info.json
3)觀察同步結果
觀察hdfs目標路徑/origin_data/gmall/db/activity_info/2020-06-14下的文件內容
[atguigu@hadoop102 datax]$ hadoop fs -cat /origin_data/gmall/db/activity_info_full/2020-06-14/* | zcat
2022-03-02 14:05:05,527 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
1 聯(lián)想專場 3101 聯(lián)想滿減 2020-10-21 18:49:12 2020-10-31 18:49:15
2 Apple品牌日 3101 Apple品牌日 2020-06-10 00:00:00 2020-06-12 00:00:00
3 女神節(jié) 3102 滿件打折 2020-03-08 00:00:00 2020-03-09 00:00:00
9、全量表數(shù)據(jù)同步腳本
1)為方便使用以及后續(xù)的任務調度,此處編寫一個全量表數(shù)據(jù)同步腳本mysql_to_hdfs_full.sh
#!/bin/bash
# 定義datax的根目錄
DATAX_HOME=/opt/module/datax
# 如果傳入日期則do_date等于傳入的日期,否則等于前一天日期
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d "-1 day" +%F`
fi
#處理目標路徑,此處的處理邏輯是,如果目標路徑不存在,則創(chuàng)建;若存在,則清空,目的是保證同步任務可重復執(zhí)行
handle_targetdir() {
hadoop fs -test -e $1
if [[ $? -eq 1 ]]; then
echo "路徑$1不存在,正在創(chuàng)建......"
hadoop fs -mkdir -p $1
else
echo "路徑$1已經(jīng)存在"
fs_count=$(hadoop fs -count $1)
content_size=$(echo $fs_count | awk '{print $3}')
if [[ $content_size -eq 0 ]]; then
echo "路徑$1為空"
else
echo "路徑$1不為空,正在清空......"
hadoop fs -rm -r -f $1/*
fi
fi
}
#數(shù)據(jù)同步
import_data() {
datax_config=$1
target_dir=$2
handle_targetdir $target_dir
python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config
}
# 根據(jù)傳入的表名,處理不同的表
case $1 in
"activity_info")
import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date
;;
"activity_rule")
import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date
;;
"base_category1")
import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date
;;
"base_category2")
import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date
;;
"base_category3")
import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date
;;
"base_dic")
import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date
;;
"base_province")
import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date
;;
"base_region")
import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date
;;
"base_trademark")
import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date
;;
"cart_info")
import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date
;;
"coupon_info")
import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date
;;
"sku_attr_value")
import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date
;;
"sku_info")
import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date
;;
"sku_sale_attr_value")
import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date
;;
"spu_info")
import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date
;;
"all")
import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date
import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date
import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date
import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date
import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date
import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date
import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date
import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date
;;
esac
這個腳本的目的是使得數(shù)據(jù)同步的過程自動化和標準化。它允許用戶指定特定的表或一組表來進行數(shù)據(jù)同步,同時處理日期和目標路徑的邏輯,確保數(shù)據(jù)同步的準確性和可重復性。
2)為mysql_to_hdfs_full.sh腳本增加執(zhí)行權限
[atguigu@hadoop102 bin]$ chmod +x mysql_to_hdfs_full.sh
3)測試同步腳本
[atguigu@hadoop102 bin]$ mysql_to_hdfs_full.sh all 2020-06-14
4)檢查同步結果
查看hdfs目標路徑是否出現(xiàn)了全量表數(shù)據(jù),全量表共15張
全量表同步邏輯比較簡單,只需要每日執(zhí)行全量表數(shù)據(jù)同步腳本mysql_to_hdfs_full.sh即可
2.4 增量表數(shù)據(jù)同步
2.4.1 數(shù)據(jù)通道
增量表數(shù)據(jù)同步數(shù)據(jù)通道如下所示:
注:
1、目標路徑中表明需包含后綴inc,表示該表為锃亮同步
2、目標路徑中包含一層日期,用以對不同天的數(shù)據(jù)進行區(qū)分
2.4.3 flume配置
1、需求:此處flume需要將maxwell采集kafka topic中的業(yè)務變更數(shù)據(jù)傳輸?shù)絟dfs
2、需求分析
1)flume需要用到的組件是:kafkasource和hdfssink,channel選擇filechannel
2)kafkasource需要訂閱kafka中1個topic:topic_db
3)hdfssink需要將不同數(shù)據(jù)寫到不同的路徑,路徑中還用該包含一層日期,用于分區(qū)每天的數(shù)據(jù)
2.4.4 配置示意圖
采用kafka topic中的業(yè)務變更數(shù)據(jù)到hdfs的flume,我們部署在hadoop104
2.4.5 flume配置
1、編寫flume配置文件kafka_to_hdfs_db.conf
[atguigu@hadoop104 job]$ vim kafka_to_hdfs_db.conf
# agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# sources
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.db.TimestampAndTableNameInterceptor $Builder
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
## bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
這個配置文件的作用是從 Kafka 主題 topic_db 中讀取數(shù)據(jù),經(jīng)過處理后(如添加時間戳和表名),暫存到文件系統(tǒng)中,并最終將數(shù)據(jù)以壓縮格式寫入到 HDFS 的指定路徑。
2.4.6 配置flume攔截器
項目的pom.xml文件配置
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
創(chuàng)建com.atguigu.gmall.flume.interceptor.db包,并在該包下創(chuàng)建timestampandtablenameinterceptor類
public class TimestampAndTableNameInterceptor implements Interceptor {
@Override
public void initialize() { }
/**
* 攔截單個事件
* @param event
* @return
*/
@Override
public Event intercept(Event event) {
// 1. 獲取事件header
Map<String, String> headers = event.getHeaders();
// 2. 獲取解析body
String body = new String(event.getBody(), StandardCharsets.UTF_8);
// 3. 使用fastjson,將body字符串轉化為JSONObject對象
JSONObject jsonObject = JSONObject.parseObject(body);
// 4. 獲取數(shù)據(jù)中的時間戳
Long ts = jsonObject.getLong("ts");
// 5. Maxwell輸出的數(shù)據(jù)的ts字段時間單位是秒,HDFSSink要求的時間單位是毫秒
String timeMills = String.valueOf(ts * 1000);
// 6. 獲取body數(shù)據(jù)中的table的值
String tableName = jsonObject.getString("table");
// 7. 將時間戳添加到事件頭部
headers.put("timestamp",timeMills);
// 8. 將table的名字插入到事件頭部
headers.put("tableName", tableName);
return event;
}
/**
* 攔截批量事件
* @param events
* @return
*/
@Override
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return events;
}
@Override
public void close() { }
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new TimestampAndTableNameInterceptor ();
}
@Override
public void configure(Context context) {
}
}
}
這個攔截器主要用于處理從源(如 Kafka)接收的事件,特別是處理 JSON 格式的消息體。
它從每個事件的 JSON 消息體中提取特定的信息(時間戳和表名)并將這些信息添加到事件的 header 中,這對于后續(xù)的事件處理(如根據(jù)時間戳或表名路由事件)非常有用。
打包,并將帶有依賴的jar包放到flume的lib目錄下
2.4.7 通道測試
1、啟動zookeeper集群、kafka集群
2、啟動hadoop104上的flume,采集kafka_topic中的業(yè)務變更數(shù)據(jù)到hdfs
[atguigu@hadoop102 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf -Dflume.root.logger=INFO,console
3、生成模擬數(shù)據(jù)
[atguigu@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar
4、觀察hdfs上的目標路徑是否有增量表的數(shù)據(jù)出現(xiàn)
2.4.8 數(shù)據(jù)目標路徑的日期說明
仔細觀察,會發(fā)現(xiàn)目標路徑中的日期,并非模擬數(shù)據(jù)的業(yè)務日期,而是當前日期。這是由于maxwell輸出的json字符串中的ts字段的值,是數(shù)據(jù)的變動日期。而真實場景下,數(shù)據(jù)的業(yè)務日期與變動日期應當是一致的。
這張圖展示了一個數(shù)據(jù)流的架構,說明了如何從 MySQL 數(shù)據(jù)庫通過 Maxwell 和 Kafka 將數(shù)據(jù)流式傳輸?shù)?HDFS,同時使用 Flume 作為傳輸介質。Maxwell 作為 MySQL 的 binlog 復制器,捕獲 MySQL 數(shù)據(jù)庫的更改(如插入、更新和刪除操作)并將這些更改作為消息發(fā)布到 Kafka 隊列中。然后,F(xiàn)lume 從 Kafka 中讀取這些消息,并將它們傳輸?shù)?HDFS。
此處為了模擬真實環(huán)境,對maxwell源碼進行了改動,增加了一個參數(shù)mock_date,該參數(shù)的作用就是指定maxwell輸出json字符串的ts時間戳的日期,接下來進行測試。
1、修改maxwell配置文件config.properties,增加mock_date參數(shù),如下
該日期需和/opt/module/db_log/application.properties中的mock.date參數(shù)保持一致
mock_date=2020-06-14
注:該參數(shù)僅供學習使用,修改該參數(shù)后重啟maxwell才能生效。
2、重啟maxwell
[atguigu@hadoop102 bin]$ maxwell.sh restart
3、重新生成模擬數(shù)據(jù)
[atguigu@hadoop102 bin]$ cd /opt/module/db_log/
[atguigu@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar
4、觀察hdfs目標路徑日期是否正常
2.4.9 編寫業(yè)務數(shù)據(jù)變更flume采集啟動停止腳本
為方便使用,編寫一個啟動關閉業(yè)務數(shù)據(jù)變更采集的flume腳本
1、再用戶目錄下的bin目錄下編寫腳本f3.sh
[atguigu@hadoop102 bin]$ vim f3.sh
#!/bin/bash
case $1 in
"start")
echo " --------啟動 hadoop104 業(yè)務數(shù)據(jù)flume-------"
ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")
echo " --------停止 hadoop104 業(yè)務數(shù)據(jù)flume-------"
ssh hadoop104 "ps -ef | grep kafka_to_hdfs_db.conf | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
這個腳本是一個Bash腳本,用于初始化數(shù)據(jù)庫中的所有增量表。它的主要功能是使用Maxwell工具來導入特定表的數(shù)據(jù)到Kafka中。Maxwell 是一個 MySQL binlog 到 Kafka 的轉換器,它可以捕獲數(shù)據(jù)庫的更改并將這些更改作為消息發(fā)送到 Kafka。
2、增加腳本權限
[atguigu@hadoop102 bin]$ chmod +x f3.sh
2.4.11 測試同步腳本
1、清理歷史數(shù)據(jù)
為方便查看結果,現(xiàn)在將hdfs上之前同步的增量表數(shù)據(jù)刪除
[atguigu@hadoop102 ~]$ hadoop fs -ls /origin_data/gmall/db | grep _inc | awk '{print $8}' | xargs hadoop fs -rm -r -f
2、執(zhí)行同步腳本
[atguigu@hadoop102 bin]$ mysql_to_kafka_inc_init.sh all
3、檢查同步結果
觀察hdfs上是否重新出現(xiàn)增量表數(shù)據(jù)
2.5 采用通道啟動/停止腳本
在/home/atguigu/bin目錄下創(chuàng)建腳本cluster.sh
[atguigu@hadoop102 bin]$ vim /home/atguigu/bin/cluster.sh
#!/bin/bash
case $1 in
"start"){
echo ================== 啟動 集群 ==================
#啟動 Zookeeper集群
zk.sh start
#啟動 Hadoop集群
hdp.sh start
#啟動 Kafka采集集群
kf.sh start
#啟動采集 Flume
f1.sh start
#啟動日志消費 Flume
f2.sh start
#啟動業(yè)務消費 Flume
f3.sh start
#啟動 maxwell
mxw.sh start
};;
"stop"){
echo ================== 停止 集群 ==================
#停止 Maxwell
mxw.sh stop
#停止 業(yè)務消費Flume
f3.sh stop
#停止 日志消費Flume
f2.sh stop
#停止 日志采集Flume
f1.sh stop
#停止 Kafka采集集群
kf.sh stop
#停止 Hadoop集群
hdp.sh stop
#停止 Zookeeper集群
zk.sh stop
};;
esac
腳本編輯后,賦予腳本執(zhí)行權限chmod+x cluster.sh
第 4 章:數(shù)倉準備
4.1 hive安裝
1、把apache-hive~bin.tar.gz上傳到linux的/opt/software目錄下
2、將/opt/software/目錄下的apache-hive~bin.tar.gz到/opt/module/目錄下面
[atguigu@hadoop102 software]$ tar -zxvf apache-hive-3.1.2-bin.tar.gz -C /opt/module/
3、修改解壓后的目錄名稱為hive
[atguigu@hadoop102 module]$ mv apache-hive-3.1.2-bin/ /opt/module/hive
4、修改/etc/profile.d/my_env.sh文件,將Hive的/bin目錄添加到環(huán)境變量
[atguigu@hadoop102 hive]$ sudo vim /etc/profile.d/my_env.sh
……
#HIVE_HOME
export HIVE_HOME=/opt/module/hive
export PATH=$PATH:$HIVE_HOME/bin
[atguigu@hadoop102 hive]$ source /etc/profile
5、在hive根目錄下,使用/bin目錄中的schematool命令初始化hive自帶的derby元數(shù)據(jù)庫
[atguigu@hadoop102 hive]$ bin/schematool -dbType derby -initSchema
6、執(zhí)行上述初始化元數(shù)據(jù)庫時,會發(fā)現(xiàn)存在jar包沖突問題,現(xiàn)象如下
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/hive/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Metastore connection URL: jdbc:derby:;databaseName=metastore_db;create=true
Metastore Connection Driver : org.apache.derby.jdbc.EmbeddedDriver
Metastore connection User: APP
Starting metastore schema initialization to 3.1.0
Initialization script hive-schema-3.1.0.derby.sql
解決jar沖突問題,只需要將hive的/lib目錄下的log4j-slf4j-impl-2.10.0.jar重命名即可
[atguigu@hadoop102 hive]$ mv lib/log4j-slf4j-impl-2.10.0.jar lib/log4j-slf4j-impl-2.10.0.back
4.2 將hive元數(shù)據(jù)配置到mysql
4.2.1 拷貝驅動
將mysql的jdbc驅動拷貝到hive的lib目錄下
[atguigu@hadoop102 software]$ cp mysql-connector-java-5.1.37.jar /opt/module/hive/lib
4.2.2 配置metastore到mysql
在$hive_home/conf目錄下新建hive-site.xml文件
[atguigu@hadoop102 hive]$ vim conf/hive-site.xml
添加如下內容
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- jdbc連接的URL -->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop102:3306/metastore?useSSL=false</value>
</property>
<!-- jdbc連接的Driver-->
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<!-- jdbc連接的username-->
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<!-- jdbc連接的password -->
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>你的密碼</value>
</property>
<!-- Hive默認在HDFS的工作目錄 -->
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<!-- Hive元數(shù)據(jù)存儲的驗證 -->
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<!-- 元數(shù)據(jù)存儲授權 -->
<property>
<name>hive.metastore.event.db.notification.api.auth</name>
<value>false</value>
</property>
</configuration>
這個配置文件的主要作用是設置 Hive 與其元數(shù)據(jù)存儲(通常是一個 RDBMS,如 MySQL)的連接配置,定義 Hive 在 HDFS 中的存儲位置,以及配置一些與元數(shù)據(jù)和安全性相關的選項。
4.2.3 hive初始化元數(shù)據(jù)庫
1、登錄mysql
[atguigu@hadoop102 module]$ mysql -uroot -p你的密碼
2、新建hive元數(shù)據(jù)庫
mysql> create database metastore;
mysql> quit;
3、初始化hive元數(shù)據(jù)庫
[atguigu@hadoop102 hive]$ bin/schematool -initSchema -dbType mysql -verbose
4.2.4 啟動hive
1、啟動hive客戶端
[atguigu@hadoop102 hive]$ bin/hive
2、查看一下數(shù)據(jù)庫
hive (default)> show databases;
OK
database_name
default
4.2.5 修改元數(shù)據(jù)庫字符集
hive元數(shù)據(jù)庫的字符集默認為latin1,由于其不支持中文字符,故若建表語句中包含中文注釋,會出現(xiàn)亂碼現(xiàn)象。如需要解決亂碼問題,需做一下修改。
1、修改hive元數(shù)據(jù)庫中存儲注釋的字符的字符集為utf-8
1)字段注釋
mysql> alter table metastore.COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
2)表注釋文章來源:http://www.zghlxwxcb.cn/news/detail-814186.html
mysql> alter table metastore.TABLE_PARAMS modify column PARAM_VALUE mediumtext character set utf8;
2、修改hive-site.xml中jdbc url,如下文章來源地址http://www.zghlxwxcb.cn/news/detail-814186.html
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop102:3306/metastore?useSSL=false&useUnicode=true&characterEncoding=UTF-8</value>
</property>
到了這里,關于大數(shù)據(jù)開發(fā)之電商數(shù)倉(hadoop、flume、hive、hdfs、zookeeper、kafka)的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!