国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

SpringBoot對(duì)接kafka,批量、并發(fā)、異步獲取消息,并動(dòng)態(tài)、批量插入庫表

這篇具有很好參考價(jià)值的文章主要介紹了SpringBoot對(duì)接kafka,批量、并發(fā)、異步獲取消息,并動(dòng)態(tài)、批量插入庫表。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

SpringBoot對(duì)接kafka,批量、并發(fā)、異步獲取消息,并動(dòng)態(tài)、批量插入庫表

?更多優(yōu)秀文章,請(qǐng)掃碼關(guān)注個(gè)人微信公眾號(hào)或搜索“程序猿小楊”添加。

一、背景

? ? ? ? 因業(yè)務(wù)發(fā)展需要,需要對(duì)接kafka,快速批量接收消息日志,避免消息日志累積過多,必須做到數(shù)據(jù)處理后,動(dòng)態(tài)插入到庫表(相同表結(jié)構(gòu),不同表名)下,并且還要支持批量事務(wù)提交,實(shí)現(xiàn)消息快速消費(fèi)。(注意:源碼文章最后有獲取方式)

spring連接kafka代碼,kafka,java,java,開發(fā)語言,kafka,spring boot,redis

二、核心代碼

2.1、開啟批量、并發(fā)消費(fèi)

kafka:
????bootstrap-servers:?10.1.*.*:9092?????#服務(wù)器的ip及端口,可以寫多個(gè),服務(wù)器之間用“:”間隔
????producer:?#生產(chǎn)者配置?
??????key-serializer:?org.apache.kafka.common.serialization.StringSerializer
??????value-serializer:?org.apache.kafka.common.serialization.StringSerializer
????consumer:?#消費(fèi)者配置
??????#指定默認(rèn)消費(fèi)者group?id?-->?由于在kafka中,同一組中的consumer不會(huì)讀取到同一個(gè)消息,依靠groud.id設(shè)置組名
??????group-id:?myGroup?????????????????#設(shè)置消費(fèi)者的組id?default:Group
??????enable-auto-commit:?true??#設(shè)置自動(dòng)提交offset
??????auto-commit-interval:?2000??#默認(rèn)值為5000
??????key-deserializer:?org.apache.kafka.common.serialization.StringDeserializer
??????#值的反序列化方式
??????value-serializer:?org.apache.kafka.common.serialization.StringSerializer
??????auto-offset-reset:?latest
??????max-poll-records:?2000??#批量一次最大拉取數(shù)據(jù)量?默認(rèn)500
????listener:
??????#?poll-timeout:?1000
??????type:?batch??#?開啟批量消費(fèi)
??????concurrency:?3??#指定listener?容器中的線程數(shù),用于提高并發(fā)量
????properties:
??????session:
????????timeout:
??????????ms:?120000??#默認(rèn)10000
????????max:
??????????poll:
????????????interval:
??????????????ms:?600000??#默認(rèn)300000(5分鐘)

? ? ? ?說明:type:?batch??#?開啟批量消費(fèi),?max-poll-records:?2000,批量消費(fèi)每次最多消費(fèi)記錄數(shù)。這里設(shè)置?max-poll-records是2000,并不是說如果沒有達(dá)到2000條消息,我們就一直等待。而是說一次poll最多返回的記錄數(shù)為2000。concurrency:?3??#指定listener?容器中的線程數(shù),用于提高并發(fā)量。注意:并發(fā)量根據(jù)實(shí)際分區(qū)數(shù)決定,必須小于等于分區(qū)數(shù),否則會(huì)有線程一直處于空閑狀態(tài)。例如:設(shè)置concurrency為3,也就是將會(huì)啟動(dòng)3條線程進(jìn)行監(jiān)聽,而要監(jiān)聽的topic有5個(gè)partition,意味著將有2條線程都是分配到2個(gè)partition,還有1條線程分配到1個(gè)partition。

2.2、多線程異步配置

????具體配置參加前面文章:SpringBoot使用@Async實(shí)現(xiàn)多線程異步

????注意:在啟動(dòng)類上需要加上注解@EnableAsync,開啟異步。

spring連接kafka代碼,kafka,java,java,開發(fā)語言,kafka,spring boot,redis

2.3、redis相關(guān)配置

1、yml相關(guān)配置:

spring:
??redis:
????#?地址
????host:?127.0.0.1
????#?端口,默認(rèn)為6379
????port:?6379
????#?密碼
????#?連接超時(shí)時(shí)間
????timeout:?10s
????lettuce:
??????pool:
????????#?連接池中的最小空閑連接
????????min-idle:?0
????????#?連接池中的最大空閑連接
????????max-idle:?8
????????#?連接池的最大數(shù)據(jù)庫連接數(shù)
????????max-active:?8
????????#?#連接池最大阻塞等待時(shí)間(使用負(fù)值表示沒有限制)
????????max-wait:?-1ms

2、RedisConfig配置

package?com.wonders.config;

import?com.fasterxml.jackson.annotation.JsonAutoDetect;
import?com.fasterxml.jackson.annotation.PropertyAccessor;
import?com.fasterxml.jackson.databind.ObjectMapper;
import?org.springframework.cache.annotation.CachingConfigurerSupport;
import?org.springframework.context.annotation.Bean;
import?org.springframework.context.annotation.Configuration;
import?org.springframework.data.redis.connection.RedisConnectionFactory;
import?org.springframework.data.redis.core.RedisTemplate;
import?org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import?org.springframework.data.redis.serializer.StringRedisSerializer;

/**
?*?〈自定義redis序列化方式〉
?*?@author?yangyalin
?*?@create?2018/11/1
?*?@since?1.0.0
?*/
@Configuration
public?class?RedisConfig?extends?CachingConfigurerSupport?{
????/**
?????*?@Author?yangyalin
?????*?@Description?redisTemplate序列化使用的jdkSerializeable,?存儲(chǔ)二進(jìn)制字節(jié)碼(默認(rèn)),?所以自定義序列化類
?????*?用于存儲(chǔ)可視化內(nèi)容
?????*?@Date?15:07?2018/11/1
?????*?@Param?[redisConnectionFactory]
?????*?@return?org.springframework.data.redis.core.RedisTemplate<java.lang.Object,java.lang.Object>
?????**/
????@Bean
????public?RedisTemplate<Object,?Object>?redisTemplate(RedisConnectionFactory?redisConnectionFactory){
????????RedisTemplate<Object,Object>?redisTemplate=new?RedisTemplate();
????????redisTemplate.setConnectionFactory(redisConnectionFactory);
????????//使用jackson2JsonRedisSerializer替換默認(rèn)序列化
????????Jackson2JsonRedisSerializer?jackson2JsonRedisSerializer=new?Jackson2JsonRedisSerializer(Object.class);
????????ObjectMapper?objectMapper=new?ObjectMapper();
????????objectMapper.setVisibility(PropertyAccessor.ALL,?JsonAutoDetect.Visibility.ANY);
????????objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
????????jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
????????//設(shè)置key和value的序列化規(guī)則
????????redisTemplate.setKeySerializer(new?StringRedisSerializer());
????????redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
????????redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer);
????????redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
????????redisTemplate.afterPropertiesSet();
????????return?redisTemplate;
????}
}

2.4、動(dòng)態(tài)表名

????<!--插入到kafka日志臨時(shí)表中-->
????<insert?id="insertMsgInfoTemp"?parameterType="com.wonders.entity.KafkaMsgConfig">
??????INSERT?INTO?${logTableName}("EVN_LOG_ID",?"TABLE_NAME",?"OPERATION",?"PK_VALUE1",?"PK_VALUE2",
???????????"PK_VALUE3",?"PK_VALUE4",?"PK_VALUE5",?"TRANS_FLAG",?"PKS",?"BASE_CODE",?"PLA_BRANCH_CODE",
???????????"CREATE_TIME","MSG_PRODUCE_TIME")
??????VALUES?(#{id,jdbcType=VARCHAR},?#{tableName,jdbcType=VARCHAR},?#{operation,jdbcType=VARCHAR},
????????????#{pk1,jdbcType=VARCHAR},?#{pk2,jdbcType=VARCHAR},#{pk3,jdbcType=VARCHAR},
????????????#{pk4,jdbcType=VARCHAR},#{pk5,jdbcType=VARCHAR},?'Y',
????????????#{pks,jdbcType=VARCHAR},?#{baseCode,jdbcType=VARCHAR},
????????????#{plaBranchCode,jdbcType=VARCHAR},sysdate,#{msgProduceTime,jdbcType=VARCHAR})
????</insert>

????說明:1、#{}?:會(huì)根據(jù)參數(shù)的類型進(jìn)行處理,當(dāng)傳入String類型,則會(huì)為參數(shù)加上雙引號(hào)(占位符);2、${}?:將參數(shù)取出不做任何處理,直接放入語句中,就是簡(jiǎn)單的字符串替換(替換符)。

2.5、sql批量提交

public?void?batchInsert(List<KafkaMsgInfo>?kafkaMsgInfoList)?throws?Exception{
????????//如果自動(dòng)提交設(shè)置為true,將無法控制提交的條數(shù),改為最后統(tǒng)一提交
????????//?創(chuàng)建session實(shí)列
????????SqlSessionFactory?sqlSessionFactory?=?ApplicationContextUtils.getBean("sqlSessionFactory");
????????//?開啟批量處理模式?BATCH?、關(guān)閉自動(dòng)提交事務(wù)?false
????????SqlSession?sqlSession?=?sqlSessionFactory.openSession(ExecutorType.BATCH,false);
????????KafkaMsgConfigMapper?KafkaMsgMapper?=?sqlSession.getMapper(KafkaMsgConfigMapper.class);
????????int?BATCH?=?1000;
????????for?(int?i?=?0,size=kafkaMsgInfoList.size();?i?<?size;?i++)?{
????????????//循環(huán)插入?+?開啟批處理模式
????????????KafkaMsgMapper.insertKafkaMsgInfo(kafkaMsgInfoList.get(i));
????????????if?(i?!=?0?&&?i?%?BATCH?==?0)?{
????????????????sqlSession?.commit();
????????????}
????????}
????????//?一次性提交事務(wù)
????????sqlSession.commit();
????????//?關(guān)閉資源
????????sqlSession.close();
????}
2.6、業(yè)務(wù)代碼
?@KafkaListener(topics?=?{"${mykafka.topics:mytopic}"})
????public?void?myMQConsumer(List<String>?msgList){
????????log.info("接收到的消息條數(shù)size:"+msgList.size());
????????//計(jì)算程序耗時(shí)時(shí)間
????????StopWatch?stopWatch?=?new?StopWatch();
????????//?開始計(jì)時(shí)
????????stopWatch.start();
????????this.getKafkaMsgAndDel(msgList);??//2、接收kafka日志并解析
????????stopWatch.stop();
????????log.info("本次任務(wù)耗時(shí)(秒):"?+?stopWatch.getLastTaskTimeMillis()/1000?+?"s");
????}

三、測(cè)試結(jié)果

spring連接kafka代碼,kafka,java,java,開發(fā)語言,kafka,spring boot,redis

序號(hào) kafka數(shù)量(萬條) 消耗(秒)
1 1 3
2 10 13
3 100 120

?

更多詳細(xì)資料,請(qǐng)關(guān)注個(gè)人微信公眾號(hào)或搜索“程序猿小楊”添加。

回復(fù):源碼,可以獲取該項(xiàng)目對(duì)應(yīng)的源碼及表結(jié)構(gòu),開箱即可使用。

spring連接kafka代碼,kafka,java,java,開發(fā)語言,kafka,spring boot,redis

spring連接kafka代碼,kafka,java,java,開發(fā)語言,kafka,spring boot,redis

推薦文章:

????1、SpringBoot使用@Async實(shí)現(xiàn)多線程異步;

????2、SpringBoot用線程池ThreadPoolTaskExecutor異步處理百萬級(jí)數(shù)據(jù);

????3、SpringBoot用線程池ThreadPoolExecutor處理百萬級(jí)數(shù)據(jù)。文章來源地址http://www.zghlxwxcb.cn/news/detail-682758.html

到了這里,關(guān)于SpringBoot對(duì)接kafka,批量、并發(fā)、異步獲取消息,并動(dòng)態(tài)、批量插入庫表的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【注意】Kafka生產(chǎn)者異步發(fā)送消息仍有可能阻塞

    Kafka是常用的消息中間件。在Spring Boot項(xiàng)目中,使用KafkaTemplate作為生產(chǎn)者發(fā)送消息。有時(shí),為了不影響主業(yè)務(wù)流程,會(huì)采用 異步 發(fā)送的方式,如下所示。 本以為采用異步發(fā)送,必然不會(huì)影響到主業(yè)務(wù)流程。但實(shí)際使用時(shí)發(fā)現(xiàn),在第一次發(fā)送消息時(shí),如果Kafka Broker連接失敗,

    2023年04月13日
    瀏覽(26)
  • Kafka是什么,以及如何使用SpringBoot對(duì)接Kafka

    Kafka是什么,以及如何使用SpringBoot對(duì)接Kafka

    上手第一關(guān),手把手教你安裝kafka與可視化工具kafka-eagle 架構(gòu)必備能力——kafka的選型對(duì)比及應(yīng)用場(chǎng)景 Kafka存取原理與實(shí)現(xiàn)分析,打破面試難關(guān) 防止消息丟失與消息重復(fù)——Kafka可靠性分析及優(yōu)化實(shí)踐 繼上一次教大家手把手安裝kafka后,今天我們直接來到入門實(shí)操教程,也就是

    2024年02月08日
    瀏覽(20)
  • Java中如何使用消息隊(duì)列實(shí)現(xiàn)異步(ActiveMQ,RabbitMQ,Kafka)

    在 Java 中,可以使用消息隊(duì)列實(shí)現(xiàn)異步處理。下面是一個(gè)簡(jiǎn)單的示例代碼,用于說明如何使用 ActiveMQ 實(shí)現(xiàn)消息隊(duì)列異步處理: 添加 ActiveMQ 依賴 在 pom.xml 文件中添加以下依賴: 創(chuàng)建消息隊(duì)列 創(chuàng)建一個(gè)名為 “TestQueue” 的消息隊(duì)列,并配置 ActiveMQ 連接信息: 創(chuàng)建消息消費(fèi)者

    2024年02月16日
    瀏覽(33)
  • ChatGPT工作提效之使用python開發(fā)對(duì)接百度地圖開放平臺(tái)API的實(shí)戰(zhàn)方案(批量路線規(guī)劃、批量獲取POI、突破數(shù)量有限制、批量地理編碼)

    ChatGPT工作提效之使用python開發(fā)對(duì)接百度地圖開放平臺(tái)API的實(shí)戰(zhàn)方案(批量路線規(guī)劃、批量獲取POI、突破數(shù)量有限制、批量地理編碼)

    ChatGPT工作提效之初探路徑獨(dú)孤九劍遇強(qiáng)則強(qiáng) ChatGPT工作提效之在程序開發(fā)中的巧勁和指令(創(chuàng)建MySQL語句、PHP語句、Javascript用法、python的交互) ChatGPT工作提效之生成開發(fā)需求和報(bào)價(jià)單并轉(zhuǎn)為Excel格式 ChatGPT工作提效之小鵝通二次開發(fā)批量API對(duì)接解決方案(學(xué)習(xí)記錄同步、用戶注

    2024年02月06日
    瀏覽(24)
  • 【并發(fā)編程】線程池多線程異步去分頁調(diào)用其他服務(wù)接口獲取海量數(shù)據(jù)

    前段時(shí)間在做一個(gè)數(shù)據(jù)同步工具,其中一個(gè)服務(wù)的任務(wù)是調(diào)用A服務(wù)的接口,將數(shù)據(jù)庫中指定數(shù)據(jù)請(qǐng)求過來,交給kafka去判斷哪些數(shù)據(jù)是需要新增,哪些數(shù)據(jù)是需要修改的。 剛開始的設(shè)計(jì)思路是,,我創(chuàng)建多個(gè)服務(wù)同時(shí)去請(qǐng)求A服務(wù)的接口,每個(gè)服務(wù)都請(qǐng)求到全量數(shù)據(jù),由于這些

    2024年02月13日
    瀏覽(32)
  • Spring-Kafka如何實(shí)現(xiàn)批量消費(fèi)消息并且不丟失數(shù)據(jù)

    先給答案: 某個(gè)業(yè)務(wù)對(duì)象由多張表關(guān)聯(lián)而成,要?jiǎng)?chuàng)建該對(duì)象需要向多張表插入數(shù)據(jù),基于canal的監(jiān)控就會(huì)有多次該對(duì)象的變更記錄,而Kafka消費(fèi)的時(shí)候也會(huì)多次處理同一個(gè)對(duì)象(雖然不同表,但是同一個(gè)對(duì)象的不同部分),原有的Kafka消費(fèi)者是一次處理一條,這將造成重復(fù)對(duì)同

    2024年02月13日
    瀏覽(22)
  • Selenium處理異步加載請(qǐng)求獲取XHR消息體的2種方法

    Selenium處理異步加載請(qǐng)求獲取XHR消息體的2種方法

    目錄 通過Log讀取XHR 簡(jiǎn)單使用示例 異步加載情況下,不涉及瀏覽器全局的加載,因此selenium會(huì)直接往下執(zhí)行,這就導(dǎo)致異步結(jié)果還沒返回,腳本就繼續(xù)執(zhí)行了。 構(gòu)造chrome driver: 通過log來獲取xhr: 其中,上述中“message”的消息如下: 通過requestId可以獲得詳細(xì)的消息體: Git

    2023年04月08日
    瀏覽(20)
  • SpringBoot 整合RabbitMq 自定義消息監(jiān)聽容器來實(shí)現(xiàn)消息批量處理

    SpringBoot 整合RabbitMq 自定義消息監(jiān)聽容器來實(shí)現(xiàn)消息批量處理

    RabbitMQ是一種常用的消息隊(duì)列,Spring Boot對(duì)其進(jìn)行了深度的整合,可以快速地實(shí)現(xiàn)消息的發(fā)送和接收。在RabbitMQ中,消息的發(fā)送和接收都是異步的,因此需要使用監(jiān)聽器來監(jiān)聽消息的到來。Spring Boot中提供了默認(rèn)的監(jiān)聽器容器,但是有時(shí)候我們需要自定義監(jiān)聽器容器,來滿足一

    2024年02月16日
    瀏覽(17)
  • SpringBoot異步任務(wù)獲取HttpServletRequest

    在使用框架日常開發(fā)中需要在controller中進(jìn)行一些異步操作減少請(qǐng)求時(shí)間,但是發(fā)現(xiàn)在使用@Anysc注解后會(huì)出現(xiàn)Request對(duì)象無法獲取的情況,本文就此情況給出完整的解決方案 @Anysc注解會(huì)開啟一個(gè)新的線程,主線程的Request和子線程是不共享的,所以獲取為null 在使用springboot的自定

    2024年02月21日
    瀏覽(22)
  • springboot 集成 kafka批量消費(fèi)數(shù)據(jù)

    yaml配置文件

    2024年02月13日
    瀏覽(20)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包