国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

學(xué)習(xí)筆記-elstaciElasticSearch7.17官方文檔

這篇具有很好參考價(jià)值的文章主要介紹了學(xué)習(xí)筆記-elstaciElasticSearch7.17官方文檔。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

ElasticSearch

介紹(Introduction)

特征

  • 適用于所有 Elasticsearch API 的強(qiáng)類(lèi)型請(qǐng)求和響應(yīng)。
  • 所有 API 的阻塞和異步版本。
  • 在創(chuàng)建復(fù)雜的嵌套結(jié)構(gòu)時(shí),使用流暢的構(gòu)建器和功能模式允許編寫(xiě)簡(jiǎn)潔但可讀的代碼。
  • 通過(guò)使用對(duì)象映射器(例如 Jackson 或任何 JSON-B 實(shí)現(xiàn))無(wú)縫集成應(yīng)用程序類(lèi)。
  • 將協(xié)議處理委托給一個(gè) http 客戶(hù)端,例如Java 低級(jí) REST 客戶(hù)端 ,它負(fù)責(zé)處理所有傳輸級(jí)別的問(wèn)題:HTTP 連接池、重試、節(jié)點(diǎn)發(fā)現(xiàn)等。

服務(wù)器兼容策略

Elasticsearch Java 客戶(hù)端是向前兼容的;這意味著客戶(hù)端支持與更大或相等的次要版本的 Elasticsearch 進(jìn)行通信。Elasticsearch 語(yǔ)言客戶(hù)端僅向后兼容默認(rèn)發(fā)行版,并且不作任何保證。

入門(mén)

安裝

安裝要求
  • 至少Java8以上的版本
  • 需要一個(gè) JSON 對(duì)象映射庫(kù),允許您的應(yīng)用程序類(lèi)與 Elasticsearch API 無(wú)縫集成。
maven
    <dependency>
      <groupId>co.elastic.clients</groupId>
      <artifactId>elasticsearch-java</artifactId>
      <version>7.17.9</version>
    </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.12.3</version>
    </dependency>

運(yùn)行時(shí)可能會(huì)報(bào)異常 ClassNotFoundException: jakarta.json.spi.JsonProvider

則需要加入以下依賴(lài)

<dependency>
      <groupId>jakarta.json</groupId>
      <artifactId>jakarta.json-api</artifactId>
      <version>2.0.1</version>
    </dependency>

為什么會(huì)出現(xiàn)這個(gè)問(wèn)題?

SpringBoot框架帶有的maven插件,用于簡(jiǎn)化開(kāi)發(fā)和依賴(lài)管理,這些插件中帶有很多知名的庫(kù)。

1.x的庫(kù)里面使用了 javax.json的包,2.x里面用的jakarta.json的包,(從 JavaEE 過(guò)渡到 JakartaEE),所以導(dǎo)致了 ClassNotFoundException: jakarta.json.spi.JsonProvider

我們只需要添加正確的項(xiàng)目依賴(lài)就可以了

連接中(connecting)

Java API 客戶(hù)端圍繞三個(gè)主要組件構(gòu)建

  • API 客戶(hù)端類(lèi)。這些為 Elasticsearch API 提供強(qiáng)類(lèi)型數(shù)據(jù)結(jié)構(gòu)和方法。由于 Elasticsearch API 很大,它以功能組(也稱(chēng)為“命名空間”)為結(jié)構(gòu),每個(gè)功能組都有自己的客戶(hù)端類(lèi)。Elasticsearch 核心功能在類(lèi)中實(shí)現(xiàn)ElasticsearchClient。
  • 一個(gè) JSON 對(duì)象映射器。這會(huì)將您的應(yīng)用程序類(lèi)映射到 JSON,并將它們與 API 客戶(hù)端無(wú)縫集成。
  • 傳輸層實(shí)現(xiàn)。這是所有 HTTP 請(qǐng)求處理發(fā)生的地方。

以下代碼完成了上面的三個(gè)組件

// Create the low-level client
RestClient restClient = RestClient.builder(
    new HttpHost("localhost", 9200)).build();

// Create the transport with a Jackson mapper
ElasticsearchTransport transport = new RestClientTransport(
    restClient, new JacksonJsonpMapper());

// And create the API client
ElasticsearchClient client = new ElasticsearchClient(transport);

身份驗(yàn)證由Java Low Level REST Client管理。有關(guān)配置身份驗(yàn)證的更多詳細(xì)信息,請(qǐng)參閱 其文檔。

第一個(gè) ElasticSearch 請(qǐng)求

需求:查詢(xún)es數(shù)據(jù)庫(kù)中,name = ‘bike’ 的所有Product對(duì)象

SearchResponse<Product> search = client.search(s -> s
    .index("products")
    .query(q -> q
        .term(t -> t
            .field("name")
            .value(v -> v.stringValue("bicycle"))
        )),
    Product.class);

for (Hit<Product> hit: search.hits().hits()) {
    processProduct(hit.source());
}

從RestHighLevelClient 遷移

個(gè)人理解

大概意思就是,7.15以前的版本用的是RestHighLevelClient ,其以后用的是Elasticsearch Java API Client ,這兩種客戶(hù)端可以并存,你可以同時(shí)使用,并把原來(lái)RestHighLevelClient 里面的一些廢棄的接口、low的寫(xiě)法逐步過(guò)渡到Elasticsearch Java API Client 。

官方文檔

棄用 RestHighLevelClient 服務(wù)端,使用 Elasticsearch Java API Client 客戶(hù)端。

兩個(gè)客戶(hù)端庫(kù)可以在沒(méi)有操作開(kāi)銷(xiāo)的情況下共存于單個(gè)應(yīng)用程序中,因?yàn)?Elasticsearch Java API Client 客戶(hù)端 是一個(gè)全新的客戶(hù)端,獨(dú)立于 Elasticsearch 服務(wù)器。

兼容模式

HLRC(RestHighLevelClient )可以啟用兼容模式,讓HLRC版本的7.17和Elasticsearch 一起使用,在這種模式下,

Java API 客戶(hù)端不需要此設(shè)置,因?yàn)榧嫒菽J绞冀K處于啟用狀態(tài)。

將相同的 http 客戶(hù)端與 HLRC 和 Java API 客戶(hù)端一起使用

為了避免在應(yīng)用程序同時(shí)使用 HLRC 和新的 Java API 客戶(hù)端的過(guò)渡階段產(chǎn)生任何操作開(kāi)銷(xiāo),兩個(gè)客戶(hù)端可以共享相同的 Low Level Rest Client,這是管理所有連接、循環(huán)策略的網(wǎng)絡(luò)層,節(jié)點(diǎn)嗅探等。

下面的代碼顯示了如何使用相同的 HTTP 客戶(hù)端初始化兩個(gè)客戶(hù)端:

// Create the low-level client
RestClient httpClient = RestClient.builder(
    new HttpHost("localhost", 9200)
).build();

// Create the HLRC
RestHighLevelClient hlrc = new RestHighLevelClientBuilder(httpClient)
    // Enables compatibility mode that allows HLRC 7.17 to work with Elasticsearch 8.x.
    .setApiCompatibilityMode(true) 
    .build();

// Create the Java API Client with the same low level client
ElasticsearchTransport transport = new RestClientTransport(
    httpClient,
    new JacksonJsonpMapper()
);

ElasticsearchClient esClient = new ElasticsearchClient(transport);

// hlrc and esClient share the same httpClient
轉(zhuǎn)型策略

您可以通過(guò)多種不同的方式在應(yīng)用程序代碼中開(kāi)始從 HLRC 過(guò)渡。

例如:

  • 保持現(xiàn)有代碼不變,并使用新的 Java API 客戶(hù)端實(shí)現(xiàn)應(yīng)用程序中的新功能,然后遷移現(xiàn)有代碼,
  • 重寫(xiě)應(yīng)用程序中新的 Java API 客戶(hù)端比 HLRC 客戶(hù)端更容易使用的部分,例如與搜索相關(guān)的所有內(nèi)容,
  • 通過(guò)利用新的 Java API 客戶(hù)端與 JSON 對(duì)象映射器的緊密集成,重寫(xiě)那些需要將應(yīng)用程序?qū)ο笥成涞?JSON 或從 JSON 映射到 JSON 的部分。

Api約定

包結(jié)構(gòu)和命名空間客戶(hù)端

大概是說(shuō)Java api和ElasticSearch Api 的分組,都是以包的結(jié)構(gòu)和命名空間的方式

// Create the "products" index
ElasticsearchClient client = ...
client.indices().create(c -> c.index("products"));

tips:如果數(shù)據(jù)量較大,可能會(huì)按月創(chuàng)建索引,比如 xx_2023_02,xx_2023_3 … 等,你往es上傳數(shù)據(jù)的時(shí)候,會(huì)設(shè)置一個(gè)indexName,如果現(xiàn)在的es中沒(méi)有這個(gè)索引,則創(chuàng)建索引并保存數(shù)據(jù)。

方法命名約定

  • 作為 API 一部分的方法和屬性,例如 ElasticsearchClient.search()SearchResponse.maxScore()。它們是使用標(biāo)準(zhǔn) Java 約定從 Elasticsearch JSON API 中各自的名稱(chēng)派生而來(lái)的 camelCaseNaming。
  • 作為構(gòu)建 Java API 客戶(hù)端的框架的一部分的方法和屬性,例如Query._kind(). 這些方法和屬性以下劃線為前綴,以避免與 API 名稱(chēng)發(fā)生任何命名沖突,并作為區(qū)分 API 和框架的簡(jiǎn)單方法。

阻塞和異步 client

API 客戶(hù)端有兩種形式:阻塞式和異步式。異步客戶(hù)端上的所有方法都返回一個(gè)標(biāo)準(zhǔn)的CompletableFuture.

ElasticsearchTransport transport = ...

// Synchronous blocking client
ElasticsearchClient client = new ElasticsearchClient(transport);

if (client.exists(b -> b.index("products").id("foo")).value()) {
    logger.info("product exists");
}

// Asynchronous non-blocking client
ElasticsearchAsyncClient asyncClient =
    new ElasticsearchAsyncClient(transport);

asyncClient
    .exists(b -> b.index("products").id("foo"))
    .whenComplete((response, exception) -> {
        if (exception != null) {
            logger.error("Failed to index", exception);
        } else {
            logger.info("Product exists");
        }
    });

切記,異步時(shí)一定要對(duì)異常情況進(jìn)行處理?。?!

構(gòu)建Api對(duì)象

建造者對(duì)象(Builder Project)
ElasticsearchClient client = ...
CreateIndexResponse createResponse = client.indices().create(
    new CreateIndexRequest.Builder()
        .index("my-index")
        .aliases("foo",
            new Alias.Builder().isWriteIndex(true).build()
        )
        .build()
);

在調(diào)用構(gòu)建器的方法后,不能重用構(gòu)建器。

構(gòu)建器lambda表達(dá)式(感覺(jué)比上面那個(gè)清爽)
ElasticsearchClient client = ...
CreateIndexResponse createResponse = client.indices()
    .create(createIndexBuilder -> createIndexBuilder
        .index("my-index")
        .aliases("foo", aliasBuilder -> aliasBuilder
            .isWriteIndex(true)
        )
    );

比上面那個(gè)更清爽,lambda表達(dá)式的入?yún)⒁?jiǎn)潔明了,不需要和fori循環(huán)一樣望文生義。

ElasticsearchClient client = ...
CreateIndexResponse createResponse = client.indices()
    .create(c -> c
        .index("my-index")
        .aliases("foo", a -> a
            .isWriteIndex(true)
        )
    );

Builder 的 lambda 在復(fù)雜嵌套查詢(xún)下的應(yīng)用.

剛?cè)腴T(mén),看不懂下面這個(gè)查詢(xún)啥意思

ElasticsearchClient client = ...
SearchResponse<SomeApplicationData> results = client
    .search(b0 -> b0
        .query(b1 -> b1
            .intervals(b2 -> b2
                .field("my_text")
                .allOf(b3 -> b3
                    .ordered(true)
                    .intervals(b4 -> b4
                        .match(b5 -> b5
                            .query("my favorite food")
                            .maxGaps(0)
                            .ordered(true)
                        )
                    )
                    .intervals(b4 -> b4
                        .anyOf(b5 -> b5
                            .intervals(b6 -> b6
                                .match(b7 -> b7
                                    .query("hot water")
                                )
                            )
                            .intervals(b6 -> b6
                                .match(b7 -> b7
                                    .query("cold porridge")
                                )
                            )
                        )
                    )
                )
            )
        ),
    // 搜索結(jié)果將映射到SomeApplicationData實(shí)例,以便應(yīng)用程序隨時(shí)可用
    SomeApplicationData.class 
);

List And Map(兩種數(shù)據(jù)類(lèi)型)

添加的 builder setters
// Prepare a list of index names
List<String> names = Arrays.asList("idx-a", "idx-b", "idx-c");

// Prepare cardinality aggregations for fields "foo" and "bar"
Map<String, Aggregation> cardinalities = new HashMap<>();
cardinalities.put("foo-count", Aggregation.of(a -> a.cardinality(c -> c.field("foo"))));
cardinalities.put("bar-count", Aggregation.of(a -> a.cardinality(c -> c.field("bar"))));

// Prepare an aggregation that computes the average of the "size" field
final Aggregation avgSize = Aggregation.of(a -> a.avg(v -> v.field("size")));

SearchRequest search = SearchRequest.of(r -> r
    // Index list:
    // - add all elements of a list
    .index(names)
    // - add a single element
    .index("idx-d")
    // - add a vararg list of elements
    .index("idx-e", "idx-f", "idx-g")

    // Sort order list: add elements defined by builder lambdas
    .sort(s -> s.field(f -> f.field("foo").order(SortOrder.Asc)))
    .sort(s -> s.field(f -> f.field("bar").order(SortOrder.Desc)))

    // Aggregation map:
    // - add all entries of an existing map
    .aggregations(cardinalities)
    // - add a key/value entry
    .aggregations("avg-size", avgSize)
    // - add a key/value defined by a builder lambda
    .aggregations("price-histogram",
        a -> a.histogram(h -> h.field("price")))
);
List And Map 永遠(yuǎn)不為 null
NodeStatistics stats = NodeStatistics.of(b -> b
    .total(1)
    .failed(0)
    .successful(1)
);

// The `failures` list was not provided.
// - it's not null
assertNotNull(stats.failures());
// - it's empty
assertEquals(0, stats.failures().size());
// - and if needed we can know it was actually not defined
assertFalse(ApiTypeHelper.isDefined(stats.failures()));

Variant types(變種類(lèi)型)

大概意思就是說(shuō),比如查詢(xún)條件,可以允許多種類(lèi)型的查詢(xún)條件拼接,允許在構(gòu)建查詢(xún)條件中的各種騷操作。

Elasticsearch API 有很多變體類(lèi)型:查詢(xún)、聚合、字段映射、分析器等等。在如此龐大的集合中找到正確的類(lèi)名可能具有挑戰(zhàn)性。

Java API 客戶(hù)端構(gòu)建器使這變得簡(jiǎn)單:變體類(lèi)型的構(gòu)建器(例如 Query)具有適用于每個(gè)可用實(shí)現(xiàn)的方法。我們已經(jīng)在上面的操作中看到了intervals(一種查詢(xún))和allOf,match以及 anyOf(各種間隔)。

這是因?yàn)?Java API 客戶(hù)端中的變體對(duì)象是“標(biāo)記聯(lián)合”的實(shí)現(xiàn):它們包含它們持有的變體的標(biāo)識(shí)符(或標(biāo)記)以及該變體的值。例如,一個(gè)Query對(duì)象可以包含一個(gè) IntervalsQuerywith tag intervals、一個(gè)TermQuerywith tagterm等等。這種方法允許編寫(xiě)流暢的代碼,您可以讓 IDE 完成功能指導(dǎo)您構(gòu)建和導(dǎo)航復(fù)雜的嵌套結(jié)構(gòu):

變體構(gòu)建器為每個(gè)可用的實(shí)現(xiàn)都提供了 setter 方法。它們使用與常規(guī)屬性相同的約定,并接受構(gòu)建器 lambda 表達(dá)式和變體實(shí)際類(lèi)型的現(xiàn)成對(duì)象。下面是構(gòu)建術(shù)語(yǔ)查詢(xún)的示例:

Query query = new Query.Builder()
    // 選擇term變體以構(gòu)建術(shù)語(yǔ)查詢(xún)。
    .term(t -> t       
         // 使用構(gòu)建器 lambda 表達(dá)式構(gòu)建術(shù)語(yǔ)查詢(xún)。
        .field("name")                    
        .value(v -> v.stringValue("foo"))
    )
    // 構(gòu)建Query現(xiàn)在持有一個(gè)TermQuery對(duì)象的 kind term。
    .build();    

變體對(duì)象還提供有關(guān)它們當(dāng)前持有的變體種類(lèi)的信息:

  • 具有is針對(duì)每種變體類(lèi)型的方法:isTerm()、isIntervals()、isFuzzy()等。

  • 帶有Kind定義所有變體類(lèi)型的嵌套枚舉。

  • // 測(cè)試變體是否屬于特定種類(lèi)
    if (query.isTerm()) { 
        doSomething(query.term());
    }
    
    // 測(cè)試一組更大的變體類(lèi)型。
    switch(query._kind()) { 
        case Term:
            doSomething(query.term());
            break;
        case Intervals:
            doSomething(query.intervals());
            break;
        default:
            // 獲取變體對(duì)象所持有的種類(lèi)和值。
            doSomething(query._kind(), query._get()); 
    }
    

對(duì)象生命周期和線程安全

Java API Client 中有五種不同生命周期的對(duì)象:

Object mapper

無(wú)狀態(tài)和線程安全,但創(chuàng)建成本高。它通常是在應(yīng)用程序啟動(dòng)時(shí)創(chuàng)建并用于創(chuàng)建傳輸?shù)膯卫?/p>

Transport

線程安全,通過(guò)底層 HTTP 客戶(hù)端持有網(wǎng)絡(luò)資源。傳輸對(duì)象與 Elasticsearch 集群相關(guān)聯(lián),必須顯式關(guān)閉才能釋放底層資源,例如網(wǎng)絡(luò)連接。

Clients

不可變的、無(wú)狀態(tài)的和線程安全的。這些是非常輕量級(jí)的對(duì)象,僅包裝傳輸并提供 API 端點(diǎn)作為方法。

Builders

可變的,非線程安全的。生成器是臨時(shí)對(duì)象,不應(yīng)在調(diào)用 build().

Requests & other API objects

不可變的,線程安全的。如果您的應(yīng)用程序反復(fù)使用相同的請(qǐng)求或請(qǐng)求的相同部分,則可以提前準(zhǔn)備好這些對(duì)象,并在使用不同傳輸?shù)亩鄠€(gè)客戶(hù)端的多個(gè)調(diào)用中重復(fù)使用。

從JSON數(shù)據(jù)創(chuàng)建API對(duì)象(7.17.2開(kāi)始支持的)

從JSON中加載的屬性會(huì)設(shè)置上文中未出現(xiàn)的屬性值,也會(huì)替換上文中已出現(xiàn)的值

可以讀取文件中JSON數(shù)據(jù),進(jìn)行創(chuàng)建索引、查詢(xún)等操作。

比如有個(gè)some-index.json文件內(nèi)容如下

{
  "mappings": {
    "properties": {
      "field1": { "type": "text" }
    }
  }
}

我們可以這樣創(chuàng)建一個(gè)索引

InputStream input = this.getClass()
    .getResourceAsStream("some-index.json"); 

CreateIndexRequest req = CreateIndexRequest.of(b -> b
    .index("some-index")
    .withJson(input) 
);

boolean created = client.indices().create(req).acknowledged();

也可以的自定義JSON格式的查詢(xún)、添加條件,進(jìn)行操作

Reader queryJson = new StringReader(
    "{" +
    "  \"query\": {" +
    "    \"range\": {" +
    "      \"@timestamp\": {" +
    "        \"gt\": \"now-1w\"" +
    "      }" +
    "    }" +
    "  }" +
    "}");

SearchRequest aggRequest = SearchRequest.of(b -> b
    .withJson(queryJson) 
    .aggregations("max-cpu", a1 -> a1 
        .dateHistogram(h -> h
            .field("@timestamp")
            .calendarInterval(CalendarInterval.Hour)
        )
        .aggregations("max", a2 -> a2
            .max(m -> m.field("host.cpu.usage"))
        )
    )
    .size(0)
);

Map<String, Aggregate> aggs = client
    .search(aggRequest, Void.class) 
    .aggregations();

也支持聚合

Reader queryJson = new StringReader(
    "{" +
    "  \"query\": {" +
    "    \"range\": {" +
    "      \"@timestamp\": {" +
    "        \"gt\": \"now-1w\"" +
    "      }" +
    "    }" +
    "  }," +
    "  \"size\": 100" + 
    "}");

Reader aggregationJson = new StringReader(
    "{" +
    "  \"size\": 0, " + 
    "  \"aggregations\": {" +
    "    \"hours\": {" +
    "      \"date_histogram\": {" +
    "        \"field\": \"@timestamp\"," +
    "        \"interval\": \"hour\"" +
    "      }," +
    "      \"aggregations\": {" +
    "        \"max-cpu\": {" +
    "          \"max\": {" +
    "            \"field\": \"host.cpu.usage\"" +
    "          }" +
    "        }" +
    "      }" +
    "    }" +
    "  }" +
    "}");

SearchRequest aggRequest = SearchRequest.of(b -> b
    .withJson(queryJson) 
    .withJson(aggregationJson) 
    // 默認(rèn)Es中不存在目標(biāo)索引會(huì)報(bào)錯(cuò),加此配置不報(bào)錯(cuò)
    .ignoreUnavailable(true) 
);

Map<String, Aggregate> aggs = client
    .search(aggRequest, Void.class)
    .aggregations();

當(dāng)各個(gè)JSON中都有通用屬性時(shí),順序很重要,就像第二個(gè)set會(huì)覆蓋第一個(gè)set的值

兩種異常

  1. ElasticsearchException

    服務(wù)器收到但是被拒絕報(bào)此異常,比如驗(yàn)證錯(cuò)誤、服務(wù)器內(nèi)部超時(shí)。

  2. TransportException

    未達(dá)到服務(wù)器報(bào)此異常,比如網(wǎng)絡(luò)錯(cuò)誤、服務(wù)器不可用。該異常原因是較低級(jí)別的實(shí)現(xiàn)拋出的異常。在這種情況下,RestClientTransport 它將是一個(gè)ResponseException包含低級(jí)別 HTTP 響應(yīng)的。

使用Java Api 客戶(hù)端

索引單個(gè)文檔

using the fluent DSL(使用DSL語(yǔ)句)
Product product = new Product("bk-1", "City bike", 123.0);

IndexResponse response = esClient.index(i -> i
    .index("products")
    .id(product.getSku())
    .document(product)
);

logger.info("Indexed with version " + response.version());
Product product = new Product("bk-1", "City bike", 123.0);

IndexRequest<Product> request = IndexRequest.of(i -> i
    .index("products")
    .id(product.getSku())
    .document(product)
);

IndexResponse response = esClient.index(request);

logger.info("Indexed with version " + response.version());
using classic builders(使用經(jīng)典構(gòu)建器)
Product product = new Product("bk-1", "City bike", 123.0);

IndexRequest.Builder<Product> indexReqBuilder = new IndexRequest.Builder<>();
indexReqBuilder.index("product");
indexReqBuilder.id(product.getSku());
indexReqBuilder.document(product);

// 注釋?zhuān)簜€(gè)人覺(jué)得這樣用鏈?zhǔn)秸{(diào)用比較好看
      Query termQuery = new TermQuery.Builder()
                .field()
                .value()
                .build();

IndexResponse response = esClient.index(indexReqBuilder.build());

logger.info("Indexed with version " + response.version());
Using the asynchronous client 使用異步客戶(hù)端
ElasticsearchAsyncClient esAsyncClient = new ElasticsearchAsyncClient(transport);

Product product = new Product("bk-1", "City bike", 123.0);

esAsyncClient.index(i -> i
    .index("products")
    .id(product.getSku())
    .document(product)
).whenComplete((response, exception) -> {
    if (exception != null) {
        logger.error("Failed to index", exception);
    } else {
        logger.info("Indexed with version " + response.version());
    }
});

api客戶(hù)端有兩種形式,阻塞式和異步式,兩種客戶(hù)端都返回一個(gè)標(biāo)準(zhǔn)的CompletableFuture.

如下:

ElasticsearchTransport transport = ...

// Synchronous blocking client
ElasticsearchClient client = new ElasticsearchClient(transport);

if (client.exists(b -> b.index("products").id("foo")).value()) {
    logger.info("product exists");
}

// Asynchronous non-blocking client
ElasticsearchAsyncClient asyncClient =
    new ElasticsearchAsyncClient(transport);

asyncClient
    .exists(b -> b.index("products").id("foo"))
    .whenComplete((response, exception) -> {
        if (exception != null) {
            logger.error("Failed to index", exception);
        } else {
            logger.info("Product exists");
        }
    });

異步客戶(hù)端一定要記得處理異步失敗

Using raw JSON data(使用原始JSON數(shù)據(jù))

當(dāng)您要索引的數(shù)據(jù)來(lái)自外部來(lái)源時(shí),必須創(chuàng)建域?qū)ο罂赡芎苈闊?,或者?duì)于半結(jié)構(gòu)化數(shù)據(jù)來(lái)說(shuō)是完全不可能的。(個(gè)人理解:假如你現(xiàn)在有一個(gè)篩選條件篩選數(shù)據(jù),且已經(jīng)上到了線上環(huán)境,現(xiàn)在需要修改這個(gè)查詢(xún)條件,不應(yīng)該代碼里修改然后再打包發(fā)布。應(yīng)該用文本寫(xiě)好查詢(xún)條件,修改的時(shí)候直接修改文本即可)。

方法withJson(),此方法讀取源并用于索引請(qǐng)求的document屬性,案例Creating API objects from JSON data

Reader input = new StringReader(
    "{'@timestamp': '2022-04-08T13:55:32Z', 'level': 'warn', 'message': 'Some log message'}"
    .replace('\'', '"'));

IndexRequest<JsonData> request = IndexRequest.of(i -> i
    .index("logs")
    .withJson(input)
);

IndexResponse response = esClient.index(request);

logger.info("Indexed with version " + response.version());

以上的源碼案例來(lái)自于Java API Client tests.

批量:索引多個(gè)文檔

批量請(qǐng)求允許在一個(gè)請(qǐng)求中向ElasticSearch發(fā)送多個(gè)文檔相關(guān)的操作。

批量請(qǐng)求說(shuō)明:Elasticsearch API 文檔。

批量請(qǐng)求可以包含多種操作

  • 創(chuàng)建document,確保它不存在后對(duì)其進(jìn)行索引
  • 索引文檔,在需要時(shí)創(chuàng)建它
  • 使用腳本或部分文檔更新已存在的文檔
  • 刪除文檔
indexing application projects(索引應(yīng)用程序?qū)ο?

一個(gè)BulkRequest包含一組操作,每個(gè)操作都是一個(gè)variant type,主要請(qǐng)求使用構(gòu)建器對(duì)象,每個(gè)操作使用DSL。

如下:

List<Product> products = fetchProducts();

BulkRequest.Builder br = new BulkRequest.Builder();

for (Product product : products) {
    // 添加一個(gè)操作(記住列表屬性是可加的)。opis 是一個(gè)構(gòu)建器,BulkOperation它是一個(gè)變體類(lèi)型。此類(lèi)型有index、create和變update體delete。
    br.operations(op -> op    
       // 選擇index操作變體,idx是 的構(gòu)建器IndexOperation。
        .index(idx -> idx     
            // 設(shè)置索引操作的屬性,類(lèi)似于單文檔索引:索引名稱(chēng)、標(biāo)識(shí)符和文檔。
            .index("products")       
            .id(product.getSku())
            .document(product)
        )
    );
}

BulkResponse result = esClient.bulk(br.build());

// Log errors, if any
if (result.errors()) {
    logger.error("Bulk had errors");
    for (BulkResponseItem item: result.items()) {
        if (item.error() != null) {
            logger.error(item.error().reason());
        }
    }
}
indexing raw json data(索引原始JSON數(shù)據(jù))
// List json log files in the log directory
File[] logFiles = logDir.listFiles(
    file -> file.getName().matches("log-.*\\.json")
);

BulkRequest.Builder br = new BulkRequest.Builder();

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);

    br.operations(op -> op
        .index(idx -> idx
            .index("logs")
            .document(data)
        )
    );
}
使用Bulk Ingester 進(jìn)行流式攝取

通過(guò)提供允許索引/更新/刪除操作在批量請(qǐng)求中透明分組的實(shí)用程序類(lèi),簡(jiǎn)化BulkIngester了批量 API 的使用。您只需要add()對(duì) ingester 進(jìn)行批量操作,它會(huì)根據(jù)其配置負(fù)責(zé)分組并批量發(fā)送它們。

當(dāng)滿(mǎn)足以下條件之一時(shí),ingester 將發(fā)送批量請(qǐng)求:

  • 操作次數(shù)超過(guò)最大值(默認(rèn)為 1000)
  • 以字節(jié)為單位的批量請(qǐng)求大小超過(guò)最大值(默認(rèn)為 5 MiB)
  • 自上次請(qǐng)求過(guò)期以來(lái)的延遲(定期刷新,無(wú)默認(rèn)值)

此外,您可以定義等待 Elasticsearch 執(zhí)行的并發(fā)請(qǐng)求的最大數(shù)量(默認(rèn)為 1)。當(dāng)達(dá)到最大值并且已收集到最大操作數(shù)時(shí),向索引器添加新操作將阻塞。這是為了避免通過(guò)對(duì)客戶(hù)端應(yīng)用程序施加背壓來(lái)使 Elasticsearch 服務(wù)器過(guò)載。

BulkIngester<Void> ingester = BulkIngester.of(b -> b
    // 	設(shè)置用于發(fā)送批量請(qǐng)求的 Elasticsearch 客戶(hù)端
    .client(esClient)    
    // 設(shè)置在發(fā)送批量請(qǐng)求之前要收集的最大操作數(shù)。
    .maxOperations(100)  
    // 設(shè)置刷新間隔。
    .flushInterval(1, TimeUnit.SECONDS) 
);

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);
    // 向攝取器添加批量操作
    ingester.add(op -> op 
        .index(idx -> idx
            .index("logs")
            .document(data)
        )
    );
}
// 關(guān)閉 ingester 以刷新掛起的操作并釋放資源。
ingester.close(); 

此外,批量攝取器接受一個(gè)偵聽(tīng)器,以便您的應(yīng)用程序可以收到發(fā)送的批量請(qǐng)求及其結(jié)果的通知。為了允許將批量操作關(guān)聯(lián)到應(yīng)用程序上下文,該add()方法可選擇接受一個(gè)context參數(shù)。此上下文參數(shù)的類(lèi)型用作對(duì)象的通用參數(shù)BulkIngester 。您可能已經(jīng)注意到上面Void的類(lèi)型BulkIngester<Void>:這是因?yàn)槲覀儧](méi)有注冊(cè)監(jiān)聽(tīng)器,因此不關(guān)心上下文值。

以下示例顯示了如何使用上下文值來(lái)實(shí)現(xiàn)批量攝取偵聽(tīng)器:與以前一樣,它批量發(fā)送 JSON 日志文件,但跟蹤批量請(qǐng)求錯(cuò)誤和失敗的操作。當(dāng)操作失敗時(shí),根據(jù)錯(cuò)誤類(lèi)型,您可能希望將其重新添加到 ingester。

BulkListener<String> listener = new BulkListener<String>() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
        // The request was accepted, but may contain failed items.
        // The "context" list gives the file name for each bulk item.
        logger.debug("Bulk request " + executionId + " completed");
        for (int i = 0; i < contexts.size(); i++) {
            BulkResponseItem item = response.items().get(i);
            if (item.error() != null) {
                // Inspect the failure cause
                logger.error("Failed to index file " + contexts.get(i) + " - " + item.error().reason());
            }
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
        // The request could not be sent
        logger.debug("Bulk request " + executionId + " failed", failure);
    }
};

BulkIngester<String> ingester = BulkIngester.of(b -> b
    .client(esClient)
    .maxOperations(100)
    .flushInterval(1, TimeUnit.SECONDS)
    .listener(listener) 
);

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);

    ingester.add(op -> op
        .index(idx -> idx
            .index("logs")
            .document(data)
        ),
        file.getName() 
    );
}

ingester.close();

批量攝取還公開(kāi)了允許監(jiān)視攝取過(guò)程并調(diào)整其配置的統(tǒng)計(jì)信息:

  • 添加的操作數(shù),
  • add()由于達(dá)到最大并發(fā)請(qǐng)求數(shù)(爭(zhēng)用)而被阻止的 調(diào)用數(shù),
  • 發(fā)送的批量請(qǐng)求數(shù),
  • 由于達(dá)到最大并發(fā)請(qǐng)求數(shù)而被阻止的批量請(qǐng)求數(shù)。

通過(guò)id讀取文檔

Reading a domain object

從索引product中獲取id = bk-1的對(duì)象,此get請(qǐng)求有兩個(gè)參數(shù)

GetResponse<Product> response = esClient.get(g -> g
    .index("products") 
    .id("bk-1"),
    // 結(jié)果需要轉(zhuǎn)成的對(duì)象                                         
    Product.class      
);

if (response.found()) {
    Product product = response.source();
    logger.info("Product name " + product.getName());
} else {
    logger.info ("Product not found");
}
Reading raw JSON
GetResponse<ObjectNode> response = esClient.get(g -> g
    .index("products")
    .id("bk-1"),
    // 	The target class is a raw JSON object.
    ObjectNode.class     
);

if (response.found()) {
    ObjectNode json = response.source();
    String name = json.get("name").asText();
    logger.info("Product name " + name);
} else {
    logger.info("Product not found");
}

搜索文件(Search for document)

索引文檔可用于近乎實(shí)時(shí)的搜索

簡(jiǎn)單的搜索查詢(xún)
String searchText = "bike";

SearchResponse<Product> response = esClient.search(s -> s
    .index("products") 
    .query(q -> q      
        .match(t -> t   
            .field("name")  
            .query(searchText)
        )
    ),
    Product.class      
);

TotalHits total = response.hits().total();
boolean isExactResult = total.relation() == TotalHitsRelation.Eq;

if (isExactResult) {
    logger.info("There are " + total.value() + " results");
} else {
    logger.info("There are more than " + total.value() + " results");
}

List<Hit<Product>> hits = response.hits().hits();
for (Hit<Product> hit: hits) {
    Product product = hit.source();
    logger.info("Found product " + product.getSku() + ", score " + hit.score());
}
嵌套搜索查詢(xún)
String searchText = "bike";
double maxPrice = 200.0;

// Search by product name
Query byName = MatchQuery.of(m -> m 
    .field("name")
    .query(searchText)
)._toQuery(); 

// Search by max price
Query byMaxPrice = RangeQuery.of(r -> r
    .field("price")
    .gte(JsonData.of(maxPrice)) 
)._toQuery();

// Combine name and price queries to search the product index
SearchResponse<Product> response = esClient.search(s -> s
    .index("products")
    .query(q -> q
        .bool(b -> b 
            .must(byName) 
            .must(byMaxPrice)
        )
    ),
    Product.class
);

List<Hit<Product>> hits = response.hits().hits();
for (Hit<Product> hit: hits) {
    Product product = hit.source();
    logger.info("Found product " + product.getSku() + ", score " + hit.score());
}
模板搜索

搜索模板是一種存儲(chǔ)的搜索,您可以使用不同的變量運(yùn)行它。搜索模板讓您無(wú)需修改應(yīng)用程序代碼即可更改搜索。

在運(yùn)行模板搜索之前,您首先必須創(chuàng)建模板。這是一個(gè)返回搜索請(qǐng)求主體的存儲(chǔ)腳本,通常定義為 Mustache 模板。這個(gè)存儲(chǔ)的腳本可以在應(yīng)用程序外部創(chuàng)建,也可以使用 Java API 客戶(hù)端創(chuàng)建:

// Create a script
esClient.putScript(r -> r
    // 模板腳本標(biāo)識(shí)符/名稱(chēng)
    .id("query-script") 
    .script(s -> s
        .lang("mustache")
        .source("{\"query\":{\"match\":{\"{{field}}\":\"{{value}}\"}}}")
    ));

要使用搜索模板,請(qǐng)使用方法searchTemplate引用腳本并為其參數(shù)提供值:

SearchTemplateResponse<Product> response = esClient.searchTemplate(r -> r
        .index("some-index")
        // 要使用的模板腳本標(biāo)識(shí)符
        .id("query-script") 
        // 模板腳本里面需要的參數(shù)
        .params("field", JsonData.of("some-field")) 
        .params("value", JsonData.of("some-data")),
    Product.class
);

List<Hit<Product>> hits = response.hits().hits();
for (Hit<Product> hit: hits) {
    Product product = hit.source();
    logger.info("Found product " + product.getSku() + ", score " + hit.score());
}

聚合

Elasticsearch 文檔。

一個(gè)簡(jiǎn)單的聚合

分析類(lèi)的聚合,一般不使用document,所以查詢(xún)的size為0,返回結(jié)果的目標(biāo)類(lèi)為Void。

例如-查詢(xún):

String searchText = "bike";

Query query = MatchQuery.of(m -> m
    .field("name")
    .query(searchText)
)._toQuery();

// 查詢(xún)的是價(jià)格直方圖,價(jià)格以50為步進(jìn)遞增分區(qū)查詢(xún)
SearchResponse<Void> response = esClient.search(b -> b
    .index("products")
    // 不需要document返回?cái)?shù)據(jù),純聚合
    .size(0) 
    .query(query) 
    // 聚合名稱(chēng)為 'price-histogram'
    .aggregations("price-histogram", a -> a 
        .histogram(h -> h 
            .field("price")
            .interval(50.0)
        )
    ),
    Void.class 
);

例如-響應(yīng):

List<HistogramBucket> buckets = response.aggregations()
    // 聚合名稱(chēng)
    .get("price-histogram") 
    // 聚合類(lèi)型/聚合函數(shù),例如avg、sum、max、min
    .histogram() 
    // 數(shù)組或者map
    .buckets().array(); 

for (HistogramBucket bucket: buckets) {
    logger.info("There are " + bucket.docCount() +
        " bikes under " + bucket.key());
}

Java API Client tests.

故障排除

MissingRequiredPropertyException in a response

Java API客戶(hù)端區(qū)分兩種屬性,可選屬性和必須屬性,可選屬性用@Nullable注釋標(biāo)記,未被@Nullable標(biāo)記的必然不會(huì)為null。

但是,Elasticsearch API 規(guī)范可能存在錯(cuò)誤,錯(cuò)誤的需要響應(yīng)對(duì)象的屬性,導(dǎo)致MissingRequiredPropertyException反序列化響應(yīng)時(shí)出現(xiàn)錯(cuò)誤。如果發(fā)生這種情況,您可以按照以下方法解決:

  • 確保使用最新版本的 Java API 客戶(hù)端。該問(wèn)題可能已經(jīng)得到解決。(新版本可能沒(méi)有這個(gè)問(wèn)題)
  • 如果問(wèn)題仍然存在于最新版本中,請(qǐng)打開(kāi)一個(gè)問(wèn)題,以便我們可以在下一個(gè)版本中修復(fù)它。請(qǐng)幫助我們改進(jìn) Java API 客戶(hù)端。(如果新版本有,請(qǐng)反饋給官方)
  • 暫時(shí)禁用違規(guī)請(qǐng)求所需的屬性檢查(干掉屬性檢查)
   ApiTypeHelper.DANGEROUS_disableRequiredPropertiesCheck(true);
    SomeRequest request = SomeRequest.of(...);
    SomeResponse response = esClient.someApi(request);
    ApiTypeHelper.DANGEROUS_disableRequiredPropertiesCheck(false);
    // Do something with response
}

DANGEROUS_disableRequiredPropertiesCheck方法禁用當(dāng)前線程所需的屬性檢查,以及異步請(qǐng)求中的響應(yīng)反序列化。顧名思義,它是危險(xiǎn)的,因?yàn)樗∠藢?duì)非 . 屬性的保證@Nullable。在問(wèn)題得到解決之前,這是一個(gè)臨時(shí)解決方法。

請(qǐng)注意,此方法的結(jié)果是一個(gè)AutoCloseable將所需屬性檢查重置為其先前設(shè)置的對(duì)象。因此,您可以在 try-with-resource 塊中使用它,如下所示:

try (ApiTypeHelper.DisabledChecksHandle h =
        ApiTypeHelper.DANGEROUS_disableRequiredPropertiesCheck(true)) {
    SomeRequest request = SomeRequest.of(...);
    SomeResponse response = esClient.someApi(request);
    // Do something with response
}
Serializing aggregations and suggestions without typed keys

大概意思就是

當(dāng)你不需要格式化序列對(duì)象的時(shí)候,typed_keys您可以通過(guò)將JsonpMapperFeatures.SERIALIZE_TYPED_KEYS屬性設(shè)置為false映射器對(duì)象來(lái)禁用序列化。

Elasticsearch 搜索請(qǐng)求接受一個(gè)typed_key參數(shù),該參數(shù)允許返回類(lèi)型信息以及聚合和建議結(jié)果中的名稱(chēng)(有關(guān)更多詳細(xì)信息,請(qǐng)參閱聚合文檔)。

Java API 客戶(hù)端總是將此參數(shù)添加到搜索請(qǐng)求中,因?yàn)樾枰?lèi)型信息來(lái)了解應(yīng)該用于反序列化聚合和建議結(jié)果的具體類(lèi)。

對(duì)稱(chēng)地,Java API Client 也使用這種typed_keys格式序列化聚合和建議結(jié)果,以便它可以正確地反序列化自己序列化的結(jié)果。

代碼如下,默認(rèn)有序列化

ElasticsearchClient esClient = ...
JsonpMapper mapper = esClient._jsonpMapper();

StringWriter writer = new StringWriter();
try (JsonGenerator generator = mapper.jsonProvider().createGenerator(writer)) {
    mapper.serialize(searchResponse, generator);
}
String result = writer.toString();

// The aggregation property provides the "avg" type and "price" name
assertTrue(result.contains("\"aggregations\":{\"avg#price\":{\"value\":3.14}}}"));

設(shè)置沒(méi)序列化文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-625133.html

ElasticsearchClient esClient = ...
// Create a new mapper with the typed_keys feature disabled
JsonpMapper mapper = esClient._jsonpMapper()
    .withAttribute(JsonpMapperFeatures.SERIALIZE_TYPED_KEYS, false);

StringWriter writer = new StringWriter();
try (JsonGenerator generator = mapper.jsonProvider().createGenerator(writer)) {
    mapper.serialize(searchResponse, generator);
}
String result = writer.toString();

// The aggregation only provides the "price" name
assertTrue(result.contains("\"aggregations\":{\"price\":{\"value\":3.14}}}"));

到了這里,關(guān)于學(xué)習(xí)筆記-elstaciElasticSearch7.17官方文檔的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Docker學(xué)習(xí)筆記17

    Docker學(xué)習(xí)筆記17

    跨主機(jī)容器間網(wǎng)絡(luò): 實(shí)現(xiàn)跨主機(jī)容器間通信的工具: 1)Pipework 2)Flannel 3)Weave 4)Open V Switch (OVS) 5)Calico 1. Weave: 在每個(gè)宿主機(jī)上布置一個(gè)特殊的route的容器,不同宿主機(jī)的route容器連接起來(lái),route攔截所有普通容器的ip請(qǐng)求,并通過(guò) udp包 發(fā)送到其它宿主機(jī)上的普通容器

    2024年02月12日
    瀏覽(13)
  • 企業(yè)架構(gòu)LNMP學(xué)習(xí)筆記17

    企業(yè)架構(gòu)LNMP學(xué)習(xí)筆記17

    反向代理: 反向代理服務(wù)器和真實(shí)訪問(wèn)的服務(wù)器是在一起的,有關(guān)聯(lián)的。 根據(jù)實(shí)際業(yè)務(wù)需求,分發(fā)代理頁(yè)面到不同的解釋器。常見(jiàn)于代理后端服務(wù)器。 安裝apache服務(wù)器: 修改配置文件: ? 由nginx反向代理給后端的apache服務(wù)器處理,apache處理完成后再交給nginx返回給客戶(hù)端。

    2024年02月09日
    瀏覽(14)
  • ros2官方文檔(基于humble版本)學(xué)習(xí)筆記

    ros2官方文檔(基于humble版本)學(xué)習(xí)筆記

    由于市面上專(zhuān)門(mén)講ROS 2開(kāi)發(fā)的書(shū)籍不多,近期看完了《ROS機(jī)器人開(kāi)發(fā)實(shí)踐》其中大部分內(nèi)容還是基于ROS 1寫(xiě)的,涉及topic,service,action等一些重要的概念,常用組件,建模與仿真,應(yīng)用(機(jī)器視覺(jué),機(jī)器語(yǔ)音,SLAM,機(jī)械臂),最后一章寫(xiě)了ROS 2的安裝,話(huà)題通信和服務(wù)通信的示

    2024年02月11日
    瀏覽(29)
  • Spring MVC官方文檔學(xué)習(xí)筆記(二)之DispatcherServlet

    Spring MVC官方文檔學(xué)習(xí)筆記(二)之DispatcherServlet

    1.DispatcherServlet入門(mén) (1) Spring MVC是以前端控制器模式(即圍繞著一個(gè)中央的Servelt, DispatcherServlet)進(jìn)行設(shè)計(jì)的,這個(gè)DispatcherServlet為請(qǐng)求的處理提供了一個(gè)共用的算法,即它都會(huì)將實(shí)際的請(qǐng)求處理工作委托給那些可配置的組件進(jìn)行執(zhí)行,說(shuō)白了,DispatcherServlet的作用就是進(jìn)行統(tǒng)一調(diào)度,并

    2024年02月07日
    瀏覽(28)
  • ROS 2官方文檔(基于humble版本)學(xué)習(xí)筆記(二)

    ROS 2官方文檔(基于humble版本)學(xué)習(xí)筆記(二)

    今天繼續(xù)總結(jié)CLI 工具章的學(xué)習(xí) 理解節(jié)點(diǎn)(node) ROS 2圖是一個(gè)ROS 2元件同時(shí)處理數(shù)據(jù)的網(wǎng)絡(luò),如果將它們?nèi)坑成洳⒖梢暬鼈儯瑒t包括所有可執(zhí)行文件以及它們之間的連接。 ROS中的每個(gè)節(jié)點(diǎn)(node)都應(yīng)該只為了單個(gè)的、模塊化的目的而設(shè)計(jì)的,例如控制車(chē)輪電動(dòng)機(jī)或從激光

    2024年02月10日
    瀏覽(16)
  • Flink|《Flink 官方文檔 - DataStream API - 概覽》學(xué)習(xí)筆記

    學(xué)習(xí)文檔:Flink 官方文檔 - DataStream API - 概覽 學(xué)習(xí)筆記如下: Flink 的 DataStream API: 數(shù)據(jù)里的起始是各種 source,例如消息隊(duì)列、socket 流、文件等; 對(duì)數(shù)據(jù)流進(jìn)行轉(zhuǎn)換,例如過(guò)濾、更新?tīng)顟B(tài)、定義窗口、聚合等; 結(jié)果通過(guò) sink 返回,例如可以將數(shù)據(jù)寫(xiě)入文件或標(biāo)準(zhǔn)輸出。 Da

    2024年01月23日
    瀏覽(49)
  • Flink|《Flink 官方文檔 - 概念透析 - Flink 架構(gòu)》學(xué)習(xí)筆記

    Flink|《Flink 官方文檔 - 概念透析 - Flink 架構(gòu)》學(xué)習(xí)筆記

    學(xué)習(xí)文檔:概念透析 - Flink 架構(gòu) 學(xué)習(xí)筆記如下: 客戶(hù)端(Client):準(zhǔn)備數(shù)據(jù)流程序并發(fā)送給 JobManager(不是 Flink 執(zhí)行程序的進(jìn)程) JobManager:協(xié)調(diào) Flink 應(yīng)用程序的分布式執(zhí)行 ResourceManager:負(fù)責(zé) Flink 集群中的資源提供、回收、分配 Dispatcher:提供了用來(lái)提交 Flink 應(yīng)用程序執(zhí)行

    2024年01月19日
    瀏覽(55)
  • Flink|《Flink 官方文檔 - 概念透析 - 及時(shí)流處理》學(xué)習(xí)筆記

    Flink|《Flink 官方文檔 - 概念透析 - 及時(shí)流處理》學(xué)習(xí)筆記

    學(xué)習(xí)文檔:概念透析 - 及時(shí)流處理 學(xué)習(xí)筆記如下: 及時(shí)流處理時(shí)有狀態(tài)流處理的擴(kuò)展,其中時(shí)間在計(jì)算中起著一定的作用。 及時(shí)流的應(yīng)用場(chǎng)景: 時(shí)間序列分析 基于特定時(shí)間段進(jìn)行聚合 對(duì)發(fā)生時(shí)間很重要的事件進(jìn)行處理 處理時(shí)間(processing time) 處理時(shí)間的即數(shù)據(jù)到達(dá)各個(gè)

    2024年02月03日
    瀏覽(22)
  • Spring MVC官方文檔學(xué)習(xí)筆記(一)之Web入門(mén)

    Spring MVC官方文檔學(xué)習(xí)筆記(一)之Web入門(mén)

    注: 該章節(jié)主要為原創(chuàng)內(nèi)容,為后續(xù)的Spring MVC內(nèi)容做一個(gè)先行鋪墊 1.Servlet的構(gòu)建使用 (1) 選擇Maven - webapp來(lái)構(gòu)建一個(gè)web應(yīng)用 (2) 構(gòu)建好后,打開(kāi)pom.xml文件,一要注意打包方式為war包,二導(dǎo)入servlet依賴(lài),如下 (3) 替換webapp/WEB-INF/web.xml文件為如下內(nèi)容,采用Servlet 3.1版本 (4) 在

    2024年02月03日
    瀏覽(35)
  • Flink|《Flink 官方文檔 - DataStream API - 算子 - 窗口》學(xué)習(xí)筆記

    Flink|《Flink 官方文檔 - DataStream API - 算子 - 窗口》學(xué)習(xí)筆記

    學(xué)習(xí)文檔:《Flink 官方文檔 - DataStream API - 算子 - 窗口》 學(xué)習(xí)筆記如下: 窗口(Window):窗口是處理無(wú)界流的關(guān)鍵所在。窗口可以將數(shù)據(jù)流裝入大小有限的 “桶” 中,再對(duì)每個(gè) “桶” 加以處理。 Keyed Windows 在 Keyed Windows 上使用窗口時(shí),要調(diào)用 keyBy(...) 而后再調(diào)用 window(..

    2024年01月18日
    瀏覽(58)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包