国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink Table/Sql自定義Kudu Sink實(shí)戰(zhàn)(其它Sink可參考)

這篇具有很好參考價(jià)值的文章主要介紹了Flink Table/Sql自定義Kudu Sink實(shí)戰(zhàn)(其它Sink可參考)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

1. 背景

使用第三方的org.apache.bahir ? flink-connector-kudu,batch模式寫入數(shù)據(jù)到Kudu會(huì)有FlushMode相關(guān)問題

具體可以參考我的這篇博客通過Flink SQL操作創(chuàng)建Kudu表,并讀寫Kudu表數(shù)據(jù)

2. 原理

Flink的Dynamic table能夠統(tǒng)一處理batch和streaming

實(shí)現(xiàn)自定義Source或Sink有兩種方式:

  1. 通過對(duì)已有的connector進(jìn)行拓展。比如對(duì)connector = jdbc拓展Clickhouse的jdbc連接器
  2. 繼承DynamicTableSourceFactory或DynamicTableSinkFactory,實(shí)現(xiàn)一個(gè)全新的connector。本節(jié)重點(diǎn)講解這種

flink kudu sink,# Flink,flink table,flink sql,自定義sink,kudu sink,connector
Metadata部分:Flink Catalog已有的Flink Table,或在Flink Catalog進(jìn)行Flink Table的create sql聲明。由CatalogTable實(shí)例進(jìn)行表示

Planning部分:DynamicTableSourceFactory或DynamicTableSinkFactory將CatalogTable的metadata,轉(zhuǎn)換成DynamicTableSource或DynamicTableSink的實(shí)例數(shù)據(jù)

DynamicTableFactory主要驗(yàn)證with子句的各個(gè)選項(xiàng),并解析with子句的各個(gè)選項(xiàng)值。with子句的connector值必須和factoryIdentifier一致

DynamicTableFactory通過DynamicTableSource或DynamicTableSink進(jìn)行runtime操作

runtime部分:Source主要需要實(shí)現(xiàn)ScanRuntimeProvider或LookupRuntimeProvider。Sink主要需要實(shí)現(xiàn)SinkRuntimeProvider。其中SinkRuntimeProvider有兩個(gè)子類:

  1. OutputFormatProvider,可以接收org.apache.flink.api.common.io.OutputFormat
  2. SinkFunctionProvider,可以接收org.apache.flink.streaming.api.functions.sink.SinkFunction

3. 通過Trino創(chuàng)建Kudu表

trino:default> create table flink_table_test(
            -> id int with (primary_key = true), 
            -> name varchar
            -> ) with(
            -> partition_by_hash_columns = array['id'], 
            -> partition_by_hash_buckets = 15, 
            -> number_of_replicas =1
            -> );
CREATE TABLE
trino:default> 

4. FlinkKuduTableSinkProject項(xiàng)目

4.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.mq</groupId>
    <artifactId>flinkKuduTableSinkProject</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12.15</scala.version>
        <flink.version>1.14.4</flink.version>
        <kudu.version>1.15.0</kudu.version>

    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>${kudu.version}</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.3.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude></exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.10.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.6.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <args>
                        <arg>-nobootcp</arg>
                        <arg>-target:jvm-1.8</arg>
                    </args>
                </configuration>
            </plugin>

        </plugins>
    </build>

</project>

4.2 FlinkKuduTableSinkFactory.scala

定義FlinkKuduTableSinkFactory類,主要包含四個(gè)部分

  1. FlinkKuduTableSinkFactory
  2. FlinkKuduTableSinkFactory的伴生對(duì)象Object
  3. FlinkKuduTableSink
  4. FlinkKuduRowDataRichSinkFunction
package org.mq

import org.apache.flink.configuration.ConfigOptions.key
import org.apache.flink.configuration.{ConfigOption, Configuration, ReadableConfig}
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter
import org.apache.flink.table.connector.sink.{DynamicTableSink, SinkFunctionProvider}
import org.apache.flink.table.data.RowData
import org.apache.flink.table.factories.{DynamicTableFactory, DynamicTableSinkFactory, FactoryUtil}
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.logical.RowType
import org.apache.flink.types.{Row, RowKind}
import org.apache.kudu.client.SessionConfiguration.FlushMode
import org.apache.kudu.client._

import java.util
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable.ArrayBuffer

// 由于KuduSqlSinkFactory是Serializable,其屬性也應(yīng)該是Serializable。將屬性定義在Object可以實(shí)現(xiàn)該功能
object FlinkKuduTableSinkFactory {
  val kudu_masters: ConfigOption[String] = key("kudu.masters")
    .stringType()
    .noDefaultValue()
    .withDescription("kudu masters")

  val kudu_table: ConfigOption[String] = key("kudu.table")
    .stringType()
    .noDefaultValue()
    .withDescription("kudu table")

}


class FlinkKuduTableSinkFactory extends DynamicTableSinkFactory with Serializable {

  import FlinkKuduTableSinkFactory._

  // 定義connector的name
  override def factoryIdentifier(): String = "kudu"

  // 定義with子句中必填的選項(xiàng)
  override def requiredOptions(): util.Set[ConfigOption[_]] = {
    val requiredSet: util.HashSet[ConfigOption[_]] = new util.HashSet[ConfigOption[_]]()
    requiredSet.add(kudu_masters)
    requiredSet.add(kudu_table)

    requiredSet

  }

  // // 定義with子句中可填的選項(xiàng)
  override def optionalOptions(): util.Set[ConfigOption[_]] = {
    new util.HashSet[ConfigOption[_]]()
  }


  override def createDynamicTableSink(context: DynamicTableFactory.Context): DynamicTableSink = {

    // 驗(yàn)證with子句選項(xiàng),并獲取各選項(xiàng)的值
    val FactoryHelper: FactoryUtil.TableFactoryHelper = FactoryUtil.createTableFactoryHelper(this, context)
    FactoryHelper.validate()
    val withOptions: ReadableConfig = FactoryHelper.getOptions()

    // 獲取各字段的數(shù)據(jù)類型
    val fieldDataTypes: DataType = context.getCatalogTable().getResolvedSchema.toPhysicalRowDataType
    // Buffer(id)
    val primaryKeys: Seq[String] = context.getCatalogTable().getResolvedSchema.getPrimaryKey
      .get().getColumns.asScala.toSeq

    new FlinkKuduTableSink(fieldDataTypes, withOptions)
  }


}


class FlinkKuduTableSink(fieldDataTypes: DataType,
                         withOptions: ReadableConfig) extends DynamicTableSink {

  // 定義Sink支持的ChangelogMode。insertOnly或upsert
  override def getChangelogMode(requestedMode: ChangelogMode): ChangelogMode = {
    requestedMode
  }

  // 調(diào)用用戶自己定義的streaming sink ,建立sql與streaming的聯(lián)系
  override def getSinkRuntimeProvider(context: DynamicTableSink.Context): DynamicTableSink.SinkRuntimeProvider = {
    val dataStructureConverter: DataStructureConverter = context.createDataStructureConverter(fieldDataTypes)

    SinkFunctionProvider.of(new FlinkKuduRowDataRichSinkFunction(dataStructureConverter, withOptions, fieldDataTypes))

  }

  // sink可以不用實(shí)現(xiàn),主要用來source的謂詞下推
  override def copy(): DynamicTableSink = {
    new FlinkKuduTableSink(fieldDataTypes, withOptions)
  }

  // 定義sink的匯總信息,用于打印到控制臺(tái)和log
  override def asSummaryString(): String = "kudu"

}


// 同flink streaming的自定義sink ,只不過處理的是RowData
class FlinkKuduRowDataRichSinkFunction(dataStructureConverter: DataStructureConverter,
                                       withOptions: ReadableConfig,
                                       fieldDataTypes: DataType) extends RichSinkFunction[RowData] {

  import FlinkKuduTableSinkFactory.{kudu_masters, kudu_table}

  private val serialVersionUID: Long = 1L

  private val fieldNameDatatypes: ArrayBuffer[(String, String)] = ArrayBuffer()
  private var kuduClient: KuduClient = _
  private var kuduSession: KuduSession = _
  private var kuduTable: KuduTable = _

  // 進(jìn)行各種參數(shù)的初始化
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)

    val rowFields: util.List[RowType.RowField]
    = fieldDataTypes.getLogicalType.asInstanceOf[RowType].getFields

    rowFields.asScala.foreach(rowField => {
      val rowFieldDatatype: String = rowField.getType.asSummaryString()
        .split(" ").apply(0).split("\\(").apply(0)
      fieldNameDatatypes += ((rowField.getName, rowFieldDatatype))
    })

    kuduClient = new KuduClient.KuduClientBuilder(withOptions.get(kudu_masters)).build()
    kuduSession = kuduClient.newSession()
    kuduSession.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
    kuduTable = kuduClient.openTable(withOptions.get(kudu_table))
  }

  // 對(duì)每個(gè)rowData進(jìn)行具體的處理
  override def invoke(rowData: RowData, context: SinkFunction.Context): Unit = {


    val rowKind: RowKind = rowData.getRowKind()
    val row: Row = dataStructureConverter.toExternal(rowData).asInstanceOf[Row]
    // 處理insert和upsert
    if (rowKind.equals(RowKind.INSERT) || rowKind.equals(RowKind.UPDATE_AFTER)) {

      // 插入一條數(shù)據(jù)
      val upsert: Upsert = kuduTable.newUpsert()
      val partialRow: PartialRow = upsert.getRow()

      fieldNameDatatypes.foreach(fieldNameDatatype => {
        val fieldName: String = fieldNameDatatype._1
        val fieldDatatype: String = fieldNameDatatype._2

        fieldDatatype match {
          case "INT" => {
            var partialRowValue: Int = row.getFieldAs[Int](fieldName)
            if (partialRowValue == null) partialRowValue = 0
            partialRow.addInt(fieldName, partialRowValue)
          }
          case "BIGINT" => {
            var partialRowValue: Long = row.getFieldAs[Long](fieldName)
            if (partialRowValue == null) partialRowValue = 0L
            partialRow.addLong(fieldName, partialRowValue)
          }
          case "FLOAT" => {
            var partialRowValue: Float = row.getFieldAs[Float](fieldName)
            if (partialRowValue == null) partialRowValue = 0.0F
            partialRow.addFloat(fieldName, partialRowValue)
          }
          case "DOUBLE" => {
            var partialRowValue: Double = row.getFieldAs[Double](fieldName)
            if (partialRowValue == null) partialRowValue = 0.0
            partialRow.addDouble(fieldName, partialRowValue)
          }
          case "DECIMAL" => {
            val partialRowValue: java.math.BigDecimal =
              row.getFieldAs[java.math.BigDecimal](fieldName)

            partialRow.addDouble(fieldName,
              if (partialRowValue == null) {
                0.0
              } else {
                partialRowValue.doubleValue()
              })

          }
          case "STRING" => {
            var partialRowValue: String = row.getFieldAs[String](fieldName)
            if (partialRowValue == null) partialRowValue = ""
            partialRow.addString(fieldName, partialRowValue)
          }
          case "TIME" => {
            val partialRowValue: java.time.LocalTime =
              row.getFieldAs[java.time.LocalTime](fieldName)

            partialRow.addString(fieldName,
              if (partialRowValue == null) {
                ""
              } else {
                partialRowValue.toString
              })
          }
          case "DATE" => {
            val partialRowValue: java.time.LocalDate =
              row.getFieldAs[java.time.LocalDate](fieldName)

            partialRow.addDate(fieldName,
              if (partialRowValue == null) {
                new java.sql.Date(0L)
              } else {
                java.sql.Date.valueOf(partialRowValue)
              })

          }
          case "TIMESTAMP" => {
            val partialRowValue: java.time.LocalDateTime =
              row.getFieldAs[java.time.LocalDateTime](fieldName)

            partialRow.addTimestamp(fieldName,
              if (partialRowValue == null) {
                new java.sql.Timestamp(0L)
              } else {
                // 注意是否有時(shí)區(qū)的8小時(shí)偏差
                java.sql.Timestamp.valueOf(partialRowValue.plusHours(8L))
              })

          }
          case "BYTES" => {
            val partialRowValue: Array[Byte] =
              row.getFieldAs[Array[Byte]](fieldName)

            partialRow.addBinary(fieldName, partialRowValue)
          }

        }

      })
      kuduSession.apply(upsert)

      // 也可以手動(dòng)調(diào)用flush
      // kuduSession.flush()

    }

  }

  // 進(jìn)行各種資源的關(guān)閉
  override def close(): Unit = {
    super.close()

    kuduSession.close()
  }

}

4.3 META-INF/services

  1. 在項(xiàng)目的resource目錄下,新建META-INF/services目錄
  2. 在services目錄下新建文件:org.apache.flink.table.factories.Factory
  3. Factory文件添加DynamicTableSinkFactory的全路徑:org.mq.FlinkKuduTableSinkFactory

4.4 FlinkKuduTableSinkTest.scala測(cè)試文件

package org.mq

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object FlinkKuduTableSinkTest {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(senv)

    tEnv.executeSql(
      """
        |create table flink_table_test(
        |id int,
        |name string,
        |primary key (id) not enforced
        |) with (
        |'connector' = 'kudu',
        |'kudu.masters' = '192.168.8.112:7051,192.168.8.113:7051',
        |'kudu.table' = 'flink_table_test'
        |)
        |""".stripMargin
    )

    tEnv.executeSql("insert into flink_table_test(id, name) values(2, 'li_si2')")


  }

}

執(zhí)行程序,然后查看Kudu表數(shù)據(jù)文章來源地址http://www.zghlxwxcb.cn/news/detail-621892.html

5. 查看Kudu表數(shù)據(jù)

trino:default> select * from flink_table_test;
 id |   name    
----+-----------
  1 | zhang_san 
(1 row)

Query 20220517_095005_00109_i893r, FINISHED, 2 nodes
Splits: 19 total, 19 done (100.00%)
0.22 [1 rows, 20B] [4 rows/s, 90B/s]

trino:default> 

到了這里,關(guān)于Flink Table/Sql自定義Kudu Sink實(shí)戰(zhàn)(其它Sink可參考)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink流批一體計(jì)算(15):PyFlink Tabel API之SQL寫入Sink

    目錄 舉個(gè)例子 寫入Sink的各種情況 1. 將結(jié)果數(shù)據(jù)收集到客戶端 2. 將結(jié)果數(shù)據(jù)轉(zhuǎn)換為Pandas DataFrame,并收集到客戶端 3. 將結(jié)果寫入到一張 Sink 表中 4. 將結(jié)果寫入多張 Sink 表中 舉個(gè)例子 將計(jì)算結(jié)果寫入給 sink 表 寫入Sink的各種情況 1. 將結(jié)果數(shù)據(jù)收集到客戶端 你可以使用 TableR

    2024年02月11日
    瀏覽(19)
  • 【Flink SQL】Flink SQL 基礎(chǔ)概念(一):SQL & Table 運(yùn)行環(huán)境、基本概念及常用 API

    《 Flink SQL 基礎(chǔ)概念 》系列,共包含以下 5 篇文章: Flink SQL 基礎(chǔ)概念(一):SQL Table 運(yùn)行環(huán)境、基本概念及常用 API Flink SQL 基礎(chǔ)概念(二):數(shù)據(jù)類型 Flink SQL 基礎(chǔ)概念(三):SQL 動(dòng)態(tài)表 連續(xù)查詢 Flink SQL 基礎(chǔ)概念(四):SQL 的時(shí)間屬性 Flink SQL 基礎(chǔ)概念(五):SQL 時(shí)區(qū)問

    2024年03月21日
    瀏覽(99)
  • Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

    Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

    ? ? ? ?今天一天爭(zhēng)取搞完最后這一部分,學(xué)完趕緊把 Kafka 和 Flume 學(xué)完,就要開始做實(shí)時(shí)數(shù)倉(cāng)了。據(jù)說是應(yīng)屆生得把實(shí)時(shí)數(shù)倉(cāng)搞個(gè) 80%~90% 才能差不多找個(gè)工作,太牛馬了。 ????????之前我們已經(jīng)用過了一些簡(jiǎn)單的內(nèi)置連接器,比如 \\\'datagen\\\' 、\\\'print\\\' ,其它的可以查看官網(wǎng):

    2024年01月24日
    瀏覽(52)
  • 《十堂課學(xué)習(xí) Flink》第五章:Table API 以及 Flink SQL 入門

    《十堂課學(xué)習(xí) Flink》第五章:Table API 以及 Flink SQL 入門

    第四章中介紹了 DataStream API 以及 DataSet API 的入門案例,本章開始介紹 Table API 以及基于此的高層應(yīng)用 Flink SQL 的基礎(chǔ)。 Flink 提供了兩個(gè)關(guān)系A(chǔ)PI——Table API 和 SQL——用于統(tǒng)一的流和批處理。Table API 是一種針對(duì)Java、Scala和Python的語言集成查詢API,它允許以非常直觀的方式組合來

    2024年02月03日
    瀏覽(48)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-通過Table API和SQL創(chuàng)建表

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月17日
    瀏覽(24)
  • Flink Table API 與 SQL 編程整理

    Flink Table API 與 SQL 編程整理

    Flink API 總共分為 4 層這里主要整理 Table API 的使用 Table API 是流處理和批處理通用的關(guān)系型 API , Table API 可以基于流輸入或者批輸入來運(yùn)行而不需要進(jìn)行任何修改。 Table API 是 SQL 語言的超集并專門為 Apache Flink 設(shè)計(jì)的, Table API 是 Scala 和 Java 語言集成式的 API 。與常規(guī) SQL 語言

    2024年02月04日
    瀏覽(26)
  • Flink-SQL——?jiǎng)討B(tài)表 (Dynamic Table)

    Flink-SQL——?jiǎng)討B(tài)表 (Dynamic Table)

    SQL 和關(guān)系代數(shù)在設(shè)計(jì)時(shí)并未考慮流數(shù)據(jù)。因此,在關(guān)系代數(shù)(和 SQL)之間幾乎沒有概念上的差異。 本文會(huì)討論這種差異,并介紹 Flink 如何在無界數(shù)據(jù)集上實(shí)現(xiàn)與數(shù)據(jù)庫(kù)引擎在有界數(shù)據(jù)上的處理具有相同的語義。 下表比較了傳統(tǒng)的關(guān)系代數(shù)和流處理與輸入數(shù)據(jù)、執(zhí)行和輸出結(jié)果

    2024年01月17日
    瀏覽(58)
  • Flink-SQL——時(shí)態(tài)表(Temporal Table)

    Flink-SQL——時(shí)態(tài)表(Temporal Table)

    這里我們需要注意一下的是雖然我們介紹的是Flink 的 Temporal Table 但是這個(gè)概念最早是在數(shù)據(jù)庫(kù)中提出的 在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的數(shù)據(jù)庫(kù)廠商也先后實(shí)現(xiàn)了這個(gè)標(biāo)準(zhǔn)。Temporal Table記錄了歷史上任何時(shí)間點(diǎn)所有的數(shù)據(jù)改動(dòng),Temporal Table的工作

    2024年01月16日
    瀏覽(26)
  • 【flink番外篇】21、Flink 通過SQL client 和 table api注冊(cè)catalog示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月21日
    瀏覽(25)
  • Flink系列Table API和SQL之:時(shí)間屬性

    基于時(shí)間的操作(比如時(shí)間窗口),需要定義相關(guān)的時(shí)間語義和時(shí)間數(shù)據(jù)來源的信息。在Table API和SQL中,會(huì)給表單獨(dú)提供一個(gè)邏輯上的時(shí)間字段,專門用來在表處理程序中指示時(shí)間。 所謂的時(shí)間屬性(time attributes),就是每個(gè)表模式結(jié)構(gòu)(schema)的一部分。可以在創(chuàng)建表的DDL里直接定

    2023年04月09日
    瀏覽(37)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包