国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink中使用外部定時(shí)器實(shí)現(xiàn)定時(shí)刷新

這篇具有很好參考價(jià)值的文章主要介紹了flink中使用外部定時(shí)器實(shí)現(xiàn)定時(shí)刷新。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

背景:

我們經(jīng)常會使用到比如數(shù)據(jù)庫中的配置表信息,而我們不希望每次都去查詢db,那么我們就想定時(shí)把db配置表的數(shù)據(jù)定時(shí)加載到flink的本地內(nèi)存中,那么如何實(shí)現(xiàn)呢?

外部定時(shí)器定時(shí)加載實(shí)現(xiàn)

1.在open函數(shù)中進(jìn)行定時(shí)器的創(chuàng)建和定時(shí)加載,這個(gè)方法對于所有的RichFunction富函數(shù)都適用,包括RichMap,RichFilter,RichSink等,代碼如下所示

package wikiedits.schedule;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExecutorUtils;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduleRichMapFunction extends RichFlatMapFunction<String, String> {

    // 定時(shí)任務(wù)執(zhí)行器
    private transient ScheduledExecutorService scheduledExecutorService;
    // 本地變量
    private int threshold;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 1.從db查詢數(shù)據(jù)初始化本地變量
//        threshold = DBManager.SELECTSQL.getConfig("threshold");
        // 2.使用定時(shí)任務(wù)更新本地內(nèi)存的配置信息以及更新本地變量threshold的值
        scheduledExecutorService = Executors.newScheduledThreadPool(10);
        scheduledExecutorService.scheduleWithFixedDelay(() -> {
            // 2.1 定時(shí)任務(wù)更新本地內(nèi)存配置項(xiàng)
            // List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();
//            for(ConfigEntity entity : configList){
                ConfigEntityLocalCache.getInstance().update("key", "value");
//            }
            // 2.2 更新本地變量threshold的值
//            threshold = DBManager.SELECTSQL.getConfig("threshold");
        }, 0, 100, TimeUnit.SECONDS);

    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {

    }

    @Override
    public void close() throws Exception {
        ExecutorUtils.gracefulShutdown(100, TimeUnit.SECONDS, scheduledExecutorService);
    }


}

//本地緩存實(shí)現(xiàn)
package wikiedits.schedule;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

/**
 * 保存Config信息的本地緩存 ---定時(shí)同步DB配置表的數(shù)據(jù)
 */
public class ConfigEntityLocalCache {

    private static volatile ConfigEntityLocalCache instance = new ConfigEntityLocalCache();

    /**
     * 獲取本地緩存實(shí)例
     */
    public static ConfigEntityLocalCache getInstance() {
        return instance;
    }

    /** 緩存內(nèi)存配置項(xiàng) */
    private static Cache<String, String> configCache =
            CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();


    /**
     * 更新本地緩存數(shù)據(jù)
     */
    public boolean update(String key, String value){
        configCache.put(key, value);
        return true;
    }


    /**
     * 更新本地緩存數(shù)據(jù)
     */
    public  String getByKey(String key){
        return configCache.getIfPresent(key);
    }

}


2.在靜態(tài)類中通過static語句塊創(chuàng)建定時(shí)器并定時(shí)加載,代碼如下

package wikiedits.schedule;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

/**
 * 靜態(tài)類定時(shí)加載DB配置表到本地內(nèi)存中
 */
public class StaticLoadUtil {

    // 定時(shí)任務(wù)執(zhí)行器
    private static transient ScheduledExecutorService scheduledExecutorService;

    public static final Cache<String, String> configCache =
            CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();

    // 通過定時(shí)執(zhí)行器定時(shí)同步本地緩存和DB配置表
    static {
        scheduledExecutorService = Executors.newScheduledThreadPool(10);
        scheduledExecutorService.scheduleWithFixedDelay(() -> {
            // 2.1 定時(shí)任務(wù)更新本地內(nèi)存配置項(xiàng)
            // List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();
            // for(ConfigEntity entity : configList){
            configCache.put("key", "value");
            // }
            // 2.2 更新本地變量threshold的值
            // threshold = DBManager.SELECTSQL.getConfig("threshold");
        }, 0, 100, TimeUnit.SECONDS);
    }

    /**
     * 獲取本地緩存
     */
    public static Cache<String, String> getConfigCache() {
        return configCache;
    }


}

總結(jié):

1.外部定時(shí)器可以通過在富函數(shù)的open中進(jìn)行初始化并開始定時(shí)執(zhí)行

2.外部定時(shí)器也可以通過創(chuàng)建一個(gè)單獨(dú)的靜態(tài)類,然后在static模塊中進(jìn)行初始化并開始定時(shí)執(zhí)行文章來源地址http://www.zghlxwxcb.cn/news/detail-736743.html

到了這里,關(guān)于flink中使用外部定時(shí)器實(shí)現(xiàn)定時(shí)刷新的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【STM32】STM32學(xué)習(xí)筆記-定時(shí)器定時(shí)中斷 定時(shí)器外部時(shí)鐘(14)

    【STM32】STM32學(xué)習(xí)筆記-定時(shí)器定時(shí)中斷 定時(shí)器外部時(shí)鐘(14)

    1.1 TIM_InternalClockConfig 1.2 TIM_TimeBaseInit 1.3 TIM_TimeBaseInitTypeDef 1.4 TIM_ClearFlag 1.5 TIM_ITConfig 1.6 TIM_Cmd 1.7 中斷服務(wù)函數(shù) 參考程序 1.8 TIM_ETRClockMode2Config timer.h timer.c main.c timer.h timer.c main.c 09-定時(shí)器定時(shí)中斷.rar 10-定時(shí)器外部時(shí)鐘.rar 參考: 【STM32】江科大STM32學(xué)習(xí)筆記匯總

    2024年02月03日
    瀏覽(97)
  • STM32單片機(jī)(六)TIM定時(shí)器 -> 第二節(jié):TIM定時(shí)中斷練習(xí)(定時(shí)器定時(shí)中斷和定時(shí)器外部時(shí)鐘)

    STM32單片機(jī)(六)TIM定時(shí)器 -> 第二節(jié):TIM定時(shí)中斷練習(xí)(定時(shí)器定時(shí)中斷和定時(shí)器外部時(shí)鐘)

    ?? 專欄簡介:本專欄記錄了從零學(xué)習(xí)單片機(jī)的過程,其中包括51單片機(jī)和STM32單片機(jī)兩部分;建議先學(xué)習(xí)51單片機(jī),其是STM32等高級單片機(jī)的基礎(chǔ);這樣再學(xué)習(xí)STM32時(shí)才能融會貫通。 ?? 專欄適用人群 :適用于想要從零基礎(chǔ)開始學(xué)習(xí)入門單片機(jī),且有一定C語言基礎(chǔ)的的童鞋

    2024年02月09日
    瀏覽(30)
  • STM32自學(xué)?定時(shí)器外部時(shí)鐘案例

    本案例主要是通過外部時(shí)鐘實(shí)現(xiàn)對射式紅外傳感器的計(jì)次,在oled顯示屏上顯示CNT的次數(shù) #include \\\"stm32f10x.h\\\" #include \\\"stm32f10x_tim.h\\\" #include \\\"timer_interrupt.h\\\" #include \\\"stdint.h\\\" //初始化函數(shù) void Timer_Init(void) { ?/*開啟時(shí)鐘*/ ?RCC_APB1PeriphClockCmd(RCC_APB1Periph_TIM2, ENABLE); //開啟TIM2的時(shí)鐘 ?RCC_A

    2024年02月22日
    瀏覽(32)
  • 江科大stm32視頻學(xué)習(xí)筆記——TIM定時(shí)中斷&定時(shí)器外部時(shí)鐘

    江科大stm32視頻學(xué)習(xí)筆記——TIM定時(shí)中斷&定時(shí)器外部時(shí)鐘

    目錄 一、TIM(Timer)定時(shí)器簡介 ?1.1 定時(shí)器類型 摘要 1.1.1 基本定時(shí)器 1.1.2 通用定時(shí)器 1.1.3 高級定時(shí)器? 1.2 定時(shí)中斷基本結(jié)構(gòu) 1.2.1 結(jié)構(gòu)框圖 1.2.2 時(shí)序圖 二、定時(shí)器定時(shí)中斷定時(shí)器外部時(shí)鐘 2.1 內(nèi)部時(shí)鐘鬧鐘代碼 2.1.1 Timer.c 2.1.2 Buzzer.c加入間隔發(fā)聲函數(shù) 2.1.3 main.c 2.1.4 實(shí)驗(yàn)視頻

    2024年01月23日
    瀏覽(55)
  • 51 單片機(jī)【外部中斷、定時(shí)器中斷、回調(diào)函數(shù)】

    51 單片機(jī)【外部中斷、定時(shí)器中斷、回調(diào)函數(shù)】

    ?這里的外部中斷類似監(jiān)聽器,時(shí)時(shí)刻刻監(jiān)視某引腳的電平變化;這里的定時(shí)器中斷類似于定時(shí)任務(wù),可以定時(shí)執(zhí)行某函數(shù);這里將回調(diào)函數(shù)和中斷結(jié)合起來,案例里有點(diǎn)設(shè)計(jì)模式的味道(忘了哪個(gè)了,也可能就是感覺,關(guān)于高層不能調(diào)用低層的解決),也有點(diǎn)函數(shù)式編程的

    2024年02月04日
    瀏覽(36)
  • 單片機(jī)學(xué)習(xí) 11-中斷系統(tǒng)(定時(shí)器中斷+外部中斷)

    單片機(jī)學(xué)習(xí) 11-中斷系統(tǒng)(定時(shí)器中斷+外部中斷)

    ? 中斷是為使單片機(jī)具有對外部或內(nèi)部隨機(jī)發(fā)生的事件實(shí)時(shí)處理而設(shè)置的,中斷功能的存在,很大程度上提高了單片機(jī)處理外部或內(nèi)部事件的能力。它也是單片機(jī)最重要的功能之一,是我們學(xué)習(xí)單片機(jī)必須要掌握的。很多初學(xué)者被困在中斷中,學(xué)了很久仍然不知道中斷究竟是

    2024年02月05日
    瀏覽(25)
  • 51單片機(jī):中斷系統(tǒng)(外部中斷,定時(shí)器中斷,串口通信)

    51單片機(jī):中斷系統(tǒng)(外部中斷,定時(shí)器中斷,串口通信)

    目錄 中斷系統(tǒng)簡介: 中斷的優(yōu)先級和嵌套: 8個(gè)中斷請求源及其優(yōu)先級: 中斷的分別介紹: 1、外部中斷0:INT0?? 2、外部中斷1? 3、T0和 T1:定時(shí)計(jì)數(shù)器的功能 4、串口中斷(串口為什么使用定時(shí)器后面講) 中斷寄存器 (1)中斷允許控制(IE) (2)中斷請求標(biāo)志(TCON) (

    2024年01月25日
    瀏覽(20)
  • Flink timer定時(shí)器

    常見timer 基于處理時(shí)間或者事件時(shí)間處理過一個(gè)元素之后, 注冊一個(gè)定時(shí)器, 然后指定的時(shí)間執(zhí)行. Context和OnTimerContext 所持有的TimerService對象擁有以下方法: currentProcessingTime(): Long 返回當(dāng)前處理時(shí)間 currentWatermark(): Long 返回當(dāng)前watermark的時(shí)間戳 registerProcessingTimeTimer(timestamp: Lon

    2024年02月07日
    瀏覽(19)
  • STM32 學(xué)習(xí)筆記(六)定時(shí)器中斷:內(nèi)部時(shí)鐘模式,外部時(shí)鐘模式

    STM32 學(xué)習(xí)筆記(六)定時(shí)器中斷:內(nèi)部時(shí)鐘模式,外部時(shí)鐘模式

    定時(shí)器是功能最強(qiáng)大,內(nèi)容最復(fù)雜的32結(jié)構(gòu)。 之前51用過的功能,定時(shí)產(chǎn)生中斷。 輸出比較,常用于產(chǎn)生 PWM 波形,驅(qū)動(dòng)電機(jī)等。 輸入捕獲,測量方波頻率。 編碼器,讀取正交編碼器的波形。 最大定時(shí)時(shí)間:72M/65536/65536=中斷頻率,中斷頻率取倒數(shù)是最大定時(shí)時(shí)間。 定時(shí)器可

    2024年02月08日
    瀏覽(24)
  • Django框架使用定時(shí)器-APScheduler實(shí)現(xiàn)定時(shí)任務(wù):django實(shí)現(xiàn)簡單的定時(shí)任務(wù)

    系統(tǒng):windows10 python: python==3.9.0 djnago==3.2.0 APScheduler==3.10.1 1、創(chuàng)建utils包,在包里面創(chuàng)建schedulers包 utils/schedulers/task.py utils/schedulers/scheduler.py utils/schedulers/__init__.py 2、項(xiàng)目配置文件settings.py

    2024年02月12日
    瀏覽(17)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包