3.1 生產(chǎn)者消息發(fā)送流程
1 發(fā)送原理
在消息發(fā)送的過程中,涉及到了兩個線程——main 線程和 Sender 線程。在 main 線程中創(chuàng)建了一個雙端隊(duì)列 RecordAccumulator。main 線程將消息發(fā)送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka Broker
【RecordAccumulator緩沖的結(jié)構(gòu):每一個分區(qū)對應(yīng)一個雙端隊(duì)列deque
,存放的單元是ProducerBatch,一個Batch中存放了多個Record,那么存消息是自動放到尾端,而讀取消息(發(fā)送線程讀?。┦菑念^部開始的,目的是讓發(fā)送的消息更加緊湊,節(jié)約空間,提高效率。注意這個大的緩沖池,默認(rèn)是32M,如果超出了會阻塞send()方法,可以設(shè)置參數(shù)來調(diào)節(jié)這個大小。】
3.2 異步發(fā)送 API
public class CustomProducer {
public static void main(String[] args) {
// 0 配置
Properties properties = new Properties();
// 連接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");
// 指定對應(yīng)的key和value的序列化類型 key.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 1 創(chuàng)建kafka生產(chǎn)者對象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 發(fā)送數(shù)據(jù)
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
}
// 3 關(guān)閉資源
kafkaProducer.close();
}
}
帶回調(diào)函數(shù)的異步發(fā)送
回調(diào)函數(shù)會在 producer 收到 ack 時調(diào)用,為異步調(diào)用,該方法有兩個參數(shù),分別是元數(shù)據(jù)信息(RecordMetadata)和異常信息(Exception),如果 Exception 為 null,說明消息發(fā)送成功,如果 Exception 不為 null,說明消息發(fā)送失敗。
注意:消息發(fā)送失敗會自動重試,不需要我們在回調(diào)函數(shù)中手動重試。
public class CustomProducerCallback {
public static void main(String[] args) throws InterruptedException {
// 0 配置
Properties properties = new Properties();
// 連接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");
// 指定對應(yīng)的key和value的序列化類型 key.serializer
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 1 創(chuàng)建kafka生產(chǎn)者對象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 發(fā)送數(shù)據(jù)
for (int i = 0; i < 10; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null){
System.out.println("主題: "+metadata.topic() + " 分區(qū): "+ metadata.partition());
}
}
});
Thread.sleep(2);
}
// 3 關(guān)閉資源
kafkaProducer.close();
}
}
3.3 同步發(fā)送數(shù)據(jù)
只需在異步發(fā)送的基礎(chǔ)上,再調(diào)用一下 get()方法即可。
public class CustomProducerSync {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 0 配置
Properties properties = new Properties();
// 連接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");
// 指定對應(yīng)的key和value的序列化類型 key.serializer
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 1 創(chuàng)建kafka生產(chǎn)者對象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 發(fā)送數(shù)據(jù)
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i)).get();
}
// 3 關(guān)閉資源
kafkaProducer.close();
}
}
3.4 生產(chǎn)者分區(qū)
1 kafka分區(qū)的好處
-
便于合理使用存儲資源
,每個Partition在一個Broker上存儲,可以把海量的數(shù)據(jù)按照分區(qū)切割成一塊一塊數(shù)據(jù)存儲在多臺Broker上。合理控制分區(qū)的任務(wù),可以實(shí)現(xiàn)負(fù)載均衡的效果 - 提高并行度,生產(chǎn)者可以以分區(qū)為單位發(fā)送數(shù)據(jù);消費(fèi)者可以以分區(qū)為單位進(jìn)行消費(fèi)數(shù)據(jù)。
2 生產(chǎn)者發(fā)送消息的分區(qū)策略
- 指定分區(qū)
- 沒有指定分區(qū),但是傳入了key值
- 既沒有指定分區(qū),也沒有傳入key值
3 自定義分區(qū)器
1)需求:
例如我們實(shí)現(xiàn)一個分區(qū)器實(shí)現(xiàn),發(fā)送過來的數(shù)據(jù)中如果包含 atguigu,就發(fā)往 0 號分區(qū),不包含 atguigu,就發(fā)往 1 號分區(qū)
2)定義類實(shí)現(xiàn) Partitioner 接口,重寫 partition()方法。
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 獲取數(shù)據(jù) atguigu hello
String msgValues = value.toString();
int partition;
if (msgValues.contains("atguigu")){
partition = 0;
}else {
partition = 1;
}
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
3)使用分區(qū)器的方法,在生產(chǎn)者的配置中添加分區(qū)器參數(shù)
public class CustomProducerCallbackPartitions {
public static void main(String[] args) throws InterruptedException {
// 0 配置
Properties properties = new Properties();
// 連接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.11:9092");
// 指定對應(yīng)的key和value的序列化類型 key.serializer
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 關(guān)聯(lián)自定義分區(qū)器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.MyPartitioner");
// 1 創(chuàng)建kafka生產(chǎn)者對象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 發(fā)送數(shù)據(jù)
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("主題: " + metadata.topic() + " 分區(qū): " + metadata.partition());
}
}
});
Thread.sleep(2);
}
// 3 關(guān)閉資源
kafkaProducer.close();
}
}
3.5 生產(chǎn)者如何提高吞吐量
- 分批次發(fā)送消息
- 對生產(chǎn)者消息采用壓縮
四個重要參數(shù):
public class CustomProducerParameters {
public static void main(String[] args) {
// 0 配置
Properties properties = new Properties();
// 連接kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");
// 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 緩沖區(qū)大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// 批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
// linger.ms
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 壓縮
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
// 1 創(chuàng)建生產(chǎn)者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 發(fā)送數(shù)據(jù)
for (int i = 0; i < 50; i++) {
kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
}
// 3 關(guān)閉資源
kafkaProducer.close();
}
}
3.6 數(shù)據(jù)可靠性
1、ACK應(yīng)答級別
- 0:生產(chǎn)者發(fā)送過來的數(shù)據(jù),不需要等數(shù)據(jù)落盤應(yīng)答
- 1:生產(chǎn)者發(fā)送過來的數(shù)據(jù),Leader收到數(shù)據(jù)后應(yīng)答
- -1 :生產(chǎn)者發(fā)送過來的數(shù)據(jù),Leader+和isr隊(duì)列里面的所有節(jié)點(diǎn)收齊數(shù)據(jù)后應(yīng)答。-1和all等價。
ACK=-1時存在的問題?
Leader收到數(shù)據(jù),所有Follower都開始同步數(shù)據(jù),但有一個Follower,因?yàn)槟撤N故障,遲遲不能與Leader進(jìn)行同步,那這個問題怎么解決呢?
Leader維護(hù)了一個動態(tài)的in-sync replica set(ISR),意為和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。
如果Follower長時間未向Leader發(fā)送通信請求或同步數(shù)據(jù),則該Follower將被踢出ISR。該時間閾值由replica.lag.time.max.ms參數(shù)設(shè)定,默認(rèn)30s
數(shù)據(jù)完全可靠條件 = ACK級別設(shè)置為-1 + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)量大于等于2
在生產(chǎn)環(huán)境中,acks=0很少使用;acks=1,一般用于傳輸普通日志,允許丟個別數(shù)據(jù);acks=-1,一般用于傳輸和錢相關(guān)的數(shù)據(jù)對可靠性要求比較高的場景,
數(shù)據(jù)重復(fù)性問題
2、代碼演示
// acks
properties.put(ProducerConfig.ACKS_CONFIG,“1”);
// 重試次數(shù)
properties.put(ProducerConfig.RETRIES_CONFIG,3);
public class CustomProducerAcks {
public static void main(String[] args) {
// 0 配置
Properties properties = new Properties();
// 連接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
// 指定對應(yīng)的key和value的序列化類型 key.serializer
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// acks
properties.put(ProducerConfig.ACKS_CONFIG,"1");
// 重試次數(shù)
properties.put(ProducerConfig.RETRIES_CONFIG,3);
// 1 創(chuàng)建kafka生產(chǎn)者對象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 發(fā)送數(shù)據(jù)
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
}
// 3 關(guān)閉資源
kafkaProducer.close();
}
}
3.7 數(shù)據(jù)去重
1、三個概念
保證消息至少被消費(fèi)一次 :ACK級別設(shè)置為-1 + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)量大于等于2
保證消息最多被消費(fèi)一次: ACK級別設(shè)置為0
保證消息只能精確消費(fèi)一次: (ACK級別設(shè)置為-1 + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)量大于等于2) + 消息消費(fèi)的冪等性
Kafka 0.11版本以后,引入了一項(xiàng)重大特性:冪等性和事務(wù)。
2 冪等性
冪等性就是指Producer不論向Broker發(fā)送多少次重復(fù)數(shù)據(jù),Broker端都只會持久化一條,保證了不重復(fù)。
如何使用冪等性
開啟參數(shù) enable.idempotence 默認(rèn)為 true,false 關(guān)閉
3 生產(chǎn)者事務(wù)
1) kafka的事務(wù)原理這里講的不好,待會找另外的視頻看一下
2)kafka事務(wù)的api
Kafka 的事務(wù)一共有如下 5 個 API
// 1 初始化事務(wù)
void initTransactions();
// 2 開啟事務(wù)
void beginTransaction() throws ProducerFencedException;
// 3 在事務(wù)內(nèi)提交已經(jīng)消費(fèi)的偏移量(主要用于消費(fèi)者)
void sendOffsetsToTransaction (Map < TopicPartition, OffsetAndMetadata > offsets,String consumerGroupId) throws ProducerFencedException;
// 4 提交事務(wù)
void commitTransaction() throws ProducerFencedException;
// 5 放棄事務(wù)(類似于回滾事務(wù)的操作)
void abortTransaction() throws ProducerFencedException;
事務(wù)使用的demo
public class CustomProducerTranactions {
public static void main(String[] args) {
// 0 配置
Properties properties = new Properties();
// 連接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.11:9092");
// 指定對應(yīng)的key和value的序列化類型 key.serializer
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 指定事務(wù)id
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01");
// 1 創(chuàng)建kafka生產(chǎn)者對象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
try {
// 2 發(fā)送數(shù)據(jù)
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i));
}
int i = 1 / 0;
kafkaProducer.commitTransaction();
} catch (Exception e) {
kafkaProducer.abortTransaction();
} finally {
// 3 關(guān)閉資源
kafkaProducer.close();
}
}
}
3.8 數(shù)據(jù)有序性
文章來源:http://www.zghlxwxcb.cn/news/detail-660738.html
3.9 數(shù)據(jù)亂序
文章來源地址http://www.zghlxwxcb.cn/news/detail-660738.html
到了這里,關(guān)于三、Kafka生產(chǎn)者的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!