Main program
public static void main(String[] args) throws Exception {
// 1. Get the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// Parse dynamic parameters passed on the command line
ParameterTool propertiesargs = ParameterTool.fromArgs(args);
String fileName = propertiesargs.get("CephConfPath");
// Load the dynamic parameter configuration file from HDFS
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
FileSystem fs = FileSystem.get(URI.create(fileName), conf);
ParameterTool propertiesFile = ParameterTool.fromPropertiesFile(fs.open(new org.apache.hadoop.fs.Path(fileName)).getWrappedStream());
// Register as global job parameters (read back later by the HBase dimension lookup)
env.getConfig().setGlobalJobParameters(propertiesFile);
new CephConfig(propertiesFile); // presumably initializes the static config constants (FSSTATEBACKEND, KAFKA_SOURCE_TOPIC, HDFS_FILE_PATH, ...) used below
// 2. Configure checkpointing & the state backend
env.setStateBackend(new FsStateBackend(FSSTATEBACKEND));
env.enableCheckpointing(10000); // start a checkpoint every 10,000 ms
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // exactly-once mode
env.getCheckpointConfig().setCheckpointTimeout(100000); // a checkpoint must complete within 100,000 ms or it is discarded
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // allow at most two checkpoints in flight at the same time
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000); // require at least 3,000 ms between consecutive checkpoints
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5); // tolerate up to 5 checkpoint failures before failing the job
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS))); // restart strategy: restart up to 3 times, 10 s apart
// 3. Read log lines from Kafka; each line is converted into a JavaBean (main stream)
DataStreamSource<String> dataStream = env.addSource(KafkaUtils.getKafkaSource(KAFKA_SOURCE_TOPIC, KAFKA_SOURCE_GROUP));
…………
// 8. Look up the user table in HBase to enrich records with dimension data
SingleOutputStreamOperator<CephAccessRecord> record = AsyncDataStream.unorderedWait(
validDS,
new DimAsyncFunction<CephAccessRecord>() {
@Override
public String getKey(CephAccessRecord record) {
return record.access_key;
}
},
60, TimeUnit.SECONDS);
BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));
StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(
new Path(HDFS_FILE_PATH),
new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.DAYS.toMillis(1)) // roll the part file at least once per day
.withInactivityInterval(TimeUnit.DAYS.toMillis(1)) // roll after one day without new data
.withMaxPartSize(1024 * 1024 * 1024) // roll once the part file reaches 1 GB
.build())
.withBucketAssigner(assigner)
.build();
// record --> filter upload events --> convert to JSON string --> write to HDFS
// allDataDS.filter(log->log.event_type.equals("upload")).map(line->JSON.toJSONString(line)).addSink(fileSink);
dataStream.map(line->JSON.toJSONString(line)).addSink(fileSink);
// 10. Execute the streaming job
env.execute();
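The `KafkaUtils.getKafkaSource(...)` helper used in step 3 is not shown in the article. The sketch below is a minimal, assumed implementation built on the classic `FlinkKafkaConsumer` API, which matches the Flink 1.x style (StreamingFileSink, FsStateBackend) of the rest of the job; the package name, broker address, and property keys are illustrative assumptions, not the author's code.

package com.data.ceph.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaUtils {
    // Assumed broker list; in the real job this would come from the ParameterTool config.
    private static final String BOOTSTRAP_SERVERS = "kafka-broker:9092";

    public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.setProperty("group.id", groupId);
        // Deserialize records as plain strings; the main program later maps each line to a JavaBean.
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
    }
}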
Async join function
package com.data.ceph.function;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import java.util.Collections;
import java.util.Map;
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimAsyncJoinFunction<T> {
private org.apache.hadoop.hbase.client.Connection connection = null;
private ResultScanner rs = null;
private Table table = null;
@Override
public void open(Configuration parameters) throws Exception {
// Disable SASL authentication for the ZooKeeper client
System.setProperty("zookeeper.sasl.client", "false");
Map<String, String> stringStringMap = getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap();
String hbase = stringStringMap.get("hbase_zookeeper_quorum");
org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
hconf.set(HConstants.ZOOKEEPER_QUORUM, "172.16.23.37,172.16.23.38,172.16.23.39");
// hconf.set(HConstants.ZOOKEEPER_QUORUM, hbase);
hconf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
hconf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");
// Access the HBase service as the remote user "hive"
UserGroupInformation userGroupInformation = UserGroupInformation.createRemoteUser("hive");
connection = ConnectionFactory.createConnection(hconf, User.create(userGroupInformation));
table = connection.getTable(TableName.valueOf("cloud:user_info"));
}
@Override
public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
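// Build the rowkey from the input element, fetch the matching dimension row with a
// synchronous HBase get, copy every returned column onto the bean field of the same
// name via BeanUtils, then emit the enriched element through the ResultFuture.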
Get get = new Get(Bytes.toBytes(getKey(input)));
Result rs = table.get(get);
for (Cell cell : rs.rawCells()) {
String column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
BeanUtils.setProperty(input, column, value);
}
resultFuture.complete(Collections.singletonList(input));
}
@Override
public void close() throws Exception {
if (rs != null) rs.close();
if (table != null) table.close();
if (connection != null) connection.close();
}
@Override
public void timeout(T input, ResultFuture<T> resultFuture) throws Exception {
System.out.println("TimeOut:" + input);
// Complete the future with the un-enriched element so the async operator does not block on timed-out records.
resultFuture.complete(Collections.singletonList(input));
}
}
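The `DimAsyncJoinFunction<T>` interface that `DimAsyncFunction` implements is not shown either. Judging from how `getKey(...)` is overridden in the main program and used to build the HBase rowkey in `asyncInvoke`, a plausible minimal shape is the single-method interface below; it is a sketch under that assumption, not the author's actual definition.

package com.data.ceph.function;

// Assumed shape of the dimension-join contract: implementations only need to say
// which HBase rowkey identifies the dimension row for a given stream element.
public interface DimAsyncJoinFunction<T> {
    String getKey(T input);
}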