1. Create the table in MySQL
- Table creation statement:
CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `age` varchar(255) DEFAULT NULL,
  `create_date` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- Insert the test data:
INSERT INTO `user`(`id`, `name`, `age`, `create_date`) VALUES (1, '小明1', '22', '2023-06-02 11:26:04');
INSERT INTO `user`(`id`, `name`, `age`, `create_date`) VALUES (2, '小明2', '22', '2023-06-02 11:26:04');
INSERT INTO `user`(`id`, `name`, `age`, `create_date`) VALUES (3, '小明3', '22', '2023-06-02 11:26:04');
INSERT INTO `user`(`id`, `name`, `age`, `create_date`) VALUES (4, '小明4', '22', '2023-06-02 11:26:04');
INSERT INTO `user`(`id`, `name`, `age`, `create_date`) VALUES (5, '小明5', '23', '2023-06-02 11:26:04');
INSERT INTO `user`(`id`, `name`, `age`, `create_date`) VALUES (6, '小明6', '23', '2023-06-02 11:26:05');
INSERT INTO `user`(`id`, `name`, `age`, `create_date`) VALUES (7, '小明7', '23', '2023-06-02 11:26:05');
INSERT INTO `user`(`id`, `name`, `age`, `create_date`) VALUES (8, '小明8', '23', '2023-06-02 11:26:05');
INSERT INTO `user`(`id`, `name`, `age`, `create_date`) VALUES (9, '小明9', '23', '2023-06-02 11:26:05');
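- Optionally, a quick count confirms that all 9 rows landed:
SELECT COUNT(*) FROM `user`;
-- expected result: 9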
2. Create the index in ES
- Index creation statement
I am connecting to ES with Kibana for this step; you can also use Postman.
- Kibana statement:
# Create the index
PUT /user
{
  "mappings" : {
    "properties" : {
      "id" : { "type" : "keyword" },
      "name" : { "type" : "text" },
      "age" : { "type" : "keyword" },
      "create_date" : { "type" : "date", "format": "yyyy-MM-dd HH:mm:ss" }
    }
  }
}
- Postman request:
Enter the URL (sent as a PUT request):
http://localhost:9200/user
Enter the JSON body:
{
"mappings" : {
"properties" : {
"id" : {
"type" : "keyword"
},
"name" : {
"type" : "text"
},
"age" : {
"type" : "keyword"
},
"create_date" : {
"type" : "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
}
}
}
- The index was created successfully when the following response is returned:
{ "acknowledged": true, "shards_acknowledged": true, "index": "user" }
3. Build the DataX JSON job from MySQL to ES
[root@hadoop101 ~]# vim mysql2es.json
# Add the following content
{
"job": {
"setting": {
"speed": {
"channel": 8
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"id",
"name",
"age",
"date_format(create_date,'%Y-%m-%d %H:%I:%s')"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://192.168.xx.xxx:3306/bigdata"
],
"table": [
"user"
]
}
],
"password": "xxxxxx",
"username": "root",
"where": "",
"splitPk": "id"
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://192.168.xx.xxx:9200",
"accessId": "root",
"accessKey": "root",
"index": "user",
"type": "_doc",
"settings": {"index" :{"number_of_shards": 5, "number_of_replicas": 1}},
"batchSize": 5000,
"splitter": ",",
"column": [
{
"name": "pk",
"type": "id"
},
{
"name": "id",
"type": "keyword"
},
{
"name": "name",
"type": "text"
},
{
"name": "age",
"type": "keyword"
},
{
"name": "create_date",
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
]
}
}
}
]
}
}
Parameter description
- reader: the DataX source side
- reader.column: the MySQL columns (or expressions) to read; note that id appears twice because the first occurrence feeds the writer's pk column, which becomes the ES _id
- reader.connection.jdbcUrl: the JDBC URL of the MySQL connection
- reader.connection.table: the MySQL table to read
- reader.password: the password for the MySQL connection
- reader.username: the username for the MySQL connection
- reader.where: the filter condition applied when reading from MySQL
- reader.splitPk: the column used to split the MySQL read into parallel ranges
- writer: the DataX sink side
- writer.endpoint: the Elasticsearch connection address
- writer.accessId: the user for HTTP auth
- writer.accessKey: the password for HTTP auth
Note: even if Elasticsearch has no credentials configured, accessId and accessKey must still be given values, otherwise the job fails; you can simply set both to root.
- writer.index: the index name in Elasticsearch
- writer.type: the type name of the Elasticsearch index
- writer.settings: the index settings applied when the index is created, identical to the official Elasticsearch settings
- writer.batchSize: the number of records per bulk write
- writer.splitter: the delimiter to use when an inserted value is an array
- writer.column: the field types supported by Elasticsearch
- Sample column field types:
"column": [ # 使用數(shù)據(jù)庫id作為es中記錄的_id,業(yè)務(wù)主鍵(pk):_id 的值指定為某一個字段。 {"name": "pk", "type": "id"}, { "name": "col_ip","type": "ip" }, { "name": "col_double","type": "double" }, { "name": "col_long","type": "long" }, { "name": "col_integer","type": "integer" }, { "name": "col_keyword", "type": "keyword" }, { "name": "col_text", "type": "text", "analyzer": "ik_max_word"}, { "name": "col_geo_point", "type": "geo_point" }, # ES日期字段創(chuàng)建需指定格式 yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis { "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"}, { "name": "col_nested1", "type": "nested" }, { "name": "col_nested2", "type": "nested" }, { "name": "col_object1", "type": "object" }, { "name": "col_object2", "type": "object" }, { "name": "col_integer_array", "type":"integer", "array":true}, { "name": "col_geo_shape", "type":"geo_shape", "tree": "quadtree", "precision": "10m"} ]
If the datetime type needs to be precise to yyyy-MM-dd HH:mm:ss.SSSSSS, for example 2023-06-02 13:39:57.000000:
- In the MySQL reader column, write:
"date_format(create_date,'%Y-%m-%d %H:%i:%s.%f')"
- In the Elasticsearch writer column, write:
{ "name": "create_date", "type": "text" }
- When creating the Elasticsearch index, change the field mapping to:
{ "name": "create_date", "type": "text" }
4. Run the mysql2es.json job
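- With the job file saved, DataX is started through its standard datax.py entry point (a sketch, assuming DATAX_HOME points to your DataX installation):
[root@hadoop101 ~]# python $DATAX_HOME/bin/datax.py mysql2es.json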
Problem 1: ConfigParser - plugins [mysqlreader,elasticsearchwriter] failed to load
Solution:
Below is a precompiled elasticsearchwriter plugin. Put it under DATAX_HOME/plugin/writer and the job will run; alternatively, download the source from the official repository and compile it yourself.
- elasticsearchwriter Baidu Netdisk download link:
Link: https://pan.baidu.com/s/1C_OeXWf_t5iVNiLquvEhjw  Extraction code: tutu
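- Placing the plugin could look like this (a sketch, assuming the unpacked plugin directory is named elasticsearchwriter and DATAX_HOME points to your DataX installation):
[root@hadoop101 ~]# cp -r elasticsearchwriter $DATAX_HOME/plugin/writer/
[root@hadoop101 ~]# ls $DATAX_HOME/plugin/writer/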
Problem 2: when extracting 5 million rows from MySQL to Elasticsearch, DataX stalled after transferring 2 million+ rows, with the error log: I/O Exception … Broken pipe (write failed)
Solution:
The stall at 2 million+ rows is related to the channel: 8 setting and an Elasticsearch OOM.
Test cases:
1. ES batchSize = 5000, MySQL split ("splitPk": "id") disabled: stalls
2. ES batchSize = 1000, MySQL split ("splitPk": "id") disabled: does not stall
3. ES batchSize = 5000, MySQL split ("splitPk": "id") enabled: does not stall
Summary: the issue comes down to the MySQL read rate versus the Elasticsearch write rate; enabling the split raises the read rate, and the transfer no longer stalls.
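- As a minimal sketch, the non-stalling combination from test case 3 keeps these two settings in mysql2es.json (fragment shown in isolation, not a complete job file):
{
  "reader": { "parameter": { "splitPk": "id" } },
  "writer": { "parameter": { "batchSize": 5000 } }
}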
- Screenshot of a successful run:
These are ETL jobs I have done at work; if needed, feel free to message me privately so we can exchange ideas, learn from each other, and improve together.