實現思路
1. Python搭建Flask服務,編寫ES腳本。
2. 通過Java調用Python接口,完成對ES的插入操作。
環境配置
Elasticsearch 7.16.0文章來源:http://www.zghlxwxcb.cn/news/detail-557899.html
具體代碼實現
ESObject模板
import json
from flask import Flask, request, jsonify, Response
import jieba
import time
import hashlib
import random
import string
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
server = Flask(__name__)
# Keep non-ASCII (e.g. Chinese) text unescaped when Flask serialises JSON.
server.config.update(JSON_AS_ASCII=False)
class ESObject:
    """Small helper that binds an Elasticsearch client to one index / doc type.

    Every method operates on ``self.index_name`` (and, where the call passes
    it, ``self.index_type``). ``create_index``, ``delete_index`` and
    ``store_record`` print errors instead of raising; the other helpers let
    client exceptions propagate.
    """

    def __init__(self, index_name, index_type, host='127.0.0.1', port=9300):
        """Remember the target index and connect immediately.

        :param index_name: index every helper reads from / writes to
            (comparable to a database).
        :param index_type: document type passed as ``doc_type`` / ``_type``
            (comparable to a table); callers in this module use ``"_doc"``.
        :param host: Elasticsearch host name or IP.
        :param port: Elasticsearch port.
            NOTE(review): the Python client talks HTTP (usually port 9200);
            9300 is normally the node transport port, so this default likely
            cannot connect — the callers in this module pass 9200 explicitly.
        """
        self.host = host
        self.port = port
        self.index_name = index_name
        self.index_type = index_type
        self.es_obj = self.connect_elasticsearch()

    def set_index_name(self, index_name):
        """Point subsequent operations at ``index_name``."""
        self.index_name = index_name

    def set_index_type(self, index_type):
        """Use ``index_type`` as the document type for subsequent operations."""
        self.index_type = index_type

    def connect_elasticsearch(self):
        """Create the client, ping it once and print the outcome.

        The client is returned even when the ping fails, so a returned client
        does not imply the cluster is reachable.

        :return: an ``Elasticsearch`` client for ``self.host:self.port``.
        """
        _es = Elasticsearch([{'host': self.host, 'port': self.port}], request_timeout=60, max_retries=3,
                            retry_on_timeout=True)
        if _es.ping():
            print('The connection is successful!')
        else:
            print("Error: ES could not connect!")
        return _es

    def create_index(self, settings):
        """Create the index (roughly: a database) unless it already exists.

        Verify with e.g. ``http://localhost:9200/<index_name>/_mappings``.

        :param settings: request body for index creation (settings/mappings).
        :return: ``True`` if this call created the index; ``False`` if it
            already existed or an error occurred (the error is printed).
        """
        created = False
        try:
            if not self.es_obj.indices.exists(index=self.index_name):
                # ignore=400 tolerates a 400 response from create, e.g. if the
                # index appears between the exists() check and this call.
                self.es_obj.indices.create(index=self.index_name, ignore=400, body=settings)
                print("Created Index")
                created = True
        except Exception as ex:
            print(str(ex))
        # Return after the try rather than inside `finally`: a `return` in
        # `finally` would also swallow KeyboardInterrupt / SystemExit.
        return created

    def delete_index(self):
        """Delete the index if it exists; errors are printed, not raised."""
        try:
            if self.es_obj.indices.exists(index=self.index_name):
                # ignore=[400, 404] keeps an already-missing index from
                # aborting the program with an exception.
                self.es_obj.indices.delete(index=self.index_name, ignore=[400, 404])
                print("Deleted Index")
        except Exception as ex:
            print(str(ex))

    def store_record(self, record):
        """Index a single document with an auto-generated id.

        :param record: document body.
        :return: the client's response, or ``None`` if indexing failed
            (the error is printed).
        """
        try:
            outcome = self.es_obj.index(index=self.index_name, doc_type=self.index_type, body=record)
            print(outcome['result'])
            return outcome
        except Exception as ex:
            print("Error in indexing data")
            print(str(ex))
            return None

    def store_record_list(self, record_list):
        """Index each record with its own request (see bulk_index_data for batching)."""
        for record in record_list:
            self.store_record(record)

    def bulk_index_data(self, record_list):
        """Index many documents through the bulk helper.

        Document ids are left for Elasticsearch to generate.

        :param record_list: iterable of document bodies.
        :return: number of successfully executed actions (also printed).
            Failures raise, because ``raise_on_error=True``.
        """
        actions = [
            {"_index": self.index_name, "_type": self.index_type, "_source": record}
            for record in record_list
        ]
        success, _ = bulk(self.es_obj, actions, index=self.index_name, raise_on_error=True)
        print('Performed %d actions' % success)
        return success

    def get_data_by_id(self, id):
        """Fetch one document by id.

        :param id: document id.
        :return: the GET response — the document plus its metadata
            (``_id``, ``found``, ``_source``, ...). A GET response has no
            ``hits`` key; that key only exists in search responses.
        """
        return self.es_obj.get(index=self.index_name, doc_type=self.index_type, id=id)

    def get_data_by_body(self, search):
        """Run a search request and return its ``hits`` section.

        :param search: search request body (query, size, ...).
        :return: ``res['hits']``; the matching documents are under its
            ``'hits'`` key.
        """
        res = self.es_obj.search(index=self.index_name, body=search)
        return res['hits']

    def update_data(self, id, body):
        """Apply the update request ``body`` to document ``id``; return the response."""
        return self.es_obj.update(index=self.index_name, doc_type=self.index_type, id=id, body=body)

    def delete_type_data(self):
        """Delete every document in the index (match_all delete-by-query)."""
        query_object = {'query': {'match_all': {}}}
        # The client's keyword is `index=`; `index_name=` is not accepted.
        return self.es_obj.delete_by_query(index=self.index_name, doc_type=self.index_type, body=query_object)

    def delete_index_data(self, id):
        """Delete the single document ``id``; return the response."""
        return self.es_obj.delete(index=self.index_name, doc_type=self.index_type, id=id)

    # Backward-compatible alias for the original, misspelled method name.
    delect_index_data = delete_index_data

    def delete_by_query(self, query):
        """Delete all documents matching the request body ``query``; return the response."""
        return self.es_obj.delete_by_query(index=self.index_name, doc_type=self.index_type, body=query)
def secret_key(length=30):
    """Return a random string of ``length`` ASCII letters and digits.

    Characters are drawn with ``secrets.choice`` (a cryptographically strong
    source), since the name and purpose advertise a *secret* key and the
    ``random`` module is not meant for secrets.

    :param length: number of characters; must be non-negative.
    :return: the generated string (``''`` when ``length`` is 0).
    :raises ValueError: if ``length`` is negative (a countdown loop would
        never terminate in that case).
    """
    import secrets  # local import keeps this helper self-contained

    if length < 0:
        raise ValueError("length must be non-negative, got %r" % (length,))
    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(length))
def hash_code(*args, **kwargs):
    """Return the 64-character SHA-256 hex digest of the given values.

    Positional arguments are stringified and concatenated in order, followed
    by the keyword-argument *values* in insertion order (keys are ignored).
    With no arguments at all, the current local timestamp is hashed instead.

    :param args: any values, packed positionally.
    :param kwargs: any values, packed by keyword.
    :return: 64-character hexadecimal SHA-256 digest.
    """
    if not args and not kwargs:
        # %S is seconds; the former %s is a non-portable platform extension
        # (epoch seconds on glibc) that is not supported everywhere.
        text = time.strftime("%Y%m%d%H%M%S")
    else:
        text = ''.join(str(arg) for arg in args) + ''.join(str(value) for value in kwargs.values())
    return hashlib.sha256(text.encode("utf-8")).hexdigest()
if __name__ == "__main__":
    # Listen on all interfaces, port 8660, with debug mode off.
    # NOTE(review): server.run() blocks; any @server.route declared further
    # down in this same module is not registered while the server runs —
    # keep this guard at the very end of the module that defines the routes.
    server.run(debug=False, host="0.0.0.0", port=8660)
插入函數及接口
def _bulk_and_time(es, records, done_msg):
    """Bulk-index ``records`` through ``es``, then print ``done_msg`` and the elapsed seconds."""
    start = time.time()
    es.bulk_index_data(records)
    print(done_msg)
    print(str(time.time() - start))


def es_insert(datas, batch_size=10000):
    """Segment each text with jieba and bulk-index it into Elasticsearch.

    Each element of ``datas["contentList"]`` becomes one document with a
    random ``es_id``, the raw ``content``, the shared ``file_name`` and the
    space-joined jieba tokens in ``jieba_content``.

    :param datas: request payload; expected to hold ``"filename"`` and
        ``"contentList"`` (a list of strings) — shape inferred from the keys
        read below, confirm against the Java caller.
    :param batch_size: maximum number of records per bulk request.
    :return: ``"1"`` on success, ``"0"`` if anything raised (the error is
        printed); the HTTP endpoint relays this code to the caller.
    """
    try:
        # Roughly equivalent to a database.
        index_name = "index_name"
        # Roughly equivalent to a table.
        index_type = "_doc"
        es = ESObject(index_name, index_type, 'localhost', 9200)

        # Optional: (re)create the index before inserting. ES 7.x format —
        # mappings are no longer nested under a type name, and index options
        # such as refresh_interval belong under "settings":
        # es.delete_index()
        # es.create_index({
        #     "settings": {
        #         "number_of_shards": 5,
        #         "number_of_replicas": 0,
        #         "refresh_interval": "20s",
        #     },
        #     "mappings": {
        #         "dynamic": "strict",
        #         "properties": {
        #             "es_id": {"type": "text"},
        #             "content": {"type": "text"},
        #             "file_name": {"type": "text"},
        #             "jieba_content": {"type": "text"},
        #         },
        #     },
        # })

        file_name = datas["filename"]
        record_list = []
        for data in datas["contentList"]:
            record_list.append({
                "es_id": secret_key(),
                "content": data,
                "file_name": file_name,
                "jieba_content": " ".join(jieba.lcut(data)),
            })
            # Flush full batches to bound memory and request size.
            if len(record_list) >= batch_size:
                _bulk_and_time(es, record_list, "Finished!")
                record_list = []
        print("record_list finished")
        if record_list:
            _bulk_and_time(es, record_list, "success finished!")
        return "1"
    except Exception as e:
        print(e)
        return "0"
@server.route("/es_insert", methods=['get', 'post'], endpoint="es_insert")
def question_regex():
    """HTTP entry point used by the Java caller to insert documents.

    Reads a JSON request body and forwards it to ``es_insert``; responds with
    its plain-text code ("1" success / "0" failure), or "fail" when the body
    is empty or is not valid UTF-8 JSON.
    """
    # The explicit endpoint matters: the /es_delete view reuses the function
    # name question_regex, and Flask refuses to register two different view
    # functions under the same default endpoint name.
    if not request.data:
        return "fail"
    try:
        # JSONDecodeError and UnicodeDecodeError both subclass ValueError.
        data = json.loads(request.data.decode("utf-8"))
    except ValueError:
        return "fail"
    res_code = es_insert(data)
    print(res_code)
    return Response(str(res_code))
拓展思路
ESObject是一個模板,其中有很多其他的函數。通過Java調用,還可以實現很多操作,如刪除、查詢等。文章來源地址:http://www.zghlxwxcb.cn/news/detail-557899.html
拓展刪除操作示例
def es_delete_by_id(p_file_name):
    """Delete every indexed document whose ``file_name`` equals ``p_file_name``.

    Candidates are fetched with a ``match_phrase`` query on ``file_name``
    (at most 10000 hits, the default ``index.max_result_window``) and then
    compared exactly, because ``file_name`` may be an analysed text field.
    Each exact match is deleted by its ``_id``.

    :param p_file_name: file name whose documents should be removed.
    :return: ``"1"`` on success, ``"0"`` if anything raised (error printed).
    """
    try:
        # Roughly equivalent to a database.
        index_name = "index_name"
        # Roughly equivalent to a table.
        index_type = "_doc"
        es = ESObject(index_name, index_type, 'localhost', 9200)
        # The search API needs a request body; a bare number is not one.
        search = {
            "size": 10000,
            "query": {"match_phrase": {"file_name": p_file_name}},
        }
        hits = es.get_data_by_body(search)['hits']
        deleted = 0
        for hit in hits:
            # .get(): a document without file_name must not abort the whole run.
            if hit['_source'].get('file_name') == p_file_name:
                # delect_index_data (sic) is the ESObject method that deletes
                # one document by id.
                es.delect_index_data(hit['_id'])
                deleted += 1
        print("deleted %d document(s) for file_name=%r" % (deleted, p_file_name))
        return "1"
    except Exception as e:
        print(e)
        return "0"
@server.route("/es_delete", methods=['get', 'post'], endpoint="es_delete")
def question_regex():
    """HTTP entry point used by the Java caller to delete a file's documents.

    Reads a JSON request body holding ``"filename"`` and forwards that value
    to ``es_delete_by_id``; responds with its plain-text code ("1" / "0"),
    or "fail" when the body is empty, not valid UTF-8 JSON, or lacks
    ``"filename"``.
    """
    # The explicit endpoint matters: the /es_insert view uses the same
    # function name, and Flask refuses to register two different view
    # functions under the same default endpoint name.
    if not request.data:
        return "fail"
    try:
        # JSONDecodeError and UnicodeDecodeError both subclass ValueError.
        data = json.loads(request.data.decode("utf-8"))
    except ValueError:
        return "fail"
    print(data)
    try:
        l_delete_file_name = data["filename"]
    except (KeyError, TypeError):
        # Missing key, or a JSON value that is not an object.
        return "fail"
    l_res_code = es_delete_by_id(l_delete_file_name)
    return Response(str(l_res_code))
到了這里,關于【Elasticsearch】使用Python完成對ES的插入操作的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!