国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

11_Pulsar Adaptors適配器、kafka適配器、Spark適配器

這篇具有很好參考價值的文章主要介紹了11_Pulsar Adaptors適配器、kafka適配器、Spark適配器。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

2.3. Pulsar Adaptors適配器
2.3.1.kafka適配器
2.3.2.Spark適配器

2.3. Pulsar Adaptors適配器

2.3.1.kafka適配器

Pulsar 為使用 Apache Kafka Java 客戶端 API 編寫的應(yīng)用程序提供了一個簡單的解決方案。
在生產(chǎn)者中, 如果想不改變原有kafka的代碼架構(gòu), 就切換到Pulsar的平臺中, 那么Pulsar adaptor on kafka就變的非常的有用了, 它可以幫助我們在不改變原有kafka的代碼基礎(chǔ)上, 即可接入pulsar, 但是需要注意, 相關(guān)配置信息需要進行一些調(diào)整, 例如: 地址與topic

  • 1- 需要導入Pulsar集成kafka的依賴包, 刪除掉原有Kafka-client包
<dependency> 
     <groupId>org.apache.pulsar</groupId> 
     <artifactId>pulsar-client-kafka</artifactId> 
     <version>2.8.0</version> 
</dependency>

注: 目前Pulsar并在Maven中央倉庫中并沒有提供Pulsar-client-kafka 2.8.1的包, 故此處導入2.8.0

  • 2-編寫生產(chǎn)者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaAdaptorProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1. 創(chuàng)建kafka生產(chǎn)者的核心類對象: KafkaProducer
        // 1.1: 創(chuàng)建生產(chǎn)者配置對象: 設(shè)置相關(guān)配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");
        // 消息的確認方案
        props.put("acks", "all");
        // key序列化類型
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value 序列化類型
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props); 
        //2. 發(fā)送數(shù)據(jù) 
        for (int i = 0; i < 10; i++) { 
            //2.1: 創(chuàng)建 生產(chǎn)者數(shù)據(jù)承載對象 一個對象代表是一條消息數(shù)據(jù)
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("persistent://public/default/txn_t1",Integer.toString(i), Integer.toString(i)); 
            producer.send(producerRecord).get(); 
        }
        
        //3. 釋放資源 
        producer.close();
    }

}
  • 3-編寫消費者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaAdaptorConsumer {
    public static void main(String[] args) {
        //1. 創(chuàng)建kafka的消費者的核心對象: KafkaConsumer
        //1.1: 創(chuàng)建消費者配置對象, 并設(shè)置相關(guān)的參數(shù):
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");
        //消費者組的 id
        props.setProperty("group.id", "test");
        //是否啟動消費者自動提交消費偏移量
        props.setProperty("enable.auto.commit", "true");
        //每間隔多長時間提交一次偏移量:單位 毫秒
        props.setProperty("auto.commit.interval.ms","1000");
        //key 反序列化
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //val 發(fā)序列化
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //2. 給消費者設(shè)置訂閱topic:
        consumer.subscribe(Arrays.asList("persistent://public/default/txn_t1"));
        //3. 循環(huán)獲取相關(guān)的消息數(shù)據(jù)
        while (true) {
            //3.1: 從kafka中獲取消息數(shù)據(jù): 參數(shù)表示等待超時時間
            //注意: 如果沒有獲取到數(shù)據(jù), 返回一個空集合對象, 如果數(shù)據(jù)集合中有多個 ConsumerRecord 對象
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            //3.2 遍歷ConsumerRecords 獲取每一個 ConsumerRecord 對象 : ConsumerRecord 消費者數(shù)據(jù)承載對象, 一個對象就是一條消息
            for (ConsumerRecord<String, String> record : records) {
                String massage = record.value();
                System.out.println("消息數(shù)據(jù)為:"+massage);
            }
        } 
    } 
}
  • 4- 先運行消費者, 進行監(jiān)聽, 然后運行生產(chǎn)者, 觀察消費者是否可以正常消費到數(shù)據(jù)
    11_Pulsar Adaptors適配器、kafka適配器、Spark適配器,# Apache Pulsar,pulsar

2.3.2.Spark適配器

Pulsar 的 Spark Streaming 接收器是一個自定義的接收器,它使用 Apache Spark Streaming 能夠從 Pulsar 接
收原始數(shù)據(jù)。

應(yīng)用程序可以通過 Spark Streaming receiver 接收 Resilient Distributed Dataset (RDD) 格式的數(shù)據(jù),并可
以通過多種方式對其進行處理。文章來源地址http://www.zghlxwxcb.cn/news/detail-634637.html

  • 1-導入相關(guān)的依賴包
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-spark</artifactId>
    <version>2.8.0</version>
</dependency>
  • 2-編寫spark的流式代碼
String serviceUrl = "pulsar://localhost:6650/"; 
String topic = "persistent://public/default/test_src"; 
String subs = "test_sub"; 
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example"); 
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60)); 
ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData(); 
Set<String> set = new HashSet<>(); 
set.add(topic); 
pulsarConf.setTopicNames(set); 
pulsarConf.setSubscriptionName(subs); 
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver( 
serviceUrl, 
pulsarConf, 
new AuthenticationDisabled()); 
JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);

到了這里,關(guān)于11_Pulsar Adaptors適配器、kafka適配器、Spark適配器的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Windows11系統(tǒng)的電腦中出現(xiàn)網(wǎng)絡(luò)適配器VMnet1和VMnet8消失問題解決

    Windows11系統(tǒng)的電腦中出現(xiàn)網(wǎng)絡(luò)適配器VMnet1和VMnet8消失問題解決

    ? ? ? ??查看電腦中 高級網(wǎng)絡(luò)設(shè)置? 如下圖; ? ? ? ???如果這兩個網(wǎng)卡消失,可能是自己誤刪了這兩個網(wǎng)卡的原故(如下圖), 也可能有其他原因,可以通過以下方式解決: 1.點擊 Win窗口 ,在搜索框輸入 控制面板 2.打開 控制面板 ? 3.點擊? 卸載程序? 出現(xiàn)(如下圖),

    2024年02月10日
    瀏覽(44)
  • 網(wǎng)絡(luò)適配器是什么 網(wǎng)絡(luò)適配器有什么用

    網(wǎng)絡(luò)適配器是什么? 網(wǎng)絡(luò)適配器又稱網(wǎng)卡或網(wǎng)絡(luò)接口卡(NIC),英文名NetworkInterfaceCard。它是使計算機聯(lián)網(wǎng)的設(shè)備。平常所說的網(wǎng)卡就是將PC機和LAN連接的網(wǎng)絡(luò)適配器。網(wǎng)卡(NIC) 插在計算機主板插槽中,負責將用戶要傳遞的數(shù)據(jù)轉(zhuǎn)換為網(wǎng)絡(luò)上其它設(shè)備能夠識別的格式,通過

    2024年02月05日
    瀏覽(41)
  • 適配器模式:代理、適配器、橋接、裝飾,這四個模式有何區(qū)別?

    ????????關(guān)于適配器模式,今天我們主要學習它的兩種實現(xiàn)方式,類適配器和對象適配器,以及5種常見的應(yīng)用場景。同時,我還會通過剖析slf4j日志框架,來給你展示這個模式在真實項目中的應(yīng)用。除此之外,在文章的最后,我還對代理、橋接、裝飾器、適配器,這4種代

    2024年02月13日
    瀏覽(19)
  • 【C++】STL 算法 ⑩ ( 函數(shù)適配器 | 函數(shù)適配器概念 | 函數(shù)適配器分類 | 函數(shù)適配器輔助函數(shù) | std::bind2nd 函數(shù)原型及示例 | std::bind 函數(shù)原型及示例 )

    【C++】STL 算法 ⑩ ( 函數(shù)適配器 | 函數(shù)適配器概念 | 函數(shù)適配器分類 | 函數(shù)適配器輔助函數(shù) | std::bind2nd 函數(shù)原型及示例 | std::bind 函數(shù)原型及示例 )

    在 STL 中 預定義了很多 函數(shù)對象 , 如果要 對 函數(shù)對象 的 參數(shù) / 返回值 進行 計算 或 設(shè)置 , 可以 使用 \\\" 函數(shù)適配器 \\\" 實現(xiàn)上述需求 ; \\\" 函數(shù)適配器 \\\" 可以 將 已存在的 函數(shù)對象 轉(zhuǎn)化為 另一種符合要求的 函數(shù)對象 ; \\\" 函數(shù)適配器 \\\" 定義在 functional 頭文件 中 ; \\\" 函數(shù)適配器

    2024年02月02日
    瀏覽(59)
  • 網(wǎng)絡(luò)適配器沒有啟用tcp/ip服務(wù),WLAN 適配器的驅(qū)動程序可能出現(xiàn)問題

    網(wǎng)絡(luò)適配器沒有啟用tcp/ip服務(wù),WLAN 適配器的驅(qū)動程序可能出現(xiàn)問題

    筆記本抽風。登得上wifi和熱點,但網(wǎng)不能用,“無法訪問Internet” ? win10自帶的網(wǎng)絡(luò)診斷提示: “找到問題 WLAN 適配器的驅(qū)動程序可能出現(xiàn)問題 Windows 無法自動將 IP 協(xié)議堆棧綁定到網(wǎng)絡(luò)適配器。 未修復 無線網(wǎng)絡(luò) 適配器出現(xiàn)問題 已失敗 ” 試了試火絨的斷網(wǎng)修復,提示網(wǎng)絡(luò)

    2024年02月11日
    瀏覽(17)
  • 設(shè)計模式——適配器

    說起適配器,大家第一個想到的可能就是電源適配器。 電源適配器的作用想必同學們也都清楚,那就是將220伏高電壓轉(zhuǎn)換成想要的5伏至20伏左右穩(wěn)定的低電壓。 從某種程度上講,編程中經(jīng)常提起的適配器模式的原理與上面講到的基本是一致的。 用于將一個類的接口轉(zhuǎn)換成另

    2024年02月12日
    瀏覽(22)
  • 適配器模式介紹

    適配器模式介紹

    目錄 一、適配器模式介紹 1.1 適配器模式定義 1.2 適配器模式原理 1.2.1 適配器模式類圖 1.2.2 模式角色說明 二、適配器模式的應(yīng)用 2.1 類適配器模式 2.1.1 需求說明 2.1.2 需求實現(xiàn) 2.1.2.1 類圖 2.1.2.2 具體實現(xiàn) 2.1.2.2.1 SDCard接口 2.1.2.2.2 SDCardImpl實現(xiàn)類 2.1.2.2.3 TFCard接口 2.1.2.2.4 TFCard

    2024年01月17日
    瀏覽(20)
  • 適配器模式(Adapter)

    適配器模式(Adapter)

    適配器是一種 結(jié)構(gòu)型設(shè)計模式 ,它能使 接口不兼容的對象能夠相互合作 。 封裝器模式(Wrapper)。 1. 問題 假如你正在開發(fā)一款股票市場監(jiān)測程序,它會從 不同來源下載 XML 格式的股票數(shù)據(jù),然后向用戶呈現(xiàn)出美觀的圖表 。 在開發(fā)過程中, 你決定在程序中 整合一個第三方

    2024年02月11日
    瀏覽(17)
  • java  適配器模式

    java 適配器模式

    適配器模式(Adapter Pattern) 結(jié)構(gòu)型設(shè)計模式,見名知意,就是兩個不兼容的接口之間的橋梁。它結(jié)合了兩個獨立接口的功能。 主要解決: 常常要將一些\\\"現(xiàn)存的對象\\\"放到新的環(huán)境中,而新環(huán)境要求的接口是現(xiàn)對象不能滿足的。 關(guān)鍵代碼: 適配器繼承或依賴已有的對象,實現(xiàn)

    2024年04月16日
    瀏覽(22)
  • 適配器設(shè)計模式

    適配器設(shè)計模式

    一、適配器模式 B站:java架構(gòu)師 定義:適配器模式把一個類的接口變換成客戶端所期待的另一種接口,從而使原本因接口不匹配而無法在一起工作的兩個類能夠在一起工作 三種適配器:類的適配器模式、對象的適配器模式、接口的適配器模式 1.類適配器模式 實現(xiàn)方式:讓

    2024年02月11日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包