Integrating Kafka with Spring Boot: batch, concurrent, asynchronous message consumption with dynamic batch inserts into database tables
1. Background
Business growth required integrating with Kafka to receive message logs quickly and in bulk, so that a backlog of unconsumed messages does not build up. After processing, the data must be inserted dynamically into database tables (same schema, different table names), with batched transaction commits so that messages are consumed quickly.
2. Core Code
2.1 Enabling batch and concurrent consumption
kafka:
  bootstrap-servers: 10.1.*.*:9092  # broker ip:port; multiple brokers are separated by commas
  producer: # producer settings
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
  consumer: # consumer settings
    # default consumer group id -- within one group, a message is read by only one consumer; the group is named via group.id
    group-id: myGroup  # consumer group id (default: Group)
    enable-auto-commit: true  # commit offsets automatically
    auto-commit-interval: 2000  # default 5000
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # value deserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    auto-offset-reset: latest
    max-poll-records: 2000  # maximum records returned by one poll (default 500)
  listener:
    # poll-timeout: 1000
    type: batch  # enable batch consumption
    concurrency: 3  # number of threads in the listener container, to raise concurrency
  properties:
    session:
      timeout:
        ms: 120000  # default 10000
    max:
      poll:
        interval:
          ms: 600000  # default 300000 (5 minutes)
Notes: `type: batch` enables batch consumption, and `max-poll-records: 2000` caps the records per batch. Setting max-poll-records to 2000 does not mean the consumer waits until 2000 messages have accumulated; it means a single poll returns at most 2000 records. `concurrency: 3` sets the number of threads in the listener container to raise throughput. Note that the concurrency must be chosen based on the actual partition count and must be less than or equal to the number of partitions, otherwise some threads will sit idle. For example, with concurrency set to 3, three listener threads are started; if the topic being consumed has 5 partitions, two of the threads are each assigned 2 partitions and the remaining thread is assigned 1.
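The even spread of partitions across listener threads described above can be sketched as a small helper. This is a plain-Java illustration of the distribution rule, not Spring Kafka's actual partition assignor:

```java
import java.util.Arrays;

public class PartitionSpread {
    /**
     * Returns how many partitions each of {@code threads} listener threads
     * receives when {@code partitions} partitions are dealt out as evenly
     * as possible, one at a time.
     */
    public static int[] spread(int partitions, int threads) {
        int[] counts = new int[threads];
        for (int p = 0; p < partitions; p++) {
            counts[p % threads]++;  // deal partitions out round-robin
        }
        return counts;
    }

    public static void main(String[] args) {
        // 5 partitions across 3 threads -> two threads get 2, one gets 1
        System.out.println(Arrays.toString(spread(5, 3)));
    }
}
```

With `spread(3, 5)` the last two threads would get 0 partitions, which is exactly the idle-thread situation the note warns about.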
2.2 Asynchronous multi-threading configuration
For the detailed configuration, see the earlier article on implementing asynchronous multi-threading in Spring Boot with @Async.
Note: the @EnableAsync annotation must be added to the application's main class to enable asynchronous execution.
2.3 Redis configuration
1. YAML configuration:
spring:
  redis:
    # host address
    host: 127.0.0.1
    # port, default 6379
    port: 6379
    # password (none set here)
    # connection timeout
    timeout: 10s
    lettuce:
      pool:
        # minimum idle connections in the pool
        min-idle: 0
        # maximum idle connections in the pool
        max-idle: 8
        # maximum number of connections in the pool
        max-active: 8
        # maximum time to block waiting for a connection (negative means no limit)
        max-wait: -1ms
2. RedisConfig class:
package com.wonders.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * Custom Redis serialization.
 * @author yangyalin
 * @create 2018/11/1
 * @since 1.0.0
 */
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
    /**
     * RedisTemplate defaults to JDK serialization, which stores opaque binary,
     * so a custom serializer is registered to keep the stored values readable.
     * @param redisConnectionFactory the connection factory
     * @return the configured RedisTemplate
     */
    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // replace the default serializer with Jackson2JsonRedisSerializer
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
                new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // deprecated since Jackson 2.10; activateDefaultTyping is the modern replacement
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        // key and value serialization rules
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}
2.4 Dynamic table names
<!-- insert into the Kafka log staging table -->
<insert id="insertMsgInfoTemp" parameterType="com.wonders.entity.KafkaMsgConfig">
  INSERT INTO ${logTableName}("EVN_LOG_ID", "TABLE_NAME", "OPERATION", "PK_VALUE1", "PK_VALUE2",
       "PK_VALUE3", "PK_VALUE4", "PK_VALUE5", "TRANS_FLAG", "PKS", "BASE_CODE", "PLA_BRANCH_CODE",
       "CREATE_TIME", "MSG_PRODUCE_TIME")
  VALUES (#{id,jdbcType=VARCHAR}, #{tableName,jdbcType=VARCHAR}, #{operation,jdbcType=VARCHAR},
        #{pk1,jdbcType=VARCHAR}, #{pk2,jdbcType=VARCHAR}, #{pk3,jdbcType=VARCHAR},
        #{pk4,jdbcType=VARCHAR}, #{pk5,jdbcType=VARCHAR}, 'Y',
        #{pks,jdbcType=VARCHAR}, #{baseCode,jdbcType=VARCHAR},
        #{plaBranchCode,jdbcType=VARCHAR}, sysdate, #{msgProduceTime,jdbcType=VARCHAR})
</insert>
Notes: 1. #{} is processed according to the parameter type; a String argument becomes a quoted, bound value (a prepared-statement placeholder). 2. ${} inserts the argument as-is with no processing, a plain string substitution, which is why it can carry the table name. Because ${} performs no escaping, the substituted value must never come from untrusted input.
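The difference between the two markers can be illustrated in plain Java. This is a conceptual sketch of the substitution semantics, not MyBatis internals: ${} behaves like raw text substitution, while #{} maps to a `?` placeholder whose value is bound later by the driver:

```java
public class MyBatisPlaceholders {
    /** Raw substitution, as ${} does: the value is pasted into the SQL text. */
    public static String dollar(String sql, String key, String value) {
        return sql.replace("${" + key + "}", value);
    }

    /** Placeholder substitution, as #{} does: the SQL keeps a '?' and the
     *  value is bound separately, so it cannot change the statement shape. */
    public static String hash(String sql, String key) {
        return sql.replace("#{" + key + "}", "?");
    }

    public static void main(String[] args) {
        String template = "INSERT INTO ${logTableName}(EVN_LOG_ID) VALUES (#{id})";
        String withTable = dollar(template, "logTableName", "KAFKA_LOG_TEMP");
        System.out.println(hash(withTable, "id"));
        // INSERT INTO KAFKA_LOG_TEMP(EVN_LOG_ID) VALUES (?)
    }
}
```

The table name `KAFKA_LOG_TEMP` here is only a hypothetical example; the real table names come from the application's configuration.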
2.5 Batched SQL commits
public void batchInsert(List<KafkaMsgInfo> kafkaMsgInfoList) throws Exception {
    // With auto-commit enabled we could not control how many rows go into each
    // commit, so the session is opened without auto-commit and committed manually.
    SqlSessionFactory sqlSessionFactory = ApplicationContextUtils.getBean("sqlSessionFactory");
    // open the session in BATCH executor mode with auto-commit disabled
    SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
    try {
        KafkaMsgConfigMapper kafkaMsgMapper = sqlSession.getMapper(KafkaMsgConfigMapper.class);
        final int BATCH = 1000;
        for (int i = 0, size = kafkaMsgInfoList.size(); i < size; i++) {
            // queue the insert in batch mode
            kafkaMsgMapper.insertKafkaMsgInfo(kafkaMsgInfoList.get(i));
            // commit every BATCH rows
            if (i != 0 && i % BATCH == 0) {
                sqlSession.commit();
            }
        }
        // commit the remaining rows
        sqlSession.commit();
    } finally {
        // always release the session
        sqlSession.close();
    }
}
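The commit cadence of the loop above can be checked in isolation. This small helper (hypothetical names, same loop condition) counts how many commits the method issues for a given list size:

```java
public class BatchCommits {
    /**
     * Mirrors the loop above: a commit is issued whenever i != 0 and
     * i % batch == 0, plus one final commit after the loop.
     */
    public static int commitCount(int rows, int batch) {
        int commits = 0;
        for (int i = 0; i < rows; i++) {
            if (i != 0 && i % batch == 0) {
                commits++;
            }
        }
        return commits + 1;  // the final commit after the loop
    }

    public static void main(String[] args) {
        System.out.println(commitCount(2500, 1000)); // 3: at i=1000, i=2000, plus the final commit
    }
}
```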
2.6、業(yè)務(wù)代碼
?@KafkaListener(topics?=?{"${mykafka.topics:mytopic}"})
????public?void?myMQConsumer(List<String>?msgList){
????????log.info("接收到的消息條數(shù)size:"+msgList.size());
????????//計(jì)算程序耗時(shí)時(shí)間
????????StopWatch?stopWatch?=?new?StopWatch();
????????//?開始計(jì)時(shí)
????????stopWatch.start();
????????this.getKafkaMsgAndDel(msgList);??//2、接收kafka日志并解析
????????stopWatch.stop();
????????log.info("本次任務(wù)耗時(shí)(秒):"?+?stopWatch.getLastTaskTimeMillis()/1000?+?"s");
????}
3. Test Results
| No. | Kafka messages (×10,000) | Time (s) |
| --- | --- | --- |
| 1 | 1 | 3 |
| 2 | 10 | 13 |
| 3 | 100 | 120 |
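From these figures the approximate throughput can be derived with a back-of-the-envelope calculation (one table unit = 10,000 messages):

```java
public class Throughput {
    /** Messages consumed per second, given a count in units of 10,000 and the elapsed seconds. */
    public static long perSecond(int tenThousands, int seconds) {
        return (long) tenThousands * 10_000 / seconds;
    }

    public static void main(String[] args) {
        System.out.println(perSecond(1, 3));     // 3333 msg/s for the first row
        System.out.println(perSecond(100, 120)); // 8333 msg/s for the last row
    }
}
```

Throughput rises with larger batches because the per-poll and per-commit overhead is amortized over more records.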
Recommended articles:
    1. Spring Boot: asynchronous multi-threading with @Async;
    2. Spring Boot: processing millions of records asynchronously with ThreadPoolTaskExecutor;
    3. Spring Boot: processing millions of records with ThreadPoolExecutor.