1、實戰(zhàn)線上問題
Q1:Logstash 同步 postgreSQL 到 Elasticsearch 數(shù)據(jù)不一致。
在使用 Logstash 從 pg 庫中將一張表導(dǎo)入到 ES 中時,發(fā)現(xiàn) ES 中的數(shù)據(jù)量和 PG 庫中的這張表的數(shù)據(jù)量存在較大差距。如何快速比對哪些數(shù)據(jù)沒有插入?導(dǎo)入過程中,Logstash 日志沒有異常。PG 中這張表有 7600W。
Q2:mq 異步雙寫數(shù)據(jù)庫、es 的方案中,如何保證數(shù)據(jù)庫數(shù)據(jù)和 es 數(shù)據(jù)的一致性?
2、推薦解決方案之一——ID 比較法
如下示例,僅拿問題1舉例驗證,問題2原理一致。
2.1 方案探討
要找出哪些數(shù)據(jù)沒有插入到 Elasticsearch 中,可以采用以下方法:
確保 Logstash 配置文件中的 input 插件的 JDBC 驅(qū)動程序正確配置,以便從 PostgreSQL 數(shù)據(jù)庫中提取所有數(shù)據(jù)。注意 statement 參數(shù),確保它選擇了所有需要的數(shù)據(jù)。
檢查 Logstash 配置文件的 output 插件,確保正確配置了 Elasticsearch 的連接參數(shù)。同時,檢查是否有過濾器在導(dǎo)入過程中過濾掉了部分?jǐn)?shù)據(jù)。
在 Logstash 配置文件中添加一個 stdout 插件,將從 PostgreSQL 數(shù)據(jù)庫中讀取的數(shù)據(jù)記錄到文件中。
例如,可以添加以下內(nèi)容:
output?{
??elasticsearch?{
????...Elasticsearch?配置...
??}
??stdout?{
????codec?=>?json_lines
????path?=>?"/path/to/logstash_output.log"
??}
}
將 Logstash 輸出文件與 PostgreSQL 數(shù)據(jù)庫中的原始數(shù)據(jù)進(jìn)行比較,以找出未導(dǎo)入的數(shù)據(jù)??梢允褂?Python、Shell 腳本或其他編程語言編寫一個簡單的腳本來執(zhí)行此操作。
如果 Logstash 輸出文件中的記錄數(shù)與 PostgreSQL 數(shù)據(jù)庫中的記錄數(shù)一致,但 Elasticsearch 中的記錄數(shù)不一致,請檢查 Elasticsearch 集群的健康狀況和日志。確認(rèn)集群是否在接收和索引數(shù)據(jù)時遇到問題。
如果問題仍然存在,嘗試將批量操作的大小減小,以減輕 Elasticsearch 和 Logstash 的負(fù)擔(dān)??梢酝ㄟ^在 Logstash 配置文件的 output 插件中設(shè)置 flush_size 和 idle_flush_time 參數(shù)來實現(xiàn)。
處理大量數(shù)據(jù)時,可能需要調(diào)整 Logstash 和 Elasticsearch 的性能和資源配置。根據(jù)硬件和網(wǎng)絡(luò)條件,可能需要優(yōu)化批量操作、JVM 設(shè)置、線程池大小等方面的設(shè)置。
2.2 比較腳本的實現(xiàn)
以下是一個簡單的 Shell 腳本示例,用于比較 Logstash 輸出文件(JSON 格式)和 PostgreSQL 數(shù)據(jù)庫中的數(shù)據(jù)。該腳本將比較特定字段(如 id)以確定哪些數(shù)據(jù)可能未導(dǎo)入到 Elasticsearch。
首先,從 PostgreSQL 數(shù)據(jù)庫中導(dǎo)出數(shù)據(jù),將其保存為 CSV 文件:
COPY?(SELECT?id?FROM?your_table)?TO?'/path/to/postgres_data.csv'?WITH
接下來,創(chuàng)建一個名為 compare.sh 的 Shell 腳本:
#!/bin/bash
#?將?JSON?文件中的?ID?提取到一個文件中
jq?'.id'?/path/to/logstash_output.log?>?logstash_ids.txt
#?刪除?JSON?中的雙引號
sed?-i?'s/"http://g'?logstash_ids.txt
#?對?Logstash?和?PostgreSQL?的?ID?文件進(jìn)行排序
sort?-n?logstash_ids.txt?>?logstash_ids_sorted.txt
sort?-n?/path/to/postgres_data.csv?>?postgres_ids_sorted.txt
#?使用?comm?比較兩個已排序的?ID?文件
comm?-23?postgres_ids_sorted.txt?logstash_ids_sorted.txt?>?missing_ids.txt
#?輸出結(jié)果
echo?"以下 ID 在 Logstash 輸出文件中未找到:"
cat?missing_ids.txt
為腳本添加可執(zhí)行權(quán)限并運行:
chmod?+x?compare.sh
./compare.sh
此腳本會比較 logstash_output.log 和 postgres_data.csv 文件中的 ID。如果發(fā)現(xiàn)缺失的 ID,它們將被保存在 missing_ids.txt 文件中,并輸出到控制臺。請注意,該腳本假設(shè)已經(jīng)安裝了 jq(一個命令行 JSON 處理器)。如果沒有,請先安裝 jq
。
3、推薦方案二——Redis 加速對比
在這種情況下,可以使用 Redis 的集合數(shù)據(jù)類型來存儲 PostgreSQL 數(shù)據(jù)庫和 Logstash 輸出文件中的 ID。接下來,可以使用 Redis 提供的集合操作來找到缺失的 ID。
以下是一個使用 Redis 實現(xiàn)加速比對的示例:
首先,從 PostgreSQL 數(shù)據(jù)庫中導(dǎo)出數(shù)據(jù),將其保存為 CSV 文件:
COPY?(SELECT?id?FROM?your_table)?TO?'/path/to/postgres_data.csv'?WITH?CSV?HEADER;
安裝并啟動 Redis。
使用 Python 腳本將 ID 數(shù)據(jù)加載到 Redis:
import?redis
import?csv
#?連接到?Redis
r?=?redis.StrictRedis(host='localhost',?port=6379,?db=0)
#?從?PostgreSQL?導(dǎo)出的?CSV?文件中加載數(shù)據(jù)
with?open('/path/to/postgres_data.csv',?newline='')?as?csvfile:
????csv_reader?=?csv.reader(csvfile)
????next(csv_reader)??#?跳過表頭
????for?row?in?csv_reader:
????????r.sadd('postgres_ids',?row[0])
#?從?Logstash?輸出文件中加載數(shù)據(jù)
with?open('/path/to/logstash_output.log',?newline='')?as?logstash_file:
????for?line?in?logstash_file:
????????id?=?line.split('"id":')[1].split(',')[0].strip()
????????r.sadd('logstash_ids',?id)
#?計算差集
missing_ids?=?r.sdiff('postgres_ids',?'logstash_ids')
#?輸出缺失的?ID
print("以下 ID 在 Logstash 輸出文件中未找到:")
for?missing_id?in?missing_ids:
????print(missing_id)
這個 Python 腳本使用 Redis 集合數(shù)據(jù)類型存儲 ID,然后計算它們之間的差集以找到缺失的 ID。需要先安裝 Python 的 Redis 庫??梢允褂靡韵旅畎惭b:
pip?install?redis
這個腳本是一個基本示例,可以根據(jù)需要修改和擴展它。使用 Redis 的優(yōu)點是它能在內(nèi)存中快速處理大量數(shù)據(jù),而不需要在磁盤上讀取和寫入臨時文件。
4、小結(jié)
方案一:使用 Shell 腳本和 grep 命令
優(yōu)點:
(1)簡單,易于實現(xiàn)。
(2)不需要額外的庫或工具。
缺點:
(1)速度較慢,因為它需要在磁盤上讀寫臨時文件。
(2)對于大數(shù)據(jù)量的情況,可能會導(dǎo)致較高的磁盤 I/O 和內(nèi)存消耗。
方案二:使用 Redis 實現(xiàn)加速比對
優(yōu)點:
(1)速度更快,因為 Redis 是基于內(nèi)存的數(shù)據(jù)結(jié)構(gòu)存儲。
(2)可擴展性較好,可以處理大量數(shù)據(jù)。
缺點:
(1)實現(xiàn)相對復(fù)雜,需要編寫額外的腳本。
(2)需要安裝和運行 Redis 服務(wù)器。
根據(jù)需求和數(shù)據(jù)量,可以選擇合適的方案。如果處理的數(shù)據(jù)量較小,且對速度要求不高,可以選擇方案一,使用 Shell 腳本和 grep 命令。這種方法簡單易用,但可能在大數(shù)據(jù)量下表現(xiàn)不佳。
如果需要處理大量數(shù)據(jù),建議選擇方案二,使用 Redis 實現(xiàn)加速比對。這種方法速度更快,能夠有效地處理大數(shù)據(jù)量。然而,這種方法需要額外的設(shè)置和配置,例如安裝 Redis 服務(wù)器和編寫 Python 腳本。
在實際應(yīng)用中,可能需要根據(jù)具體需求進(jìn)行權(quán)衡,以選擇最適合的解決方案。
推薦閱讀
全網(wǎng)首發(fā)!從 0 到 1 Elasticsearch 8.X 通關(guān)視頻
重磅 | 死磕 Elasticsearch 8.X 方法論認(rèn)知清單
如何系統(tǒng)的學(xué)習(xí) Elasticsearch ?
2023,做點事
更短時間更快習(xí)得更多干貨!
和全球?近2000+?Elastic 愛好者一起精進(jìn)!
文章來源:http://www.zghlxwxcb.cn/news/detail-608133.html
比同事?lián)屜纫徊綄W(xué)習(xí)進(jìn)階干貨!文章來源地址http://www.zghlxwxcb.cn/news/detail-608133.html
到了這里,關(guān)于數(shù)據(jù)庫同步 Elasticsearch 后數(shù)據(jù)不一致,怎么辦?的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!