国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

springboot+es批量新增、批量修改、根據(jù)內(nèi)部id批量查詢

這篇具有很好參考價(jià)值的文章主要介紹了springboot+es批量新增、批量修改、根據(jù)內(nèi)部id批量查詢。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

pom.xml配置

<dependency>
    <!--  ElasticSearch-->
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.8.0</version>
    <exclusions>
        <exclusion>
            <artifactId>elasticsearch</artifactId>
            <groupId>org.elasticsearch</groupId>
        </exclusion>
        <exclusion>
            <artifactId>elasticsearch-rest-client</artifactId>
            <groupId>org.elasticsearch.client</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    <version>2.3.9.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.8.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.8.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>transport-netty4-client</artifactId>
    <version>7.8.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>7.8.0</version>
</dependency>

yml配置

elasticsearch:
  hosts: ${ES_HOST:ip:端口}
es_user_name: ${ES_USER_NAME:xxxx}
es_password: ${ES_PASSWORD:xxxxx}

EsConfig配置

@Configuration
@Slf4j
public class EsConfig {
    @Value("${es_user_name}")
    private String username;
    @Value("${es_password}")
    private String password;
    public static final RequestOptions COMMON_OPTIONS;

    static {
        RequestOptions.Builder builder=RequestOptions.DEFAULT.toBuilder();
        COMMON_OPTIONS=builder.build();
    }
    @Bean
    RestHighLevelClient restHighLevelClient(@Value("${spring.elasticsearch.hosts}") String hosts) {
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username,password));
        String[] hostsWithPort = hosts.split(",");
        HttpHost[] httpHosts = new HttpHost[hostsWithPort.length];
        for (int i = 0; i < hostsWithPort.length; i++) {
            String hostWithPort = hostsWithPort[i];
            String[] hostPort = hostWithPort.split(":");
            String host = hostPort[0];
            String port = hostPort[1];
            httpHosts[i] = new HttpHost(host, Integer.parseInt(port));
        }
        return new RestHighLevelClient(RestClient.builder(httpHosts).setHttpClientConfigCallback(f->f.setDefaultCredentialsProvider(credentialsProvider)));
    }
}

ElasticSearchConfig 配置

@Configuration
public class ElasticSearchConfig {

    /**
     * 防止netty的bug
     * java.lang.IllegalStateException: availableProcessors is already set to [4], rejecting [4]
     */
    @PostConstruct
    void init() {
        System.setProperty("es.set.netty.runtime.available.processors", "false");
    }
}

啟動(dòng)類配置

public static void main(String[] args) {
    System.setProperty("es.set.netty.runtime.available.processors", "false");
    SpringApplication.run(xxxx.class, args);
}

//批量操作的對(duì)象

private  BulkProcessor bulkProcessor=createBulkProcessor();
@Autowired
RestHighLevelClient restHighLevelClient;
private  BulkProcessor createBulkProcessor() {

    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            log.info("1. 【beforeBulk】批次[{}] 攜帶 {} 請(qǐng)求數(shù)量", executionId, request.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request,
                              BulkResponse response) {
            if (!response.hasFailures()) {
                log.info("2. 【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis());
            } else {
                BulkItemResponse[] items = response.getItems();
                for (BulkItemResponse item : items) {
                    if (item.isFailed()) {
                        log.info("2. 【afterBulk-失敗】批量 [{}] 出現(xiàn)異常的原因 : {}", executionId, item.getFailureMessage());
                        break;
                    }
                }
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request,
                              Throwable failure) {

            List<DocWriteRequest<?>> requests = request.requests();
            List<String> esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList());
            log.error("3. 【afterBulk-failure失敗】es執(zhí)行bluk失敗,失敗的esId為:{}", esIds, failure);
        }
    };

    BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
        restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
    }), listener);
    //到達(dá)10000條時(shí)刷新
    builder.setBulkActions(10000);
    //內(nèi)存到達(dá)8M時(shí)刷新
    builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));
    //設(shè)置的刷新間隔10s
    builder.setFlushInterval(TimeValue.timeValueSeconds(10));
    //設(shè)置允許執(zhí)行的并發(fā)請(qǐng)求數(shù)。
    builder.setConcurrentRequests(8);
    //設(shè)置重試策略
    builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3));
    return builder.build();
}

批量修改

public void bulkUpdate(EsUpdateBO esUpdateBO){
    List<UpdateRequest> updateRequests=new ArrayList<>();

    esUpdateBO.getIds().forEach(e->{
        //獲取id
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("es索引名稱");
        //更新的id
        updateRequest.id(e);
        //更新的數(shù)據(jù)
        Map<String,Object> map=new HashMap<>();
        map.put(esUpdateBO.getIsWarning(),"1");

        updateRequest.doc(map);
        updateRequests.add(updateRequest);
    });
    updateRequests.forEach(bulkProcessor::add);
}

批量新增

public void bulkAdd(List<Map<String, Object>> result) {
    List<IndexRequest> indexRequests = new ArrayList<>();
    result.forEach(e -> {
        IndexRequest request = new IndexRequest("es索引名稱");
     
        request.source(JSON.toJSONString(e), XContentType.JSON);
        request.opType(DocWriteRequest.OpType.CREATE);
        indexRequests.add(request);
    });
    indexRequests.forEach(bulkProcessor::add);

}

根據(jù)es內(nèi)部id批量查詢數(shù)據(jù)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-514316.html

public List<Map<String, Object>> getWaringEventList(List<String> ids) throws IOException {
    String[] idsStr = ids.toArray(new String[ids.size()]);
    List<Map<String, Object>> details=new ArrayList<>();
    SearchRequest searchRequest = new SearchRequest("es索引名稱");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    IdsQueryBuilder idsQueryBuilder = QueryBuilders.idsQuery().addIds(idsStr);
    boolQueryBuilder.must(idsQueryBuilder);
    searchSourceBuilder.query(boolQueryBuilder);
    searchSourceBuilder.size(10000);
    //將所有的條件進(jìn)行整合
    searchRequest.source(searchSourceBuilder);
    //根據(jù)restHighLevelClient進(jìn)行查詢
    SearchResponse search = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    SearchHits searchHits = search.getHits();
    SearchHit[] hits = searchHits.getHits();
    for (SearchHit inhit : hits) {
Map<String, Object> thirdSourceAsMap = inhit.getSourceAsMap();
    }

到了這里,關(guān)于springboot+es批量新增、批量修改、根據(jù)內(nèi)部id批量查詢的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包