国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

輕松通關(guān)Flink第24講:Flink 消費 Kafka 數(shù)據(jù)業(yè)務(wù)開發(fā)

這篇具有很好參考價值的文章主要介紹了輕松通關(guān)Flink第24講:Flink 消費 Kafka 數(shù)據(jù)業(yè)務(wù)開發(fā)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

在上一課時中我們提過在實時計算的場景下,絕大多數(shù)的數(shù)據(jù)源都是消息系統(tǒng),而 Kafka 從眾多的消息中間件中脫穎而出,主要是因為高吞吐、低延遲的特點;同時也講了 Flink 作為生產(chǎn)者像 Kafka 寫入數(shù)據(jù)的方式和代碼實現(xiàn)。這一課時我們將從以下幾個方面介紹 Flink 消費 Kafka 中的數(shù)據(jù)方式和源碼實現(xiàn)。

Flink 如何消費 Kafka

Flink 在和 Kafka 對接的過程中,跟 Kafka 的版本是強相關(guān)的。上一課時也提到了,我們在使用 Kafka 連接器時需要引用相對應(yīng)的 Jar 包依賴,對于某些連接器比如 Kafka 是有版本要求的,一定要去官方網(wǎng)站找到對應(yīng)的依賴版本。

我們本地的 Kafka 版本是 2.1.0,所以需要對應(yīng)的類是 FlinkKafkaConsumer。首先需要在 pom.xml 中引入 jar 包依賴:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

下面將對 Flink 消費 Kafka 數(shù)據(jù)的方式進(jìn)行分類講解。

消費單個 Topic

上一課時我們在本地搭建了 Kafka 環(huán)境,并且手動創(chuàng)建了名為 test 的 Topic,然后向名為 test 的 Topic 中寫入了數(shù)據(jù)。

那么現(xiàn)在我們要消費這個 Topic 中的數(shù)據(jù),該怎么做呢?

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.enableCheckpointing(5000);
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
    // 如果你是0.8版本的Kafka,需要配置
    //properties.setProperty("zookeeper.connect", "localhost:2181");
    //設(shè)置消費組
    properties.setProperty("group.id", "group_test");
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
    //設(shè)置從最早的ffset消費
    consumer.setStartFromEarliest();
    //還可以手動指定相應(yīng)的 topic, partition,offset,然后從指定好的位置開始消費
    //HashMap<KafkaTopicPartition, Long> map = new HashMap<>();
    //map.put(new KafkaTopicPartition("test", 1), 10240L);
    //假如partition有多個,可以指定每個partition的消費位置
    //map.put(new KafkaTopicPartition("test", 2), 10560L);
    //然后各個partition從指定位置消費
    //consumer.setStartFromSpecificOffsets(map);
    env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            System.out.println(value);
        }
    });
    env.execute("start consumer...");
}

在設(shè)置消費 Kafka 中的數(shù)據(jù)時,可以顯示地指定從某個 Topic 的每一個 Partition 中進(jìn)行消費。

消費多個 Topic

我們的業(yè)務(wù)中會有這樣的情況,同樣的數(shù)據(jù)根據(jù)類型不同發(fā)送到了不同的 Topic 中,比如線上的訂單數(shù)據(jù)根據(jù)來源不同分別發(fā)往移動端和 PC 端兩個 Topic 中。但是我們不想把同樣的代碼復(fù)制一份,需重新指定一個 Topic 進(jìn)行消費,這時候應(yīng)該怎么辦呢?

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// 如果你是0.8版本的Kafka,需要配置
//properties.setProperty("zookeeper.connect", "localhost:2181");
//設(shè)置消費組
properties.setProperty("group.id", "group_test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
ArrayList<String> topics = new ArrayList<>();
        topics.add("test_A");
        topics.add("test_B");
       // 傳入一個 list,完美解決了這個問題
        FlinkKafkaConsumer<Tuple2<String, String>> consumer = new FlinkKafkaConsumer<>(topics, new SimpleStringSchema(), properties);
...

我們可以傳入一個 list 來解決消費多個 Topic 的問題,如果用戶需要區(qū)分兩個 Topic 中的數(shù)據(jù),那么需要在發(fā)往 Kafka 中數(shù)據(jù)新增一個字段,用來區(qū)分來源。

消息序列化

我們在上述消費 Kafka 消息時,都默認(rèn)指定了消息的序列化方式,即 SimpleStringSchema。這里需要注意的是,在我們使用 SimpleStringSchema 的時候,返回的結(jié)果中只有原數(shù)據(jù),沒有 topic、parition 等信息,這時候可以自定義序列化的方式來實現(xiàn)自定義返回數(shù)據(jù)的結(jié)構(gòu)。

public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {
    //是否表示流的最后一條元素,設(shè)置為false,表示數(shù)據(jù)會源源不斷地到來
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }
    //這里返回一個ConsumerRecord<String,String>類型的數(shù)據(jù),除了原數(shù)據(jù)還包括topic,offset,partition等信息
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new ConsumerRecord<String, String>(
                record.topic(),
                record.partition(),
                record.offset(),
                new String(record.key()),
                new String(record.value())
        );
    }
    //指定數(shù)據(jù)的輸入類型
    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>(){});
    }
}

這里自定義了 CustomDeSerializationSchema 信息,就可以直接使用了。

Parition 和 Topic 動態(tài)發(fā)現(xiàn)

在很多場景下,隨著業(yè)務(wù)的擴展,我們需要對 Kafka 的分區(qū)進(jìn)行擴展,為了防止新增的分區(qū)沒有被及時發(fā)現(xiàn)導(dǎo)致數(shù)據(jù)丟失,消費者必須要感知 Partition 的動態(tài)變化,可以使用 FlinkKafkaConsumer 的動態(tài)分區(qū)發(fā)現(xiàn)實現(xiàn)。

我們只需要指定下面的配置,即可打開動態(tài)分區(qū)發(fā)現(xiàn)功能:每隔 10ms 會動態(tài)獲取 Topic 的元數(shù)據(jù),對于新增的 Partition 會自動從最早的位點開始消費數(shù)據(jù)。

properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");

如果業(yè)務(wù)場景需要我們動態(tài)地發(fā)現(xiàn) Topic,可以指定 Topic 的正則表達(dá)式:

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(Pattern.compile("^test_([A-Za-z0-9]*)$"), new SimpleStringSchema(), properties);
Flink 消費 Kafka 設(shè)置 offset 的方法

Flink 消費 Kafka 需要指定消費的 offset,也就是偏移量。Flink 讀取 Kafka 的消息有五種消費方式:

  • 指定 Topic 和 Partition

  • 從最早位點開始消費

  • 從指定時間點開始消費

  • 從最新的數(shù)據(jù)開始消費

  • 從上次消費位點開始消費

/**
* Flink從指定的topic和parition中指定的offset開始
*/
Map<KafkaTopicPartition, Long> offsets = new HashedMap();
offsets.put(new KafkaTopicPartition("test", 0), 10000L);
offsets.put(new KafkaTopicPartition("test", 1), 20000L);
offsets.put(new KafkaTopicPartition("test", 2), 30000L);
consumer.setStartFromSpecificOffsets(offsets);
/**
* Flink從topic中最早的offset消費
*/
consumer.setStartFromEarliest();
/**
* Flink從topic中指定的時間點開始消費
*/
consumer.setStartFromTimestamp(1559801580000l);
/**
* Flink從topic中最新的數(shù)據(jù)開始消費
*/
consumer.setStartFromLatest();
/**
* Flink從topic中指定的group上次消費的位置開始消費,所以必須配置group.id參數(shù)
*/
consumer.setStartFromGroupOffsets();

源碼解析

從上面的類圖可以看出,F(xiàn)linkKafkaConsumer 繼承了 FlinkKafkaConsumerBase,而 FlinkKafkaConsumerBase 最終是對 SourceFunction 進(jìn)行了實現(xiàn)。

整體的流程:FlinkKafkaConsumer 首先創(chuàng)建了 KafkaFetcher 對象,然后 KafkaFetcher 創(chuàng)建了 KafkaConsumerThread 和 Handover,KafkaConsumerThread 負(fù)責(zé)直接從 Kafka 中讀取 msg,并交給 Handover,然后 Handover 將 msg 傳遞給 KafkaFetcher.emitRecord 將消息發(fā)出。

因為 FlinkKafkaConsumerBase 實現(xiàn)了 RichFunction 接口,所以當(dāng)程序啟動的時候,會首先調(diào)用 FlinkKafkaConsumerBase.open 方法:

public void open(Configuration configuration) throws Exception {
   // 指定offset的提交方式
   this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
         getIsAutoCommitEnabled(),
         enableCommitOnCheckpoints,
         ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
   // 創(chuàng)建分區(qū)發(fā)現(xiàn)器
   this.partitionDiscoverer = createPartitionDiscoverer(
         topicsDescriptor,
         getRuntimeContext().getIndexOfThisSubtask(),
         getRuntimeContext().getNumberOfParallelSubtasks());
   this.partitionDiscoverer.open();
   subscribedPartitionsToStartOffsets = new HashMap<>();
   final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
   if (restoredState != null) {
      for (KafkaTopicPartition partition : allPartitions) {
         if (!restoredState.containsKey(partition)) {
            restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
         }
      }
      for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
         if (!restoredFromOldState) {
        <span class="hljs-keyword">if</span> (KafkaTopicPartitionAssigner.assign(
           restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
              == getRuntimeContext().getIndexOfThisSubtask()){
           subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
        }
     } <span class="hljs-keyword">else</span> {
       subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
     }
  }
  <span class="hljs-keyword">if</span> (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
     subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -&gt; {
        <span class="hljs-keyword">if</span> (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
           LOG.warn(
              <span class="hljs-string">"{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution."</span>,
              entry.getKey());
           <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;
        }
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;
     });
  }
  LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading {} partitions with offsets in restored state: {}"</span>,
     getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);

} else {

  <span class="hljs-keyword">switch</span> (startupMode) {
     <span class="hljs-keyword">case</span> SPECIFIC_OFFSETS:
        <span class="hljs-keyword">if</span> (specificStartupOffsets == <span class="hljs-keyword">null</span>) {
           <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> IllegalStateException(
              <span class="hljs-string">"Startup mode for the consumer set to "</span> + StartupMode.SPECIFIC_OFFSETS +
                 <span class="hljs-string">", but no specific offsets were specified."</span>);
        }
        <span class="hljs-keyword">for</span> (KafkaTopicPartition seedPartition : allPartitions) {
           Long specificOffset = specificStartupOffsets.get(seedPartition);
           <span class="hljs-keyword">if</span> (specificOffset != <span class="hljs-keyword">null</span>) {
                             subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - <span class="hljs-number">1</span>);
           } <span class="hljs-keyword">else</span> {
           subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
           }
        }
        <span class="hljs-keyword">break</span>;
     <span class="hljs-keyword">case</span> TIMESTAMP:
        <span class="hljs-keyword">if</span> (startupOffsetsTimestamp == <span class="hljs-keyword">null</span>) {
           <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> IllegalStateException(
              <span class="hljs-string">"Startup mode for the consumer set to "</span> + StartupMode.TIMESTAMP +
                 <span class="hljs-string">", but no startup timestamp was specified."</span>);
        }
        <span class="hljs-keyword">for</span> (Map.Entry&lt;KafkaTopicPartition, Long&gt; partitionToOffset
              : fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
           subscribedPartitionsToStartOffsets.put(
              partitionToOffset.getKey(),
              (partitionToOffset.getValue() == <span class="hljs-keyword">null</span>)
                  KafkaTopicPartitionStateSentinel.LATEST_OFFSET
                    : partitionToOffset.getValue() - <span class="hljs-number">1</span>);
        }
        <span class="hljs-keyword">break</span>;
     <span class="hljs-keyword">default</span>:
        <span class="hljs-keyword">for</span> (KafkaTopicPartition seedPartition : allPartitions) {
           subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
        }
  }
  <span class="hljs-keyword">if</span> (!subscribedPartitionsToStartOffsets.isEmpty()) {
     <span class="hljs-keyword">switch</span> (startupMode) {
        <span class="hljs-keyword">case</span> EARLIEST:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              subscribedPartitionsToStartOffsets.keySet());
           <span class="hljs-keyword">break</span>;
        <span class="hljs-keyword">case</span> LATEST:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              subscribedPartitionsToStartOffsets.keySet());
           <span class="hljs-keyword">break</span>;
        <span class="hljs-keyword">case</span> TIMESTAMP:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              startupOffsetsTimestamp,
              subscribedPartitionsToStartOffsets.keySet());
           <span class="hljs-keyword">break</span>;
        <span class="hljs-keyword">case</span> SPECIFIC_OFFSETS:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              specificStartupOffsets,
              subscribedPartitionsToStartOffsets.keySet());
           List&lt;KafkaTopicPartition&gt; partitionsDefaultedToGroupOffsets = <span class="hljs-keyword">new</span> ArrayList&lt;&gt;(subscribedPartitionsToStartOffsets.size());
           <span class="hljs-keyword">for</span> (Map.Entry&lt;KafkaTopicPartition, Long&gt; subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
              <span class="hljs-keyword">if</span> (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                 partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
              }
           }
           <span class="hljs-keyword">if</span> (partitionsDefaultedToGroupOffsets.size() &gt; <span class="hljs-number">0</span>) {
              LOG.warn(<span class="hljs-string">"Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}"</span> +
                    <span class="hljs-string">"; their startup offsets will be defaulted to their committed group offsets in Kafka."</span>,
                 getRuntimeContext().getIndexOfThisSubtask(),
                 partitionsDefaultedToGroupOffsets.size(),
                 partitionsDefaultedToGroupOffsets);
           }
           <span class="hljs-keyword">break</span>;
        <span class="hljs-keyword">case</span> GROUP_OFFSETS:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              subscribedPartitionsToStartOffsets.keySet());
     }
  } <span class="hljs-keyword">else</span> {
     LOG.info(<span class="hljs-string">"Consumer subtask {} initially has no partitions to read from."</span>,
        getRuntimeContext().getIndexOfThisSubtask());
  }

}
}

對 Kafka 中的 Topic 和 Partition 的數(shù)據(jù)進(jìn)行讀取的核心邏輯都在 run 方法中:

public void run(SourceContext<T> sourceContext) throws Exception {
   if (subscribedPartitionsToStartOffsets == null) {
      throw new Exception("The partitions were not set for the consumer");
   }
   this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
   this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
   final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
   this.offsetCommitCallback = new KafkaCommitCallback() {
      @Override
      public void onSuccess() {
         successfulCommits.inc();
      }
      @Override
      public void onException(Throwable cause) {
         LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause);
         failedCommits.inc();
      }
   };

if (subscribedPartitionsToStartOffsets.isEmpty()) {
sourceContext.markAsTemporarilyIdle();
}
LOG.info(“Consumer subtask {} creating fetcher with offsets {}.”,
getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets);

this.kafkaFetcher = createFetcher(
sourceContext,
subscribedPartitionsToStartOffsets,
periodicWatermarkAssigner,
punctuatedWatermarkAssigner,
(StreamingRuntimeContext) getRuntimeContext(),
offsetCommitMode,
getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
useMetrics);
if (!running) {
return;
}
if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
kafkaFetcher.runFetchLoop();
} else {
runWithPartitionDiscovery();
}
}

Flink 消費 Kafka 數(shù)據(jù)代碼

上面介紹了 Flink 消費 Kafka 的方式,以及消息序列化的方式,同時介紹了分區(qū)和 Topic 的動態(tài)發(fā)現(xiàn)方法,那么回到我們的項目中來,消費 Kafka 數(shù)據(jù)的完整代碼如下:

public class KafkaConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.enableCheckpointing(5000);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        //設(shè)置消費組
        properties.setProperty("group.id", "group_test");
        properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        //設(shè)置從最早的ffset消費
        consumer.setStartFromEarliest();
        env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                System.out.println(value);
            }
        });
        env.execute("start consumer...");
    }
}

我們可以直接右鍵運行代碼,在控制臺中可以看到數(shù)據(jù)的正常打印,如下圖所示:

通過代碼可知,我們之前發(fā)往 Kafka 的消息被完整地打印出來了。

總結(jié)

這一課時介紹了 Flink 消費 Kafka 的方式,比如從常用的指定單個或者多個 Topic、消息的序列化、分區(qū)的動態(tài)發(fā)現(xiàn)等,還從源碼上介紹了 Flink 消費 Kafka 的原理。通過本課時的學(xué)習(xí),相信你可以對 Flink 消費 Kafka 有一個較為全面地了解,根據(jù)業(yè)務(wù)場景可以正確選擇消費的方式和配置。


精選評論

**6513:

項目中也是用main方法作為入口嗎?有集成springboot的案例嗎

??? 講師回復(fù):

??? 在編寫Flink代碼時,盡量避免使用spring類的框架,因為沒有必要。只要依賴Flink必要的包和一些工具類即可。

**冰:

老師,這里可以從指定offset消費,怎么在程序終止或停止時保存offset,以便啟時使用了?

??? 講師回復(fù):

??? 如果實際情況需要保存位點,那么一般是自己管理位點,每次停止重啟后從自己管理的位點消費,比如你可以存儲在mysql中,自己去讀取

**良:

我是用flink消費Kafka從指定的topic和partition中指定的offset處開始消費,實際結(jié)果與預(yù)期不一致,消費的分區(qū)對不上是啥原因呢?示例代碼:Mapoffsets.put(new KafkaTopicPartition(topic, 0), 1L);offsets.put(new KafkaTopicPartition(topic, 1), 2L);offsets.put(new KafkaTopicPartition(topic, 2), 3L);consumer.setStartFromSpecificOffsets(offsets);返回結(jié)果:ConsumerRecord(topic=new-topic-config-test, partition=0, offset=10105849, key=, value=Python從入門到放棄!ConsumerRecord(topic=new-topic-config-test, partition=3, offset=10107121, key=, value=Python從入門到放棄!ConsumerRecord(topic=new-topic-config-test, partition=2, offset=10102806, key=, value=Java從入門到放棄!

??? 講師回復(fù):

??? 從兩個原因查詢,第一看下并行度的設(shè)置要和kafka分區(qū)設(shè)置保持一致。第二,要保證kafka4個分區(qū)都有數(shù)據(jù)。

*軒:

請問哪里可以下載項目用的數(shù)據(jù),不是源碼

??? 講師回復(fù):

??? 在項目中有數(shù)據(jù),也可以自己造一些數(shù)據(jù)

**7324:

flink如何將key相同的數(shù)據(jù)寫入到kafka的同一個partition呢?

??? 講師回復(fù):

??? 你可以自定義自己的kafka分區(qū)器,可以查一下FlinkKafkaPartitioner的用法,但是一般我們不會這么用,如果你需要自定義寫入kafka的分區(qū)器,要保證數(shù)據(jù)盡量均勻,不要引起kafka端的數(shù)據(jù)傾斜文章來源地址http://www.zghlxwxcb.cn/news/detail-712510.html

到了這里,關(guān)于輕松通關(guān)Flink第24講:Flink 消費 Kafka 數(shù)據(jù)業(yè)務(wù)開發(fā)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 掌握實時數(shù)據(jù)流:使用Apache Flink消費Kafka數(shù)據(jù)

    掌握實時數(shù)據(jù)流:使用Apache Flink消費Kafka數(shù)據(jù)

    ? ? ? ? 導(dǎo)讀:使用Flink實時消費Kafka數(shù)據(jù)的案例是探索實時數(shù)據(jù)處理領(lǐng)域的絕佳方式。不僅非常實用,而且對于理解現(xiàn)代數(shù)據(jù)架構(gòu)和流處理技術(shù)具有重要意義。 ????????Apache Flink ?是一個在 有界 數(shù)據(jù)流和 無界 數(shù)據(jù)流上進(jìn)行有狀態(tài)計算分布式處理引擎和框架。Flink 設(shè)計旨

    2024年02月03日
    瀏覽(31)
  • 輕松通關(guān)Flink第34講:Flink 和 Redis 整合以及 Redis Sink 實現(xiàn)

    上一課時我們使用了 3 種方法進(jìn)行了 PV 和 UV 的計算,分別是全窗口內(nèi)存統(tǒng)計、使用分組和過期數(shù)據(jù)剔除、使用 BitMap / 布隆過濾器。到此為止我們已經(jīng)講了從數(shù)據(jù)清洗到水印、窗口設(shè)計,PV 和 UV 的計算,接下來需要把結(jié)果寫入不同的目標(biāo)庫供前端查詢使用。 下面我們分別講

    2024年02月08日
    瀏覽(17)
  • 流批一體計算引擎-4-[Flink]消費kafka實時數(shù)據(jù)

    流批一體計算引擎-4-[Flink]消費kafka實時數(shù)據(jù)

    Python3.6.9 Flink 1.15.2消費Kafaka Topic PyFlink基礎(chǔ)應(yīng)用之kafka 通過PyFlink作業(yè)處理Kafka數(shù)據(jù) PyFlink需要特定的Python版本,Python 3.6, 3.7, 3.8 or 3.9。 1.3.1 python3和pip3的配置 一、系統(tǒng)中安裝了多個版本的python3 。 二、環(huán)境變量path作用順序 三、安裝Pyflink 1.3.2 配置Flink Kafka連接 (1)在https://mvnr

    2024年02月06日
    瀏覽(35)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定義 Sink 消費 Kafka 數(shù)據(jù)寫入 RocketMQ

    這里的 maven 依賴比較冗余,推薦大家都加上,后面陸續(xù)優(yōu)化。 注意: 1、此程序中所有的相關(guān)配置都是通過 Mysql 讀取的(生產(chǎn)環(huán)境中沒有直接寫死的,都是通過配置文件動態(tài)配置),大家實際測試過程中可以將相關(guān)配置信息寫死。 2、此程序中 Kafka 涉及到了 Kerberos 認(rèn)證操作

    2024年02月03日
    瀏覽(21)
  • flink如何初始化kafka數(shù)據(jù)源的消費偏移

    我們知道在日常非flink場景中消費kafka主題時,我們只要指定了消費者組,下次程序重新消費時是可以從上次消費停止時的消費偏移開始繼續(xù)消費的,這得益于kafka的_offset_主題保存的關(guān)于消費者組和topic偏移位置的具體偏移信息,那么flink應(yīng)用中重啟flink應(yīng)用時,flink是從topic的什

    2024年02月16日
    瀏覽(31)
  • Idea本地跑flink任務(wù)時,總是重復(fù)消費kafka的數(shù)據(jù)(kafka->mysql)

    Idea本地跑flink任務(wù)時,總是重復(fù)消費kafka的數(shù)據(jù)(kafka->mysql)

    1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 Idea中執(zhí)行任務(wù)時,沒法看到JobManager的錯誤,以至于我以為是什么特殊的原因?qū)е氯蝿?wù)總是反復(fù)消費。在close方法中,增加日志,發(fā)現(xiàn)jdbc連接被關(guān)閉了。 重新消費,jdbc連接又啟動了。 注意,在Flink的函數(shù)中,open和close方法

    2024年02月07日
    瀏覽(25)
  • 實戰(zhàn)Flink Java api消費kafka實時數(shù)據(jù)落盤HDFS

    實戰(zhàn)Flink Java api消費kafka實時數(shù)據(jù)落盤HDFS

    在Java api中,使用flink本地模式,消費kafka主題,并直接將數(shù)據(jù)存入hdfs中。 flink版本1.13 kafka版本0.8 hadoop版本3.1.4 為了完成 Flink 從 Kafka 消費數(shù)據(jù)并實時寫入 HDFS 的需求,通常需要啟動以下組件: 確保 Zookeeper 在運行,因為 Flink 的 Kafka Consumer 需要依賴 Zookeeper。 確保 Kafka Serve

    2024年01月24日
    瀏覽(29)
  • Flink1.17.1消費kafka3.5中的數(shù)據(jù)出現(xiàn)問題Failed to get metadata for topics [flink].

    問題呈現(xiàn) Failed to get metadata for topics [flink]. at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47) at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52) at org.apache.flink.connecto

    2024年02月11日
    瀏覽(24)
  • 大數(shù)據(jù)之使用Flink消費Kafka中topic為ods_mall_data的數(shù)據(jù),根據(jù)數(shù)據(jù)中不同的表將數(shù)據(jù)分別分發(fā)至kafka的DWD層

    大數(shù)據(jù)之使用Flink消費Kafka中topic為ods_mall_data的數(shù)據(jù),根據(jù)數(shù)據(jù)中不同的表將數(shù)據(jù)分別分發(fā)至kafka的DWD層

    前言 題目: 一、讀題分析 二、處理過程 三、重難點分析 總結(jié)? 本題來源于全國職業(yè)技能大賽之大數(shù)據(jù)技術(shù)賽項賽題 - 電商數(shù)據(jù)處理 - 實時數(shù)據(jù)處理 注:由于設(shè)備問題,代碼執(zhí)行結(jié)果以及數(shù)據(jù)的展示無法給出,可參照我以往的博客其中有相同數(shù)據(jù)源展示 ? ? 提示:以下是本

    2024年02月04日
    瀏覽(44)
  • 大數(shù)據(jù)之使用Flink消費Kafka中topic為ods_mall_log的數(shù)據(jù),根據(jù)不同的表前綴區(qū)分在存入Kafka的topic當(dāng)中

    前言 題目: 一、讀題分析 二、處理過程 ? 1.數(shù)據(jù)處理部分: 2.HBaseSink(未經(jīng)測試,不能證明其正確性,僅供參考?。?三、重難點分析 總結(jié)? 什么是HBase? 本題來源于全國職業(yè)技能大賽之大數(shù)據(jù)技術(shù)賽項賽題 - 電商數(shù)據(jù)處理 - 實時數(shù)據(jù)處理 注:由于設(shè)備問題,代碼執(zhí)行結(jié)果

    2024年02月03日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包