国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

springboot整合kafka多數(shù)據(jù)源

這篇具有很好參考價值的文章主要介紹了springboot整合kafka多數(shù)據(jù)源。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

項目背景

在很多與第三方公司對接的時候,或者處在不同的網(wǎng)絡環(huán)境下,比如在互聯(lián)網(wǎng)和政務外網(wǎng)的分布部署服務的時候,我們需要對接多臺kafka來達到我們的業(yè)務需求,那么當kafka存在多數(shù)據(jù)源的情況,就與單機的情況有所不同。

依賴

    implementation 'org.springframework.kafka:spring-kafka:2.8.2'

配置

單機的情況
如果是單機的kafka我們直接通過springboot自動配置的就可以使用,例如在yml里面直接引用

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    bootstrap-servers: server001.bbd:9092

在使用的時候直接注入,然后就可以使用里面的方法了

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

springboot整合kafka多數(shù)據(jù)源,spring boot,kafka

多數(shù)據(jù)源情況下

本篇文章主要講的是在多數(shù)據(jù)源下的使用,和單機的有所不同,我也看了網(wǎng)上的一些博客,但是當我去按照網(wǎng)上的配置的時候,總是會報錯 kafakTemplate這個bean找不到,所以沒辦法只有按照springboot自動配置里面的來改
springboot整合kafka多數(shù)據(jù)源,spring boot,kafka文章來源地址http://www.zghlxwxcb.cn/news/detail-649560.html

package com.ddb.zggz.config;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;

import java.io.IOException;

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfiguration {

    private final KafkaProperties properties;

    private final KafkaSecondProperties kafkaSecondProperties;



    public KafkaConfiguration(KafkaProperties properties, KafkaSecondProperties kafkaSecondProperties) {
        this.properties = properties;
        this.kafkaSecondProperties = kafkaSecondProperties;
    }

    @Bean("kafkaTemplate")
    @Primary
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
                                             ProducerListener<Object, Object> kafkaProducerListener,
                                             ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }


    @Bean("kafkaSecondTemplate")
    public KafkaTemplate<?, ?> kafkaSecondTemplate(@Qualifier("kafkaSecondProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory,
                                                   @Qualifier("kafkaSecondProducerListener") ProducerListener<Object, Object> kafkaProducerListener,
                                                   ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }


    @Bean("kafkaProducerListener")
    @Primary
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener<>();
    }


    @Bean("kafkaSecondProducerListener")
    public ProducerListener<Object, Object> kafkaSecondProducerListener() {
        return new LoggingProducerListener<>();
    }

    @Bean("kafkaConsumerFactory")
    @Primary
    public ConsumerFactory<Object, Object> kafkaConsumerFactory(
            ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
        DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(
                this.properties.buildConsumerProperties());
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }

    @Bean("kafkaSecondConsumerFactory")
    public ConsumerFactory<Object, Object> kafkaSecondConsumerFactory(
            ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
        DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(
                this.kafkaSecondProperties.buildConsumerProperties());
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }


    @Bean("zwKafkaContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> zwKafkaContainerFactory(@Qualifier(value = "kafkaSecondConsumerFactory") ConsumerFactory<Object, Object> kafkaSecondConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaSecondConsumerFactory);
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }


    @Bean("kafkaProducerFactory")
    @Primary
    public ProducerFactory<Object, Object> kafkaProducerFactory(
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
                this.properties.buildProducerProperties());
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }

    @Bean("kafkaSecondProducerFactory")
    public ProducerFactory<Object, Object> kafkaSecondProducerFactory(
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
                this.kafkaSecondProperties.buildProducerProperties());
        String transactionIdPrefix = this.kafkaSecondProperties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }

    @Bean
    @ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
    public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    @ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
    public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
        KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
        KafkaProperties.Jaas jaasProperties = this.properties.getJaas();
        if (jaasProperties.getControlFlag() != null) {
            jaas.setControlFlag(jaasProperties.getControlFlag());
        }
        if (jaasProperties.getLoginModule() != null) {
            jaas.setLoginModule(jaasProperties.getLoginModule());
        }
        jaas.setOptions(jaasProperties.getOptions());
        return jaas;
    }

    @Bean("kafkaAdmin")
    @Primary
    public KafkaAdmin kafkaAdmin() {
        KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
        kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
        return kafkaAdmin;
    }

}


生產(chǎn)者


package com.ddb.zggz.event;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import com.ddb.zggz.config.ApplicationConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;

@Component
@Slf4j
public class KafkaPushEvent {


    @Resource
    private KafkaTemplate<String, String> kafkaSecondTemplate;

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ApplicationConfiguration configuration;


    public void pushEvent(PushParam param) {
        ListenableFuture<SendResult<String, String>> sendResultListenableFuture = null;
        if ("zw".equals(configuration.getEnvironment())){
            sendResultListenableFuture = kafkaSecondTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));
        }
        if ("net".equals(configuration.getEnvironment())){
            sendResultListenableFuture = kafkaTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));
        }
        if (sendResultListenableFuture == null){
            throw new IllegalArgumentException("kakfa發(fā)送消息失敗");
        }
        sendResultListenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("kafka發(fā)送的message報錯,發(fā)送數(shù)據(jù):{}", param);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("kafka發(fā)送的message成功,發(fā)送數(shù)據(jù):{}", param);
            }
        });


    }


}

消費者

package com.ddb.zggz.event;

import com.alibaba.fastjson.JSONObject;

import com.ddb.zggz.config.ApplicationConfiguration;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.service.GzApprovalService;
import com.ddb.zggz.service.GzServiceService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;


@Component
@Slf4j
public class SendMessageListener {

    @Autowired
    private GzApprovalService gzApprovalService;

    @Autowired
    private GzServiceService gzServiceService;

    @KafkaListener(topics = "${application.config.push-topic}", groupId = "zggz",containerFactory = "zwKafkaContainerFactory")
    @RetryableTopic(include = {Exception.class},
            backoff = @Backoff(delay = 3000, multiplier = 1.5, maxDelay = 15000)
    )
    public void listen(ConsumerRecord<?, ?> consumerRecord) {
        String value = (String) consumerRecord.value();
        PushParam pushParam = JSONObject.parseObject(value, PushParam.class);

        //版本提審
        if ("version-approval".equals(pushParam.getEvent())) {
            ApprovalDTO approvalDTO = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), ApprovalDTO.class);
            gzApprovalService.approval(approvalDTO);
        }

        //服務下架
        if (pushParam.getEvent().equals("server-OffShelf-gzt")) {
            OffShelfParam offShelfParam = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), OffShelfParam.class);
            gzServiceService.offShelfV1(offShelfParam.getReason(), null, offShelfParam.getUserName(), "ZGGZ", offShelfParam.getH5Id(), offShelfParam.getAppId(), offShelfParam.getVersion());

        }

    }
    @DltHandler
    public void processMessage(String message) {

    }
}

消息體

package com.ddb.zggz.event;

import com.alibaba.fastjson.annotation.JSONField;
import com.ddb.zggz.model.GzH5VersionManage;
import com.ddb.zggz.model.GzService;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.param.PublishParam;
import com.ddb.zggz.param.ReviewAndRollback;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * @author bbd
 */
@Data
public class PushParam implements Serializable {

    /**
     * 發(fā)送的消息數(shù)據(jù)
     */
    private Object data;
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    @JSONField(format = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime createTime = LocalDateTime.now();

    /**
     * 事件名稱,用于消費者處理相關業(yè)務
     */
    private String event;


    /**
     * 保存版本參數(shù)
     */
    public static PushParam toKafkaVersion(GzH5VersionManage gzH5VersionManage) {
        PushParam pushParam = new PushParam();
        pushParam.setData(gzH5VersionManage);
        pushParam.setEvent("save-version");
        return pushParam;
    }

    /**
     * 保存服務參數(shù)
     */
    public static PushParam toKafkaServer(GzService gzService) {
        PushParam pushParam = new PushParam();
        pushParam.setData(gzService);
        pushParam.setEvent("save-server");
        return pushParam;
    }

到了這里,關于springboot整合kafka多數(shù)據(jù)源的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Spring Boot入門(07):整合 MySQL 和 Druid數(shù)據(jù)源 | 全網(wǎng)最詳細保姆級教學(兩萬字)

    Spring Boot入門(07):整合 MySQL 和 Druid數(shù)據(jù)源 | 全網(wǎng)最詳細保姆級教學(兩萬字)

    ????????作為現(xiàn)代Web應用開發(fā)的重要技術棧之一,Spring Boot在快速構(gòu)建可靠、高效、易維護的應用方面具有獨特的優(yōu)勢。而在實際開發(fā)中,數(shù)據(jù)庫作為系統(tǒng)的重要組成部分,對于數(shù)據(jù)源的選擇和配置也是至關重要的。本篇文章將全面介紹如何使用Spring Boot整合MySQL和Druid數(shù)據(jù)

    2024年02月12日
    瀏覽(25)
  • springboot整合多數(shù)據(jù)源的配置以及動態(tài)切換數(shù)據(jù)源,注解切換數(shù)據(jù)源

    springboot整合多數(shù)據(jù)源的配置以及動態(tài)切換數(shù)據(jù)源,注解切換數(shù)據(jù)源

    在許多應用程序中,可能需要使用多個數(shù)據(jù)庫或數(shù)據(jù)源來處理不同的業(yè)務需求。Spring Boot提供了簡便的方式來配置和使用多數(shù)據(jù)源,使開發(fā)人員能夠輕松處理多個數(shù)據(jù)庫連接。如果你的項目中可能需要隨時切換數(shù)據(jù)源的話,那我這篇文章可能能幫助到你 ??:這里對于pom文件

    2024年02月10日
    瀏覽(32)
  • 【Spring Boot 3】【數(shù)據(jù)源】自定義多數(shù)據(jù)源

    軟件開發(fā)是一門實踐性科學,對大多數(shù)人來說,學習一種新技術不是一開始就去深究其原理,而是先從做出一個可工作的DEMO入手。但在我個人學習和工作經(jīng)歷中,每次學習新技術總是要花費或多或少的時間、檢索不止一篇資料才能得出一個可工作的DEMO,這占用了我大量的時

    2024年02月01日
    瀏覽(24)
  • 【Spring Boot 3】【數(shù)據(jù)源】自定義JPA數(shù)據(jù)源

    軟件開發(fā)是一門實踐性科學,對大多數(shù)人來說,學習一種新技術不是一開始就去深究其原理,而是先從做出一個可工作的DEMO入手。但在我個人學習和工作經(jīng)歷中,每次學習新技術總是要花費或多或少的時間、檢索不止一篇資料才能得出一個可工作的DEMO,這占用了我大量的時

    2024年01月21日
    瀏覽(32)
  • SpringBoot整合MybatisPlus多數(shù)據(jù)源

    SpringBoot整合MybatisPlus多數(shù)據(jù)源

    相信在很多使用MybatisPlus框架的小伙伴都會遇到多數(shù)據(jù)源的配置問題,并且官網(wǎng)也給出了推薦使用多數(shù)據(jù)源 (dynamic-datasource-spring-boot-starter) 組件來實現(xiàn)。由于最近項目也在使用這個組件來實現(xiàn)多數(shù)據(jù)源切換,因此想了解一下該組件是如何運行的,經(jīng)過自己的調(diào)試,簡單記錄一

    2024年02月13日
    瀏覽(25)
  • 【Spring Boot 3】【數(shù)據(jù)源】自定義JDBC多數(shù)據(jù)源

    軟件開發(fā)是一門實踐性科學,對大多數(shù)人來說,學習一種新技術不是一開始就去深究其原理,而是先從做出一個可工作的DEMO入手。但在我個人學習和工作經(jīng)歷中,每次學習新技術總是要花費或多或少的時間、檢索不止一篇資料才能得出一個可工作的DEMO,這占用了我大量的時

    2024年01月23日
    瀏覽(28)
  • 【Spring Boot 3】【數(shù)據(jù)源】自定義JPA多數(shù)據(jù)源

    軟件開發(fā)是一門實踐性科學,對大多數(shù)人來說,學習一種新技術不是一開始就去深究其原理,而是先從做出一個可工作的DEMO入手。但在我個人學習和工作經(jīng)歷中,每次學習新技術總是要花費或多或少的時間、檢索不止一篇資料才能得出一個可工作的DEMO,這占用了我大量的時

    2024年01月22日
    瀏覽(30)
  • MyBatis整合Springboot多數(shù)據(jù)源實現(xiàn)

    MyBatis整合Springboot多數(shù)據(jù)源實現(xiàn)

    數(shù)據(jù)源,實際就是數(shù)據(jù)庫連接池,負責管理數(shù)據(jù)庫連接,在 Springboot 中,數(shù)據(jù)源通常以一個 bean 的形式存在于 IOC 容器中,也就是我們可以通過依賴注入的方式拿到數(shù)據(jù)源,然后再從數(shù)據(jù)源中獲取數(shù)據(jù)庫連接。 那么什么是多數(shù)據(jù)源呢,其實就是 IOC 容器中有多個數(shù)據(jù)源的 bea

    2023年04月22日
    瀏覽(25)
  • SpringBoot整合Druid配置多數(shù)據(jù)源

    SpringBoot整合Druid配置多數(shù)據(jù)源

    目錄 1.初始化項目 1.1.初始化工程 1.2.添加依賴 1.3.配置yml文件 1.4.Spring Boot 啟動類中添加?@MapperScan?注解,掃描 Mapper 文件夾 1.5.配置使用數(shù)據(jù)源 1.5.1.注解方式 1.5.2.基于AOP手動實現(xiàn)多數(shù)據(jù)源原生的方式 2.結(jié)果展示 Mybatis-Plus:簡介 | MyBatis-Plus (baomidou.com) 在正式開始之前,先初始

    2024年02月01日
    瀏覽(38)
  • Spring Boot 配置雙數(shù)據(jù)源

    Spring Boot 配置雙數(shù)據(jù)源

    Survive by day and develop by night. talk for import biz , show your perfect code,full busy,skip hardness,make a better result,wait for change,challenge Survive. happy for hardess to solve denpendies. 需求: 1.基本步驟 添加依賴 添加 Spring Boot 和數(shù)據(jù)庫驅(qū)動的依賴 配置數(shù)據(jù)源 在 application.properties 或 application.yml 中分別配

    2024年01月22日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包