說明
Pulsar 是一種用于服務(wù)器到服務(wù)器消息傳遞的多租戶高性能解決方案。
Pulsar 的主要特性如下:
對 Pulsar 實例中的多個集群的本機支持,并跨集群無縫地復(fù)制消息。
極低的發(fā)布和端到端延遲。
無縫可擴展至超過一百萬個主題。
一個簡單的客戶端 API,具有Java、Go、Python和C++的綁定。
主題的多種訂閱類型(獨占、共享和故障轉(zhuǎn)移)。
通過Apache BookKeeper提供的持久消息存儲來保證消息傳遞。無服務(wù)器輕量級計算框架Pulsar Functions提供流原生數(shù)據(jù)處理功能。
基于 Pulsar Functions 構(gòu)建的無服務(wù)器連接器框架Pulsar IO可以更輕松地將數(shù)據(jù)移入和移出 Apache Pulsar。
當(dāng)數(shù)據(jù)老化時,分層存儲將數(shù)據(jù)從熱/溫存儲卸載到冷/長期存儲(例如S3和GCS)。
安裝包下載
本文使用的是apache-pulsar-3.2.2-bin.tar.gz版本
csdn下載?也可以自行去官網(wǎng)下載
解壓目錄
tar -zxvf apache-pulsar-3.2.2-bin.tar.gz
目錄說明
目錄 | 描述 |
---|---|
bin | 入口pulsar點腳本和許多其他命令行工具 |
conf | 配置文件,包括broker.conf
|
lib | Pulsar 使用的 JAR |
examples | Pulsar 函數(shù)示例 |
instances | Pulsar 函數(shù)的工件 |
啟動Pulsar
bin/pulsar standalone
注意:需要保證jdk在17+
創(chuàng)建Topic
創(chuàng)建一個名為my-topic的topic
bin/pulsar-admin topics create persistent://public/default/my-topic
生產(chǎn)者發(fā)送消息
bin/pulsar-client produce my-topic --messages 'Hello Pulsar!'
消費者消費消息
測試批量發(fā)送消息
bin/pulsar-client produce my-topic --messages "$(seq -s, -f 'Message NO.%g' 1 10)"
重新消費
bin/pulsar-client consume my-topic -s 'my-subscription' -p Earliest -n 0
java生產(chǎn)消息
pom.xml
<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>3.2.2</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>2.14.2</version> </dependency>
代碼文章來源:http://www.zghlxwxcb.cn/news/detail-850497.html
package com.pulsar.demo; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; public class PulsarProducer { private static final Logger log = LoggerFactory.getLogger(PulsarProducer.class); private static final String SERVER_URL = "pulsar://192.168.xxx:6650"; public static void main(String[] args) throws Exception { // 構(gòu)造Pulsar Client PulsarClient client = PulsarClient.builder() .serviceUrl(SERVER_URL) .enableTcpNoDelay(true) .build(); // 構(gòu)造生產(chǎn)者 Producer<String> producer = client.newProducer(Schema.STRING) .producerName("my-producer") .topic("my-topic") .batchingMaxMessages(1024) .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) .enableBatching(true) .blockIfQueueFull(true) .maxPendingMessages(512) .sendTimeout(10, TimeUnit.SECONDS) .blockIfQueueFull(true) .create(); // 同步發(fā)送消息 MessageId messageId = producer.send("Hello World"); log.info("message id is {}",messageId); System.out.println(messageId.toString()); // 異步發(fā)送消息 CompletableFuture<MessageId> asyncMessageId = producer.sendAsync("This is a async message"); // 阻塞線程,直到返回結(jié)果 log.info("async message id is {}",asyncMessageId.get()); producer.close(); // 關(guān)閉licent的方式有兩種,同步和異步 // client.close(); client.closeAsync(); } }
java消費消息
package com.pulsar.demo; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; import java.util.concurrent.TimeUnit; public class PulsarConsumer { private static final String SERVER_URL = "pulsar://192.168.xxx:6650"; private static final String topic = "persistent://public/default/my-topic"; // 要訂閱的topic public static void main(String[] args) throws Exception { // 構(gòu)造Pulsar Client PulsarClient client = PulsarClient.builder() .serviceUrl(SERVER_URL) .enableTcpNoDelay(true) .build(); Consumer consumer = client.newConsumer() .consumerName("my-consumer") .topic("my-topic") .subscriptionName("my-subscription") .ackTimeout(10, TimeUnit.SECONDS) .maxTotalReceiverQueueSizeAcrossPartitions(10) .subscriptionType(SubscriptionType.Exclusive) .subscribe(); while (true) { Message msg = consumer.receive(); try { System.out.printf("Message received: %s\n", new String(msg.getData())); consumer.acknowledge(msg); } catch (Exception e) { consumer.negativeAcknowledge(msg); } } } }
停止Pulsar
完成后,您可以關(guān)閉 Pulsar 集群。在啟動集群的終端窗口中按Ctrl-C 。文章來源地址http://www.zghlxwxcb.cn/news/detail-850497.html
到了這里,關(guān)于Apache-Pulsar安裝操作說明的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!