国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

猿創(chuàng)征文 | 項目整合KafkaStream實現(xiàn)文章熱度實時計算

這篇具有很好參考價值的文章主要介紹了猿創(chuàng)征文 | 項目整合KafkaStream實現(xiàn)文章熱度實時計算。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

kafka streams 中,哪個方法用于創(chuàng)建一個 kstream 對象,# 微服務(wù)學(xué)習筆記,JAVA學(xué)習筆記,微服務(wù),spring boot,java,kafka

?個人簡介:?

> ??個人主頁:趙四司機
> ??學(xué)習方向:JAVA后端開發(fā)?
> ?往期文章:SpringBoot項目整合微信支付
> ??博主推薦網(wǎng)站:??途W(wǎng) 刷題|面試|找工作神器
> ??種一棵樹最好的時間是十年前,其次是現(xiàn)在!
> ??喜歡的話麻煩點點關(guān)注喔,你們的支持是我的最大動力。

前言:

最近在做一個基于SpringCloud+Springboot+Docker的新聞頭條微服務(wù)項目,用的是黑馬的教程,現(xiàn)在項目開發(fā)進入了尾聲,我打算通過寫文章的形式進行梳理一遍,并且會將梳理過程中發(fā)現(xiàn)的Bug進行修復(fù),有需要改進的地方我也會繼續(xù)做出改進。這一系列的文章我將會放入微服務(wù)項目專欄中,這個項目適合剛接觸微服務(wù)的人作為練手項目,假如你對這個項目感興趣你可以訂閱我的專欄進行查看,需要資料可以私信我,當然要是能給我點個小小的關(guān)注就更好了,你們的支持是我最大的動力。

如果你想要一個可以系統(tǒng)學(xué)習的網(wǎng)站,那么我推薦的是牛客網(wǎng),個人感覺用著還是不錯的,頁面很整潔,而且內(nèi)容也很全面,語法練習,算法題練習,面試知識匯總等等都有,論壇也很活躍,傳送門鏈接:牛客刷題神器

目錄

一:Springboot集成Kafka Stream

1.設(shè)置配置類信息

2.修改application.yml文件

3.新增配置類,創(chuàng)建KStream對象,進行聚合

二:熱點文章實時計算

1.實現(xiàn)思路

2.環(huán)境搭建

2.1:在文章微服務(wù)中集成Kafka生產(chǎn)者配置

2.2:記錄用戶行為

2.3:定義Stream實現(xiàn)消息接收并聚合

2.4:重新計算文章分值并更新Redis緩存數(shù)據(jù)

2.5:設(shè)置監(jiān)聽類

三:功能測試


一:Springboot集成Kafka Stream

1.設(shè)置配置類信息

package com.my.kafka.config;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * 通過重新注冊KafkaStreamsConfiguration對象,設(shè)置自定配置參數(shù)
 */

@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

????????可能你會有這樣的疑問,前面介紹Kafka時候不是直接在yml文件里面設(shè)置參數(shù)就行了嗎?為什么這里還要自己寫配置類呢?是因為Spring對KafkaStream的集成并不是很好,所以我們才需要自己去寫配置類信息。需要注意的一點是,配置類中必須添加@EnableKafkaStreams這一注解。

2.修改application.yml文件

kafka:
  hosts: 192.168.200.130:9092
  group: ${spring.application.name}

3.新增配置類,創(chuàng)建KStream對象,進行聚合

package com.my.kafka.stream;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;
import java.util.Arrays;

@Slf4j
@Configuration
public class KafkaStreamHelloListener {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //創(chuàng)建KStream對象,同時指定從那個topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> Arrays.asList(value.split(" ")))
                //根據(jù)value進行聚合分組
                .groupBy((key,value)->value)
                //聚合計算時間間隔
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //求單詞的個數(shù)
                .count()
                .toStream()
                //處理后的結(jié)果轉(zhuǎn)換為string字符串
                .map((key,value)->{
                    System.out.println("key:"+key+",value:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //發(fā)送消息
                .to("itcast-topic-out");
        return stream;
    }
}

? ? ? ? 這里實現(xiàn)的功能還是計算單詞個數(shù),假如你有其他計算需求你可以更改里面的邏輯代碼以符合你的需求。該類可注入StreamBuilder,其返回值必須是KStream且放入Spring容器中(添加了@Bean注解)。

二:熱點文章實時計算

1.實現(xiàn)思路

kafka streams 中,哪個方法用于創(chuàng)建一個 kstream 對象,# 微服務(wù)學(xué)習筆記,JAVA學(xué)習筆記,微服務(wù),spring boot,java,kafka?

? ? ? ? 實現(xiàn)思路很簡單,當用戶有點贊、收藏、閱讀等行為記錄時候,就將消息發(fā)送給Kafka進行流式處理,隨后Kafka再進行聚合并重新計算文章分值,除此之外還需要更新數(shù)據(jù)庫中的數(shù)據(jù)。需要注意的是,按常理來說當天的文章熱度權(quán)重是要比非當天的文章熱度權(quán)重大的,因此當日文章的熱度權(quán)重需要乘以3,隨后查詢Redis中的數(shù)據(jù),假如該文章分數(shù)大于Redis中最低分文章,這時候就需要進行替換操作,更新Redis數(shù)據(jù)。?

2.環(huán)境搭建

2.1:在文章微服務(wù)中集成Kafka生產(chǎn)者配置

(1)修改nacos,增加內(nèi)容:

kafka:
    bootstrap-servers: 49.234.52.192:9092
    producer:
        retries: 10
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
    hosts: 49.234.52.192:9092
    group: ${spring.application.name}

(2)定義相關(guān)實體類、常量

package com.my.model.mess;

import lombok.Data;

@Data
public class UpdateArticleMess {

    /**
     * 修改文章的字段類型
      */
    private UpdateArticleType type;
    /**
     * 文章ID
     */
    private Long articleId;
    /**
     * 修改數(shù)據(jù)的增量,可為正負
     */
    private Integer add;

    public enum UpdateArticleType{
        COLLECTION,COMMENT,LIKES,VIEWS;
    }
}

2.2:記錄用戶行為

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
 * 讀文章行為記錄(閱讀量+1)
 * @param map
 * @return
 */
public ResponseResult readBehavior(Map map) {
    if(map == null || map.get("articleId") == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    Long articleId = Long.parseLong((String) map.get("articleId"));
    ApArticle apArticle = getById(articleId);

    if(apArticle != null) {
        //獲取文章閱讀數(shù)
        Integer views = apArticle.getViews();
        if(views == null) {
            views = 0;
        }

        //調(diào)用Kafka發(fā)送消息
        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(articleId);
        mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
        mess.setAdd(1);
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));

        //更新文章閱讀數(shù)
        LambdaUpdateWrapper<ApArticle> luw = new LambdaUpdateWrapper<>();
        luw.eq(ApArticle::getId,articleId);
        luw.set(ApArticle::getViews,views + 1);
        update(luw);
    }

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

/**
 * 用戶點贊
 * @param map
 * @return
 */
@Override
public ResponseResult likesBehavior(Map map) {
    if(map == null || map.get("articleId") == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    Long articleId = Long.parseLong((String) map.get("articleId"));
    Integer operation = (Integer) map.get("operation");
    ApArticle apArticle = getById(articleId);

    UpdateArticleMess mess = new UpdateArticleMess();
    mess.setArticleId(articleId);
    mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);

    if(apArticle != null) {
        //獲取文章點贊數(shù)
        Integer likes = apArticle.getLikes();
        if(likes == null) {
            likes = 0;
        }

        //更新文章點贊數(shù)
        LambdaUpdateWrapper<ApArticle> luw = new LambdaUpdateWrapper<>();
        luw.eq(ApArticle::getId,articleId);
        if(operation == 0) {
            //點贊
            log.info("用戶點贊文章...");
            luw.set(ApArticle::getLikes,likes + 1);
            //分值增加
            mess.setAdd(1);
        } else {
            //取消點贊
            log.info("用戶取消點贊文章...");
            luw.set(ApArticle::getLikes,likes - 1);
            //分值減少
            mess.setAdd(-1);
        }

        //調(diào)用Kafka發(fā)送消息
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));

        update(luw);
    }

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

/**
 * 用戶收藏
 * @param map
 * @return
 */
@Override
public ResponseResult collBehavior(Map map) {
    if(map == null || map.get("entryId") == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    Long articleId = Long.parseLong((String) map.get("entryId"));
    Integer operation = (Integer) map.get("operation");
    ApArticle apArticle = getById(articleId);

    //消息載體
    UpdateArticleMess mess = new UpdateArticleMess();
    mess.setArticleId(articleId);
    mess.setType(UpdateArticleMess.UpdateArticleType.COLLECTION);

    if(apArticle != null) {
        //獲取文章收藏數(shù)
        Integer collection = apArticle.getCollection();
        if(collection == null) {
            collection = 0;
        }

        //更新文章收藏數(shù)
        LambdaUpdateWrapper<ApArticle> luw = new LambdaUpdateWrapper<>();
        luw.eq(ApArticle::getId,articleId);
        if(operation == 0) {
            //收藏
            log.info("用戶收藏文章...");
            luw.set(ApArticle::getCollection,collection + 1);
            mess.setAdd(1);
        } else {
            //取消收藏
            log.info("用戶取消收藏文章...");
            luw.set(ApArticle::getCollection,collection - 1);
            mess.setAdd(-1);
        }

        //調(diào)用Kafka發(fā)送消息
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));

        update(luw);
    }

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

? ? ? ? 這一步主要是當用戶對文章進行訪問、點贊、評論或者收藏時候就會更新數(shù)據(jù)庫中的記錄,同時還要將該行為記錄封裝并發(fā)送至Kafka。

2.3:定義Stream實現(xiàn)消息接收并聚合

package com.my.article.stream;

import com.alibaba.fastjson.JSON;
import com.my.common.constans.HotArticleConstants;
import com.my.model.mess.ArticleVisitStreamMess;
import com.my.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
@Slf4j
public class HotArticleStreamHandler {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //接收消息
        KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
        //聚合流式處理
        stream.map((key,value)->{
            UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
            //重置消息的key:1234343434   和  value: likes:1
            return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
        })
                //按照文章id進行聚合
                .groupBy((key,value)->key)
                //時間窗口  每十秒聚合一次
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                /*
                  自行地完成聚合的計算
                 */
                .aggregate(new Initializer<String>() {
                    /**
                     * 初始方法,返回值是消息的value
                     */
                    @Override
                    public String apply() {
                        return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                    }
                    /*
                      真正的聚合操作,返回值是消息的value
                     */
                }, new Aggregator<String, String, String>() {
                    /**
                     * 聚合并返回
                     * @param key  文章id
                     * @param value  重置后的value  ps:likes:1
                     * @param aggValue "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0"
                     * @return  aggValue格式
                     */
                    @Override
                    public String apply(String key, String value, String aggValue) {
                        //用戶沒有進行任何操作
                        if(StringUtils.isBlank(value)){
                            return aggValue;
                        }
                        String[] aggAry = aggValue.split(",");
                        //收藏、評論、點贊、閱讀量初始值
                        int col = 0,com=0,lik=0,vie=0;
                        for (String agg : aggAry) {
                            //for --> COLLECTION:0
                            String[] split = agg.split(":");
                            //split[0]:COLLECTION,split[1]:0
                            /*
                              獲得初始值,也是時間窗口內(nèi)計算之后的值
                              第一次獲取到的值為0
                             */
                            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                                case COLLECTION:
                                    col = Integer.parseInt(split[1]);
                                    break;
                                case COMMENT:
                                    com = Integer.parseInt(split[1]);
                                    break;
                                case LIKES:
                                    lik = Integer.parseInt(split[1]);
                                    break;
                                case VIEWS:
                                    vie = Integer.parseInt(split[1]);
                                    break;
                            }
                        }
                        /*
                          累加操作
                         */
                        String[] valAry = value.split(":");
                        switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
                            case COLLECTION:
                                col += Integer.parseInt(valAry[1]);
                                break;
                            case COMMENT:
                                com += Integer.parseInt(valAry[1]);
                                break;
                            case LIKES:
                                lik += Integer.parseInt(valAry[1]);
                                break;
                            case VIEWS:
                                vie += Integer.parseInt(valAry[1]);
                                break;
                        }

                        String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                        log.info("文章的id:{}",key);
                        log.info("當前時間窗口內(nèi)的消息處理結(jié)果:{}",formatStr);

                        //必須返回和apply()的返回類型
                        return formatStr;
                    }
                }, Materialized.as("hot-article-stream-count-001"))
                .toStream()
                .map((key,value)->{
                    return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
                })
                //發(fā)送消息
                .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);

        return stream;


    }

    /**
     * 格式化消息的value數(shù)據(jù)
     * @param articleId  文章id
     * @param value  聚合結(jié)果
     * @return  String
     */
    public String formatObj(String articleId,String value){
        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        mess.setArticleId(Long.valueOf(articleId));
        //COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
        String[] valAry = value.split(",");
        for (String val : valAry) {
            String[] split = val.split(":");
            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                case COLLECTION:
                    mess.setCollect(Integer.parseInt(split[1]));
                    break;
                case COMMENT:
                    mess.setComment(Integer.parseInt(split[1]));
                    break;
                case LIKES:
                    mess.setLike(Integer.parseInt(split[1]));
                    break;
                case VIEWS:
                    mess.setView(Integer.parseInt(split[1]));
                    break;
            }
        }
        log.info("聚合消息處理之后的結(jié)果為:{}",JSON.toJSONString(mess));
        return JSON.toJSONString(mess);
    }
}

????????這一步是最難但是也是最重要的,首先我們接收到消息之后需要先對其key和value進行重置,因為這時候接收到的數(shù)據(jù)是一個JSON字符串格式的UpdateArticleMess對象,我們需要將其重置為key value鍵值對的格式。也即將其格式轉(zhuǎn)化成key為文章id,value為用戶行為記錄,如key:182738789987,value:LIKES:1,表示用戶對該文章點贊一次。隨后選擇對文章id進行聚合,每10秒鐘聚合一次,需要注意的是,apply()函數(shù)中返回結(jié)構(gòu)必須是“COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0”格式。

2.4:重新計算文章分值并更新Redis緩存數(shù)據(jù)

@Service
@Transactional
@Slf4j
public class ApArticleServiceImpl extends ServiceImpl<ApArticleMapper, ApArticle> implements ApArticleService {
    /**
     * 更新文章分值,同時更新redis中熱點文章數(shù)據(jù)
     * @param mess
     */
    @Override
    public void updateScore(ArticleVisitStreamMess mess) {
        //1.獲取文章數(shù)據(jù)
        ApArticle apArticle = getById(mess.getArticleId());
        //2.計算文章分值
        Integer score = computeScore(apArticle);
        score = score * 3;

        //3.替換當前文章對應(yīng)頻道熱點數(shù)據(jù)
        replaceDataToRedis(apArticle,score,ArticleConstas.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());

        //4.替換推薦頻道文章熱點數(shù)據(jù)
        replaceDataToRedis(apArticle,score,ArticleConstas.HOT_ARTICLE_FIRST_PAGE + ArticleConstas.DEFAULT_TAG);
    }

    /**
     * 根據(jù)權(quán)重計算文章分值
     * @param apArticle
     * @return
     */
    private Integer computeScore(ApArticle apArticle) {
        Integer score = 0;
        if(apArticle.getLikes() != null){
            score += apArticle.getLikes() * ArticleConstas.HOT_ARTICLE_LIKE_WEIGHT;
        }
        if(apArticle.getViews() != null){
            score += apArticle.getViews();
        }
        if(apArticle.getComment() != null){
            score += apArticle.getComment() * ArticleConstas.HOT_ARTICLE_COMMENT_WEIGHT;
        }
        if(apArticle.getCollection() != null){
            score += apArticle.getCollection() * ArticleConstas.HOT_ARTICLE_COLLECTION_WEIGHT;
        }

        return score;
    }

    /**
     * 替換數(shù)據(jù)并存入到redis
     * @param apArticle 文章信息
     * @param score 文章新的得分
     * @param key redis數(shù)據(jù)的key值
     */
    private void replaceDataToRedis(ApArticle apArticle,Integer score, String key) {
        String articleListStr = cacheService.get(key);
        if(StringUtils.isNotBlank(articleListStr)) {
            List<HotArticleVo> hotArticleVos = JSON.parseArray(articleListStr, HotArticleVo.class);

            boolean flag = true;

            //如果緩存中存在該文章,直接更新文章分值
            for (HotArticleVo hotArticleVo : hotArticleVos) {
                if(hotArticleVo.getId().equals(apArticle.getId())) {
                    if(key.equals(ArticleConstas.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId())) {
                        log.info("頻道{}緩存中存在該文章,文章{}分值更新{}-->{}",apArticle.getChannelName(),apArticle.getId(),hotArticleVo.getScore(),score);
                    } else {
                        log.info("推薦頻道緩存中存在該文章,文章{}分值更新{}-->{}",apArticle.getId(),hotArticleVo.getScore(),score);
                    }
                    hotArticleVo.setScore(score);
                    flag = false;
                    break;
                }
            }

            //如果緩存中不存在該文章
            if(flag) {
                //緩存中熱點文章數(shù)少于30,直接增加
                if(hotArticleVos.size() < 30) {
                    log.info("該文章{}不在緩存,但是文章數(shù)少于30,直接添加",apArticle.getId());
                    HotArticleVo hotArticleVo = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle,hotArticleVo);
                    hotArticleVo.setScore(score);
                    hotArticleVos.add(hotArticleVo);
                } else {
                    //緩存中熱點文章數(shù)大于或等于30
                    //1.排序
                    hotArticleVos = hotArticleVos.stream().sorted(Comparator.
                            comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
                    //2.獲取最小得分值
                    HotArticleVo minScoreHotArticleVo = hotArticleVos.get(hotArticleVos.size() - 1);
                    if(minScoreHotArticleVo.getScore() <= score) {
                        //3.移除分值最小文章
                        log.info("替換分值最小的文章...");
                        hotArticleVos.remove(minScoreHotArticleVo);
                        HotArticleVo hotArticleVo = new HotArticleVo();
                        BeanUtils.copyProperties(apArticle,hotArticleVo);
                        hotArticleVo.setScore(score);
                        hotArticleVos.add(hotArticleVo);
                    }
                }
            }

            //重新排序并緩存到redis
            hotArticleVos = hotArticleVos.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
                    .collect(Collectors.toList());
            cacheService.set(key,JSON.toJSONString(hotArticleVos));
            if(key.equals(ArticleConstas.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId())) {
                log.info("成功刷新{}頻道中熱點文章緩存數(shù)據(jù)",apArticle.getChannelName());
            } else {
                log.info("成功刷新推薦頻道中熱點文章緩存數(shù)據(jù)");
            }
        }
    }
}

????????這一步主要是邏輯處理部分,在這里我們需要完成對文章的得分進行重新計算并根據(jù)計算結(jié)果更新Redis中的緩存數(shù)據(jù)。計算到得分之后,我們需要分別對不同頻道和推薦頻道進行處理,但是處理流程相同。首先我們會先判斷緩存中的數(shù)據(jù)有沒有滿30條,如果沒滿則直接該文章添加到緩存中作為熱榜文章;如果緩存中已滿30條數(shù)據(jù),這時候就要分兩種情況處理,如果緩存中存在該文章數(shù)據(jù),則直接對其得分進行更新,如若不然則需要將該文章分值與緩存中的最低分進行比較,如果改文章得分比最低分高則直接進行替換,否則不做處理。最后還需要對緩存中的數(shù)據(jù)重新排序并再次發(fā)送到Reids中。

2.5:設(shè)置監(jiān)聽類

package com.my.article.listener;

import com.alibaba.fastjson.JSON;
import com.my.article.service.ApArticleService;
import com.my.common.constans.HotArticleConstants;
import com.my.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ArticleIncrHandleListener {

    @Autowired
    private ApArticleService apArticleService;

    @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
    public void onMessage(String mess){
        if(StringUtils.isNotBlank(mess)){
            ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
            apArticleService.updateScore(articleVisitStreamMess);
        }
    }
}

三:功能測試

打開App端對一篇文章進行瀏覽并點贊收藏

kafka streams 中,哪個方法用于創(chuàng)建一個 kstream 對象,# 微服務(wù)學(xué)習筆記,JAVA學(xué)習筆記,微服務(wù),spring boot,java,kafka

到控制臺查看日志信息?

kafka streams 中,哪個方法用于創(chuàng)建一個 kstream 對象,# 微服務(wù)學(xué)習筆記,JAVA學(xué)習筆記,微服務(wù),spring boot,java,kafka

????????可以看到?成功記錄用戶行為并且將文章得分進行了更改,其處理流程是這樣的,首先接收到的是用戶的點贊數(shù)據(jù),隨后接收到用戶的瀏覽記錄,最后接收到的是用戶的收藏記錄,由于前面提到的消息處理是增加而不是更新,所以最后我們可以看到時間窗口處理結(jié)果為COLLECTION:1,COMMENT:0,LIKES:1,VIEWS:1,10秒鐘之后就會對消息進行聚合,假如這10秒之內(nèi)還有其他用戶也進行了點贊閱讀操作,這時候就會繼續(xù)將消息增加在原來處理結(jié)果上面,過了10秒之后就會進行一次聚合處理,也即拿著這批數(shù)據(jù)進行數(shù)據(jù)更新操作。

至此該項目的開發(fā)就告一段落了,后續(xù)有什么優(yōu)化我會再發(fā)文介紹。

友情鏈接:???途W(wǎng)? 刷題|面試|找工作神器文章來源地址http://www.zghlxwxcb.cn/news/detail-780527.html

到了這里,關(guān)于猿創(chuàng)征文 | 項目整合KafkaStream實現(xiàn)文章熱度實時計算的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 猿創(chuàng)征文 | Shell編程【上篇】

    猿創(chuàng)征文 | Shell編程【上篇】

    目錄 1,Shell編程 1.1:簡介 1.1.1:shell解釋器 1.2:快速入門 1.2.1:編寫腳本 1.2.2:執(zhí)行shell腳本 1.3:shell變量 1.3.1:簡介 1.3.2:使用變量 1.3.3:刪除變量 1.3.4:只讀變量 ?1.4:字符串 1.4.1:單引號 1.4.2:雙引號? 1.4.3:獲取字符串長度? ?1.4.4:提取子字符串 ?1.5:傳遞參數(shù) 1

    2024年02月02日
    瀏覽(24)
  • 以太坊是什么?|猿創(chuàng)征文

    以太坊是什么?|猿創(chuàng)征文

    以太坊是一個可編程、可視化、更易用的區(qū)塊鏈,它允許任何人編寫智能合約和發(fā)行代幣。 在以太坊(Ethereum)出現(xiàn)之前,各種區(qū)塊鏈應(yīng)用的功能非常有限,例如,比特幣和其他加密貨幣都只是純粹的數(shù)字貨幣。 以太坊(Ethereum)創(chuàng)始人Vitalik Buterin將以太坊(Ethereum)設(shè)想為開發(fā)人員

    2024年02月02日
    瀏覽(23)
  • 猿創(chuàng)征文|【HTML】標簽學(xué)習之路

    猿創(chuàng)征文|【HTML】標簽學(xué)習之路

    ?? 目錄 一、HTML語法規(guī)范 1.基本語法概述 2.標簽關(guān)系 二、HTML基本結(jié)構(gòu)標簽 1.第一個HTML頁面 2.HTML基本結(jié)構(gòu)標簽總結(jié) 1.基本語法概述 html是由尖括號包圍的,列如: html 。 html標簽通常是成對出現(xiàn)的,列如:html和/html,我們稱為 雙標簽 。標簽對里的第一個標簽是開始標

    2024年01月16日
    瀏覽(25)
  • 猿創(chuàng)征文|ZooKeeper(偽)集群搭建

    猿創(chuàng)征文|ZooKeeper(偽)集群搭建

    前言:zookeeper作為一款分布式協(xié)調(diào)中間件,其重要性不言而喻,因此需要保證其高可用性。所以一般都會搭建zookeeper集群,今天葉秋帶領(lǐng)大家在一臺服務(wù)器上搭建偽集群。 目錄 1、 搭建要求 2、 準備工作 3、 配置集群 ?4 啟動集群 ?5 模擬集群異常 1、 搭建要求 真實的集群是

    2024年02月01日
    瀏覽(30)
  • 猿創(chuàng)征文| redis基本數(shù)據(jù)類型

    猿創(chuàng)征文| redis基本數(shù)據(jù)類型

    ??個人主頁:不斷前進的皮卡丘 ??博客描述:夢想也許遙不可及,但重要的是追夢的過程,用博客記錄自己的成長,記錄自己一步一步向上攀登的印記 ??個人專欄:微服務(wù)專欄 ??redis常見的操作命令:http://www.redis.cn/commands.html 命令 功能 keys * 查看當前庫的所有key exists key 判斷

    2023年04月08日
    瀏覽(18)
  • 猿創(chuàng)征文|【深度學(xué)習前沿應(yīng)用】文本生成

    猿創(chuàng)征文|【深度學(xué)習前沿應(yīng)用】文本生成

    作者簡介 :在校大學(xué)生一枚,C/C++領(lǐng)域新星創(chuàng)作者,華為云享專家,阿里云專家博主,騰云先鋒(TDP)成員,云曦智劃項目總負責人,全國高等學(xué)校計算機教學(xué)與產(chǎn)業(yè)實踐資源建設(shè)專家委員會(TIPCC)志愿者,以及編程愛好者,期待和大家一起學(xué)習,一起進步~ . 博客主頁 :

    2024年02月06日
    瀏覽(17)
  • 猿創(chuàng)征文|“云“創(chuàng)新展望:數(shù)據(jù)之浩瀚

    猿創(chuàng)征文|“云“創(chuàng)新展望:數(shù)據(jù)之浩瀚

    ??wei_shuo的個人主頁 ??wei_shuo的學(xué)習社區(qū) ??Hello World ! AWS亞馬遜云科技提供全球覆蓋廣泛、服務(wù)深入的云平臺,全球數(shù)據(jù)中心提供超過 200 項功能齊全的服務(wù) 連續(xù) 11 年被 Gartner 評為\\\"全球云計算領(lǐng)導(dǎo)者\\\" ;2021 年全新 Gartner 魔力象限中被評為\\\"云基礎(chǔ)設(shè)施與平臺服務(wù)(Iaas Pa

    2023年04月24日
    瀏覽(24)
  • 猿創(chuàng)征文 | Solidity 智能合約技術(shù)成長之路

    猿創(chuàng)征文 | Solidity 智能合約技術(shù)成長之路

    Solidity 是鏈上智能合約的開發(fā)語言,鏈上智能合約相當于傳統(tǒng)行業(yè)的后端,鏈上應(yīng)用基本都是由合約 + 前端組成的,雖然不推薦,但部分鏈上應(yīng)用也會加入后端進行數(shù)據(jù)存儲,以降低用戶的使用成本。 Solidity 這門開發(fā)語言并不復(fù)雜,只需要您稍微有一點兒編程基礎(chǔ),英文詞匯

    2024年01月23日
    瀏覽(23)
  • 猿創(chuàng)征文|Hadoop大數(shù)據(jù)技術(shù)綜合實驗

    猿創(chuàng)征文|Hadoop大數(shù)據(jù)技術(shù)綜合實驗

    當前互聯(lián)網(wǎng)應(yīng)用中,萬維網(wǎng)(World Wide Web)應(yīng)用占據(jù)了絕大部分的份額。萬維網(wǎng)應(yīng)用對外提供服務(wù)需要架設(shè)Web服務(wù)器軟件。典型的Web服務(wù)器軟件有Apache、Nginx等。Web服務(wù)器軟件在運行過程中會寫入各種日志到磁盤文件中。例如,Apache Web服務(wù)器軟件運行過程中,會產(chǎn)生access.log文

    2024年02月03日
    瀏覽(31)
  • 猿創(chuàng)征文|【云原生】學(xué)習云原生經(jīng)驗分享

    猿創(chuàng)征文|【云原生】學(xué)習云原生經(jīng)驗分享

    博主昵稱:跳樓梯企鵝 博主主頁面鏈接: 博主主頁傳送門 博主專欄頁面連接: 專欄傳送門--網(wǎng)路安全技術(shù) 創(chuàng)作初心:本博客的初心為與技術(shù)朋友們相互交流,每個人的技術(shù)都存在短板,博主也是一樣,虛心求教,希望各位技術(shù)友給予指導(dǎo)。 博主座右銘:發(fā)現(xiàn)光,追隨光,

    2024年02月02日
    瀏覽(46)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包