前言:
提示:這里簡述我使用的版本情況:
ubuntu16.04
hbase1.1.5
hive1.2.1
sqoop1.4.6
flume1.7.0
項目所使用的參考文檔和代碼資源和部分數(shù)據(jù)
網(wǎng)盤鏈接:鏈接:https://pan.baidu.com/s/1TIKHMBmEFPiOv48pxBKn2w
提取碼:0830
基本概述
為更好的理解項目架構,對項目使用的一些服務補充一些基本概述:
Sqoop概述
什么是Sqoop
Sqoop是Apache旗下的一款開源工具,2013年獨立成為Apache的一個頂級開源項目
Sqoop主要用于在Hadoop和關系數(shù)據(jù)庫或大型機之間傳輸數(shù)據(jù),可以使用Sqoop工具將數(shù)據(jù)從關系數(shù)據(jù)庫管理系統(tǒng)導入(import)到Hadoop分布式文件系統(tǒng)中,或者將Hadoop中的數(shù)據(jù)轉換導出(export)到關系數(shù)據(jù)庫管理系統(tǒng),功能圖如下
目前sqoop主要分為sqoop1和sqoop2兩個版本,其中版本號為1.4.x屬于sqoop1,而版本號為1.99.x的屬于Sqoop2.這兩個版本開發(fā)時的定位方向不同,體系結構具有很大的差異,因此它們之間互不兼容。
Sqoop1功能結構簡單,部署方便。提供命令行操作方式,主要適用于系統(tǒng)服務管理人員進行簡單的數(shù)據(jù)遷移操作,該項目只用到sqoop1解決數(shù)據(jù)遷移問題,因此我們使用sqoop1就可以完成基本的需求
Flume概述
什么是Flume
Flume原是Cloudera公司提供的一個高可用、高可靠、分布式海量日志采集、聚合和傳輸系統(tǒng),后來納入到Apache旗下,作為一個頂級開源項目。Apache Flume 不僅只限于日志數(shù)據(jù)的采集,由于Flume采集的數(shù)據(jù)源是可定制的,因此Flume還課用于傳輸大量事件數(shù)據(jù),包括但不限于網(wǎng)絡流量數(shù)據(jù)、社交媒體生成的數(shù)據(jù)以及幾乎任何可能的數(shù)據(jù)源
當前Flume分成兩個版本:Flume0.9x統(tǒng)稱Flume-og和Flume1.x版本,統(tǒng)稱Flume-ng,早期Flume-og存在設計不合理,納入Apache旗下后對Flume代碼進行重構,進行補充和加強該項目也是使用Flume-ng版本進行Flume開發(fā)
Flume運行機制
Agent:Agent是Flume中的核心組件,用來收集數(shù)據(jù)。一個Agent就是一個JVM進程,它是Flume中最小的獨立運行的單元。
Flume的核心是把數(shù)據(jù)從數(shù)據(jù)源(如Web Server)通過數(shù)據(jù)采集器(Source)收集過來,
再將收集的數(shù)據(jù)通過緩沖通道(Channel)匯集到指定的接收器(Sink)。
為什么需要flume
1、當大量的數(shù)據(jù)在同一個時間要寫入HDFS時,每次一個文件被創(chuàng)建或者分配一個新的塊,都會在namenode發(fā)生很復雜的操作,主節(jié)點壓力很大,會造成很多問題,比如寫入時間嚴重延遲、寫入失敗等。
2、flume是一個靈活的分布式系統(tǒng),易擴展,高度可定制化。
3、flume中的核心組件Agent。一個Agent可以連接一個或者多個Agent,可以從一個或者多個Agent上收集數(shù)據(jù)。多個Agent相互連接,可以建立流作業(yè),在Agent鏈上,就能將數(shù)據(jù)從一個位置移動到另一個地方(HDFS、HBase等)。
HIve概述
Hive起源于Facebook,facebook公司有著大量的日志數(shù)據(jù),而Hadoop是一個實現(xiàn)了Mapreduce模式開源的分布式并行計算框架??梢暂p松處理大規(guī)模的數(shù)據(jù)量,Mapreduce程序雖然對于熟悉Java語言的工程師來說比較容易開發(fā),但是對于其他語言使用者來說難度較大,為此Facebook開發(fā)團隊想到設計一種使用Sql語言就能夠對日志數(shù)據(jù)查詢分析的工具,大大節(jié)省開發(fā)人員的學習成本,HIve則誕生于此
什么是Hive
Hive是建立在Hadoop文件系統(tǒng)上的數(shù)據(jù)倉庫,它提供了一系列工具,能夠對存儲在HDFS中的數(shù)據(jù)進行數(shù)據(jù)提取、轉換和加載(ETL),這是一種可以存儲、查詢和分析存儲在Hadoop中的大規(guī)模數(shù)據(jù)的工具。
Hive定義了簡單的類SQL查詢語言,可以將結構化的數(shù)據(jù)文件映射為一張數(shù)據(jù)表,允許熟悉SQL的用戶查詢數(shù)據(jù),也允許熟悉MapReduce的開發(fā)者開發(fā)定義mapper和reducer來處理復雜的分析工作,這樣Hive的有事更加明顯
(1)用戶接口:主要分為3個,分別是CLI、JDBC/ODBC 和 WebUI。其中,CLI 即 Shell終端命令行,它是最常用的方式。JDBC/ODBC是Hive的
Java實現(xiàn),與使用傳統(tǒng)數(shù)據(jù)庫JDBC的方式類似,WebUI指的是通過瀏覽器訪問Hive。
(2)跨語言服務(Thrift Server):Thrift是Facebook開發(fā)的一個軟件框架,可以用來進行可擴展且跨語言的服務。Hive集成了該服務﹐能讓不同
的編程語言調用Hive的接口。
(3)底層的驅動引擎:主要包含編譯器(Compiler),優(yōu)化器(Optimizer)和執(zhí)行器(Executor ),它們用于完成HQL查詢語句從詞法分析、語法分析、編譯、優(yōu)化以及查詢計劃的生成,生成的查詢計劃存儲在HDFS中,并在隨后由MapReduce調用執(zhí)行。
(4)元數(shù)據(jù)存儲系統(tǒng)(Metastore):Hive中的元數(shù)據(jù)通常包含表名,列、分區(qū)及其相關屬性,表數(shù)據(jù)所在目錄的位置信息,Metastore默認存在自帶的 Derby數(shù)據(jù)庫中。由于Derby數(shù)據(jù)庫不適合多用戶操作,并且數(shù)據(jù)存儲目錄不固定,不方便管理,因此,通常都將元數(shù)據(jù)存儲在MySQL數(shù)據(jù)庫。
系統(tǒng)背景:
近年來,隨著社會的不斷發(fā)展,人們對于海量數(shù)據(jù)的挖掘和運用越來越重視,互聯(lián)網(wǎng)是面向全社會公眾進行信息交流的平臺,已成為了收集信息的最佳渠道并逐步進入傳統(tǒng)的流通領域。同時,伴隨著大數(shù)據(jù)技術的創(chuàng)新與應用,進一步為人們進行大數(shù)據(jù)統(tǒng)計分析提供了便利。
大數(shù)據(jù)信息的統(tǒng)計分析可以為企業(yè)決策者提供充實的依據(jù)。例如,通過對某網(wǎng)站日志數(shù)據(jù)統(tǒng)計分析,可以得出網(wǎng)站的日訪問量,從而得出網(wǎng)站的歡迎程度;通過對移動APP的下載數(shù)據(jù)量進行統(tǒng)計分析,可以得出應用程序的受歡迎程度﹐甚至還可以通過不同維度(區(qū)域、時間段、下載方式等)進行進一步更深層次的數(shù)據(jù)分析,為運營分析與推廣決策提供可靠的參照數(shù)據(jù)
為了更清晰地了解系統(tǒng)日志數(shù)據(jù)統(tǒng)計分析的流程及架構,溝通一張架構圖來了解一下
三個Nginx可能產生大量的日志文件,然后通過Flume分別采集這些日志文件,然后將采集的日志文件放在HDFS上,我們可以寫Mapreduce程序來對采集后的日志進行預處理,因為從服務器采集過來的數(shù)據(jù)格式不能滿足我們的需求,可能有一些臟數(shù)據(jù),我們要通過Mapreduce對這些文件進行清理,把它們編程結構化的數(shù)據(jù),然后我們可以把清洗后的數(shù)據(jù)加載到Hive倉庫里面??梢詫η逑春蟮臄?shù)據(jù)進行數(shù)據(jù)分析,這一步是大數(shù)據(jù)中最重要的工作.分析之后得出我們的結果,可以通過sqoop這個遷移工具將我們的結果導出到mysql,導出到mysql之后我們可以通過web技術對我們的結果進行一個可視化的展示
在整個流程中,系統(tǒng)的數(shù)據(jù)分析并不是一次性大的,而是按照一定頻率反復計算,因而整個處理鏈條中的各個環(huán)節(jié)需要按照一個的先后關系緊密銜接,即大量任務單元的管理調度。
模塊開發(fā)
數(shù)據(jù)采集
在該項目中,對數(shù)據(jù)采集模塊的可靠性,容錯能力的要求通常不會非常嚴苛,因此使用通用的Flume日志采集框架完全可以滿足數(shù)據(jù)采集的需求
使用Flume搭建日志采集系統(tǒng)
a1.sources=s1
a1.channels=c1
a1.sinks=k1
a1.sources.s1.type=TAILDIR
a1.sources.s1.positionFile=/root/export/data/nginx/taildir_position.json
a1.sources.s1.filegroups=f1 f2
a1.sources.s1.filegroups.f1=/root/export/data/nginx/test1/access.log
a1.sources.s1.filegroups.f2=/root/export/data/nginx/test2/.*log.*
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://192.168.80.140:9000/weblog/%Y%m%d
#上傳文件的前綴
a1.sinks.k1.hdfs.filePrefix = logs-
#是否按照時間滾動文件夾
#a1.sinks.k1.hdfs.round =true
#多少時間單位創(chuàng)建一個新的文件夾
#a1.sinks.k1.hdfs.roundValue = 1
#重新定義時間單位
#a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地時間戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 1000
#設置文件類型,可支持壓縮
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一個新的文件
a1.sinks.k1.hdfs.rollInterval =0
#設置每個文件的滾動大小
a1.sinks.k1.hdfs.rollSize = 10485760
#文件的滾動與Event數(shù)量無關
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.threadsPoolSize=10
a1.sinks.k1.hdfs.callTimeout=30000
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1
我們知道通過Flume系統(tǒng)采集后的網(wǎng)站流量日志數(shù)據(jù)會匯總到HDFS上進行保存(這里假設保存目錄為 /root/export/data/nginx/test1/access.log)
采集后的數(shù)據(jù):
也可以用現(xiàn)成的采集后的數(shù)據(jù),在網(wǎng)盤資源里有
采集后的數(shù)據(jù)每個字段的含義
數(shù)據(jù)預處理
在收集的日志文件中,通常不能直接將日志文件進行數(shù)據(jù)分析,這是因為日志文件中有許多不合法的數(shù)據(jù),要對不合法的數(shù)據(jù)進行過濾,清洗出無意義的數(shù)據(jù)信息,并且將原始日志中的數(shù)據(jù)格式轉換成利于后續(xù)數(shù)據(jù)分析時規(guī)范的格式,根據(jù)統(tǒng)計需求,篩選出不同主題的數(shù)據(jù)
實現(xiàn)數(shù)據(jù)預處理
1.創(chuàng)建Maven項目,添加相關依賴
添加相關依賴
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13-beta-3</version>
<scope>compile</scope>
</dependency>
</dependencies>
創(chuàng)建JavaBean對象,封裝日志記錄(WebLogBean.java)
package cn.itcast.mr.weblog.bean;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 對接外部數(shù)據(jù)的層,表結構定義最好跟外部數(shù)據(jù)源保持一致
* 術語: 貼源表
* @author itcast
*
*/
public class WebLogBean implements Writable {
private boolean valid = true;// 判斷數(shù)據(jù)是否合法
private String remote_addr;// 記錄客戶端的ip地址
private String remote_user;// 記錄客戶端用戶名稱,忽略屬性"-"
private String time_local;// 記錄訪問時間與時區(qū)
private String request;// 記錄請求的url與http協(xié)議
private String status;// 記錄請求狀態(tài);成功是200
private String body_bytes_sent;// 記錄發(fā)送給客戶端文件主體內容大小
private String http_referer;// 用來記錄從那個頁面鏈接訪問過來的
private String http_user_agent;// 記錄客戶瀏覽器的相關信息
//設置屬性值
public void set(boolean valid,String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) {
this.valid = valid;
this.remote_addr = remote_addr;
this.remote_user = remote_user;
this.time_local = time_local;
this.request = request;
this.status = status;
this.body_bytes_sent = body_bytes_sent;
this.http_referer = http_referer;
this.http_user_agent = http_user_agent;
}
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getRemote_user() {
return remote_user;
}
public void setRemote_user(String remote_user) {
this.remote_user = remote_user;
}
public String getTime_local() {
return this.time_local;
}
public void setTime_local(String time_local) {
this.time_local = time_local;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getBody_bytes_sent() {
return body_bytes_sent;
}
public void setBody_bytes_sent(String body_bytes_sent) {
this.body_bytes_sent = body_bytes_sent;
}
public String getHttp_referer() {
return http_referer;
}
public void setHttp_referer(String http_referer) {
this.http_referer = http_referer;
}
public String getHttp_user_agent() {
return http_user_agent;
}
public void setHttp_user_agent(String http_user_agent) {
this.http_user_agent = http_user_agent;
}
public boolean isValid() {
return valid;
}
public void setValid(boolean valid) {
this.valid = valid;
}
/**
* 重寫toString()方法,使用Hive默認分隔符進行分隔,為后期導入Hive表提供便利
* @return
*/
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.valid);
sb.append("\001").append(this.getRemote_addr());
sb.append("\001").append(this.getRemote_user());
sb.append("\001").append(this.getTime_local());
sb.append("\001").append(this.getRequest());
sb.append("\001").append(this.getStatus());
sb.append("\001").append(this.getBody_bytes_sent());
sb.append("\001").append(this.getHttp_referer());
sb.append("\001").append(this.getHttp_user_agent());
return sb.toString();
}
/**
* 序列化方法
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {
this.valid = in.readBoolean();
this.remote_addr = in.readUTF();
this.remote_user = in.readUTF();
this.time_local = in.readUTF();
this.request = in.readUTF();
this.status = in.readUTF();
this.body_bytes_sent = in.readUTF();
this.http_referer = in.readUTF();
this.http_user_agent = in.readUTF();
}
/**
* 反序列化方法
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeBoolean(this.valid);
out.writeUTF(null==remote_addr?"":remote_addr);
out.writeUTF(null==remote_user?"":remote_user);
out.writeUTF(null==time_local?"":time_local);
out.writeUTF(null==request?"":request);
out.writeUTF(null==status?"":status);
out.writeUTF(null==body_bytes_sent?"":body_bytes_sent);
out.writeUTF(null==http_referer?"":http_referer);
out.writeUTF(null==http_user_agent?"":http_user_agent);
}
}
編寫MapReduce程序,執(zhí)行數(shù)據(jù)預處理
package cn.itcast.mr.weblog.preprocess;
import cn.itcast.mr.weblog.bean.WebLogBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
/**
* 處理原始日志,過濾出真實請求數(shù)據(jù),轉換時間格式,對缺失字段填充默認值,對記錄標記valid和invalid
*/
public class WeblogPreProcess {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WeblogPreProcess.class);
job.setMapperClass(WeblogPreProcessMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("d:/weblog/input"));
FileOutputFormat.setOutputPath(job, new Path("d:/weblog/output"));
job.setNumReduceTasks(0);
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
public static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
// 用來存儲網(wǎng)站url分類數(shù)據(jù)
Set<String> pages = new HashSet<String>();
Text k = new Text();
NullWritable v = NullWritable.get();
/**
* 設置初始化方法,加載網(wǎng)站需要分析的url分類數(shù)據(jù),存儲到MapTask的內存中,用來對日志數(shù)據(jù)進行過濾
* 如果用戶請求的資源是以下列形式,就表示用戶請求的是合法資源。
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
pages.add("/about");
pages.add("/black-ip-list/");
pages.add("/cassandra-clustor/");
pages.add("/finance-rhive-repurchase/");
pages.add("/hadoop-family-roadmap/");
pages.add("/hadoop-hive-intro/");
pages.add("/hadoop-zookeeper-intro/");
pages.add("/hadoop-mahout-roadmap/");
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//獲取一行數(shù)據(jù)
String line = value.toString();
//調用解析類WebLogParser解析日志數(shù)據(jù),最后封裝為WebLogBean對象
WebLogBean webLogBean = WebLogParser.parser(line);
if (webLogBean != null) {
// 過濾js/圖片/css等靜態(tài)資源
WebLogParser.filtStaticResource(webLogBean, pages);
k.set(webLogBean.toString());
context.write(k, v);
}
}
}
}
定義WebLogParser類用于解析讀取每行日志信息,并將解析結果封裝為WebLogBean對象
package cn.itcast.mr.weblog.preprocess;
import cn.itcast.mr.weblog.bean.WebLogBean;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;
public class WebLogParser {
//定義時間格式
public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
public static WebLogBean parser(String line) {
WebLogBean webLogBean = new WebLogBean();
//把一行數(shù)據(jù)以空格字符切割并存入數(shù)組arr中
String[] arr = line.split(" ");
//如果數(shù)組長度小于等于11,說明這條數(shù)據(jù)不完整,因此可以忽略這條數(shù)據(jù)
if (arr.length > 11) {
//滿足條件的數(shù)據(jù)逐個賦值給webLogBean對象
webLogBean.setRemote_addr(arr[0]);
webLogBean.setRemote_user(arr[1]);
String time_local = formatDate(arr[3].substring(1));
if(null==time_local || "".equals(time_local)) time_local="-invalid_time-";
webLogBean.setTime_local(time_local);
webLogBean.setRequest(arr[6]);
webLogBean.setStatus(arr[8]);
webLogBean.setBody_bytes_sent(arr[9]);
webLogBean.setHttp_referer(arr[10]);
//如果useragent元素較多,拼接useragent
if (arr.length > 12) {
StringBuilder sb = new StringBuilder();
for(int i=11;i<arr.length;i++){
sb.append(arr[i]);
}
webLogBean.setHttp_user_agent(sb.toString());
} else {
webLogBean.setHttp_user_agent(arr[11]);
}
if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// 大于400,HTTP錯誤
webLogBean.setValid(false);
}
if("-invalid_time-".equals(webLogBean.getTime_local())){
webLogBean.setValid(false);
}
} else {
webLogBean=null;
}
return webLogBean;
}
//添加標識
public static void filtStaticResource(WebLogBean bean, Set<String> pages) {
if (!pages.contains(bean.getRequest())) {
bean.setValid(false);
}
}
//格式化時間方法
public static String formatDate(String time_local) {
try {
return df2.format(df1.parse(time_local));
} catch (ParseException e) {
return null;
}
}
}
該項目使用本地模式運行,只需要在編寫MapReduce程序完成后,在本地D:/weblog/input目錄中放入將要清洗的日志文件,再執(zhí)行程序就即可
(意思就是將我們剛才flume采集的文件放到input下)
程序執(zhí)行結果
在程序指定的輸出路徑可以看到這些
查看part-m-000的內容
這些黑黑的是間隔符,可以看出和之前flume采集的數(shù)據(jù)的差比,有很多flase的數(shù)據(jù)段
數(shù)據(jù)倉庫開發(fā)
數(shù)據(jù)預處理完成后,就需要將MapReduce程序的輸出結果文件上傳至HDFS中,并使用HIVE建立相應的表結構于上床的輸出結果文件產生映射關系。
我們先將程序執(zhí)行結果文件
上傳到hadoop集群中的主服務器,通過lrzsz拖拽上傳到/root/weblog
這個路徑可以根據(jù)自己需求修改,再上傳到hdfs目錄中(如/weblog/preprocessed)
hadoop fs -put part-m-0000 /weblog/preprocessed
如果出現(xiàn)下述情況
輸入hadoop dfsadmin -safemode leave
即可
上傳成功
啟動Hive
創(chuàng)建數(shù)據(jù)倉庫
hive > create database weblog;
使用我們剛才創(chuàng)建的數(shù)據(jù)庫
hive > use weblog;
創(chuàng)建表
hive > create table ods_weblog_origin(
valid string,
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
http_referer string,
http_user_agent string ) partitioned by(datestr string) row format delimited fields terminated by '\001';
加載數(shù)據(jù)到hive表
把我們數(shù)據(jù)清洗后的數(shù)據(jù)加載到表中
hive >load data inpath '/weblog/preprocessed/' overwrite into table ods_weblog_origin partition(datestr='20210617');
這里注意路徑是我們剛才上傳mapreduce程序執(zhí)行結果文件的路徑,路徑前后不要出現(xiàn)空格
查詢
select * from ods_weblog_origin;
這段指令執(zhí)行后我們可以看到的結果如下
創(chuàng)建明細表ods_weblog_detail
hive > create table ods_weblog_detail(
valid string, --有效標識
remote_addr string, --來源IP
remote_user string, --用戶標識
time_local string, --訪問完整時間
daystr string, --訪問日期
timestr string, --訪問時間
month string, --訪問月
day string, --訪問日
hour string, --訪問時
request string, --請求的url
status string, --響應碼
body_bytes_sent string, --傳輸字節(jié)數(shù)
http_referer string, --來源url
ref_host string, --來源的host
ref_path string, --來源的路徑
ref_query string, --來源參數(shù)query
ref_query_id string, --來源參數(shù)query值
http_user_agent string --客戶終端標識
) partitioned by(datestr string);
18、創(chuàng)建中間臨時表t_ods_tmp_referurl
hive > create table t_ods_tmp_referurl as SELECT a.*,b.*
FROM ods_weblog_origin a LATERAL VIEW parse_url_tuple(regexp_replace(http_referer, "\"", ""), 'HOST', 'PATH','QUERY', 'QUERY:id') b as host, path, query, query_id;
創(chuàng)建臨時中間表t_ods_tmp_detail
hive > create table t_ods_tmp_detail as select b.*,substring(time_local,0,10) as daystr,
substring(time_local,12) as tmstr,
substring(time_local,6,2) as month,
substring(time_local,9,2) as day,
substring(time_local,11,3) as hour
from t_ods_tmp_referurl b;
加載數(shù)據(jù)到明細寬表前啟用動態(tài)分區(qū)
hive > set hive.exec.dynamic.partition=true;
hive > set hive.exec.dynamic.partition.mode=nonstrict;
生成明細寬表 向ods_weblog_detail表,加載數(shù)據(jù)
hive > insert overwrite table ods_weblog_detail partition(datestr)
select distinct otd.valid,otd.remote_addr,otd.remote_user,
otd.time_local,otd.daystr,otd.tmstr,otd.month,otd.day,otd.hour,
otr.request,otr.status,otr.body_bytes_sent,
otr.http_referer,otr.host,otr.path,
otr.query,otr.query_id,otr.http_user_agent,otd.daystr
from t_ods_tmp_detail as otd,t_ods_tmp_referurl as otr
where otd.remote_addr=otr.remote_addr
and otd.time_local=otr.time_local
and otd.body_bytes_sent=otr.body_bytes_sent
and otd.request=otr.request;
查看HDFS的WEB UI界面的ods_weblog_detail文件夾
統(tǒng)計每一天的PV量
# 創(chuàng)建表dw_pvs_everyday
hive > create table dw_pvs_everyday(pvs bigint,month string,day string);
#提取“day”字段
hive > insert into table dw_pvs_everyday
select count(*) as pvs,owd.month as month,owd.day as day
from ods_weblog_detail owd
group by owd.month,owd.day;
查看表dw_pvs_everyday中的數(shù)據(jù)
Select * from dw_pvs_everyday;
實現(xiàn)人均瀏覽量
創(chuàng)建維度表dw_avgpv_user_everyday
hive > create table dw_avgpv_user_everyday( day string,avgpv string);
向表dw_avgpv_user_everyday中插入數(shù)據(jù)
hive > insert into table dw_avgpv_user_everyday
select '2013-09-18',sum(b.pvs)/count(b.remote_addr) from
(select remote_addr,count(1) as pvs from ods_weblog_detail where
datestr='2013-09-18' group by remote_addr) b;
查看表dw_avgpv_user_everyday中的數(shù)據(jù)
Select * from dw_avgpv_user_everyday;
數(shù)據(jù)導出
通過SQLyog工具遠程連接集群主服務器的MySQL服務
這里要注意的是鏈連接數(shù)據(jù)庫出現(xiàn)Access denied for user ‘root‘
補充說明:當別的機子(IP )通過客戶端的方式在沒有授權的情況下是無法連接 MySQL 數(shù)據(jù)庫的,如果需要遠程連接 Linux 系統(tǒng)上的 MySQL 時,必須為其 IP 和 具體用戶 進行 授權 。一般 root 用戶不會提供給開發(fā)者。如:使用 Windows 上的 SQLyog 圖形化管理工具連接 Linux 上的 MySQL 數(shù)據(jù)庫,必須先對其進行授權。
解決方法的參考鏈接https://blog.csdn.net/aotongkeji/article/details/123155896
然后測試連接
然后我們可以右擊創(chuàng)建數(shù)據(jù)庫
也可以
CREATE DATABASE if NOT EXISTS sqoopdb;
如圖所在空白輸入代碼 執(zhí)行操作
創(chuàng)建七日人均瀏覽量表t_avgpv_num
mysql > create table `t_avgpv_num` (
`dateStr` varchar(255) DEFAULT NULL,
`avgPvNum` decimal(6,2) DEFAULT NULL
) ENGINE=MyISAM DEFAULT CHARSET=utf8;
Sqoop導出數(shù)據(jù)
這一步是將我們的數(shù)據(jù)加載到mysql數(shù)據(jù)庫中
sqoop export \
--connect jdbc:mysql://192.168.80.140:3306/sqoopdb \
--username hive\
--password hive\
--table t_avgpv_num \
--columns "dateStr,avgPvNum" \
--fields-terminated-by '\001' \
--export-dir /user/hive/warehouse/weblog.db/dw_avgpv_user_everyday;
日志分析系統(tǒng)報表展示
將我們的數(shù)據(jù)庫數(shù)據(jù)使用web技術轉化成可視化數(shù)據(jù)
這部分代碼太多了,就不放在這里了
所需代碼已經在網(wǎng)盤中文章來源:http://www.zghlxwxcb.cn/news/detail-402697.html
數(shù)據(jù)庫名字和密碼和數(shù)據(jù)庫驅動位置
根據(jù)自己情況修改
然后點擊run as
輸入tomcat7:run
然后在瀏覽器上輸入http://localhost:8080/index.html
即可查看可視化結果文章來源地址http://www.zghlxwxcb.cn/news/detail-402697.html
到了這里,關于大數(shù)據(jù)綜合項目--網(wǎng)站流量日志數(shù)據(jù)分析系統(tǒng)(詳細步驟和代碼)的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!