国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Hudi(7):Hudi集成Spark之spark-sql方式

這篇具有很好參考價(jià)值的文章主要介紹了Hudi(7):Hudi集成Spark之spark-sql方式。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

目錄

0. 相關(guān)文章鏈接

1.?創(chuàng)建表

1.1.?啟動(dòng)spark-sql

1.2.?建表參數(shù)

1.3.?創(chuàng)建非分區(qū)表

1.4.?創(chuàng)建分區(qū)表

1.5.?在已有的hudi表上創(chuàng)建新表

1.6.?通過CTAS (Create Table As Select)建表

2.?插入數(shù)據(jù)

2.1.?向非分區(qū)表插入數(shù)據(jù)

2.2.?向分區(qū)表動(dòng)態(tài)分區(qū)插入數(shù)據(jù)

2.3.?向分區(qū)表靜態(tài)分區(qū)插入數(shù)據(jù)

2.4.?使用bulk_insert插入數(shù)據(jù)

3.?查詢數(shù)據(jù)

3.1.?查詢

3.2.?時(shí)間旅行查詢

4.?更新數(shù)據(jù)

4.1.?update

4.2.?MergeInto

5.?刪除數(shù)據(jù)

6.?覆蓋數(shù)據(jù)

7.?修改表結(jié)構(gòu)(Alter Table)

8.?修改分區(qū)

9.?存儲(chǔ)過程(Procedures)


0. 相關(guān)文章鏈接

?Hudi文章匯總?

1.?創(chuàng)建表

1.1.?啟動(dòng)spark-sql

# 啟動(dòng)spark-sql之前需要先啟動(dòng)Hive的Metastore
nohup hive --service metastore & 

#針對(duì)Spark 3.2
spark-sql \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

# 如果沒有配置hive環(huán)境變量,手動(dòng)拷貝hive-site.xml到spark的conf下

1.2.?建表參數(shù)

參數(shù)名

默認(rèn)值

說明

primaryKey

uuid

表的主鍵名,多個(gè)字段用逗號(hào)分隔。

同 hoodie.datasource.write.recordkey.field

preCombineField

表的預(yù)合并字段。

同 hoodie.datasource.write.precombine.field

type

cow

創(chuàng)建的表類型:

type = 'cow'

type = 'mor'

同hoodie.datasource.write.table.type

1.3.?創(chuàng)建非分區(qū)表

  • 創(chuàng)建一個(gè)cow表,默認(rèn)primaryKey 'uuid',不提供preCombineField
create table hudi_cow_nonpcf_tbl (
  uuid int,
  name string,
  price double
) using hudi;
  • 創(chuàng)建一個(gè)mor非分區(qū)表
create table hudi_mor_tbl (
  id int,
  name string,
  price double,
  ts bigint
) using hudi
tblproperties (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts'
);

1.4.?創(chuàng)建分區(qū)表

創(chuàng)建一個(gè)cow分區(qū)外部表,指定primaryKey和preCombineField

create table hudi_cow_pt_tbl (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';

1.5.?在已有的hudi表上創(chuàng)建新表

不需要指定模式和非分區(qū)列(如果存在)之外的任何屬性,Hudi可以自動(dòng)識(shí)別模式和配置。

  • 非分區(qū)表
create table hudi_existing_tbl0 
using hudi
location 'file:///tmp/hudi/dataframe_hudi_nonpt_table';
  • 分區(qū)表
create table hudi_existing_tbl1 
using hudi
partitioned by (dt, hh)
location 'file:///tmp/hudi/dataframe_hudi_pt_table';

1.6.?通過CTAS (Create Table As Select)建表

為了提高向hudi表加載數(shù)據(jù)的性能,CTAS使用批量插入作為寫操作。

  • 通過CTAS創(chuàng)建cow非分區(qū)表,不指定preCombineField?
create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
as
select 
    1 as id
    , 'a1' as name
    , 10 as price
;
  • 通過CTAS創(chuàng)建cow分區(qū)表,指定preCombineField
create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
as
select 
    1 as id
    , 'a1' as name
    , 10 as price
    , 1000 as ts
    , '2021-12-01' as dt
;
  • 通過CTAS從其他表加載數(shù)據(jù)
# 創(chuàng)建內(nèi)部表
create table parquet_mngd 
using parquet 
location 'file:///tmp/parquet_dataset/*.parquet';

# 通過CTAS加載數(shù)據(jù)
create table hudi_ctas_cow_pt_tbl2 
using hudi 
location 'file:/tmp/hudi/hudi_tbl/' 
options (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
)
partitioned by (datestr) 
as 
select * from parquet_mngd
;

2.?插入數(shù)據(jù)

默認(rèn)情況下,如果提供了preCombineKey,則insert into的寫操作類型為upsert,否則使用insert。

2.1.?向非分區(qū)表插入數(shù)據(jù)

insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;

2.2.?向分區(qū)表動(dòng)態(tài)分區(qū)插入數(shù)據(jù)

insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;

2.3.?向分區(qū)表靜態(tài)分區(qū)插入數(shù)據(jù)

insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;

2.4.?使用bulk_insert插入數(shù)據(jù)

hudi支持使用bulk_insert作為寫操作的類型,只需要設(shè)置兩個(gè)配置:

hoodie.sql.bulk.insert.enable 和 hoodie.sql.insert.mode

-- 向指定preCombineKey的表插入數(shù)據(jù),則寫操作為upsert
insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001;
select id, name, price, ts from hudi_mor_tbl;
1   a1_1    20.0    1001

-- 向指定preCombineKey的表插入數(shù)據(jù),指定寫操作為bulk_insert 
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;

insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
select id, name, price, ts from hudi_mor_tbl;
1   a1_1    20.0    1001
1   a1_2    20.0    1002

3.?查詢數(shù)據(jù)

3.1.?查詢

select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0

3.2.?時(shí)間旅行查詢

Hudi從0.9.0開始就支持時(shí)間旅行查詢。Spark SQL方式要求Spark版本 3.2及以上。

-- 關(guān)閉前面開啟的bulk_insert
set hoodie.sql.bulk.insert.enable=false;

create table hudi_cow_pt_tbl1 (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl1';


-- 插入一條id為1的數(shù)據(jù)
insert into hudi_cow_pt_tbl1 select 1, 'a0', 1000, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;

-- 修改id為1的數(shù)據(jù)
insert into hudi_cow_pt_tbl1 select 1, 'a1', 1001, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;

-- 基于第一次提交時(shí)間進(jìn)行時(shí)間旅行
select * from hudi_cow_pt_tbl1 timestamp as of '20220307091628793' where id = 1;

-- 其他時(shí)間格式的時(shí)間旅行寫法
select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-07 09:16:28.100' where id = 1;

select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-08' where id = 1;

4.?更新數(shù)據(jù)

4.1.?update

更新操作需要指定preCombineField。

  • 語法
UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]
  • 執(zhí)行更新
update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;

update hudi_cow_pt_tbl1 set name = 'a1_1', ts = 1001 where id = 1;

-- update using non-PK field
update hudi_cow_pt_tbl1 set ts = 1111 where name = 'a1_1';

4.2.?MergeInto

  • 語法
MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ]  THEN <not_matched_action> ]

<merge_condition> =A equal bool condition 
<matched_action>  =
  DELETE  |
  UPDATE SET *  |
  UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action>  =
  INSERT *  |
  INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
  • 執(zhí)行案例
-- 1、準(zhǔn)備source表:非分區(qū)的hudi表,插入數(shù)據(jù)
create table merge_source (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');
insert into merge_source values (1, "old_a1", 22.22, 2900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);

merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *
;


-- 2、準(zhǔn)備source表:分區(qū)的parquet表,插入數(shù)據(jù)
create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet;
insert into merge_source2 values (1, "new_a1", 'update', '2021-12-09', '10'), (2, "new_a2", 'delete', '2021-12-09', '11'), (3, "new_a3", 'insert', '2021-12-09', '12');

merge into hudi_cow_pt_tbl1 as target
using (
  select id, name, '2000' as ts, flag, dt, hh from merge_source2
) source
on target.id = source.id
when matched and flag != 'delete' then
 update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
 insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
;

5.?刪除數(shù)據(jù)

  • 語法:
DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]
  • 案例:
delete from hudi_cow_nonpcf_tbl where uuid = 1;

delete from hudi_mor_tbl where id % 2 = 0;

-- 使用非主鍵字段刪除
delete from hudi_cow_pt_tbl1 where name = 'a1_1';

6.?覆蓋數(shù)據(jù)

  • 使用INSERT_OVERWRITE類型的寫操作覆蓋分區(qū)表
  • 使用INSERT_OVERWRITE_TABLE類型的寫操作插入覆蓋非分區(qū)表或分區(qū)表(動(dòng)態(tài)分區(qū))

1)insert overwrite 非分區(qū)表?

insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0;

2)通過動(dòng)態(tài)分區(qū)insert overwrite table到分區(qū)表

insert overwrite table hudi_cow_pt_tbl1 select 10, 'a10', 1100, '2021-12-09', '11';

3)通過靜態(tài)分區(qū)insert overwrite 分區(qū)表

insert overwrite hudi_cow_pt_tbl1 partition(dt = '2021-12-09', hh='12') select 13, 'a13', 1100;

7.?修改表結(jié)構(gòu)(Alter Table)

  • 語法:
-- Alter table name
ALTER TABLE oldTableName RENAME TO newTableName

-- Alter table add columns
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)

-- Alter table column type
ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType

-- Alter table properties
ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')
  • 案例:
--rename to:
ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;

--add column:
ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);

--change column:
ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid int;

--set properties;
alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');

8.?修改分區(qū)

  • 語法:
-- Drop Partition
ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = partition_col_val [ , ... ] )

-- Show Partitions
SHOW PARTITIONS tableIdentifier
  • 案例:
--show partition:
show partitions hudi_cow_pt_tbl1;

--drop partition:
alter table hudi_cow_pt_tbl1 drop partition (dt='2021-12-09', hh='10');
  • 注意:show partition結(jié)果是基于文件系統(tǒng)表路徑的。刪除整個(gè)分區(qū)數(shù)據(jù)或直接刪除某個(gè)分區(qū)目錄并不精確。

9.?存儲(chǔ)過程(Procedures)

  • 語法:
--Call procedure by positional arguments
CALL system.procedure_name(arg_1, arg_2, ... arg_n)

--Call procedure by named arguments
CALL system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1, ... arg_name_n => arg_n)
  • 案例(可用的存儲(chǔ)過程:Procedures | Apache Hudi):
--show commit's info
call show_commits(table => 'hudi_cow_pt_tbl1', limit => 10);

注:其他Hudi相關(guān)文章鏈接由此進(jìn) ->??Hudi文章匯總?文章來源地址http://www.zghlxwxcb.cn/news/detail-457069.html


到了這里,關(guān)于Hudi(7):Hudi集成Spark之spark-sql方式的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Hudi0.14.0 集成 Spark3.2.3(IDEA編碼方式)

    本次在IDEA下使用Scala語言進(jìn)行開發(fā),具體環(huán)境搭建查看文章 IDEA 下 Scala Maven 開發(fā)環(huán)境搭建。 1.1 添加maven依賴 創(chuàng)建Maven工程,pom文件:

    2024年01月24日
    瀏覽(22)
  • spark-sql

    [root@localhost bin]# ./spark-sql Error: Failed to load class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver. Failed to load main class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver. You need to build Spark with -Phive and -Phive-thriftserver. 24/02/22?00:23:20 INFO ShutdownHookManager: Shutdown hook called 24/02/22?00:23:20 INFO Shutd

    2024年02月22日
    瀏覽(16)
  • Spark-SQL小結(jié)

    Spark-SQL小結(jié)

    目錄 一、RDD、DataFrame、DataSet的概念、區(qū)別聯(lián)系、相互轉(zhuǎn)換操作 ? 1.RDD概念 ? 2.DataFrame概念 ? 3.DataSet概念 ? 4.RDD、DataFrame、DataSet的區(qū)別聯(lián)系 ? 5.RDD、DataFrame、DataSet的相互轉(zhuǎn)換操作 ? ?1 RDD-DataFrame、DataSet ? ?2? DataFrame-RDD,DataSet ? ?3 DataSet-RDD,DataFrame 二、Spark-SQL連接JDBC的方式

    2024年02月09日
    瀏覽(19)
  • Spark參數(shù)配置和調(diào)優(yōu),Spark-SQL、Config

    一、Hive-SQL / Spark-SQL參數(shù)配置和調(diào)優(yōu) 二、shell腳本spark-submit參數(shù)配置 三、sparkSession中配置參數(shù)

    2024年02月13日
    瀏覽(21)
  • spark-sql字段血緣實(shí)現(xiàn)

    spark-sql字段血緣實(shí)現(xiàn)

    Apache Spark是一個(gè)開源的大數(shù)據(jù)處理框架,它提供了一種高效、易于使用的方式來處理大規(guī)模數(shù)據(jù)集。在Spark中,數(shù)據(jù)是通過DataFrame和Dataset的形式進(jìn)行操作的,這些數(shù)據(jù)結(jié)構(gòu)包含了一系列的字段(也稱為列)。字段血緣是Spark中的一個(gè)關(guān)鍵概念,它幫助我們理解數(shù)據(jù)的來源和流

    2024年02月02日
    瀏覽(19)
  • spark集成hudi

    spark集成hudi

    啟動(dòng)spark-shell 2 hudi內(nèi)置數(shù)據(jù)生成器,生成10條json數(shù)據(jù) 3加載到DF,寫入hudi,實(shí)現(xiàn)簡(jiǎn)單etl處理 4讀取存儲(chǔ)數(shù)據(jù)及注冊(cè)臨時(shí)表

    2024年02月07日
    瀏覽(20)
  • Spark-SQL連接Hive的五種方法

    Spark-SQL連接Hive的五種方法

    若使用Spark內(nèi)嵌的Hive,直接使用即可,什么都不需要做(在實(shí)際生產(chǎn)活動(dòng)中,很少會(huì)使用這一模式) 步驟: 將Hive中conf/下的hive-site.xml拷貝到Spark的conf/目錄下; 把Mysql的驅(qū)動(dòng)copy到j(luò)ars/目錄下; 如果訪問不到hdfs,則將core-site.xml和hdfs-site.xml拷貝到conf/目錄下; 重啟spark-shell;

    2024年02月16日
    瀏覽(21)
  • spark-sql: insert overwrite分區(qū)表問題

    spark-sql: insert overwrite分區(qū)表問題

    用spark-sql,insert overwrite分區(qū)表時(shí)發(fā)現(xiàn)兩個(gè)比較麻煩的問題: 從目標(biāo)表select出來再insert overwrite目標(biāo)表時(shí)報(bào)錯(cuò):Error in query: Cannot overwrite a path that is also being read from. 從其他表select出來再insert overwrite目標(biāo)表時(shí),其他分區(qū)都被刪除了. 印象中這兩個(gè)問題也出現(xiàn)過,但憑經(jīng)驗(yàn)和感覺,

    2024年02月11日
    瀏覽(21)
  • 在 spark-sql / spark-shell / hive / beeline 中粘貼 sql、程序腳本時(shí)的常見錯(cuò)誤

    《大數(shù)據(jù)平臺(tái)架構(gòu)與原型實(shí)現(xiàn):數(shù)據(jù)中臺(tái)建設(shè)實(shí)戰(zhàn)》一書由博主歷時(shí)三年精心創(chuàng)作,現(xiàn)已通過知名IT圖書品牌電子工業(yè)出版社博文視點(diǎn)出版發(fā)行,點(diǎn)擊《重磅推薦:建大數(shù)據(jù)平臺(tái)太難了!給我發(fā)個(gè)工程原型吧!》了解圖書詳情,京東購(gòu)書鏈接:https://item.jd.com/12677623.html,掃描

    2024年02月14日
    瀏覽(21)
  • spark-sql處理json字符串的常用函數(shù)

    整理了spark-sql處理json字符串的幾個(gè)函數(shù): 1?get_json_object 解析不含數(shù)組的 json ? 2 from_json? 解析json 3 schema_of_json?提供生成json格式的方法 4 explode? ?把JSONArray轉(zhuǎn)為多行 get_json_object(string json_string, string path) :適合最外層為{}的json解析。 ?第一個(gè)參數(shù)是json對(duì)象變量,也就是含j

    2023年04月08日
    瀏覽(16)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包