1. Kafka 生產(chǎn)者
不管是把Kafka作為消息隊列、消息總線還是數(shù)據(jù)存儲平臺,總是需要一個可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個可以從Kafka讀取數(shù)據(jù)的消費者,或者一個兼具兩種角色的應用程序。
Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應用程序,它們負責將消息發(fā)送到 Kafka 集群中的一個或多個主題(topic)。生產(chǎn)者可以將消息發(fā)送到指定的主題,也可以根據(jù)分區(qū)策略將消息發(fā)送到多個分區(qū)中。生產(chǎn)者可以以異步或同步方式發(fā)送消息,并且可以配置消息的可靠性和持久性等屬性。在 Kafka 中,生產(chǎn)者是消息的源頭,它們將消息發(fā)送到 Kafka 集群中,供消費者消費。
2. kafaka 命令行操作
① 啟動 Zookeeper 集群:
[root@master01 bin]# pwd
/root/ch/soft/zk/zk-01/bin
[root@master01 bin]# ./zkServer.sh start
[root@master01 bin]# pwd
/root/ch/soft/zk/zk-02/bin
[root@master01 bin]# ./zkServer.sh start
[root@master01 bin]# pwd
/root/ch/soft/zk/zk-03/bin
[root@master01 bin]# ./zkServer.sh start
② 啟動 kafka 集群:
[root@master01 kafka01]# pwd
/root/ch/soft/kafka/kafka01
[root@master01 kafka01]# bin/kafka-server-start.sh config/server.properties
[root@master01 kafka02]# pwd
/root/ch/soft/kafka/kafka02
[root@master01 kafka02]# bin/kafka-server-start.sh config/server.properties
[root@master01 kafka03]# pwd
/root/ch/soft/kafka/kafka03
[root@master01 kafka03]# bin/kafka-server-start.sh config/server.properties
③ 創(chuàng)建主題 test:
[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --create --partitions 3 --replication-factor 2 --topic test
Created topic test.
[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --describe --topic test
Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: test Partition: 1 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: test Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0
④ 生產(chǎn)者發(fā)送消息到主題test:
[root@master01 kafka01]# bin/kafka-console-producer.sh --broker-list 10.65.132.2:9093 --topic test
>hello
>你好,kafka!
>
⑤ 消費者消費主題test的消息:
[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
hello
你好,kafka!
3. kafka 生產(chǎn)者發(fā)送消息流程
先從創(chuàng)建一個ProducerRecord對象開始,其中需要包含目標主題和要發(fā)送的內(nèi)容。另外,還可以指定鍵、分區(qū)、時間戳或標頭。在發(fā)送ProducerRecord對象時,生產(chǎn)者需要先把鍵和值對象序列化成字節(jié)數(shù)組,這樣才能在網(wǎng)絡(luò)上傳輸。
接下來,如果沒有顯式地指定分區(qū),那么數(shù)據(jù)將被傳給分區(qū)器。分區(qū)器通常會基于ProducerRecord對象的鍵選擇一個分區(qū)。選好分區(qū)以后,生產(chǎn)者就知道該往哪個主題和分區(qū)發(fā)送這條消息了。緊接著,該消息會被添加到一個消息批次里,這個批次里的所有消息都將被發(fā)送給同一個主題和分區(qū)。有一個獨立的線程負責把這些消息批次發(fā)送給目標broker。
broker在收到這些消息時會返回一個響應。如果消息寫入成功,就返回一個RecordMetaData對象,其中包含了主題和分區(qū)信息,以及消息在分區(qū)中的偏移量。如果消息寫入失敗,則會返回一個錯誤。生產(chǎn)者在收到錯誤之后會嘗試重新發(fā)送消息,重試幾次之后如果還是失敗,則會放棄重試,并返回錯誤信息。
4. Kafka 生產(chǎn)者的創(chuàng)建
要向Kafka寫入消息,首先需要創(chuàng)建一個生產(chǎn)者對象,并設(shè)置一些屬性。Kafka生產(chǎn)者有3個必須設(shè)置的屬性。
① bootstrap.servers
broker的地址。可以由多個host:port組成,生產(chǎn)者用它們來建立初始的Kafka集群連接。它不需要包含所有的broker地址,因為生產(chǎn)者在建立初始連接之后可以從給定的broker那里找到其他broker的信息。不過還是建議至少提供兩個broker地址,因為一旦其中一個停機,則生產(chǎn)者仍然可以連接到集群。
② key.serializer
一個類名,用來序列化消息的鍵。broker 希望接收到的消息的鍵和值都是字節(jié)數(shù)組。生產(chǎn)者可以把任意Java對象作為鍵和值發(fā)送給broker,但它需要知道如何把這些Java對象轉(zhuǎn)換成字節(jié)數(shù)組。key.serializer 必須被設(shè)置為一個實現(xiàn)了 org.apache.kafka.common.serialization.Serializer 接口的類,生產(chǎn)者會用這個類把鍵序列化成字節(jié)數(shù)組。Kafka客戶端默認提供了ByteArraySerializer、StringSerializer和IntegerSerializer等,如果你只使用常見的幾種Java對象類型,就沒有必要實現(xiàn)自己的序列化器。
需要注意的是,必須設(shè)置key.serializer這個屬性,盡管你可能只需要將值發(fā)送給Kafka。如果只需要發(fā)送值,則可以將Void作為鍵的類型,然后將這個屬性設(shè)置為VoidSerializer。
③ value.serializer
一個類名,用來序列化消息的值。與設(shè)置key.serializer屬性一樣,需要將value.serializer設(shè)置成可以序列化消息值對象的類。
public class CustomProducer01 {
public static void main(String[] args) {
// kafka生產(chǎn)者屬性配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 創(chuàng)建kafka生產(chǎn)者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
}
}
5. Kafka 生產(chǎn)者發(fā)送消息
實例化好生產(chǎn)者對象后,接下來就可以開始發(fā)送消息了。KafkaProducer 的 send() 方法用于向 Kafka 集群發(fā)送消息。該方法的語法如下:
public interface Producer<K, V> extends Closeable {
Future<RecordMetadata> send(ProducerRecord<K, V> record);
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}
其中,ProducerRecord<K, V> 表示要發(fā)送的消息記錄,K 和 V 分別表示鍵和值的類型。send() 方法返回一個 Future 對象,表示異步發(fā)送消息的結(jié)果。
發(fā)送消息主要有以下3種方式:
① 發(fā)送并忘記
把消息發(fā)送給服務(wù)器,但并不關(guān)心它是否成功送達。大多數(shù)情況下,消息可以成功送達,因為Kafka是高可用的,而且生產(chǎn)者有自動嘗試重發(fā)的機制。但是,如果發(fā)生了不可重試的錯誤或超時,那么消息將會丟失,應用程序?qū)⒉粫盏饺魏涡畔⒒虍惓!?/p>
② 同步發(fā)送
一般來說,生產(chǎn)者是異步的——我們調(diào)用send()方法發(fā)送消息,它會返回一個Future對象??梢哉{(diào)用get()方法等待Future完成,這樣就可以在發(fā)送下一條消息之前知道當前消息是否發(fā)送成功。
③ 異步發(fā)送
調(diào)用send()方法,并指定一個回調(diào)函數(shù),當服務(wù)器返回響應時,這個函數(shù)會被觸發(fā)。
1. 發(fā)送即忘記
發(fā)送即忘記,生產(chǎn)者發(fā)送消息后不會等待服務(wù)器的響應,直接發(fā)送下一條消息。它只管往Kafka中發(fā)送消息而并不關(guān)心消息是否正確到達。在大多數(shù)情況下,這種發(fā)送方式?jīng)]有什么問題,不過在某些時候(比如發(fā)生不可重試異常時)會造成消息的丟失。這種發(fā)送方式的性能最高,可靠性也最差。
public class CustomProducer01 {
private static final String brokerList "10.65.132.2:9093";
private static final String topic = "test";
public static Properties initConfig(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return properties;
}
public static void main(String[] args) {
// kafka生產(chǎn)者屬性配置
Properties properties = initConfig();
// kafka生產(chǎn)者發(fā)送消息,默認是異步發(fā)送方式
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka!");
try{
// 發(fā)送消息
kafkaProducer.send(producerRecord);
}catch (Exception e){
e.printStackTrace();
}
// 關(guān)閉資源
kafkaProducer.close();
}
}
cmd命令行窗口開啟 kafka 消息者,觀察消費者是否接收到消息:
[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!
2. 同步發(fā)送
同步發(fā)送消息很簡單,當Kafka返回錯誤或重試次數(shù)達到上限時,生產(chǎn)者可以捕獲到異常。這里需要考慮性能問題。根據(jù)Kafka集群繁忙程度的不同,broker可能需要2毫秒或更長的時間來響應請求。如果采用同步發(fā)送方式,那么發(fā)送線程在這段時間內(nèi)就只能等待,什么也不做,甚至都不發(fā)送其他消息,這將導致糟糕的性能。因此,同步發(fā)送方式通常不會被用在生產(chǎn)環(huán)境中(但會經(jīng)常被用在示例代碼中)。
send() 方法本身就是異步的,send() 方法返回的Future對象可以使調(diào)用方稍后獲得發(fā)送的結(jié)果。在執(zhí)行send() 方法之后可以調(diào)用 get() 方法來阻塞等待Kafka的響應,直到消息發(fā)送成功,或者發(fā)生異常。如果發(fā)生異常,那么就需要捕獲異常并交由外層邏輯處理。
Future 接口源碼:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
Future接口是Java中用于表示異步計算結(jié)果的接口。它定義了一些方法,用于查詢異步計算是否完成、獲取計算結(jié)果等操作。
- cancel方法用于取消異步計算;
- isCancelled方法用于判斷異步計算是否已經(jīng)被取消;
- isDone方法用于判斷異步計算是否已經(jīng)完成。
- get方法用于獲取異步計算的結(jié)果,如果計算還沒有完成,則該方法會阻塞直到計算完成。如果計算被取消,則該方法會拋出CancellationException異常。如果計算拋出異常,則該方法會拋出ExecutionException異常。
- get(long timeout, TimeUnit unit)方法與get方法類似,但是它會在指定的時間內(nèi)等待計算完成,如果超時則會拋出TimeoutException異常。
Future 表示一個任務(wù)的生命周期,并提供了相應的方法來判斷任務(wù)是否已經(jīng)完成或取消,以及獲取任務(wù)的結(jié)果和取消任務(wù)等。既然KafkaProducer.send() 方法的返回值是一個Future類型的對象,那么完全可以用Java語言層面的技巧來豐富應用的實現(xiàn),比如使用Future中的 get(long timeout,TimeUnit unit)方法實現(xiàn)可超時的阻塞。
public class CustomProducer01 {
private static final String brokerList = "10.65.132.2:9093";
private static final String topic = "test";
public static Properties initConfig(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return properties;
}
public static void main(String[] args) {
// kafka生產(chǎn)者屬性配置
Properties properties = initConfig();
// kafka生產(chǎn)者發(fā)送消息,默認是異步發(fā)送方式
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka,同步發(fā)送!");
try{
// 發(fā)送消息
Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
// 獲取異步計算的結(jié)果,如果計算還沒有完成,則該方法會阻塞直到計算完成
RecordMetadata recordMetadata = future.get();
System.out.println("metadata.topic() = " + recordMetadata.topic());
}catch (Exception e){
e.printStackTrace();
}
// 關(guān)閉資源
kafkaProducer.close();
}
}
[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!
你好,kafka,同步發(fā)送!
調(diào)用Future.get()方法等待Kafka響應。如果消息沒有發(fā)送成功,那么這個方法將拋出一個異常。如果沒有發(fā)生錯誤,那么我們將得到一個RecordMetadata對象,并能從中獲取消息的偏移量和其他元數(shù)據(jù)。
KafkaProducer一般會出現(xiàn)兩種錯誤。一種是可重試錯誤,這種錯誤可以通過重發(fā)消息來解決。例如,對于連接錯誤,只要再次建立連接就可以解決。對于“not leader for partition”(非分區(qū)首領(lǐng))錯誤,只要重新為分區(qū)選舉首領(lǐng)就可以解決,此時元數(shù)據(jù)也會被刷新??梢酝ㄟ^配置啟用KafkaProducer的自動重試機制。如果在多次重試后仍無法解決問題,則應用程序會收到重試異常。另一種錯誤則無法通過重試解決,比如“Message size too large”(消息太大)。對于這種錯誤,KafkaProducer不會進行任何重試,而會立即拋出異常。
3. 異步發(fā)送
假設(shè)一條消息在應用程序和Kafka集群之間往返需要10毫秒。如果在發(fā)送完每條消息后都需要等待響應,那么發(fā)送100條消息將需要1秒。如果只發(fā)送消息但不需要等待響應,那么發(fā)送100條消息所需要的時間就會少很多。大多數(shù)時候,并不需要等待響應——盡管Kafka會把消息的目標主題、分區(qū)信息和偏移量返回給客戶端,但對客戶端應用程序來說可能不是必需的。不過,當消息發(fā)送失敗,需要拋出異常、記錄錯誤日志或者把消息寫入“錯誤消息”文件以便日后分析診斷時,就需要用到這些信息了。為了能夠在異步發(fā)送消息時處理異常情況,生產(chǎn)者提供了回調(diào)機制。
生產(chǎn)者發(fā)送消息后不會等待服務(wù)器的響應,而是通過回調(diào)函數(shù)來處理服務(wù)器的響應?;卣{(diào)函數(shù)會在 producer 收到 ack 時調(diào)用,該方法有兩個參數(shù),分別是元數(shù)據(jù)信息(RecordMetadata)和異常信息(Exception),如果 Exception 為 null,說明消息發(fā)送成功,如果 Exception 不為 null,說明消息發(fā)送失敗。
注意:消息發(fā)送失敗會自動重試,不需要我們在回調(diào)函數(shù)中手動重試。
public class CustomProducer01 {
private static final String brokerList = "10.65.132.2:9093";
private static final String topic = "test";
public static Properties initConfig(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return properties;
}
public static void main(String[] args) {
// kafka生產(chǎn)者屬性配置
Properties properties = initConfig();
// kafka生產(chǎn)者發(fā)送消息,默認是異步發(fā)送方式
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka,異步發(fā)送帶返回值!");
try{
// 發(fā)送消息
kafkaProducer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// 說明消息發(fā)送成功
if(e==null){
System.out.println("metadata.topic() = " + recordMetadata.topic());
System.out.println("metadata.partition() = " + recordMetadata.partition());
}
}
});
}catch (Exception e){
e.printStackTrace();
}
// 關(guān)閉資源
kafkaProducer.close();
}
}
[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!
你好,kafka,同步發(fā)送!
你好,kafka,異步發(fā)送帶回調(diào)函數(shù)!
Kafka生產(chǎn)者異步發(fā)送消息時,可以通過指定回調(diào)函數(shù)來處理發(fā)送結(jié)果。當消息發(fā)送完成后,回調(diào)函數(shù)會被調(diào)用,以通知應用程序消息發(fā)送的結(jié)果。具體來說,當生產(chǎn)者成功發(fā)送消息時,回調(diào)函數(shù)會被傳遞一個RecordMetadata對象,該對象包含了發(fā)送消息的相關(guān)信息,如消息所在的分區(qū)、消息在分區(qū)中的偏移量等。如果發(fā)送消息失敗,則回調(diào)函數(shù)會被傳遞一個非空的Exception對象,以指示發(fā)送失敗的原因。
注意:回調(diào)的執(zhí)行將在生產(chǎn)者主線程中進行,如果有兩條消息被發(fā)送給同一個分區(qū),則這可以保證它們的回調(diào)是按照發(fā)送的順序執(zhí)行的。這就要求回調(diào)的執(zhí)行要快,避免生產(chǎn)者出現(xiàn)延遲或影響其他消息的發(fā)送。不建議在回調(diào)中執(zhí)行阻塞操作,阻塞操作應該被放在其他線程中執(zhí)行。
6. Kafka 消息對象 ProducerRecord
① ProducerRecord 成員變量:文章來源:http://www.zghlxwxcb.cn/news/detail-637296.html
public class ProducerRecord<K, V> {
// 消息要發(fā)送到的主題
private final String topic;
// 消息要發(fā)送到的分區(qū)號,如果為null,則由Kafka自動選擇分區(qū)
private final Integer partition;
// 消息的鍵
private final K key;
// 消息的值
private final V value;
// 消息的時間戳,如果為null,則使用當前時間戳
private final Long timestamp;
// 消息的頭部信息
private final Headers headers;
// .....
}
- topic和partition字段分別代表消息要發(fā)往的主題和分區(qū)號。
- key是用來指定消息的鍵,它不僅是消息的附加信息,還可以用來計算分區(qū)號進而可以讓消息發(fā)往特定的分區(qū)。前面提及消息以主題為單位進行歸類,而這個key可以讓消息再進行二次歸類,同一個key的消息會被劃分到同一個分區(qū)中。
- value是指消息體,一般不為空,如果為空則表示特定的消息。
- timestamp是指消息的時間戳,它有CreateTime和LogAppendTime兩種類型,前者表示消息創(chuàng)建的時間,后者表示消息追加到日志文件的時間。
② ProducerRecord 構(gòu)造函數(shù):文章來源地址http://www.zghlxwxcb.cn/news/detail-637296.html
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null.");
if (timestamp != null && timestamp < 0)
throw new IllegalArgumentException(
String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
if (partition != null && partition < 0)
throw new IllegalArgumentException(
String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
this.topic = topic;
this.partition = partition;
this.key = key;
this.value = value;
this.timestamp = timestamp;
this.headers = new RecordHeaders(headers);
}
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
this(topic, partition, timestamp, key, value, null);
}
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
this(topic, partition, null, key, value, headers);
}
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value, null);
}
public ProducerRecord(String topic, K key, V value) {
this(topic, null, null, key, value, null);
}
public ProducerRecord(String topic, V value) {
this(topic, null, null, null, value, null);
}
}
到了這里,關(guān)于分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!