?個人簡介:?
> ??個人主頁:趙四司機
> ??學(xué)習方向:JAVA后端開發(fā)?
> ?往期文章:SpringBoot項目整合微信支付
> ??博主推薦網(wǎng)站:??途W(wǎng) 刷題|面試|找工作神器
> ??種一棵樹最好的時間是十年前,其次是現(xiàn)在!
> ??喜歡的話麻煩點點關(guān)注喔,你們的支持是我的最大動力。
前言:
最近在做一個基于SpringCloud+Springboot+Docker的新聞頭條微服務(wù)項目,用的是黑馬的教程,現(xiàn)在項目開發(fā)進入了尾聲,我打算通過寫文章的形式進行梳理一遍,并且會將梳理過程中發(fā)現(xiàn)的Bug進行修復(fù),有需要改進的地方我也會繼續(xù)做出改進。這一系列的文章我將會放入微服務(wù)項目專欄中,這個項目適合剛接觸微服務(wù)的人作為練手項目,假如你對這個項目感興趣你可以訂閱我的專欄進行查看,需要資料可以私信我,當然要是能給我點個小小的關(guān)注就更好了,你們的支持是我最大的動力。
如果你想要一個可以系統(tǒng)學(xué)習的網(wǎng)站,那么我推薦的是牛客網(wǎng),個人感覺用著還是不錯的,頁面很整潔,而且內(nèi)容也很全面,語法練習,算法題練習,面試知識匯總等等都有,論壇也很活躍,傳送門鏈接:牛客刷題神器
目錄
一:Springboot集成Kafka Stream
1.設(shè)置配置類信息
2.修改application.yml文件
3.新增配置類,創(chuàng)建KStream對象,進行聚合
二:熱點文章實時計算
1.實現(xiàn)思路
2.環(huán)境搭建
2.1:在文章微服務(wù)中集成Kafka生產(chǎn)者配置
2.2:記錄用戶行為
2.3:定義Stream實現(xiàn)消息接收并聚合
2.4:重新計算文章分值并更新Redis緩存數(shù)據(jù)
2.5:設(shè)置監(jiān)聽類
三:功能測試
一:Springboot集成Kafka Stream
1.設(shè)置配置類信息
package com.my.kafka.config;
import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import java.util.HashMap;
import java.util.Map;
/**
* 通過重新注冊KafkaStreamsConfiguration對象,設(shè)置自定配置參數(shù)
*/
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
private String hosts;
private String group;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
props.put(StreamsConfig.RETRIES_CONFIG, 10);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return new KafkaStreamsConfiguration(props);
}
}
????????可能你會有這樣的疑問,前面介紹Kafka時候不是直接在yml文件里面設(shè)置參數(shù)就行了嗎?為什么這里還要自己寫配置類呢?是因為Spring對KafkaStream的集成并不是很好,所以我們才需要自己去寫配置類信息。需要注意的一點是,配置類中必須添加@EnableKafkaStreams這一注解。
2.修改application.yml文件
kafka:
hosts: 192.168.200.130:9092
group: ${spring.application.name}
3.新增配置類,創(chuàng)建KStream對象,進行聚合
package com.my.kafka.stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import java.util.Arrays;
@Slf4j
@Configuration
public class KafkaStreamHelloListener {
@Bean
public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
//創(chuàng)建KStream對象,同時指定從那個topic中接收消息
KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> Arrays.asList(value.split(" ")))
//根據(jù)value進行聚合分組
.groupBy((key,value)->value)
//聚合計算時間間隔
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
//求單詞的個數(shù)
.count()
.toStream()
//處理后的結(jié)果轉(zhuǎn)換為string字符串
.map((key,value)->{
System.out.println("key:"+key+",value:"+value);
return new KeyValue<>(key.key().toString(),value.toString());
})
//發(fā)送消息
.to("itcast-topic-out");
return stream;
}
}
? ? ? ? 這里實現(xiàn)的功能還是計算單詞個數(shù),假如你有其他計算需求你可以更改里面的邏輯代碼以符合你的需求。該類可注入StreamBuilder,其返回值必須是KStream且放入Spring容器中(添加了@Bean注解)。
二:熱點文章實時計算
1.實現(xiàn)思路
?
? ? ? ? 實現(xiàn)思路很簡單,當用戶有點贊、收藏、閱讀等行為記錄時候,就將消息發(fā)送給Kafka進行流式處理,隨后Kafka再進行聚合并重新計算文章分值,除此之外還需要更新數(shù)據(jù)庫中的數(shù)據(jù)。需要注意的是,按常理來說當天的文章熱度權(quán)重是要比非當天的文章熱度權(quán)重大的,因此當日文章的熱度權(quán)重需要乘以3,隨后查詢Redis中的數(shù)據(jù),假如該文章分數(shù)大于Redis中最低分文章,這時候就需要進行替換操作,更新Redis數(shù)據(jù)。?
2.環(huán)境搭建
2.1:在文章微服務(wù)中集成Kafka生產(chǎn)者配置
(1)修改nacos,增加內(nèi)容:
kafka:
bootstrap-servers: 49.234.52.192:9092
producer:
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
hosts: 49.234.52.192:9092
group: ${spring.application.name}
(2)定義相關(guān)實體類、常量
package com.my.model.mess;
import lombok.Data;
@Data
public class UpdateArticleMess {
/**
* 修改文章的字段類型
*/
private UpdateArticleType type;
/**
* 文章ID
*/
private Long articleId;
/**
* 修改數(shù)據(jù)的增量,可為正負
*/
private Integer add;
public enum UpdateArticleType{
COLLECTION,COMMENT,LIKES,VIEWS;
}
}
2.2:記錄用戶行為
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
* 讀文章行為記錄(閱讀量+1)
* @param map
* @return
*/
public ResponseResult readBehavior(Map map) {
if(map == null || map.get("articleId") == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
Long articleId = Long.parseLong((String) map.get("articleId"));
ApArticle apArticle = getById(articleId);
if(apArticle != null) {
//獲取文章閱讀數(shù)
Integer views = apArticle.getViews();
if(views == null) {
views = 0;
}
//調(diào)用Kafka發(fā)送消息
UpdateArticleMess mess = new UpdateArticleMess();
mess.setArticleId(articleId);
mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
mess.setAdd(1);
kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));
//更新文章閱讀數(shù)
LambdaUpdateWrapper<ApArticle> luw = new LambdaUpdateWrapper<>();
luw.eq(ApArticle::getId,articleId);
luw.set(ApArticle::getViews,views + 1);
update(luw);
}
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
/**
* 用戶點贊
* @param map
* @return
*/
@Override
public ResponseResult likesBehavior(Map map) {
if(map == null || map.get("articleId") == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
Long articleId = Long.parseLong((String) map.get("articleId"));
Integer operation = (Integer) map.get("operation");
ApArticle apArticle = getById(articleId);
UpdateArticleMess mess = new UpdateArticleMess();
mess.setArticleId(articleId);
mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);
if(apArticle != null) {
//獲取文章點贊數(shù)
Integer likes = apArticle.getLikes();
if(likes == null) {
likes = 0;
}
//更新文章點贊數(shù)
LambdaUpdateWrapper<ApArticle> luw = new LambdaUpdateWrapper<>();
luw.eq(ApArticle::getId,articleId);
if(operation == 0) {
//點贊
log.info("用戶點贊文章...");
luw.set(ApArticle::getLikes,likes + 1);
//分值增加
mess.setAdd(1);
} else {
//取消點贊
log.info("用戶取消點贊文章...");
luw.set(ApArticle::getLikes,likes - 1);
//分值減少
mess.setAdd(-1);
}
//調(diào)用Kafka發(fā)送消息
kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));
update(luw);
}
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
/**
* 用戶收藏
* @param map
* @return
*/
@Override
public ResponseResult collBehavior(Map map) {
if(map == null || map.get("entryId") == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
Long articleId = Long.parseLong((String) map.get("entryId"));
Integer operation = (Integer) map.get("operation");
ApArticle apArticle = getById(articleId);
//消息載體
UpdateArticleMess mess = new UpdateArticleMess();
mess.setArticleId(articleId);
mess.setType(UpdateArticleMess.UpdateArticleType.COLLECTION);
if(apArticle != null) {
//獲取文章收藏數(shù)
Integer collection = apArticle.getCollection();
if(collection == null) {
collection = 0;
}
//更新文章收藏數(shù)
LambdaUpdateWrapper<ApArticle> luw = new LambdaUpdateWrapper<>();
luw.eq(ApArticle::getId,articleId);
if(operation == 0) {
//收藏
log.info("用戶收藏文章...");
luw.set(ApArticle::getCollection,collection + 1);
mess.setAdd(1);
} else {
//取消收藏
log.info("用戶取消收藏文章...");
luw.set(ApArticle::getCollection,collection - 1);
mess.setAdd(-1);
}
//調(diào)用Kafka發(fā)送消息
kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));
update(luw);
}
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
? ? ? ? 這一步主要是當用戶對文章進行訪問、點贊、評論或者收藏時候就會更新數(shù)據(jù)庫中的記錄,同時還要將該行為記錄封裝并發(fā)送至Kafka。
2.3:定義Stream實現(xiàn)消息接收并聚合
package com.my.article.stream;
import com.alibaba.fastjson.JSON;
import com.my.common.constans.HotArticleConstants;
import com.my.model.mess.ArticleVisitStreamMess;
import com.my.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
@Configuration
@Slf4j
public class HotArticleStreamHandler {
@Bean
public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
//接收消息
KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
//聚合流式處理
stream.map((key,value)->{
UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
//重置消息的key:1234343434 和 value: likes:1
return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
})
//按照文章id進行聚合
.groupBy((key,value)->key)
//時間窗口 每十秒聚合一次
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
/*
自行地完成聚合的計算
*/
.aggregate(new Initializer<String>() {
/**
* 初始方法,返回值是消息的value
*/
@Override
public String apply() {
return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
}
/*
真正的聚合操作,返回值是消息的value
*/
}, new Aggregator<String, String, String>() {
/**
* 聚合并返回
* @param key 文章id
* @param value 重置后的value ps:likes:1
* @param aggValue "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0"
* @return aggValue格式
*/
@Override
public String apply(String key, String value, String aggValue) {
//用戶沒有進行任何操作
if(StringUtils.isBlank(value)){
return aggValue;
}
String[] aggAry = aggValue.split(",");
//收藏、評論、點贊、閱讀量初始值
int col = 0,com=0,lik=0,vie=0;
for (String agg : aggAry) {
//for --> COLLECTION:0
String[] split = agg.split(":");
//split[0]:COLLECTION,split[1]:0
/*
獲得初始值,也是時間窗口內(nèi)計算之后的值
第一次獲取到的值為0
*/
switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
case COLLECTION:
col = Integer.parseInt(split[1]);
break;
case COMMENT:
com = Integer.parseInt(split[1]);
break;
case LIKES:
lik = Integer.parseInt(split[1]);
break;
case VIEWS:
vie = Integer.parseInt(split[1]);
break;
}
}
/*
累加操作
*/
String[] valAry = value.split(":");
switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
case COLLECTION:
col += Integer.parseInt(valAry[1]);
break;
case COMMENT:
com += Integer.parseInt(valAry[1]);
break;
case LIKES:
lik += Integer.parseInt(valAry[1]);
break;
case VIEWS:
vie += Integer.parseInt(valAry[1]);
break;
}
String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
log.info("文章的id:{}",key);
log.info("當前時間窗口內(nèi)的消息處理結(jié)果:{}",formatStr);
//必須返回和apply()的返回類型
return formatStr;
}
}, Materialized.as("hot-article-stream-count-001"))
.toStream()
.map((key,value)->{
return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
})
//發(fā)送消息
.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);
return stream;
}
/**
* 格式化消息的value數(shù)據(jù)
* @param articleId 文章id
* @param value 聚合結(jié)果
* @return String
*/
public String formatObj(String articleId,String value){
ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
mess.setArticleId(Long.valueOf(articleId));
//COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
String[] valAry = value.split(",");
for (String val : valAry) {
String[] split = val.split(":");
switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
case COLLECTION:
mess.setCollect(Integer.parseInt(split[1]));
break;
case COMMENT:
mess.setComment(Integer.parseInt(split[1]));
break;
case LIKES:
mess.setLike(Integer.parseInt(split[1]));
break;
case VIEWS:
mess.setView(Integer.parseInt(split[1]));
break;
}
}
log.info("聚合消息處理之后的結(jié)果為:{}",JSON.toJSONString(mess));
return JSON.toJSONString(mess);
}
}
????????這一步是最難但是也是最重要的,首先我們接收到消息之后需要先對其key和value進行重置,因為這時候接收到的數(shù)據(jù)是一個JSON字符串格式的UpdateArticleMess對象,我們需要將其重置為key value鍵值對的格式。也即將其格式轉(zhuǎn)化成key為文章id,value為用戶行為記錄,如key:182738789987,value:LIKES:1,表示用戶對該文章點贊一次。隨后選擇對文章id進行聚合,每10秒鐘聚合一次,需要注意的是,apply()函數(shù)中返回結(jié)構(gòu)必須是“COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0”格式。
2.4:重新計算文章分值并更新Redis緩存數(shù)據(jù)
@Service
@Transactional
@Slf4j
public class ApArticleServiceImpl extends ServiceImpl<ApArticleMapper, ApArticle> implements ApArticleService {
/**
* 更新文章分值,同時更新redis中熱點文章數(shù)據(jù)
* @param mess
*/
@Override
public void updateScore(ArticleVisitStreamMess mess) {
//1.獲取文章數(shù)據(jù)
ApArticle apArticle = getById(mess.getArticleId());
//2.計算文章分值
Integer score = computeScore(apArticle);
score = score * 3;
//3.替換當前文章對應(yīng)頻道熱點數(shù)據(jù)
replaceDataToRedis(apArticle,score,ArticleConstas.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());
//4.替換推薦頻道文章熱點數(shù)據(jù)
replaceDataToRedis(apArticle,score,ArticleConstas.HOT_ARTICLE_FIRST_PAGE + ArticleConstas.DEFAULT_TAG);
}
/**
* 根據(jù)權(quán)重計算文章分值
* @param apArticle
* @return
*/
private Integer computeScore(ApArticle apArticle) {
Integer score = 0;
if(apArticle.getLikes() != null){
score += apArticle.getLikes() * ArticleConstas.HOT_ARTICLE_LIKE_WEIGHT;
}
if(apArticle.getViews() != null){
score += apArticle.getViews();
}
if(apArticle.getComment() != null){
score += apArticle.getComment() * ArticleConstas.HOT_ARTICLE_COMMENT_WEIGHT;
}
if(apArticle.getCollection() != null){
score += apArticle.getCollection() * ArticleConstas.HOT_ARTICLE_COLLECTION_WEIGHT;
}
return score;
}
/**
* 替換數(shù)據(jù)并存入到redis
* @param apArticle 文章信息
* @param score 文章新的得分
* @param key redis數(shù)據(jù)的key值
*/
private void replaceDataToRedis(ApArticle apArticle,Integer score, String key) {
String articleListStr = cacheService.get(key);
if(StringUtils.isNotBlank(articleListStr)) {
List<HotArticleVo> hotArticleVos = JSON.parseArray(articleListStr, HotArticleVo.class);
boolean flag = true;
//如果緩存中存在該文章,直接更新文章分值
for (HotArticleVo hotArticleVo : hotArticleVos) {
if(hotArticleVo.getId().equals(apArticle.getId())) {
if(key.equals(ArticleConstas.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId())) {
log.info("頻道{}緩存中存在該文章,文章{}分值更新{}-->{}",apArticle.getChannelName(),apArticle.getId(),hotArticleVo.getScore(),score);
} else {
log.info("推薦頻道緩存中存在該文章,文章{}分值更新{}-->{}",apArticle.getId(),hotArticleVo.getScore(),score);
}
hotArticleVo.setScore(score);
flag = false;
break;
}
}
//如果緩存中不存在該文章
if(flag) {
//緩存中熱點文章數(shù)少于30,直接增加
if(hotArticleVos.size() < 30) {
log.info("該文章{}不在緩存,但是文章數(shù)少于30,直接添加",apArticle.getId());
HotArticleVo hotArticleVo = new HotArticleVo();
BeanUtils.copyProperties(apArticle,hotArticleVo);
hotArticleVo.setScore(score);
hotArticleVos.add(hotArticleVo);
} else {
//緩存中熱點文章數(shù)大于或等于30
//1.排序
hotArticleVos = hotArticleVos.stream().sorted(Comparator.
comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
//2.獲取最小得分值
HotArticleVo minScoreHotArticleVo = hotArticleVos.get(hotArticleVos.size() - 1);
if(minScoreHotArticleVo.getScore() <= score) {
//3.移除分值最小文章
log.info("替換分值最小的文章...");
hotArticleVos.remove(minScoreHotArticleVo);
HotArticleVo hotArticleVo = new HotArticleVo();
BeanUtils.copyProperties(apArticle,hotArticleVo);
hotArticleVo.setScore(score);
hotArticleVos.add(hotArticleVo);
}
}
}
//重新排序并緩存到redis
hotArticleVos = hotArticleVos.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
.collect(Collectors.toList());
cacheService.set(key,JSON.toJSONString(hotArticleVos));
if(key.equals(ArticleConstas.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId())) {
log.info("成功刷新{}頻道中熱點文章緩存數(shù)據(jù)",apArticle.getChannelName());
} else {
log.info("成功刷新推薦頻道中熱點文章緩存數(shù)據(jù)");
}
}
}
}
????????這一步主要是邏輯處理部分,在這里我們需要完成對文章的得分進行重新計算并根據(jù)計算結(jié)果更新Redis中的緩存數(shù)據(jù)。計算到得分之后,我們需要分別對不同頻道和推薦頻道進行處理,但是處理流程相同。首先我們會先判斷緩存中的數(shù)據(jù)有沒有滿30條,如果沒滿則直接該文章添加到緩存中作為熱榜文章;如果緩存中已滿30條數(shù)據(jù),這時候就要分兩種情況處理,如果緩存中存在該文章數(shù)據(jù),則直接對其得分進行更新,如若不然則需要將該文章分值與緩存中的最低分進行比較,如果改文章得分比最低分高則直接進行替換,否則不做處理。最后還需要對緩存中的數(shù)據(jù)重新排序并再次發(fā)送到Reids中。
2.5:設(shè)置監(jiān)聽類
package com.my.article.listener;
import com.alibaba.fastjson.JSON;
import com.my.article.service.ApArticleService;
import com.my.common.constans.HotArticleConstants;
import com.my.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ArticleIncrHandleListener {
@Autowired
private ApArticleService apArticleService;
@KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
public void onMessage(String mess){
if(StringUtils.isNotBlank(mess)){
ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
apArticleService.updateScore(articleVisitStreamMess);
}
}
}
三:功能測試
打開App端對一篇文章進行瀏覽并點贊收藏
到控制臺查看日志信息?
????????可以看到?成功記錄用戶行為并且將文章得分進行了更改,其處理流程是這樣的,首先接收到的是用戶的點贊數(shù)據(jù),隨后接收到用戶的瀏覽記錄,最后接收到的是用戶的收藏記錄,由于前面提到的消息處理是增加而不是更新,所以最后我們可以看到時間窗口處理結(jié)果為COLLECTION:1,COMMENT:0,LIKES:1,VIEWS:1,10秒鐘之后就會對消息進行聚合,假如這10秒之內(nèi)還有其他用戶也進行了點贊閱讀操作,這時候就會繼續(xù)將消息增加在原來處理結(jié)果上面,過了10秒之后就會進行一次聚合處理,也即拿著這批數(shù)據(jù)進行數(shù)據(jù)更新操作。
至此該項目的開發(fā)就告一段落了,后續(xù)有什么優(yōu)化我會再發(fā)文介紹。文章來源:http://www.zghlxwxcb.cn/news/detail-780527.html
友情鏈接:???途W(wǎng)? 刷題|面試|找工作神器文章來源地址http://www.zghlxwxcb.cn/news/detail-780527.html
到了這里,關(guān)于猿創(chuàng)征文 | 項目整合KafkaStream實現(xiàn)文章熱度實時計算的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!