前言
概念詞就不多說(shuō)了,我簡(jiǎn)單地介紹下 , spring batch 是一個(gè) 方便使用的 較健全的 批處理 框架。
為什么說(shuō)是方便使用的,因?yàn)檫@是 基于spring的一個(gè)框架,接入簡(jiǎn)單、易理解、流程分明。
為什么說(shuō)是較健全的, 因?yàn)樗峁┝送N覀冊(cè)趯?duì)大批量數(shù)據(jù)進(jìn)行處理時(shí)需要考慮到的 日志跟蹤、事務(wù)粒度調(diào)配、可控執(zhí)行、失敗機(jī)制、重試機(jī)制、數(shù)據(jù)讀寫等。
正文
那么回到文章,我們?cè)撈恼聦?huì)帶來(lái)給大家的是什么?(結(jié)合實(shí)例講解那是當(dāng)然的)
從實(shí)現(xiàn)的業(yè)務(wù)場(chǎng)景來(lái)說(shuō),有以下兩個(gè):
-
從 ?csv文件 讀取數(shù)據(jù),進(jìn)行業(yè)務(wù)處理再存儲(chǔ)
-
從 數(shù)據(jù)庫(kù) 讀取數(shù)據(jù),進(jìn)行業(yè)務(wù)處理再存儲(chǔ)
也就是平時(shí)經(jīng)常遇到的數(shù)據(jù)清理或者數(shù)據(jù)過(guò)濾,又或者是數(shù)據(jù)遷移備份等等。大批量的數(shù)據(jù),自己實(shí)現(xiàn)分批處理需要考慮的東西太多了,又不放心,那么使用 Spring Batch 框架 是一個(gè)很好的選擇。
首先,在進(jìn)入實(shí)例教程前,我們看看這次的實(shí)例里,我們使用springboot 整合spring batch 框架,要編碼的東西有什么?
通過(guò)一張簡(jiǎn)單的圖來(lái)了解:
可能大家看到這個(gè)圖,是不是多多少少想起來(lái)定時(shí)任務(wù)框架?確實(shí)有那么點(diǎn)像,但是我必須在這告訴大家,這是一個(gè)批處理框架,不是一個(gè)schuedling 框架。但是前面提到它提供了可執(zhí)行控制,也就是說(shuō),啥時(shí)候執(zhí)行是可控的,那么顯然就是自己可以進(jìn)行擴(kuò)展結(jié)合定時(shí)任務(wù)框架,實(shí)現(xiàn)你心中所想。
ok,回到主題,相信大家能從圖中簡(jiǎn)單明了地看到我們這次實(shí)例,需要實(shí)現(xiàn)的東西有什么了。所以我就不在對(duì)各個(gè)小組件進(jìn)行大批量文字的描述了。
那么我們事不宜遲,開始我們的實(shí)例教程。
首先準(zhǔn)備一個(gè)數(shù)據(jù)庫(kù),里面建一張簡(jiǎn)單的表,用于實(shí)例數(shù)據(jù)的寫入存儲(chǔ)或者說(shuō)是讀取等等。
bloginfo表
相關(guān)建表sql語(yǔ)句:
CREATE?TABLE?`bloginfo`??(
??`id`?int(11)?NOT?NULL?AUTO_INCREMENT?COMMENT?'主鍵',
??`blogAuthor`?varchar(255)?CHARACTER?SET?utf8?COLLATE?utf8_general_ci?NULL?DEFAULT?NULL?COMMENT?'博客作者標(biāo)識(shí)',
??`blogUrl`?varchar(255)?CHARACTER?SET?utf8?COLLATE?utf8_general_ci?NULL?DEFAULT?NULL?COMMENT?'博客鏈接',
??`blogTitle`?varchar(255)?CHARACTER?SET?utf8?COLLATE?utf8_general_ci?NULL?DEFAULT?NULL?COMMENT?'博客標(biāo)題',
??`blogItem`?varchar(255)?CHARACTER?SET?utf8?COLLATE?utf8_general_ci?NULL?DEFAULT?NULL?COMMENT?'博客欄目',
??PRIMARY?KEY?(`id`)?USING?BTREE
)?ENGINE?=?InnoDB?AUTO_INCREMENT?=?89031?CHARACTER?SET?=?utf8?COLLATE?=?utf8_general_ci?ROW_FORMAT?=?Dynamic;
pom文件里的核心依賴:
<dependency>
????<groupId>org.springframework.boot</groupId>
????<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
????<groupId>org.springframework.boot</groupId>
????<artifactId>spring-boot-starter-test</artifactId>
????<scope>test</scope>
</dependency>
<!--??spring?batch?-->
<dependency>
????<groupId>org.springframework.boot</groupId>
????<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!--?hibernate?validator?-->
<dependency>
????<groupId>org.hibernate</groupId>
????<artifactId>hibernate-validator</artifactId>
????<version>6.0.7.Final</version>
</dependency>
<!--??mybatis?-->
<dependency>
????<groupId>org.mybatis.spring.boot</groupId>
????<artifactId>mybatis-spring-boot-starter</artifactId>
????<version>2.0.0</version>
</dependency>
<!--??mysql?-->
<dependency>
????<groupId>mysql</groupId>
????<artifactId>mysql-connector-java</artifactId>
????<scope>runtime</scope>
</dependency>
<!--?druid數(shù)據(jù)源驅(qū)動(dòng)?1.1.10解決springboot從1.0——2.0版本問(wèn)題-->
<dependency>
????<groupId>com.alibaba</groupId>
????<artifactId>druid-spring-boot-starter</artifactId>
????<version>1.1.18</version>
</dependency>
yml文件:
spring:
??batch:
????job:
#設(shè)置為?false?-需要jobLaucher.run執(zhí)行
??????enabled:?false
????initialize-schema:?always
#????table-prefix:?my-batch
?
??datasource:
????druid:
??????username:?root
??????password:?root
??????url:?jdbc:mysql://localhost:3306/hellodemo?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
??????driver-class-name:?com.mysql.cj.jdbc.Driver
??????initialSize:?5
??????minIdle:?5
??????maxActive:?20
??????maxWait:?60000
??????timeBetweenEvictionRunsMillis:?60000
??????minEvictableIdleTimeMillis:?300000
??????validationQuery:?SELECT?1?FROM?DUAL
??????testWhileIdle:?true
??????testOnBorrow:?false
??????testOnReturn:?false
??????poolPreparedStatements:?true
??????maxPoolPreparedStatementPerConnectionSize:?20
??????useGlobalDataSourceStat:?true
??????connectionProperties:?druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
server:
??port:?8665
ps:這里我們用到了druid數(shù)據(jù)庫(kù)連接池,其實(shí)有個(gè)小坑,后面文章會(huì)講到。
因?yàn)槲覀冞@次的實(shí)例最終數(shù)據(jù)處理完之后,是寫入數(shù)據(jù)庫(kù)存儲(chǔ)(當(dāng)然你也可以輸出到文件等等)。
所以我們前面也建了一張表,pom文件里面我們也整合的mybatis,那么我們?cè)谡蟬pring batch 主要編碼前,我們先把這些關(guān)于數(shù)據(jù)庫(kù)打通用到的簡(jiǎn)單過(guò)一下。
pojo 層
BlogInfo.java :
/**
?*?@Author?:?JCccc
?*?@Description?:
?**/
public?class?BlogInfo?{
?
????private?Integer?id;
????private?String?blogAuthor;
????private?String?blogUrl;
????private?String?blogTitle;
????private?String?blogItem;
?
????@Override
????public?String?toString()?{
????????return?"BlogInfo{"?+
????????????????"id="?+?id?+
????????????????",?blogAuthor='"?+?blogAuthor?+?'\''?+
????????????????",?blogUrl='"?+?blogUrl?+?'\''?+
????????????????",?blogTitle='"?+?blogTitle?+?'\''?+
????????????????",?blogItem='"?+?blogItem?+?'\''?+
????????????????'}';
????}
?
????public?Integer?getId()?{
????????return?id;
????}
?
????public?void?setId(Integer?id)?{
????????this.id?=?id;
????}
?
????public?String?getBlogAuthor()?{
????????return?blogAuthor;
????}
?
????public?void?setBlogAuthor(String?blogAuthor)?{
????????this.blogAuthor?=?blogAuthor;
????}
?
????public?String?getBlogUrl()?{
????????return?blogUrl;
????}
?
????public?void?setBlogUrl(String?blogUrl)?{
????????this.blogUrl?=?blogUrl;
????}
?
????public?String?getBlogTitle()?{
????????return?blogTitle;
????}
?
????public?void?setBlogTitle(String?blogTitle)?{
????????this.blogTitle?=?blogTitle;
????}
?
????public?String?getBlogItem()?{
????????return?blogItem;
????}
?
????public?void?setBlogItem(String?blogItem)?{
????????this.blogItem?=?blogItem;
????}
}
mapper層
BlogMapper.java :
ps:可以看到這個(gè)實(shí)例我用的是注解的方式,哈哈為了省事,而且我還不寫servcie層和impl層,也是為了省事,因?yàn)樵撈恼轮攸c(diǎn)不在這些,所以這些不好的大家不要學(xué)。
import?com.example.batchdemo.pojo.BlogInfo;
import?org.apache.ibatis.annotations.*;
import?java.util.List;
import?java.util.Map;
?
/**
?*?@Author?:?JCccc
?*?@Description?:
?**/
@Mapper
public?interface?BlogMapper?{
????@Insert("INSERT?INTO?bloginfo?(?blogAuthor,?blogUrl,?blogTitle,?blogItem?)???VALUES?(?#{blogAuthor},?#{blogUrl},#{blogTitle},#{blogItem})?")
????@Options(useGeneratedKeys?=?true,?keyProperty?=?"id")
????int?insert(BlogInfo?bloginfo);
?
?
????@Select("select?blogAuthor,?blogUrl,?blogTitle,?blogItem?from?bloginfo?where?blogAuthor?<?#{authorId}")
?????List<BlogInfo>?queryInfoById(Map<String?,?Integer>?map);
?
}
接下來(lái) ,重頭戲,我們開始對(duì)前邊那張圖里涉及到的各個(gè)小組件進(jìn)行編碼。
首先創(chuàng)建一個(gè) 配置類,?MyBatchConfig.java
:
從我起名來(lái)看,可以知道這基本就是咱們整合spring batch 涉及到的一些配置組件都會(huì)寫在這里了。
首先我們按照咱們上面的圖來(lái)看,里面包含內(nèi)容有:
JobRepository?job的注冊(cè)/存儲(chǔ)器
JobLauncher?job的執(zhí)行器?
Job?job任務(wù),包含一個(gè)或多個(gè)Step
Step?包含(ItemReader、ItemProcessor和ItemWriter)?
ItemReader?數(shù)據(jù)讀取器?
ItemProcessor?數(shù)據(jù)處理器
ItemWriter?數(shù)據(jù)輸出器
首先,在MyBatchConfig類前加入注解:
@Configuration
??用于告訴spring,咱們這個(gè)類是一個(gè)自定義配置類,里面很多bean都需要加載到spring容器里面
@EnableBatchProcessing
?開啟批處理支持
然后開始往MyBatchConfig類里,編寫各個(gè)小組件。
JobRepository
寫在MyBatchConfig類里
/**
?* JobRepository定義:Job的注冊(cè)容器以及和數(shù)據(jù)庫(kù)打交道(事務(wù)管理等)
?*?@param?dataSource
?*?@param?transactionManager
?*?@return
?*?@throws?Exception
?*/
@Bean
public?JobRepository?myJobRepository(DataSource?dataSource,?PlatformTransactionManager?transactionManager)?throws?Exception{
????JobRepositoryFactoryBean?jobRepositoryFactoryBean?=?new?JobRepositoryFactoryBean();
????jobRepositoryFactoryBean.setDatabaseType("mysql");
????jobRepositoryFactoryBean.setTransactionManager(transactionManager);
????jobRepositoryFactoryBean.setDataSource(dataSource);
????return?jobRepositoryFactoryBean.getObject();
}
JobLauncher
寫在MyBatchConfig類里
/**
?* jobLauncher定義:job的啟動(dòng)器,綁定相關(guān)的jobRepository
?*?@param?dataSource
?*?@param?transactionManager
?*?@return
?*?@throws?Exception
?*/
@Bean
public?SimpleJobLauncher?myJobLauncher(DataSource?dataSource,?PlatformTransactionManager?transactionManager)?throws?Exception{
????SimpleJobLauncher?jobLauncher?=?new?SimpleJobLauncher();
????//?設(shè)置jobRepository
????jobLauncher.setJobRepository(myJobRepository(dataSource,?transactionManager));
????return?jobLauncher;
}
Job
寫在MyBatchConfig類里
/**
?*?定義job
?*?@param?jobs
?*?@param?myStep
?*?@return
?*/
@Bean
public?Job?myJob(JobBuilderFactory?jobs,?Step?myStep){
????return?jobs.get("myJob")
????????????.incrementer(new?RunIdIncrementer())
????????????.flow(myStep)
????????????.end()
????????????.listener(myJobListener())
????????????.build();
}
對(duì)于Job的運(yùn)行,是可以配置監(jiān)聽器的
JobListener
寫在MyBatchConfig類里
/**
?*?注冊(cè)job監(jiān)聽器
?*?@return
?*/
@Bean
public?MyJobListener?myJobListener(){
????return?new?MyJobListener();
}
這是一個(gè)我們自己自定義的監(jiān)聽器,所以是單獨(dú)創(chuàng)建的,MyJobListener.java
:
/**
?*?@Author?:?JCccc
?*?@Description?:監(jiān)聽Job執(zhí)行情況,實(shí)現(xiàn)JobExecutorListener,且在batch配置類里,Job的Bean上綁定該監(jiān)聽器
?**/
?
public?class?MyJobListener?implements?JobExecutionListener?{
?
????private?Logger?logger?=?LoggerFactory.getLogger(MyJobListener.class);
?
????@Override
????public?void?beforeJob(JobExecution?jobExecution)?{
????????logger.info("job?開始,?id={}",jobExecution.getJobId());
????}
?
????@Override
????public?void?afterJob(JobExecution?jobExecution)?{
????????logger.info("job?結(jié)束,?id={}",jobExecution.getJobId());
????}
}
Step(ItemReader ? ItemProcessor ? ItemWriter)
step里面包含數(shù)據(jù)讀取器,數(shù)據(jù)處理器,數(shù)據(jù)輸出器三個(gè)小組件的的實(shí)現(xiàn)。
我們也是一個(gè)個(gè)拆解來(lái)進(jìn)行編寫。
文章前邊說(shuō)到,該篇實(shí)現(xiàn)的場(chǎng)景包含兩種,一種是從csv文件讀入大量數(shù)據(jù)進(jìn)行處理,另一種是從數(shù)據(jù)庫(kù)表讀入大量數(shù)據(jù)進(jìn)行處理。
從CSV文件讀取數(shù)據(jù)
ItemReader
寫在MyBatchConfig類里
/**
?* ItemReader定義:讀取文件數(shù)據(jù)+entirty實(shí)體類映射
?*?@return
?*/
@Bean
public?ItemReader<BlogInfo>?reader(){
????//?使用FlatFileItemReader去讀cvs文件,一行即一條數(shù)據(jù)
????FlatFileItemReader<BlogInfo>?reader?=?new?FlatFileItemReader<>();
????//?設(shè)置文件處在路徑
????reader.setResource(new?ClassPathResource("static/bloginfo.csv"));
????//?entity與csv數(shù)據(jù)做映射
????reader.setLineMapper(new?DefaultLineMapper<BlogInfo>()?{
????????{
????????????setLineTokenizer(new?DelimitedLineTokenizer()?{
????????????????{
????????????????????setNames(new?String[]{"blogAuthor","blogUrl","blogTitle","blogItem"});
????????????????}
????????????});
????????????setFieldSetMapper(new?BeanWrapperFieldSetMapper<BlogInfo>()?{
????????????????{
????????????????????setTargetType(BlogInfo.class);
????????????????}
????????????});
????????}
????});
????return?reader;
}
簡(jiǎn)單代碼解析:
對(duì)于數(shù)據(jù)讀取器 ItemReader ,我們給它安排了一個(gè)讀取監(jiān)聽器,創(chuàng)建?MyReadListener.java
?:
/**
?*?@Author?:?JCccc
?*?@Description?:
?**/
?
public?class?MyReadListener?implements?ItemReadListener<BlogInfo>?{
?
????private?Logger?logger?=?LoggerFactory.getLogger(MyReadListener.class);
?
????@Override
????public?void?beforeRead()?{
????}
?
????@Override
????public?void?afterRead(BlogInfo?item)?{
????}
?
????@Override
????public?void?onReadError(Exception?ex)?{
????????try?{
????????????logger.info(format("%s%n",?ex.getMessage()));
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}
????}
}
ItemProcessor
寫在MyBatchConfig類里
/**
?*?注冊(cè)ItemProcessor:?處理數(shù)據(jù)+校驗(yàn)數(shù)據(jù)
?*?@return
?*/
@Bean
public?ItemProcessor<BlogInfo,?BlogInfo>?processor(){
????MyItemProcessor?myItemProcessor?=?new?MyItemProcessor();
????//?設(shè)置校驗(yàn)器
????myItemProcessor.setValidator(myBeanValidator());
????return?myItemProcessor;
}
數(shù)據(jù)處理器,是我們自定義的,里面主要是包含我們對(duì)數(shù)據(jù)處理的業(yè)務(wù)邏輯,并且我們?cè)O(shè)置了一些數(shù)據(jù)校驗(yàn)器,我們這里使用 JSR-303的Validator來(lái)作為校驗(yàn)器。
校驗(yàn)器
寫在MyBatchConfig類里
/**
?*?注冊(cè)校驗(yàn)器
?*?@return
?*/
@Bean
public?MyBeanValidator?myBeanValidator(){
????return?new?MyBeanValidator<BlogInfo>();
}
創(chuàng)建MyItemProcessor.java
?:
ps:里面我的數(shù)據(jù)處理邏輯是,獲取出讀取數(shù)據(jù)里面的每條數(shù)據(jù)的blogItem字段,如果是springboot,那就對(duì)title字段值進(jìn)行替換。
其實(shí)也就是模擬一個(gè)簡(jiǎn)單地?cái)?shù)據(jù)處理場(chǎng)景。
import?com.example.batchdemo.pojo.BlogInfo;
import?org.springframework.batch.item.validator.ValidatingItemProcessor;
import?org.springframework.batch.item.validator.ValidationException;
?
/**
?*?@Author?:?JCccc
?*?@Description?:
?**/
public?class?MyItemProcessor?extends?ValidatingItemProcessor<BlogInfo>?{
????@Override
????public?BlogInfo?process(BlogInfo?item)?throws?ValidationException?{
????????/**
?????????*?需要執(zhí)行super.process(item)才會(huì)調(diào)用自定義校驗(yàn)器
?????????*/
????????super.process(item);
????????/**
?????????*?對(duì)數(shù)據(jù)進(jìn)行簡(jiǎn)單的處理
?????????*/
????????if?(item.getBlogItem().equals("springboot"))?{
????????????item.setBlogTitle("springboot?系列還請(qǐng)看看我Jc");
????????}?else?{
????????????item.setBlogTitle("未知系列");
????????}
????????return?item;
????}
}
創(chuàng)建MyBeanValidator.java:
import?org.springframework.batch.item.validator.ValidationException;
import?org.springframework.batch.item.validator.Validator;
import?org.springframework.beans.factory.InitializingBean;
import?javax.validation.ConstraintViolation;
import?javax.validation.Validation;
import?javax.validation.ValidatorFactory;
import?java.util.Set;
?
/**
?*?@Author?:?JCccc
?*?@Description?:
?**/
public?class?MyBeanValidator<T>?implements?Validator<T>,?InitializingBean?{
?
????private?javax.validation.Validator?validator;
?
????@Override
????public?void?validate(T?value)?throws?ValidationException?{
????????/**
?????????*?使用Validator的validate方法校驗(yàn)數(shù)據(jù)
?????????*/
????????Set<ConstraintViolation<T>>?constraintViolations?=
????????????????validator.validate(value);
????????if?(constraintViolations.size()?>?0)?{
????????????StringBuilder?message?=?new?StringBuilder();
????????????for?(ConstraintViolation<T>?constraintViolation?:?constraintViolations)?{
????????????????message.append(constraintViolation.getMessage()?+?"\n");
????????????}
????????????throw?new?ValidationException(message.toString());
????????}
????}
?
????/**
?????*?使用JSR-303的Validator來(lái)校驗(yàn)我們的數(shù)據(jù),在此進(jìn)行JSR-303的Validator的初始化
?????*?@throws?Exception
?????*/
????@Override
????public?void?afterPropertiesSet()?throws?Exception?{
????????ValidatorFactory?validatorFactory?=
????????????????Validation.buildDefaultValidatorFactory();
????????validator?=?validatorFactory.usingContext().getValidator();
????}
?
}
ps:其實(shí)該篇文章沒(méi)有使用這個(gè)數(shù)據(jù)校驗(yàn)器,大家想使用的話,可以在實(shí)體類上添加一些校驗(yàn)器的注解@NotNull @Max @Email等等。我偏向于直接在處理器里面進(jìn)行處理,想把關(guān)于數(shù)據(jù)處理的代碼都寫在一塊。
ItemWriter
寫在MyBatchConfig類里
/**
?* ItemWriter定義:指定datasource,設(shè)置批量插入sql語(yǔ)句,寫入數(shù)據(jù)庫(kù)
?*?@param?dataSource
?*?@return
?*/
@Bean
public?ItemWriter<BlogInfo>?writer(DataSource?dataSource){
????//?使用jdbcBcatchItemWrite寫數(shù)據(jù)到數(shù)據(jù)庫(kù)中
????JdbcBatchItemWriter<BlogInfo>?writer?=?new?JdbcBatchItemWriter<>();
????//?設(shè)置有參數(shù)的sql語(yǔ)句
????writer.setItemSqlParameterSourceProvider(new?BeanPropertyItemSqlParameterSourceProvider<BlogInfo>());
????String?sql?=?"insert?into?bloginfo?"+"?(blogAuthor,blogUrl,blogTitle,blogItem)?"
????????????+"?values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)";
????writer.setSql(sql);
????writer.setDataSource(dataSource);
????return?writer;
}
簡(jiǎn)單代碼解析:
同樣 對(duì)于數(shù)據(jù)讀取器 ItemWriter ,我們給它也安排了一個(gè)輸出監(jiān)聽器,創(chuàng)建?MyWriteListener.java
:
import?com.example.batchdemo.pojo.BlogInfo;
import?org.slf4j.Logger;
import?org.slf4j.LoggerFactory;
import?org.springframework.batch.core.ItemWriteListener;
import?java.util.List;
import?static?java.lang.String.format;
?
/**
?*?@Author?:?JCccc
?*?@Description?:
?**/
public?class?MyWriteListener?implements?ItemWriteListener<BlogInfo>?{
????private?Logger?logger?=?LoggerFactory.getLogger(MyWriteListener.class);
?
????@Override
????public?void?beforeWrite(List<??extends?BlogInfo>?items)?{
????}
?
????@Override
????public?void?afterWrite(List<??extends?BlogInfo>?items)?{
????}
?
????@Override
????public?void?onWriteError(Exception?exception,?List<??extends?BlogInfo>?items)?{
????????try?{
????????????logger.info(format("%s%n",?exception.getMessage()));
????????????for?(BlogInfo?message?:?items)?{
????????????????logger.info(format("Failed?writing?BlogInfo?:?%s",?message.toString()));
????????????}
?
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}
?
????}
}
ItemReader
、ItemProcessor
、ItemWriter
,這三個(gè)小組件到這里,我們都實(shí)現(xiàn)了,那么接下來(lái)就是把這三個(gè)小組件跟我們的step去綁定起來(lái)。
寫在MyBatchConfig類里
/**
?* step定義:
?*?包括
?*?ItemReader?讀取
?*?ItemProcessor??處理
?*?ItemWriter?輸出
?*?@param?stepBuilderFactory
?*?@param?reader
?*?@param?writer
?*?@param?processor
?*?@return
?*/
@Bean
public?Step?myStep(StepBuilderFactory?stepBuilderFactory,?ItemReader<BlogInfo>?reader,
?????????????????ItemWriter<BlogInfo>?writer,?ItemProcessor<BlogInfo,?BlogInfo>?processor){
????return?stepBuilderFactory
????????????.get("myStep")
????????????.<BlogInfo,?BlogInfo>chunk(65000)?//?Chunk的機(jī)制(即每次讀取一條數(shù)據(jù),再處理一條數(shù)據(jù),累積到一定數(shù)量后再一次性交給writer進(jìn)行寫入操作)
????????????.reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2)
????????????.listener(new?MyReadListener())
????????????.processor(processor)
????????????.writer(writer).faultTolerant().skip(Exception.class).skipLimit(2)
????????????.listener(new?MyWriteListener())
????????????.build();
}
這個(gè)Step,稍作講解。
前邊提到了,spring batch框架,提供了事務(wù)的控制,重啟,檢測(cè)跳過(guò)等等機(jī)制。
那么,這些東西的實(shí)現(xiàn),很多都在于這個(gè)step環(huán)節(jié)的設(shè)置。
首先看到我們代碼出現(xiàn)的第一個(gè)設(shè)置,chunk( 6500 )?
,Chunk的機(jī)制(即每次讀取一條數(shù)據(jù),再處理一條數(shù)據(jù),累積到一定數(shù)量后再一次性交給writer進(jìn)行寫入操作。
沒(méi)錯(cuò),對(duì)于整個(gè)step環(huán)節(jié),就是數(shù)據(jù)的讀取,處理最后到輸出。
這個(gè)chunk機(jī)制里,我們傳入的 6500,也就是是告訴它,讀取處理數(shù)據(jù),累計(jì)達(dá)到 6500條進(jìn)行一次批次處理,去執(zhí)行寫入操作。
這個(gè)傳值,是根據(jù)具體業(yè)務(wù)而定,可以是500條一次,1000條一次,也可以是20條一次,50條一次。
通過(guò)一張簡(jiǎn)單的小圖來(lái)幫助理解:
在我們大量數(shù)據(jù)處理,不管是讀取或者說(shuō)是寫入,都肯定會(huì)涉及到一些未知或者已知因素導(dǎo)致某條數(shù)據(jù)失敗了。
那么如果說(shuō)咱們啥也不設(shè)置,失敗一條數(shù)據(jù),那么我們就當(dāng)作整個(gè)失敗了?。顯然這個(gè)太不人性,所以spring batch 提供了 retry 和 skip 兩個(gè)設(shè)置(其實(shí)還有restart) ,通過(guò)這兩個(gè)設(shè)置來(lái)人性化地解決一些數(shù)據(jù)操作失敗場(chǎng)景。
retryLimit(3).retry(Exception.class)??
沒(méi)錯(cuò),這個(gè)就是設(shè)置重試,當(dāng)出現(xiàn)異常的時(shí)候,重試多少次。我們?cè)O(shè)置為3,也就是說(shuō)當(dāng)一條數(shù)據(jù)操作失敗,那我們會(huì)對(duì)這條數(shù)據(jù)進(jìn)行重試3次,還是失敗就是 當(dāng)做失敗了, 那么我們?nèi)绻信渲胹kip(推薦配置使用),那么這個(gè)數(shù)據(jù)失敗記錄就會(huì)留到給 skip 來(lái)處理。
skip(Exception.class).skipLimit(2)??
skip,跳過(guò),也就是說(shuō)我們?nèi)绻O(shè)置3, 那么就是可以容忍 3條數(shù)據(jù)的失敗。只有達(dá)到失敗數(shù)據(jù)達(dá)到3次,我們才中斷這個(gè)step。
對(duì)于失敗的數(shù)據(jù),我們做了相關(guān)的監(jiān)聽器以及異常信息記錄,供與后續(xù)手動(dòng)補(bǔ)救。
那么記下來(lái)我們開始去調(diào)用這個(gè)批處理job,我們通過(guò)接口去觸發(fā)這個(gè)批處理事件,新建一個(gè)Controller,TestController.java
:
/**
?*?@Author?:?JCccc
?*?@Description?:
?**/
@RestController
public?class?TestController?{
????@Autowired
????SimpleJobLauncher?jobLauncher;
?
????@Autowired
????Job?myJob;
?
????@GetMapping("testJob")
????public??void?testJob()?throws?JobParametersInvalidException,?JobExecutionAlreadyRunningException,?JobRestartException,?JobInstanceAlreadyCompleteException?{
?????//????后置參數(shù):使用JobParameters中綁定參數(shù) addLong addString 等方法
????????JobParameters?jobParameters?=?new?JobParametersBuilder().toJobParameters();
????????jobLauncher.run(myJob,?jobParameters);
?
????}
}
對(duì)了,我準(zhǔn)備了一個(gè)csv文件?bloginfo.csv
,里面大概8萬(wàn)多條數(shù)據(jù),用來(lái)進(jìn)行批處理測(cè)試:
這個(gè)文件的路徑跟我們的數(shù)據(jù)讀取器里面讀取的路徑要一直,
目前我們數(shù)據(jù)庫(kù)是這個(gè)樣子,
接下來(lái)我們把我們的項(xiàng)目啟動(dòng)起來(lái),再看一眼數(shù)據(jù)庫(kù),生成了一些batch用來(lái)跟蹤記錄job的一些數(shù)據(jù)表:
我們來(lái)調(diào)用一下testJob接口,
然后看下數(shù)據(jù)庫(kù),可以看的數(shù)據(jù)全部都進(jìn)行了相關(guān)的邏輯處理并插入到了數(shù)據(jù)庫(kù):
到這里,我們對(duì)Springboot 整合 spring batch 其實(shí)已經(jīng)操作完畢了,也實(shí)現(xiàn)了從csv文件讀取數(shù)據(jù)處理存儲(chǔ)的業(yè)務(wù)場(chǎng)景。
從數(shù)據(jù)庫(kù)讀取數(shù)據(jù)
ps:前排提示使用druid有坑。后面會(huì)講到。
那么接下來(lái)實(shí)現(xiàn)場(chǎng)景,從數(shù)據(jù)庫(kù)表內(nèi)讀取數(shù)據(jù)進(jìn)行處理輸出到新的表里面。
那么基于我們上邊的整合,我們已經(jīng)實(shí)現(xiàn)了
JobRepository?job的注冊(cè)/存儲(chǔ)器
JobLauncher?job的執(zhí)行器?
Job?job任務(wù),包含一個(gè)或多個(gè)Step
Step?包含(ItemReader、ItemProcessor和ItemWriter)?
ItemReader?數(shù)據(jù)讀取器?
ItemProcessor?數(shù)據(jù)處理器
ItemWriter?數(shù)據(jù)輸出器
job?監(jiān)聽器
reader?監(jiān)聽器
writer?監(jiān)聽器
process?數(shù)據(jù)校驗(yàn)器
那么對(duì)于我們新寫一個(gè)job完成 一個(gè)新的場(chǎng)景,我們需要全部重寫么?
顯然沒(méi)必要,當(dāng)然完全新寫一套也是可以的。
那么該篇,對(duì)于一個(gè)新的也出場(chǎng)景,從csv文件讀取數(shù)據(jù)轉(zhuǎn)換到數(shù)據(jù)庫(kù)表讀取數(shù)據(jù),我們重新新建的有:
-
數(shù)據(jù)讀取器:??原先使用的是?
FlatFileItemReader
?,我們現(xiàn)在改為使用?MyBatisCursorItemReader
-
數(shù)據(jù)處理器:??新的場(chǎng)景,業(yè)務(wù)為了好擴(kuò)展,所以我們處理器最好也新建一個(gè)
-
數(shù)據(jù)輸出器:?? ?新的場(chǎng)景,業(yè)務(wù)為了好擴(kuò)展,所以我們數(shù)據(jù)輸出器最好也新建一個(gè)
-
step的綁定設(shè)置:?新的場(chǎng)景,業(yè)務(wù)為了好擴(kuò)展,所以我們step最好也新建一個(gè)
-
Job:??當(dāng)然是要重新寫一個(gè)了
其他我們照用原先的就行,JobRepository,JobLauncher以及各種監(jiān)聽器啥的,暫且不重新建了。
新建MyItemProcessorNew.java
:
import?org.springframework.batch.item.validator.ValidatingItemProcessor;
import?org.springframework.batch.item.validator.ValidationException;
?
/**
?*?@Author?:?JCccc
?*?@Description?:
?**/
public?class?MyItemProcessorNew?extends?ValidatingItemProcessor<BlogInfo>?{
????@Override
????public?BlogInfo?process(BlogInfo?item)?throws?ValidationException?{
????????/**
?????????*?需要執(zhí)行super.process(item)才會(huì)調(diào)用自定義校驗(yàn)器
?????????*/
????????super.process(item);
????????/**
?????????*?對(duì)數(shù)據(jù)進(jìn)行簡(jiǎn)單的處理
?????????*/
????????Integer?authorId=?Integer.valueOf(item.getBlogAuthor());
????????if?(authorId<20000)?{
????????????item.setBlogTitle("這是都是小于20000的數(shù)據(jù)");
????????}?else?if?(authorId>20000?&&?authorId<30000){
????????????item.setBlogTitle("這是都是小于30000但是大于20000的數(shù)據(jù)");
????????}else?{
????????????item.setBlogTitle("舊書不厭百回讀");
????????}
????????return?item;
????}
}
然后其他重新定義的小組件,寫在MyBatchConfig類里:
/**
?*?定義job
?*?@param?jobs
?*?@param?stepNew
?*?@return
?*/
@Bean
public?Job?myJobNew(JobBuilderFactory?jobs,?Step?stepNew){
????return?jobs.get("myJobNew")
????????????.incrementer(new?RunIdIncrementer())
????????????.flow(stepNew)
????????????.end()
????????????.listener(myJobListener())
????????????.build();
}
@Bean
public?Step?stepNew(StepBuilderFactory?stepBuilderFactory,?MyBatisCursorItemReader<BlogInfo>?itemReaderNew,
????????????????????ItemWriter<BlogInfo>?writerNew,?ItemProcessor<BlogInfo,?BlogInfo>?processorNew){
????return?stepBuilderFactory
????????????.get("stepNew")
????????????.<BlogInfo,?BlogInfo>chunk(65000)?//?Chunk的機(jī)制(即每次讀取一條數(shù)據(jù),再處理一條數(shù)據(jù),累積到一定數(shù)量后再一次性交給writer進(jìn)行寫入操作)
????????????.reader(itemReaderNew).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(10)
????????????.listener(new?MyReadListener())
????????????.processor(processorNew)
????????????.writer(writerNew).faultTolerant().skip(Exception.class).skipLimit(2)
????????????.listener(new?MyWriteListener())
????????????.build();
}
@Bean
public?ItemProcessor<BlogInfo,?BlogInfo>?processorNew(){
????MyItemProcessorNew?csvItemProcessor?=?new?MyItemProcessorNew();
????//?設(shè)置校驗(yàn)器
????csvItemProcessor.setValidator(myBeanValidator());
????return?csvItemProcessor;
}
@Autowired
private?SqlSessionFactory?sqlSessionFactory;
@Bean
@StepScope
//Spring Batch提供了一個(gè)特殊的bean scope類(StepScope:作為一個(gè)自定義的Spring bean scope)。這個(gè)step scope的作用是連接batches的各個(gè)steps。這個(gè)機(jī)制允許配置在Spring的beans當(dāng)steps開始時(shí)才實(shí)例化并且允許你為這個(gè)step指定配置和參數(shù)。
public?MyBatisCursorItemReader<BlogInfo>?itemReaderNew(@Value("#{jobParameters[authorId]}")?String?authorId)?{
????????System.out.println("開始查詢數(shù)據(jù)庫(kù)");
????????MyBatisCursorItemReader<BlogInfo>?reader?=?new?MyBatisCursorItemReader<>();
????????reader.setQueryId("com.example.batchdemo.mapper.BlogMapper.queryInfoById");
????????reader.setSqlSessionFactory(sqlSessionFactory);
?????????Map<String?,?Object>?map?=?new?HashMap<>();
??????????map.put("authorId"?,?Integer.valueOf(authorId));
?????????reader.setParameterValues(map);
????????return?reader;
}
/**
?* ItemWriter定義:指定datasource,設(shè)置批量插入sql語(yǔ)句,寫入數(shù)據(jù)庫(kù)
?*?@param?dataSource
?*?@return
?*/
@Bean
public?ItemWriter<BlogInfo>?writerNew(DataSource?dataSource){
????//?使用jdbcBcatchItemWrite寫數(shù)據(jù)到數(shù)據(jù)庫(kù)中
????JdbcBatchItemWriter<BlogInfo>?writer?=?new?JdbcBatchItemWriter<>();
????//?設(shè)置有參數(shù)的sql語(yǔ)句
????writer.setItemSqlParameterSourceProvider(new?BeanPropertyItemSqlParameterSourceProvider<BlogInfo>());
????String?sql?=?"insert?into?bloginfonew?"+"?(blogAuthor,blogUrl,blogTitle,blogItem)?"
????????????+"?values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)";
????writer.setSql(sql);
????writer.setDataSource(dataSource);
????return?writer;
}
代碼需要注意的點(diǎn)
數(shù)據(jù)讀取器?MyBatisCursorItemReader
對(duì)應(yīng)的mapper方法:
數(shù)據(jù)處理器 MyItemProcessorNew:
數(shù)據(jù)輸出器,新插入到別的數(shù)據(jù)庫(kù)表去,特意這樣為了測(cè)試:
當(dāng)然我們的數(shù)據(jù)庫(kù)為了測(cè)試這個(gè)場(chǎng)景,也是新建了一張表,bloginfonew 表。
接下來(lái),我們新寫一個(gè)接口來(lái)執(zhí)行新的這個(gè)job:
@Autowired
SimpleJobLauncher?jobLauncher;
@Autowired
Job?myJobNew;
@GetMapping("testJobNew")
public??void?testJobNew(@RequestParam("authorId")?String?authorId)?throws?JobParametersInvalidException,?JobExecutionAlreadyRunningException,?JobRestartException,?JobInstanceAlreadyCompleteException?{
????JobParameters?jobParametersNew?=?new?JobParametersBuilder().addLong("timeNew",?System.currentTimeMillis())
????????????.addString("authorId",authorId)
????????????.toJobParameters();
????jobLauncher.run(myJobNew,jobParametersNew);
}
ok,我們來(lái)調(diào)用一些這個(gè)接口:
看下控制臺(tái):
沒(méi)錯(cuò),這就是失敗的,原因是因?yàn)楦鷇ruid有關(guān),報(bào)了一個(gè)數(shù)據(jù)庫(kù)功能不支持。這是在數(shù)據(jù)讀取的時(shí)候報(bào)的錯(cuò)。
我初步測(cè)試認(rèn)為是MyBatisCursorItemReader
?,druid 數(shù)據(jù)庫(kù)連接池不支持。
那么,我們只需要:
-
注釋掉druid連接池 jar依賴
-
yml里替換連接池配置
其實(shí)我們不配置其他連接池,springboot 2.X 版本已經(jīng)為我們整合了默認(rèn)的連接池 HikariCP 。
在Springboot2.X版本,數(shù)據(jù)庫(kù)的連接池官方推薦使用HikariCP
如果不是為了druid的那些后臺(tái)監(jiān)控?cái)?shù)據(jù),sql分析等等,完全是優(yōu)先使用HikariCP的。
官方的原話:
We preferHikariCPfor its performance and concurrency. If HikariCP is available, we always choose it.
翻譯:
我們更喜歡hikaricpf的性能和并發(fā)性。如果有HikariCP,我們總是選擇它。
所以我們就啥連接池也不配了,使用默認(rèn)的HikariCP 連接池。
當(dāng)然你想配,也是可以的:
所以我們剔除掉druid鏈接池后,我們?cè)賮?lái)調(diào)用一下新接口:
可以看到,從數(shù)據(jù)庫(kù)獲取數(shù)據(jù)并進(jìn)行批次處理寫入job是成功的:
新的表里面插入的數(shù)據(jù)都進(jìn)行了自己寫的邏輯處理:
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-444598.html
好了,springboot 整合 spring batch 批處理框架, 就到此吧。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-444598.html
到了這里,關(guān)于非常強(qiáng),批處理框架 Spring Batch 就該這么用?。▓?chǎng)景實(shí)戰(zhàn))的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!