
Learning Flink in Ten Lessons, Chapter 7: Saving Streaming Results with env.sinkTo (Kafka / ES Examples)


7.1 Chapter Overview

This chapter uses Elasticsearch and Kafka to introduce Flink's sinkTo / addSink APIs. We will also implement several generic helpers so that, in real applications, different entity classes can be handled by the same code instead of writing a dedicated implementation for each one.

7.2 Expected Results

Demo: Flink writes data to Elasticsearch.

In addition, we will write a generic utility class for Kafka serialization and deserialization: for any entity class, messages read from a Kafka topic are deserialized into the expected entity, and the Flink results are serialized back into JSON strings and written to Kafka.

7.3 Writing the Code

Add dependencies as your project requires. The Flink / Kafka dependencies were already covered in earlier chapters, so only the additional ones are listed here, with flink.version set to 1.14.6. Note that the examples below also use Lombok annotations (@Data, @Slf4j), so Lombok should be on the classpath as well.
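For reference, the corresponding Maven properties could look like the snippet below; the scala.binary.version value is an assumption, so use whatever your project already defines:

    <properties>
        <flink.version>1.14.6</flink.version>
        <!-- assumption: match the Scala binary version used by your other Flink dependencies -->
        <scala.binary.version>2.12</scala.binary.version>
    </properties>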

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.45</version>
        </dependency>
    </dependencies>

7.3.1 sinkTo ES

In this example we simply write three strings into ES.

package cn.smileyan.demo;

import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Creates the index hello-world and writes data into it.
 * @author Smileyan
 */
public class FlinkElasticsearchDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final MultipleParameterTool parameters = MultipleParameterTool.fromArgs(args);

        // Elasticsearch cluster addresses, e.g. http://localhost:9200 or a comma-separated list such as http://host1:9200,http://host2:9200
        String urls = parameters.get("es.hosts", "http://localhost:9200");
        final String regex = ",";
        List<HttpHost> httpHosts = Arrays.stream(urls.split(regex)).map(HttpHost::create).collect(Collectors.toList());

        // Create the ElasticsearchSinkFunction that writes data to Elasticsearch
        final String index = parameters.get("index", "hello-world");
        ElasticsearchSink.Builder<String> esSinkBuilder = buildSinkEs(httpHosts, index);

        // Create the data stream
        DataStream<String> resultStream = env.fromElements("data1", "data2", "data3");

        // Sink the results to Elasticsearch
        resultStream.addSink(esSinkBuilder.build());

        // Execute the Flink job
        env.execute("Flink Elasticsearch Example");
    }

    /**
     * Builds an ElasticsearchSink.Builder for sinking String data to Elasticsearch.
     *
     * @param httpHosts Elasticsearch cluster addresses
     * @param index target index in Elasticsearch
     * @return the ElasticsearchSink.Builder
     */
    private static ElasticsearchSink.Builder<String> buildSinkEs(List<HttpHost> httpHosts, String index) {
        ElasticsearchSinkFunction<String> elasticsearchSinkFunction = (element, ctx, indexer) -> {
            // Build an index request for each element
            Map<String, String> json = new HashMap<>(1);
            json.put("data", element);
            IndexRequest source = Requests.indexRequest().index(index).source(json);
            indexer.add(source);
        };

        // Create the ElasticsearchSink builder
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);

        // Flush the bulk buffer after every single action; fine for a demo, adjust for real workloads
        esSinkBuilder.setBulkFlushMaxActions(1);
        return esSinkBuilder;
    }
}
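Flushing after every single record keeps the demo simple, but for real workloads you will usually batch writes. Below is a sketch of more production-oriented settings on the same builder; the concrete values are assumptions to tune for your cluster:

// Sketch: batch-oriented flush settings instead of setBulkFlushMaxActions(1); values are assumptions
esSinkBuilder.setBulkFlushMaxActions(1000);   // flush once 1000 actions are buffered
esSinkBuilder.setBulkFlushMaxSizeMb(5);       // ... or once 5 MB of data has accumulated
esSinkBuilder.setBulkFlushInterval(2000);     // ... or at the latest every 2 seconds
esSinkBuilder.setBulkFlushBackoff(true);      // retry temporarily rejected bulk requests
esSinkBuilder.setBulkFlushBackoffRetries(3);
esSinkBuilder.setBulkFlushBackoffDelay(1000); // wait 1 second between retries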

7.3.2 Writing Entity Classes to ES


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Writes entity objects to Elasticsearch through a generic sink function.
 * @author Smileyan
 */
public class GenericFlinkElasticsearchDemo {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final MultipleParameterTool parameters = MultipleParameterTool.fromArgs(args);

        // Elasticsearch cluster addresses, e.g. http://localhost:9200 or a comma-separated list such as http://host1:9200,http://host2:9200
        String urls = parameters.get("es.hosts", "http://localhost:9200");
        final String regex = ",";
        List<HttpHost> httpHosts = Arrays.stream(urls.split(regex)).map(HttpHost::create).collect(Collectors.toList());

        // Target Elasticsearch index, written to through a generic ElasticsearchSinkFunction
        final String index = parameters.get("es.index", "hello-world-2");

        GenericElasticsearchSinkFunction<Student> elasticsearchSinkFunction = new GenericElasticsearchSinkFunction<>(index);
        // Create the ElasticsearchSink builder
        ElasticsearchSink.Builder<Student> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);

        DataStream<Student> inputDataStream = env.fromElements(
                new Student(1, "張三", 18),
                new Student(2, "李四", 20),
                new Student(3, "王五", 22)
        );

        // Sink the results to Elasticsearch
        inputDataStream.addSink(esSinkBuilder.build());

        // Execute the Flink job
        env.execute("Flink Elasticsearch Example");
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

The reusable sink function used above is implemented as follows.

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;


/**
 * Generic sink function that writes entity objects to Elasticsearch.
 * @author Smileyan
 */
public class GenericElasticsearchSinkFunction<T> implements ElasticsearchSinkFunction<T> {
    private final String indexName;

    public GenericElasticsearchSinkFunction(String indexName) {
        this.indexName = indexName;
    }

    @Override
    public void process(T element, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
        // Convert the entity to JSON and index it into Elasticsearch
        if (element != null) {
            JSONObject jsonMap = JSONObject.from(element);
            IndexRequest indexRequest = Requests.indexRequest()
                    .index(indexName)
                    .source(jsonMap);

            // Hand the IndexRequest to the RequestIndexer
            requestIndexer.add(indexRequest);
        }
    }
}
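If you also want to control the document _id, for example so that re-running a job updates documents instead of duplicating them, the same idea extends naturally. The following is only a sketch; the class name and the idField parameter are assumptions, not part of the original code:

/**
 * Sketch: like GenericElasticsearchSinkFunction, but also sets the document _id
 * from one field of the serialized entity (idField is an assumed parameter).
 */
public class GenericElasticsearchSinkFunctionWithId<T> implements ElasticsearchSinkFunction<T> {
    private final String indexName;
    private final String idField;

    public GenericElasticsearchSinkFunctionWithId(String indexName, String idField) {
        this.indexName = indexName;
        this.idField = idField;
    }

    @Override
    public void process(T element, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
        if (element != null) {
            JSONObject json = JSONObject.from(element);
            IndexRequest indexRequest = Requests.indexRequest()
                    .index(indexName)
                    .id(json.getString(idField))   // stable id, so re-runs update instead of duplicating
                    .source(json);
            requestIndexer.add(indexRequest);
        }
    }
}

For the Student entity above, new GenericElasticsearchSinkFunctionWithId<Student>(index, "id") would index each student under its id field.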

7.3.3 A Generic Kafka Serialization / Deserialization Schema

In this example we implement two key interfaces so that wiring up Kafka sources and sinks becomes more convenient. A type parameter is used for the class being serialized and deserialized, which makes the schema highly reusable.

package cn.smileyan.demos;

import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.nio.charset.StandardCharsets;


/**
 * Deserializes byte[] payloads into entity objects and serializes entities back into JSON bytes.
 * @author smileyan
 * @param <O> the entity class
 */
@Slf4j
public class CommonEntitySchema<O> implements DeserializationSchema<O>, SerializationSchema<O> {

    private final Class<O> clazz;

    public CommonEntitySchema(Class<O> clazz) {
        this.clazz = clazz;
    }

    @Override
    public O deserialize(byte[] message) {
        try {
            String str = new String(message, StandardCharsets.UTF_8);
            log.info("kafka received message: {}", str);
            return JSON.parseObject(str, clazz);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
        return null;
    }

    @Override
    public boolean isEndOfStream(O nextElement) {
        return false;
    }

    @Override
    public TypeInformation<O> getProducedType() {
        return TypeInformation.of(clazz);
    }

    @Override
    public byte[] serialize(O element) {
        return JSON.toJSONBytes(element);
    }
}
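Before plugging the schema into Flink, it can be exercised on its own. A minimal round trip might look like this (Student here stands in for any entity class):

// Minimal round trip through the schema; Student is just an example entity class
CommonEntitySchema<Student> schema = new CommonEntitySchema<>(Student.class);
byte[] bytes = schema.serialize(new Student());   // entity -> JSON bytes
Student back = schema.deserialize(bytes);         // JSON bytes -> entity, or null if parsing fails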

Next, building on this schema, here is a complete Kafka source and sink example.

package cn.smileyan.demos;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Deserializes Kafka messages into entity objects, filters them, and writes the results back to Kafka.
 * @author smileyan
 */
@Slf4j
public class FlinkKafkaEntitySinkToExample {
    /**
     * Command-line arguments:
     *  -bs  Kafka broker address
     *  -kcg Kafka consumer group
     *  -it  input topic
     *  -ot  output topic
     *  -ct  whether to auto-create the input topic
     *  -pt  number of topic partitions
     *  -rf  topic replication factor
     *  -tt  producer transaction timeout in milliseconds
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final MultipleParameterTool cmd = MultipleParameterTool.fromArgs(args);
        final String bootstrapServer = cmd.get("bs", "localhost:9092");
        final String kafkaConsumerGroup = cmd.get("kcg", "flink-consumer");
        final String inputTopic = cmd.get("it", "quickstart-events");
        final String outputTopic = cmd.get("ot", "quickstart-results");
        final boolean createTopic = cmd.getBoolean("ct", false);
        final Long transactionTimeout = cmd.getLong("tt", 300000L);

        log.info("broker is {} and topic is {}", bootstrapServer, inputTopic);

        // If enabled, create the input topic when it does not exist. Off by default: normally whoever deploys the job should size partitions and replicas per topic themselves.
        if (createTopic) {
            final int partitions = cmd.getInt("pt", 1);
            final short replicationFactor = cmd.getShort("rf", (short) 1);
            createTopic(bootstrapServer, inputTopic, partitions, replicationFactor);
        }

        final KafkaSource<Student> kafkaSource = KafkaSource.<Student>builder()
                .setGroupId(kafkaConsumerGroup)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setBootstrapServers(bootstrapServer)
                .setTopics(inputTopic)
                .setValueOnlyDeserializer(new CommonEntitySchema<>(Student.class))
                .build();

        Properties properties = new Properties();
        properties.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(transactionTimeout));
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        final KafkaSink<Student> kafkaSink = KafkaSink.<Student>builder()
                .setKafkaProducerConfig(properties)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(outputTopic)
                        .setValueSerializationSchema(new CommonEntitySchema<>(Student.class))
                        .build())
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        final DataStreamSource<Student> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Filter out elements whose deserialization failed (null), keeping only valid objects
        SingleOutputStreamOperator<Student> out1 = kafkaStream.filter(Objects::nonNull)
                .map(student -> {
                    log.info("filter none objects is {}", student);
                    return student;
                });

        // Keep only students younger than 10
        SingleOutputStreamOperator<Student> out2 = out1.filter(student -> student.getAge() != null && student.getAge() < 10)
                .map(student -> {
                    log.info("filter age < 10: {}", student);
                    return student;
                });

        out2.sinkTo(kafkaSink);

        env.execute("Flink Kafka Example");
    }

    /**
     * Creates the topic if it does not exist yet.
     * @param bootstrapServer Kafka broker address
     * @param topic topic to create
     * @param partitions number of partitions
     * @param replicationFactor replication factor
     */
    public static void createTopic(String bootstrapServer,
                                   String topic,
                                   int partitions,
                                   int replicationFactor) throws ExecutionException, InterruptedException {
        Properties adminProperties = new Properties();
        adminProperties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        try (AdminClient adminClient = AdminClient.create(adminProperties)) {
            if (!adminClient.listTopics().names().get().contains(topic)) {
                NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor);
                adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
                log.info("created topic: {}", topic);
            }
        }
    }

    @Data
    static class Student {
        private String name;
        private Integer age;
    }

}

In this code we use filter to drop elements that failed deserialization as well as entities that do not meet the requirements, so that only qualifying entities are written to Kafka via sinkTo.

Note that this KafkaSource / KafkaSink style is also what the Flink documentation now recommends, rather than the older FlinkKafkaConsumer / FlinkKafkaProducer classes.
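For comparison, the legacy pattern referred to above looked roughly like the sketch below; it reuses the variables and schema from the example above and is only meant to show what the new builders replace (both classes are deprecated in recent Flink versions):

// Sketch of the deprecated connector classes (org.apache.flink.streaming.connectors.kafka.*), for comparison only
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServer);
props.put("group.id", kafkaConsumerGroup);

FlinkKafkaConsumer<Student> legacySource =
        new FlinkKafkaConsumer<>(inputTopic, new CommonEntitySchema<>(Student.class), props);
FlinkKafkaProducer<Student> legacySink =
        new FlinkKafkaProducer<>(outputTopic, new CommonEntitySchema<>(Student.class), props);

env.addSource(legacySource)        // replaced by env.fromSource(kafkaSource, ...)
   .filter(Objects::nonNull)
   .addSink(legacySink);           // replaced by stream.sinkTo(kafkaSink)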

7.3.4 A Generic KafkaSource and KafkaSink Builder

Suppose we need to develop several Flink jobs, that is, several classes each with its own main method, computing different things over different types of Kafka messages. The entity classes for the Kafka source and sink may differ from job to job, but the wiring and the parameters stay the same.

So it is worth implementing a generic source method and sink method that derive the Kafka source and sink configuration from the String[] args passed to main.

The implementation looks like this:

package cn.smileyan.demos;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Generic helper that builds Kafka sources and sinks from command-line args.
 * @author smileyan
 */
@Slf4j
public class KafkaArgsBuilder {
    /**
     * Parsed command-line parameters
     */
    private final MultipleParameterTool parameterTool;

    public KafkaArgsBuilder(String[] args) {
        parameterTool = MultipleParameterTool.fromArgs(args);
    }

    /**
     * Builds a Kafka sink.
     * @param clazz entity class
     * @param <E> entity type
     * @return the KafkaSink
     */
    public <E> KafkaSink<E> buildSink(Class<E> clazz) {
        final String bs = parameterTool.getRequired(KafkaArgs.BOOTSTRAP_SERVER.key);
        final String ot = parameterTool.getRequired(KafkaArgs.OUTPUT_TOPIC.key);

        return KafkaSink.<E>builder()
                .setBootstrapServers(bs)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(ot)
                        .setValueSerializationSchema(new CommonEntitySchema<>(clazz))
                        .build())
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }

    /**
     * Builds a Kafka source, optionally creating the input topic first.
     * @param clazz entity class
     * @param <E> entity type
     * @return the KafkaSource
     */
    public <E> KafkaSource<E> buildSource(Class<E> clazz) throws ExecutionException, InterruptedException {
        final String kafkaConsumerGroup = parameterTool.getRequired(KafkaArgs.KAFKA_CONSUMER_GROUP.key);
        final String bootstrapServer = parameterTool.getRequired(KafkaArgs.BOOTSTRAP_SERVER.key);
        final String inputTopic = parameterTool.getRequired(KafkaArgs.INPUT_TOPIC.key);
        final boolean createTopic = parameterTool.has(KafkaArgs.CREATE_TOPIC.key);

        if (createTopic) {
            final int partition = parameterTool.getInt(KafkaArgs.CREATE_TOPIC_PARTITION.key, 1);
            final short replicationFactor = parameterTool.getShort(KafkaArgs.REPLICATION_FACTOR.key, (short) 1);
            createTopic(bootstrapServer, inputTopic, partition, replicationFactor);
        }

        return KafkaSource.<E>builder()
                .setGroupId(kafkaConsumerGroup)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setBootstrapServers(bootstrapServer)
                .setTopics(inputTopic)
                .setValueOnlyDeserializer(new CommonEntitySchema<>(clazz))
                .build();
    }

    public enum KafkaArgs {
        /*
         * Kafka bootstrap servers
         */
        BOOTSTRAP_SERVER("bs"),

        /*
         * Kafka consumer group
         */
        KAFKA_CONSUMER_GROUP("kcg"),

        /*
         * Kafka input topic
         */
        INPUT_TOPIC("it"),

        /*
         * Kafka output topic
         */
        OUTPUT_TOPIC("ot"),

        /*
         * Whether to auto-create the input topic
         */
        CREATE_TOPIC("ct"),

        /*
         * Number of partitions
         */
        CREATE_TOPIC_PARTITION("pt"),

        /*
         * Replication factor
         */
        REPLICATION_FACTOR("rf");

        private final String key;

        KafkaArgs(String key) {
            this.key = key;
        }
    }

    /**
     * Creates the topic if it does not exist yet.
     * @param bootstrapServer Kafka broker address
     * @param topic topic to create
     * @param partitions number of partitions
     * @param replicationFactor replication factor
     */
    public static void createTopic(String bootstrapServer,
                                   String topic,
                                   int partitions,
                                   int replicationFactor) throws ExecutionException, InterruptedException {
        Properties adminProperties = new Properties();
        adminProperties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        try (AdminClient adminClient = AdminClient.create(adminProperties)) {
            if (!adminClient.listTopics().names().get().contains(topic)) {
                NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor);
                adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
                log.info("created topic: {}", topic);
            }
        }
    }
}

Correspondingly, here is a usage example for the sink:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Serializes entity objects and writes them to Kafka.
 * @author smileyan
 */
public class KafkaArgsSinkDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final KafkaArgsBuilder kafkaArgsBuilder = new KafkaArgsBuilder(args);

        DataStream<Student> dataStream = env.fromElements(
                new Student(1, "張三", 18),
                new Student(2, "李四", 20),
                new Student(3, "王五", 22)
        );

        KafkaSink<Student> sinker = kafkaArgsBuilder.buildSink(Student.class);

        dataStream.sinkTo(sinker);

        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

And here is an example that builds a Kafka source:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Source example based on KafkaArgsBuilder.
 * @author smileyan
 */
public class KafkaArgsSourceDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final KafkaArgsBuilder kafkaArgsBuilder = new KafkaArgsBuilder(args);

        final KafkaSource<String> source = kafkaArgsBuilder.buildSource(String.class);

        final DataStreamSource<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        kafkaStream.print();

        env.execute("Flink Kafka Example");
    }
}
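Both demos read their Kafka settings from the command line through MultipleParameterTool, so a launch might pass arguments such as --bs localhost:9092 --kcg flink-consumer --it quickstart-events --ot quickstart-results (the values are placeholders), plus --ct --pt 1 --rf 1 if the source side should create the input topic first.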

7.4 Chapter Summary

The earlier chapters already covered the project's base environment and the points to watch out for in some detail, and this chapter acts as a bridge: with the basics out of the way, the following chapters will build the processing flows for our assumed requirements, including aggregating multiple Kafka messages, sliding windows, custom map and flatMap operators, and the synchronous and asynchronous HTTP requests that real business scenarios may need.

I hope this helps; many thanks for your support, and thanks for reading!
