Background
When integrating with third-party companies, or when services are deployed across different network environments, for example across the public internet and a government extranet, we often need to talk to multiple Kafka clusters to meet business requirements. Once there are multiple Kafka data sources, the setup differs from the single-cluster case.
Dependency
implementation 'org.springframework.kafka:spring-kafka:2.8.2'
Configuration
Single-cluster setup
With a single Kafka cluster, Spring Boot's auto-configuration is enough; just declare the settings in application.yml:
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    bootstrap-servers: server001.bbd:9092
To use it, inject the template and call its methods directly:
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
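For example, a minimal send with a result callback (the topic name and payload are illustrative, and an @Slf4j logger is assumed):

// send() returns a ListenableFuture<SendResult<K, V>> in spring-kafka 2.8
kafkaTemplate.send("demo-topic", "demo-key", "demo-value")
        .addCallback(
                result -> log.info("sent, offset={}", result.getRecordMetadata().offset()),
                ex -> log.error("send failed", ex));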
Multiple data sources
This article focuses on the multi-datasource case, which differs from the single-cluster setup. I read several blog posts on the topic, but whenever I followed their configuration I got an error that the kafkaTemplate bean could not be found, so in the end I adapted Spring Boot's own auto-configuration instead.
package com.ddb.zggz.config;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import java.io.IOException;
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfiguration {

    private final KafkaProperties properties;

    private final KafkaSecondProperties kafkaSecondProperties;

    public KafkaConfiguration(KafkaProperties properties, KafkaSecondProperties kafkaSecondProperties) {
        this.properties = properties;
        this.kafkaSecondProperties = kafkaSecondProperties;
    }

    @Bean("kafkaTemplate")
    @Primary
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
                                             ProducerListener<Object, Object> kafkaProducerListener,
                                             ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }
@Bean("kafkaSecondTemplate")
public KafkaTemplate<?, ?> kafkaSecondTemplate(@Qualifier("kafkaSecondProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory,
@Qualifier("kafkaSecondProducerListener") ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
@Bean("kafkaProducerListener")
@Primary
public ProducerListener<Object, Object> kafkaProducerListener() {
return new LoggingProducerListener<>();
}
@Bean("kafkaSecondProducerListener")
public ProducerListener<Object, Object> kafkaSecondProducerListener() {
return new LoggingProducerListener<>();
}
@Bean("kafkaConsumerFactory")
@Primary
public ConsumerFactory<Object, Object> kafkaConsumerFactory(
ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(
this.properties.buildConsumerProperties());
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
return factory;
}
@Bean("kafkaSecondConsumerFactory")
public ConsumerFactory<Object, Object> kafkaSecondConsumerFactory(
ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(
this.kafkaSecondProperties.buildConsumerProperties());
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
return factory;
}
@Bean("zwKafkaContainerFactory")
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> zwKafkaContainerFactory(@Qualifier(value = "kafkaSecondConsumerFactory") ConsumerFactory<Object, Object> kafkaSecondConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaSecondConsumerFactory);
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean("kafkaProducerFactory")
@Primary
public ProducerFactory<Object, Object> kafkaProducerFactory(
ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
this.properties.buildProducerProperties());
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
return factory;
}
@Bean("kafkaSecondProducerFactory")
public ProducerFactory<Object, Object> kafkaSecondProducerFactory(
ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
this.kafkaSecondProperties.buildProducerProperties());
String transactionIdPrefix = this.kafkaSecondProperties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
return factory;
}
@Bean
@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
@Bean
@ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
KafkaProperties.Jaas jaasProperties = this.properties.getJaas();
if (jaasProperties.getControlFlag() != null) {
jaas.setControlFlag(jaasProperties.getControlFlag());
}
if (jaasProperties.getLoginModule() != null) {
jaas.setLoginModule(jaasProperties.getLoginModule());
}
jaas.setOptions(jaasProperties.getOptions());
return jaas;
}
@Bean("kafkaAdmin")
@Primary
public KafkaAdmin kafkaAdmin() {
KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
return kafkaAdmin;
}
}
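Note that the class above injects a KafkaSecondProperties bean holding the second cluster's settings, which the original configuration does not show. Below is a minimal sketch, assuming it simply re-binds Spring Boot's KafkaProperties under a second prefix; the prefix spring.kafka-second and the use of @Component are my assumptions:

package com.ddb.zggz.config;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Settings for the second Kafka cluster. Extending KafkaProperties means
 * buildProducerProperties()/buildConsumerProperties() work unchanged.
 */
@Component
@ConfigurationProperties(prefix = "spring.kafka-second")
public class KafkaSecondProperties extends KafkaProperties {
}

With that in place, application.yml can carry both clusters side by side (the second host name is illustrative):

spring:
  kafka:
    bootstrap-servers: server001.bbd:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  kafka-second:
    bootstrap-servers: server002.bbd:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer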
Producer
package com.ddb.zggz.event;
import com.alibaba.fastjson.JSON;
import com.ddb.zggz.config.ApplicationConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource;
@Component
@Slf4j
public class KafkaPushEvent {

    @Resource
    private KafkaTemplate<String, String> kafkaSecondTemplate;

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ApplicationConfiguration configuration;

    public void pushEvent(PushParam param) {
        ListenableFuture<SendResult<String, String>> sendResultListenableFuture = null;
        // "zw" environment (government extranet): route to the second cluster
        if ("zw".equals(configuration.getEnvironment())) {
            sendResultListenableFuture = kafkaSecondTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));
        }
        // "net" environment (internet): route to the primary cluster
        if ("net".equals(configuration.getEnvironment())) {
            sendResultListenableFuture = kafkaTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));
        }
        if (sendResultListenableFuture == null) {
            throw new IllegalArgumentException("Kafka message not sent: unrecognized environment");
        }
        sendResultListenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("Kafka message send failed, payload: {}", param, ex);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("Kafka message sent successfully, payload: {}", param);
            }
        });
    }
}
Consumer
package com.ddb.zggz.event;
import com.alibaba.fastjson.JSONObject;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.service.GzApprovalService;
import com.ddb.zggz.service.GzServiceService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class SendMessageListener {

    @Autowired
    private GzApprovalService gzApprovalService;

    @Autowired
    private GzServiceService gzServiceService;

    @KafkaListener(topics = "${application.config.push-topic}", groupId = "zggz", containerFactory = "zwKafkaContainerFactory")
    @RetryableTopic(include = {Exception.class},
            backoff = @Backoff(delay = 3000, multiplier = 1.5, maxDelay = 15000)
    )
    public void listen(ConsumerRecord<?, ?> consumerRecord) {
        String value = (String) consumerRecord.value();
        PushParam pushParam = JSONObject.parseObject(value, PushParam.class);
        // version approval
        if ("version-approval".equals(pushParam.getEvent())) {
            ApprovalDTO approvalDTO = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), ApprovalDTO.class);
            gzApprovalService.approval(approvalDTO);
        }
        // service take-down
        if ("server-OffShelf-gzt".equals(pushParam.getEvent())) {
            OffShelfParam offShelfParam = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), OffShelfParam.class);
            gzServiceService.offShelfV1(offShelfParam.getReason(), null, offShelfParam.getUserName(), "ZGGZ", offShelfParam.getH5Id(), offShelfParam.getAppId(), offShelfParam.getVersion());
        }
    }

    @DltHandler
    public void processMessage(String message) {
        // messages that exhaust all retries land here (dead-letter topic)
        log.error("Message moved to dead-letter topic: {}", message);
    }
}
Message payload
package com.ddb.zggz.event;
import com.alibaba.fastjson.annotation.JSONField;
import com.ddb.zggz.model.GzH5VersionManage;
import com.ddb.zggz.model.GzService;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;
/**
 * @author bbd
 */
@Data
public class PushParam implements Serializable {

    /**
     * Message payload
     */
    private Object data;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    @JSONField(format = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime createTime = LocalDateTime.now();

    /**
     * Event name, used by consumers to route to the right business handling
     */
    private String event;

    /**
     * Builds a save-version event
     */
    public static PushParam toKafkaVersion(GzH5VersionManage gzH5VersionManage) {
        PushParam pushParam = new PushParam();
        pushParam.setData(gzH5VersionManage);
        pushParam.setEvent("save-version");
        return pushParam;
    }

    /**
     * Builds a save-server event
     */
    public static PushParam toKafkaServer(GzService gzService) {
        PushParam pushParam = new PushParam();
        pushParam.setData(gzService);
        pushParam.setEvent("save-server");
        return pushParam;
    }
}
That wraps up integrating Spring Boot with multiple Kafka data sources.