上一篇:Elasticsearch Java REST Client Term Vectors API
下一篇:Elasticsearch Java REST Client Search APIs 查詢
文章來源地址:http://www.zghlxwxcb.cn/news/detail-515835.html
BulkRequest
BulkRequest可用于使用單個請求執行多個索引、更新和/或刪除操作。
它需要至少一個操作添加到 Bulk 請求中:
# 方式一:
/**
 * Indexes three sample documents (ids 1, 2 and 3, each with a single "name" field)
 * into the edu-app-user index through one synchronous bulk call.
 *
 * @return the text produced by {@code response(BulkResponse)} for the bulk result
 * @throws IOException if the bulk call fails
 */
@GetMapping("test")
public String test() throws IOException {
    // Each row is {document id, value of the "name" field}.
    final String[][] idAndName = {{"1", "foo"}, {"2", "elastic"}, {"3", "wdz"}};

    BulkRequest bulkRequest = new BulkRequest();
    for (String[] row : idAndName) {
        IndexRequest indexOne = new IndexRequest("edu-app-user", "doc", row[0]);
        indexOne.source(XContentType.JSON, "name", row[1]);
        bulkRequest.add(indexOne);
    }
    return response(restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT));
}
# 混合操作
/**
 * Sends one synchronous bulk request that mixes three operation kinds on the
 * edu-app-user index: delete document 3, partially update document 2 and index document 4.
 *
 * @return the text produced by {@code response(BulkResponse)} for the bulk result
 * @throws IOException if the bulk call fails
 */
@GetMapping("test1")
public String test1() throws IOException {
    DeleteRequest removeDoc = new DeleteRequest("edu-app-user", "doc", "3");

    UpdateRequest renameDoc = new UpdateRequest("edu-app-user", "doc", "2");
    renameDoc.doc(XContentType.JSON, "name", "update");

    IndexRequest addDoc = new IndexRequest("edu-app-user", "doc", "4");
    addDoc.source(XContentType.JSON, "name", "baz");

    // Operations are queued in the same order as before: delete, update, index.
    BulkRequest mixed = new BulkRequest();
    mixed.add(removeDoc);
    mixed.add(renameDoc);
    mixed.add(addDoc);

    BulkResponse outcome = restHighLevelClient.bulk(mixed, RequestOptions.DEFAULT);
    return response(outcome);
}
/**
 * Prints every item of a bulk response and returns a description of the last
 * INDEX/DELETE/UPDATE item handled (or of the last failure).
 *
 * <p>Failed items are detected with {@code isFailed()} before the per-operation
 * response is touched: a failed item has no {@code DocWriteResponse}, so the
 * previous unconditional {@code toString()} call on it threw a NullPointerException.
 *
 * @param bulk the bulk response to walk through
 * @return the {@code toString()} of the last successful INDEX/DELETE/UPDATE response,
 *         the failure message if a later item failed, or an empty string when
 *         neither kind of item was seen
 */
private String response(BulkResponse bulk){
    String str = "";
    for (BulkItemResponse item : bulk) {
        System.out.println(item.toString());
        if (item.isFailed()) {
            // No DocWriteResponse exists for a failed item; report the failure instead.
            str = item.getFailureMessage();
            System.out.println(str);
            continue;
        }
        DocWriteResponse response = item.getResponse();
        switch (item.getOpType()) {
            case CREATE:
                System.out.println("創(chuàng)建數(shù)據(jù)-----------------");
                break;
            case INDEX:
                IndexResponse indexResponse = (IndexResponse) response;
                System.out.println("操作索引數(shù)據(jù)-----------------"+indexResponse.toString());
                str = indexResponse.toString();
                break;
            case DELETE:
                DeleteResponse deleteResponse = (DeleteResponse) response;
                System.out.println("操作刪除數(shù)據(jù)-----------------"+deleteResponse.toString());
                str = deleteResponse.toString();
                break;
            case UPDATE:
                UpdateResponse updateResponse = (UpdateResponse) response;
                System.out.println("操作更新數(shù)據(jù)-----------------"+updateResponse.toString());
                str = updateResponse.toString();
                break;
            default:
                break;
        }
    }
    return str;
}
# 異步處理
/**
 * Submits a mixed bulk request (delete id 3, update id 100, index id 1) through
 * {@code bulkAsync}; the outcome is handed to a {@code BulkListen} callback
 * rather than returned from this method.
 *
 * @throws IOException declared by the original signature
 */
@GetMapping("test3")
public void test3() throws IOException {
    UpdateRequest patch = new UpdateRequest("edu-app-user", "doc", "100")
            .doc(XContentType.JSON, "name", "update");
    IndexRequest insert = new IndexRequest("edu-app-user", "doc", "1")
            .source(XContentType.JSON, "name", "baz");

    BulkRequest batch = new BulkRequest();
    batch.add(new DeleteRequest("edu-app-user", "doc", "3"));
    batch.add(patch);
    batch.add(insert);

    BulkListen callback = new BulkListen();
    restHighLevelClient.bulkAsync(batch, RequestOptions.DEFAULT, callback);
}
# 監聽
package com.wdz.es.config.es;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
/**
 * {@link ActionListener} for bulk calls: prints the per-item results when the
 * call completes and prints the exception message when the call itself fails.
 */
public class BulkListen implements ActionListener<BulkResponse> {

    /**
     * Prints a summary of the completed bulk response.
     *
     * @param bulkItemResponses the bulk response delivered to this listener
     */
    @Override
    public void onResponse(BulkResponse bulkItemResponses) {
        String response = response(bulkItemResponses);
        System.out.println("異步成功: "+response);
    }

    /**
     * Prints the message of the exception that made the bulk call fail.
     *
     * @param e the failure delivered to this listener
     */
    @Override
    public void onFailure(Exception e) {
        System.out.println("異步處理失敗:"+e.getMessage());
    }

    /**
     * Prints every item of a bulk response and returns the text of the last
     * successful INDEX/DELETE/UPDATE item.
     *
     * <p>Each item is either a success (it has a response and no failure) or a
     * failure (it has a failure and no response). The previous version read both
     * unconditionally, so it threw a NullPointerException on the first successful
     * item ({@code getFailure()} was null) and on any failed item (the response
     * was null). The two cases are now separated with {@code isFailed()}.
     *
     * @param bulk the bulk response to walk through
     * @return the {@code toString()} of the last successful INDEX/DELETE/UPDATE
     *         response, or an empty string if there was none
     */
    public static String response(BulkResponse bulk) {
        String str = "";
        for (BulkItemResponse item : bulk) {
            System.out.println(item.toString());
            if (item.isFailed()) {
                // Failed item: only the failure details are available.
                BulkItemResponse.Failure failure = item.getFailure();
                System.out.println(failure.toString());
                continue;
            }
            DocWriteResponse response = item.getResponse();
            switch (item.getOpType()) {
                case CREATE:
                    System.out.println("創(chuàng)建數(shù)據(jù)-----------------");
                    break;
                case INDEX:
                    IndexResponse indexResponse = (IndexResponse) response;
                    System.out.println("操作索引數(shù)據(jù)-----------------" + indexResponse.toString());
                    str = indexResponse.toString();
                    break;
                case DELETE:
                    DeleteResponse deleteResponse = (DeleteResponse) response;
                    System.out.println("操作刪除數(shù)據(jù)-----------------" + deleteResponse.toString());
                    str = deleteResponse.toString();
                    break;
                case UPDATE:
                    UpdateResponse updateResponse = (UpdateResponse) response;
                    System.out.println("操作更新數(shù)據(jù)-----------------" + updateResponse.toString());
                    str = updateResponse.toString();
                    break;
                default:
                    break;
            }
        }
        return str;
    }
}
# 結果
org.elasticsearch.action.bulk.BulkItemResponse@6ca820cd
操作刪除數(shù)據(jù)-----------------DeleteResponse[index=edu-app-user,type=doc,id=3,version=4,result=deleted,shards=ShardInfo{total=2, successful=1, failures=[]}]
org.elasticsearch.action.bulk.BulkItemResponse@25ae673
操作更新數(shù)據(jù)-----------------UpdateResponse[index=edu-app-user,type=doc,id=2,version=4,seqNo=5,primaryTerm=1,result=updated,shards=ShardInfo{total=2, successful=1, failures=[]}]
org.elasticsearch.action.bulk.BulkItemResponse@5e6d6373
操作索引數(shù)據(jù)-----------------IndexResponse[index=edu-app-user,type=doc,id=4,version=1,result=created,seqNo=6,primaryTerm=1,shards={"total":2,"successful":1,"failed":0}]
可選操作
# 設置超時時間(兩種方式)
// Variant 1: timeout given as a TimeValue (2 minutes).
request.timeout(TimeValue.timeValueMinutes(2));
// Variant 2: timeout given as the string "2m".
request.timeout("2m");
# 設置在繼續執行索引/更新/刪除操作之前必須處于活動狀態的分片副本數。
// Passes the required number of active shard copies as a plain int (2 here).
request.waitForActiveShards(2);
# 提供的分片副本數 ActiveShardCount:可以是 ActiveShardCount.ALL、ActiveShardCount.ONE 或 ActiveShardCount.DEFAULT(默認)
// Same setting, expressed with an ActiveShardCount constant instead of an int.
request.waitForActiveShards(ActiveShardCount.ALL);
批處理 BulkProcessor
/**
 * Demonstrates {@code BulkProcessor}: registers a listener that prints each batch
 * before and after execution, queues an index, a delete and an update request,
 * then waits up to 30 seconds for the processor to finish and closes it.
 *
 * <p>Fixes over the previous version: the debug print referenced an undeclared
 * variable {@code add} (now bound to the value returned by the last
 * {@code bulkProcessor.add(...)} call); the interrupt flag is restored when the
 * wait is interrupted; and {@code close()} runs in a {@code finally} block.
 *
 * @throws IOException declared by the original signature
 */
@GetMapping("test2")
public void test2() throws IOException {
    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        /** Prints the batch that is about to be executed. */
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            System.out.println(executionId+"批處理之前request:"+JSONObject.toJSONString(request));
        }

        /** Prints the batch and its response (the overload that receives a BulkResponse). */
        @Override
        public void afterBulk(long executionId, BulkRequest request,
                              BulkResponse response) {
            System.out.println(executionId+"批處理之后request:"+JSONObject.toJSONString(request));
            System.out.println(executionId+"批處理之后response:"+JSONObject.toJSONString(response));
        }

        /** Prints the batch and the error message (the overload that receives a Throwable). */
        @Override
        public void afterBulk(long executionId, BulkRequest request,
                              Throwable failure) {
            System.out.println(executionId+"批處理異常request:"+JSONObject.toJSONString(request));
            System.out.println(executionId+"批處理異常failure:"+failure.getMessage());
        }
    };

    // The processor delegates the actual sending to the client's bulkAsync call.
    BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
            (request, bulkListener) ->
                    restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
    BulkProcessor bulkProcessor =
            BulkProcessor.builder(bulkConsumer, listener).build();

    bulkProcessor.add(new IndexRequest("edu-app-user","doc","1002").source(XContentType.JSON,"name","BulkProcessor"));
    bulkProcessor.add(new DeleteRequest("edu-app-user","doc","2"));
    // Keep the value returned by add(...) so the debug print of `add` below compiles.
    BulkProcessor add = bulkProcessor.add(
            new UpdateRequest("edu-app-user","doc","28").doc(XContentType.JSON,"name","更新測(cè)試"));

    // Both closing methods flush the requests already added before closing the
    // processor and forbid adding new ones. awaitClose waits until every request
    // has been processed or the given wait time has elapsed.
    try {
        bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Restore the interrupt flag so callers further up can still observe it.
        Thread.currentThread().interrupt();
    } finally {
        // close() closes the processor immediately.
        bulkProcessor.close();
    }

    System.out.println(JSONObject.toJSONString(bulkConsumer));
    System.out.println(JSONObject.toJSONString(add));
    System.out.println(JSONObject.toJSONString(bulkProcessor));
}
# 異常結果
1批處理之前request:{"description":"requests[1], indices[bulk-test]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
1批處理異常request:{"description":"requests[1], indices[bulk-test]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
1批處理異常failure:Validation Failed: 1: source is missing;2: content type is missing;
{}
{}
{}
# 成功結果
1批處理之前request:{"description":"requests[1], indices[edu-app-user]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
{}
{}
{}
1批處理之后request:{"description":"requests[1], indices[edu-app-user]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
1批處理之后response:{"fragment":false,"ingestTook":{"days":0,"daysFrac":-1.1574074074074074E-8,"hours":0,"hoursFrac":-2.7777777777777776E-7,"micros":-1000,"microsFrac":-1000.0,"millis":-1,"millisFrac":-1.0,"minutes":0,"minutesFrac":-1.6666666666666667E-5,"nanos":-1000000,"seconds":0,"secondsFrac":-0.001,"stringRep":"-1"},"ingestTookInMillis":-1,"items":[{"failed":false,"fragment":false,"id":"100","index":"edu-app-user","itemId":0,"opType":"INDEX","response":{"fragment":false,"id":"100","index":"edu-app-user","primaryTerm":1,"result":"CREATED","seqNo":1,"shardId":{"fragment":true,"id":-1,"index":{"fragment":false,"name":"edu-app-user","uUID":"_na_"},"indexName":"edu-app-user"},"shardInfo":{"failed":0,"failures":[],"fragment":false,"successful":1,"total":2},"type":"doc","version":1},"type":"doc","version":1}],"took":{"days":0,"daysFrac":1.0300925925925926E-6,"hours":0,"hoursFrac":2.4722222222222223E-5,"micros":89000,"microsFrac":89000.0,"millis":89,"millisFrac":89.0,"minutes":0,"minutesFrac":0.0014833333333333332,"nanos":89000000,"seconds":0,"secondsFrac":0.089,"stringRep":"89ms"}}
# 混合批處理結果
1批處理之前request:{"description":"requests[3], indices[edu-app-user]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
{}
{}
{}
1批處理之后request:{"description":"requests[3], indices[edu-app-user]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
1批處理之后response:{"fragment":false,"ingestTook":{"days":0,"daysFrac":-1.1574074074074074E-8,"hours":0,"hoursFrac":-2.7777777777777776E-7,"micros":-1000,"microsFrac":-1000.0,"millis":-1,"millisFrac":-1.0,"minutes":0,"minutesFrac":-1.6666666666666667E-5,"nanos":-1000000,"seconds":0,"secondsFrac":-0.001,"stringRep":"-1"},"ingestTookInMillis":-1,"items":[{"failed":false,"fragment":false,"id":"1002","index":"edu-app-user","itemId":0,"opType":"INDEX","response":{"fragment":false,"id":"1002","index":"edu-app-user","primaryTerm":1,"result":"CREATED","seqNo":16,"shardId":{"fragment":true,"id":-1,"index":{"fragment":false,"name":"edu-app-user","uUID":"_na_"},"indexName":"edu-app-user"},"shardInfo":{"failed":0,"failures":[],"fragment":false,"successful":1,"total":2},"type":"doc","version":1},"type":"doc","version":1},{"failed":false,"fragment":false,"id":"2","index":"edu-app-user","itemId":1,"opType":"DELETE","response":{"fragment":false,"id":"2","index":"edu-app-user","primaryTerm":1,"result":"DELETED","seqNo":7,"shardId":{"fragment":true,"id":-1,"index":{"fragment":false,"name":"edu-app-user","uUID":"_na_"},"indexName":"edu-app-user"},"shardInfo":{"failed":0,"failures":[],"fragment":false,"successful":1,"total":2},"type":"doc","version":5},"type":"doc","version":5},{"failed":false,"fragment":false,"id":"28","index":"edu-app-user","itemId":2,"opType":"UPDATE","response":{"fragment":false,"id":"28","index":"edu-app-user","primaryTerm":1,"result":"UPDATED","seqNo":17,"shardId":{"fragment":true,"id":-1,"index":{"fragment":false,"name":"edu-app-user","uUID":"_na_"},"indexName":"edu-app-user"},"shardInfo":{"failed":0,"failures":[],"fragment":false,"successful":1,"total":2},"type":"doc","version":8},"type":"doc","version":8}],"took":{"days":0,"daysFrac":1.0069444444444445E-6,"hours":0,"hoursFrac":2.4166666666666667E-5,"micros":87000,"microsFrac":87000.0,"millis":87,"millisFrac":87.0,"minutes":0,"minutesFrac":0.00145,"nanos":87000000,"seconds":0,"secondsFrac":0.087,"stringRep":"87ms"}
}
multi Get API 多獲取
multiGet API 在單個 HTTP 請求中并行執行多個 get 請求。
MultiGetRequest,添加 `MultiGetRequest.Item` 來配置要獲取的內容:
/**
 * Fetches documents 28, 1 and 3 (using the {@code index} and {@code type} values
 * of the enclosing class) with a single multi-get call.
 *
 * @return the multi-get response, or {@code null} when the call throws an
 *         {@link IOException} (the stack trace is printed in that case)
 */
@GetMapping("get")
public MultiGetResponse get(){
    MultiGetRequest multiGet = new MultiGetRequest();
    // Items are added in the same order as before: 28, 1, 3.
    for (String docId : new String[] {"28", "1", "3"}) {
        multiGet.add(new MultiGetRequest.Item(index, type, docId));
    }
    try {
        return restHighLevelClient.mget(multiGet, RequestOptions.DEFAULT);
    } catch (IOException e) {
        e.printStackTrace();
        return null;
    }
}
可選參數:
# 禁用抓取_source
// Builds an item whose fetch-source context is FetchSourceContext.DO_NOT_FETCH_SOURCE.
new MultiGetRequest.Item("index", "type", "example_id").fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE)
# 過濾數據
// Field names to include: "name" and "age".
String[] includes = new String[] {"name", "age"};
// No field names to exclude.
String[] excludes = Strings.EMPTY_ARRAY;
// NOTE(review): the leading `true` presumably enables source fetching — confirm against the FetchSourceContext constructor.
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
// Attach the context to the item and add that item to the multi-get request.
request.add(item.fetchSourceContext(fetchSourceContext));
多獲取異步處理方式與其他異步一致,更新泛型即可。文章來源:http://www.zghlxwxcb.cn/news/detail-515835.html
上一篇:Elasticsearch Java REST Client Term Vectors API
下一篇:Elasticsearch Java REST Client Search APIs 查詢
到了這里,關于Elasticsearch Java REST Client 批量操作(Bulk API)的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!