[gpb@hadoop104 ~]$ cd /opt/module/flume/
[gpb@hadoop104 flume]$ cd job/
[gpb@hadoop104 job]$ rm file_to_kafka.conf
# Define components
a1.sources=r1
a1.channels=c1
a1.sinks=k1
# Configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.kafka.consumer.group.id=topic_log
a1.sources.r1.batchSize = 2000
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder
# Configure the channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.useDualCheckpoints = false
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 3
# Configure the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
# Control the output file type
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
# Assemble the components
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.1.3 Log Consumption Flume Configuration in Practice
1) Create the Flume configuration file
Create kafka_to_hdfs_log.conf in Flume's job directory on the hadoop104 node.
[atguigu@hadoop104 flume]$ vim job/kafka_to_hdfs_log.conf
2) The content of the configuration file is as follows
# Define components
a1.sources=r1
a1.channels=c1
a1.sinks=k1
# Configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder
# Configure the channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
# Configure the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
# Control the output file type
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
# Assemble the components
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Note: configuration optimization
1) FileChannel optimization
Configure dataDirs to point to multiple paths, each on a different hard disk, to increase Flume throughput.
The official documentation states:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel performance
checkpointDir and backupCheckpointDir should also be placed in directories backed by different disks whenever possible, so that if the checkpoint becomes corrupted, data can be recovered quickly from backupCheckpointDir. For example:
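A minimal sketch of a multi-disk FileChannel, assuming hypothetical mount points /data1, /data2, and /data3 (adjust to your own disk layout):
a1.channels.c1.type = file
# two data directories on separate disks
a1.channels.c1.dataDirs = /data1/flume/data/behavior1,/data2/flume/data/behavior1
# checkpoint and backup checkpoint placed on different disks
a1.channels.c1.checkpointDir = /data3/flume/checkpoint/behavior1
a1.channels.c1.useDualCheckpoints = true
a1.channels.c1.backupCheckpointDir = /data1/flume/backup-checkpoint/behavior1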
2) HDFS Sink optimization
(1) What is the impact of storing a large number of small files in HDFS?
Metadata level: every small file has its own metadata (file path, file name, owner, group, permissions, creation time, and so on), all of which is kept in NameNode memory. Too many small files therefore consume a large amount of NameNode memory and hurt NameNode performance and longevity.
Computation level: by default MapReduce launches one Map task for every small file, which severely degrades computation performance. It also increases disk seek time.
(2) Handling small files in HDFS
With the official default values, the three parameters hdfs.rollInterval, hdfs.rollSize, and hdfs.rollCount cause small files to be written to HDFS.
With hdfs.rollInterval=3600, hdfs.rollSize=134217728, and hdfs.rollCount=0 acting together, the effect is as follows (see the snippet after this list):
(1) A new file is rolled once the current file reaches 128 MB.
(2) A new file is rolled once the current file has existed for more than 3600 seconds.
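A minimal snippet of the roll settings just described (the test configurations earlier in this section use rollInterval = 10; the hourly value below follows the note above):
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0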
3) Write the Flume interceptor
(1) The data drift problem: the HDFS sink chooses the %Y-%m-%d directory from each event's timestamp header, which by default reflects when the event is consumed rather than when the log was generated, so records produced just before midnight can land in the next day's partition. The interceptor below copies the ts field from the log body into the timestamp header, so events are partitioned by their generation time.
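As a standalone illustration (not Flume source code, and the two timestamps are hypothetical), the same yyyy-MM-dd formatting implied by %Y-%m-%d shows how an event can drift into the next day's directory:
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class DriftDemo {
    public static void main(String[] args) {
        SimpleDateFormat day = new SimpleDateFormat("yyyy-MM-dd");
        day.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai")); // fixed here only to make the example deterministic

        long generatedTs = 1686153599000L; // hypothetical ts inside the log: 2023-06-07 23:59:59 (UTC+8)
        long arrivalTs   = 1686153605000L; // hypothetical time the event reaches the sink: 2023-06-08 00:00:05 (UTC+8)

        // Without the interceptor the header carries the arrival time -> day N+1's directory;
        // with the interceptor it carries the log's own ts -> day N's directory.
        System.out.println("partition by log ts:       " + day.format(new Date(generatedTs)));
        System.out.println("partition by arrival time: " + day.format(new Date(arrivalTs)));
    }
}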
(2) Create the TimestampInterceptor class under the com.atguigu.gmall.flume.interceptor package
package com.atguigu.gmall.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // 1. Get the event's headers and body
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        // 2. Parse the body into a JSONObject to make field access easier
        JSONObject jsonObject = JSONObject.parseObject(log);

        // 3. Replace the timestamp header with the generation timestamp carried in the log
        //    (this is what fixes the data drift problem)
        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
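A quick local sanity check for the interceptor (a hypothetical demo class, not part of the tutorial), assuming flume-ng-core and fastjson are on the classpath:
package com.atguigu.gmall.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;

public class TimestampInterceptorDemo {
    public static void main(String[] args) {
        // Build the interceptor the same way Flume does: through its Builder
        Interceptor.Builder builder = new TimestampInterceptor.Builder();
        builder.configure(new Context());
        Interceptor interceptor = builder.build();
        interceptor.initialize();

        // Hypothetical log line carrying its own generation timestamp
        String log = "{\"ts\":1686153599000,\"common\":{\"mid\":\"mid_1\"}}";
        Event event = EventBuilder.withBody(log.getBytes(StandardCharsets.UTF_8),
                new HashMap<String, String>());

        interceptor.intercept(event);

        // Expect 1686153599000: the HDFS sink will use this header to resolve %Y-%m-%d
        System.out.println(event.getHeaders().get("timestamp"));
        interceptor.close();
    }
}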
(3) Rebuild the package
(4) Put the rebuilt jar into the /opt/module/flume/lib directory on hadoop104 before starting the agent.
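For example (the jar name below is hypothetical; use the name your build actually produces):
[atguigu@hadoop102 ~]$ scp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar hadoop104:/opt/module/flume/lib/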
2.1.4 Log Consumption Flume Test
1) Start the Zookeeper and Kafka clusters
2) Start the log collection Flume
[atguigu@hadoop102 ~]$ f1.sh start
3) Start the log consumption Flume on hadoop104
[atguigu@hadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console
4) Generate mock data
[atguigu@hadoop102 ~]$ lg.sh
5) Check whether data appears in HDFS
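One simple check (assuming the HDFS client is available on hadoop104) is to list the partition directory that the sink writes to:
[atguigu@hadoop104 flume]$ hadoop fs -ls /origin_data/gmall/log/topic_log/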
2.1.5 Log Consumption Flume Start/Stop Script
If the test above passes, create a Flume start/stop script here for convenience.
1) Create the script f2.sh in the /home/atguigu/bin directory on the hadoop102 node
[atguigu@hadoop102 bin]$ vim f2.sh
Write the following content into the script
#!/bin/bash
case $1 in
"start")
echo " --------啟動(dòng) hadoop104 日志數(shù)據(jù)flume-------"
ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")
echo " --------停止 hadoop104 日志數(shù)據(jù)flume-------"
ssh hadoop104 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
2) Grant execute permission to the script
[atguigu@hadoop102 bin]$ chmod 777 f2.sh
3) Start with f2.sh
[atguigu@hadoop102 module]$ f2.sh start
4) Stop with f2.sh
[atguigu@hadoop102 module]$ f2.sh stop