国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink cdc DataStream api 時區(qū)問題

這篇具有很好參考價值的文章主要介紹了flink cdc DataStream api 時區(qū)問題。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

flink cdc DataStream api 時區(qū)問題

以postgrsql 作為數(shù)據(jù)源時,Date和timesatmp等類型cdc同步讀出來時,會發(fā)現(xiàn)一下幾個問題:

時間,日期等類型的數(shù)據(jù)對應(yīng)的會轉(zhuǎn)化為Int,long等類型。

源表同步后,時間相差8小時。這是因為時區(qū)不同的緣故。

源表:
flink cdc DataStream api 時區(qū)問題,Flink,flink,java,kafka,hadoop
sink 表:
flink cdc DataStream api 時區(qū)問題,Flink,flink,java,kafka,hadoop
解決方案:在自定義序列化時進(jìn)行處理。
java code


package pg.cdc.ds;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.text.SimpleDateFormat;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    ZoneId serverTimeZone;
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //1.創(chuàng)建JSON對象用于存儲最終數(shù)據(jù)
        JSONObject result = new JSONObject();
        Struct value = (Struct) sourceRecord.value();

        //2.獲取庫名&表名
        Struct sourceStruct = value.getStruct("source");
        String database = sourceStruct.getString("db");
        String schema = sourceStruct.getString("schema");
        String tableName = sourceStruct.getString("table");

        //3.獲取"before"數(shù)據(jù)
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.MicroTimestamp".equals(field.schema().name())) {
                    if (beforeValue != null) {
                        long times = (long) beforeValue / 1000;
                        String dateTime = sdf.format(new Date((times - 8 * 60 * 60 * 1000)));

                        beforeJson.put(field.name(), dateTime);
                    }
                }
                else if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.NanoTimestamp".equals(field.schema().name())) {
                    if (beforeValue != null) {
                        long times = (long) beforeValue;
                        String dateTime = sdf.format(new Date((times - 8 * 60 * 60 * 1000)));
                        beforeJson.put(field.name(), dateTime);
                    }
                }  else if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.Timestamp".equals(field.schema().name())) {
                    if (beforeValue != null) {
                        long times = (long) beforeValue;
                        String dateTime = sdf.format(new Date((times - 8 * 60 * 60 )));
                        beforeJson.put(field.name(), dateTime);
                    }
                } else if("int32".equals(field.schema().type().getName()) && "io.debezium.time.Date".equals(field.schema().name())){
                    if(beforeValue != null) {
                        int times = (int) beforeValue;
                        String dateTime = sdf1.format(new Date(times * 24 * 60 * 60L * 1000));
                        beforeJson.put(field.name(), dateTime);
                    }
                }
                else {
                    beforeJson.put(field.name(), beforeValue);
                }

            }
        }
        //4.獲取"after"數(shù)據(jù)
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.MicroTimestamp".equals(field.schema().name())) {
                    if (afterValue != null) {
                        long times = (long) afterValue / 1000;
                        String dateTime = sdf.format(new Date((times - 8 * 60 * 60 * 1000)));

                        afterJson.put(field.name(), dateTime);
                    }
                }
                else if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.NanoTimestamp".equals(field.schema().name())) {
                    if (afterValue != null) {
                        long times = (long) afterValue;
                        String dateTime = sdf.format(new Date((times - 8 * 60 * 60 * 1000)));
                        afterJson.put(field.name(), dateTime);
                    }
                }  else if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.Timestamp".equals(field.schema().name())) {
                    if (afterValue != null) {
                        long times = (long) afterValue;
                        String dateTime = sdf.format(new Date((times - 8 * 60 * 60)));
                        afterJson.put(field.name(), dateTime);
                    }
                }
                else if("int32".equals(field.schema().type().getName()) && "io.debezium.time.Date".equals(field.schema().name())){
                    if(afterValue != null) {
                        int times = (int) afterValue;
                        String dateTime = sdf1.format(new Date(times * 24 * 60 * 60L * 1000));
                        afterJson.put(field.name(), dateTime);
                    }
                }
                else {
                    afterJson.put(field.name(), afterValue);
                }
            }
        }

        //5.獲取操作類型  CREATE UPDATE DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type) || "read".equals(type)) {
            type = "insert";
        }

        //6.將字段寫入JSON對象
        result.put("database", database);
        result.put("schema", schema);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);
        //7.輸出數(shù)據(jù)
        collector.collect(result.toJSONString());

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

scala code

import com.ververica.cdc.debezium.DebeziumDeserializationSchema
import com.ververica.cdc.debezium.utils.TemporalConversions
import io.debezium.time._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.apache.kafka.connect.data.{SchemaBuilder, Struct}
import org.apache.kafka.connect.source.SourceRecord
import java.sql
import java.time.{Instant, LocalDateTime, ZoneId}
import scala.collection.JavaConverters._
import scala.util.parsing.json.JSONObject
 
 
 
class StructDebeziumDeserializationSchema(serverTimeZone: String) extends DebeziumDeserializationSchema[Row] {
 
  override def deserialize(sourceRecord: SourceRecord, collector: Collector[Row]): Unit = {
    // 解析主鍵
    val key = sourceRecord.key().asInstanceOf[Struct]
    val keyJs = parseStruct(key)
 
    // 解析值
    val value = sourceRecord.value().asInstanceOf[Struct]
    val source = value.getStruct("source")
    val before = parseStruct(value.getStruct("before"))
    val after = parseStruct(value.getStruct("after"))
 
    val row = Row.withNames()
    row.setField("table", s"${source.get("db")}.${source.get("table")}")
    row.setField("key", keyJs)
    row.setField("op", value.get("op"))
    row.setField("op_ts", LocalDateTime.ofInstant(Instant.ofEpochMilli(source.getInt64("ts_ms")), ZoneId.of(serverTimeZone)))
    row.setField("current_ts", LocalDateTime.ofInstant(Instant.ofEpochMilli(value.getInt64("ts_ms")), ZoneId.of(serverTimeZone)))
    row.setField("before", before)
    row.setField("after", after)
    collector.collect(row)
  }
 
  /** 解析[[Struct]]結(jié)構(gòu)為json字符串 */
  private def parseStruct(struct: Struct): String = {
    if (struct == null) return null
    val map = struct.schema().fields().asScala.map(field => {
      val v = struct.get(field)
      val typ = field.schema().name()
      println(s"$v, $typ, ${field.name()}")
      val value = v match {
        case long if long.isInstanceOf[Long] => convertLongToTime(long.asInstanceOf[Long], typ)
        case iv if iv.isInstanceOf[Int] => convertIntToDate(iv.asInstanceOf[Int], typ)
        case iv if iv == null => null
        case _ => convertObjToTime(v, typ)
      }
      (field.name(), value)
    }).filter(_._2 != null).toMap
    JSONObject.apply(map).toString()
  }
 
  /** 類型轉(zhuǎn)換 */
  private def convertObjToTime(obj: Any, typ: String): Any = {
    typ match {
      case Time.SCHEMA_NAME | MicroTime.SCHEMA_NAME | NanoTime.SCHEMA_NAME =>
        sql.Time.valueOf(TemporalConversions.toLocalTime(obj)).toString
      case Timestamp.SCHEMA_NAME | MicroTimestamp.SCHEMA_NAME | NanoTimestamp.SCHEMA_NAME | ZonedTimestamp.SCHEMA_NAME =>
        sql.Timestamp.valueOf(TemporalConversions.toLocalDateTime(obj, ZoneId.of(serverTimeZone))).toString
      case _ => obj
    }
  }
 
  /** long 轉(zhuǎn)換為時間類型 */
  private def convertLongToTime(obj: Long, typ: String): Any = {
    val time_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Time")
    val date_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date")
    val timestamp_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Timestamp")
    typ match {
      case Time.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Time.toLogical(time_schema, obj.asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
      case MicroTime.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Time.toLogical(time_schema, (obj / 1000).asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
      case NanoTime.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Time.toLogical(time_schema, (obj / 1000 / 1000).asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
      case Timestamp.SCHEMA_NAME =>
        val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
        java.sql.Timestamp.valueOf(t).toString
      case MicroTimestamp.SCHEMA_NAME =>
        val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj / 1000).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
        java.sql.Timestamp.valueOf(t).toString
      case NanoTimestamp.SCHEMA_NAME =>
        val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj / 1000 / 1000).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
        java.sql.Timestamp.valueOf(t).toString
      case Date.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Date.toLogical(date_schema, obj.asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDate.toString
      case _ => obj
    }
  }
 
  private def convertIntToDate(obj:Int, typ: String): Any ={
    val date_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date")
    typ match {
      case Date.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Date.toLogical(date_schema, obj).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDate.toString
      case _ => obj
    }
  }
 
  override def getProducedType: TypeInformation[Row] = {
    TypeInformation.of(classOf[Row])
  }
}

mysql cdc時區(qū)問題

mysql cdc也會出現(xiàn)上述時區(qū)問題,Debezium默認(rèn)將MySQL中datetime類型轉(zhuǎn)成UTC的時間戳({@link io.debezium.time.Timestamp}),時區(qū)是寫死的無法更改,導(dǎo)致數(shù)據(jù)庫中設(shè)置的UTC+8,到kafka中變成了多八個小時的long型時間戳 Debezium默認(rèn)將MySQL中的timestamp類型轉(zhuǎn)成UTC的字符串。

解決思路有兩種:

1:自定義序列化方式的時候做時區(qū)轉(zhuǎn)換。
2:自定義時間轉(zhuǎn)換類,通過debezium配置文件指定轉(zhuǎn)化格式。

這里主要使用第二種方式。文章來源地址http://www.zghlxwxcb.cn/news/detail-724921.html

package com.zmn.schema;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.function.Consumer;

/**
 * 處理Debezium時間轉(zhuǎn)換的問題
 * Debezium默認(rèn)將MySQL中datetime類型轉(zhuǎn)成UTC的時間戳({@link io.debezium.time.Timestamp}),時區(qū)是寫死的無法更改,
 * 導(dǎo)致數(shù)據(jù)庫中設(shè)置的UTC+8,到kafka中變成了多八個小時的long型時間戳
 * Debezium默認(rèn)將MySQL中的timestamp類型轉(zhuǎn)成UTC的字符串。
 * | mysql                               | mysql-binlog-connector                   | debezium                          |
 * | ----------------------------------- | ---------------------------------------- | --------------------------------- |
 * | date<br>(2021-01-28)                | LocalDate<br/>(2021-01-28)               | Integer<br/>(18655)               |
 * | time<br/>(17:29:04)                 | Duration<br/>(PT17H29M4S)                | Long<br/>(62944000000)            |
 * | timestamp<br/>(2021-01-28 17:29:04) | ZonedDateTime<br/>(2021-01-28T09:29:04Z) | String<br/>(2021-01-28T09:29:04Z) |
 * | Datetime<br/>(2021-01-28 17:29:04)  | LocalDateTime<br/>(2021-01-28T17:29:04)  | Long<br/>(1611854944000)          |
 *
 * @see io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
 */
public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private final static Logger logger = LoggerFactory.getLogger(MySqlDateTimeConverter.class);

    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;

    private ZoneId timestampZoneId = ZoneId.systemDefault();

    @Override
    public void configure(Properties props) {
        readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, "format.time", p -> timeFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, "format.datetime", p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, "format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, "format.timestamp.zone", z -> timestampZoneId = ZoneId.of(z));
    }

    private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
        String settingValue = (String) properties.get(settingKey);
        if (settingValue == null || settingValue.length() == 0) {
            return;
        }
        try {
            callback.accept(settingValue.trim());
        } catch (IllegalArgumentException | DateTimeException e) {
            logger.error("The {} setting is illegal: {}",settingKey,settingValue);
            throw e;
        }
    }

    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        String sqlType = column.typeName().toUpperCase();
        SchemaBuilder schemaBuilder = null;
        Converter converter = null;
        if ("DATE".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
            converter = this::convertDate;
        }
        if ("TIME".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
            converter = this::convertTime;
        }
        if ("DATETIME".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
            converter = this::convertDateTime;
        }
        if ("TIMESTAMP".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
            converter = this::convertTimestamp;
        }
        if (schemaBuilder != null) {
            registration.register(schemaBuilder, converter);
        }
    }

    private String convertDate(Object input) {
        if (input instanceof LocalDate) {
            return dateFormatter.format((LocalDate) input);
        }
        if (input instanceof Integer) {
            LocalDate date = LocalDate.ofEpochDay((Integer) input);
            return dateFormatter.format(date);
        }
        return null;
    }

    private String convertTime(Object input) {
        if (input instanceof Duration) {
            Duration duration = (Duration) input;
            long seconds = duration.getSeconds();
            int nano = duration.getNano();
            LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
            return timeFormatter.format(time);
        }
        return null;
    }

    private String convertDateTime(Object input) {
        if (input instanceof LocalDateTime) {
            return datetimeFormatter.format((LocalDateTime) input);
        }
        return null;
    }

    private String convertTimestamp(Object input) {
        if (input instanceof ZonedDateTime) {
            // mysql的timestamp會轉(zhuǎn)成UTC存儲,這里的zonedDatetime都是UTC時間
            ZonedDateTime zonedDateTime = (ZonedDateTime) input;
            LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
            return timestampFormatter.format(localDateTime);
        }
        return null;
    }
}
使用方式:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("snapshot.mode", "schema_only"); // 增量讀取

        //自定義時間轉(zhuǎn)換配置
        properties.setProperty("converters", "dateConverters");
        properties.setProperty("dateConverters.type", "pg.cdc.ds.PgSQLDateTimeConverter");
        properties.setProperty("dateConverters.format.date", "yyyy-MM-dd");
        properties.setProperty("dateConverters.format.time", "HH:mm:ss");
        properties.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss");
        properties.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss");
        properties.setProperty("dateConverters.format.timestamp.zone", "UTC+8");
        properties.setProperty("debezium.snapshot.locking.mode","none"); //全局讀寫鎖,可能會影響在線業(yè)務(wù),跳過鎖設(shè)置        
        properties.setProperty("include.schema.changes", "true");
        // 使用flink mysql cdc 發(fā)現(xiàn)bigint unsigned類型的字段,capture以后轉(zhuǎn)成了字符串類型,
       // 用的這個解析吧JsonDebeziumDeserializationSchema。
        properties.setProperty("bigint.unsigned.handling.mode","long");
        properties.setProperty("decimal.handling.mode","double");
        
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.10.102")
                .port(3306)
                .username("yusys")
                .password("yusys")
                .port(3306)
                .databaseList("gmall")
                .tableList("gmall.faker_user1")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .debeziumProperties(properties)
                .serverId(5409)
                .build();
                
                
      SingleOutputStreamOperator<string> dataSource = env
                .addSource(sourceFunction).setParallelism(10).name("binlog-source");

到了這里,關(guān)于flink cdc DataStream api 時區(qū)問題的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • flink cdc數(shù)據(jù)同步,DataStream方式和SQL方式的簡單使用

    flink cdc數(shù)據(jù)同步,DataStream方式和SQL方式的簡單使用

    目錄 一、flink cdc介紹 1、什么是flink cdc 2、flink cdc能用來做什么 3、flink cdc的優(yōu)點 二、flink cdc基礎(chǔ)使用 1、使用flink cdc讀取txt文本數(shù)據(jù) 2、DataStream的使用方式 3、SQL的方式 總結(jié) flink cdc是一個由阿里研發(fā)的,一個可以直接從MySQL、PostgreSQL等數(shù)據(jù)庫直接讀取全量數(shù)據(jù)和增量變更數(shù)

    2024年02月13日
    瀏覽(26)
  • Flink cdc同步mysql到starrocks(日期時間格式/時區(qū)處理)

    flink 1.15.3(此時最新版本為1.16.1) mysql 5.7+ starrocks 2.5.2 mysql同步表結(jié)構(gòu) mysql中的timestamp字段是可以正常同步的,但是多了8小時,設(shè)置了mysql鏈接屬性也沒效果 參考下方的鏈接有兩種方式; 參考資料 https://blog.csdn.net/cloudbigdata/article/details/122935333 https://blog.csdn.net/WuBoooo/article/deta

    2024年02月16日
    瀏覽(27)
  • Flink DataStream API詳解

    參考:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html Data Sources Source是程序讀取其輸入的位置,您可以使用 env.addSource(sourceFunction) 將Source附加到程序中。Flink內(nèi)置了許多預(yù)先實現(xiàn)的SourceFunction,但是您始終可以通過實現(xiàn)SourceFunction(non-parallel sources)來編寫自定

    2024年02月14日
    瀏覽(51)
  • Flink學(xué)習(xí)——DataStream API

    Flink學(xué)習(xí)——DataStream API

    ? ? ? ? 一個flink程序,其實就是對DataStream的各種轉(zhuǎn)換。具體可以分成以下幾個部分: 獲取執(zhí)行環(huán)境(Execution Environment) 讀取數(shù)據(jù)源(Source) 定義基于數(shù)據(jù)的轉(zhuǎn)換操作(Transformations) 定義計算結(jié)果的輸出位置(Sink) 觸發(fā)程序執(zhí)行(Execute) ? ? ? ? flink 程序可以在各種上

    2024年02月05日
    瀏覽(22)
  • Flink|《Flink 官方文檔 - DataStream API - 概覽》學(xué)習(xí)筆記

    學(xué)習(xí)文檔:Flink 官方文檔 - DataStream API - 概覽 學(xué)習(xí)筆記如下: Flink 的 DataStream API: 數(shù)據(jù)里的起始是各種 source,例如消息隊列、socket 流、文件等; 對數(shù)據(jù)流進(jìn)行轉(zhuǎn)換,例如過濾、更新狀態(tài)、定義窗口、聚合等; 結(jié)果通過 sink 返回,例如可以將數(shù)據(jù)寫入文件或標(biāo)準(zhǔn)輸出。 Da

    2024年01月23日
    瀏覽(49)
  • 【Apache Flink】Flink DataStream API的基本使用

    【Apache Flink】Flink DataStream API的基本使用

    Flink DataStream API的基本使用 Flink DataStream API主要用于處理無界和有界數(shù)據(jù)流 。 無界數(shù)據(jù)流 是一個持續(xù)生成數(shù)據(jù)的數(shù)據(jù)源,它沒有明確的結(jié)束點,例如實時的交易數(shù)據(jù)或傳感器數(shù)據(jù)。這種類型的數(shù)據(jù)流需要使用Apache Flink的實時處理功能來連續(xù)地處理和分析。 有界數(shù)據(jù)流 是一個

    2024年02月06日
    瀏覽(26)
  • Flink|《Flink 官方文檔 - DataStream API - 算子 - 窗口》學(xué)習(xí)筆記

    Flink|《Flink 官方文檔 - DataStream API - 算子 - 窗口》學(xué)習(xí)筆記

    學(xué)習(xí)文檔:《Flink 官方文檔 - DataStream API - 算子 - 窗口》 學(xué)習(xí)筆記如下: 窗口(Window):窗口是處理無界流的關(guān)鍵所在。窗口可以將數(shù)據(jù)流裝入大小有限的 “桶” 中,再對每個 “桶” 加以處理。 Keyed Windows 在 Keyed Windows 上使用窗口時,要調(diào)用 keyBy(...) 而后再調(diào)用 window(..

    2024年01月18日
    瀏覽(58)
  • Flink DataStream之從Kafka讀數(shù)據(jù)

    Flink DataStream之從Kafka讀數(shù)據(jù)

    搭建Kafka 參考:centos7下kafka2.12-2.1.0的安裝及使用_kafka2.12-2.1.0 steam_QYHuiiQ的博客-CSDN博客 ?啟動zookeeper 啟動kafka 查看進(jìn)程 ?創(chuàng)建topic 查看topic列表 導(dǎo)入pom依賴 新建類 啟動程序 在終端向kafka生產(chǎn)數(shù)據(jù),同時觀察程序控制臺flink的讀取情況 ?如圖說明flink從kafka成功讀取數(shù)據(jù)。

    2024年02月13日
    瀏覽(19)
  • Flink基礎(chǔ)之DataStream API

    union聯(lián)合:被unioin的流中的數(shù)據(jù)類型必須一致 connect連接:合并的兩條流的數(shù)據(jù)類型可以不一致 connec后,得到的是ConnectedStreams 合并后需要根據(jù)數(shù)據(jù)流是否經(jīng)過keyby分區(qū) coConnect: 將兩條數(shù)據(jù)流合并為同一數(shù)據(jù)類型 keyedConnect 目前所使用的大多數(shù)Sink, 都是基于2PC的方式來保證狀態(tài)

    2024年02月05日
    瀏覽(30)
  • 《Flink學(xué)習(xí)筆記》——第五章 DataStream API

    《Flink學(xué)習(xí)筆記》——第五章 DataStream API

    一個Flink程序,其實就是對DataStream的各種轉(zhuǎn)換,代碼基本可以由以下幾部分構(gòu)成: 獲取執(zhí)行環(huán)境 讀取數(shù)據(jù)源 定義對DataStream的轉(zhuǎn)換操作 輸出 觸發(fā)程序執(zhí)行 獲取執(zhí)行環(huán)境和觸發(fā)程序執(zhí)行都屬于對執(zhí)行環(huán)境的操作,那么其構(gòu)成可以用下圖表示: 其核心部分就是Transform,對數(shù)據(jù)

    2024年02月10日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包