背景:
我們經(jīng)常會使用到比如數(shù)據(jù)庫中的配置表信息,而我們不希望每次都去查詢db,那么我們就想定時(shí)把db配置表的數(shù)據(jù)定時(shí)加載到flink的本地內(nèi)存中,那么如何實(shí)現(xiàn)呢?
外部定時(shí)器定時(shí)加載實(shí)現(xiàn)
1.在open函數(shù)中進(jìn)行定時(shí)器的創(chuàng)建和定時(shí)加載,這個(gè)方法對于所有的RichFunction富函數(shù)都適用,包括RichMap,RichFilter,RichSink等,代碼如下所示
package wikiedits.schedule;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExecutorUtils;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduleRichMapFunction extends RichFlatMapFunction<String, String> {
// 定時(shí)任務(wù)執(zhí)行器
private transient ScheduledExecutorService scheduledExecutorService;
// 本地變量
private int threshold;
@Override
public void open(Configuration parameters) throws Exception {
// 1.從db查詢數(shù)據(jù)初始化本地變量
// threshold = DBManager.SELECTSQL.getConfig("threshold");
// 2.使用定時(shí)任務(wù)更新本地內(nèi)存的配置信息以及更新本地變量threshold的值
scheduledExecutorService = Executors.newScheduledThreadPool(10);
scheduledExecutorService.scheduleWithFixedDelay(() -> {
// 2.1 定時(shí)任務(wù)更新本地內(nèi)存配置項(xiàng)
// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();
// for(ConfigEntity entity : configList){
ConfigEntityLocalCache.getInstance().update("key", "value");
// }
// 2.2 更新本地變量threshold的值
// threshold = DBManager.SELECTSQL.getConfig("threshold");
}, 0, 100, TimeUnit.SECONDS);
}
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
}
@Override
public void close() throws Exception {
ExecutorUtils.gracefulShutdown(100, TimeUnit.SECONDS, scheduledExecutorService);
}
}
//本地緩存實(shí)現(xiàn)
package wikiedits.schedule;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
/**
* 保存Config信息的本地緩存 ---定時(shí)同步DB配置表的數(shù)據(jù)
*/
public class ConfigEntityLocalCache {
private static volatile ConfigEntityLocalCache instance = new ConfigEntityLocalCache();
/**
* 獲取本地緩存實(shí)例
*/
public static ConfigEntityLocalCache getInstance() {
return instance;
}
/** 緩存內(nèi)存配置項(xiàng) */
private static Cache<String, String> configCache =
CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();
/**
* 更新本地緩存數(shù)據(jù)
*/
public boolean update(String key, String value){
configCache.put(key, value);
return true;
}
/**
* 更新本地緩存數(shù)據(jù)
*/
public String getByKey(String key){
return configCache.getIfPresent(key);
}
}
2.在靜態(tài)類中通過static語句塊創(chuàng)建定時(shí)器并定時(shí)加載,代碼如下
package wikiedits.schedule;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
/**
* 靜態(tài)類定時(shí)加載DB配置表到本地內(nèi)存中
*/
public class StaticLoadUtil {
// 定時(shí)任務(wù)執(zhí)行器
private static transient ScheduledExecutorService scheduledExecutorService;
public static final Cache<String, String> configCache =
CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();
// 通過定時(shí)執(zhí)行器定時(shí)同步本地緩存和DB配置表
static {
scheduledExecutorService = Executors.newScheduledThreadPool(10);
scheduledExecutorService.scheduleWithFixedDelay(() -> {
// 2.1 定時(shí)任務(wù)更新本地內(nèi)存配置項(xiàng)
// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();
// for(ConfigEntity entity : configList){
configCache.put("key", "value");
// }
// 2.2 更新本地變量threshold的值
// threshold = DBManager.SELECTSQL.getConfig("threshold");
}, 0, 100, TimeUnit.SECONDS);
}
/**
* 獲取本地緩存
*/
public static Cache<String, String> getConfigCache() {
return configCache;
}
}
總結(jié):
1.外部定時(shí)器可以通過在富函數(shù)的open中進(jìn)行初始化并開始定時(shí)執(zhí)行文章來源:http://www.zghlxwxcb.cn/news/detail-736743.html
2.外部定時(shí)器也可以通過創(chuàng)建一個(gè)單獨(dú)的靜態(tài)類,然后在static模塊中進(jìn)行初始化并開始定時(shí)執(zhí)行文章來源地址http://www.zghlxwxcb.cn/news/detail-736743.html
到了這里,關(guān)于flink中使用外部定時(shí)器實(shí)現(xiàn)定時(shí)刷新的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!