国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

spring集成kafka并對(duì)消息進(jìn)行監(jiān)聽(tīng)

這篇具有很好參考價(jià)值的文章主要介紹了spring集成kafka并對(duì)消息進(jìn)行監(jiān)聽(tīng)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

spring集成kafka


需要依賴zookeeper,需提前啟動(dòng)

在server.properties文件中配置kafka連接zookeeper相關(guān)信息

############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

在zookeeper.properties中配置zookeeper所需配置

# 數(shù)據(jù)文件保存地址
dataDir=/tmp/zookeeper
# 客戶端端口
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# 設(shè)置此功能端口將不在沖突
admin.enableServer=false
# admin.serverPort=8080

kafka本地安裝啟動(dòng)

windows下載kafka二進(jìn)制包到本機(jī):http://kafka.apache.org/downloads
2、在config下面的server.properties文件,修改:
listeners=PLAINTEXT://localhost:9092
log.dirs=F:\kafka_2.13-2.5.0\logs
3、在bin同級(jí)目錄下打開(kāi)shell窗口,啟動(dòng)kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties
4、創(chuàng)建主題 查看可用主題
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
5、刪除指定topic
.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --delete --topic topic_kedacom_icms_alarm_jy_3725
5.1、如果出現(xiàn)臨時(shí)存儲(chǔ)的topic需要到zookeeper刪除指定的topic
#查看存儲(chǔ)的topic
ls /brokers/topics
#刪除指定的topic
rmr /brokers/topics/topicName
6、另起窗口,開(kāi)啟指定topic
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_kedacom_icms_alarm_jy_3725
7、另起窗口、開(kāi)啟生產(chǎn)端
.\bin\windows\kafka-console-producer.bat --broker-list 189.1.0.55:9092 --topic topic_kedacom_icms_spdc_jy_3725
8、另起窗口,開(kāi)啟消費(fèi)端
chcp 65001
.\bin\windows\kafka-console-consumer.bat --bootstrap-server 189.1.0.55:9092 --topic topic_kedacom_icms_spdc_sj_3725 --from-beginning
如果遇到文本過(guò)長(zhǎng) 指令識(shí)別錯(cuò)誤,是因?yàn)榇娣拍夸涍^(guò)長(zhǎng)不規(guī)范引起

pom文件

#在選擇版本,高版本會(huì)提示缺少anntnationprocess...
   <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.1.8.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-api</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>commons-httpclient</groupId>
      <artifactId>commons-httpclient</artifactId>
      <version>3.1</version>
    </dependency>
  </dependencies>

生產(chǎn)配置

/**
 * @Auther: lyp
 * @Date: 2021/11/22 15:46
 */
 @Configuration
 @EnableKafka
 public class KafkaProducerConfig {

     @Value("${bootstrap.servers}")
     private String bootstrapServers;

     public KafkaProducerConfig(){
         System.out.println("kafka--------------------------------生產(chǎn)配置");
     }

     /**
     * 創(chuàng)建生產(chǎn)值消息工廠
     */
     @Bean
     public ProducerFactory<Integer, String> producerFactory() {
         return new DefaultKafkaProducerFactory(producerProperties());
     }

    /**
     * 生產(chǎn)基本配置
     */
    @Bean
     public Map<String, Object> producerProperties() {
         Map<String, Object> props = new HashMap<String, Object>();
         //設(shè)置kafka訪問(wèn)地址
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         //消息轉(zhuǎn)化
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
         //重試次數(shù)
         props.put(ProducerConfig.RETRIES_CONFIG,1);
         //分批處理內(nèi)存設(shè)置
         props.put(ProducerConfig.BATCH_SIZE_CONFIG,1048576);
         props.put(ProducerConfig.LINGER_MS_CONFIG,1);
         //使用內(nèi)存配置
         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432L);
         //確認(rèn)標(biāo)志符使用配置
         props.put(ProducerConfig.ACKS_CONFIG,"all");
         return props;
     }
 
     @Bean
     public KafkaTemplate<Integer, String> kafkaTemplate() {
         KafkaTemplate kafkaTemplate = new KafkaTemplate<Integer, String>(producerFactory(),true);
         kafkaTemplate.setDefaultTopic(KafkaSendEnum.ALARM_WARN_PUSH.getTopic());
         return kafkaTemplate;
     }
 
 }

消費(fèi)者配置

package com.huating.jfp.msg.api.kafka.config;

import com.huating.jfp.msg.api.kafka.construct.KafkaConsumerEnum;
import com.huating.jfp.msg.api.kafka.listener.KafkaConsumerListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * @author lyp
 * @ClassName KafkaConsumerConfig
 * @description: 消費(fèi)者配置
 * @datetime 2022年 07月 20日 9:15
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${bootstrap.servers}")
    private String bootstrapServers;

    public KafkaConsumerConfig() {
        System.out.println("kafka消費(fèi)者配置加載...");
    }

    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<String, Object>();
        //Kafka服務(wù)地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //消費(fèi)組
        props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerEnum.SD_SJ.getGroupId());
        //關(guān)閉自動(dòng)提交位移
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //設(shè)置間隔時(shí)間,默認(rèn)5000ms
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        //Key反序列化類
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 					                                             "org.apache.kafka.common.serialization.StringSerializer");
        //Value反序列化
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,                                                                   "org.apache.kafka.common.serialization.StringSerializer");
        //earliest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),從頭開(kāi)始消費(fèi)
	   //latest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),消費(fèi)新產(chǎn)生的該分區(qū)		下的數(shù)據(jù)
	   //none:topic各分區(qū)都存在已提交的offset時(shí),從offset后開(kāi)始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的			offset,則拋出異常
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<String, String>(consumerProperties());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>                                                                  kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new                                                                          ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public KafkaConsumerListener kafkaConsumerListener() {
        return new KafkaConsumerListener();
    }
}

創(chuàng)建topic工具類

/**
 * @author lyp
 */
public class KafkaTopicUtil {

    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicUtil.class);

    /**
     * 功能描述:創(chuàng)建topic,并返回創(chuàng)建結(jié)果
     * @param: topicName
     * @return: boolean
     * @auther: lyp
     * @date: 2021/11/12 16:06
     */
    public static boolean createTopics(String bootstrapServers,String topicName,int partitions,short replication) {
        boolean res = false;
        try {
            Properties properties = new Properties();
            properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            properties.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";");
            AdminClient adminClient = KafkaAdminClient.create(properties);
            NewTopic newTopic = new NewTopic(topicName, partitions, replication);
            adminClient.createTopics(Arrays.asList(newTopic));
            logger.info("創(chuàng)建Topic:"+topicName+"成功!");
            res = true;
        } catch (Exception e) {
            e.printStackTrace();
            logger.info("創(chuàng)建異常!");
        }
        return res;
    }

    /**
     * 功能描述:獲取當(dāng)前kafka所存在的topic列表
     * @return: set
     * @auther: lyp
     * @date: 2021/11/12 16:07
     */
    public static Set<String> getTopics(String bootstrapServers){
        Set<String> nameSet = new HashSet<>();
        try {
            Properties properties = new Properties();
            properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            AdminClient adminClient = KafkaAdminClient.create(properties);
            ListTopicsResult listTopicsResult = adminClient.listTopics();
            KafkaFuture<Set<String>> names = listTopicsResult.names();
            nameSet = names.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return nameSet;
    }
}

生產(chǎn)業(yè)務(wù)

public interface KafkaProduceService {

    /**設(shè)備報(bào)警消息發(fā)送*/
    boolean sendWarnMessage(DeviceWarnInfo deviceWarnInfo);
}
/**
 * @author lyp
 */
@Service("kafkaProducerService")
public class KafkaProducerServiceImpl implements KafkaProduceService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaProduceService.class);

    @Value("${bootstrap.servers}")
    private String bootstrapServers;

    @Value("${topic.name}")
    private String topicName;

    @Value("${srcUnit.code}")
    private String srcUnitCode;

    @Value("${srcUnit.name}")
    private String srcUnitName;

    @Override
    public boolean sendWarnMessage(DeviceWarnInfo deviceWarnInfo) {
        boolean res = false;
        Map<String, Object> reportData = new HashMap<>();
        reportData.put("command","reportAlarm");
        deviceWarnInfo.setSrcUnitCode(srcUnitCode);
        deviceWarnInfo.setSrcUnitName(srcUnitName);
        reportData.put("data",deviceWarnInfo);
        //判斷是否存在當(dāng)前主題
        Set<String> topics = KafkaTopicUtil.getTopics(bootstrapServers);
        if (!topics.contains(KafkaSendEnum.ALARM_WARN_PUSH.getTopic())){
            if (!KafkaTopicUtil.createTopics(bootstrapServers,topicName,1,(short)1)){
                logger.info("topic創(chuàng)建失敗,消息發(fā)送不成功!");
                return res;
            }
        }

        KafkaTemplate kafkaTemplate = SpringContextUtil.getBean("kafkaTemplate");
        ListenableFuture send = kafkaTemplate.sendDefault(topicName, JSONArray.toJSONString(reportData));
        send.addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                logger.error(ex.getMessage()+"發(fā)送失?。≡颍?+ex.getCause());
                System.out.println("發(fā)送失敗!");
            }

            @Override
            public void onSuccess(Object result) {
                logger.info("消息發(fā)送成功"+result.toString());
                System.out.println("發(fā)送成功!");
            }
        });
        return res;
    }

}

消費(fèi)業(yè)務(wù)

消息接收類
package com.huating.jfp.msg.api.kafka.entity;

/**
 * @author lyp
 * @ClassName MesBody
 * @description: 消息實(shí)體
 * @datetime 2022年 07月 21日 14:48
 */
@Data
public class MesBody {
	//類型標(biāo)記字段
    private String command;
	//消息實(shí)體字段
    private String data;
}


監(jiān)聽(tīng)類
package com.huating.jfp.msg.api.kafka.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;

import com.alibaba.fastjson.JSONObject;
import com.huating.jfp.msg.api.kafka.construct.KafkaMesType;
import com.huating.jfp.msg.api.kafka.construct.KafkaTopics;
import com.huating.jfp.msg.api.kafka.consumer.service.KafkaConsumerService;
import com.huating.jfp.msg.api.kafka.entity.InspectorIssue;
import com.huating.jfp.msg.api.kafka.entity.MesBody;
import com.huating.jfp.msg.api.kafka.entity.Notice;

/**
 * @author lyp
 * @ClassName KafkaConsumerListener
 * @description: 主題監(jiān)聽(tīng)
 * @datetime 2022年 07月 20日 9:27
 */

public class KafkaConsumerListener {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);
    @Autowired
    private KafkaConsumerService consumerService;

    /**
     * 功能描述: 監(jiān)聽(tīng)指定topic,多個(gè)使用,
     * groupId:分組id
     * topics:監(jiān)聽(tīng)當(dāng)前topic數(shù)組
     * topic:監(jiān)聽(tīng)單個(gè)topic
     */
    @KafkaListener(groupId = "${group.id}",topics = "#{'${consumer.topics}'.split(',')}",containerFactory = "")
    public void listener(ConsumerRecord<String, String> consumerRecord) {
        logger.info("開(kāi)始消費(fèi)" + KafkaTopics.SD_JY_DUTY_TOPIC.getTopicName() + "的消息{}", consumerRecord.value());
        MesBody mesBody = JSONObject.parseObject(consumerRecord.value(), MesBody.class);
        logger.error("kafka監(jiān)聽(tīng)-當(dāng)前消息類型:"+mesBody.getCommand());
        //督查督辦
        if (mesBody.getCommand().equals(KafkaMesType.SD_INSPECTOR_ISSUE.getMesCode())
                || mesBody.getCommand().equals(KafkaMesType.SD_INSPECT_DISPOSE.getMesCode())
                || mesBody.getCommand().equals(KafkaMesType.SD_INSPECT_RES.getMesCode())) {
            logger.error("督查督辦監(jiān)聽(tīng)消息處理開(kāi)始----->----->");
            InspectorIssue inspectorIssue = JSONObject.parseObject(mesBody.getData(), InspectorIssue.class);
            consumerService.inspectorListener(inspectorIssue);
        }

        //通知通報(bào)
        if (mesBody.getCommand().equals(KafkaMesType.SD_NOTICE_ISSUE.getMesCode())) {
            logger.error("通知通報(bào)開(kāi)始監(jiān)聽(tīng)");
            Notice notice = JSONObject.parseObject(mesBody.getData(), Notice.class);
            consumerService.noticeListener(notice);
        }
    }
}

業(yè)務(wù)處理
package com.huating.jfp.msg.api.kafka.consumer.service;


import com.huating.jfp.msg.api.kafka.entity.InspectorIssue;
import com.huating.jfp.msg.api.kafka.entity.Notice;

/**
 * @author lyp
 */
public interface KafkaConsumerService {

    /**
     * 功能描述: 督查下發(fā) 督查辦結(jié)監(jiān)聽(tīng)處理
     *
     * @param inspectorIssue
     */
    void inspectorListener(InspectorIssue inspectorIssue);

    /**
     * 功能描述: 通知通報(bào)下發(fā)監(jiān)聽(tīng)
     *
     * @param notice
     */
    void noticeListener(Notice notice);
}


package com.huating.jfp.msg.api.kafka.consumer.service.impl;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.huating.jfp.common.dao.MsgConfigureDao;
import com.huating.jfp.common.dao.MsgDao;
import com.huating.jfp.common.entity.Msg;
import com.huating.jfp.common.entity.MsgConfigure;
import com.huating.jfp.common.entity.MsgReceive;
import com.huating.jfp.common.service.MsgReceiveService;
import com.huating.jfp.core.base.ViewPublicRewrite;
import com.huating.jfp.msg.api.http.servcie.HttpRequestService;
import com.huating.jfp.msg.api.kafka.consumer.service.KafkaConsumerService;
import com.huating.jfp.msg.api.kafka.dao.InspectorEventMapper;
import com.huating.jfp.msg.api.kafka.dao.NoticeMapper;
import com.huating.jfp.msg.api.kafka.entity.*;
import com.huating.jfp.msg.api.kafka.producer.service.KafkaProducerService;
import com.huating.jfp.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * @author lyp
 * @ClassName KafkaConsumerServiceImpl
 * @description: 消費(fèi)實(shí)現(xiàn)
 * @datetime 2022年 07月 20日 9:13
 */
@Service
public class KafkaConsumerServiceImpl implements KafkaConsumerService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerServiceImpl.class);
    private static final String SRC_UNIT_CODE = "gaol_code";
    private static final String SRC_UNIT_NAME = "gaol_name";
    @Autowired
    private InspectorEventMapper inspectorEventMapper;
    @Autowired
    private HttpRequestService httpRequestService;
    @Autowired
    private NoticeMapper noticeMapper;

    @Autowired
    private MsgDao msgMapper;

    @Autowired
    private MsgConfigureDao msgConfigureMapper;
    @Autowired
    private MsgReceiveService msgReceiveService;
    @Autowired
    private ViewPublicRewrite vp;

    @Override
    public void inspectorListener(InspectorIssue inspectorIssue) {
        if (!StrUtil.isEmpty(inspectorIssue.getUuid())) {
            if (!StrUtil.isEmpty(inspectorIssue.getDubanTime())) {
                logger.error("督辦下發(fā)處理");
                InspectorEventDispose inspectorEventDispose = new InspectorEventDispose();
                //督查督辦
                String uuid = StringUtil.getUUID();
                inspectorEventDispose.setIedUuid(uuid);
                inspectorEventDispose.setIedIeUuid(inspectorIssue.getUuid());
                inspectorEventDispose.setIedExpireTime(inspectorIssue.getDubanTime());
                inspectorEventDispose.setIedContent(inspectorIssue.getContent());

                //督辦下發(fā)持久化
                inspectorEventMapper.insertDispose(inspectorEventDispose);
                logger.error("督辦下發(fā)數(shù)據(jù)新增完成");
                //督辦文件持久化
                List<FileEntity> files = inspectorIssue.getFiles();
                List<InspectorFile> fileList = new ArrayList<>();
                downloadFile(files, fileList, uuid);
                inspectorEventMapper.insertFiles(fileList);
                logger.error("督辦下發(fā)完成");
            } else if (!StrUtil.isEmpty(inspectorIssue.getSrcUnitCode())) {
                logger.error("督查下發(fā)處理");
                InspectorEvent inspectorEvent = new InspectorEvent();
                //督查下發(fā)
                inspectorEvent.setIeUuid(inspectorIssue.getUuid());
                inspectorEvent.setIeAreaCode(vp.getBusinessValue("gaol_code"));
                inspectorEvent.setIeEventType(inspectorIssue.getType());
                inspectorEvent.setIeDescribe(inspectorIssue.getContent());
                inspectorEvent.setIeGrabTime(inspectorIssue.getPublishTime());
                inspectorEvent.setIeExpireTime(inspectorIssue.getQxTime());
                inspectorEvent.setIeCusNunmber(vp.getBusinessValue("base_cus"));
                inspectorEvent.setIeNature(inspectorIssue.getNature());
                inspectorEvent.setIeIsSj(0);
                //督查下發(fā)持久化
                inspectorEventMapper.insertSynData(inspectorEvent);
                logger.error("督查下發(fā)數(shù)據(jù)新增成功");
                //督查文件持久化
                List<FileEntity> files = inspectorIssue.getFiles();
                List<InspectorFile> fileList = new ArrayList<>();
                downloadFile(files, fileList, inspectorIssue.getUuid());
                inspectorEventMapper.insertFiles(fileList);
                logger.error("督查文件數(shù)據(jù)新增成功");
                logger.error("督查下發(fā)完成");
            } else {
                //督查辦結(jié)
                if (inspectorEventMapper.searchIsSj(inspectorIssue.getUuid()) > 0) {
                    //修改督查狀態(tài)為辦結(jié)
                    inspectorEventMapper.updateState("3", inspectorIssue.getUuid());
                    logger.error("督查辦結(jié)完成");
                }
            }
        }
    }

    @Override
    public void noticeListener(Notice notice) {
        logger.error("通知通報(bào)下發(fā)開(kāi)始處理");
        //通知通報(bào)持久化
        noticeMapper.insertData(notice);

        Msg msg = new Msg();
        String uuid = StringUtil.getUUID();
        msg.setMUuid(uuid);

        MsgConfigure msgConfigure = new MsgConfigure();
        msgConfigure.setMcCode("NOTIC_ISSUE");
        MsgConfigure config = msgConfigureMapper.selectByData(msgConfigure).get(0);

        msg.setMcUuid(config.getMcUuid());
        msg.setMcMsglevel(config.getMcMsglevel());
        msg.setMStatus(Byte.parseByte(notice.getFeedback() == 0 ? "1" : "0"));
        msg.setMParam(notice.getUuid());
        msg.setMContent(notice.getTitle());
        msg.setCreateTime(new Date());
        if (notice.getFeedback() == 0) {
            msg.setMHandleTime(new Date());
            msg.setMHandleUser("當(dāng)前通知通報(bào)無(wú)需處置");
        }
        msgMapper.insertMsg(msg);

        MsgReceive msgReceive = new MsgReceive();
        msgReceive.setMrUuid(StringUtil.getUUID());
        msgReceive.setmUuid(uuid);
        msgReceiveService.insertMsgReceive(msgReceive);

        //文件持久化
        List<FileEntity> files = notice.getFiles();
        noticeDownloadFile(files, notice.getUuid());
        noticeMapper.insertFiles(files);

    }

    private void downloadFile(List<FileEntity> files, List<InspectorFile> fileList, String uuid) {
        logger.error("文件下載開(kāi)始");
        if (!files.isEmpty()) {
            for (FileEntity file : files) {
                InspectorFile inspectorFile = new InspectorFile();
                String fileName = file.getFileName();
                logger.error(fileName);
                inspectorFile.setIfFileName(fileName);
                String last = fileName.substring(fileName.lastIndexOf("."));
                if (last.equals(".jpg") || last.equals(".JPG") || last.equals(".png") || last.equals(".gif") || last.equals(".bmp")) {
                    inspectorFile.setIfFileType(1);
                } else {
                    inspectorFile.setIfFileType(2);
                }
                inspectorFile.setIfSourceType(1);
                inspectorFile.setIfIeUuid(uuid);
                //需要確定省局的其他類型文件詳情
                inspectorFile.setIfPath(file.getFileName());
                String fileId = file.getFileId();
                //文件下載
                String token = httpRequestService.sendPostMessage();
                boolean res = httpRequestService.downloadFile(fileId, token, vp.getCusBusinessValue("duty_file_disk_mapping_path", "1000") + "/dutyUpLoad/");
                if (res) {
                    fileList.add(inspectorFile);
                }
            }
        }
    }

    private void noticeDownloadFile(List<FileEntity> files, String uuid) {
        files.stream().forEach((file) -> {
            file.setParentId(uuid);
            String token = httpRequestService.sendPostMessage();
            httpRequestService.downloadFile(file.getFileId(), token, vp.getBusinessValue("notice_file_disk_mapping_path") + "/noticeUpLoad/");
        });
    }

    public boolean checkSj() {
        return vp.getBusinessValue("is_sj") != null &&
                Boolean.parseBoolean(vp.getBusinessValue("is_sj"));
    }
}


異步 同步 ONEWAY

kafka消息發(fā)送方式有同步、異步和ONEWAY三種方式,producer.type參數(shù)指定同步或者異步,request.require.acks指定ONEWAY。

producer.type=sync默認(rèn)同步

設(shè)置異步需配套配置

Property Default Description
queue.buffering.max.ms 5000 啟用異步模式時(shí),producer緩存消息的時(shí)間。比如我們?cè)O(shè)置成1000時(shí),它會(huì)緩存1s的數(shù)據(jù)再一次發(fā)送出去,這樣可以極大的增加broker吞吐量,但也會(huì)造成時(shí)效性的降低。
queue.buffering.max.messages 10000 啟用異步模式時(shí),producer緩存隊(duì)列里最大緩存的消息數(shù)量,如果超過(guò)這個(gè)值,producer就會(huì)阻塞或者丟掉消息。
queue.enqueue.timeout.ms -1 當(dāng)達(dá)到上面參數(shù)時(shí)producer會(huì)阻塞等待的時(shí)間。如果設(shè)置為0,buffer隊(duì)列滿時(shí)producer不會(huì)阻塞,消息直接被丟掉;若設(shè)置為-1,producer會(huì)被阻塞,不會(huì)丟消息。
batch.num.messages 200 啟用異步模式時(shí),一個(gè)batch緩存的消息數(shù)量。達(dá)到這個(gè)數(shù)值時(shí),producer才會(huì)發(fā)送消息。(每次批量發(fā)送的數(shù)量)
以batch的方式推送數(shù)據(jù)可以極大的提高處理效率,kafka producer可以將消息在內(nèi)存中累計(jì)到一定數(shù)量后作為一個(gè)batch發(fā)送請(qǐng)求。batch的數(shù)量大小可以通過(guò)producer的參數(shù)(batch.num.messages)控制。通過(guò)增加batch的大小,可以減少網(wǎng)絡(luò)請(qǐng)求和磁盤IO的次數(shù),當(dāng)然具體參數(shù)設(shè)置需要在效率和時(shí)效性方面做一個(gè)權(quán)衡。在比較新的版本中還有batch.size這個(gè)參數(shù)。

在代碼中如果需要同步發(fā)送,可以在每次發(fā)送之后使用get方法,因?yàn)閜roducer.send方法返回一個(gè)Future類型的結(jié)果,F(xiàn)uture的get方法會(huì)一直阻塞直到該線程的任務(wù)得到返回值,也就是broker返回發(fā)送成功。

kafkaTemplate.send().get("key",value);

異步發(fā)送只需要在發(fā)送成功獲取消息是否成功即可:

ListenableFuture future = kafkaTemplate.send();
future.addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                logger.error(ex.getMessage()+"發(fā)送失敗!原因:"+ex.getCause());
            }
            @Override
            public void onSuccess(Object result) {
                logger.info("消息發(fā)送成功"+result.toString());
            }
        });

消息可靠性

producers可以一步的并行向kafka發(fā)送消息,但是通常producer在發(fā)送完消息之后會(huì)得到一個(gè)響應(yīng),返回的是offset值或者發(fā)送過(guò)程中遇到的錯(cuò)誤。這其中有個(gè)非常重要的參數(shù)“request.required.acks",這個(gè)參數(shù)決定了producer要求leader partition收到確認(rèn)的副本個(gè)數(shù):文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-775283.html

  • 如果acks設(shè)置為0,表示producer不會(huì)等待broker的相應(yīng),所以,producer無(wú)法知道消息是否發(fā)生成功,這樣有可能導(dǎo)致數(shù)據(jù)丟失,但同時(shí),acks值為0會(huì)得到最大的系統(tǒng)吞吐量。
  • 若acks設(shè)置為1,表示producer會(huì)在leader partition收到消息時(shí)得到broker的一個(gè)確認(rèn),這樣會(huì)有更好的可靠性,因?yàn)榭蛻舳藭?huì)等待知道broker確認(rèn)收到消息。
  • 若設(shè)置為-1,producer會(huì)在所有備份的partition收到消息時(shí)得到broker的確認(rèn),這個(gè)設(shè)置可以得到最高的可靠性保證。

到了這里,關(guān)于spring集成kafka并對(duì)消息進(jìn)行監(jiān)聽(tīng)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱

    【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱

    最近忙著搞低代碼開(kāi)發(fā),好久沒(méi)新建spring項(xiàng)目了,結(jié)果今天心血來(lái)潮準(zhǔn)備建個(gè)springboot項(xiàng)目 注意Type選Maven,java選8,其他默認(rèn) 點(diǎn)下一步后完成就新建了一個(gè)spring boot項(xiàng)目,配置下Maven環(huán)境,主要是settings.xml文件,里面要包含阿里云倉(cāng)庫(kù),不然可能依賴下載不下來(lái) 在maven配置沒(méi)問(wèn)

    2024年02月09日
    瀏覽(32)
  • 深入理解Spring Kafka中@KafkaListener注解的參數(shù)與使用方式

    Apache Kafka作為一個(gè)強(qiáng)大的消息代理系統(tǒng),與Spring框架的集成使得在分布式應(yīng)用中處理消息變得更加簡(jiǎn)單和靈活。Spring Kafka提供了 @KafkaListener 注解,為開(kāi)發(fā)者提供了一種聲明式的方式來(lái)定義消息監(jiān)聽(tīng)器。在本文中,我們將深入探討 @KafkaListener 注解的各種參數(shù)以及它們的使用方

    2024年01月16日
    瀏覽(31)
  • 實(shí)戰(zhàn):Spring Cloud Stream集成兼容多消息中間件kafka、rabbitmq

    實(shí)戰(zhàn):Spring Cloud Stream集成兼容多消息中間件kafka、rabbitmq

    前面的博文我們介紹并實(shí)戰(zhàn)演示了Spring Cloud Stream整合rabbitmq,其中主要介紹了如何使用和配置完成消息中間件的集成。但是,在實(shí)際的生產(chǎn)環(huán)境中可能會(huì)用到多個(gè)消息中間件,又或者是由于業(yè)務(wù)改變需要更換消息中間件,在這些情況下我們的Spring Cloud Stream框架可以完全兼容多

    2024年02月08日
    瀏覽(25)
  • spring cloud steam 整合kafka 進(jìn)行消息發(fā)送與接收

    spring cloud steam : Binder和Binding Binder是SpringCloud Stream的一個(gè)抽象概念,是應(yīng)用與消息中間件之間的粘合劑,目前SpringCloud Stream實(shí)現(xiàn)了Kafka和RabbitMQ的binder Binder可以生成Binding,Binding用來(lái)綁定消息容器的生產(chǎn)者和消費(fèi)者,它有兩種類型,INPUT和OUTPUT,INPUT對(duì)應(yīng)于消費(fèi)者,OUTPUT對(duì)應(yīng)于

    2024年02月10日
    瀏覽(23)
  • 使用 Spring Kafka 進(jìn)行非阻塞重試的集成測(cè)試

    ?Kafka的非阻塞重試是通過(guò)為主題配置重試主題來(lái)實(shí)現(xiàn)的。如果需要,還可以配置額外的死信主題。如果所有重試都耗盡,事件將被轉(zhuǎn)發(fā)到DLT。在公共領(lǐng)域中有很多資源可用于了解技術(shù)細(xì)節(jié)。對(duì)于代碼中的重試機(jī)制編寫集成測(cè)試確實(shí)是一項(xiàng)具有挑戰(zhàn)性的工作。以下是一些測(cè)試

    2024年02月10日
    瀏覽(20)
  • kafka消息監(jiān)聽(tīng)

    kafka消息監(jiān)聽(tīng)

    1, spring配置kafka網(wǎng)址 2,listener groupId表示分組,不同組的消費(fèi)者不是競(jìng)爭(zhēng)關(guān)系 3, 這段代碼使用了Spring Kafka提供的注解 @KafkaListener 來(lái)定義一個(gè) Kafka消費(fèi)者 。具體的配置如下: groupId = \\\"order-service-2\\\" :指定該消費(fèi)者所屬的消費(fèi)者組ID,即\\\"order-service-2\\\"。 topicPartitions :表示要訂

    2024年02月15日
    瀏覽(12)
  • 07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費(fèi)者(演示 監(jiān)聽(tīng)消息)

    07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費(fèi)者(演示 監(jiān)聽(tīng)消息)

    簡(jiǎn)單來(lái)說(shuō),就是一個(gè)數(shù)據(jù)項(xiàng)。 ▲ 消息就是 Kafka 所記錄的數(shù)據(jù)節(jié)點(diǎn),消息在 Kafka 中又被稱為記錄(record)或事件(event)。 從存儲(chǔ)上來(lái)看,消息就是存儲(chǔ)在分區(qū)文件(有點(diǎn)類似于List)中的一個(gè)數(shù)據(jù)項(xiàng),消息具有 key、value、時(shí)間戳 和 可選的元數(shù)據(jù)頭。 ▲ 下面是一個(gè)示例事件

    2024年01月20日
    瀏覽(46)
  • Kafka:springboot集成kafka收發(fā)消息

    Kafka:springboot集成kafka收發(fā)消息

    kafka環(huán)境搭建參考Kafka:安裝和配置_moreCalm的博客-CSDN博客 1、springboot中引入kafka依賴 2、配置application.yml 傳遞String類型的消息 3、controller實(shí)現(xiàn)消息發(fā)送接口 4、component中實(shí)現(xiàn)接收類HelloListener? 5、測(cè)試 瀏覽器訪問(wèn)該接口并查看控制臺(tái) ? ? ? ? 接收成功 ? 傳遞對(duì)象類型的消息

    2024年02月13日
    瀏覽(26)
  • Java集成消息隊(duì)列Kafka

    在使用Maven構(gòu)建Java項(xiàng)目時(shí),你可以通過(guò)添加Kafka的Maven依賴來(lái)引入Kafka相關(guān)的庫(kù)。下面是Kafka的Maven坐標(biāo): 將上述依賴坐標(biāo)添加到你的項(xiàng)目的pom.xml文件中,即可下載并引入Kafka客戶端庫(kù)。請(qǐng)注意,版本號(hào)可能會(huì)有所不同,你可以根據(jù)自己的需求選擇最合適的版本。 另外,如果你

    2024年01月18日
    瀏覽(35)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包