国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka復習:(20):消費者攔截器的使用

這篇具有很好參考價值的文章主要介紹了kafka復習:(20):消費者攔截器的使用。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、定義消費者攔截器(只消費含"sister"的消息)

package com.cisdi.dsp.modules.metaAnalysis.rest;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class MyConsumerInterceptor implements ConsumerInterceptor<String,String> {
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {

        Map<TopicPartition,List<ConsumerRecord<String,String>>> finalResult=new HashMap<>();
        Set<TopicPartition> partitionSet = records.partitions();
        for(TopicPartition topicPartition: partitionSet){
            List<ConsumerRecord<String,String>> partitionRecordList=records.records(topicPartition);
            List<ConsumerRecord<String,String>> newPartitionRecordList=new LinkedList<>();
            for(ConsumerRecord<String,String> record: partitionRecordList){

                if(record.value().contains("sister")){
                    newPartitionRecordList.add(record);
                }

            }
            finalResult.put(topicPartition,newPartitionRecordList);
        }

        return new ConsumerRecords<>(finalResult);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((tp,meta) -> {
            System.out.println("消費者攔截器:"+tp.topic()+":"+meta.offset());
        });
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

二、定義消費者,配置消費者攔截器文章來源地址http://www.zghlxwxcb.cn/news/detail-673182.html

package com.cisdi.dsp.modules.metaAnalysis.rest;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerInterceptorTest  {
    public static void main(String[] args) {
        String topic="testTopic2";
        String server="xx.xx.xx.xx:9092";
        Properties properties=new Properties();
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroupTest4");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,server);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,MyConsumerInterceptor.class.getName());

        KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(properties);

        myConsumer.subscribe(Arrays.asList(topic));
        while(true){
            ConsumerRecords<String,String> records=myConsumer.poll(Duration.ofMillis(2000));
            for(ConsumerRecord consumerRecord: records){
                System.out.println(consumerRecord.value());
            }
            //myConsumer.commitSync();
        }
    }
}

到了這里,關(guān)于kafka復習:(20):消費者攔截器的使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關(guān)文章

  • 分布式 - 消息隊列Kafka:Kafka消費者和消費者組

    分布式 - 消息隊列Kafka:Kafka消費者和消費者組

    1. Kafka 消費者是什么? 消費者負責訂閱Kafka中的主題,并且從訂閱的主題上拉取消息。與其他一些消息中間件不同的是:在Kafka的消費理念中還有一層消費組的概念,每個消費者都有一個對應的消費組。當消息發(fā)布到主題后,只會被投遞給訂閱它的每個消費組中的一個消費者

    2024年02月13日
    瀏覽(29)
  • kafka配置多個消費者groupid kafka多個消費者消費同一個partition(java)

    kafka配置多個消費者groupid kafka多個消費者消費同一個partition(java)

    kafka是由Apache軟件基金會開發(fā)的一個開源流處理平臺。kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費者在網(wǎng)站中的所有動作流數(shù)據(jù)。 kafka中partition類似數(shù)據(jù)庫中的分表數(shù)據(jù),可以起到水平擴展數(shù)據(jù)的目的,比如有a,b,c,d,e,f 6個數(shù)據(jù),某個topic有兩個partition,一

    2024年01月22日
    瀏覽(161)
  • Kafka3.0.0版本——消費者(消費者組詳細消費流程圖解及消費者重要參數(shù))

    Kafka3.0.0版本——消費者(消費者組詳細消費流程圖解及消費者重要參數(shù))

    創(chuàng)建一個消費者網(wǎng)絡連接客戶端,主要用于與kafka集群進行交互,如下圖所示: 調(diào)用sendFetches發(fā)送消費請求,如下圖所示: (1)、Fetch.min.bytes每批次最小抓取大小,默認1字節(jié) (2)、fetch.max.wait.ms一批數(shù)據(jù)最小值未達到的超時時間,默認500ms (3)、Fetch.max.bytes每批次最大抓取大小,默

    2024年02月09日
    瀏覽(21)
  • 10、Kafka ------ 消費者組 和 消費者實例,分區(qū) 和 消費者實例 之間的分配策略

    10、Kafka ------ 消費者組 和 消費者實例,分區(qū) 和 消費者實例 之間的分配策略

    形象來說:你可以把主題內(nèi)的多個分區(qū)當成多個子任務、多個子任務組成項目,每個消費者實例就相當于一個員工,假如你們 team 包含2個員工。 同理: 同一主題下,每個分區(qū)最多只會分給同一個組內(nèi)的一個消費者實例 消費者以組的名義來訂閱主題,前面的 kafka-console-consu

    2024年01月19日
    瀏覽(19)
  • Kafka消費者不消費數(shù)據(jù)

    Kafka消費者不消費數(shù)據(jù)

    背景: 工作往往是千篇一律,真正能學到點知識都是在上線后。使用Skywalking+Kafka+ES進行應用監(jiān)控。 現(xiàn)象: 公司使用Skywalking在開發(fā)測試環(huán)境中Kafka順利消費數(shù)據(jù),到了UAT環(huán)境一開始還正常,后面接入了更多的應用后出現(xiàn)了問題:OAP服務正常但是ES里不再有數(shù)據(jù)。 排查: 通過

    2023年04月14日
    瀏覽(53)
  • Kafka-消費者組消費流程

    Kafka-消費者組消費流程

    消費者向kafka集群發(fā)送消費請求,消費者客戶端默認每次從kafka集群拉取50M數(shù)據(jù),放到緩沖隊列中,消費者從緩沖隊列中每次拉取500條數(shù)據(jù)進行消費。? ?

    2024年02月12日
    瀏覽(21)
  • Kafka3.0.0版本——消費者(消費者組原理)

    Kafka3.0.0版本——消費者(消費者組原理)

    1.1、消費者組概述 Consumer Group(CG):消費者組,由多個consumer組成。形成一個消費者組的條件,是所有消費者的groupid相同。 注意: (1)、消費者組內(nèi)每個消費者負責消費不同分區(qū)的數(shù)據(jù),一個分區(qū)只能由一個組內(nèi)消費者消費。 (2)、消費者組之間互不影響。所有的消費者

    2024年02月09日
    瀏覽(28)
  • 【Kafka】Kafka消費者

    【Kafka】Kafka消費者

    pull(拉)模式:consumer采用從broker中主動拉取數(shù)據(jù)。 Kafka采用這種方式。 push(推)模式:Kafka沒有采用這種方式,因為由broker決定消息發(fā)送速率,很難適應所有的消費者的消費速率。例如推送的速度是50m/s,consumer1和consumer2舊來不及處理消息。 pull模式不足之處是,如果Kafka沒有數(shù)

    2024年02月13日
    瀏覽(24)
  • Kafka消費者無法消費數(shù)據(jù),解決

    作為一個在項目中邊學邊用的實習生,真的被昨天還好好的今天就不能消費數(shù)據(jù)的kafka折磨到了,下面提供一點建議,希望能對大家有所幫助。 //操作前集群都關(guān)了 1.首先去kafka-home的config目錄下找到server.properties文件, 加入advertised.listeners=PLAINTEXT://ip:9092? ? 如果有配置liste

    2024年02月17日
    瀏覽(21)
  • 【Kafka】【十七】消費者poll消息的細節(jié)與消費者心跳配置

    默認情況下,消費者?次會poll500條消息。 代碼中設置了?輪詢的時間是1000毫秒 意味著: 如果?次poll到500條,就直接執(zhí)?for循環(huán) 如果這?次沒有poll到500條。且時間在1秒內(nèi),那么?輪詢繼續(xù)poll,要么到500條,要么到1s 如果多次poll都沒達到500條,且1秒時間到了,那么直接執(zhí)

    2024年02月09日
    瀏覽(30)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包