Requirement: write a producer that continuously generates user-behavior events and writes them to a Kafka topic.
Format of the generated data:
{"guid":1,"eventId":"pageview","timestamp":1637868346789} isNew = 1
{"guid":1,"eventId":"addcard","timestamp":1637868347625} isNew = 0
{"guid":2,"eventId":"collect","timestamp":16378683463219}
{"guid":3,"eventId":"paid","timestamp":16378683467829}
…
Then write a consumer that continuously reads these user-behavior events from Kafka and computes two statistics:
1. Every 5 s, output the number of distinct users seen so far (UV).
2. Add a flag field to each record: 1 if this user ID appears for the first time, 0 otherwise.
First, add the dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.roaringbitmap/RoaringBitmap -->
    <dependency>
        <groupId>org.roaringbitmap</groupId>
        <artifactId>RoaringBitmap</artifactId>
        <version>0.9.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>31.1-jre</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.28</version>
    </dependency>
</dependencies>
Producer code example:
package com.doitedu;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Verifying the data:
 * Create the topic:
 * kafka-topics.sh --create --topic event-log --zookeeper linux01:2181 --partitions 3 --replication-factor 3
 * Start a console consumer to check the data:
 * kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic event-log
 * {"eventId":"zTUAbXcWbn","guid":7170,"timeStamp":1659944455262}
 * {"eventId":"KSzaaNmczb","guid":9743,"timeStamp":1659944455823}
 * {"eventId":"FNUERLlCNu","guid":7922,"timeStamp":1659944456295}
 * {"eventId":"VmXVJHlpOF","guid":2505,"timeStamp":1659944458267}
 * {"eventId":"pMIHwLzSIE","guid":7668,"timeStamp":1659944460088}
 * {"eventId":"ZvGYIvmKTx","guid":3636,"timeStamp":1659944460461}
 * {"eventId":"jBanTDSlCO","guid":3468,"timeStamp":1659944460787}
 * {"eventId":"vXregpYeHu","guid":1107,"timeStamp":1659944462525}
 * {"eventId":"PComosCafr","guid":7765,"timeStamp":1659944463640}
 * {"eventId":"xCHFOYIJlb","guid":3443,"timeStamp":1659944464697}
 * {"eventId":"xDToApWwFo","guid":5034,"timeStamp":1659944465953}
 */
public class Exercise_kafka編程練習(xí) {
    public static void main(String[] args) throws InterruptedException {
        MyData myData = new MyData();
        myData.genData();
    }
}

class MyData {
    KafkaProducer<String, String> producer = null;

    public MyData() {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092,linux02:9092,linux03:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<String, String>(props);
    }

    public void genData() throws InterruptedException {
        UserEvent userEvent = new UserEvent();
        while (true) {
            // generate a fake event
            userEvent.setGuid(RandomUtils.nextInt(0, 10000));
            userEvent.setEventId(RandomStringUtils.randomAlphabetic(10));
            userEvent.setTimeStamp(System.currentTimeMillis());
            String json = JSON.toJSONString(userEvent);
            // once generated, write it to Kafka
            ProducerRecord<String, String> stringProducerRecord = new ProducerRecord<>("event-log", json);
            Thread.sleep(RandomUtils.nextInt(200, 1000));
            producer.send(stringProducerRecord);
        }
    }
}
/*
{"guid":1,"eventId":"pageview","timestamp":1637868346789}
{"guid":1,"eventId":"addcard","timestamp":1637868347625}
{"guid":2,"eventId":"collect","timestamp":16378683463219}
*/
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
class UserEvent {
    private Integer guid;
    private String eventId;
    private long timeStamp;
    // 1 if this guid appears for the first time, 0 otherwise
    // (needed by the Bloom-filter consumer below, which calls setIsNew)
    private Integer isNew;
}
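One caveat about the producer above: KafkaProducer batches records on a background thread, so records still sitting in the buffer are lost if the process is killed. A minimal sketch of a graceful shutdown; the shutdown hook is an addition of mine, not part of the original exercise:

// Assumption: added in MyData's constructor, right after the producer is created.
// Flushes buffered records and releases sockets when the JVM shuts down.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    producer.flush();  // force any batched records out to the brokers
    producer.close();  // release network resources
}));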
Consumer code example, implemented with a HashSet:
package com.doitedu;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 * Two steps:
 * Step 1: a consumer thread keeps consuming data.
 * Step 2: a timer computes and prints the distinct user count every 5 seconds.
 */
public class Exercise_consumerDemo {
    public static void main(String[] args) {
        HashSet<Integer> set = new HashSet<>();
        new Thread(new ConsumerThread(set)).start();
        // scheduled task:
        // 1st argument: the task to run
        // 2nd argument: delay before the first run
        // 3rd argument: interval between subsequent runs
        Timer timer = new Timer();
        timer.schedule(new ConsumerTask(set), 5000, 5000);
    }
}
class ConsumerThread implements Runnable {
    HashSet<Integer> set = null;
    KafkaConsumer<String, String> consumer = null;

    public ConsumerThread(HashSet<Integer> set) {
        this.set = set;
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("event-log"));
    }

    /**
     * What does run() need to do?
     * Consume records, extract the user id from each one,
     * and add it to the shared HashSet.
     */
    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                String json = record.value();
                UserEvent userEvent = JSON.parseObject(json, UserEvent.class);
                Integer guid = userEvent.getGuid();
                set.add(guid);
            }
        }
    }
}
class ConsumerTask extends TimerTask {
    HashSet<Integer> set = null;

    public ConsumerTask(HashSet<Integer> set) {
        this.set = set;
    }

    /**
     * Print the distinct user count accumulated so far.
     */
    @Override
    public void run() {
        int userCount = set.size();
        System.out.println(System.currentTimeMillis() + ", distinct users so far: " + userCount);
    }
}
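A side note: java.util.HashSet is not thread-safe, and here the consumer thread mutates it while the Timer thread reads it. If you want to rule out visibility issues, a concurrent set is a near drop-in alternative. A sketch of the change in main (ConsumerThread's constructor would then need to accept Set<Integer> instead of HashSet<Integer>):

// In main(): a thread-safe replacement for the shared HashSet
// (requires java.util.Set and java.util.concurrent.ConcurrentHashMap)
Set<Integer> set = ConcurrentHashMap.newKeySet();
new Thread(new ConsumerThread(set)).start();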
Implementing this with a HashSet will obviously run into trouble: as the data volume keeps growing it will eventually cause an OOM, and it consumes more and more resources along the way.
Option 2: count with a bitmap instead of a HashSet. The overall structure stays the same; the only change is swapping the HashSet for a bitmap.
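Before the full consumer, here is a minimal sketch of the RoaringBitmap operations the rewrite relies on. It stores each guid as a single bit in compressed containers, so memory stays small even for millions of ids (the values below are illustrative):

import org.roaringbitmap.RoaringBitmap;

public class RoaringBitmapDemo {
    public static void main(String[] args) {
        RoaringBitmap bitMap = RoaringBitmap.bitmapOf();
        bitMap.add(1);        // mark guid 1 as seen
        bitMap.add(1);        // adding again is a no-op: a bit is either set or not
        bitMap.add(9999);
        System.out.println(bitMap.contains(1));      // true  -> guid 1 seen before
        System.out.println(bitMap.getCardinality()); // 2     -> distinct user count (UV)
    }
}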
package com.doitedu;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.roaringbitmap.RoaringBitmap;

import java.time.Duration;
import java.util.*;

/**
 * Two steps:
 * Step 1: a consumer thread keeps consuming data.
 * Step 2: a timer computes and prints the distinct user count every 5 seconds.
 */
public class BitMap_consumerDemo {
    public static void main(String[] args) {
        // a RoaringBitmap now records the ids where a HashSet did before
        RoaringBitmap bitMap = RoaringBitmap.bitmapOf();
        new Thread(new BitMapConsumerThread(bitMap)).start();
        // scheduled task
        Timer timer = new Timer();
        timer.schedule(new BitMapConsumerTask(bitMap), 1000, 5000);
    }
}
class BitMapConsumerThread implements Runnable {
    RoaringBitmap bitMap = null;
    KafkaConsumer<String, String> consumer = null;

    public BitMapConsumerThread(RoaringBitmap bitMap) {
        this.bitMap = bitMap;
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("event-log"));
    }

    /**
     * run() consumes records, extracts the user id from each one,
     * and sets the corresponding bit in the shared bitmap.
     */
    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                String json = record.value();
                UserEvent userEvent = JSON.parseObject(json, UserEvent.class);
                Integer guid = userEvent.getGuid();
                bitMap.add(guid);
            }
        }
    }
}
class BitMapConsumerTask extends TimerTask {
    RoaringBitmap bitMap = null;

    public BitMapConsumerTask(RoaringBitmap bitMap) {
        this.bitMap = bitMap;
    }

    /**
     * Print the distinct user count accumulated so far.
     */
    @Override
    public void run() {
        int userCount = bitMap.getCardinality();
        System.out.println(System.currentTimeMillis() + ", distinct users so far: " + userCount);
    }
}
Requirement 2: deciding whether a user has been seen before. A bitmap can handle this, and so can a Bloom filter.
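The full example below uses a Bloom filter; for comparison, the bitmap variant of the per-record check is exact (no false positives) and would look like this inside the consumer loop. A sketch only, reusing the bitMap and userEvent names from the code above:

// Bitmap variant of the isNew check: exact membership, no false positives
boolean seenBefore = bitMap.contains(guid); // was this guid's bit already set?
userEvent.setIsNew(seenBefore ? 0 : 1);     // first occurrence -> 1, otherwise 0
bitMap.add(guid);                           // remember the guid for future checks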
package com.doitedu;

import com.alibaba.fastjson.JSON;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Use a Bloom filter to decide whether a guid has been seen before;
 * a bitmap would work for this as well.
 */
public class BloomFilter_consumerDemo {
    public static void main(String[] args) {
        BloomFilter<Long> longBloomFilter = BloomFilter.create(Funnels.longFunnel(), 100000);
        new Thread(new BloomFilterConsumerThread(longBloomFilter)).start();
    }
}
class BloomFilterConsumerThread implements Runnable {
    BloomFilter<Long> longBloomFilter = null;
    KafkaConsumer<String, String> consumer = null;

    public BloomFilterConsumerThread(BloomFilter<Long> longBloomFilter) {
        this.longBloomFilter = longBloomFilter;
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("event-log"));
    }

    /**
     * run() consumes records, checks each guid against the Bloom filter,
     * tags the event with isNew (1 = first occurrence, 0 = seen before),
     * then registers the guid in the filter.
     */
    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                String json = record.value();
                UserEvent userEvent = JSON.parseObject(json, UserEvent.class);
                Integer guid = userEvent.getGuid();
                boolean flag = longBloomFilter.mightContain((long) guid);
                if (flag) {
                    userEvent.setIsNew(0);
                } else {
                    userEvent.setIsNew(1);
                }
                // after the check, register the guid in the filter
                longBloomFilter.put((long) guid);
                System.out.println(JSON.toJSONString(userEvent));
            }
        }
    }
}
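One caveat with the Bloom filter: mightContain can return true for a guid that was never inserted, so a small fraction of genuinely new users will be mistakenly tagged isNew = 0. Guava lets you trade memory for accuracy through the create overload that also takes a target false-positive probability (the 0.001 below is an illustrative value, not from the original):

// 100,000 expected insertions at a ~0.1% false-positive rate;
// Guava sizes the bit array and hash-function count accordingly.
BloomFilter<Long> longBloomFilter =
        BloomFilter.create(Funnels.longFunnel(), 100000, 0.001);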
That wraps up this Kafka producer/consumer exercise.