I. Sync Environment
1. MongoDB version: 3.6.3. (A bit dated; I later found that against this version Flink CDC could only monitor a single table — multiple tables could not be monitored.)
2. DataX version: self-compiled DataX-datax_v202210
3. HDFS version: 3.1.3
4. Hive version: 3.1.2
II. Sync Approach
1. Incremental data: every hour, data from 17 MongoDB collections has to be synced to Hive. Since every record carries a generation timestamp, I use DataX's query mode: loop over the collections and pull the previous hour's data into HDFS with DataX, then use a shell script plus a scheduler to load it into Hive on a timer to form the ODS layer, which is joined with other tables to build the DWD layer handed to the downstream consumers.
2. Full data: historical data is synced to Hive with a DataX script that reads in a loop, plus scheduling and Hive dynamic partitioning. Hive's dynamic partitioning allows only 100 partitions per node by default (hive.exec.max.dynamic.partitions.pernode = 100), and I partition by hour, so each run pulls only 4 days of data (96 hourly partitions); pulling more throws an error. The script simply pulls however many days are needed, 4 at a time. (A fairly clumsy method — better approaches are welcome in the comments.) A minimal HQL sketch of both loading paths follows this list.
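To make both loading paths concrete, here is a minimal HQL sketch; the table names (ods_xxx, tmp_xxx), the HDFS path, and the partition columns dt/hr are hypothetical placeholders, not my actual names:

-- Incremental: load one hour of DataX output into a fixed hourly partition
LOAD DATA INPATH '/warehouse/xxx/2022111416'
INTO TABLE ods_xxx PARTITION (dt = '20221114', hr = '16');

-- Full backfill: dynamic partitioning needs these settings first.
-- hive.exec.max.dynamic.partitions.pernode defaults to 100, which is why
-- 4 days x 24 hourly partitions (96) fits while a larger pull errors out;
-- raising the limits is the alternative to pulling 4 days at a time.
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.exec.max.dynamic.partitions = 3000;
SET hive.exec.max.dynamic.partitions.pernode = 1000;
INSERT INTO TABLE ods_xxx PARTITION (dt, hr)
SELECT ask_id, data, gid, text, `time`, uid, dt, hr FROM tmp_xxx;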
III. DataX Configuration
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader",
                    "parameter": {
                        "address": ["xxxxxxxx:27017"],
                        "authDb": "admin",
                        "userName": "xxxxx",
                        "userPassword": "xxxx",
                        "dbName": "xxxx",
                        "collectionName": "xxxx",
                        "column": [
                            {
                                "name": "_id",
                                "type": "string"
                            },
                            {
                                "name": "data",
                                "type": "string"
                            },
                            {
                                "name": "gid",
                                "type": "string"
                            },
                            {
                                "name": "text",
                                "type": "string"
                            },
                            {
                                "name": "time",
                                "type": "bigint"
                            },
                            {
                                "name": "uid",
                                "type": "string"
                            }
                        ],
                        "query": "{\"time\":{ \"$gte\": ${start_time}, \"$lt\": ${end_time}}}"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "ask_id",
                                "type": "string"
                            },
                            {
                                "name": "data",
                                "type": "string"
                            },
                            {
                                "name": "gid",
                                "type": "string"
                            },
                            {
                                "name": "text",
                                "type": "string"
                            },
                            {
                                "name": "time",
                                "type": "string"
                            },
                            {
                                "name": "uid",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "xxxx:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "xxxx",
                        "fileType": "text",
                        "path": "${targetdir}",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
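The ${start_time} and ${end_time} placeholders in the reader's query (and ${targetdir} in the writer) are filled at launch time with DataX's -p"-Dkey=value" option — the same mechanism the scheduling script in section IV uses for -Dcollection. A quick example, where the epoch-second values and the job file name are hypothetical placeholders:

python $DATAX_HOME/bin/datax.py -p"-Dstart_time=1668412800 -Dend_time=1668416400 -Dtargetdir=/warehouse/xxx/2022111416" $DATAX_HOME/job/mongo2hdfs.json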
There are two pitfalls here.
First: when DataX connects to MongoDB, pay close attention to the "authDb": "admin" setting. It must name the database where the sync account was created; since MongoDB authenticates per database, write whichever database the account authenticates against. Until I set it, the job kept failing with:
com.alibaba.datax.common.exception.DataXException: Code:[Framework-02], Description:[DataX引擎運行過程出錯,具體原因請參看DataX運行結束時的錯誤診斷信息 .]. - com.mongodb.MongoCommandException: Command failed with error 13: 'command count requires authentication' on server xxx:27117. The full response is { "ok" : 0.0, "errmsg" : "command count requires authentication", "code" : 13, "codeName" : "Unauthorized" }
I went through a lot of material; there are two ways to solve the authentication problem. The first is the one just mentioned: point authDb at the account's authentication database. The second is to grant the account read access on each database to be synced, like this:
db.createUser({user: "xxxxx", pwd: "xxxxxx", roles: [{"role": "read", "db": "xxxx"}]})
Query-only syncing does not need elevated privileges; read is enough.
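To double-check which database an account authenticates against, you can connect with the mongo shell's --authenticationDatabase flag (host and user below are placeholders):

mongo --host xxxxxxxx:27017 -u xxxxx -p --authenticationDatabase admin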
Second pitfall: mongodbreader's query condition is written as JSON. A source-code analysis someone shared online says the comma-separated conditions are combined with AND, i.e. to express OR you would have to run multiple queries. (In my own tests it was not purely AND either: when the same field appears several times, only the last occurrence seems to win — which matches ordinary JSON duplicate-key handling. Something to dig into later.) Well, since I was too lazy to write my own reader code, I made do. Here is the multi-condition query usage I was sharing:
"query":"{\"time\":{ \"$gte\": 1646064000, \"$lte\": 1648742399},\"time\":{ \"$gte\": 1654012800, \"$lte\": 1656604799},\"time\":{ \"$gte\": 1661961600, \"$lte\": 1664553599}}"
IV. DataX Sync Scheduling Script
#!/bin/bash
# Define variables in one place for easy editing
APP=xxx
TABLE=xxx
DATAX_HOME=xxxx
# If a date is passed in as $1, use it; otherwise default to one hour before now
if [ -n "$1" ]; then
    do_date=$1
else
    do_date=$(date -d '-1 hour' +%Y%m%d%H)
fi
hr1=${do_date:8:2}
date1=${do_date:0:8}
hdfs_path=xxx
# Prepare the target path: create it if it does not exist, empty it if it does,
# so the sync job can be re-run safely
hadoop fs -test -e $hdfs_path
if [[ $? -ne 0 ]]; then
    echo "Path $hdfs_path does not exist, creating..."
    hadoop fs -mkdir -p $hdfs_path
else
    echo "Path $hdfs_path already exists"
    fs_count=$(hadoop fs -count $hdfs_path)
    content_size=$(echo $fs_count | awk '{print $3}')
    if [[ $content_size -eq 0 ]]; then
        echo "Path $hdfs_path is empty"
    else
        echo "Path $hdfs_path is not empty, clearing..."
        hadoop fs -rm -r -f $hdfs_path/*
    fi
fi
# Data sync: loop over the collections
for i in xxx xxx xxx
do
    echo ================== $i loading date: $do_date ==================
    python $DATAX_HOME/bin/datax.py -p"-Dcollection=$i -Dtargetdir=$hdfs_path" $DATAX_HOME/xxx
done
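The script fills -Dcollection and -Dtargetdir, but the reader's query also expects ${start_time}/${end_time}. Assuming the time field stores Unix epoch seconds (the example query values suggest it does), a hedged sketch of deriving them from do_date with GNU date and passing them through:

# Hour boundaries in epoch seconds for the reader's query
start_time=$(date -d "${date1} ${hr1}:00:00" +%s)
end_time=$((start_time + 3600))
python $DATAX_HOME/bin/datax.py -p"-Dcollection=$i -Dtargetdir=$hdfs_path -Dstart_time=$start_time -Dend_time=$end_time" $DATAX_HOME/xxx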
V. DataX Sync to ES Configuration
There is a dedicated tool for MongoDB-to-ES sync, Monstache; I know of it but have not used it yet, so I leave that for later. Being pressed for time, I used DataX. Three points to note here:
1. object-typed fields can be read by DataX as string and written back as object on the ES side (in my config, _id was originally ObjectId and data was a list of objects; reading both as string worked, and the writer maps data back to object).
2. Field names that collide with ES keywords are no problem (a field literally named text with type text causes no trouble).
3. To use ES Chinese tokenization for word-frequency stats, besides installing the IK analyzer you also need fielddata: true on the text field. I have not confirmed the full setup, but I do know it does not work without fielddata; a mapping sketch follows.
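A hedged sketch of that mapping (index name is a placeholder; this is the typeless ES 7.x form — on a cluster that still uses mapping types, as the writer's "type" parameter below suggests, the properties block sits one level down under the type name):

PUT xxxx
{
  "mappings": {
    "properties": {
      "text": { "type": "text", "analyzer": "ik_smart", "fielddata": true }
    }
  }
}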
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader",
                    "parameter": {
                        "address": ["xxxx:27017"],
                        "userName": "xxx",
                        "authDb": "xxx",
                        "userPassword": "xxxx",
                        "dbName": "xxxx",
                        "collectionName": "${collection}",
                        "column": [
                            {
                                "name": "_id",
                                "type": "string"
                            },
                            {
                                "name": "data",
                                "type": "string"
                            },
                            {
                                "name": "gid",
                                "type": "string"
                            },
                            {
                                "name": "text",
                                "type": "string"
                            },
                            {
                                "name": "time",
                                "type": "bigint"
                            },
                            {
                                "name": "uid",
                                "type": "string"
                            },
                            {
                                "name": "deleted",
                                "type": "bigint"
                            }
                        ],
                        "query": "{\"time\":{ \"$gte\": 1661961600, \"$lte\": 1664553599}}"
                    }
                },
                "writer": {
                    "name": "elasticsearchwriter",
                    "parameter": {
                        "endpoint": "xxxxxx:9200",
                        "index": "xxxx",
                        "type": "xxxx",
                        "cleanup": false,
                        "settings": {"index": {"number_of_shards": 1, "number_of_replicas": 0}},
                        "discovery": false,
                        "batchSize": 2048,
                        "splitter": ",",
                        "column": [
                            {
                                "name": "_id",
                                "type": "id"
                            },
                            {
                                "name": "data",
                                "type": "object"
                            },
                            {
                                "name": "gid",
                                "type": "keyword"
                            },
                            {
                                "name": "text",
                                "type": "text",
                                "analyzer": "ik_smart"
                            },
                            {
                                "name": "time",
                                "type": "long"
                            },
                            {
                                "name": "uid",
                                "type": "keyword"
                            },
                            {
                                "name": "deleted",
                                "type": "long"
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 4
            }
        }
    }
}
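Invocation mirrors the Hive job: the ${collection} placeholder is filled per collection via -p (the job file name here is a hypothetical placeholder):

python $DATAX_HOME/bin/datax.py -p"-Dcollection=xxxx" $DATAX_HOME/job/mongo2es.json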
VI. Other Issues
The rest was fairly straightforward and I have not bothered to write it down; I will add to this if more problems come up later.