国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka 基礎(chǔ)整理、 Springboot 簡單整合

這篇具有很好參考價值的文章主要介紹了Kafka 基礎(chǔ)整理、 Springboot 簡單整合。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

定義:

  • Kafka 是一個分布式的基于發(fā)布/訂閱默認(rèn)的消息隊列
  • 是一個開源的分布式事件流平臺,被常用用于數(shù)據(jù)管道、流分析、數(shù)據(jù)集成、關(guān)鍵任務(wù)應(yīng)用

消費模式:

  • 點對點模式 (少用)
    消費者主動拉取數(shù)據(jù),消息收到后清除消息
    Kafka 基礎(chǔ)整理、 Springboot 簡單整合
  • 發(fā)布/訂閱模式
    生產(chǎn)者推送消息到隊列,都消費者訂閱各自所需的消息
    Kafka 基礎(chǔ)整理、 Springboot 簡單整合

基本概念:

  • Producer: 消息生產(chǎn)者
  • Consumer: 消費者
  • Consumer: Group 消費者組,消費者組id相同得消費者為一個消費者組;一個消費者也為一個消費者組去消費
  • Broker: kafka服務(wù)器
  • Topic :消息主題, 數(shù)據(jù)分類
  • Partition :分區(qū),一個Tpoic 有多個分區(qū)組成
  • Replica : 副本,每個分區(qū)對應(yīng)多個副本
  • Leader:副本里包含leader、follower ;生產(chǎn)以及消費只針對 leader

生產(chǎn)者發(fā)送流程:

  • producer -> send(producerRecord) -> interceprots 攔截器 -> Serializer 序列化器 -> Partitioner 分區(qū)器
  • 當(dāng)數(shù)據(jù)累積到 batch.size之后,sender才會發(fā)送數(shù)據(jù);默認(rèn)16k
  • 如果數(shù)據(jù)遲遲未達(dá)到batch.size , sender等待linger.ms設(shè)置的時間,到了之后就會發(fā)送數(shù)據(jù)。單位ms.默認(rèn)值 0ms,標(biāo)識沒有延遲
  • compression.type 數(shù)據(jù)壓縮方式
  • RecordAccumulator 緩沖區(qū)大小,默認(rèn)32m
  • 應(yīng)答模式ack
    • 0: 生產(chǎn)者發(fā)送數(shù)據(jù)后,不需要等待數(shù)據(jù)應(yīng)答
    • 1:生產(chǎn)者發(fā)送過來的數(shù)據(jù),Leader收到數(shù)據(jù)后應(yīng)答
    • 1:all leader與其它所有節(jié)點收齊數(shù)據(jù)后應(yīng)答Kafka 基礎(chǔ)整理、 Springboot 簡單整合

消費大概邏輯:

  • Kafka 基礎(chǔ)整理、 Springboot 簡單整合

消費者組(Consumer Group (CG)):

  • groupid相同的消費者形成一個消費者組
  • 消費者組內(nèi)每個消費者負(fù)責(zé)消費不同分區(qū)的數(shù)據(jù),一個分區(qū)只能由一個組內(nèi)的一個消費者消費
  • 消費者組之間互不影響
  • 當(dāng)消費者組的數(shù)量,大于分區(qū)數(shù),則會有閑置
  • coordinator:輔助實現(xiàn)消費者組的初始化分區(qū)的分配
    • 每個節(jié)點有個coordinator, 通過 groupid % 50,選擇出 coordinator 節(jié)點 50為 _consumer_offset 的分區(qū)數(shù)
    • 1%50 = 1 , _consumer_offset 的 號分區(qū)上的 coordinator 則為 leader
    • coordinator 再消費者組中隨機選擇一個 consumer 成為leader,由leader 制定消費計劃,讓后返回給 coordinator ,再由coordinator 來把消費技化 分配給其它消費者
    • coordinator 與消費者的心跳保持時間 3秒,45秒 超時 - 會移除消費者,觸發(fā)再平衡
    • 消費者消費時間過長,默認(rèn) 5分鐘 - 會移除消費者觸發(fā)再平衡

消費流程:

  • 創(chuàng)建消費者網(wǎng)絡(luò)連接客戶端 ConsumerNetworkClient,與kafka交互
  • 消費請求初始化:每批次最小抓取大小、數(shù)據(jù)未達(dá)到超時 時間 500ms 、抓取數(shù)據(jù)大小上限
  • 發(fā)送消費請求 -》onSuccess() 回調(diào), 拉取數(shù)據(jù) -》 按批次放入消息隊列
  • 消費者從 消息隊列每批次消費數(shù)據(jù) (500條) -》反序列化 -》攔截器 -》 處理數(shù)據(jù)
  • Kafka 基礎(chǔ)整理、 Springboot 簡單整合

消費計劃(分區(qū)分配策略)默認(rèn) Range + CooperativeSticky:

  • Range:針對每一個topic,對topic分區(qū)排序、消息者排序,通過分區(qū)數(shù) / 消費者數(shù),決定每個消息者消費幾個分區(qū),除不盡的前面的消費者多消費。 容易產(chǎn)生數(shù)據(jù)傾斜
    Kafka 基礎(chǔ)整理、 Springboot 簡單整合
  • RoundRobin:輪詢分區(qū)策略,針對所有topic ,把所有topic的分區(qū)和消費者列出來,按照hashcode進(jìn)行排序,通過輪詢算法把分區(qū)分配給消費者
  • Sticky :黏性 (執(zhí)行新的分配的時,盡量靠近上次的分配結(jié)果),首先回盡量的均勻,且隨機分配分區(qū)到消費者
  • CooperativeSticky:協(xié)作者黏性,Sticky 的策略相同,但支持合作式再平衡,消費者可以繼續(xù)從沒有被重新分配的分區(qū)消費

offset 位移: 是標(biāo)記消費消費位置

  • <0.9 : 是維護(hù)在 zookeeper中
  • 0.9 之后:offset 維護(hù)在一個內(nèi)置的 topic :_consumer_offsets 中
  • 采用 key - value 方式存儲數(shù)據(jù),key:groupid +topic + 分區(qū)號
  • offset 自動提交:默認(rèn)每5秒自動提交offset ,默認(rèn)就是 true
  • offset 手動提交:消費的時候,手動提交offset
    • 同步:等待提交offset成功,再消費下一條
    • 異步:不等待,直接消費,失敗后沒有重試機制
  • 指定offset 消費:
    • earliest: 自動將偏移量重置為最早的偏移量 --from-beginning
    • latest (默認(rèn)): 自動將偏離量充值為最新偏移量
    • nono: 如果未找到消費者組的先前偏移量,則向消費者拋出異常。
//設(shè)置自動提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
//自動提交時間 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"5000");
//offset 手動提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

KafkaConsumer kafkaConsumer = new KafkaConsumer<String,String>(properties);
//定義主題
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
//訂閱
kafkaConsumer.subscribe(topics);
while (true){
    ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    if (CollectionUtil.isNotEmpty(consumerRecords)){
        for (ConsumerRecord<String, String> record : consumerRecords) {
            System.out.println(record);
        }
    }
    //手動提交offset
    kafkaConsumer.commitAsync();
}

指定時間消費:

 //查詢對應(yīng)分區(qū)
 Set<TopicPartition> partitions = kafkaConsumer.assignment();

  //保證分區(qū)分配方案定制完畢
  while (partitions.size()==0){
      kafkaConsumer.poll(Duration.ofSeconds(1));
      partitions=kafkaConsumer.assignment();
  }
  //把時間轉(zhuǎn)換成對應(yīng)的 offset
  Map<TopicPartition,Long> map = new HashMap<>(6);
  Map<TopicPartition,Long> offsetmap = kafkaConsumer.offsetsForTimes(map);
  for (TopicPartition topicPartition : partitions) {
      //一天前
      offsetmap.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
  }
  Map<TopicPartition, OffsetAndTimestamp> offsetsForTimeMap = kafkaConsumer.offsetsForTimes(offsetmap);
  for (TopicPartition partition : partitions) {
      OffsetAndTimestamp timestamp = offsetsForTimeMap.get(partition);
      kafkaConsumer.seek(partition,timestamp.offset());
  }

kafka 文件存儲機制 :

  • Topic 是邏輯上的概念,partition是物理上的概念, 每個partition對應(yīng)一個log文件。該log文件中存儲的就是 producer生產(chǎn)的數(shù)據(jù)
  • Producer生產(chǎn)的數(shù)據(jù),會被不斷追加到該log文件末端,為防止log文件過大導(dǎo)致數(shù)據(jù)定位效率低,kafka采取了分片和索引機制
  • 每個partition分為多個 segment,每個segment包含, .index .log .timeindex .snapshot 文件
  • 這些文件位于一個文件夾下,該文件夾命名規(guī)則為:topic名稱+分區(qū)序號 first-0
    Kafka 基礎(chǔ)整理、 Springboot 簡單整合
    Kafka 基礎(chǔ)整理、 Springboot 簡單整合
  • 稀疏索引:大約每往log文件寫入 4kb數(shù)據(jù),會往index文件寫入一條索引。
    • index文件中保存的 odffset是相對offset,這樣能確保 offset得值所占空間不會過大,因此能將offset得值控制在固定大小

文件清除、壓縮策略:

  • kafka 默認(rèn)日志保存時間為 7 天
  • 壓縮策略:compact,對應(yīng)相同key的value,只保留最新的一個版本。

kafka 高效讀寫:

  • Kafka 本身是分布式集群,可以采用分區(qū)技術(shù),并行度高
  • 讀數(shù)據(jù)采用稀疏索引,可以快速定位要消費得數(shù)據(jù)
  • 順序?qū)懘疟P,kafka得producer生產(chǎn)數(shù)據(jù),要寫入log文件中,寫得過程是一直追加到文件末端,為順序?qū)?/code>
  • 零拷貝: Kaka的數(shù)據(jù)加工處理操作交由Kaka生產(chǎn)者和Kaka消費者處理。Kaka Broker應(yīng)用層不關(guān)心存儲的數(shù)據(jù),所以就不用走應(yīng)用層,傳輸效率高。
  • 頁緩存: Kaka重度依賴底層操作系統(tǒng)提供的PageCache功能。當(dāng)上層有寫操作時,操作系統(tǒng)只是將數(shù)據(jù)寫PageCache。當(dāng)讀操作發(fā)生時,先從PageCache中查找,如果找不到,再去磁盤中讀取。實際上PageCache是把盡可能多的空閑內(nèi)存都當(dāng)做了磁盤緩存來使用。

常用腳本命名:

  • topic 相關(guān)命令
  • 查詢topic列表 :sh kafka-topics.sh --bootstrap-server localhost:9092 --list
  • 創(chuàng)建topic (名稱:first 分區(qū):1個 副本 3個)副本數(shù)量不能超過集群數(shù)量
    • sh kafka-topics.sh --bootstrap-server localhost:9092 --topic first --create --partitions 1 --replication-factor 3
  • topic 信息
    • sh kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe
  • 修改topic 分區(qū)數(shù)(只能增加)
    • sh kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe --partitions 3
  • 生產(chǎn)消息:
    • sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first
  • 消費消費:
    • sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
    • sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first --from-beginning

Spring boot 簡單整合:

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
</dependency>
server:
  port: 8200

spring:
  mvc:
    pathmatch:
      matching-strategy: ant_path_matcher
  application:
    name: @artifactId@
  kafka:
    bootstrap-servers:
      - 192.168.1.250:32010
    # 生產(chǎn)配置
    producer:
      #序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger.ms: 10 #sender 等待事件
         #ssl認(rèn)證配置相關(guān)
#        sasl.mechanism: PLAIN
#        security.protocol: SASL_PLAINTEXT
#        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
      #緩存區(qū)大小 32m
      buffer-memory: 33554432
      #批次大小 16k
      batch-size: 16
      # ISR 全部應(yīng)答
      #acks: -1
      #事務(wù)ID前綴 ,配合 @Transactional ,保證多個消息的原子性
      #transaction-id-prefix: "transaction-id-xx"
    #消費配置
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #group-id: xiaoshu-1
      enable-auto-commit: false
      # 從最早消息開始消費,但是消費后,會記錄offset、相同 group-id不會再次消費 
      # offset 是針對每個消費者組
      auto-offset-reset: earliest
      #批量消費,每次最多消費多少條
      #max-poll-records: 50
       #ssl認(rèn)證配置相關(guān)
#      properties:
#        sasl.mechanism: PLAIN
#        security.protocol: SASL_PLAINTEXT
#        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";

    listener:
      # 手動調(diào)用Acknowledgment.acknowledge()后立即提交
      ack-mode: manual
      #批量消費,配合 @KafkaListener - batch="true"
      #type: batch

生產(chǎn):

	@Resource
    private KafkaTemplate<String,String> kafkaTemplate;
    //@Transactional(rollbackFor = RuntimeException.class),配合 ack配置 實現(xiàn)多條消息發(fā)送,原子性
    @ApiOperation(value = "推送消息到kafak")
    @GetMapping("/sendMsg")
    public String sendMsg(String topic,String msg){
        kafkaTemplate.send(topic,msg).addCallback(success -> {
            if (success==null){
                System.out.println("消息發(fā)送失敗");
                return;
            }
            // 消息發(fā)送到的topic
            String topicName = success.getRecordMetadata().topic();
            // 消息發(fā)送到的分區(qū)
            int partition = success.getRecordMetadata().partition();
            // 消息在分區(qū)內(nèi)的offset
            long offset = success.getRecordMetadata().offset();
            System.out.println("發(fā)送消息成功:" + topic + "-" + partition + "-" + offset);
        }, failure -> {
            System.out.println("發(fā)送消息失敗:" + failure.getMessage());
        });
        return "ok";
    }

消費:文章來源地址http://www.zghlxwxcb.cn/news/detail-434853.html

@Configuration
public class KafkaConsumer {

    private static final String TOPIC_DLT=".DLT";

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * 每個分區(qū)由消費者組種得一個消費者消費,每個消費者獨立
     * 分區(qū) -》 消費 、2分區(qū)2個消費監(jiān)聽
     * @param record
     * @param consumer
     */
    @KafkaListener(groupId = "group-1", topicPartitions ={ @TopicPartition(topic = "four",partitions = {"0"})},batch = "false")
    public void consumerTopic1(ConsumerRecord<String, String> record, Consumer consumer){
        String value = record.value();
        String topic1 = record.topic();
        long offset = record.offset();
        int partition = record.partition();
        try {
            log.info("收到消息:"+value+"topic:"+topic1+"offset:"+offset+"分區(qū)"+partition);
            //TODO 異常,推送到 對應(yīng)死信 ↓
            //int i=1/0;
        } catch (Exception e) {
            System.out.println("commit failed");
            kafkaTemplate.send(topic1+TOPIC_DLT,value);
        } finally {
            consumer.commitAsync();
        }
    }

    @KafkaListener(groupId = "group-1", topicPartitions ={ @TopicPartition(topic = "four",partitions = {"1"})},batch = "false")
    public void consumerTopic2(ConsumerRecord<String, String> record, Consumer consumer){
        String value = record.value();
        String topic1 = record.topic();
        long offset = record.offset();
        int partition = record.partition();
        try {
            log.info("收到消息:"+value+"topic:"+topic1+"offset:"+offset+"分區(qū)"+partition);
            //TODO 異常,推送到 對應(yīng)死信 ↓
            //int i=1/0;
        } catch (Exception e) {
            System.out.println("commit failed");
            kafkaTemplate.send(topic1+TOPIC_DLT,value);
        } finally {
            consumer.commitAsync();
        }
    }

}

	/**
     * 監(jiān)聽 topic1 ->轉(zhuǎn)發(fā)到 topic2
     */
    @KafkaListener(topics = {"topic1"},groupId = "group-4")
    @SendTo("topic2")
    public String onMessage7(ConsumerRecord<?, ?> record) {
        return record.value()+"-轉(zhuǎn)發(fā)消息";
    }

    @KafkaListener(topics = {"topic2"},groupId = "group-5")
    public void onMessage8(ConsumerRecord<?, ?> record) {
        System.out.println("收到轉(zhuǎn)發(fā)消息"+record.value());
    }

到了這里,關(guān)于Kafka 基礎(chǔ)整理、 Springboot 簡單整合的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • SpringBoot整合Kafka

    SpringBoot整合Kafka

    官網(wǎng):Apache Kafka cmd進(jìn)入到kafka安裝目錄: 1:cmd啟動zookeeer .binwindowszookeeper-server-start.bat .configzookeeper.properties 2:cmd啟動kafka server .binwindowszookeeper-server-start.bat .configzookeeper.properties 3:使用cmd窗口啟動一個生產(chǎn)者命令: .binwindowskafka-console-producer.bat --bootstrap-server local

    2024年01月20日
    瀏覽(20)
  • springboot整合kafka入門

    springboot整合kafka入門

    producer: 生產(chǎn)者,負(fù)責(zé)發(fā)布消息到kafka cluster(kafka集群)中。生產(chǎn)者可以是web前端產(chǎn)生的page view,或者是服務(wù)器日志,系統(tǒng)CPU、memory等。 consumer: 消費者,每個consumer屬于一個特定的consuer group(可為每個consumer指定group name,若不指定group name則屬于默認(rèn)的group)。創(chuàng)建消費者時,

    2024年02月07日
    瀏覽(24)
  • Kafka 整合 SpringBoot

    1. 添加依賴 2. 配置文件 application.properties 3. 代碼實現(xiàn)發(fā)送消息 1. 添加依賴 2. 配置文件 applicatio.properties ?3. 代碼實現(xiàn)消費消息 首先在 kafka 節(jié)點上創(chuàng)建 topic: 打開kafka節(jié)點服務(wù)器的終端,輸入以下命令: 一些常用命令: 編寫測試代碼:ApplicationTests.java

    2024年02月07日
    瀏覽(23)
  • 【SpringBoot系列】SpringBoot整合Kafka(含源碼)

    【SpringBoot系列】SpringBoot整合Kafka(含源碼)

    前言 在現(xiàn)代的微服務(wù)架構(gòu)中,消息隊列已經(jīng)成為了一個不可或缺的組件。 它能夠幫助我們在不同的服務(wù)之間傳遞消息,并且能夠確保這些消息不會丟失。 在眾多的消息隊列中,Kafka 是一個非常出色的選擇。 它能夠處理大量的實時數(shù)據(jù),并且提供了強大的持久化能力。 在本

    2024年02月05日
    瀏覽(25)
  • 什么是kafka,如何學(xué)習(xí)kafka,整合SpringBoot

    什么是kafka,如何學(xué)習(xí)kafka,整合SpringBoot

    目錄 一、什么是Kafka,如何學(xué)習(xí) 二、如何整合SpringBoot 三、Kafka的優(yōu)勢 ? Kafka是一種分布式的消息隊列系統(tǒng),它可以用于處理大量實時數(shù)據(jù)流 。學(xué)習(xí)Kafka需要掌握如何安裝、配置和運行Kafka集群,以及如何使用Kafka API編寫生產(chǎn)者和消費者代碼來讀寫數(shù)據(jù)。此外,還需要了解Ka

    2024年02月10日
    瀏覽(26)
  • 實戰(zhàn):徹底搞定 SpringBoot 整合 Kafka

    kafka是一個消息隊列產(chǎn)品,基于Topic partitions的設(shè)計,能達(dá)到非常高的消息發(fā)送處理性能。Spring創(chuàng)建了一個項目Spring-kafka,封裝了Apache 的Kafka-client,用于在Spring項目里快速集成kafka。 除了簡單的收發(fā)消息外,Spring-kafka還提供了很多高級功能,下面我們就來一一探秘這些用法。

    2024年02月10日
    瀏覽(26)
  • Kafka入門(安裝和SpringBoot整合)

    Kafka入門(安裝和SpringBoot整合)

    app-tier:網(wǎng)絡(luò)名稱 –driver:網(wǎng)絡(luò)類型為bridge Kafka依賴zookeeper所以先安裝zookeeper -p:設(shè)置映射端口(默認(rèn)2181) -d:后臺啟動 安裝并運行Kafka, –name:容器名稱 -p:設(shè)置映射端口(默認(rèn)9092 ) -d:后臺啟動 ALLOW_PLAINTEXT_LISTENER任何人可以訪問 KAFKA_CFG_ZOOKEEPER_CONNECT鏈接的zookeeper K

    2024年02月11日
    瀏覽(21)
  • springboot整合ELK+kafka采集日志

    springboot整合ELK+kafka采集日志

    在分布式的項目中,各功能模塊產(chǎn)生的日志比較分散,同時為滿足性能要求,同一個微服務(wù)會集群化部署,當(dāng)某一次業(yè)務(wù)報錯后,如果不能確定產(chǎn)生的節(jié)點,那么只能逐個節(jié)點去查看日志文件;logback中RollingFileAppender,ConsoleAppender這類同步化記錄器也降低系統(tǒng)性能,綜上一些

    2024年02月15日
    瀏覽(23)
  • springboot整合kafka多數(shù)據(jù)源

    springboot整合kafka多數(shù)據(jù)源

    在很多與第三方公司對接的時候,或者處在不同的網(wǎng)絡(luò)環(huán)境下,比如在互聯(lián)網(wǎng)和政務(wù)外網(wǎng)的分布部署服務(wù)的時候,我們需要對接多臺kafka來達(dá)到我們的業(yè)務(wù)需求,那么當(dāng)kafka存在多數(shù)據(jù)源的情況,就與單機的情況有所不同。 單機的情況 如果是單機的kafka我們直接通過springboot自

    2024年02月13日
    瀏覽(28)
  • SpringBoot-Learning系列之Kafka整合

    SpringBoot-Learning系列之Kafka整合

    SpringBoot-Learning系列之Kafka整合 本系列是一個獨立的SpringBoot學(xué)習(xí)系列,本著 What Why How 的思想去整合Java開發(fā)領(lǐng)域各種組件。 消息系統(tǒng) 主要應(yīng)用場景 流量消峰(秒殺 搶購)、應(yīng)用解耦(核心業(yè)務(wù)與非核心業(yè)務(wù)之間的解耦) 異步處理、順序處理 實時數(shù)據(jù)傳輸管道 異構(gòu)語言架構(gòu)系

    2024年02月09日
    瀏覽(18)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包