為了在不重新啟動(dòng) Flink 作業(yè)的情況下處理主題擴(kuò)展或主題創(chuàng)建等場(chǎng)景,可以將 Kafka 源配置為在提供的主題分區(qū)訂閱模式下定期發(fā)現(xiàn)新分區(qū)。要啟用分區(qū)發(fā)現(xiàn),請(qǐng)為屬性partition.discovery.interval.ms設(shè)置一個(gè)非負(fù)值。
一、動(dòng)態(tài)發(fā)現(xiàn)新增分區(qū)
flink程序增加自動(dòng)發(fā)現(xiàn)分區(qū)參數(shù):
- flink.partition-discovery.interval-millis是一個(gè)配置屬性,用于設(shè)置Flink作業(yè)中的分區(qū)發(fā)現(xiàn)間隔時(shí)間(以毫秒為單位)。
- 在Flink作業(yè)中,數(shù)據(jù)源(例如Kafka或文件系統(tǒng))的分區(qū)可能會(huì)發(fā)生變化。為了及時(shí)感知分區(qū)的變化情況,并根據(jù)變化進(jìn)行相應(yīng)的處理,F(xiàn)link提供了分區(qū)發(fā)現(xiàn)機(jī)制。
- flink.partition-discovery.interval-millis配置屬性用于設(shè)置Flink作業(yè)在進(jìn)行分區(qū)發(fā)現(xiàn)時(shí)的間隔時(shí)間。Flink作業(yè)會(huì)定期檢查數(shù)據(jù)源的分區(qū)情況,如果發(fā)現(xiàn)分區(qū)發(fā)生了變化(例如增加或減少了分區(qū)),F(xiàn)link會(huì)相應(yīng)地調(diào)整作業(yè)的并行度或重新分配任務(wù)來(lái)適應(yīng)新的分區(qū)情況。
- 通過(guò)調(diào)整flink.partition-discovery.interval-millis的值,可以控制Flink作業(yè)進(jìn)行分區(qū)發(fā)現(xiàn)的頻率。較小的間隔時(shí)間可以實(shí)時(shí)感知到分區(qū)變化,但可能會(huì)增加作業(yè)的開(kāi)銷(xiāo);較大的間隔時(shí)間可以減少開(kāi)銷(xiāo),但可能導(dǎo)致較長(zhǎng)時(shí)間的延遲。
- 需要注意的是,flink.partition-discovery.interval-millis的默認(rèn)值是5分鐘(300000毫秒),可以根據(jù)具體需求進(jìn)行調(diào)整。
二、Flink SQL動(dòng)態(tài)發(fā)現(xiàn)新增分區(qū)
參數(shù):scan.topic-partition-discovery.interval
CREATE TABLE KafkaTable (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
Connector Options:
Option | Required | Default | Type | Description |
---|---|---|---|---|
scan.topic-partition-discovery.interval | optional | (none) | Duration | 消費(fèi)者定期發(fā)現(xiàn)動(dòng)態(tài)創(chuàng)建的Kafka主題和分區(qū)的時(shí)間間隔。 |
三、Flink API動(dòng)態(tài)發(fā)現(xiàn)新增分區(qū)
參數(shù):partition.discovery.interval.ms
Java文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-647384.html
KafkaSource.builder()
.setProperty("partition.discovery.interval.ms", "10000");
// discover new partitions per 10 seconds
Python文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-647384.html
KafkaSource.builder() \
.set_property("partition.discovery.interval.ms", "10000")
# discover new partitions per 10 seconds
到了這里,關(guān)于Flink系列之:動(dòng)態(tài)發(fā)現(xiàn)新增分區(qū)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!