1. Creating the ClickHouse Table
- Create the database
create database ad_report;
use ad_report;
- Create the table
drop table if exists dwd_ad_event_inc;
create table if not exists dwd_ad_event_inc
(
event_time Int64 comment 'event time',
event_type String comment 'event type',
ad_id String comment 'ad id',
ad_name String comment 'ad name',
ad_product_id String comment 'advertised product id',
ad_product_name String comment 'advertised product name',
ad_product_price Decimal(16, 2) comment 'advertised product price',
ad_material_id String comment 'ad creative id',
ad_material_url String comment 'ad creative url',
ad_group_id String comment 'ad group id',
platform_id String comment 'promotion platform id',
platform_name_en String comment 'promotion platform name (English)',
platform_name_zh String comment 'promotion platform name (Chinese)',
client_country String comment 'client country',
client_area String comment 'client region',
client_province String comment 'client province',
client_city String comment 'client city',
client_ip String comment 'client ip address',
client_device_id String comment 'client device id',
client_os_type String comment 'client OS type',
client_os_version String comment 'client OS version',
client_browser_type String comment 'client browser type',
client_browser_version String comment 'client browser version',
client_user_agent String comment 'client user agent',
is_invalid_traffic UInt8 comment 'whether the traffic is invalid'
) ENGINE = MergeTree()
ORDER BY (event_time, ad_name, event_type, client_province, client_city, client_os_type,
client_browser_type, is_invalid_traffic);
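In MergeTree, the ORDER BY key also acts as the sparse primary index, so report queries that filter on a leading column such as event_time can skip whole granules. A hypothetical report query, assuming event_time holds epoch milliseconds (the literal values below are illustrative only):
select client_province, client_city, count(*) as event_cnt
from dwd_ad_event_inc
where event_time >= 1686067200000
  and event_time < 1686153600000
  and is_invalid_traffic = 0
group by client_province, client_city
order by event_cnt desc;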
2. Exporting Hive Data to ClickHouse
Query the data with Spark SQL, then write it to ClickHouse over JDBC.
- Create a Maven project with the following pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.yt</groupId>
<artifactId>hive-to-clickhouse</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- MySQL driver, required to access the Hive metastore metadata -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
<!-- spark-hive module -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.3.1</version>
<scope>provided</scope>
</dependency>
<!-- clickhouse-jdbc driver; exclude the two jackson artifacts to resolve a dependency conflict -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
<exclusions>
<exclusion>
<artifactId>jackson-databind</artifactId>
<groupId>com.fasterxml.jackson.core</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-core</artifactId>
<groupId>com.fasterxml.jackson.core</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- commons-cli, for convenient handling of the program's input arguments -->
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<!-- bundle the dependencies into the jar -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<!-- configure the execution -->
<execution>
<id>make-assembly</id>
<!-- bind to the package phase -->
<phase>package</phase>
<goals>
<!-- run once -->
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
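Build the fat jar with the standard Maven lifecycle (assuming a local Maven installation); the assembly plugin bound to the package phase above produces the jar-with-dependencies artifact used in the spark-submit step later:
mvn clean package
# produces target/hive-to-clickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar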
- Create the HiveToClickhouse class:
package com.yt;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class HiveToClickhouse {
    public static void main(String[] args) {
        // parse the program arguments with commons-cli
        // 1. define the options
        Options options = new Options();
        options.addOption(OptionBuilder.withLongOpt("hive_db").withDescription("hive database name (required)").hasArg(true).isRequired(true).create());
        options.addOption(OptionBuilder.withLongOpt("hive_table").withDescription("hive table name (required)").hasArg(true).isRequired(true).create());
        options.addOption(OptionBuilder.withLongOpt("hive_partition").withDescription("hive partition (required)").hasArg(true).isRequired(true).create());
        options.addOption(OptionBuilder.withLongOpt("ck_url").withDescription("clickhouse jdbc url (required)").hasArg(true).isRequired(true).create());
        options.addOption(OptionBuilder.withLongOpt("ck_table").withDescription("clickhouse table name (required)").hasArg(true).isRequired(true).create());
        options.addOption(OptionBuilder.withLongOpt("batch_size").withDescription("batch size for writing to clickhouse (required)").hasArg(true).isRequired(true).create());
        // 2. parse the arguments
        CommandLineParser parser = new GnuParser();
        CommandLine cmd = null;
        try {
            cmd = parser.parse(options, args);
        } catch (ParseException e) {
            // on a parse exception (i.e. illegal arguments), print the help message and return
            System.out.println(e.getMessage());
            HelpFormatter helpFormatter = new HelpFormatter();
            helpFormatter.printHelp("--option argument", options);
            return;
        }
        // 3. create the SparkConf
        SparkConf sparkConf = new SparkConf().setAppName("hive2clickhouse");
        // 4. create the SparkSession with Hive support enabled
        SparkSession sparkSession = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate();
        // 5. enable regex matching of column names in queries
        sparkSession.sql("set spark.sql.parser.quotedRegexColumnNames=true");
        // 6. select every column of the hive table except the dt partition column
        String sql = "select `(dt)?+.+` from " + cmd.getOptionValue("hive_db") + "." + cmd.getOptionValue("hive_table") + " where dt='" + cmd.getOptionValue("hive_partition") + "'";
        Dataset<Row> hive = sparkSession.sql(sql);
        // 7. write the data to clickhouse via jdbc
        hive.write().mode(SaveMode.Append)
                .format("jdbc")
                .option("url", cmd.getOptionValue("ck_url"))
                .option("dbtable", cmd.getOptionValue("ck_table"))
                .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
                .option("batchsize", cmd.getOptionValue("batch_size"))
                .save();
        // 8. close the SparkSession
        sparkSession.close();
    }
}
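With spark.sql.parser.quotedRegexColumnNames=true, a backquoted identifier in the select list is interpreted as a regular expression over the table's column names. In `(dt)?+.+`, the possessive quantifier ?+ consumes a literal dt without backtracking, so the dt column itself can never satisfy the trailing .+ and is excluded, while every other column matches. With the example arguments from the spark-submit command below, the query built in step 6 is:
select `(dt)?+.+` from ad.dwd_ad_event_inc where dt='2023-06-07'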
- Copy hive-site.xml, hdfs-site.xml, and core-site.xml into the project's resources directory
- Package the project (e.g. with the mvn command shown above) and upload hive-to-clickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar to a Hadoop node
- Run the following command to test:
spark-submit \
--class com.yt.HiveToClickhouse \
--master yarn \
hive-to-clickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar \
--hive_db ad \
--hive_table dwd_ad_event_inc \
--hive_partition 2023-06-07 \
--ck_url jdbc:clickhouse://hadoop102:8123/ad_report \
--ck_table dwd_ad_event_inc \
--batch_size 1000
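After the job finishes, the load can be sanity-checked over the same clickhouse-jdbc driver already declared in the pom. A minimal sketch, assuming the hadoop102:8123/ad_report endpoint from --ck_url above; the class name CheckClickhouseLoad is hypothetical:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckClickhouseLoad {
    public static void main(String[] args) throws Exception {
        // same endpoint as the --ck_url argument above
        String url = "jdbc:clickhouse://hadoop102:8123/ad_report";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select count(*) from dwd_ad_event_inc")) {
            if (rs.next()) {
                System.out.println("rows loaded: " + rs.getLong(1));
            }
        }
    }
}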
PS:
- To ensure the job can be submitted to YARN, add the following to the $SPARK_HOME/conf/spark-env.sh file:
export HADOOP_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop/