1, logstash 配置文件
[root@host1: ] cat /opt/logstash/kafka-to-tcp.yml
input {
kafka {
bootstrap_servers => "192.168.0.11:9092" #這里可以是kafka集群,如"192.168.149.101:9092,192.168.149.102:9092"
consumer_threads => 3 #等于 topic分區(qū)數(shù)
group_id => "logstash_123"
#client_id => "logstash1" #注意,多臺(tái)logstash實(shí)例消費(fèi)同一個(gè)topics時(shí),client_id需要指定不同的名字
#auto_offset_reset => "latest"
auto_offset_reset => "earliest"
topics => ["alertTopic1"]
codec => json { charset => "UTF-8" }
}
}
filter {
#刪除某些數(shù)據(jù):正則取反,根據(jù)json字段ruleName字段內(nèi)容刪除數(shù)據(jù)
if ([ruleName] !~ ".*主機(jī)告警.*") {
drop {}
}
#只保留某些數(shù)據(jù):正則匹配,刪除其他的數(shù)據(jù)
#if ([ruleName] =~ ".*主機(jī)告警.*") {
# drop {}
#}
mutate {
#刪除某些json字段, 修改某些字段內(nèi)容
remove_field => ["eventId","ruleId"]
gsub => [
"Msg" , "[\r|\n]" , ""
]
}
}
output {
#輸出到命令行窗口,方便調(diào)試
#stdout{}
#輸出到文件,方便排查告警漏告等問題
file {
codec => json_lines { charset => "UTF-8" }
path => "/tmp/b.log"
}
#輸出UMP平臺(tái)對(duì)接指定的ip、端口,以指定的格式推送到UMP集中告警平臺(tái)
tcp {
host => "192.168.0.11"
port => "514"
codec => plain {
format =>"%{TIME} 測(cè)試環(huán)境--ruleName:%{ruleName},Msg:%{Msg}\n"
}
}
}
2,調(diào)試并后臺(tái)啟動(dòng)
- ./bin/logstash -f /xx/xx.yml
[root@host1: ] cat /usr/lib/systemd/system/logstashtcp.service
[Unit]
Description=Logstash
Requires=network.service
After=network.service
[Service]
LimitNOFILE=65536
LimitMEMLOCK=infinity
WorkingDirectory=/opt/logstash/
ExecStart=/bin/sh bin/logstash -f kafka-to-tcp.yml
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
SuccessExitStatus=143
Restart=on-failure
RestartSec=42s
[Install]
WantedBy=multi-user.target
3, 修改logstash 服務(wù)日志路徑
sed -i.bak 's@${sys:ls.logs}@/xx/yy@' config/log4j2.properties
重啟logstash服務(wù)
文章來源地址http://www.zghlxwxcb.cn/news/detail-708597.html
文章來源:http://www.zghlxwxcb.cn/news/detail-708597.html
到了這里,關(guān)于logstash 消費(fèi)kafka數(shù)據(jù),轉(zhuǎn)發(fā)到tcp端口的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!