在我之前的文章 “Elasticsearch:ES|QL 查詢展示”,我展示了如何在 Kibana 中使用 ES|QL 對索引來進(jìn)行查詢及統(tǒng)計(jì)。在很多的情況下,我們需要在客戶端中來對數(shù)據(jù)進(jìn)行查詢,那么我們該怎么辦呢?我們需要使用到 Elasticsearch 的客戶端。在今天的文章中,我們來展示如何使用 Python 來對數(shù)據(jù)進(jìn)行查詢。
注意:為了使用 ES|QL,我們的 Elastic Stack 版本至少在 8.12 及以上。
安裝
如果你還沒有安裝好自己的 Elasticsearch 及 Kibana,請參考如下的鏈接來進(jìn)行安裝:
- 如何在 Linux,MacOS 及 Windows 上進(jìn)行安裝 Elasticsearch
- Kibana:如何在 Linux,MacOS 及 Windows上安裝 Elastic 棧中的 Kibana
在安裝的時候,我們選擇 Elastic Stack 8.x 來進(jìn)行安裝。特別值得指出的是:ES|QL 只在 Elastic Stack 8.11 及以后得版本中才有。你需要下載 Elastic Stack 8.11 及以后得版本來進(jìn)行安裝。
在首次啟動 Elasticsearch 的時候,我們可以看到如下的輸出:
我們需要記下 Elasticsearch 超級用戶 elastic 的密碼。
我們還需要安裝 Elasticsearch 的 python 依賴包:
pip3 install elasticsearch==8.12.1
$ pip3 list | grep elasticsearch
elasticsearch 8.12.1
準(zhǔn)備數(shù)據(jù)
我們參考之前的文章 “Elasticsearch:ES|QL 查詢展示” 來創(chuàng)建索引:
PUT sample_data
{
"mappings": {
"properties": {
"client.ip": {
"type": "ip"
},
"message": {
"type": "keyword"
}
}
}
}
PUT sample_data/_bulk
{"index": {}}
{"@timestamp": "2023-10-23T12:15:03.360Z", "client.ip": "172.21.2.162", "message": "Connected to 10.1.0.3", "event.duration": 3450233}
{"index": {}}
{"@timestamp": "2023-10-23T12:27:28.948Z", "client.ip": "172.21.2.113", "message": "Connected to 10.1.0.2", "event.duration": 2764889}
{"index": {}}
{"@timestamp": "2023-10-23T13:33:34.937Z", "client.ip": "172.21.0.5", "message": "Disconnected", "event.duration": 1232382}
{"index": {}}
{"@timestamp": "2023-10-23T13:51:54.732Z", "client.ip": "172.21.3.15", "message": "Connection error", "event.duration": 725448}
{"index": {}}
{"@timestamp": "2023-10-23T13:52:55.015Z", "client.ip": "172.21.3.15", "message": "Connection error", "event.duration": 8268153}
{"index": {}}
{"@timestamp": "2023-10-23T13:53:55.832Z", "client.ip": "172.21.3.15", "message": "Connection error", "event.duration": 5033755}
{"index": {}}
{"@timestamp": "2023-10-23T13:55:01.543Z", "client.ip": "172.21.3.15", "message": "Connected to 10.1.0.1", "event.duration": 1756467}
使用 Elasticsearch 客戶端來進(jìn)行查詢
Elasticsearch 查詢語言 (ES|QL) 提供了一種強(qiáng)大的方法來過濾、轉(zhuǎn)換和分析 Elasticsearch 中存儲的數(shù)據(jù)。 它旨在易于最終用戶、SRE 團(tuán)隊(duì)、應(yīng)用程序開發(fā)人員和管理員學(xué)習(xí)和使用。 但它也非常適合熟悉 Pandas 和其他基于數(shù)據(jù)框的框架的數(shù)據(jù)科學(xué)家。
事實(shí)上,ES|QL 查詢會生成帶有命名列的表,即數(shù)據(jù)幀。 但是如何使用 Python 處理這些數(shù)據(jù)呢? ES|QL 目前沒有 Apache Arrow 輸出,但 CSV 輸出是一個很好的開始。
我們使用如下的測試程序:
esql.py
from io import StringIO
import numpy as np
import os
from elasticsearch import Elasticsearch
import pandas as pd
endpoint = os.getenv("ES_SERVER")
username = os.getenv("ES_USER")
password = os.getenv("ES_PASSWORD")
fingerprint = os.getenv("ES_FINGERPRINT")
url = f"https://{endpoint}:9200"
es = Elasticsearch( url ,
basic_auth = (username, password),
ssl_assert_fingerprint = fingerprint,
http_compress = True )
# print(es.info())
response = es.esql.query(query="FROM sample_data", format="csv")
df = pd.read_csv(StringIO(response.body))
print(df)
print("==================================================================")
response = es.esql.query(
query="""
FROM sample_data
| LIMIT 5
| sort @timestamp desc
| WHERE event.duration > 3000000
| WHERE message LIKE "Connection *"
""",
format="csv"
)
df = pd.DataFrame = pd.read_csv(StringIO(response.body))
print(df)
print("==================================================================")
response = es.esql.query(
query="""
FROM sample_data
| STATS avg=AVG(event.duration), count=COUNT(*) BY client.ip
| SORT count
""",
format="csv"
)
df = pd.DataFrame = pd.read_csv(
StringIO(response.body),
dtype={"count":"Int64", "avg":np.float64}
)
print(df)
print("==================================================================")
在運(yùn)行上面的代碼之前,我們需要在 terminal 中設(shè)置相應(yīng)的環(huán)境變量:
export ES_SERVER="localhost"
export ES_USER="elastic"
export ES_PASSWORD="q2rqAIphl-fx9ndQ36CO"
export ES_FINGERPRINT="bce66ed55097f255fc8e4420bdadafc8d609cc8027038c2dd09d805668f3459e"
然后,我們使用如下的命令來運(yùn)行:文章來源:http://www.zghlxwxcb.cn/news/detail-841555.html
python3 esql.py
$ python3 esql.py
/Users/liuxg/python/esql/esql.py:22: ElasticsearchWarning: No limit defined, adding default limit of [500]
response = es.esql.query(query="FROM sample_data", format="csv")
@timestamp client.ip event.duration message
0 2023-10-23T12:15:03.360Z 172.21.2.162 3450233 Connected to 10.1.0.3
1 2023-10-23T12:27:28.948Z 172.21.2.113 2764889 Connected to 10.1.0.2
2 2023-10-23T13:33:34.937Z 172.21.0.5 1232382 Disconnected
3 2023-10-23T13:51:54.732Z 172.21.3.15 725448 Connection error
4 2023-10-23T13:52:55.015Z 172.21.3.15 8268153 Connection error
5 2023-10-23T13:53:55.832Z 172.21.3.15 5033755 Connection error
6 2023-10-23T13:55:01.543Z 172.21.3.15 1756467 Connected to 10.1.0.1
==================================================================
@timestamp client.ip event.duration message
0 2023-10-23T13:52:55.015Z 172.21.3.15 8268153 Connection error
==================================================================
/Users/liuxg/python/esql/esql.py:44: ElasticsearchWarning: No limit defined, adding default limit of [500]
response = es.esql.query(
avg count client.ip
0 1232382.00 1 172.21.0.5
1 3450233.00 1 172.21.2.162
2 2764889.00 1 172.21.2.113
3 3945955.75 4 172.21.3.15
==================================================================
很顯然,我們得到了最終的結(jié)果。文章來源地址http://www.zghlxwxcb.cn/news/detail-841555.html
到了這里,關(guān)于Elasticsearch:從 ES|QL 到 Python 數(shù)據(jù)幀的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!