Importing Parquet file data into Hive
Inspecting the Parquet file format
This mainly relies on the community tool https://github.com/apache/parquet-mr/
Build the CLI tool
cd parquet-cli;
mvn clean install -DskipTests;
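View the metadata
# The classpath separator is ':' on Linux/macOS; use ';' on Windows. Quoting keeps the shell from expanding the '*'.
java -cp 'parquet-cli-1.13.1.jar:dependency/*' org.apache.parquet.cli.Main meta yellow_tripdata_2023-03.parquet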
Query sample rows
java -cp 'parquet-cli-1.13.1.jar:dependency/*' org.apache.parquet.cli.Main head -n 2 yellow_tripdata_2023-03.parquet
{"VendorID": 2, "tpep_pickup_datetime": 1677629203000000, "tpep_dropoff_datetime": 1677629803000000, "passenger_count": 1, "trip_distance": 0.0, "RatecodeID": 1, "store_and_fwd_flag": "N", "PULocationID": 238, "DOLocationID": 42, "payment_type": 2, "fare_amount": 8.6, "extra": 1.0, "mta_tax": 0.5, "tip_amount": 0.0, "tolls_amount": 0.0, "improvement_surcharge": 1.0, "total_amount": 11.1, "congestion_surcharge": 0.0, "Airport_fee": 0.0}
{"VendorID": 2, "tpep_pickup_datetime": 1677629305000000, "tpep_dropoff_datetime": 1677631170000000, "passenger_count": 2, "trip_distance": 12.4, "RatecodeID": 1, "store_and_fwd_flag": "N", "PULocationID": 138, "DOLocationID": 231, "payment_type": 1, "fare_amount": 52.7, "extra": 6.0, "mta_tax": 0.5, "tip_amount": 12.54, "tolls_amount": 0.0, "improvement_surcharge": 1.0, "total_amount": 76.49, "congestion_surcharge": 2.5, "Airport_fee": 1.25}
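The `tpep_pickup_datetime` and `tpep_dropoff_datetime` values in the sample rows are large integers rather than readable times. They look like microseconds since the Unix epoch in UTC; that is an assumption based on the magnitude of the values, not something the tool output states. A minimal Python sketch for decoding them under that assumption (the helper name `micros_to_datetime` is made up for illustration):

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def micros_to_datetime(micros):
    """Convert microseconds since the Unix epoch (assumed UTC) to a datetime."""
    # Integer timedelta arithmetic avoids float rounding on large values.
    return EPOCH + timedelta(microseconds=micros)

# Values taken from the first sample row.
pickup = micros_to_datetime(1677629203000000)
dropoff = micros_to_datetime(1677629803000000)
print(pickup.isoformat())
print((dropoff - pickup).total_seconds())
```

Under the UTC assumption, the first row decodes to a pickup shortly after midnight on 2023-03-01 and a trip lasting ten minutes, which is consistent with a March 2023 file.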
Field type mapping between Parquet and Hive
Parquet field type | Hive field type |
---|---|
BINARY | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
FLOAT | FLOAT |
INT32 | INT |
INT64 | BIGINT |
INT96 | TIMESTAMP |
BINARY + OriginalType UTF8 | STRING |
BINARY + OriginalType DECIMAL | DECIMAL |
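The mapping table can be turned into a small lookup that produces the column list for a Hive `CREATE TABLE` statement. The following Python is only a sketch: the function names `hive_type` and `column_ddl` and the example field list are assumptions made for illustration, and the real field list would come from the `meta` output of the CLI.

```python
# Mapping copied from the table; keys are (physical type, original type).
PARQUET_TO_HIVE = {
    ("BINARY", None): "STRING",
    ("BOOLEAN", None): "BOOLEAN",
    ("DOUBLE", None): "DOUBLE",
    ("FLOAT", None): "FLOAT",
    ("INT32", None): "INT",
    ("INT64", None): "BIGINT",
    ("INT96", None): "TIMESTAMP",
    ("BINARY", "UTF8"): "STRING",
    ("BINARY", "DECIMAL"): "DECIMAL",
}

def hive_type(physical, original=None):
    """Return the Hive type for a Parquet field, or raise if the table does not list it."""
    key = (physical.upper(), original.upper() if original else None)
    if key not in PARQUET_TO_HIVE:
        raise ValueError("no mapping listed for %s / %s" % (physical, original))
    return PARQUET_TO_HIVE[key]

def column_ddl(fields):
    """Build the column list of a CREATE TABLE from (name, physical, original) tuples."""
    return ",\n".join(
        "  `%s` %s" % (name, hive_type(phys, orig)) for name, phys, orig in fields
    )

# Hypothetical field list, for illustration only.
example = [("trip_distance", "DOUBLE", None), ("store_and_fwd_flag", "BINARY", "UTF8")]
print(column_ddl(example))
```

Combinations that the table does not list raise an error instead of guessing a type. A `DECIMAL` column would additionally need its precision and scale from the file metadata, which this sketch leaves out.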
Create a Hive table that stores its data as Parquet
-- Create a table stored as Parquet
CREATE TABLE `test_trino.yellow_taxi_trip_records_tmp`
(
`VendorID` int COMMENT 'Meter vendor ID',
`tpep_pickup_datetime` TIMESTAMP COMMENT 'Time the meter was engaged',
`tpep_dropoff_datetime` TIMESTAMP COMMENT 'Time the meter was disengaged',
`passenger_count` bigint COMMENT 'Number of passengers',
`trip_distance` double COMMENT 'Trip distance',
`RateCodeID` bigint COMMENT 'Rate code',
`store_and_fwd_flag` string COMMENT 'Store-and-forward flag',
`PULocationID` bigint COMMENT 'Pickup zone ID',
`DOLocationID` bigint COMMENT 'Drop-off zone ID',
`payment_type` bigint COMMENT 'Payment method',
`fare_amount` double COMMENT 'Fare',
`extra` double COMMENT 'Miscellaneous extras and surcharges',
`mta_tax` double COMMENT 'Tax',
`tip_amount` double COMMENT 'Tip',
`tolls_amount` double COMMENT 'Tolls',
`improvement_surcharge` double COMMENT 'Improvement surcharge',
`total_amount` double COMMENT 'Total amount, excluding cash tips',
`congestion_surcharge` double COMMENT 'Congestion surcharge',
`airport_fee` double COMMENT 'Airport pickup/drop-off fee'
)
COMMENT 'Yellow taxi trip records'
PARTITIONED BY (
`ym` string COMMENT 'Partition column, year and month (yyyyMM)')
STORED AS PARQUET;
Load the file
-- Load the Parquet data using the Hive client
LOAD DATA LOCAL INPATH '/opt/yellow_tripdata_2023-02.parquet' OVERWRITE INTO TABLE `test_trino.yellow_taxi_trip_records_tmp` PARTITION (ym=202302);
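When several monthly files have to be loaded, the `ym` partition value can be derived from the file name instead of being typed by hand. The Python sketch below assumes file names that end in `_YYYY-MM.parquet`, as in the statement above; the function name `load_statement` is hypothetical, and the output is only the statement text to paste into the Hive client.

```python
import re

def load_statement(path, table):
    """Build a Hive LOAD DATA statement whose `ym` partition comes from the file name."""
    # Expect a name ending in _YYYY-MM.parquet, e.g. yellow_tripdata_2023-02.parquet
    match = re.search(r"_(\d{4})-(\d{2})\.parquet$", path)
    if match is None:
        raise ValueError("cannot derive a year-month from %r" % path)
    ym = match.group(1) + match.group(2)
    return (
        "LOAD DATA LOCAL INPATH '%s' OVERWRITE INTO TABLE %s PARTITION (ym='%s');"
        % (path, table, ym)
    )

stmt = load_statement(
    "/opt/yellow_tripdata_2023-02.parquet",
    "test_trino.yellow_taxi_trip_records_tmp",
)
print(stmt)
```

The partition value is quoted here because `ym` is declared as a string column.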
Importing JSON data into ES
ES bulk import API
Batch writes to ES go through the bulk API. It accepts newline-delimited JSON, so the data can be supplied from a file.
Original JSON file contents
{"geonameid": 2986043, "name": "Pic de Font Blanca", "latitude": 42.64991, "longitude": 1.53335, "country_code": "AD", "population": 0}
{"geonameid": 2994701, "name": "Roc Mélé", "latitude": 42.58765, "longitude": 1.74028, "country_code": "AD", "population": 0}
{"geonameid": 3007683, "name": "Pic des Langounelles", "latitude": 42.61203, "longitude": 1.47364, "country_code": "AD", "population": 0}
{"geonameid": 3017832, "name": "Pic de les Abelletes", "latitude": 42.52535, "longitude": 1.73343, "country_code": "AD", "population": 0}
{"geonameid": 3017833, "name": "Estany de les Abelletes", "latitude": 42.52915, "longitude": 1.73362, "country_code": "AD", "population": 0}
{"geonameid": 3023203, "name": "Port Vieux de la Coume d’Ose", "latitude": 42.62568, "longitude": 1.61823, "country_code": "AD", "population": 0}
{"geonameid": 3029315, "name": "Port de la Cabanette", "latitude": 42.6, "longitude": 1.73333, "country_code": "AD", "population": 0}
{"geonameid": 3034945, "name": "Port Dret", "latitude": 42.60172, "longitude": 1.45562, "country_code": "AD", "population": 0}
{"geonameid": 3038814, "name": "Costa de Xurius", "latitude": 42.50692, "longitude": 1.47569, "country_code": "AD", "population": 0}
{"geonameid": 3038815, "name": "Font de la Xona", "latitude": 42.55003, "longitude": 1.44986, "country_code": "AD", "population": 0}
{"geonameid": 3038816, "name": "Xixerella", "latitude": 42.55327, "longitude": 1.48736, "country_code": "AD", "population": 0}
{"geonameid": 3038818, "name": "Riu Xic", "latitude": 42.57165, "longitude": 1.67554, "country_code": "AD", "population": 0}
{"geonameid": 3038819, "name": "Pas del Xic", "latitude": 42.49766, "longitude": 1.57597, "country_code": "AD", "population": 0}
{"geonameid": 3038820, "name": "Roc del Xeig", "latitude": 42.56068, "longitude": 1.4898, "country_code": "AD", "population": 0}
Index mapping
PUT allcountries
{
  "settings": {
    "index.number_of_replicas": 0
  },
  "mappings": {
    "_doc": {
      "dynamic": "strict",
      "properties": {
        "geonameid": {
          "type": "long"
        },
        "name": {
          "type": "text"
        },
        "latitude": {
          "type": "double"
        },
        "longitude": {
          "type": "double"
        },
        "country_code": {
          "type": "text"
        },
        "population": {
          "type": "long"
        }
      }
    }
  }
}
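Because the mapping sets `dynamic` to `strict`, ES rejects any document that carries a field not listed under `properties`. A rough pre-flight check in Python can catch such documents before they are sent. The field set is copied from the mapping; the function name `unknown_fields` and the second test document are assumptions made for this sketch.

```python
import json

# Field names copied from the `properties` section of the mapping.
MAPPED_FIELDS = {"geonameid", "name", "latitude", "longitude", "country_code", "population"}

def unknown_fields(line):
    """Return the sorted field names in one JSON document that the mapping does not declare."""
    doc = json.loads(line)
    return sorted(set(doc) - MAPPED_FIELDS)

# A row from the original file, and a made-up row with an extra field.
ok = ('{"geonameid": 3038816, "name": "Xixerella", "latitude": 42.55327, '
      '"longitude": 1.48736, "country_code": "AD", "population": 0}')
bad = '{"geonameid": 1, "name": "x", "elevation": 10}'
print(unknown_fields(ok))
print(unknown_fields(bad))
```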
Script to restructure the JSON
# coding=UTF-8
# Restructure the original JSON into the line pairs expected by the ES bulk API (Python 3)
import io
import json
import os

# abspath guards against dirname() returning '' when the script is run from its own directory
current_path = os.path.dirname(os.path.abspath(__file__))
src_path = os.path.join(current_path, 'es-test.json')
dst_path = os.path.join(current_path, 'es-test-bulk.json')

# 'r' opens a file read-only; 'w' opens it write-only.
# With 'w', an existing file is truncated, so its previous contents are deleted;
# if the file does not exist, a new one is created.
with io.open(src_path, 'r', encoding='utf-8') as fp, \
        io.open(dst_path, 'w', encoding='utf-8') as new_jsonfile:
    for line in fp:
        if not line.strip():
            continue  # skip blank lines
        json_data = json.loads(line)
        # Action line; ES index names must be lowercase
        new_data = {'index': {'_index': 'allcountries'}}
        # ensure_ascii=False keeps names such as "Roc Mélé" readable and the output valid JSON
        new_jsonfile.write(json.dumps(new_data, ensure_ascii=False) + '\n')
        # Source line: the original JSON object written on one line
        old_data = {
            'geonameid': json_data['geonameid'],
            'name': json_data['name'],
            'latitude': json_data['latitude'],
            'longitude': json_data['longitude'],
            'country_code': json_data['country_code'],
            'population': json_data['population'],
        }
        new_jsonfile.write(json.dumps(old_data, ensure_ascii=False) + '\n')
Restructured JSON file
{"index": {"_index": "allcountries"}}
{"name": "El Barrerol", "geonameid": 3040809, "longitude": 1.45207, "country_code": "AD", "latitude": 42.439579999999999, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Camí d’Easagents", "geonameid": 3040810, "longitude": 1.61341, "country_code": "AD", "latitude": 42.53349, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Pleta de Duedra", "geonameid": 3040811, "longitude": 1.4949399999999999, "country_code": "AD", "latitude": 42.625540000000001, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Pleta de Duedra", "geonameid": 3040812, "longitude": 1.5637000000000001, "country_code": "AD", "latitude": 42.61985, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Plana Duedra", "geonameid": 3040813, "longitude": 1.5228900000000001, "country_code": "AD", "latitude": 42.59393, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Planella del Duc", "geonameid": 3040814, "longitude": 1.4995700000000001, "country_code": "AD", "latitude": 42.456490000000002, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Canal del Duc", "geonameid": 3040815, "longitude": 1.6195600000000001, "country_code": "AD", "latitude": 42.576920000000001, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Canal Dreta", "geonameid": 3040816, "longitude": 1.5381, "country_code": "AD", "latitude": 42.551319999999997, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Canal Dreta", "geonameid": 3040817, "longitude": 1.4865900000000001, "country_code": "AD", "latitude": 42.506630000000001, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Port Dret", "geonameid": 3040818, "longitude": 1.7001299999999999, "country_code": "AD", "latitude": 42.573979999999999, "population": 0}
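Before sending a file like this, it can help to confirm that it really consists of action/document pairs. The Python sketch below checks that the lines come in pairs, that each line parses as JSON, and that each action is an `index` action with a lowercase index name. The function name `check_bulk_lines` is hypothetical, and the checks are a small subset of what ES itself validates.

```python
import json

def check_bulk_lines(lines):
    """Return a list of problems found in bulk-format lines (an empty list means none found)."""
    problems = []
    lines = [ln for ln in lines if ln.strip()]
    if len(lines) % 2 != 0:
        problems.append("odd number of lines: an action line has no document after it")
    for i in range(0, len(lines) - 1, 2):
        try:
            action = json.loads(lines[i])
            json.loads(lines[i + 1])
        except ValueError as exc:
            problems.append("pair starting at line %d is not valid JSON: %s" % (i + 1, exc))
            continue
        if not isinstance(action, dict) or not isinstance(action.get("index"), dict):
            problems.append("line %d is not an index action" % (i + 1))
            continue
        index_name = str(action["index"].get("_index", ""))
        if index_name != index_name.lower():
            problems.append("line %d: index name %r is not lowercase" % (i + 1, index_name))
    return problems

good = ['{"index": {"_index": "allcountries"}}', '{"name": "Port Dret", "population": 0}']
mixed_case = ['{"index": {"_index": "allCountries"}}', '{"name": "Port Dret", "population": 0}']
print(check_bulk_lines(good))
print(check_bulk_lines(mixed_case))
```

The bulk API also expects the request body to end with a newline, so the file should keep its trailing line break.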
Calling the bulk API
curl -H "Content-Type: application/x-ndjson" -XPOST "192.168.1.1:9600/allcountries/_doc/_bulk" --data-binary @"/opt/es-documents-bulk.json"
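A bulk request can succeed at the HTTP level even when individual documents fail, so the response body is worth checking. The response carries a top-level `errors` flag and an `items` list with one entry per action. The Python sketch below pulls the failed items out of a response saved as JSON text. The function name `failed_items` is hypothetical, and the sample response is hand-written for illustration, not output from a real cluster.

```python
import json

def failed_items(response_text):
    """Return (position, status, error) for every action in a bulk response that failed."""
    response = json.loads(response_text)
    if not response.get("errors"):
        return []
    failures = []
    for position, item in enumerate(response.get("items", [])):
        # Each item has a single key naming the action, e.g. "index".
        for result in item.values():
            if "error" in result:
                failures.append((position, result.get("status"), result["error"]))
    return failures

# Hand-written sample: the second document is rejected.
sample = json.dumps({
    "took": 3,
    "errors": True,
    "items": [
        {"index": {"_index": "allcountries", "status": 201}},
        {"index": {"_index": "allcountries", "status": 400,
                   "error": {"type": "strict_dynamic_mapping_exception"}}},
    ],
})
for failure in failed_items(sample):
    print(failure)
```

The position is zero-based and counts actions, so position 1 corresponds to the second action/document pair in the file that was sent.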