国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink 1.18 sql demo

這篇具有很好參考價值的文章主要介紹了flink 1.18 sql demo。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

flink 1.18 sql demo

更換flink-table-planner 為 flink-table-planner-loader pom.xml

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-uber</artifactId>
            <version>1.18.0</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>1.18.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>1.18.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.18.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.18.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-csv -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.18.0</version>
        </dependency>
        <!-- 官網(wǎng)給的是flink-connector-kafka 但是flink on k8s 會缺包然后有個sql-connector jar 引入后正常 兩個保留一個即可 -->
     <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka</artifactId>
            <version>3.0.2-1.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.0.2-1.18</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-table-planner_2.12</artifactId>-->
<!--            <version>1.18.0</version>-->
<!--        </dependency>-->
<!--         https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId> </artifactId>
            <version>1.18.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Replace this with the main class of your job 這里是你的主類地址-->
                                    <mainClass>com.cn.App</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

demo

package com.cn;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


/**
 * @Classname app
 * @Description TODO
 * @Date 2024/1/12 11:26
 * @Created by typezhou
 */
public class App {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000L);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        String str = "CREATE TABLE KafkaTable (\n" +
                "  `user_id` STRING,\n" +
                "  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'aaaa',\n" +
                "  'properties.bootstrap.servers' = '172.xx.xx.xx:9092,172.xx.86.xx:9092,172.xx.xx.xx:9092',\n" +
                "  'properties.group.id' = 'testGrou1p',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'csv'\n" +
                ")";
        tableEnv.executeSql(str);
        Table tableResult = tableEnv.sqlQuery("SELECT user_id  FROM KafkaTable group by user_id");
//        DataStream<ResultBean> tuple2DataStream = tableEnv.toDataStream(result, ResultBean.class);
//        SingleOutputStreamOperator<ResultBean> map = tuple2DataStream.map(new MapFunction<ResultBean, ResultBean>() {
//            @Override
//            public ResultBean map(ResultBean s) throws Exception {
//                Thread.sleep(3000L);
//                return s;
//            }
//        });
//        tuple2DataStream.print();
        String sqlPri = "CREATE TABLE print_table (\n" +
                "  `user_id` STRING \n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'bbbb',\n" +
                "  'properties.bootstrap.servers' = '172.xx.xx.xx:9092,172.xx.86.xx:9092,172.xx.xx.xx:9092',\n" +
                "  'format' = 'csv'\n" +
                ")";
        tableEnv.executeSql(sqlPri);
        tableEnv.executeSql("insert into  print_table SELECT user_id FROM KafkaTable");

    }


}

文章來源地址http://www.zghlxwxcb.cn/news/detail-799540.html

到了這里,關(guān)于flink 1.18 sql demo的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • mORMot 1.18 第07章 簡單的讀寫操作

    本章描述了典型的數(shù)據(jù)讀寫操作。首先,我們將注意力集中在數(shù)據(jù)上,而不是函數(shù)。 讀取操作返回一個TID,它是一個32位或64位整數(shù)(取決于你的內(nèi)存模型),反映了表的信息。TID在表中的每一行都是唯一的。 ORM的新手可能會感到驚訝,但通常你不需要創(chuàng)建SQL查詢來過濾請求

    2024年04月28日
    瀏覽(24)
  • 數(shù)據(jù)結(jié)構(gòu)_復(fù)雜度+之后的事-1.18

    本質(zhì)是個 函數(shù) ,表示復(fù)雜度的函數(shù)。 用 O 漸進(jìn)粗略表示,如O(1), O(N)。(這個符號以前在學(xué)拓?fù)浣Y(jié)構(gòu)時見過,現(xiàn)在回想,也確實(shí)是算法相關(guān)的): 1)常數(shù)用O(1)表示; 2)保留最高階項(xiàng),并去掉系數(shù)。2N^3+N+10-----O(N^3)。 3)對于多情況復(fù)雜度,按最復(fù)雜情況的計算。 時間復(fù)

    2024年01月19日
    瀏覽(21)
  • k8s 1.18 VS 1.24

    Kubernetes是一個開源的容器編排平臺,它致力于自動化容器的部署、擴(kuò)展和管理。1.24和1.18是Kubernetes的兩個版本,它們之間的區(qū)別包括以下幾個方面: API版本:Kubernetes 1.24支持API版本為v1.22,而Kubernetes 1.18支持API版本為v1.17。 功能特性:Kubernetes 1.24相對于1.18增加了許多新的功

    2023年04月23日
    瀏覽(43)
  • 【Linux】在centos快速搭建K8S1.18集群

    【Linux】在centos快速搭建K8S1.18集群

    使用 kubeadm 創(chuàng)建集群幫助文檔 如果您需要以下幾點(diǎn),該工具是很好的選擇:kubeadm 一種簡單的方法,讓你嘗試 Kubernetes,可能是第一次。 現(xiàn)有用戶自動設(shè)置群集并測試其應(yīng)用程序的一種方式。 其他生態(tài)系統(tǒng)和/或安裝程序工具中的構(gòu)建塊,具有更大的 范圍。 一臺或多臺機(jī)器,

    2024年04月29日
    瀏覽(28)
  • flink sql1.18.0連接SASL_PLAINTEXT認(rèn)證的kafka3.3.1

    flink sql1.18.0連接SASL_PLAINTEXT認(rèn)證的kafka3.3.1

    閱讀此文默認(rèn)讀者對docker、docker-compose有一定了解。 docker-compose運(yùn)行了一個jobmanager、一個taskmanager和一個sql-client。 如下: 注意三個容器都映射了/opt/flink目錄。需要先將/opt/flink目錄拷貝到跟docker-compose.yml同一目錄下,并分別重命名,如下圖: 三個文件夾內(nèi)容是一樣的,只是

    2024年02月03日
    瀏覽(20)
  • Kubernetes - CentOS7搭建k8s_v1.18集群高可用(kubeadm/二進(jìn)制包部署方式)實(shí)測配置驗(yàn)證手冊

    Kubernetes - CentOS7搭建k8s_v1.18集群高可用(kubeadm/二進(jìn)制包部署方式)實(shí)測配置驗(yàn)證手冊

    一、Kubernetes—k8s是什么 Kubernetes 這個名字源于希臘語,意為“舵手“或”飛行員\\\"。 Kubernetes,簡稱K8s,中間有8個字符用8代替縮寫。 Google于2014年開源項(xiàng)目,為容器化應(yīng)用提供集群和管理的開源工具,Kubernetes目標(biāo)是讓部署容器化的應(yīng)用簡單并且高效,提供了應(yīng)用部署,規(guī)劃,更

    2024年04月27日
    瀏覽(25)
  • 24、Flink 的table api與sql之Catalogs(介紹、類型、java api和sql實(shí)現(xiàn)ddl、java api和sql操作catalog)-1

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月06日
    瀏覽(21)
  • 24、Flink 的table api與sql之Catalogs(java api操作視圖)-3

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月07日
    瀏覽(26)
  • 實(shí)戰(zhàn)Java springboot 采用Flink CDC操作SQL Server數(shù)據(jù)庫獲取增量變更數(shù)據(jù)

    目錄 前言: 1、springboot引入依賴: 2、yml配置文件 3、創(chuàng)建SQL server CDC變更數(shù)據(jù)監(jiān)聽器 4、反序列化數(shù)據(jù),轉(zhuǎn)為變更JSON對象 5、CDC 數(shù)據(jù)實(shí)體類 6、自定義ApplicationContextUtil 7、自定義sink 交由spring管理,處理變更數(shù)據(jù) ? ? ? ? 我的場景是從SQL Server數(shù)據(jù)庫獲取指定表的增量數(shù)據(jù),查

    2024年02月10日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包