Updating an index from Java (update & upsert)
update
Use UpdateRequest to update a document. A plain update can only modify a document that already exists.
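import java.text.SimpleDateFormat;
import java.util.Date;

import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentFactory;

public class EsUpdate {
    // Declares Exception because parse(), jsonBuilder() and get() throw checked exceptions.
    public void updateIndex(TransportClient client) throws Exception {
        Date time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                .parse("2016-7-21 00:00:01");
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("pointdata")
                .type("pointdata")
                .id("1")
                .doc(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("pointid", "W3.UNIT1.10LBG01CP302")
                        .field("pointvalue", "0.8029")
                        .field("inputtime",
                                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time))
                        .endObject()
                );
        // Execute the update
        UpdateResponse response1 = client.update(updateRequest).get();
        // Check the result; a status of OK means the update succeeded
        System.out.println(response1.status());
    }
}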
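The update example parses the literal "2016-7-21 00:00:01" and formats it again before storing it as inputtime. A minimal sketch of what that round trip does, using only the JDK. The class and method names (InputTimeDemo, normalize) are hypothetical and not part of the original code, and Locale.ROOT is an added assumption to keep the digits locale-independent.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

class InputTimeDemo {
    static final String PATTERN = "yyyy-MM-dd HH:mm:ss";

    // Parse the raw text and format it again with the same pattern,
    // mirroring how the update example builds the "inputtime" field.
    static String normalize(String raw) {
        SimpleDateFormat fmt = new SimpleDateFormat(PATTERN, Locale.ROOT);
        try {
            Date time = fmt.parse(raw);
            return fmt.format(time);
        } catch (ParseException e) {
            throw new IllegalArgumentException("not a valid timestamp: " + raw, e);
        }
    }

    public static void main(String[] args) {
        // The single-digit month is accepted on parse and zero-padded on format.
        System.out.println(normalize("2016-7-21 00:00:01")); // prints 2016-07-21 00:00:01
    }
}
```

Since the value is formatted back to a string anyway, the round trip mainly serves to validate and normalize the timestamp text.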
upsert
Use an IndexRequest to define the document to add and an UpdateRequest to define the update, then attach the IndexRequest with upsert. The request updates the document if it exists and creates it if it does not.
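import java.text.SimpleDateFormat;
import java.util.Date;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentFactory;

public class EsUpsert {
    // Declares Exception because parse(), jsonBuilder() and get() throw checked exceptions.
    public void updateIndex(TransportClient client) throws Exception {
        Date time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                .parse("2016-7-21 00:00:01");
        // Document to insert when id "1" does not exist yet
        IndexRequest request1 = new IndexRequest("pointdata", "pointdata", "1")
                .source(
                        XContentFactory.jsonBuilder()
                                .startObject()
                                .field("pointid", "W3.UNIT1.10LBG01CP302")
                                .field("pointvalue", "0.8029")
                                .field("inputtime",
                                        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time))
                                .endObject()
                );
        // Partial document to apply when id "1" already exists
        UpdateRequest updateRequest2 = new UpdateRequest();
        updateRequest2.index("pointdata")
                .type("pointdata")
                .id("1")
                .doc(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("pointid", "W3.UNIT1.10LBG01CP302")
                        .field("pointvalue", "0.8029")
                        .field("inputtime",
                                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time))
                        .endObject()
                ).upsert(request1);
        UpdateResponse response2 = client.update(updateRequest2).get();
        // Check the result; OK means an existing document was updated,
        // CREATED means the upsert inserted a new one
        System.out.println(response2.status());
    }
}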
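The difference between the two request types can be illustrated with a toy in-memory model. This is only a sketch of the semantics described above, not the Elasticsearch API: UpsertModel and its methods are hypothetical names, the returned strings only loosely mirror the response statuses, the value "0.9" is made up for the demo, and the merge is shallow, whereas a real partial update also merges nested objects.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class UpsertModel {
    // In-memory stand-in for an index: document id -> fields.
    private final Map<String, Map<String, String>> docs = new HashMap<>();

    // Plain update: merge the partial fields into an existing document,
    // and fail when the document does not exist.
    void update(String id, Map<String, String> partial) {
        Map<String, String> existing = docs.get(id);
        if (existing == null) {
            throw new IllegalStateException("document missing: " + id);
        }
        existing.putAll(partial);
    }

    // Upsert: merge into the existing document, or store upsertDoc when there is none.
    String upsert(String id, Map<String, String> partial, Map<String, String> upsertDoc) {
        Map<String, String> existing = docs.get(id);
        if (existing == null) {
            docs.put(id, new HashMap<>(upsertDoc));
            return "CREATED";
        }
        existing.putAll(partial);
        return "OK";
    }

    Map<String, String> get(String id) {
        return docs.get(id);
    }

    public static void main(String[] args) {
        UpsertModel index = new UpsertModel();
        Map<String, String> full = new HashMap<>();
        full.put("pointid", "W3.UNIT1.10LBG01CP302");
        full.put("pointvalue", "0.8029");
        Map<String, String> partial = Collections.singletonMap("pointvalue", "0.9");

        System.out.println(index.upsert("1", partial, full)); // CREATED: the full document is stored
        System.out.println(index.upsert("1", partial, full)); // OK: the partial document is merged
        System.out.println(index.get("1").get("pointvalue")); // 0.9
    }
}
```

Note that on the first call the stored document is the upsert document, not the partial one, which is why the two are passed separately.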
Bulk-based batch updates (update & upsert)
Dynamically update any field of a document, including fields that did not exist before.
// Assumes the enclosing class provides client, LOGGER and BULK_SIZE, plus the
// project types ResultMsg and ErrorDetail. RefreshPolicy is
// org.elasticsearch.action.support.WriteRequest.RefreshPolicy.
public ResultMsg updateIndex(final String index,
        final JSONArray resultList) {
    final ResultMsg resultMsg = new ResultMsg();
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
    int count = 0;
    int len = resultList.size();
    try {
        for (int i = 0; i < len; i++) {
            final XContentBuilder builder = XContentFactory.jsonBuilder()
                    .startObject();
            JSONObject json = resultList.getJSONObject(i);
            final String id = json.getString("id");
            json.remove("id");
            for (String key : json.keySet()) {
                builder.field(key, json.get(key));
            }
            builder.endObject();
            // update: the item fails if no document with this id exists
            UpdateRequestBuilder requestBuilder = client.prepareUpdate(
                    index, index, id).setDoc(builder);
            // upsert: use this instead to create the document when it is missing
            // UpdateRequestBuilder requestBuilder = client.prepareUpdate(
            //         index, index, id).setDoc(builder).setDocAsUpsert(true);
            // IndexRequestBuilder requestBuilder = client.prepareIndex(
            //         "sampling", index, id).setSource(builder);
            bulkRequest.add(requestBuilder);
            count++;
            // Flush a full batch
            if (count % BULK_SIZE == 0) {
                BulkResponse bulkResponse = bulkRequest.execute()
                        .actionGet();
                if (bulkResponse.hasFailures()) {
                    LOGGER.error("Bulk index update failed: " + index);
                    LOGGER.error("Bulk index update failed: "
                            + bulkResponse.buildFailureMessage());
                    resultMsg.setHasFailures(true);
                    BulkItemResponse[] responses = bulkResponse.getItems();
                    for (int k = 0; k < responses.length; k++) {
                        BulkItemResponse response = responses[k];
                        if (response.isFailed()) {
                            ErrorDetail errorDetail = new ErrorDetail();
                            // Position in resultList: start of this batch plus k
                            // (assumes setIndex expects a position in the input list)
                            errorDetail.setIndex(k + (i / BULK_SIZE) * BULK_SIZE);
                            errorDetail.setId(response.getId());
                            errorDetail
                                    .setMsg(response.getFailureMessage());
                            resultMsg.addError(errorDetail);
                        }
                    }
                }
                // Start a new batch
                bulkRequest = client.prepareBulk();
                bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
                count = 0;
            }
        }
        // Flush the remaining items that did not fill a whole batch
        if (count > 0) {
            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
            if (bulkResponse.hasFailures()) {
                LOGGER.error("Bulk index update failed: " + index);
                LOGGER.error("Bulk index update failed: "
                        + bulkResponse.buildFailureMessage());
                resultMsg.setHasFailures(true);
                BulkItemResponse[] responses = bulkResponse.getItems();
                for (int k = 0; k < responses.length; k++) {
                    BulkItemResponse response = responses[k];
                    if (response.isFailed()) {
                        ErrorDetail errorDetail = new ErrorDetail();
                        // Position in resultList: start of the last batch plus k
                        errorDetail.setIndex(k + (len / BULK_SIZE) * BULK_SIZE);
                        errorDetail.setId(response.getId());
                        errorDetail.setMsg(response.getFailureMessage());
                        resultMsg.addError(errorDetail);
                    }
                }
            }
        }
        return resultMsg;
    } catch (Exception e) {
        LOGGER.error("Bulk index update failed: " + index);
        throw new RuntimeException("Bulk index update failed: " + index, e);
    }
}
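The method above collects requests into batches of BULK_SIZE, flushes each full batch, and then flushes whatever is left over. A minimal sketch of that batching pattern using plain lists instead of Elasticsearch requests; BatchSketch, partition and globalIndex are hypothetical names introduced for illustration. It also shows one way to map position k inside a batch back to the item's position in the input list, which is useful when reporting which input item failed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BatchSketch {
    // Split items into consecutive batches of at most batchSize elements,
    // flushing a batch as soon as it is full and the remainder at the end.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            current.add(item);
            if (current.size() == batchSize) {
                batches.add(current);          // full batch: "execute" it
                current = new ArrayList<>();   // and start a new one
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);              // leftover items
        }
        return batches;
    }

    // Position in the original list of the k-th item of batch number batchNo (0-based).
    static int globalIndex(int batchNo, int batchSize, int k) {
        return batchNo * batchSize + k;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("a", "b", "c", "d", "e");
        List<List<String>> batches = partition(ids, 2);
        System.out.println(batches);                // [[a, b], [c, d], [e]]
        System.out.println(globalIndex(2, 2, 0));   // 4, the position of "e"
    }
}
```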