
Using Flink CDC to Sync MySQL Data and Table Schemas into Apache Doris in Real Time


Background

  1. Existing database: MySQL
  2. Data: many databases and tables. Each enterprise tenant has its own database, the tables under each tenant are different, so they cannot be aggregated, and users can change tables at will: adding, dropping or modifying columns, and creating new tables.
  3. Analysis: user-defined analysis. Chart cards are defined by drag and drop and must be real-time, with results appearing as soon as the user clicks confirm, and the filters involved cannot be predicted in advance.
  4. Problem: as the business grows there are more and more enterprise tenants, the load on MySQL keeps increasing, and some chart cards already load too slowly (plain MySQL SQL).

Sync workflow

  1. A script reads from MySQL the enterprises that need to be synced, then fetches the tables to sync, and stores them in Doris table A using the fields member_id and table.
  2. A script reads Doris table A, fetches the corresponding schemas from MySQL, converts them into Doris CREATE TABLE statements, and executes those statements against Doris.
  3. Cancel the Flink job and restart it (a restart is only needed when a new database is added; new tables do not require one):
    1. On every restart, connect to Doris table A, read the databases, and assemble databaseList and tableList. tableList uses regular expressions such as database1.*, database2.*, so every table in each database is monitored; a table newly created in MySQL is therefore picked up by the sync automatically.
    2. Doris does not yet support applying schema changes while data is being synced (reportedly this should land in 1.2+). However, CDC exposes the DDL SQL, which can be executed against Doris over JDBC; because the SQL dialects differ slightly, the statements have to be converted first. Combined with the new-table case, a CREATE captured from the DDL stream can be used to create the table in Doris.
    3. When loading data into Doris, loading too fast causes import failures with error -235. This can be avoided by limiting how many binlog records are read and using window aggregation, so that data is imported in batches.
      For example, if the rows {"id":1,"name":"小明"} and {"id":2,"name":"小紅"} need to be imported into table B, issuing two separate PUTs is clearly wasteful; combining them into the JSON array [{"id":1,"name":"小明"},{"id":2,"name":"小紅"}] imports both in a single request, as shown in the sketch below.
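
As an illustration of that single-request batching, here is a minimal standalone sketch (not part of the project code) that combines the two rows above into one JSON array and sends one Stream Load PUT with Apache HttpClient. The FE host, port, database and table names are placeholders; label generation, Basic auth and the 307-redirect handling that Doris FE requires are omitted here, and can be seen in the full DorisStreamLoad.scala further below.

import org.apache.http.HttpHeaders
import org.apache.http.client.methods.HttpPut
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils

object StreamLoadBatchSketch {
  def main(args: Array[String]): Unit = {
    // Two row objects batched into one JSON array -> one Stream Load request instead of two
    val rows = Seq("""{"id":1,"name":"小明"}""", """{"id":2,"name":"小紅"}""")
    val jsonArrayStr = rows.mkString("[", ",", "]")

    // Placeholder FE host/port, database and table
    val put = new HttpPut("http://doris-fe:8030/api/example_db/table_b/_stream_load")
    put.setHeader(HttpHeaders.EXPECT, "100-continue")
    put.setHeader("format", "json")
    put.setHeader("strip_outer_array", "true") // tell Doris the body is a JSON array of rows
    put.setEntity(new StringEntity(jsonArrayStr, "UTF-8"))

    val client = HttpClients.createDefault()
    try {
      val response = client.execute(put)
      println(response.getStatusLine)
      println(EntityUtils.toString(response.getEntity))
    } finally client.close()
  }
}

In the real job the same idea is implemented by the window aggregation in FlinkCDCSyncETL plus DorisStreamLoad below.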

Code

The Python code is not covered in detail here; see the repo: GitHub - xiaofeicn/MysqlToDorisTable

Flink CDC

Flink needs to detect new tables: on the daily restart it reads Doris table A and assembles the databaseList and tableList parameters. The code, with comments, is below.

FlinkCDCMysql2Doris.scala

package com.xxxx.mysql2doris


import org.apache.flink.streaming.api.TimeCharacteristic
import com.zbkj.util.{DorisStreamLoad, FlinkCDCSyncETL, KafkaUtil, PropertiesManager, PropertiesUtil, SinkDoris, SinkSchema}
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import com.zbkj.util.KafkaUtil.proper
import net.sf.json.JSONObject
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource, DataStreamUtils}
import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment}
import org.slf4j.{Logger, LoggerFactory}
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.kafka.connect.json.JsonConverterConfig

import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters.asScalaIteratorConverter

object FlinkCDCMysql2Doris {

  PropertiesManager.initUtil()
  val props: PropertiesUtil = PropertiesManager.getUtil
  val log: Logger = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // parallelism
    env.setParallelism(props.parallelism)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    /**
     * Checkpoint settings
     */
    // Enable checkpointing and set the trigger interval (milliseconds; the config default is 500 ms); checkpointing is disabled unless enabled explicitly
    env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)
    // Set the checkpoint timeout (default: 10 minutes)
    env.getCheckpointConfig.setCheckpointTimeout(600000)

    /** Set the minimum pause between two checkpoints. This prevents cases where, e.g., large state makes checkpoints take so long that they pile up
     * and the application ends up triggering checkpoints back to back, consuming a lot of compute and hurting overall performance (milliseconds) */
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60000)
    // By default only one checkpoint may be in progress at a time
    // Allowing a user-specified number of concurrent checkpoints can improve overall checkpointing throughput
    //    env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
    /** Externalized checkpoints:
     * checkpoint data is not cleaned up when the job is stopped; it stays in the external storage system and the job can be restored from it */
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    /** Even if a more recent savepoint exists, prefer falling back to the latest checkpoint for recovery */
    env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
    // Set the number of tolerable checkpoint failures (the default 0 means no checkpoint failure is tolerated)
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)

    /**
     * Restart strategy
     */
    // Restart 3 times, waiting 10000 ms after each failure
    //    env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(3, TimeUnit.MINUTES), Time.of(30, TimeUnit.SECONDS)))
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))

    /**
     * Fetch the sync configuration (member_id / database and table) from Doris table A
     */
    val inputMysql = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
      .setDrivername("com.mysql.jdbc.Driver")
      .setDBUrl("jdbc:mysql://%s:%d/%s".format(props.doris_host, props.doris_port, props.sync_config_db))
      .setUsername(props.doris_user)
      .setPassword(props.doris_password)
      .setQuery("select member_id,sync_table from %s.%s".format(props.sync_config_db, props.sync_config_table))
      .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
      .finish()).uid("inputMysql")


    val databaseName: DataStream[String] = inputMysql.map(line => line.getField(0).toString).uid("databaseName")
    // wildcard listening: the <database>.* regex matches every table in the database
    val tableName: DataStream[String] = inputMysql.map(line => line.getField(0).toString + ".*").uid("tableName")
    val producer = KafkaUtil.getProducer

    val databaseIter = databaseName.executeAndCollect().asScala
    val databaseList = databaseIter.toSet.mkString(",")

    val tableIter = tableName.executeAndCollect().asScala
    val tableList = tableIter.toSet.mkString(",")
    println("databaseList:", databaseList)
    println("tableList:", tableList)
    val customConverterConfigs = new java.util.HashMap[String, Object] {
      put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric")
    }
    /**
     *
     * mysql source for doris
     */
    val mySqlSource = MySqlSource.builder[String]()
      .hostname(props.rds_host)
      .port(props.rds_port)
      .databaseList(databaseList)
      .tableList(tableList)
      .username(props.rds_user)
      .password(props.rds_password)
      .serverId("11110")
      .splitSize(props.split_size)
      .fetchSize(props.fetch_size)
      //       .startupOptions(StartupOptions.latest())
      // take a full snapshot first, then continue from the binlog
      .startupOptions(StartupOptions.initial())
      .includeSchemaChanges(true)
      // newly created tables are picked up and added to the sync, as long as they match the tableList patterns
      .scanNewlyAddedTableEnabled(true)
      .deserializer(new JsonDebeziumDeserializationSchema(false, customConverterConfigs)).build()

    val dataStreamSource: DataStreamSource[String] = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
    val ddlSqlStream: DataStream[String] = dataStreamSource.filter(line => line.contains("historyRecord") && !line.contains("CHANGE COLUMN")).uid("ddlSqlStream")

    val dmlStream: DataStream[String] = dataStreamSource.filter(line => !line.contains("historyRecord") && !line.contains("CHANGE COLUMN")).uid("dmlStream")

    val ddlDataStream = FlinkCDCSyncETL.ddlFormat(ddlSqlStream)
    val dmlDataStream = FlinkCDCSyncETL.binLogETL(dmlStream)
    //    ddlDataStream.print()

    // the Kafka producer is used to notify analysis jobs after data has been synced
    val dorisStreamLoad = new DorisStreamLoad(props, producer)
    ddlDataStream.addSink(new SinkSchema(props)).name("ALTER TABLE TO DORIS").uid("SinkSchema")
    dmlDataStream.addSink(new SinkDoris(dorisStreamLoad)).name("Data TO DORIS").uid("SinkDoris")
    env.execute("Flink CDC Mysql To Doris With Initial")


  }

  case class dataLine(merge_type: String, db: String, table: String, data: String)


}
FlinkCDCBinLogETL.scala
package com.xxxx.util

import net.sf.json.JSONObject
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.java.tuple.Tuple4
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.windowing.time.Time

import scala.collection.mutable.ArrayBuffer
import scala.util.matching.Regex

object FlinkCDCSyncETL {

  def binLogETL(dataStreamSource: DataStream[String]): DataStream[org.apache.flink.api.java.tuple.Tuple4[String, String, String, String]] = {
    /**
     * Map each binlog record type (op) to the corresponding Doris load mode
     */
    val tupleData: DataStream[org.apache.flink.api.java.tuple.Tuple4[String, String, String, String]] = dataStreamSource.map(line => {
      var data: JSONObject = null
      var mergetype = "APPEND"
      val lineObj = JSONObject.fromObject(line)

      val source = lineObj.getJSONObject("source")
      val db = source.getString("db")
      val table = source.getString("table")
      if ("d" == lineObj.getString("op")) {
        val oo = lineObj.getJSONObject("before")
        data = lineObj.getJSONObject("before")
        mergetype = "DELETE"
      } else if ("u" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("after")
        mergetype = "MERGE"
      } else if ("c" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("after")
      } else if ("r" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("after")
        mergetype = "APPEND"
      }
      new Tuple4[String, String, String, String](mergetype, db, table, data.toString)
    }).returns(TypeInformation.of(new TypeHint[Tuple4[String, String, String, String]] {}))
    /**
     * Window aggregation: concatenate the JSON of records sharing the same load mode, db and table into one long string
     */
    val byKeyData: DataStream[org.apache.flink.api.java.tuple.Tuple4[String, String, String, String]] = tupleData.keyBy(0, 1, 2)
      .timeWindow(Time.milliseconds(1000))
      .reduce((itemFirst, itemSecond) => new Tuple4(itemFirst.f0, itemFirst.f1, itemFirst.f2, itemFirst.f3 + "=-=-=" + itemSecond.f3))
    byKeyData
  }

  def ddlFormat(ddlDataStream: DataStream[String]): DataStream[String] = {

    val ddlStrDataStream: DataStream[String] = ddlDataStream.map(line => {
      try {
        val lineObj = JSONObject.fromObject(line)
        val historyRecord = JSONObject.fromObject(lineObj.getString("historyRecord"))
        val tableChangesArray = historyRecord.getJSONArray("tableChanges")
        val tableChanges = JSONObject.fromObject(tableChangesArray.getJSONObject(0))
        val tableChangeType = tableChanges.getString("type")
        var ddlSql = ""

        val table = tableChanges.optJSONObject("table")
        val primaryKeyColumnNames = table.getString("primaryKeyColumnNames").replace("[", "").replace("]", "").replace("\"", "")
        val columnsArray = table.getJSONArray("columns")
        // CREATE TABLE conversion
        if (tableChangeType == "CREATE") {
          val tableName = tableChanges.getString("id").replace("\"", "")
          val columnsArrayBuffer = ArrayBuffer[String]()
          columnsArray.forEach(line => {
            val columnJson = JSONObject.fromObject(line)
            val name = columnJson.getString("name")
            val typeName = columnJson.getString("typeName")
            val length = columnJson.optInt("length", 1)
            val scale = columnJson.optInt("scale", 2)
            val lastColumnType = matchColumnType(typeName, length, scale)
            val lastColumn = s"$name $lastColumnType"
            columnsArrayBuffer.+=(lastColumn)
          })

          // Reorder the columns so the primary key columns come first, avoiding the error "Key columns should be a ordered prefix of the scheme"
          val keys = primaryKeyColumnNames.split(",")
          for (indexOfCol <- 0 until keys.length) {
            val col = keys(indexOfCol)
            var columnFormat = ""
            columnsArrayBuffer.foreach(column => {
              if (column.startsWith(col)) {
                columnFormat = column
              }

            })
            val index = columnsArrayBuffer.indexOf(columnFormat)
            columnsArrayBuffer.remove(index)
            columnsArrayBuffer.insert(indexOfCol, columnFormat)

          }


          val header = s"CREATE TABLE IF NOT EXISTS $tableName ("
          val end = s""") UNIQUE KEY($primaryKeyColumnNames)  DISTRIBUTED BY HASH($primaryKeyColumnNames) BUCKETS 10  PROPERTIES ("replication_allocation" = "tag.location.default: 1")"""
          ddlSql = header + columnsArrayBuffer.mkString(",") + end
        } else if (tableChangeType == "ALTER") {
          var ddl = historyRecord.getString("ddl").replace("\r\n", " ")
          println(ddl)
          if (ddl.startsWith("RENAME")) {
            ddl = ddl.replace("`", "")
            val arr = ddl.split(" ")
            ddlSql = s"ALTER TABLE ${arr(2)} RENAME ${arr(4)}"
          } else if (ddl.contains("DROP COLUMN")) {

            ddlSql = ddl
          } else {
            val dbTableName = tableChanges.getString("id").replace("\"", "")
            val addColName = ddl.split(" ")(5).replace("`", "")
            var colTpe = ""
            columnsArray.forEach(line => {

              val columnJson = JSONObject.fromObject(line)

              if (columnJson.getString("name") == addColName) {
                val typeName = columnJson.getString("typeName")
                val length = columnJson.optInt("length", 1)
                val scale = columnJson.optInt("scale", 2)
                colTpe = matchColumnType(typeName, length, scale)
              }

            })
            if (ddl.contains("ADD COLUMN")) {

              ddlSql = s"ALTER TABLE $dbTableName ADD COLUMN $addColName $colTpe"
            } else {

              ddlSql = s"ALTER TABLE $dbTableName MODIFY COLUMN $addColName $colTpe"
            }

          }
        }

        println(ddlSql)
        ddlSql
      }
      catch {
        case ex: Exception => println(ex)
          "select 1"
      }

    })
    ddlStrDataStream
  }

  def showCapital(x: Option[String]): String = x match {
    case Some(s) => s
    case None => "?"
  }

  def matchColumnType(columnType: String, length: Int, scale: Int): String = {
    var returnColumnType = "VARCHAR(255)"
    columnType match {
      case "INT UNSIGNED" => returnColumnType = s"INT($length)"
      case "INT" => returnColumnType = s"INT($length)"
      case "TINYINT" => returnColumnType = s"TINYINT($length)"
      case "VARCHAR" => returnColumnType = s"VARCHAR(${length * 3})"
      case "BIGINT" => returnColumnType = s"BIGINT(${length})"
      case "TINYTEXT" => returnColumnType = s"TINYTEXT"
      case "LONGTEXT" => returnColumnType = s"STRING"
      case "TEXT" => returnColumnType = s"STRING"
      case "DECIMAL" => returnColumnType = s"DECIMAL($length,$scale)"
      case "VARBINARY" => returnColumnType = s"STRING"
      case "TIMESTAMP" => returnColumnType = s"STRING"
      case "ENUM" => returnColumnType = s"TINYINT"
      case "MEDIUMINT" => returnColumnType = s"INT"
      case "SMALLINT" => returnColumnType = s"SMALLINT"
      case "MEDIUMTEXT" => returnColumnType = s"STRING"
      case _ => returnColumnType = s"STRING"
    }
    returnColumnType
  }

}
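
A quick way to sanity-check the MySQL-to-Doris type mapping above is to call matchColumnType directly. A small throwaway object (hypothetical, placed in the same package as FlinkCDCSyncETL) might look like this; the expected outputs follow from the match cases above:

package com.xxxx.util

object MatchColumnTypeCheck {
  def main(args: Array[String]): Unit = {
    // VARCHAR length is tripled to leave room for multi-byte UTF-8 characters
    println(FlinkCDCSyncETL.matchColumnType("VARCHAR", 50, 0))   // VARCHAR(150)
    println(FlinkCDCSyncETL.matchColumnType("DECIMAL", 10, 2))   // DECIMAL(10,2)
    println(FlinkCDCSyncETL.matchColumnType("LONGTEXT", 0, 0))   // STRING
    println(FlinkCDCSyncETL.matchColumnType("ENUM", 0, 0))       // TINYINT
  }
}
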
DorisStreamLoad.scala
package com.xxxx.util

import net.sf.json.JSONObject
import net.sf.json.JSONArray
import org.apache.http.HttpHeaders
import org.apache.http.client.methods.HttpPut
import org.apache.http.entity.StringEntity
import org.apache.http.entity.BufferedHttpEntity
import org.apache.http.impl.client.{DefaultRedirectStrategy, HttpClientBuilder, HttpClients}
import org.apache.http.util.EntityUtils
import org.slf4j.{Logger, LoggerFactory}
import org.apache.commons.codec.binary.Base64
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import java.io.IOException
import java.nio.charset.StandardCharsets
import java.util.UUID

class DorisStreamLoad(props: PropertiesUtil,producer:KafkaProducer[String, String]) extends Serializable {


  lazy val httpClientBuilder: HttpClientBuilder = HttpClients.custom.setRedirectStrategy(new DefaultRedirectStrategy() {
    override protected def isRedirectable(method: String): Boolean = {
      // If the connection target is FE, you need to deal with the 307 redirect.
      true
    }
  })


  def loadJson(jsonData: String, mergeType: String, db: String, table: String): Unit = try {
    val loadUrlPattern = "http://%s/api/%s/%s/_stream_load?"
    val arr = jsonData.split("=-=-=")
    val jsonArray = new JSONArray()
    for (line <- arr) {
      try {
        val js = JSONObject.fromObject(line)
        jsonArray.add(js)
      } catch {
        case e: Exception =>
          println(e)
          println(line)
      }

    }
    val jsonArrayStr = jsonArray.toString()
    val client = httpClientBuilder.build
    val loadUrlStr = String.format(loadUrlPattern, props.doris_load_host, db, table)
    try {
      val put = new HttpPut(loadUrlStr)
      put.removeHeaders(HttpHeaders.CONTENT_LENGTH)
      put.removeHeaders(HttpHeaders.TRANSFER_ENCODING)
      put.setHeader(HttpHeaders.EXPECT, "100-continue")
      put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader)
      val label = UUID.randomUUID.toString
      // You can set stream load related properties in the Header, here we set label and column_separator.
      put.setHeader("label", label)
      put.setHeader("merge_type", mergeType)
      //      put.setHeader("two_phase_commit", "true")
      put.setHeader("column_separator", ",")
      put.setHeader("format", "json")
      put.setHeader("strip_outer_array", "true")
      put.setHeader("exec_mem_limit", "6442450944")
      val entity = new StringEntity(jsonArrayStr, "UTF-8")

      put.setEntity(entity)

      try {
        val response = client.execute(put)
        try {
          var loadResult = ""
          if (response.getEntity != null) {
            loadResult = EntityUtils.toString(response.getEntity)
          }
          val statusCode = response.getStatusLine.getStatusCode
          if (statusCode != 200) {
            throw new IOException("Stream load failed. status: %s load result: %s".format(statusCode, loadResult))
          }

        } finally if (response != null) {
          response.close()
        }
      }
    }
    finally
      if (client != null) client.close()
  }

  /**
   * Construct authentication information, the authentication method used by doris here is Basic Auth
   *
   */
  def basicAuthHeader: String = {
    val tobeEncode = props.doris_user + ":" + props.doris_password
    val encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8))
    "Basic " + new String(encoded)
  }


}
SinkDoris.scala
package com.xxxx.util


import org.apache.flink.api.java.tuple.Tuple4
import com.zbkj.mysql2doris.FlinkCDCMysql2Doris.dataLine
import net.sf.json.JSONObject
import org.apache.flink.streaming.api.functions.sink.SinkFunction


class SinkDoris(dorisStreamLoad:DorisStreamLoad) extends SinkFunction[Tuple4[String, String, String, String]]  {

//  val  dorisStreamLoad:DorisStreamLoadT=null
  /**
   * A connection could be created in open() so it does not have to be created and released on every invoke().
   */
//   def open(parameters: Configuration): Unit = {
//     super
//     super.open(parameters);
//
//  }

  /**
   * invoke() is called once per element to perform the load
   */
  override def invoke(value:Tuple4[String, String, String, String]): Unit = {

    // Tuple4 layout is (merge_type, db, table, data), i.e. f0..f3
    dorisStreamLoad.loadJson(value.f3, value.f0, value.f1, value.f2)
    val producer = KafkaUtil.getProducer
    val json = new JSONObject()
    json.put("db", value.f1)
    json.put("table", value.f2)
    KafkaUtil.sendKafkaMsg(producer, json.toString, "sync_table")

  }

}
SinkSchema.scala
package com.xxxx.util

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

import java.sql.{Connection, DriverManager, PreparedStatement}

class SinkSchema(props:PropertiesUtil) extends RichSinkFunction[String]  {
  var conn: Connection = _
  var ps : PreparedStatement  = _
  var mysqlPool: MysqlPool = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    mysqlPool = MysqlManager.getMysqlPool
    conn = mysqlPool.getConnection
    conn.setAutoCommit(false)
  }

  override def close(): Unit = {
    super.close()
    if (conn != null) {
      conn.close()
    }
    if (ps != null) {
      ps.close()
    }
  }

  override def invoke(sql: String, context: SinkFunction.Context): Unit = {
    super.invoke(sql, context)
    if (sql.nonEmpty) {
      ps = conn.prepareStatement(sql)
      try {
        ps.execute()
      }catch {
        case ex:Exception=>println(ex)
      }

      conn.commit()

    }
//    conn.close()
  }

}
PropertiesUtil.scala
package com.xxxx.util

import java.io.FileInputStream
import java.util.Properties

/**
 * propertiesUtil
 *
 */
class PropertiesUtil extends Serializable {


  private val props = new Properties()

  var doris_host = ""
  var doris_port = 0
  var doris_user = ""
  var doris_password = ""

  var database_list = ""
  var table_list = ""

  var mysql_host = ""
  var mysql_port = 0
  var mysql_user = ""
  var mysql_password = ""
  var doris_load_host = ""

  var rds_host = ""
  var rds_port = 0
  var rds_user = ""
  var rds_password = ""
  var rds_database = ""

//  var sync_database_select_sql = ""
//  var sync_table_select_sql = ""
//  var sync_config_host = ""
//  var sync_config_port = 0
//  var sync_config_user = ""
//  var sync_config_password = ""
  var sync_config_db = ""
  var sync_config_table = ""
  var sync_redis_table = ""
  var address_table = ""

  var parallelism = 0
  var split_size = 0
  var fetch_size = 0

  var bootstrap_servers = ""
  var topic = ""
  var group_id = ""
  var offset_mode = ""

  // redis
  var redis_max_total: Int = 0
  var redis_max_idle: Int = 0
  var redis_min_idle: Int = 0
  var redis_host = ""
  var redis_port: Int = 0
  var redis_timeout: Int = 0
  var redis_password = ""
  var redis_db_index: Int = 0
  var prefix = "0"


  def init(filePath: String): Unit = {
    props.load(new FileInputStream(filePath))

    // doris
    doris_host = props.getProperty("doris.host")
    doris_port = props.getProperty("doris.port").toInt
    doris_user = props.getProperty("doris.user")
    doris_password = props.getProperty("doris.password")

    database_list = props.getProperty("database.list")
    table_list = props.getProperty("table.list")

    mysql_host = props.getProperty("mysql.host")
    mysql_port = props.getProperty("mysql.port").toInt
    mysql_user = props.getProperty("mysql.user")
    mysql_password = props.getProperty("mysql.password")
    doris_load_host = props.getProperty("doris.load.host")


    rds_host = props.getProperty("rds.host")
    rds_port = props.getProperty("rds.port").toInt
    rds_user = props.getProperty("rds.user")
    rds_password = props.getProperty("rds.password")
    rds_database = props.getProperty("rds.database")
    sync_config_db = props.getProperty("sync.config.db")
    sync_config_table = props.getProperty("sync.config.table")
    sync_redis_table = props.getProperty("sync.redis.table")
    address_table = props.getProperty("address.table")


    parallelism = props.getProperty("parallelism").toInt
    split_size = props.getProperty("split.size").toInt
    fetch_size = props.getProperty("fetch.size").toInt

    bootstrap_servers = props.getProperty("bootstrap.servers")
    topic = props.getProperty("topic")
    group_id = props.getProperty("group.id")
    offset_mode = props.getProperty("offset.mode")


    // redis
    redis_max_total = props.getProperty("redis.max.total").toInt
    redis_max_idle = props.getProperty("redis.max.idle").toInt
    redis_min_idle = props.getProperty("redis.min.idle").toInt
    redis_host = props.getProperty("redis.redis.host")
    redis_port = props.getProperty("redis.redis.port").toInt
    redis_timeout = props.getProperty("redis.redis.timeout").toInt
    redis_password = props.getProperty("redis.password")
    redis_db_index = props.getProperty("redis.db.index").toInt

    prefix = props.getProperty("redis.key.prefix")


  }

  def stringToInt(prop: String): Int = {
    try {
      prop.toInt
    } catch {
      case ex: Exception => {
        0
      }
    }
  }
}

// Lazy singleton: the object is initialized only when it is actually used
object PropertiesManager {
  @volatile private var propertiesUtil: PropertiesUtil = _

  def getUtil: PropertiesUtil = {
    propertiesUtil
  }

  def initUtil(): Unit = {
    var filePath = "config.properties"
//        filePath = this.getClass.getResource("/").toString.replace("file:", "") + "config.properties"
    filePath = "/opt/flink-1.13.6/job/mysql2doris/config.properties"
    if (propertiesUtil == null) {
      propertiesUtil = new PropertiesUtil
    }
    propertiesUtil.init(filePath)
    //    propertiesUtil.evn = evn
  }
}
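
For reference, here is a sketch of a config.properties file covering the keys PropertiesUtil.init reads. Every value below is a placeholder assumption and must be replaced with your own environment's settings; missing keys would make the .toInt calls above fail at startup.

doris.host=doris-fe-host
doris.port=9030
doris.user=root
doris.password=xxxx
doris.load.host=doris-fe-host:8030

database.list=database1
table.list=database1.*

mysql.host=mysql-host
mysql.port=3306
mysql.user=root
mysql.password=xxxx

rds.host=rds-host
rds.port=3306
rds.user=root
rds.password=xxxx
rds.database=database1

sync.config.db=sync_config
sync.config.table=table_a
sync.redis.table=sync_redis
address.table=address

parallelism=1
split.size=8096
fetch.size=1024

bootstrap.servers=kafka-host:9092
topic=sync_table
group.id=mysql2doris
offset.mode=latest

redis.max.total=10
redis.max.idle=5
redis.min.idle=1
redis.redis.host=redis-host
redis.redis.port=6379
redis.redis.timeout=3000
redis.password=xxxx
redis.db.index=0
redis.key.prefix=mysql2doris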
