国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka生產(chǎn)者消費者練習(xí)

這篇具有很好參考價值的文章主要介紹了kafka生產(chǎn)者消費者練習(xí)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

需求:寫一個生產(chǎn)者,不斷的去生產(chǎn)用戶行為數(shù)據(jù),寫入到kafka的一個topic中
生產(chǎn)的數(shù)據(jù)格式: 造數(shù)據(jù)
{“guid”:1,“eventId”:“pageview”,“timestamp”:1637868346789} isNew = 1
{“guid”:1,“eventId”:“addcard”,“timestamp”:1637868347625} isNew = 0
{“guid”:2,“eventId”:“collect”,“timestamp”:16378683463219}
{“guid”:3,“eventId”:“paid”,“timestamp”:16378683467829}

再寫一個消費者,不斷的從kafka中消費上面的用戶行為數(shù)據(jù),做一個統(tǒng)計
1.每5s輸出一次當(dāng)前來了多少用戶(去重) uv
2.將每條數(shù)據(jù)添加一個字段來標(biāo)識,如果這個用戶的id是第一次出現(xiàn),那么就標(biāo)注1,否則就是0

首先添加依賴:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.8.1</version>
    </dependency>


    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.roaringbitmap/RoaringBitmap -->
    <dependency>
        <groupId>org.roaringbitmap</groupId>
        <artifactId>RoaringBitmap</artifactId>
        <version>0.9.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>31.1-jre</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.28</version>
    </dependency>

生產(chǎn)者代碼示例:

package com.doitedu;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * 驗證數(shù)據(jù):
 * 創(chuàng)建topic
 * kafka-topics.sh --create --topic event-log --zookeeper linux01:2181 --partitions 3 --replication-factor 3
 * 搞個消費者消費數(shù)據(jù)
 * kafka-console-consumer.sh  --bootstrap-server linux01:9092 --topic event-log
 * {"eventId":"zTUAbXcWbn","guid":7170,"timeStamp":1659944455262}
 * {"eventId":"KSzaaNmczb","guid":9743,"timeStamp":1659944455823}
 * {"eventId":"FNUERLlCNu","guid":7922,"timeStamp":1659944456295}
 * {"eventId":"VmXVJHlpOF","guid":2505,"timeStamp":1659944458267}
 * {"eventId":"pMIHwLzSIE","guid":7668,"timeStamp":1659944460088}
 * {"eventId":"ZvGYIvmKTx","guid":3636,"timeStamp":1659944460461}
 * {"eventId":"jBanTDSlCO","guid":3468,"timeStamp":1659944460787}
 * {"eventId":"vXregpYeHu","guid":1107,"timeStamp":1659944462525}
 * {"eventId":"PComosCafr","guid":7765,"timeStamp":1659944463640}
 * {"eventId":"xCHFOYIJlb","guid":3443,"timeStamp":1659944464697}
 * {"eventId":"xDToApWwFo","guid":5034,"timeStamp":1659944465953}
 */
public class Exercise_kafka編程練習(xí) {
    public static void main(String[] args) throws InterruptedException {
        MyData myData = new MyData();
        myData.genData();
    }
}

class MyData{
    KafkaProducer<String, String> producer = null;
    public MyData(){
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<String, String>(props);
    }
    
    public void genData() throws InterruptedException {
        UserEvent userEvent = new UserEvent();
        while (true){
            //造數(shù)據(jù)
            userEvent.setGuid(RandomUtils.nextInt(0,10000));
            userEvent.setEventId(RandomStringUtils.randomAlphabetic(10));
            userEvent.setTimeStamp(System.currentTimeMillis());
            String json = JSON.toJSONString(userEvent);
            //數(shù)據(jù)造完了就往kafka中寫
            ProducerRecord<String, String> stringProducerRecord = new ProducerRecord<>("event-log", json);
            Thread.sleep(RandomUtils.nextInt(200,1000));
            producer.send(stringProducerRecord);
        }
    }
}
/*
{"guid":1,"eventId":"pageview","timestamp":1637868346789}
{"guid":1,"eventId":"addcard","timestamp":1637868347625}
{"guid":2,"eventId":"collect","timestamp":16378683463219}
 */
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
class UserEvent{
    private Integer guid;
    private String eventId;
    private long timeStamp;
}

消費者代碼示例:用hashset來實現(xiàn):

package com.doitedu;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 * 分兩步走:
 * 第一步:一個消費者不斷的去消費數(shù)據(jù)
 * 第二步:5分鐘計算一次,返回用戶數(shù)這個結(jié)果
 */
public class Exercise_consumerDemo {
    public static void main(String[] args) {
        HashSet<Integer> set = new HashSet<>();
        new Thread(new ConsumerThread(set)).start();
        //定時的任務(wù)調(diào)度
        Timer timer = new Timer();
        //調(diào)度,第一個參數(shù),你給我一個任務(wù),
        //第二個參數(shù)代表過多久之后我開始執(zhí)行任務(wù)
        //第三個參數(shù)代表每隔多久執(zhí)行一次
        timer.schedule(new ConsumerTask(set),5000,10000);

    }
}

class ConsumerThread implements Runnable {
    HashSet<Integer> set = null;
    KafkaConsumer<String, String> consumer = null;

    public ConsumerThread(HashSet<Integer> set) {
        this.set = set;
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");

        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("event-log"));
    }
    /**
     * 重寫run方法的話,我需要在里面實現(xiàn)什么邏輯?
     * 消費者消費數(shù)據(jù),拿到數(shù)據(jù)以后,只需要獲取到用戶id
     * 將用戶id寫到hashset集合里面
     */
    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                String json = record.value();
                UserEvent userEvent = JSON.parseObject(json, UserEvent.class);
                Integer guid = userEvent.getGuid();
                set.add(guid);
            }
        }
    }
}

class ConsumerTask extends TimerTask {
    HashSet<Integer> set = null;

    public ConsumerTask(HashSet<Integer> set) {
        this.set = set;
    }
    /**
     * 這里面就是返回的一個用戶數(shù)
     */
    @Override
    public void run() {
        int userCount = set.size();
        System.out.println(System.currentTimeMillis() + ",截至到當(dāng)前為止的一個用戶數(shù)為:"+userCount);
    }
}

用hashset來實現(xiàn)很顯然會出問題,如果數(shù)據(jù)量一直往上增長,會出現(xiàn)oom的問題,而且占用資源越來越多,影響電腦性能?。。?br> 方案二:將HashSet改成bitMap來計數(shù),就很完美,大邏輯不變,小邏輯就是將HashMap改成bitMap

package com.doitedu;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.roaringbitmap.RoaringBitmap;

import java.time.Duration;
import java.util.*;

/**
 * 分兩步走:
 * 第一步:一個消費者不斷的去消費數(shù)據(jù)
 * 第二步:5分鐘計算一次,返回用戶數(shù)這個結(jié)果
 */
public class BitMap_consumerDemo {
    public static void main(String[] args) {
        //原來我用的是Hashset來記錄,現(xiàn)在我用RoaringBitmap來記錄
        RoaringBitmap bitMap = RoaringBitmap.bitmapOf();

        new Thread(new BitMapConsumerThread(bitMap)).start();
        //定時的任務(wù)調(diào)度
        Timer timer = new Timer();
        timer.schedule(new BitMapConsumerTask(bitMap),1000,5000);

    }
}

class BitMapConsumerThread implements Runnable {
    RoaringBitmap bitMap = null;
    KafkaConsumer<String, String> consumer = null;

    public BitMapConsumerThread(RoaringBitmap bitMap) {
        this.bitMap = bitMap;
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");

        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("event-log"));
    }
    /**
     * 重寫run方法的話,我需要在里面實現(xiàn)什么邏輯?
     * 消費者消費數(shù)據(jù),拿到數(shù)據(jù)以后,只需要獲取到用戶id
     * 將用戶id寫到hashset集合里面
     */
    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                String json = record.value();
                UserEvent userEvent = JSON.parseObject(json, UserEvent.class);
                Integer guid = userEvent.getGuid();
                bitMap.add(guid);
            }
        }
    }
}


class BitMapConsumerTask extends TimerTask {
    RoaringBitmap bitMap = null;

    public BitMapConsumerTask(RoaringBitmap bitMap) {
        this.bitMap = bitMap;
    }
    /**
     * 這里面就是返回的一個用戶數(shù)
     */
    @Override
    public void run() {
        int userCount = bitMap.getCardinality();
        System.out.println(System.currentTimeMillis() + ",截至到當(dāng)前為止的一個用戶數(shù)為:"+userCount);
    }
}

需求二:判斷來沒來過的問題,可以用bitmap來搞,當(dāng)然還可以用布隆過濾器來搞文章來源地址http://www.zghlxwxcb.cn/news/detail-480447.html

package com.doitedu;

import com.alibaba.fastjson.JSON;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;

/**
 * 用布隆過濾器來判定是否重復(fù),當(dāng)然,bitMap也是可以操作的
 */
public class BloomFilter_consumerDemo {
    public static void main(String[] args) {

        BloomFilter<Long> longBloomFilter = BloomFilter.create(Funnels.longFunnel(), 100000);

        new Thread(new BloomFilterConsumerThread(longBloomFilter)).start();
    }
}

class BloomFilterConsumerThread implements Runnable {
    BloomFilter<Long> longBloomFilter = null;
    KafkaConsumer<String, String> consumer = null;

    public BloomFilterConsumerThread(BloomFilter<Long> longBloomFilter) {
        this.longBloomFilter = longBloomFilter;
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("event-log"));
    }

    /**
     * 重寫run方法的話,我需要在里面實現(xiàn)什么邏輯?
     * 消費者消費數(shù)據(jù),拿到數(shù)據(jù)以后,只需要獲取到用戶id
     * 將用戶id寫到hashset集合里面
     */
    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                String json = record.value();
                UserEvent userEvent = JSON.parseObject(json, UserEvent.class);
                Integer guid = userEvent.getGuid();
                boolean flag = longBloomFilter.mightContain((long) guid);
                if (flag) {
                    userEvent.setIsNew(0);
                } else {
                    userEvent.setIsNew(1);
                }
                //判斷完成以后,得把他加進(jìn)去
                longBloomFilter.put((long) guid);
                System.out.println(JSON.toJSONString(userEvent));
            }
        }
    }
}

到了這里,關(guān)于kafka生產(chǎn)者消費者練習(xí)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 筆記:配置多個kafka生產(chǎn)者和消費者

    如果只有一個kafka,那么使用自帶的KafkaAutoConfiguration配置類即可,對應(yīng)已有屬性類KafkaProperties,屬性前綴為spring.kafka.xxx; 本文記錄配置多個kafka的情況,即在KafkaAutoConfiguration的基礎(chǔ)上,自定義額外的kafka生產(chǎn)者和消費者。 適用場景:需要消費來源于不同kafka的消息、需要在不

    2024年02月15日
    瀏覽(32)
  • kafka生產(chǎn)者和消費者(python版)

    生產(chǎn)者 消費者 消費者中的組名主要用戶針對主題的偏移量進(jìn)行更改,也涉及到主題中分區(qū)的問題, kafka工具類 此工具類基本上拿過去就可以用 疑問 當(dāng)消費者鏈接kafka時發(fā)現(xiàn)topic沒有未讀的消息怎樣退出呢,默認(rèn)是在一直等待,但是我期望沒有要讀的消息的時候直接退出即可

    2024年02月16日
    瀏覽(21)
  • Kafka官方生產(chǎn)者和消費者腳本簡單使用

    怎樣使用Kafka官方生產(chǎn)者和消費者腳本進(jìn)行消費生產(chǎn)和消費?這里假設(shè)已經(jīng)下載了kafka官方文件,并已經(jīng)解壓. 這就可以見到測試kafka對應(yīng)topic了.

    2024年02月04日
    瀏覽(23)
  • Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費者

    Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費者

    1.創(chuàng)建主題 2.查看所有主題 3.查看詳細(xì)主題 序號從0開始計算 Partition:分區(qū)數(shù),該主題有3個分區(qū) Replica:副本數(shù),該主題有3個副本 Leader:副本數(shù)中的主的序號,生產(chǎn)消費的對象 1.修改分區(qū)數(shù) 修改的分區(qū)數(shù)量不可以小于或者等于當(dāng)前主題分區(qū)的數(shù)量,否則會報錯 在根目錄kaf

    2024年02月11日
    瀏覽(32)
  • Kafka系列:查看Topic列表、消息消費情況、模擬生產(chǎn)者消費者

    Kafka系列:查看Topic列表、消息消費情況、模擬生產(chǎn)者消費者

    執(zhí)行topic刪除命令時,出現(xiàn)提示 這條命令其實并不執(zhí)行刪除動作,僅僅是在zookeeper上標(biāo)記該topic要被刪除而已,同時也提醒用戶一定要提前打開delete.topic.enable開關(guān),否則刪除動作是不會執(zhí)行的。 解決辦法: a)在server.properties中設(shè)置delete.topic.enable參數(shù)為ture b)如下操作: 1.登

    2023年04月26日
    瀏覽(30)
  • 探究:kafka生產(chǎn)者/消費者與多線程安全

    探究:kafka生產(chǎn)者/消費者與多線程安全

    目錄 1. 多線程安全 1.1. 生產(chǎn)者是多線程安全的么? 1.1. 消費者是多線程安全的么? 2. 消費者規(guī)避多線程安全方案 2.1. 每個線程維護一個kafkaConsumer 2.2. [單/多]kafkaConsumer實例 + 多worker線程 2.3.方案優(yōu)缺點對比 ????????Kafka生產(chǎn)者是 線程安全 的,可以在多個線程中共享一個

    2023年04月26日
    瀏覽(24)
  • Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費者

    Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費者

    1.創(chuàng)建安裝目錄/usr/local/kafka mkdir /usr/local/kafka 2.進(jìn)入安裝包目錄 cd?/usr/local/kafka? 3.下載安裝包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解壓安裝包 tar -zxvf kafka_2.12-3.3.1.tgz 5.進(jìn)入cd kafka_2.12-3.3.1目錄 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    瀏覽(29)
  • Kafka 架構(gòu)深度解析:生產(chǎn)者(Producer)和消費者(Consumer)

    Kafka 架構(gòu)深度解析:生產(chǎn)者(Producer)和消費者(Consumer)

    Apache Kafka 作為分布式流處理平臺,其架構(gòu)中的生產(chǎn)者和消費者是核心組件,負(fù)責(zé)實現(xiàn)高效的消息生產(chǎn)和消費。本文將深入剖析 Kafka 架構(gòu)中生產(chǎn)者和消費者的工作原理、核心概念以及高級功能。 1 發(fā)送消息到 Kafka Kafka 生產(chǎn)者負(fù)責(zé)將消息發(fā)布到指定的主題。以下是一個簡單的生

    2024年02月03日
    瀏覽(50)
  • 07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費者(演示 監(jiān)聽消息)

    07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費者(演示 監(jiān)聽消息)

    簡單來說,就是一個數(shù)據(jù)項。 ▲ 消息就是 Kafka 所記錄的數(shù)據(jù)節(jié)點,消息在 Kafka 中又被稱為記錄(record)或事件(event)。 從存儲上來看,消息就是存儲在分區(qū)文件(有點類似于List)中的一個數(shù)據(jù)項,消息具有 key、value、時間戳 和 可選的元數(shù)據(jù)頭。 ▲ 下面是一個示例事件

    2024年01月20日
    瀏覽(46)
  • kafka配置大全broker、topic、生產(chǎn)者和消費者等配置介紹

    每個kafka broker中配置文件 server.properties 默認(rèn)必須配置的屬性如下: **bootstrap.servers** - 指定生產(chǎn)者客戶端連接kafka集群所需的broker地址列表,格式為host1:port1,host2:port2,可以設(shè)置一個或多個。這里并非需要所有的broker地址,因為生產(chǎn)者會從給定的broker里尋找其它的broker。 **key

    2024年02月05日
    瀏覽(40)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包