1.生產(chǎn)者推送數(shù)據(jù)
常用參數(shù)
bootstrap.servers:Kafka集群中的Broker列表,格式為host1:port1,host2:port2,…。生產(chǎn)者會從這些Broker中選擇一個(gè)可用的Broker作為消息發(fā)送的目標(biāo)Broker。
acks:Broker對消息的確認(rèn)模式??蛇x值為0、1、all。0表示生產(chǎn)者不會等待Broker的任何確認(rèn)消息;1表示生產(chǎn)者會等待Broker的Leader副本確認(rèn)消息;all表示生產(chǎn)者會等待所有副本都確認(rèn)消息。確認(rèn)模式越高,可靠性越高,但延遲也越大。
retries:消息發(fā)送失敗時(shí)的重試次數(shù)。默認(rèn)值為0,表示不進(jìn)行重試??梢詫⑵湓O(shè)置為大于0的值,例如3,表示最多重試3次。
batch.size:消息批量發(fā)送的大小。當(dāng)生產(chǎn)者累積到一定數(shù)量的消息時(shí),會將其打包成一個(gè)批次一次性發(fā)送給Broker。默認(rèn)值為16384字節(jié),即16KB。
linger.ms:消息發(fā)送的延遲時(shí)間。生產(chǎn)者會等待一定的時(shí)間,以便將更多的消息打包成一個(gè)批次一次性發(fā)送給Broker。默認(rèn)值為0,表示立即發(fā)送。設(shè)置較大的值可以提高吞吐量,但可能會增加消息的延遲。
buffer.memory:生產(chǎn)者可用于緩存消息的內(nèi)存大小。默認(rèn)值為33554432字節(jié),即32MB。如果生產(chǎn)者生產(chǎn)消息的速度快于發(fā)送消息的速度,可能會導(dǎo)致緩存溢出。可以調(diào)整該參數(shù)來適應(yīng)生產(chǎn)者的生產(chǎn)速度。
key.serializer:Key的序列化器。Kafka消息可以包含Key和Value,Key和Value都需要進(jìn)行序列化。該參數(shù)指定Key的序列化器。
value.serializer:Value的序列化器。該參數(shù)指定Value的序列化器。
max.block.ms:生產(chǎn)者在發(fā)送消息之前等待Broker元數(shù)據(jù)信息的最長時(shí)間。如果在該時(shí)間內(nèi)無法獲取到Broker元數(shù)據(jù)信息,則會拋出TimeoutException異常。默認(rèn)值為60000毫秒,即60秒。
compression.type:消息壓縮類型??蛇x值為none、gzip、snappy、lz4。默認(rèn)值為none,表示不進(jìn)行壓縮。壓縮可以減少消息的傳輸大小,提高網(wǎng)絡(luò)帶寬的利用率,但會增加CPU的消耗。
interceptor.classes:消息攔截器列表??梢灾付ǘ鄠€(gè)消息攔截器對消息進(jìn)行加工處理。例如,可以在消息中添加時(shí)間戳、添加消息來源等信息。
以上參數(shù)只是一部分,Kafka生產(chǎn)者還有更多參數(shù)可以進(jìn)行配置。需要根據(jù)實(shí)際情況選擇合適的參數(shù)進(jìn)行配置。
例子
下面是一個(gè)單例模式配置 kafka生產(chǎn)者的例子(避免多次創(chuàng)建實(shí)例,減少資源的消耗)
public class SingletonKafkaProducerExample {
private static SingletonKafkaProducerExample instance;
private static Producer<String, String> producer;
private SingletonKafkaProducerExample() {
//參數(shù)設(shè)置
Properties props = new Properties();
props.put("bootstrap.servers", "ip:端口");
props.put("acks", "all");
props.put("max.block.ms",120000);//默認(rèn)60s
props.put("retries", 3)//默認(rèn)0;
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("request.timeout.ms",60*1000);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//sasl認(rèn)證 (根據(jù)實(shí)際情況看是否配置)
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';");
producer = new KafkaProducer<>(props);
logger.info("kafka連接成功");
}
public static SingletonKafkaProducerExample getInstance() {
if (instance == null) {
synchronized (SingletonKafkaProducerExample.class) {
if (instance == null) {
instance = new SingletonKafkaProducerExample();
}
}
}
return instance;
}
public void sendMessage(String topic, String key, String value) {
try {
//這里也可以不用設(shè)置key和partition,例如不設(shè)置分區(qū) 系統(tǒng)會使用輪詢算法自動匹配partition
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("發(fā)送消息到" + metadata.topic() + "失?。? + exception.getMessage());
} else {
System.out.println("發(fā)送消息到" + metadata.topic() + "成功:partition=" + metadata.partition() + ", offset=" + metadata.offset());
}
});
future.get(); // 等待返回?cái)?shù)據(jù)
} catch (InterruptedException | ExecutionException e) {
System.err.println("發(fā)送消息失?。? + e.getMessage());
}
}
public void closeProducer() {
producer.close();
}
}
以上參數(shù)配置只是案例,實(shí)際參數(shù)配置需要根據(jù)業(yè)務(wù)情況自己設(shè)置
下面是生產(chǎn)的方法介紹:
close(): 關(guān)閉生產(chǎn)者,釋放相關(guān)資源。
close(Duration timeout): 在指定的超時(shí)時(shí)間內(nèi)關(guān)閉生產(chǎn)者,釋放相關(guān)資源。
initTransactions(): 初始化事務(wù),啟用事務(wù)支持。
beginTransaction(): 開始事務(wù)。
send(ProducerRecord<K, V> record): 發(fā)送一條消息記錄到指定的主題。
send(ProducerRecord<K, V> record, Callback callback): 發(fā)送一條消息記錄,并附帶一個(gè)回調(diào)函數(shù)用于異步處理發(fā)送結(jié)果。
send(ProducerRecord<K, V> record, ProducerCallback<T> callback): 發(fā)送一條消息記錄,并使用自定義的回調(diào)函數(shù)處理發(fā)送結(jié)果。
sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId): 將消費(fèi)者組的偏移量提交給事務(wù)。
partitionsFor(String topic): 獲取指定主題的分區(qū)信息。
metrics(): 獲取生產(chǎn)者的度量指標(biāo)信息。
flush(): 將所有已掛起的消息立即發(fā)送到Kafka服務(wù)器,等待服務(wù)器確認(rèn)后再返回。
commitTransaction(): 提交當(dāng)前事務(wù)。
abortTransaction(): 中止當(dāng)前事務(wù)。
sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata): 將消費(fèi)者組的偏移量和消費(fèi)者組元數(shù)據(jù)提交給事務(wù)。
可能遇見的問題
1.多個(gè)topic發(fā)送消息的時(shí)候總有1.2發(fā)送失敗 報(bào)Failed to update metadata after 60000ms
這種情況出現(xiàn)的原因可能是Kafka集群中Broker的元數(shù)據(jù)信息還沒有被更新到Kafka客戶端中,導(dǎo)致Kafka客戶端無法連接到指定的Broker。
解決
增加等待時(shí)間:可以通過設(shè)置max.block.ms屬性來增加等待時(shí)間
提高重試次數(shù):可以通過設(shè)置retries屬性來提高重試次數(shù)
檢查Broker配置
檢查網(wǎng)絡(luò)連接
檢查Kafka版本
如果下面3個(gè)都沒問題,就增加等待時(shí)間和重試次數(shù)。本人遇到這樣的問題解決了
消費(fèi)者 推送數(shù)據(jù)
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 配置消費(fèi)者參數(shù)
Properties props = new Properties();
/*
bootstrap.servers
Kafka集群中Broker的地址列表,格式為"hostname:port",例如:"localhost:9092"??梢耘渲枚鄠€(gè)Broker,用逗號分隔。
*/
props.put("bootstrap.servers", "ip:port");
/*
group.id
消費(fèi)者組的名稱,同一個(gè)消費(fèi)者組中的消費(fèi)者會共享消費(fèi)消息的責(zé)任。例如:"test"。
*/
props.put("group.id", "test");
/*
enable.auto.commit
是否自動提交偏移量,默認(rèn)為true。如果為false,則需要手動提交偏移量。
*/
props.put("enable.auto.commit", "true");
/*
session.timeout.ms
消費(fèi)者會話超時(shí)時(shí)間(毫秒),如果消費(fèi)者在該時(shí)間內(nèi)沒有向Kafka Broker發(fā)送心跳,則會被認(rèn)為已經(jīng)失效。默認(rèn)10000毫秒。
*/
props.put("session.timeout.ms", "30000");
/*
auto.offset.reset
如果消費(fèi)者在初始化時(shí)沒有指定偏移量或指定的偏移量不存在,則從哪個(gè)位置開始消費(fèi),默認(rèn)latest,即從最新的消息開始消費(fèi)。其他可選值為earliest和none。
*/
props.put("auto.offset.reset", "earliest");
/*
key.deserializer
key的反序列化方式,例如:"org.apache.kafka.common.serialization.StringDeserializer"。
*/
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/*
value.deserializer
value的反序列化方式,例如:"org.apache.kafka.common.serialization.StringDeserializer"。
*/
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/*
max.poll.records
每次拉取消息的最大記錄數(shù),默認(rèn)500條。
*/
props.put("max.poll.records", "10000");
/*
fetch.min.bytes
每次拉取的最小字節(jié)數(shù),默認(rèn)1字節(jié)。
fetch.max.bytes
每次拉取的最大字節(jié)數(shù),默認(rèn)52428800字節(jié),即50MB。
fetch.max.wait.ms
最長等待時(shí)間(毫秒),如果在該時(shí)間內(nèi)沒有拉取到任何消息,則返回空結(jié)果。默認(rèn)500毫秒。
*/
props.put("fetch.min.bytes", "1024");
props.put("fetch.max.bytes", "1048576");
props.put("fetch.max.wait.ms", "500");
/*
max.partition.fetch.bytes
每個(gè)分區(qū)最大拉取字節(jié)數(shù),默認(rèn)1048576字節(jié),即1MB。
*/
props.put("max.partition.fetch.bytes", "1024");
/*
connections.max.idle.ms
最大空閑連接時(shí)間(毫秒),超過該時(shí)間則連接被認(rèn)為已經(jīng)過期并關(guān)閉。默認(rèn)540000毫秒,即9分鐘。
*/
props.put("connections.max.idle.ms", "540000");
/*
request.timeout.ms
請求超時(shí)時(shí)間(毫秒),如果在該時(shí)間內(nèi)沒有收到Broker的響應(yīng),則認(rèn)為請求失敗。默認(rèn)30000毫秒。
*/
props.put("request.timeout.ms", "40000");
/*
retry.backoff.ms
重試等待時(shí)間(毫秒),如果請求失敗,則等待一段時(shí)間后再次重試。默認(rèn)500毫秒。
*/
props.put("retry.backoff.ms", "500");
/*
security.protocol
安全協(xié)議類型,例如SSL或SASL_SSL。
ssl.keystore.location
SSL證書的路徑和名稱。
ssl.keystore.password
SSL證書的密碼。
ssl.truststore.location
SSL信任證書庫的路徑和名稱。
ssl.truststore.password
SSL信任證書庫的密碼。
*/
props.put("security.protocol", "SSL");
props.put("ssl.keystore.location", "/path/to/keystore");
props.put("ssl.keystore.password", "password");
props.put("ssl.truststore.location", "/path/to/truststore");
props.put("ssl.truststore.password", "password");
// 創(chuàng)建Kafka消費(fèi)者實(shí)例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 訂閱主題
consumer.subscribe(Arrays.asList("my-topic"));
// 創(chuàng)建線程池
ExecutorService executor = Executors.newFixedThreadPool(6);
// 消費(fèi)消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// 獲取消息所在分區(qū)的編號
int partition = record.partition();
// 將消息提交給對應(yīng)的線程進(jìn)行處理
executor.submit(new MessageHandler(record.value(), partition));
}
}
}
// 消息處理器
static class MessageHandler implements Runnable {
private final String message;
private final int partition;
public MessageHandler(String message, int partition) {
this.message = message;
this.partition = partition;
}
@Override
public void run() {
// 對消息進(jìn)行處理
System.out.printf("Partition %d: Message received: %s%n", partition, message);
}
}
}
以上參數(shù)根據(jù)自己需求填寫
可以根據(jù)分區(qū) 使用多線程執(zhí)行文章來源:http://www.zghlxwxcb.cn/news/detail-562168.html
下面是消費(fèi)者的方法講解文章來源地址http://www.zghlxwxcb.cn/news/detail-562168.html
subscribe(Collection<String> topics): 訂閱一個(gè)或多個(gè)主題,開始消費(fèi)這些主題中的消息。
unsubscribe(): 取消訂閱當(dāng)前已經(jīng)訂閱的所有主題,停止消費(fèi)消息。
poll(Duration timeout): 從Kafka服務(wù)器拉取一批消息記錄,該方法會阻塞指定的超時(shí)時(shí)間,等待服務(wù)器返回消息。如果在超時(shí)時(shí)間內(nèi)沒有收到消息,則返回空記錄。
commitSync(): 同步方式提交消費(fèi)者的消費(fèi)偏移量(offset),表示消息已成功消費(fèi)。
commitSync(Duration timeout): 在指定的超時(shí)時(shí)間內(nèi)同步提交消費(fèi)者的消費(fèi)偏移量。
commitAsync(): 異步方式提交消費(fèi)者的消費(fèi)偏移量,不等待提交結(jié)果。
commitAsync(OffsetCommitCallback callback): 異步方式提交消費(fèi)者的消費(fèi)偏移量,并在提交完成后執(zhí)行回調(diào)函數(shù)。
seek(TopicPartition partition, long offset): 將消費(fèi)者的偏移量(offset)設(shè)置為指定分區(qū)的指定偏移量,以便從指定位置開始消費(fèi)消息。
seekToBeginning(Collection<TopicPartition> partitions): 將消費(fèi)者的偏移量設(shè)置為指定分區(qū)的最早可用偏移量,重新從分區(qū)起始位置開始消費(fèi)消息。
seekToEnd(Collection<TopicPartition> partitions): 將消費(fèi)者的偏移量設(shè)置為指定分區(qū)的最新可用偏移量,繼續(xù)消費(fèi)分區(qū)中尚未消費(fèi)的消息。
seekByTimestamp(Map<TopicPartition, Long> timestampsToSearch): 根據(jù)時(shí)間戳搜索偏移量,并將消費(fèi)者的偏移量設(shè)置為找到的偏移量。
assignment(): 獲取當(dāng)前分配給消費(fèi)者的所有分區(qū)。
pause(Collection<TopicPartition> partitions): 暫停指定分區(qū)的消息消費(fèi),消費(fèi)者將不再繼續(xù)接收這些分區(qū)的消息。
resume(Collection<TopicPartition> partitions): 恢復(fù)被暫停的指定分區(qū)的消息消費(fèi),使消費(fèi)者可以繼續(xù)接收這些分區(qū)的消息。
close(): 關(guān)閉消費(fèi)者,釋放相關(guān)資源。
到了這里,關(guān)于java:Kafka生產(chǎn)者推送數(shù)據(jù)與消費(fèi)者接收數(shù)據(jù)(參數(shù)配置以及案例)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!