
Hadoop 3.0 Big Data Processing, Part 4 (Case Study: Data Cleaning, Metrics Statistics, Job Script Wrapping, Sqoop Export to MySQL)


Case Requirements Analysis

A live-streaming company generates a huge amount of streaming data every day. To serve streamers and viewers better, improve stream quality, and increase user retention, large volumes of data are analyzed and aggregated to mine business value. In this hands-on case we use Hadoop to compute statistics and analytics over the streaming data. A simplified log file is shown below; the full version is kept on Gitee at hadoop_study/hadoopDemo1 · Huathy/study-all/

{"id":"1580089010000","uid":"12001002543","nickname":"jack2543","gold":561,"watchnumpv":1697,"follower":1509,"gifter":2920,"watchnumuv":5410,"length":3542,"exp":183}
{"id":"1580089010001","uid":"12001001853","nickname":"jack1853","gold":660,"watchnumpv":8160,"follower":1781,"gifter":551,"watchnumuv":4798,"length":189,"exp":89}
{"id":"1580089010002","uid":"12001003786","nickname":"jack3786","gold":14,"watchnumpv":577,"follower":1759,"gifter":2643,"watchnumuv":8910,"length":1203,"exp":54}

Raw Data Cleaning Code

  1. Remove invalid records: the raw data is recorded as logs, so after a log collection tool ingests it into HDFS it still needs to be cleaned and filtered; records with missing fields are discarded and abnormal field values are normalized.
  2. Drop unneeded fields: the computation does not use every field.

Code

DataCleanMap

package dataClean;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author Huathy
 * @date 2023-10-22 22:15
 * @description Custom Mapper that implements the actual cleaning logic.
 */
public class DataCleanMap extends Mapper<LongWritable, Text, Text, Text> {
    /**
     * 1. Extract the required fields from the raw data
     * 2. Validate the values of the core fields
     *
     * @param key
     * @param value
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String valStr = value.toString();
        // Parse the JSON string into an object
        JSONObject jsonObj = JSON.parseObject(valStr);
        String uid = jsonObj.getString("uid");
        // Prefer getIntValue here (returns 0 when the field is missing) over getInteger (returns null, which would throw on unboxing).
        int gold = jsonObj.getIntValue("gold");
        int watchnumpv = jsonObj.getIntValue("watchnumpv");
        int follower = jsonObj.getIntValue("follower");
        int length = jsonObj.getIntValue("length");
        // Filter out abnormal records: the line must not be blank and every core field must be non-negative
        if (StringUtils.isNotBlank(valStr) && gold >= 0 && watchnumpv >= 0 && follower >= 0 && length >= 0) {
            // Assemble k2, v2
            Text k2 = new Text();
            k2.set(uid);
            Text v2 = new Text();
            v2.set(gold + "\t" + watchnumpv + "\t" + follower + "\t" + length);
            context.write(k2, v2);
        }
    }
}

DataCleanJob

package dataClean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author Huathy
 * @date 2023-10-22 22:02
 * @description Data cleaning job.
 * 1. Extract the required fields from the raw data:
 * uid, gold, watchnumpv (total views), follower (follower count), length (total stream duration)
 * 2. None of these five fields should be missing or empty; otherwise the record is treated as abnormal and discarded.
 * If an individual numeric field is missing, it is set to 0.
 * <p>
 * Analysis:
 * 1. The raw data is JSON, so fastjson can parse it and pull out the required fields.
 * 2. Only records that satisfy the conditions are kept.
 * 3. No aggregation is needed, only filtering, so a map phase is enough and no reduce phase is required.
 * 4. The map input types <k1,v1> are fixed as <LongWritable, Text>; the output types <k2,v2> are <Text, Text>, where k2 holds the streamer ID and v2 holds the core fields
 * separated by tabs.
 */
public class DataCleanJob {
    public static void main(String[] args) throws Exception {
        System.out.println("inputPath  => " + args[0]);
        System.out.println("outputPath  => " + args[1]);
        String path = args[0];
        String path2 = args[1];

        // Configuration for the job
        Configuration configuration = new Configuration();
        // Create the job
        Job job = Job.getInstance(configuration, "DataCleanJob");
        // Note: this line is required, otherwise the job class cannot be found when running on the cluster
        job.setJarByClass(DataCleanJob.class);
        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(path));
        FileOutputFormat.setOutputPath(job, new Path(path2));

        // Mapper configuration
        job.setMapperClass(DataCleanMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Set the number of reduce tasks to 0 to disable the reduce phase
        job.setNumReduceTasks(0);

        // Submit the job and wait for completion
        job.waitForCompletion(true);
    }
}

Running the Job

## Run command
[root@cent7-1 hadoop-3.2.4]# hadoop jar hadoopDemo1-0.0.1-SNAPSHOT-jar-with-dependencies.jar dataClean.DataCleanJob hdfs://cent7-1:9000/data/videoinfo/231022 hdfs://cent7-1:9000/data/res231022
inputPath  => hdfs://cent7-1:9000/data/videoinfo/231022
outputPath  => hdfs://cent7-1:9000/data/res231022
2023-10-22 23:16:15,845 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
2023-10-22 23:16:16,856 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2023-10-22 23:16:17,041 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1697985525421_0002
2023-10-22 23:16:17,967 INFO input.FileInputFormat: Total input files to process : 1
2023-10-22 23:16:18,167 INFO mapreduce.JobSubmitter: number of splits:1
2023-10-22 23:16:18,873 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1697985525421_0002
2023-10-22 23:16:18,874 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-10-22 23:16:19,157 INFO conf.Configuration: resource-types.xml not found
2023-10-22 23:16:19,158 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2023-10-22 23:16:19,285 INFO impl.YarnClientImpl: Submitted application application_1697985525421_0002
2023-10-22 23:16:19,345 INFO mapreduce.Job: The url to track the job: http://cent7-1:8088/proxy/application_1697985525421_0002/
2023-10-22 23:16:19,346 INFO mapreduce.Job: Running job: job_1697985525421_0002
2023-10-22 23:16:31,683 INFO mapreduce.Job: Job job_1697985525421_0002 running in uber mode : false
2023-10-22 23:16:31,689 INFO mapreduce.Job:  map 0% reduce 0%
2023-10-22 23:16:40,955 INFO mapreduce.Job:  map 100% reduce 0%
2023-10-22 23:16:43,012 INFO mapreduce.Job: Job job_1697985525421_0002 completed successfully
2023-10-22 23:16:43,153 INFO mapreduce.Job: Counters: 33
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=238970
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=24410767
		HDFS: Number of bytes written=1455064
		HDFS: Number of read operations=7
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
		HDFS: Number of bytes read erasure-coded=0
	Job Counters 
		Launched map tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=7678
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=7678
		Total vcore-milliseconds taken by all map tasks=7678
		Total megabyte-milliseconds taken by all map tasks=7862272
	Map-Reduce Framework
		Map input records=90000
		Map output records=46990
		Input split bytes=123
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=195
		CPU time spent (ms)=5360
		Physical memory (bytes) snapshot=302153728
		Virtual memory (bytes) snapshot=2588925952
		Total committed heap usage (bytes)=214958080
		Peak Map Physical memory (bytes)=302153728
		Peak Map Virtual memory (bytes)=2588925952
	File Input Format Counters 
		Bytes Read=24410644
	File Output Format Counters 
		Bytes Written=1455064
[root@cent7-1 hadoop-3.2.4]# 

## Count the lines in the output files
[root@cent7-1 hadoop-3.2.4]# hdfs dfs -cat hdfs://cent7-1:9000/data/res231022/* | wc -l
46990
## Check the number of raw input records
[root@cent7-1 hadoop-3.2.4]# hdfs dfs -cat hdfs://cent7-1:9000/data/videoinfo/231022/* | wc -l
90000
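
Before moving on to the statistics jobs, it can help to peek at a few cleaned records and confirm the tab-separated field order (uid, gold, watchnumpv, follower, length), since the downstream mappers index into it. A minimal check, reusing the paths from the run above:

## Peek at the first few cleaned records (same output path as above)
hdfs dfs -cat hdfs://cent7-1:9000/data/res231022/* | head -3
## Each line should look like: uid \t gold \t watchnumpv \t follower \t length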

Data Metrics Statistics

  1. Compute statistics for metrics such as gold count, total view PV, follower count, and total video duration. (Four fields are involved, so a custom Writable makes the later steps easier.)
  2. For each day, find the top 10 streamers by total stream duration, together with their durations.

Custom Writable Implementation

The raw data contains several fields that need to be aggregated, so it is convenient to bundle them into a single custom data type.

package videoinfo;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author Huathy
 * @date 2023-10-22 23:32
 * @description Custom data type that bundles a streamer's core fields, which keeps them easy to maintain.
 */
public class VideoInfoWriteable implements Writable {
    private long gold;
    private long watchnumpv;
    private long follower;
    private long length;

    public void set(long gold, long watchnumpv, long follower, long length) {
        this.gold = gold;
        this.watchnumpv = watchnumpv;
        this.follower = follower;
        this.length = length;
    }

    public long getGold() {
        return gold;
    }

    public long getWatchnumpv() {
        return watchnumpv;
    }

    public long getFollower() {
        return follower;
    }

    public long getLength() {
        return length;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(gold);
        dataOutput.writeLong(watchnumpv);
        dataOutput.writeLong(follower);
        dataOutput.writeLong(length);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.gold = dataInput.readLong();
        this.watchnumpv = dataInput.readLong();
        this.follower = dataInput.readLong();
        this.length = dataInput.readLong();
    }

    @Override
    public String toString() {
        return gold + "\t" + watchnumpv + "\t" + follower + "\t" + length;
    }
}

Per-Streamer Statistics (videoinfo)

VideoInfoJob

package videoinfo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author Huathy
 * @date 2023-10-22 23:27
 * @description Metrics statistics job.
 * 1. Aggregate per streamer: for each streamer, compute the day's total gold, total view PV, total new followers, and total stream duration.
 * Analysis:
 * 1. To make the per-streamer metrics easier to handle, it is best to bundle the fields into one object, which is easier to maintain.
 * This is what the custom Writable is for.
 * 2. The data is aggregated per streamer, so the streamer ID is used as the key.
 * 3. The map output <k2,v2> is therefore <Text, custom Writable>.
 * 4. Since aggregation is required, a reduce phase is needed as well.
 */
public class VideoInfoJob {
    public static void main(String[] args) throws Exception {
        System.out.println("inputPath  => " + args[0]);
        System.out.println("outputPath  => " + args[1]);
        String path = args[0];
        String path2 = args[1];

        // Configuration for the job
        Configuration configuration = new Configuration();
        // Create the job
        Job job = Job.getInstance(configuration, "VideoInfoJob");
        // Note: this line is required, otherwise the job class cannot be found when running on the cluster
        job.setJarByClass(VideoInfoJob.class);
        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(path));
        FileOutputFormat.setOutputPath(job, new Path(path2));

        // Mapper configuration
        job.setMapperClass(VideoInfoMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(VideoInfoWriteable.class);
        // Reducer configuration
        job.setReducerClass(VideoInfoReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(VideoInfoWriteable.class);

        // Submit the job and wait for completion
        job.waitForCompletion(true);
    }
}

VideoInfoMap

package videoinfo;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author Huathy
 * @date 2023-10-22 23:31
 * @description Custom Mapper that parses each cleaned record and assembles the core fields.
 */
public class VideoInfoMap extends Mapper<LongWritable, Text, Text, VideoInfoWriteable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line of the cleaned data: uid \t gold \t watchnumpv \t follower \t length
        String line = value.toString();
        String[] fields = line.split("\t");
        String uid = fields[0];
        long gold = Long.parseLong(fields[1]);
        long watchnumpv = Long.parseLong(fields[2]);
        long follower = Long.parseLong(fields[3]);
        long length = Long.parseLong(fields[4]);

        // Assemble k2, v2
        Text k2 = new Text();
        k2.set(uid);

        VideoInfoWriteable v2 = new VideoInfoWriteable();
        v2.set(gold, watchnumpv, follower, length);
        context.write(k2, v2);
    }
}

VideoInfoReduce

package videoinfo;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author Huathy
 * @date 2023-10-22 23:31
 * @description Custom Reducer that sums each streamer's core fields.
 */
public class VideoInfoReduce extends Reducer<Text, VideoInfoWriteable, Text, VideoInfoWriteable> {
    @Override
    protected void reduce(Text key, Iterable<VideoInfoWriteable> values, Context context) throws IOException, InterruptedException {
        // Pull out all values that share the same key and sum them
        long goldSum = 0;
        long watchNumPvSum = 0;
        long followerSum = 0;
        long lengthSum = 0;
        for (VideoInfoWriteable v2 : values) {
            goldSum += v2.getGold();
            watchNumPvSum += v2.getWatchnumpv();
            followerSum += v2.getFollower();
            lengthSum += v2.getLength();
        }
        // Assemble k3, v3
        VideoInfoWriteable videoInfoWriteable = new VideoInfoWriteable();
        videoInfoWriteable.set(goldSum, watchNumPvSum, followerSum, lengthSum);
        context.write(key, videoInfoWriteable);
    }
}

Per-Streamer Top-N Computation

VideoInfoTop10Job

package top10;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author Huathy
 * @date 2023-10-23 21:27
 * @description Metrics statistics job.
 * Requirement: compute the top 10 streamers by daily stream duration, together with their durations.
 * Analysis:
 * 1. The map phase extracts each streamer's ID and stream duration from the data.
 * 2. So the map output <k2,v2> types are <Text, LongWritable>.
 * 3. The reduce phase sums the durations per streamer and stores the results in a temporary map.
 * 4. In the reducer's cleanup method (which runs last), the map is sorted.
 * 5. Still in cleanup, the top 10 streamers by stream duration are written out.
 * setup runs once before the reduce calls start, and cleanup runs once after they finish.
 */
public class VideoInfoTop10Job {
    public static void main(String[] args) throws Exception {
        System.out.println("inputPath  => " + args[0]);
        System.out.println("outputPath  => " + args[1]);
        String path = args[0];
        String path2 = args[1];

        // Configuration for the job
        Configuration configuration = new Configuration();
        // Derive the date from the input path
        String[] fields = path.split("/");
        String tmpdt = fields[fields.length - 1];
        System.out.println("date: " + tmpdt);
        // Pass the date to the tasks through the job configuration
        configuration.set("dt", tmpdt);
        // Create the job
        Job job = Job.getInstance(configuration, "VideoInfoTop10Job");
        // Note: this line is required, otherwise the job class cannot be found when running on the cluster
        job.setJarByClass(VideoInfoTop10Job.class);
        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(path));
        FileOutputFormat.setOutputPath(job, new Path(path2));

        job.setMapperClass(VideoInfoTop10Map.class);
        job.setReducerClass(VideoInfoTop10Reduce.class);
        // Mapper output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Reducer output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Submit the job and wait for completion
        job.waitForCompletion(true);
    }
}

VideoInfoTop10Map

package top10;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author Huathy
 * @date 2023-10-23 21:32
 * @description Custom Mapper that extracts the streamer ID and stream duration.
 */
public class VideoInfoTop10Map extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line of the cleaned data: uid \t gold \t watchnumpv \t follower \t length
        String line = value.toString();
        String[] fields = line.split("\t");
        String uid = fields[0];
        long length = Long.parseLong(fields[4]);
        Text k2 = new Text();
        k2.set(uid);
        LongWritable v2 = new LongWritable();
        v2.set(length);
        context.write(k2, v2);
    }
}

VideoInfoTop10Reduce

package top10;

import cn.hutool.core.collection.CollUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.*;

/**
 * @author Huathy
 * @date 2023-10-23 21:37
 * @description Custom Reducer: sums the stream duration per streamer, then emits the top 10 in cleanup.
 */
public class VideoInfoTop10Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {
    // Streamer ID -> total stream duration
    Map<String, Long> map = new HashMap<>();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        String k2 = key.toString();
        long lengthSum = 0;
        for (LongWritable v2 : values) {
            lengthSum += v2.get();
        }
        map.put(k2, lengthSum);
    }

    /**
     * Runs once when the task starts; typically used to initialize resources such as MySQL or Redis connections.
     *
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("setup method running...");
        System.out.println("context: " + context);
        super.setup(context);
    }

    /**
     * Runs once when the task finishes; typically used to close resource connections.
     *
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Get the date from the job configuration
        Configuration configuration = context.getConfiguration();
        String date = configuration.get("dt");
        // Sort by stream duration in descending order
        LinkedHashMap<String, Long> sortMap = CollUtil.sortByEntry(map, new Comparator<Map.Entry<String, Long>>() {
            @Override
            public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
                return -o1.getValue().compareTo(o2.getValue());
            }
        });
        Set<Map.Entry<String, Long>> entries = sortMap.entrySet();
        Iterator<Map.Entry<String, Long>> iterator = entries.iterator();
        // Write out the top 10
        int count = 1;
        while (count <= 10 && iterator.hasNext()) {
            Map.Entry<String, Long> entry = iterator.next();
            String key = entry.getKey();
            Long value = entry.getValue();
            // Assemble k3, v3
            Text k3 = new Text(date + "\t" + key);
            LongWritable v3 = new LongWritable(value);
            // The date comes from the input path via the job configuration rather than the current time, since the job may be re-run for a historical day!
            context.write(k3, v3);
            count++;
        }
    }
}

Wrapping the Jobs in a Scheduled Script

Job dependency: the metrics jobs (the top 10 job and the playback statistics job) depend on the data cleaning job.
Wrapping the job submission commands in a script makes them easy to invoke and convenient to run from a scheduler such as cron (a hedged crontab sketch follows).

Write the script and run it in debug mode: sh -x data_clean.sh
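
For scheduling, a minimal crontab sketch is shown below, assuming the script shown in the next section is saved as /home/jobs/data_clean.sh and that a /home/jobs/logs directory exists (both paths are assumptions). It runs the pipeline every day at 01:30 for yesterday's data.

# Open the crontab of the user that owns the jobs
crontab -e
# Add a line like this (% must be escaped inside crontab)
30 1 * * * /bin/bash /home/jobs/data_clean.sh >> /home/jobs/logs/data_clean_$(date +\%y\%m\%d).log 2>&1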

Monitoring Job Results

Check the result of each job; if a job failed, retry it and send an alert. (A hedged retry/alert sketch is shown after the script below.)

#!/bin/bash
# Using /bin/bash explicitly is recommended
# If no date argument is given, default to yesterday. (Accepting a date makes it easy to re-run the script for a past day.)
if [ "x$1" = "x" ]; then
  yes_time=$(date +%y%m%d --date="1 days ago")
else
  yes_time=$1
fi

jobs_home=/home/jobs
cleanjob_input=hdfs://cent7-1:9000/data/videoinfo/${yes_time}
cleanjob_output=hdfs://cent7-1:9000/data/videoinfo_clean/${yes_time}
videoinfojob_input=${cleanjob_output}
videoinfojob_output=hdfs://cent7-1:9000/res/videoinfoJob/${yes_time}
top10job_input=${cleanjob_output}
top10job_output=hdfs://cent7-1:9000/res/top10/${yes_time}

# Remove the output directory so the script can be re-run
hdfs dfs -rm -r ${cleanjob_output}
# Run the data cleaning job
hadoop jar ${jobs_home}/hadoopDemo1-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
  dataClean.DataCleanJob \
  ${cleanjob_input} ${cleanjob_output}

# Check whether the cleaning job succeeded
hdfs dfs -ls ${cleanjob_output}/_SUCCESS
# $? holds the exit code of the previous command: 0 means success, anything else means failure
if [ "$?" = "0" ]; then
  echo "clean job execute success ...."
  # Remove the output directories so the script can be re-run
  hdfs dfs -rm -r ${videoinfojob_output}
  hdfs dfs -rm -r ${top10job_output}
  # Run metrics job 1
  echo " execute VideoInfoJob ...."
  hadoop jar ${jobs_home}/hadoopDemo1-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
    videoinfo.VideoInfoJob \
    ${videoinfojob_input} ${videoinfojob_output}
  hdfs dfs -ls ${videoinfojob_output}/_SUCCESS
  if [ "$?" != "0" ]
  then
    echo " VideoInfoJob execute failed .... "
  fi
  # Run metrics job 2
  echo " execute VideoInfoTop10Job ...."
  hadoop jar ${jobs_home}/hadoopDemo1-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
    top10.VideoInfoTop10Job \
    ${top10job_input} ${top10job_output}
  hdfs dfs -ls ${top10job_output}/_SUCCESS
  if [ "$?" != "0" ]
  then
    echo " VideoInfoJob execute failed .... "
  fi
else
  echo "clean job execute failed ... date time is ${yes_time}"
  # Send an SMS or email to the administrator here
  # A while loop could be added to retry (see the sketch after the script)
fi
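
The script only echoes a message when a job fails; the comments mention retrying and alerting the administrator. Below is a minimal sketch of what that could look like for the cleaning step, reusing the variables defined in the script above; the retry count and notify.sh are hypothetical, not part of the original post.

# Hedged retry/alert sketch for the cleaning step; notify.sh is a hypothetical alert helper
max_retry=3
retry=0
hdfs dfs -ls ${cleanjob_output}/_SUCCESS
status=$?
while [ ${status} -ne 0 ] && [ ${retry} -lt ${max_retry} ]; do
  retry=$((retry + 1))
  echo "clean job failed, retry ${retry} of ${max_retry} ..."
  # Remove the partial output, then rerun the cleaning job
  hdfs dfs -rm -r ${cleanjob_output}
  hadoop jar ${jobs_home}/hadoopDemo1-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
    dataClean.DataCleanJob ${cleanjob_input} ${cleanjob_output}
  hdfs dfs -ls ${cleanjob_output}/_SUCCESS
  status=$?
done
if [ ${status} -ne 0 ]; then
  # Replace notify.sh with a real SMS/email integration
  sh ${jobs_home}/notify.sh "clean job still failing for ${yes_time} after ${max_retry} retries"
fi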

Exporting the Results to MySQL with Sqoop

Sqoop provides a quick way to import and export data between HDFS and MySQL.

Installing Sqoop

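The original post documents the installation with screenshots only. As a rough substitute, the steps below sketch a typical Sqoop 1.4.7 setup matching the paths that appear in the export logs later on; the tarball name, connector jar, and directories are assumptions and should be adapted to your environment.

# Unpack the Sqoop 1.4.7 tarball under /home (directory name as it appears in the logs below)
tar -zxvf sqoop-1.4.7*.tar.gz -C /home/
cd /home/sqoop-1.4.7.bin_hadoop-2.6.0

# Point Sqoop at the local Hadoop installation
cp conf/sqoop-env-template.sh conf/sqoop-env.sh
echo 'export HADOOP_COMMON_HOME=/home/hadoop-3.2.4' >> conf/sqoop-env.sh
echo 'export HADOOP_MAPRED_HOME=/home/hadoop-3.2.4' >> conf/sqoop-env.sh

# Copy the MySQL JDBC driver into Sqoop's lib directory (jar path/version is an assumption)
cp /path/to/mysql-connector-java.jar lib/

# Put sqoop on the PATH and verify the installation
export SQOOP_HOME=/home/sqoop-1.4.7.bin_hadoop-2.6.0
export PATH=${PATH}:${SQOOP_HOME}/bin
sqoop version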

Develop the export step: use Sqoop to export the MapReduce results to MySQL. (The target table has to exist beforehand; a hedged schema sketch is shown below.)
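
The top10 job writes lines of the form date \t uid \t length, so one possible schema for the target table looks like the sketch below; the column names and types are assumptions, and the connection details mirror the export command.

# Hypothetical DDL for the target table; adjust names and types as needed
mysql -h192.168.56.101 -uhdp -padmin data -e "
CREATE TABLE IF NOT EXISTS top10 (
  dt     VARCHAR(10) NOT NULL,
  uid    VARCHAR(20) NOT NULL,
  length BIGINT      NOT NULL
);"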

  1. Export command
sqoop export \
--connect 'jdbc:mysql://192.168.56.101:3306/data?serverTimezone=UTC&useSSL=false' \
--username 'hdp' \
--password 'admin' \
--table 'top10' \
--export-dir '/res/top10/231022' \
--input-fields-terminated-by "\t"
  2. Export log
[root@cent7-1 sqoop-1.4.7.bin_hadoop-2.6.0]# sqoop export \
> --connect 'jdbc:mysql://192.168.56.101:3306/data?serverTimezone=UTC&useSSL=false' \
> --username 'hdp' \
> --password 'admin' \
> --table 'top10' \
> --export-dir '/res/top10/231022' \
> --input-fields-terminated-by "\t"
Warning: /home/sqoop-1.4.7.bin_hadoop-2.6.0//../hcatalog does not exist! HCatalog jobs will fail.
Please set $HCAT_HOME to the root of your HCatalog installation.
Warning: /home/sqoop-1.4.7.bin_hadoop-2.6.0//../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
2023-10-24 23:42:09,452 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
2023-10-24 23:42:09,684 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
2023-10-24 23:42:09,997 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
2023-10-24 23:42:10,022 INFO tool.CodeGenTool: Beginning code generation
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
2023-10-24 23:42:10,921 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `top10` AS t LIMIT 1
2023-10-24 23:42:11,061 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `top10` AS t LIMIT 1
2023-10-24 23:42:11,084 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /home/hadoop-3.2.4
Note: /tmp/sqoop-root/compile/6d507cd9a1a751990abfd7eef20a60c2/top10.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
2023-10-24 23:42:23,932 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-root/compile/6d507cd9a1a751990abfd7eef20a60c2/top10.jar
2023-10-24 23:42:23,972 INFO mapreduce.ExportJobBase: Beginning export of top10
2023-10-24 23:42:23,972 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2023-10-24 23:42:24,237 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
2023-10-24 23:42:27,318 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative
2023-10-24 23:42:27,325 INFO Configuration.deprecation: mapred.map.tasks.speculative.execution is deprecated. Instead, use mapreduce.map.speculative
2023-10-24 23:42:27,326 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
2023-10-24 23:42:27,641 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
2023-10-24 23:42:29,161 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1698153196891_0015
2023-10-24 23:42:39,216 INFO input.FileInputFormat: Total input files to process : 1
2023-10-24 23:42:39,231 INFO input.FileInputFormat: Total input files to process : 1
2023-10-24 23:42:39,387 INFO mapreduce.JobSubmitter: number of splits:4
2023-10-24 23:42:39,475 INFO Configuration.deprecation: mapred.map.tasks.speculative.execution is deprecated. Instead, use mapreduce.map.speculative
2023-10-24 23:42:40,171 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1698153196891_0015
2023-10-24 23:42:40,173 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-10-24 23:42:40,660 INFO conf.Configuration: resource-types.xml not found
2023-10-24 23:42:40,660 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2023-10-24 23:42:41,073 INFO impl.YarnClientImpl: Submitted application application_1698153196891_0015
2023-10-24 23:42:41,163 INFO mapreduce.Job: The url to track the job: http://cent7-1:8088/proxy/application_1698153196891_0015/
2023-10-24 23:42:41,164 INFO mapreduce.Job: Running job: job_1698153196891_0015
2023-10-24 23:43:02,755 INFO mapreduce.Job: Job job_1698153196891_0015 running in uber mode : false
2023-10-24 23:43:02,760 INFO mapreduce.Job:  map 0% reduce 0%
2023-10-24 23:43:23,821 INFO mapreduce.Job:  map 25% reduce 0%
2023-10-24 23:43:25,047 INFO mapreduce.Job:  map 50% reduce 0%
2023-10-24 23:43:26,069 INFO mapreduce.Job:  map 75% reduce 0%
2023-10-24 23:43:27,088 INFO mapreduce.Job:  map 100% reduce 0%
2023-10-24 23:43:28,112 INFO mapreduce.Job: Job job_1698153196891_0015 completed successfully
2023-10-24 23:43:28,266 INFO mapreduce.Job: Counters: 33
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=993808
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=1297
		HDFS: Number of bytes written=0
		HDFS: Number of read operations=19
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=0
		HDFS: Number of bytes read erasure-coded=0
	Job Counters 
		Launched map tasks=4
		Data-local map tasks=4
		Total time spent by all maps in occupied slots (ms)=79661
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=79661
		Total vcore-milliseconds taken by all map tasks=79661
		Total megabyte-milliseconds taken by all map tasks=81572864
	Map-Reduce Framework
		Map input records=10
		Map output records=10
		Input split bytes=586
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=3053
		CPU time spent (ms)=11530
		Physical memory (bytes) snapshot=911597568
		Virtual memory (bytes) snapshot=10326462464
		Total committed heap usage (bytes)=584056832
		Peak Map Physical memory (bytes)=238632960
		Peak Map Virtual memory (bytes)=2584969216
	File Input Format Counters 
		Bytes Read=0
	File Output Format Counters 
		Bytes Written=0
2023-10-24 23:43:28,282 INFO mapreduce.ExportJobBase: Transferred 1.2666 KB in 60.9011 seconds (21.2968 bytes/sec)
2023-10-24 23:43:28,291 INFO mapreduce.ExportJobBase: Exported 10 records.
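
To double-check the export from the MySQL side, the row count can be compared with the "Exported 10 records." line above; a minimal check, with the same connection details as the export command:

# The count should match the number of records reported by Sqoop
mysql -h192.168.56.101 -uhdp -padmin data -e "SELECT COUNT(*) FROM top10;"
mysql -h192.168.56.101 -uhdp -padmin data -e "SELECT * FROM top10 ORDER BY length DESC LIMIT 10;"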
