ElasticSearch
介紹(Introduction)
特征
- 適用于所有 Elasticsearch API 的強(qiáng)類(lèi)型請(qǐng)求和響應(yīng)。
- 所有 API 的阻塞和異步版本。
- 在創(chuàng)建復(fù)雜的嵌套結(jié)構(gòu)時(shí),使用流暢的構(gòu)建器和功能模式允許編寫(xiě)簡(jiǎn)潔但可讀的代碼。
- 通過(guò)使用對(duì)象映射器(例如 Jackson 或任何 JSON-B 實(shí)現(xiàn))無(wú)縫集成應(yīng)用程序類(lèi)。
- 將協(xié)議處理委托給一個(gè) http 客戶(hù)端,例如Java 低級(jí) REST 客戶(hù)端 ,它負(fù)責(zé)處理所有傳輸級(jí)別的問(wèn)題:HTTP 連接池、重試、節(jié)點(diǎn)發(fā)現(xiàn)等。
服務(wù)器兼容策略
Elasticsearch Java 客戶(hù)端是向前兼容的;這意味著客戶(hù)端支持與更大或相等的次要版本的 Elasticsearch 進(jìn)行通信。Elasticsearch 語(yǔ)言客戶(hù)端僅向后兼容默認(rèn)發(fā)行版,并且不作任何保證。
入門(mén)
安裝
安裝要求
- 至少Java8以上的版本
- 需要一個(gè) JSON 對(duì)象映射庫(kù),允許您的應(yīng)用程序類(lèi)與 Elasticsearch API 無(wú)縫集成。
maven
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>7.17.9</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.3</version>
</dependency>
運(yùn)行時(shí)可能會(huì)報(bào)異常 ClassNotFoundException: jakarta.json.spi.JsonProvider
則需要加入以下依賴(lài)
<dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<version>2.0.1</version>
</dependency>
為什么會(huì)出現(xiàn)這個(gè)問(wèn)題?
SpringBoot框架帶有的maven插件,用于簡(jiǎn)化開(kāi)發(fā)和依賴(lài)管理,這些插件中帶有很多知名的庫(kù)。
1.x的庫(kù)里面使用了 javax.json
的包,2.x里面用的jakarta.json
的包,(從 JavaEE 過(guò)渡到 JakartaEE),所以導(dǎo)致了 ClassNotFoundException: jakarta.json.spi.JsonProvider
我們只需要添加正確的項(xiàng)目依賴(lài)就可以了
連接中(connecting)
Java API 客戶(hù)端圍繞三個(gè)主要組件構(gòu)建
-
API 客戶(hù)端類(lèi)。這些為 Elasticsearch API 提供強(qiáng)類(lèi)型數(shù)據(jù)結(jié)構(gòu)和方法。由于 Elasticsearch API 很大,它以功能組(也稱(chēng)為“命名空間”)為結(jié)構(gòu),每個(gè)功能組都有自己的客戶(hù)端類(lèi)。Elasticsearch 核心功能在類(lèi)中實(shí)現(xiàn)
ElasticsearchClient
。 - 一個(gè) JSON 對(duì)象映射器。這會(huì)將您的應(yīng)用程序類(lèi)映射到 JSON,并將它們與 API 客戶(hù)端無(wú)縫集成。
- 傳輸層實(shí)現(xiàn)。這是所有 HTTP 請(qǐng)求處理發(fā)生的地方。
以下代碼完成了上面的三個(gè)組件
// Create the low-level client
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200)).build();
// Create the transport with a Jackson mapper
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
// And create the API client
ElasticsearchClient client = new ElasticsearchClient(transport);
身份驗(yàn)證由Java Low Level REST Client管理。有關(guān)配置身份驗(yàn)證的更多詳細(xì)信息,請(qǐng)參閱 其文檔。
第一個(gè) ElasticSearch 請(qǐng)求
需求:查詢(xún)es數(shù)據(jù)庫(kù)中,name = ‘bike’ 的所有Product對(duì)象
SearchResponse<Product> search = client.search(s -> s
.index("products")
.query(q -> q
.term(t -> t
.field("name")
.value(v -> v.stringValue("bicycle"))
)),
Product.class);
for (Hit<Product> hit: search.hits().hits()) {
processProduct(hit.source());
}
從RestHighLevelClient 遷移
個(gè)人理解
大概意思就是,7.15以前的版本用的是RestHighLevelClient ,其以后用的是Elasticsearch Java API Client ,這兩種客戶(hù)端可以并存,你可以同時(shí)使用,并把原來(lái)RestHighLevelClient 里面的一些廢棄的接口、low的寫(xiě)法逐步過(guò)渡到Elasticsearch Java API Client 。
官方文檔
棄用 RestHighLevelClient 服務(wù)端,使用 Elasticsearch Java API Client 客戶(hù)端。
兩個(gè)客戶(hù)端庫(kù)可以在沒(méi)有操作開(kāi)銷(xiāo)的情況下共存于單個(gè)應(yīng)用程序中,因?yàn)?Elasticsearch Java API Client 客戶(hù)端 是一個(gè)全新的客戶(hù)端,獨(dú)立于 Elasticsearch 服務(wù)器。
兼容模式
HLRC(RestHighLevelClient )可以啟用兼容模式,讓HLRC版本的7.17和Elasticsearch 一起使用,在這種模式下,
Java API 客戶(hù)端不需要此設(shè)置,因?yàn)榧嫒菽J绞冀K處于啟用狀態(tài)。
將相同的 http 客戶(hù)端與 HLRC 和 Java API 客戶(hù)端一起使用
為了避免在應(yīng)用程序同時(shí)使用 HLRC 和新的 Java API 客戶(hù)端的過(guò)渡階段產(chǎn)生任何操作開(kāi)銷(xiāo),兩個(gè)客戶(hù)端可以共享相同的 Low Level Rest Client,這是管理所有連接、循環(huán)策略的網(wǎng)絡(luò)層,節(jié)點(diǎn)嗅探等。
下面的代碼顯示了如何使用相同的 HTTP 客戶(hù)端初始化兩個(gè)客戶(hù)端:
// Create the low-level client
RestClient httpClient = RestClient.builder(
new HttpHost("localhost", 9200)
).build();
// Create the HLRC
RestHighLevelClient hlrc = new RestHighLevelClientBuilder(httpClient)
// Enables compatibility mode that allows HLRC 7.17 to work with Elasticsearch 8.x.
.setApiCompatibilityMode(true)
.build();
// Create the Java API Client with the same low level client
ElasticsearchTransport transport = new RestClientTransport(
httpClient,
new JacksonJsonpMapper()
);
ElasticsearchClient esClient = new ElasticsearchClient(transport);
// hlrc and esClient share the same httpClient
轉(zhuǎn)型策略
您可以通過(guò)多種不同的方式在應(yīng)用程序代碼中開(kāi)始從 HLRC 過(guò)渡。
例如:
- 保持現(xiàn)有代碼不變,并使用新的 Java API 客戶(hù)端實(shí)現(xiàn)應(yīng)用程序中的新功能,然后遷移現(xiàn)有代碼,
- 重寫(xiě)應(yīng)用程序中新的 Java API 客戶(hù)端比 HLRC 客戶(hù)端更容易使用的部分,例如與搜索相關(guān)的所有內(nèi)容,
- 通過(guò)利用新的 Java API 客戶(hù)端與 JSON 對(duì)象映射器的緊密集成,重寫(xiě)那些需要將應(yīng)用程序?qū)ο笥成涞?JSON 或從 JSON 映射到 JSON 的部分。
Api約定
包結(jié)構(gòu)和命名空間客戶(hù)端
大概是說(shuō)Java api和ElasticSearch Api 的分組,都是以包的結(jié)構(gòu)和命名空間的方式
// Create the "products" index
ElasticsearchClient client = ...
client.indices().create(c -> c.index("products"));
tips:如果數(shù)據(jù)量較大,可能會(huì)按月創(chuàng)建索引,比如 xx_2023_02,xx_2023_3 … 等,你往es上傳數(shù)據(jù)的時(shí)候,會(huì)設(shè)置一個(gè)indexName,如果現(xiàn)在的es中沒(méi)有這個(gè)索引,則創(chuàng)建索引并保存數(shù)據(jù)。
方法命名約定
- 作為 API 一部分的方法和屬性,例如
ElasticsearchClient.search()
或SearchResponse.maxScore()
。它們是使用標(biāo)準(zhǔn) Java 約定從 Elasticsearch JSON API 中各自的名稱(chēng)派生而來(lái)的camelCaseNaming
。 - 作為構(gòu)建 Java API 客戶(hù)端的框架的一部分的方法和屬性,例如
Query._kind()
. 這些方法和屬性以下劃線為前綴,以避免與 API 名稱(chēng)發(fā)生任何命名沖突,并作為區(qū)分 API 和框架的簡(jiǎn)單方法。
阻塞和異步 client
API 客戶(hù)端有兩種形式:阻塞式和異步式。異步客戶(hù)端上的所有方法都返回一個(gè)標(biāo)準(zhǔn)的CompletableFuture
.
ElasticsearchTransport transport = ...
// Synchronous blocking client
ElasticsearchClient client = new ElasticsearchClient(transport);
if (client.exists(b -> b.index("products").id("foo")).value()) {
logger.info("product exists");
}
// Asynchronous non-blocking client
ElasticsearchAsyncClient asyncClient =
new ElasticsearchAsyncClient(transport);
asyncClient
.exists(b -> b.index("products").id("foo"))
.whenComplete((response, exception) -> {
if (exception != null) {
logger.error("Failed to index", exception);
} else {
logger.info("Product exists");
}
});
切記,異步時(shí)一定要對(duì)異常情況進(jìn)行處理?。?!
構(gòu)建Api對(duì)象
建造者對(duì)象(Builder Project)
ElasticsearchClient client = ...
CreateIndexResponse createResponse = client.indices().create(
new CreateIndexRequest.Builder()
.index("my-index")
.aliases("foo",
new Alias.Builder().isWriteIndex(true).build()
)
.build()
);
在調(diào)用構(gòu)建器的方法后,不能重用構(gòu)建器。
構(gòu)建器lambda表達(dá)式(感覺(jué)比上面那個(gè)清爽)
ElasticsearchClient client = ...
CreateIndexResponse createResponse = client.indices()
.create(createIndexBuilder -> createIndexBuilder
.index("my-index")
.aliases("foo", aliasBuilder -> aliasBuilder
.isWriteIndex(true)
)
);
比上面那個(gè)更清爽,lambda表達(dá)式的入?yún)⒁?jiǎn)潔明了,不需要和fori循環(huán)一樣望文生義。
ElasticsearchClient client = ...
CreateIndexResponse createResponse = client.indices()
.create(c -> c
.index("my-index")
.aliases("foo", a -> a
.isWriteIndex(true)
)
);
Builder 的 lambda 在復(fù)雜嵌套查詢(xún)下的應(yīng)用.
剛?cè)腴T(mén),看不懂下面這個(gè)查詢(xún)啥意思
ElasticsearchClient client = ...
SearchResponse<SomeApplicationData> results = client
.search(b0 -> b0
.query(b1 -> b1
.intervals(b2 -> b2
.field("my_text")
.allOf(b3 -> b3
.ordered(true)
.intervals(b4 -> b4
.match(b5 -> b5
.query("my favorite food")
.maxGaps(0)
.ordered(true)
)
)
.intervals(b4 -> b4
.anyOf(b5 -> b5
.intervals(b6 -> b6
.match(b7 -> b7
.query("hot water")
)
)
.intervals(b6 -> b6
.match(b7 -> b7
.query("cold porridge")
)
)
)
)
)
)
),
// 搜索結(jié)果將映射到SomeApplicationData實(shí)例,以便應(yīng)用程序隨時(shí)可用
SomeApplicationData.class
);
List And Map(兩種數(shù)據(jù)類(lèi)型)
添加的 builder setters
// Prepare a list of index names
List<String> names = Arrays.asList("idx-a", "idx-b", "idx-c");
// Prepare cardinality aggregations for fields "foo" and "bar"
Map<String, Aggregation> cardinalities = new HashMap<>();
cardinalities.put("foo-count", Aggregation.of(a -> a.cardinality(c -> c.field("foo"))));
cardinalities.put("bar-count", Aggregation.of(a -> a.cardinality(c -> c.field("bar"))));
// Prepare an aggregation that computes the average of the "size" field
final Aggregation avgSize = Aggregation.of(a -> a.avg(v -> v.field("size")));
SearchRequest search = SearchRequest.of(r -> r
// Index list:
// - add all elements of a list
.index(names)
// - add a single element
.index("idx-d")
// - add a vararg list of elements
.index("idx-e", "idx-f", "idx-g")
// Sort order list: add elements defined by builder lambdas
.sort(s -> s.field(f -> f.field("foo").order(SortOrder.Asc)))
.sort(s -> s.field(f -> f.field("bar").order(SortOrder.Desc)))
// Aggregation map:
// - add all entries of an existing map
.aggregations(cardinalities)
// - add a key/value entry
.aggregations("avg-size", avgSize)
// - add a key/value defined by a builder lambda
.aggregations("price-histogram",
a -> a.histogram(h -> h.field("price")))
);
List And Map 永遠(yuǎn)不為 null
NodeStatistics stats = NodeStatistics.of(b -> b
.total(1)
.failed(0)
.successful(1)
);
// The `failures` list was not provided.
// - it's not null
assertNotNull(stats.failures());
// - it's empty
assertEquals(0, stats.failures().size());
// - and if needed we can know it was actually not defined
assertFalse(ApiTypeHelper.isDefined(stats.failures()));
Variant types(變種類(lèi)型)
大概意思就是說(shuō),比如查詢(xún)條件,可以允許多種類(lèi)型的查詢(xún)條件拼接,允許在構(gòu)建查詢(xún)條件中的各種騷操作。
Elasticsearch API 有很多變體類(lèi)型:查詢(xún)、聚合、字段映射、分析器等等。在如此龐大的集合中找到正確的類(lèi)名可能具有挑戰(zhàn)性。
Java API 客戶(hù)端構(gòu)建器使這變得簡(jiǎn)單:變體類(lèi)型的構(gòu)建器(例如 Query
)具有適用于每個(gè)可用實(shí)現(xiàn)的方法。我們已經(jīng)在上面的操作中看到了intervals
(一種查詢(xún))和allOf
,match
以及 anyOf
(各種間隔)。
這是因?yàn)?Java API 客戶(hù)端中的變體對(duì)象是“標(biāo)記聯(lián)合”的實(shí)現(xiàn):它們包含它們持有的變體的標(biāo)識(shí)符(或標(biāo)記)以及該變體的值。例如,一個(gè)Query
對(duì)象可以包含一個(gè) IntervalsQuery
with tag intervals
、一個(gè)TermQuery
with tagterm
等等。這種方法允許編寫(xiě)流暢的代碼,您可以讓 IDE 完成功能指導(dǎo)您構(gòu)建和導(dǎo)航復(fù)雜的嵌套結(jié)構(gòu):
變體構(gòu)建器為每個(gè)可用的實(shí)現(xiàn)都提供了 setter 方法。它們使用與常規(guī)屬性相同的約定,并接受構(gòu)建器 lambda 表達(dá)式和變體實(shí)際類(lèi)型的現(xiàn)成對(duì)象。下面是構(gòu)建術(shù)語(yǔ)查詢(xún)的示例:
Query query = new Query.Builder()
// 選擇term變體以構(gòu)建術(shù)語(yǔ)查詢(xún)。
.term(t -> t
// 使用構(gòu)建器 lambda 表達(dá)式構(gòu)建術(shù)語(yǔ)查詢(xún)。
.field("name")
.value(v -> v.stringValue("foo"))
)
// 構(gòu)建Query現(xiàn)在持有一個(gè)TermQuery對(duì)象的 kind term。
.build();
變體對(duì)象還提供有關(guān)它們當(dāng)前持有的變體種類(lèi)的信息:
-
具有
is
針對(duì)每種變體類(lèi)型的方法:isTerm()
、isIntervals()
、isFuzzy()
等。 -
帶有
Kind
定義所有變體類(lèi)型的嵌套枚舉。 -
// 測(cè)試變體是否屬于特定種類(lèi) if (query.isTerm()) { doSomething(query.term()); } // 測(cè)試一組更大的變體類(lèi)型。 switch(query._kind()) { case Term: doSomething(query.term()); break; case Intervals: doSomething(query.intervals()); break; default: // 獲取變體對(duì)象所持有的種類(lèi)和值。 doSomething(query._kind(), query._get()); }
對(duì)象生命周期和線程安全
Java API Client 中有五種不同生命周期的對(duì)象:
Object mapper
無(wú)狀態(tài)和線程安全,但創(chuàng)建成本高。它通常是在應(yīng)用程序啟動(dòng)時(shí)創(chuàng)建并用于創(chuàng)建傳輸?shù)膯卫?/p>
Transport
線程安全,通過(guò)底層 HTTP 客戶(hù)端持有網(wǎng)絡(luò)資源。傳輸對(duì)象與 Elasticsearch 集群相關(guān)聯(lián),必須顯式關(guān)閉才能釋放底層資源,例如網(wǎng)絡(luò)連接。
Clients
不可變的、無(wú)狀態(tài)的和線程安全的。這些是非常輕量級(jí)的對(duì)象,僅包裝傳輸并提供 API 端點(diǎn)作為方法。
Builders
可變的,非線程安全的。生成器是臨時(shí)對(duì)象,不應(yīng)在調(diào)用 build()
.
Requests & other API objects
不可變的,線程安全的。如果您的應(yīng)用程序反復(fù)使用相同的請(qǐng)求或請(qǐng)求的相同部分,則可以提前準(zhǔn)備好這些對(duì)象,并在使用不同傳輸?shù)亩鄠€(gè)客戶(hù)端的多個(gè)調(diào)用中重復(fù)使用。
從JSON數(shù)據(jù)創(chuàng)建API對(duì)象(7.17.2開(kāi)始支持的)
從JSON中加載的屬性會(huì)設(shè)置上文中未出現(xiàn)的屬性值,也會(huì)替換上文中已出現(xiàn)的值
可以讀取文件中JSON數(shù)據(jù),進(jìn)行創(chuàng)建索引、查詢(xún)等操作。
比如有個(gè)some-index.json文件內(nèi)容如下
{
"mappings": {
"properties": {
"field1": { "type": "text" }
}
}
}
我們可以這樣創(chuàng)建一個(gè)索引
InputStream input = this.getClass()
.getResourceAsStream("some-index.json");
CreateIndexRequest req = CreateIndexRequest.of(b -> b
.index("some-index")
.withJson(input)
);
boolean created = client.indices().create(req).acknowledged();
也可以的自定義JSON格式的查詢(xún)、添加條件,進(jìn)行操作
Reader queryJson = new StringReader(
"{" +
" \"query\": {" +
" \"range\": {" +
" \"@timestamp\": {" +
" \"gt\": \"now-1w\"" +
" }" +
" }" +
" }" +
"}");
SearchRequest aggRequest = SearchRequest.of(b -> b
.withJson(queryJson)
.aggregations("max-cpu", a1 -> a1
.dateHistogram(h -> h
.field("@timestamp")
.calendarInterval(CalendarInterval.Hour)
)
.aggregations("max", a2 -> a2
.max(m -> m.field("host.cpu.usage"))
)
)
.size(0)
);
Map<String, Aggregate> aggs = client
.search(aggRequest, Void.class)
.aggregations();
也支持聚合
Reader queryJson = new StringReader(
"{" +
" \"query\": {" +
" \"range\": {" +
" \"@timestamp\": {" +
" \"gt\": \"now-1w\"" +
" }" +
" }" +
" }," +
" \"size\": 100" +
"}");
Reader aggregationJson = new StringReader(
"{" +
" \"size\": 0, " +
" \"aggregations\": {" +
" \"hours\": {" +
" \"date_histogram\": {" +
" \"field\": \"@timestamp\"," +
" \"interval\": \"hour\"" +
" }," +
" \"aggregations\": {" +
" \"max-cpu\": {" +
" \"max\": {" +
" \"field\": \"host.cpu.usage\"" +
" }" +
" }" +
" }" +
" }" +
" }" +
"}");
SearchRequest aggRequest = SearchRequest.of(b -> b
.withJson(queryJson)
.withJson(aggregationJson)
// 默認(rèn)Es中不存在目標(biāo)索引會(huì)報(bào)錯(cuò),加此配置不報(bào)錯(cuò)
.ignoreUnavailable(true)
);
Map<String, Aggregate> aggs = client
.search(aggRequest, Void.class)
.aggregations();
當(dāng)各個(gè)JSON中都有通用屬性時(shí),順序很重要,就像第二個(gè)set會(huì)覆蓋第一個(gè)set的值
兩種異常
-
ElasticsearchException
服務(wù)器收到但是被拒絕報(bào)此異常,比如驗(yàn)證錯(cuò)誤、服務(wù)器內(nèi)部超時(shí)。
-
TransportException
未達(dá)到服務(wù)器報(bào)此異常,比如網(wǎng)絡(luò)錯(cuò)誤、服務(wù)器不可用。該異常原因是較低級(jí)別的實(shí)現(xiàn)拋出的異常。在這種情況下,
RestClientTransport
它將是一個(gè)ResponseException
包含低級(jí)別 HTTP 響應(yīng)的。
使用Java Api 客戶(hù)端
索引單個(gè)文檔
using the fluent DSL(使用DSL語(yǔ)句)
Product product = new Product("bk-1", "City bike", 123.0);
IndexResponse response = esClient.index(i -> i
.index("products")
.id(product.getSku())
.document(product)
);
logger.info("Indexed with version " + response.version());
Product product = new Product("bk-1", "City bike", 123.0);
IndexRequest<Product> request = IndexRequest.of(i -> i
.index("products")
.id(product.getSku())
.document(product)
);
IndexResponse response = esClient.index(request);
logger.info("Indexed with version " + response.version());
using classic builders(使用經(jīng)典構(gòu)建器)
Product product = new Product("bk-1", "City bike", 123.0);
IndexRequest.Builder<Product> indexReqBuilder = new IndexRequest.Builder<>();
indexReqBuilder.index("product");
indexReqBuilder.id(product.getSku());
indexReqBuilder.document(product);
// 注釋?zhuān)簜€(gè)人覺(jué)得這樣用鏈?zhǔn)秸{(diào)用比較好看
Query termQuery = new TermQuery.Builder()
.field()
.value()
.build();
IndexResponse response = esClient.index(indexReqBuilder.build());
logger.info("Indexed with version " + response.version());
Using the asynchronous client 使用異步客戶(hù)端
ElasticsearchAsyncClient esAsyncClient = new ElasticsearchAsyncClient(transport);
Product product = new Product("bk-1", "City bike", 123.0);
esAsyncClient.index(i -> i
.index("products")
.id(product.getSku())
.document(product)
).whenComplete((response, exception) -> {
if (exception != null) {
logger.error("Failed to index", exception);
} else {
logger.info("Indexed with version " + response.version());
}
});
api客戶(hù)端有兩種形式,阻塞式和異步式,兩種客戶(hù)端都返回一個(gè)標(biāo)準(zhǔn)的CompletableFuture.
如下:
ElasticsearchTransport transport = ...
// Synchronous blocking client
ElasticsearchClient client = new ElasticsearchClient(transport);
if (client.exists(b -> b.index("products").id("foo")).value()) {
logger.info("product exists");
}
// Asynchronous non-blocking client
ElasticsearchAsyncClient asyncClient =
new ElasticsearchAsyncClient(transport);
asyncClient
.exists(b -> b.index("products").id("foo"))
.whenComplete((response, exception) -> {
if (exception != null) {
logger.error("Failed to index", exception);
} else {
logger.info("Product exists");
}
});
異步客戶(hù)端一定要記得處理異步失敗
Using raw JSON data(使用原始JSON數(shù)據(jù))
當(dāng)您要索引的數(shù)據(jù)來(lái)自外部來(lái)源時(shí),必須創(chuàng)建域?qū)ο罂赡芎苈闊?,或者?duì)于半結(jié)構(gòu)化數(shù)據(jù)來(lái)說(shuō)是完全不可能的。(個(gè)人理解:假如你現(xiàn)在有一個(gè)篩選條件篩選數(shù)據(jù),且已經(jīng)上到了線上環(huán)境,現(xiàn)在需要修改這個(gè)查詢(xún)條件,不應(yīng)該代碼里修改然后再打包發(fā)布。應(yīng)該用文本寫(xiě)好查詢(xún)條件,修改的時(shí)候直接修改文本即可)。
方法withJson(),此方法讀取源并用于索引請(qǐng)求的document屬性,案例Creating API objects from JSON data
Reader input = new StringReader(
"{'@timestamp': '2022-04-08T13:55:32Z', 'level': 'warn', 'message': 'Some log message'}"
.replace('\'', '"'));
IndexRequest<JsonData> request = IndexRequest.of(i -> i
.index("logs")
.withJson(input)
);
IndexResponse response = esClient.index(request);
logger.info("Indexed with version " + response.version());
以上的源碼案例來(lái)自于Java API Client tests.
批量:索引多個(gè)文檔
批量請(qǐng)求允許在一個(gè)請(qǐng)求中向ElasticSearch發(fā)送多個(gè)文檔相關(guān)的操作。
批量請(qǐng)求說(shuō)明:Elasticsearch API 文檔。
批量請(qǐng)求可以包含多種操作
- 創(chuàng)建document,確保它不存在后對(duì)其進(jìn)行索引
- 索引文檔,在需要時(shí)創(chuàng)建它
- 使用腳本或部分文檔更新已存在的文檔
- 刪除文檔
indexing application projects(索引應(yīng)用程序?qū)ο?
一個(gè)BulkRequest包含一組操作,每個(gè)操作都是一個(gè)variant type,主要請(qǐng)求使用構(gòu)建器對(duì)象,每個(gè)操作使用DSL。
如下:
List<Product> products = fetchProducts();
BulkRequest.Builder br = new BulkRequest.Builder();
for (Product product : products) {
// 添加一個(gè)操作(記住列表屬性是可加的)。opis 是一個(gè)構(gòu)建器,BulkOperation它是一個(gè)變體類(lèi)型。此類(lèi)型有index、create和變update體delete。
br.operations(op -> op
// 選擇index操作變體,idx是 的構(gòu)建器IndexOperation。
.index(idx -> idx
// 設(shè)置索引操作的屬性,類(lèi)似于單文檔索引:索引名稱(chēng)、標(biāo)識(shí)符和文檔。
.index("products")
.id(product.getSku())
.document(product)
)
);
}
BulkResponse result = esClient.bulk(br.build());
// Log errors, if any
if (result.errors()) {
logger.error("Bulk had errors");
for (BulkResponseItem item: result.items()) {
if (item.error() != null) {
logger.error(item.error().reason());
}
}
}
indexing raw json data(索引原始JSON數(shù)據(jù))
// List json log files in the log directory
File[] logFiles = logDir.listFiles(
file -> file.getName().matches("log-.*\\.json")
);
BulkRequest.Builder br = new BulkRequest.Builder();
for (File file: logFiles) {
FileInputStream input = new FileInputStream(file);
BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);
br.operations(op -> op
.index(idx -> idx
.index("logs")
.document(data)
)
);
}
使用Bulk Ingester 進(jìn)行流式攝取
通過(guò)提供允許索引/更新/刪除操作在批量請(qǐng)求中透明分組的實(shí)用程序類(lèi),簡(jiǎn)化BulkIngester
了批量 API 的使用。您只需要add()
對(duì) ingester 進(jìn)行批量操作,它會(huì)根據(jù)其配置負(fù)責(zé)分組并批量發(fā)送它們。
當(dāng)滿(mǎn)足以下條件之一時(shí),ingester 將發(fā)送批量請(qǐng)求:
- 操作次數(shù)超過(guò)最大值(默認(rèn)為 1000)
- 以字節(jié)為單位的批量請(qǐng)求大小超過(guò)最大值(默認(rèn)為 5 MiB)
- 自上次請(qǐng)求過(guò)期以來(lái)的延遲(定期刷新,無(wú)默認(rèn)值)
此外,您可以定義等待 Elasticsearch 執(zhí)行的并發(fā)請(qǐng)求的最大數(shù)量(默認(rèn)為 1)。當(dāng)達(dá)到最大值并且已收集到最大操作數(shù)時(shí),向索引器添加新操作將阻塞。這是為了避免通過(guò)對(duì)客戶(hù)端應(yīng)用程序施加背壓來(lái)使 Elasticsearch 服務(wù)器過(guò)載。
BulkIngester<Void> ingester = BulkIngester.of(b -> b
// 設(shè)置用于發(fā)送批量請(qǐng)求的 Elasticsearch 客戶(hù)端
.client(esClient)
// 設(shè)置在發(fā)送批量請(qǐng)求之前要收集的最大操作數(shù)。
.maxOperations(100)
// 設(shè)置刷新間隔。
.flushInterval(1, TimeUnit.SECONDS)
);
for (File file: logFiles) {
FileInputStream input = new FileInputStream(file);
BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);
// 向攝取器添加批量操作
ingester.add(op -> op
.index(idx -> idx
.index("logs")
.document(data)
)
);
}
// 關(guān)閉 ingester 以刷新掛起的操作并釋放資源。
ingester.close();
此外,批量攝取器接受一個(gè)偵聽(tīng)器,以便您的應(yīng)用程序可以收到發(fā)送的批量請(qǐng)求及其結(jié)果的通知。為了允許將批量操作關(guān)聯(lián)到應(yīng)用程序上下文,該add()
方法可選擇接受一個(gè)context
參數(shù)。此上下文參數(shù)的類(lèi)型用作對(duì)象的通用參數(shù)BulkIngester
。您可能已經(jīng)注意到上面Void
的類(lèi)型BulkIngester<Void>
:這是因?yàn)槲覀儧](méi)有注冊(cè)監(jiān)聽(tīng)器,因此不關(guān)心上下文值。
以下示例顯示了如何使用上下文值來(lái)實(shí)現(xiàn)批量攝取偵聽(tīng)器:與以前一樣,它批量發(fā)送 JSON 日志文件,但跟蹤批量請(qǐng)求錯(cuò)誤和失敗的操作。當(dāng)操作失敗時(shí),根據(jù)錯(cuò)誤類(lèi)型,您可能希望將其重新添加到 ingester。
BulkListener<String> listener = new BulkListener<String>() {
@Override
public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
}
@Override
public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
// The request was accepted, but may contain failed items.
// The "context" list gives the file name for each bulk item.
logger.debug("Bulk request " + executionId + " completed");
for (int i = 0; i < contexts.size(); i++) {
BulkResponseItem item = response.items().get(i);
if (item.error() != null) {
// Inspect the failure cause
logger.error("Failed to index file " + contexts.get(i) + " - " + item.error().reason());
}
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
// The request could not be sent
logger.debug("Bulk request " + executionId + " failed", failure);
}
};
BulkIngester<String> ingester = BulkIngester.of(b -> b
.client(esClient)
.maxOperations(100)
.flushInterval(1, TimeUnit.SECONDS)
.listener(listener)
);
for (File file: logFiles) {
FileInputStream input = new FileInputStream(file);
BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);
ingester.add(op -> op
.index(idx -> idx
.index("logs")
.document(data)
),
file.getName()
);
}
ingester.close();
批量攝取還公開(kāi)了允許監(jiān)視攝取過(guò)程并調(diào)整其配置的統(tǒng)計(jì)信息:
- 添加的操作數(shù),
-
add()
由于達(dá)到最大并發(fā)請(qǐng)求數(shù)(爭(zhēng)用)而被阻止的 調(diào)用數(shù), - 發(fā)送的批量請(qǐng)求數(shù),
- 由于達(dá)到最大并發(fā)請(qǐng)求數(shù)而被阻止的批量請(qǐng)求數(shù)。
通過(guò)id讀取文檔
Reading a domain object
從索引product中獲取id = bk-1的對(duì)象,此get請(qǐng)求有兩個(gè)參數(shù)
GetResponse<Product> response = esClient.get(g -> g
.index("products")
.id("bk-1"),
// 結(jié)果需要轉(zhuǎn)成的對(duì)象
Product.class
);
if (response.found()) {
Product product = response.source();
logger.info("Product name " + product.getName());
} else {
logger.info ("Product not found");
}
Reading raw JSON
GetResponse<ObjectNode> response = esClient.get(g -> g
.index("products")
.id("bk-1"),
// The target class is a raw JSON object.
ObjectNode.class
);
if (response.found()) {
ObjectNode json = response.source();
String name = json.get("name").asText();
logger.info("Product name " + name);
} else {
logger.info("Product not found");
}
搜索文件(Search for document)
索引文檔可用于近乎實(shí)時(shí)的搜索
簡(jiǎn)單的搜索查詢(xún)
String searchText = "bike";
SearchResponse<Product> response = esClient.search(s -> s
.index("products")
.query(q -> q
.match(t -> t
.field("name")
.query(searchText)
)
),
Product.class
);
TotalHits total = response.hits().total();
boolean isExactResult = total.relation() == TotalHitsRelation.Eq;
if (isExactResult) {
logger.info("There are " + total.value() + " results");
} else {
logger.info("There are more than " + total.value() + " results");
}
List<Hit<Product>> hits = response.hits().hits();
for (Hit<Product> hit: hits) {
Product product = hit.source();
logger.info("Found product " + product.getSku() + ", score " + hit.score());
}
嵌套搜索查詢(xún)
String searchText = "bike";
double maxPrice = 200.0;
// Search by product name
Query byName = MatchQuery.of(m -> m
.field("name")
.query(searchText)
)._toQuery();
// Search by max price
Query byMaxPrice = RangeQuery.of(r -> r
.field("price")
.gte(JsonData.of(maxPrice))
)._toQuery();
// Combine name and price queries to search the product index
SearchResponse<Product> response = esClient.search(s -> s
.index("products")
.query(q -> q
.bool(b -> b
.must(byName)
.must(byMaxPrice)
)
),
Product.class
);
List<Hit<Product>> hits = response.hits().hits();
for (Hit<Product> hit: hits) {
Product product = hit.source();
logger.info("Found product " + product.getSku() + ", score " + hit.score());
}
模板搜索
搜索模板是一種存儲(chǔ)的搜索,您可以使用不同的變量運(yùn)行它。搜索模板讓您無(wú)需修改應(yīng)用程序代碼即可更改搜索。
在運(yùn)行模板搜索之前,您首先必須創(chuàng)建模板。這是一個(gè)返回搜索請(qǐng)求主體的存儲(chǔ)腳本,通常定義為 Mustache 模板。這個(gè)存儲(chǔ)的腳本可以在應(yīng)用程序外部創(chuàng)建,也可以使用 Java API 客戶(hù)端創(chuàng)建:
// Create a script
esClient.putScript(r -> r
// 模板腳本標(biāo)識(shí)符/名稱(chēng)
.id("query-script")
.script(s -> s
.lang("mustache")
.source("{\"query\":{\"match\":{\"{{field}}\":\"{{value}}\"}}}")
));
要使用搜索模板,請(qǐng)使用方法searchTemplate
引用腳本并為其參數(shù)提供值:
SearchTemplateResponse<Product> response = esClient.searchTemplate(r -> r
.index("some-index")
// 要使用的模板腳本標(biāo)識(shí)符
.id("query-script")
// 模板腳本里面需要的參數(shù)
.params("field", JsonData.of("some-field"))
.params("value", JsonData.of("some-data")),
Product.class
);
List<Hit<Product>> hits = response.hits().hits();
for (Hit<Product> hit: hits) {
Product product = hit.source();
logger.info("Found product " + product.getSku() + ", score " + hit.score());
}
聚合
Elasticsearch 文檔。
一個(gè)簡(jiǎn)單的聚合
分析類(lèi)的聚合,一般不使用document,所以查詢(xún)的size為0,返回結(jié)果的目標(biāo)類(lèi)為Void。
例如-查詢(xún):
String searchText = "bike";
Query query = MatchQuery.of(m -> m
.field("name")
.query(searchText)
)._toQuery();
// 查詢(xún)的是價(jià)格直方圖,價(jià)格以50為步進(jìn)遞增分區(qū)查詢(xún)
SearchResponse<Void> response = esClient.search(b -> b
.index("products")
// 不需要document返回?cái)?shù)據(jù),純聚合
.size(0)
.query(query)
// 聚合名稱(chēng)為 'price-histogram'
.aggregations("price-histogram", a -> a
.histogram(h -> h
.field("price")
.interval(50.0)
)
),
Void.class
);
例如-響應(yīng):
List<HistogramBucket> buckets = response.aggregations()
// 聚合名稱(chēng)
.get("price-histogram")
// 聚合類(lèi)型/聚合函數(shù),例如avg、sum、max、min
.histogram()
// 數(shù)組或者map
.buckets().array();
for (HistogramBucket bucket: buckets) {
logger.info("There are " + bucket.docCount() +
" bikes under " + bucket.key());
}
Java API Client tests.
故障排除
MissingRequiredPropertyException
in a response
Java API客戶(hù)端區(qū)分兩種屬性,可選屬性和必須屬性,可選屬性用@Nullable注釋標(biāo)記,未被@Nullable標(biāo)記的必然不會(huì)為null。
但是,Elasticsearch API 規(guī)范可能存在錯(cuò)誤,錯(cuò)誤的需要響應(yīng)對(duì)象的屬性,導(dǎo)致MissingRequiredPropertyException
反序列化響應(yīng)時(shí)出現(xiàn)錯(cuò)誤。如果發(fā)生這種情況,您可以按照以下方法解決:
- 確保使用最新版本的 Java API 客戶(hù)端。該問(wèn)題可能已經(jīng)得到解決。(新版本可能沒(méi)有這個(gè)問(wèn)題)
- 如果問(wèn)題仍然存在于最新版本中,請(qǐng)打開(kāi)一個(gè)問(wèn)題,以便我們可以在下一個(gè)版本中修復(fù)它。請(qǐng)幫助我們改進(jìn) Java API 客戶(hù)端。(如果新版本有,請(qǐng)反饋給官方)
- 暫時(shí)禁用違規(guī)請(qǐng)求所需的屬性檢查(干掉屬性檢查)
ApiTypeHelper.DANGEROUS_disableRequiredPropertiesCheck(true);
SomeRequest request = SomeRequest.of(...);
SomeResponse response = esClient.someApi(request);
ApiTypeHelper.DANGEROUS_disableRequiredPropertiesCheck(false);
// Do something with response
}
該DANGEROUS_disableRequiredPropertiesCheck
方法禁用當(dāng)前線程所需的屬性檢查,以及異步請(qǐng)求中的響應(yīng)反序列化。顧名思義,它是危險(xiǎn)的,因?yàn)樗∠藢?duì)非 . 屬性的保證@Nullable
。在問(wèn)題得到解決之前,這是一個(gè)臨時(shí)解決方法。
請(qǐng)注意,此方法的結(jié)果是一個(gè)AutoCloseable
將所需屬性檢查重置為其先前設(shè)置的對(duì)象。因此,您可以在 try-with-resource 塊中使用它,如下所示:
try (ApiTypeHelper.DisabledChecksHandle h =
ApiTypeHelper.DANGEROUS_disableRequiredPropertiesCheck(true)) {
SomeRequest request = SomeRequest.of(...);
SomeResponse response = esClient.someApi(request);
// Do something with response
}
Serializing aggregations and suggestions without typed keys
大概意思就是
當(dāng)你不需要格式化序列對(duì)象的時(shí)候,typed_keys
您可以通過(guò)將JsonpMapperFeatures.SERIALIZE_TYPED_KEYS
屬性設(shè)置為false
映射器對(duì)象來(lái)禁用序列化。
Elasticsearch 搜索請(qǐng)求接受一個(gè)typed_key
參數(shù),該參數(shù)允許返回類(lèi)型信息以及聚合和建議結(jié)果中的名稱(chēng)(有關(guān)更多詳細(xì)信息,請(qǐng)參閱聚合文檔)。
Java API 客戶(hù)端總是將此參數(shù)添加到搜索請(qǐng)求中,因?yàn)樾枰?lèi)型信息來(lái)了解應(yīng)該用于反序列化聚合和建議結(jié)果的具體類(lèi)。
對(duì)稱(chēng)地,Java API Client 也使用這種typed_keys
格式序列化聚合和建議結(jié)果,以便它可以正確地反序列化自己序列化的結(jié)果。
代碼如下,默認(rèn)有序列化文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-625133.html
ElasticsearchClient esClient = ...
JsonpMapper mapper = esClient._jsonpMapper();
StringWriter writer = new StringWriter();
try (JsonGenerator generator = mapper.jsonProvider().createGenerator(writer)) {
mapper.serialize(searchResponse, generator);
}
String result = writer.toString();
// The aggregation property provides the "avg" type and "price" name
assertTrue(result.contains("\"aggregations\":{\"avg#price\":{\"value\":3.14}}}"));
設(shè)置沒(méi)序列化文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-625133.html
ElasticsearchClient esClient = ...
// Create a new mapper with the typed_keys feature disabled
JsonpMapper mapper = esClient._jsonpMapper()
.withAttribute(JsonpMapperFeatures.SERIALIZE_TYPED_KEYS, false);
StringWriter writer = new StringWriter();
try (JsonGenerator generator = mapper.jsonProvider().createGenerator(writer)) {
mapper.serialize(searchResponse, generator);
}
String result = writer.toString();
// The aggregation only provides the "price" name
assertTrue(result.contains("\"aggregations\":{\"price\":{\"value\":3.14}}}"));
到了這里,關(guān)于學(xué)習(xí)筆記-elstaciElasticSearch7.17官方文檔的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!