這篇文章主要介紹了如何使用Logstash同步mysql數(shù)據(jù)到Elasticsearch(親自踩坑),如果幫助到了大家,希望用你毛茸茸的小手點個贊??;如有錯誤或未考慮周全的地方,希望在評論區(qū)留言??
Logstash官方文檔提供了解決方案
一. 安裝Logstash
- Logstash下載地址
下載版本一定要和Elasticsearch版本一致(如Elasticsearch版本為7.10.2,那么對應下載的Logstash版本也要選擇7.10.2 ! ! ! 說多了都是淚????)下邊是我們公司使用的ES版本,因此Logstash版本也要選擇7.10.2
- 下載MySQL依賴包下載地址(作為java程序員,我相信你肯定不需要??)
恭喜你??????,到這你已經(jīng)成功1/4啦,接下來我們要去創(chuàng)建ES索引以及MySQL表啦,加油??
二.創(chuàng)建ES索引以及MySQL表
ps:創(chuàng)建過程略過…當然,既然你能搜到這篇文章,說明你也已經(jīng)創(chuàng)建過了,那么,請直接跳到第三步
- ES索引結構
- MySQL表結構
mysql原始數(shù)據(jù)
恭喜你??????,到這你已經(jīng)成功2/4啦,接下來我們要去配置Logstash啦,加油??
三. 配置Logstash
- 進入到 /logstash-7.10.2/bin目錄下,創(chuàng)建 testlogstash.conf 文件,這是具體的配置
input {
stdin {}
jdbc {
#===================這是你需要修改的第1個地方===============#
#數(shù)據(jù)庫連接地址
jdbc_connection_string => "jdbc:mysql://localhost:3306/user_center?characterEncoding=UTF-8&autoReconnect=true"
jdbc_user => "root"
jdbc_password => "root"
# MySQL依賴包路徑;
jdbc_driver_library => "/Users/chencunyou/Desktop/java/mysql/mysql-connector-java-8.0.27.jar"
#查詢數(shù)據(jù)庫語句(查詢出的字段名要和es索引的字段名相同)
statement => "SELECT id,nick_name AS nickName,sex,birth_date AS birthDate,create_time AS createTime,update_time AS updateTime
FROM sys_user"
# 需要記錄的字段,用于增量同步,需是數(shù)據(jù)庫字段
tracking_column => "updateTime"
# record_last_run上次數(shù)據(jù)存放位置;
last_run_metadata_path => "/Users/chencunyou/Desktop/java/logstash-8.9.2/data/last_time.txt"
#====================第1個地方修改結束=====================#
type => "jdbc"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# 數(shù)據(jù)庫重連嘗試次數(shù)
connection_retry_attempts => "3"
# 判斷數(shù)據(jù)庫連接是否可用,默認false不開啟
jdbc_validate_connection => "true"
# 數(shù)據(jù)庫連接可用校驗超時時間,默認3600S
jdbc_validation_timeout => "3600"
# 開啟分頁查詢(默認false不開啟);
jdbc_paging_enabled => "true"
# 單次分頁查詢條數(shù)(默認100000,若字段較多且更新頻率較高,建議調(diào)低此值);
jdbc_page_size => "500"
# statement為查詢數(shù)據(jù)sql,如果sql較復雜,建議配通過statement_filepath配置sql文件的存放路徑;
# sql_last_value為內(nèi)置的變量,存放上次查詢結果中最后一條數(shù)據(jù)tracking_column的值,此處即為ModifyTime;
# statement_filepath => "mysql/jdbc.sql"
# 是否將字段名轉換為小寫,默認true(如果有數(shù)據(jù)序列化、反序列化需求,建議改為false);
lowercase_column_names => false
# Value can be any of: fatal,error,warn,info,debug,默認info;
sql_log_level => warn
#
# 是否記錄上次執(zhí)行結果,true表示會將上次執(zhí)行結果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;
record_last_run => true
# 需要記錄查詢結果某字段的值時,此字段為true,否則默認tracking_column為timestamp的值;
use_column_value => true
# Value can be any of: numeric,timestamp,Default value is "numeric"
tracking_column_type => timestamp
# 是否清除last_run_metadata_path的記錄,需要增量同步時此字段必須為false;
clean_run => false
#
# 同步頻率(分 時 天 月 年),默認每分鐘同步一次;
schedule => "* * * * *"
}
}
output {
elasticsearch {
#===================這是你需要修改的第2個地方===============#
# host => "localhost"
# port => "9200"
# 配置ES集群地址
hosts => ["127.0.0.1:9200"]
user => "admin"
password => "admin"
# 啟用SSL/TLS加密(前提是開啟了HTTPS訪問)
ssl => true
# 禁用證書驗證(前提是開啟了HTTPS訪問)
ssl_certificate_verification => false
# 證書地址(如果啟用SSL/TLS加密的話,即使禁用證書驗證,也需要配置否則會報錯)
#cacert => "/Users/chencunyou/Desktop/java/logstash-7.10.2/CloudSearchService.cer"
# 你的索引名字,必須小寫
index => "sys_user_dev"
# 數(shù)據(jù)唯一索引(建議使用數(shù)據(jù)庫表唯一索引id)
document_id => "%{id}"
#====================第2個地方修改結束=====================#
}
stdout {
codec => json_lines
}
}
- SSL證書下載
如果你是買的云服務器廠商的ES服務,去實例基本信息下載即可。我們公司用的華為云,我以華為云為例,進到"云搜索服務CSS",找到ES實例,然后下載證書
恭喜你??????,到這你已經(jīng)成功3/4啦,接下來進入到最重要,同時99%會報錯的最后一步,加油??
四. 同步數(shù)據(jù)
-
進入到 bin 目錄
-
執(zhí)行腳本同步數(shù)據(jù)
./logstash -f testlogstash.conf
- 執(zhí)行結果(??恭喜你,同步失敗,當然幸運的你也可能不會碰到此問題)
Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>"369539918421758535", :_index=>"sys_user_dev", :routing=>nil, :_type=>"_doc"}, #<LogStash::Event:0x41528007>], :response=>{"index"=>{"_index"=>"sys_user_dev", "_type"=>"_doc", "_id"=>"369539918421758535", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [sex] of type [integer] in document with id '369539918421758535'. Preview of field's value: 'false'", "caused_by"=>{"type"=>"json_parse_exception", "reason"=>"Current token (VALUE_FALSE) not numeric, can not use numeric value accessors\n at [Source: (byte[])\"{\"type\":\"jdbc\",\"createTime\":\"2023-09-15 20:22:47\",\"id\":369539918421758535,\"nickName\":\"易烊千璽\",\"@timestamp\":\"2023-09-15T14:23:01.583Z\",\"updateTime\":\"2023-09-15 20:23:03\",\"@version\":\"1\",\"sex\":false,\"birthDate\":\"2000-09-15\"}\"; line: 1, column: 202]"}}}}}
??原因:數(shù)據(jù)庫表中的sex字段,字段類型為tinyint,Logstash會轉成boolean,但是ES中設置的也是int類型,所以你懂的
解決方案: 修改 testlogstash.conf 文件中的mysql連接地址,指定 tinyInt1isBit=false
jdbc_connection_string => "jdbc:mysql://localhost:3306/user_center?characterEncoding=UTF-8&autoReconnect=true&tinyInt1isBit=false"
- 再次執(zhí)行命令(發(fā)現(xiàn)還是報錯??,當然幸運的你也可能不會碰到此問題)
Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>"369539918421758535", :_index=>"sys_user_dev", :routing=>nil, :_type=>"_doc"}, #<LogStash::Event:0x4f58d64c>], :response=>{"index"=>{"_index"=>"sys_user_dev", "_type"=>"_doc", "_id"=>"369539918421758535", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [birthDate] of type [date] in document with id '369539918421758535'. Preview of field's value: '2000-09-14T16:00:00.000Z'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to parse date field [2000-09-14T16:00:00.000Z] with format [yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis]", "caused_by"=>{"type"=>"date_time_parse_exception", "reason"=>"Failed to parse with all enclosed parsers"}}}}}}
??原因:mysql 中的字段 ‘birth_date’ 數(shù)據(jù)格式為 2000-09-15,通過LogStash導數(shù)據(jù)時 格式變?yōu)?2000-09-14T16:00:00.000Z而ES索引類型要求的格式為 :yyyy-MM-dd所以報錯;createTime、updateTime同理
解決方案:將testlogstash.conf 文件中的sql語句中的birthDate、createTime、updateTime使用 Data_Format操作
SELECT id,nick_name AS nickName,sex,
DATE_FORMAT(birth_date,'%Y-%m-%d') AS birthDate,
DATE_FORMAT(create_time,'%Y-%m-%d %H:%i:%S') AS createTime,
DATE_FORMAT(update_time,'%Y-%m-%d %H:%i:%S') AS updateTime
FROM sys_user
- 同步成功
恭喜你??????,到這你已經(jīng)成功啦
四. 總結
- 啟動容易報錯的地方
??如果你的ES集群開啟了HTTPS訪問,需要注意在 testlogstash.conf 文件配置SSL相關配置
- 同步數(shù)據(jù)時容易出錯的2個地方
??數(shù)據(jù)庫字段類型為tinyint,需要在mysql連接地址配置 tinyInt1isBit=false
??時間轉換問題,sql需要使用 Data_Format 格式化文章來源:http://www.zghlxwxcb.cn/news/detail-844951.html
- 如果還是報錯
??請用idea打開testlogstash.conf 文件,重新對齊一下格式,保存后再執(zhí)行腳本同步文章來源地址http://www.zghlxwxcb.cn/news/detail-844951.html
- 完結撒花????????????
到了這里,關于使用Logstash同步mysql數(shù)據(jù)到Elasticsearch(親自踩坑)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!