ElasticSearch
1、ElasticSearch學(xué)習(xí)隨筆之基礎(chǔ)介紹
2、ElasticSearch學(xué)習(xí)隨筆之簡單操作
3、ElasticSearch學(xué)習(xí)隨筆之java api 操作
4、ElasticSearch學(xué)習(xí)隨筆之SpringBoot Starter 操作
5、ElasticSearch學(xué)習(xí)隨筆之嵌套操作
6、ElasticSearch學(xué)習(xí)隨筆之分詞算法
7、ElasticSearch學(xué)習(xí)隨筆之高級檢索
8、ELK技術(shù)棧介紹
9、Logstash部署與使用
10、ElasticSearch 7.x 版本使用 BulkProcessor 實(shí)現(xiàn)批量添加數(shù)據(jù)
11、ElasticSearch 8.x 棄用了 High Level REST Client,移除了 Java Transport Client,推薦使用 Elasticsearch Java API
12、ElasticSearch 8.x 使用 snapshot(快照)進(jìn)行數(shù)據(jù)遷移
13、ElasticSearch 8.x 版本如何使用 SearchRequestBuilder 檢索
ElasticSearch,創(chuàng)始人 Shay Banon(謝巴農(nóng))
前言
本文主要應(yīng)用 Rest High Level Client 來進(jìn)行對 ElasticSearch 進(jìn)行操作,雖說官方已經(jīng)不推薦,但是 ES 升級帶來的代價也是相當(dāng)大的,所以,此處略去一萬字。
- 那什么是
BulkProcessor
呢?BulkProcessor
是ElasticSearch
客戶端中的一個功能,用于批量執(zhí)行索引、更新或刪除操作,BulkProcessor 運(yùn)行將多個操作打包成一個請求進(jìn)行發(fā)送,以提高效率和性能。
批量操作索引的好處:
- 性能優(yōu)勢:將多個操作打包成一個請求,這樣可以減少網(wǎng)絡(luò)開銷,提高數(shù)據(jù)傳輸效率,從而可以加快數(shù)據(jù)寫入索引速度。
- 減少開銷:較少的網(wǎng)絡(luò)開銷和較少的服務(wù)器的交互,減少服務(wù)器開銷,尤其是大規(guī)模寫入數(shù)據(jù)時。
- 原子性:批量操作可以保證一組操作要么全部成功,要么全部失敗,報(bào)錯數(shù)據(jù)的一致性。
- 減少開發(fā)成本:批量操作,可以簡化客戶端代碼,減少請求和管理連接的操作。
當(dāng)然,批量操作也是有缺點(diǎn)的:
- 內(nèi)存消耗:在執(zhí)行批量操作時,首先會將數(shù)據(jù)寫入內(nèi)存,這樣會消耗更多的內(nèi)存。
- 錯誤處理復(fù)雜性:單條數(shù)據(jù)上傳,如果出錯可以重試或者進(jìn)行記錄操作等,但是批量操作中的某個請求失敗,需要額外來處理,比單條操作復(fù)雜。
- 延遲響應(yīng):批量操作可能導(dǎo)致請求排隊(duì)等待,會產(chǎn)生一些延遲。
多余的不說,來上代碼。
一:引入 pom
首先引入客戶端依賴,我的測試 ES 服務(wù)是 8.7.0 版本的,這里對應(yīng) High Level REST Client 客戶端 7.3.2 版本的。
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.3.2</version>
</dependency>
之所以不用更高版本,是因?yàn)榘姹靖吡藭?bào)如下錯誤:
java.io.IOException: Unable to parse response body for Response{requestLine=POST /devintcompany@1562219164186/_doc?timeout=1m HTTP/1.1, host=http://192.168.*。*:9200, response=HTTP/1.1 201 Created}
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1473)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1424)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1394)
at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:836)
at com.example.es.EsTest.addIndex(EsTest.java:97)
at com.example.es.EsTest.main(EsTest.java:36)
Caused by: java.lang.NullPointerException
at java.util.Objects.requireNonNull(Objects.java:203)
at org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:127)
at org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:50)
at org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:39)
at org.elasticsearch.action.index.IndexResponse$Builder.build(IndexResponse.java:103)
at org.elasticsearch.action.index.IndexResponse.fromXContent(IndexResponse.java:85)
at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1727)
at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAndParseEntity$8(RestHighLevelClient.java:1395)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1471)
... 5 more
親自測試過的,應(yīng)該還是版本不兼容的緣故,但是數(shù)據(jù)已經(jīng)插入到 Index 了,就很奇怪。
二:創(chuàng)建 ES Client
這里初始化客戶端,需要用戶名密碼進(jìn)行認(rèn)證的。
private static RestHighLevelClient createClient(){
String hostname = "192.168.*.*";
int port = 9200;
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("your username", "your password"));
RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(hostname, port))
.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
return new RestHighLevelClient(restClientBuilder);
}
三:創(chuàng)建 BulkProcessor
這里創(chuàng)建 BulkProcessor 批量操作對象,通過 High Level REST Client 來綁定,加入監(jiān)聽器 BulkProcessor.Listener,如果批量操作失敗或發(fā)生異常,在 afterBulk()
方法中處理。
批量處理需要設(shè)置的參數(shù)代碼中已有注釋,一般就設(shè)置這些參數(shù)就可以了,可根據(jù)自己的使用場景進(jìn)行調(diào)節(jié)。文章來源:http://www.zghlxwxcb.cn/news/detail-795879.html
public static BulkProcessor getBulkProcessor(RestHighLevelClient client) {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
System.out.println("開始執(zhí)行批量操作,ID: " + executionId);
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
System.out.println("批量操作完成,ID: " + executionId);
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
System.out.println("批量操作失敗,ID: " + executionId);
failure.printStackTrace();
}
};
BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
bulkRequest.timeout(TimeValue.timeValueSeconds(100));
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
}), listener);
// 當(dāng)達(dá)到1000個操作時觸發(fā)批量請求
builder.setBulkActions(1000);
// 當(dāng)達(dá)到5MB大小時觸發(fā)批量請求
builder.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB));
// 每5秒觸發(fā)一次批量請求,無論大小和操作數(shù)如何
builder.setFlushInterval(TimeValue.timeValueSeconds(5));
// 設(shè)置退避策略,以防服務(wù)器過載或拒絕請求
builder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3));
// 設(shè)置并發(fā)請求的數(shù)量為1,即同時只有一個批量請求在執(zhí)行
builder.setConcurrentRequests(1);
return builder.build();
}
四:批量推數(shù)據(jù)
我們在 main 方法中進(jìn)行測試,代碼如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-795879.html
public static void main(String[] args) throws IOException {
RestHighLevelClient client = createClient();
BulkProcessor bulkProcessor = getBulkProcessor(client);
for (int i = 0; i < 10; i++) {
String source = "{\"ApplianceType\":[{\"ApplianceTypeCn\":\"國產(chǎn)\",\"ApplianceTypeEn\":\"Domestic\",\"ApplianceTypeId\":\"1\"}],\"ApplicationCount\":0,\"ClassICount\":17,\"ClassIICount\":1,\"ClassIIICount\":0,\"Classification\":[{\"Cn\":\"2002版分類\",\"En\":\"2002 reg. category of relevant app.\",\"Id\":\"Class2002\",\"Items\":[{\"Cn\":\"Ⅰ類\",\"En\":\"Class Ⅰ\",\"Id\":\"1\",\"Id2\":\"I\",\"Items\":[{\"Cn\":\"進(jìn)口第一類醫(yī)療器械(含第一類體外診斷試劑)備案信息\",\"En\":\"Information on imported ClassⅠmedical devices (including ClassⅠ IVD reagents)\",\"Id\":\"100\"}]},{\"Cn\":\"Ⅱ類\",\"En\":\"Class Ⅱ\",\"Id\":\"2\",\"Id2\":\"II\",\"Items\":[{\"Cn\":\"婦產(chǎn)科、輔助生殖和避孕器械\",\"En\":\"Obstetrics and gynecology, assisted reproductive and contraceptive devices\",\"Id\":\"201818\"}]}]},{\"Class1Code\":[{\"Id\":\"02\"}],\"Class2Code\":[{\"Id\":\"03\"}],\"DataType\":[{\"Id\":\"1\"},{\"Id\":\"3\"}],\"ProductClassificationCode\":[{\"Id\":\"09\"}],\"ProductClassificationNameCode\":[]}],\"Company\":{\"Cn\":\"海南創(chuàng)鑫醫(yī)藥科技發(fā)展有限公司\",\"En\":\"Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\",\"Id\":\"1000002388\"},\"CompanyAliasCn\":[\"海南創(chuàng)鑫醫(yī)藥科技發(fā)展有限公司\"],\"CompanyAliasEn\":[\"Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\"],\"CompanyCn\":\"海南創(chuàng)鑫醫(yī)藥科技發(fā)展有限公司\",\"CompanyCnSearch\":\"海南創(chuàng)鑫醫(yī)藥科技發(fā)展有限公司\",\"CompanyEn\":\"Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\",\"CompanyEnSearch\":\"Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\",\"CompanyId\":\"1000002388\",\"CompanyType\":{\"Cn\":\"國內(nèi)公司\",\"En\":\"Domestic company\",\"Id\":\"Domestic company\"},\"CompanyTypeCn\":\"國內(nèi)公司\",\"CompanyTypeEn\":\"Domestic company\",\"CompanyTypeId\":\"Domestic company\",\"DomesticCount\":18,\"EffectiveRegistrationCount\":18,\"FirstApplicationYear\":null,\"FirstRegistrationYear\":\"2017\",\"IVD\":\"0\",\"ImportCount\":0,\"LatestApplicationYear\":null,\"LatestRegistrationYear\":\"2020\",\"Listing\":{\"Cn\":null,\"En\":null,\"Id\":null},\"ListingCn\":null,\"ListingEn\":null,\"ListingId\":null,\"TotalCount\":18,\"company_registration_relation\":{\"name\":\"company\"},\"website_url\":\"\"}";
bulkProcessor.add(new IndexRequest("devintcompany@1562219164186").source(source, XContentType.JSON));
System.out.println("添加第 " + i + "條數(shù)據(jù)!");
}
try {
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
client.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("添加完成!");
}
到了這里,關(guān)于ElasticSearch 7.x 版本使用 BulkProcessor 實(shí)現(xiàn)批量添加數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!