For various reasons I recently had to spend a little over a week learning Elasticsearch, so here are a few of the small problems I ran into.
Configuring the .conf file
I think of this file as the "program" that Logstash compiles and runs: we write it, then start Logstash against it, and Logstash executes it so that our data is transferred the way we want. The file has three sections: input, filter, and output. input and output are required; filter is optional and used as your situation demands.
1. The input source: input
This is where the data comes from. Since we are syncing MySQL data into ES, we use the jdbc input plugin to connect to MySQL.
input {
  jdbc {
    # Path to the JDBC driver jar, e.g. /logstash-8.2.3/logstash-core/lib/jars/mysql-connector-java-8.0.22.jar
    jdbc_driver_library => ""
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"        # JDBC driver class
    jdbc_connection_string => "jdbc:mysql://host:3306/DB"  # MySQL connection string
    jdbc_user => "root"                                    # MySQL user
    jdbc_password => "password"                            # MySQL password
    jdbc_paging_enabled => true                            # enable paging
    tracking_column => "unix_ts_in_secs"                   # column to track: unix_ts_in_secs (a Unix timestamp)
    use_column_value => true                               # track the value of a column from the query result
    tracking_column_type => "numeric"                      # type of the tracking column; "numeric" means a number
    schedule => "*/3 * * * * *"                            # cron expression; run the query on this schedule
    # add_field adds a field whose value is "tb_type";
    # anything under [@metadata] exists only inside the pipeline and is never written to the output
    add_field => { "[@metadata][type]" => "tb_type" }
    # The SQL statement: pull rows from the tb table
    statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM tb WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"
  }
}
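A note on how :sql_last_value behaves here: because use_column_value => true, it holds the largest unix_ts_in_secs seen so far, and on the very first run it starts at 0, so all existing rows are picked up. Logstash persists this value between runs in a small metadata file. If you want to control where that file lives, or force a full resync, the jdbc input has two optional settings; the path below is only an example I chose for illustration, not something the original config requires:

jdbc {
  # ... same settings as above ...
  # Optional: where the last :sql_last_value is stored between runs
  # (by default Logstash picks a location inside its own data directory)
  last_run_metadata_path => "/home/logstash-8.2.3/config/tb_last_run"
  # Optional: set to true to ignore the stored value and re-import everything from 0
  clean_run => false
}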
2. The filter: filter
The mutate plugin adjusts and cleans up the data pulled in by the input.
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]" }                  # copy the id field into [@metadata][_id]
    remove_field => ["id", "@version", "unix_ts_in_secs"]   # drop the "id", "@version" and "unix_ts_in_secs" fields
  }
}
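To see what the events actually look like after this filter, including the [@metadata] fields that are normally hidden, you can temporarily add a stdout output alongside the elasticsearch one while debugging. This is purely a debugging aid, not part of the sync itself:

output {
  # rubydebug with metadata => true also prints [@metadata] fields,
  # which are otherwise never written to any output
  stdout { codec => rubydebug { metadata => true } }
}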
3. The output: output
This sends the data obtained above to its destination; here we use the elasticsearch output plugin, which stores the data in ES.
output {
  elasticsearch {
    index => "tb_sync"                       # write the data into the tb_sync index
    hosts => ["elasticsearch_host:port"]     # address of the ES cluster; the port is usually 9200
    document_id => "%{[@metadata][_id]}"     # use [@metadata][_id], i.e. the original id column, as the document ID
  }
}
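Once the pipeline is running, a quick way to confirm that documents are arriving is to query the index directly. Replace elasticsearch_host with your real address; this sketch assumes the cluster is still reachable over plain http without authentication (the HTTPS case is covered further down):

curl "http://elasticsearch_host:9200/tb_sync/_search?pretty&size=1"
curl "http://elasticsearch_host:9200/_cat/indices/tb_sync?v"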
4. Multiple tables to multiple indices
Sync data from several tables into different indices, each table going to its own index.
The full sync.conf looks like this. The [@metadata][type] field is used to tell events apart, so that rows from the same table get the same processing and end up in the same index.
input {
  jdbc {
    jdbc_driver_library => ""
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://123.67.92.210:3306/database"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    add_field => { "[@metadata][type]" => "first_info" }   # add [@metadata][type] and set it to first_info
    tracking_column_type => "numeric"
    schedule => "*/3 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM first_info WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"
  }
}
input {
  jdbc {
    jdbc_driver_library => ""
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://123.67.92.210:3306/database"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    add_field => { "[@metadata][type]" => "second_info" }
    tracking_column_type => "numeric"
    schedule => "*/3 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM second_info WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"
  }
}
filter {
  # Check [@metadata][type]: if it is first_info, the event came from the first_info table
  if [@metadata][type] == "first_info" {
    mutate {
      copy => { "id" => "[@metadata][_id]" }
      remove_field => ["id", "@version", "unix_ts_in_secs"]
    }
  }
  # If it is second_info, the event came from the second_info table
  if [@metadata][type] == "second_info" {
    mutate {
      copy => { "id" => "[@metadata][_id]" }
      remove_field => ["id", "@version", "unix_ts_in_secs"]
    }
  }
}
output {
  # Check [@metadata][type]:
  # events from the first_info table go into the tb_first index
  if [@metadata][type] == "first_info" {
    elasticsearch {
      index => "tb_first"
      hosts => ["http://192.168.31.141:9200","http://192.168.31.220:9200"]   # a multi-node ES cluster
      document_id => "%{[@metadata][_id]}"
    }
  }
  # events from the second_info table go into the tb_second index
  if [@metadata][type] == "second_info" {
    elasticsearch {
      index => "tb_second"
      hosts => ["http://192.168.31.141:9200","http://192.168.31.220:9200"]
      document_id => "%{[@metadata][_id]}"
    }
  }
}
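If both pipelines are doing their job, both indices should exist and their document counts should grow over time. A quick check, assuming the nodes above are reachable without authentication:

curl "http://192.168.31.141:9200/_cat/indices/tb_first,tb_second?v"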
Logstash to an HTTPS-enabled Elasticsearch
When Elasticsearch has its security features enabled, Logstash needs a certificate plus a username and password to sync data; only once ES can verify that we are a trusted client will it let us write to it.
1. The certificate
In the /elasticsearch-8.2.3/config/certs folder there should be three files: http.p12, http_ca.crt, and transport.p12.
Copy http_ca.crt into the /logstash-8.2.3/config/certs folder; if that folder does not exist yet, create it yourself, as shown below.
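Assuming the directory layout used throughout this post, the copy boils down to:

mkdir -p /logstash-8.2.3/config/certs
cp /elasticsearch-8.2.3/config/certs/http_ca.crt /logstash-8.2.3/config/certs/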
2. Configure Logstash
elasticsearch {
  index => "tb_sync"
  hosts => ["https://192.168.31.141:9200","https://192.168.31.220:9200"]
  user => "elastic"                 # the built-in superuser; not recommended, better to set up dedicated roles and users
  password => "elasticpassword"     # that user's password
  ssl => true
  # absolute path to the CA certificate copied above
  cacert => "/home/logstash-8.2.3/config/certs/http_ca.crt"
  document_id => "%{[@metadata][_id]}"
}
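Before starting Logstash it is worth checking that the certificate and credentials are actually accepted. A quick test with curl, reusing the paths and addresses from the config above (it will prompt for the elastic password and should print the cluster info as JSON if everything is in order):

curl --cacert /home/logstash-8.2.3/config/certs/http_ca.crt -u elastic https://192.168.31.141:9200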
Running Logstash
In the /logstash-8.2.3 folder, where we wrote sync.conf, open a terminal and run the following command.
./bin/logstash -f sync.conf
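If you only want to verify that sync.conf parses correctly without actually starting the pipeline, Logstash can do a dry run first:

./bin/logstash -f sync.conf --config.test_and_exit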
TIP: For how to set up Elasticsearch itself (as assumed above), you can refer to an existing Elastic Stack 8.0 installation guide.
到了這里,關(guān)于Logstash 同步MySQL數(shù)據(jù) 至 開啟HTTPS的ElasticSearch的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!