2.3. Pulsar Adaptors適配器
2.3.1.kafka適配器
2.3.2.Spark適配器
2.3. Pulsar Adaptors適配器
2.3.1.kafka適配器
Pulsar 為使用 Apache Kafka Java 客戶端 API 編寫的應(yīng)用程序提供了一個簡單的解決方案。
在生產(chǎn)者中, 如果想不改變原有kafka的代碼架構(gòu), 就切換到Pulsar的平臺中, 那么Pulsar adaptor on kafka就變的非常的有用了, 它可以幫助我們在不改變原有kafka的代碼基礎(chǔ)上, 即可接入pulsar, 但是需要注意, 相關(guān)配置信息需要進行一些調(diào)整, 例如: 地址與topic
- 1- 需要導入Pulsar集成kafka的依賴包, 刪除掉原有Kafka-client包
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-kafka</artifactId>
<version>2.8.0</version>
</dependency>
注: 目前Pulsar并在Maven中央倉庫中并沒有提供Pulsar-client-kafka 2.8.1的包, 故此處導入2.8.0
- 2-編寫生產(chǎn)者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaAdaptorProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1. 創(chuàng)建kafka生產(chǎn)者的核心類對象: KafkaProducer
// 1.1: 創(chuàng)建生產(chǎn)者配置對象: 設(shè)置相關(guān)配置
Properties props = new Properties();
props.put("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");
// 消息的確認方案
props.put("acks", "all");
// key序列化類型
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value 序列化類型
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
//2. 發(fā)送數(shù)據(jù)
for (int i = 0; i < 10; i++) {
//2.1: 創(chuàng)建 生產(chǎn)者數(shù)據(jù)承載對象 一個對象代表是一條消息數(shù)據(jù)
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("persistent://public/default/txn_t1",Integer.toString(i), Integer.toString(i));
producer.send(producerRecord).get();
}
//3. 釋放資源
producer.close();
}
}
- 3-編寫消費者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaAdaptorConsumer {
public static void main(String[] args) {
//1. 創(chuàng)建kafka的消費者的核心對象: KafkaConsumer
//1.1: 創(chuàng)建消費者配置對象, 并設(shè)置相關(guān)的參數(shù):
Properties props = new Properties();
props.setProperty("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");
//消費者組的 id
props.setProperty("group.id", "test");
//是否啟動消費者自動提交消費偏移量
props.setProperty("enable.auto.commit", "true");
//每間隔多長時間提交一次偏移量:單位 毫秒
props.setProperty("auto.commit.interval.ms","1000");
//key 反序列化
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//val 發(fā)序列化
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//2. 給消費者設(shè)置訂閱topic:
consumer.subscribe(Arrays.asList("persistent://public/default/txn_t1"));
//3. 循環(huán)獲取相關(guān)的消息數(shù)據(jù)
while (true) {
//3.1: 從kafka中獲取消息數(shù)據(jù): 參數(shù)表示等待超時時間
//注意: 如果沒有獲取到數(shù)據(jù), 返回一個空集合對象, 如果數(shù)據(jù)集合中有多個 ConsumerRecord 對象
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
//3.2 遍歷ConsumerRecords 獲取每一個 ConsumerRecord 對象 : ConsumerRecord 消費者數(shù)據(jù)承載對象, 一個對象就是一條消息
for (ConsumerRecord<String, String> record : records) {
String massage = record.value();
System.out.println("消息數(shù)據(jù)為:"+massage);
}
}
}
}
- 4- 先運行消費者, 進行監(jiān)聽, 然后運行生產(chǎn)者, 觀察消費者是否可以正常消費到數(shù)據(jù)
2.3.2.Spark適配器
Pulsar 的 Spark Streaming 接收器是一個自定義的接收器,它使用 Apache Spark Streaming 能夠從 Pulsar 接
收原始數(shù)據(jù)。文章來源:http://www.zghlxwxcb.cn/news/detail-634637.html
應(yīng)用程序可以通過 Spark Streaming receiver 接收 Resilient Distributed Dataset (RDD) 格式的數(shù)據(jù),并可
以通過多種方式對其進行處理。文章來源地址http://www.zghlxwxcb.cn/news/detail-634637.html
- 1-導入相關(guān)的依賴包
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-spark</artifactId>
<version>2.8.0</version>
</dependency>
- 2-編寫spark的流式代碼
String serviceUrl = "pulsar://localhost:6650/";
String topic = "persistent://public/default/test_src";
String subs = "test_sub";
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60));
ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData();
Set<String> set = new HashSet<>();
set.add(topic);
pulsarConf.setTopicNames(set);
pulsarConf.setSubscriptionName(subs);
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
serviceUrl,
pulsarConf,
new AuthenticationDisabled());
JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);
到了這里,關(guān)于11_Pulsar Adaptors適配器、kafka適配器、Spark適配器的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!