springboot集成flink,寫代碼學(xué)習(xí)flink,集成步驟如下:
1、maven引入依賴:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency>
2、配置文件配置相關(guān)參數(shù):文章來源:http://www.zghlxwxcb.cn/news/detail-488057.html
# Flink配置 flink.jobmanager.host=localhost flink.jobmanager.port=6123 flink.parallelism=1
3、寫測試類,代碼如下 :文章來源地址http://www.zghlxwxcb.cn/news/detail-488057.html
import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import java.util.Random; public class Demo { public static void main(String[] args) throws Exception { // 創(chuàng)建執(zhí)行環(huán)境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 創(chuàng)建數(shù)據(jù)源 DataStream<String> stream = env.addSource(new SourceFunction<String>() { private volatile boolean isRunning = true; @Override public void run(SourceContext<String> ctx) throws Exception { Random random = new Random(); while (isRunning) { Thread.sleep(10); long timestamp = System.currentTimeMillis() - random.nextInt(5) * 1000; String str = "key" + random.nextInt(10) + "," + timestamp; ctx.collectWithTimestamp(str, timestamp); ctx.emitWatermark(new Watermark(timestamp)); } } @Override public void cancel() { isRunning = false; } }); // 將數(shù)據(jù)源解析成二元組(key, timestamp) DataStream<Tuple2<String, Long>> parsedStream = stream.map((String line) -> { String[] parts = line.split(","); return new Tuple2<>((String)parts[0], Long.parseLong(parts[1])); }).returns(Types.TUPLE(Types.STRING, Types.LONG)); // 設(shè)置事件時(shí)間和水位線 DataStream<Tuple2<String, Long>> withTimestampsAndWatermarks = parsedStream .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() { @Override public long extractAscendingTimestamp(Tuple2<String, Long> element) { return element.f1; } }); // 按鍵值進(jìn)行分組 KeyedStream<Tuple2<String, Long>, Tuple> keyedStream = withTimestampsAndWatermarks.keyBy(0); // 每5秒鐘統(tǒng)計(jì)最近一分鐘的數(shù)據(jù)(使用滾動時(shí)間窗口) WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(1))); // 進(jìn)行聚合計(jì)算 DataStream<Tuple2<String, Long>> resultStream = windowedStream .reduce((Tuple2<String, Long> v1, Tuple2<String, Long> v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1)); // 輸出結(jié)果 resultStream.print(); // 啟動作業(yè) env.execute("Demo"); } }
到了這里,關(guān)于springboot集成flink步驟,及demo的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!