Background
- Existing database: MySQL
- Data: many databases and tables. Each enterprise tenant has its own database, the tables under each tenant are all different (so they cannot be aggregated), and users can change their tables at will: adding, dropping or modifying columns, and creating new tables.
- Analysis: users build their own analyses by dragging and dropping to define chart cards. Results must be real-time: clicking confirm should immediately show the corresponding result, and the queries contain filters that cannot be predicted in advance.
- Problem: as the business grows and the number of enterprise tenants increases, the load on MySQL keeps rising, and some chart cards already load too slowly (MySQL SQL).
Sync process
- A script reads from MySQL which enterprises need to be synced, then fetches the tables to sync, and stores them in Doris table A with the fields member_id and table.
- The script reads Doris table A, fetches the corresponding schema from MySQL, converts it into a Doris CREATE TABLE statement, and connects to Doris to execute it.
- Cancel the Flink job and restart it (a restart is only needed when a new database is added; new tables do not require a restart).
- On every restart, connect to Doris table A, fetch the databases, and assemble databaseList and tableList. tableList uses regular expressions such as database1.*, database2.* so that every table in each database is monitored; this way, when a new table is added in MySQL it is automatically picked up for syncing.
- Doris does not yet support applying schema changes while syncing data (reportedly this will come in 1.2+). However, CDC can capture the DDL SQL, which can then be executed against Doris over JDBC; because the SQL dialects differ slightly, the statements have to be converted first. Combined with the handling of new MySQL tables, the CREATE statements captured from the DDL can be used to create the corresponding tables in Doris.
- When importing data into Doris, importing too fast causes load failures (error -235). This can be handled by throttling how much binlog is read plus window aggregation, so data is imported in batches.
	For example, if table B needs to import {"id":1,"name":"小明"} and {"id":2,"name":"小紅"}, executing two separate PUTs is clearly wasteful; the rows can instead be wrapped as a JSON array [{"id":1,"name":"小明"},{"id":2,"name":"小紅"}] and imported in one request, as shown in the sketch right after this list.
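A minimal standalone sketch of that batching idea (assumption: only net.sf.json, the same JSON library used in the project code below, is on the classpath):
import net.sf.json.{JSONArray, JSONObject}
object JsonArrayBatchSketch {
  def main(args: Array[String]): Unit = {
    // Two rows destined for the same Doris table
    val rows = Seq("""{"id":1,"name":"小明"}""", """{"id":2,"name":"小紅"}""")
    val arr = new JSONArray()
    rows.foreach(r => arr.add(JSONObject.fromObject(r)))
    // With format=json and strip_outer_array=true, a single Stream Load PUT imports the whole array:
    // [{"id":1,"name":"小明"},{"id":2,"name":"小紅"}]
    println(arr.toString)
  }
}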
Code
        The Python code is not repeated here; see git: GitHub - xiaofeicn/MysqlToDorisTable
        Flink CDC
        Flink needs to be aware of new tables: on the daily restart it reads Doris table A and assembles the result into the databaseList and tableList parameters. A small standalone sketch of that assembly step comes first, followed by the full job (the code is commented).
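        A minimal sketch of the list assembly (the member database names below are hypothetical placeholders):
object SourceListSketch {
  def main(args: Array[String]): Unit = {
    // member_id column of Doris table A (hypothetical values)
    val memberDbs = Seq("member_1001", "member_1002", "member_1001")
    val databaseList = memberDbs.distinct.mkString(",")                    // "member_1001,member_1002"
    val tableList = memberDbs.distinct.map(db => db + ".*").mkString(",")  // "member_1001.*,member_1002.*"
    println(databaseList)
    println(tableList)
  }
}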
FlinkCDCMysql2Doris.scala
package com.zbkj.mysql2doris
import org.apache.flink.streaming.api.TimeCharacteristic
import com.zbkj.util.{DorisStreamLoad, FlinkCDCSyncETL, KafkaUtil, PropertiesManager, PropertiesUtil, SinkDoris, SinkSchema}
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import com.zbkj.util.KafkaUtil.proper
import net.sf.json.JSONObject
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource, DataStreamUtils}
import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment}
import org.slf4j.{Logger, LoggerFactory}
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.kafka.connect.json.JsonConverterConfig
import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters.asScalaIteratorConverter
object FlinkCDCMysql2Doris {
PropertiesManager.initUtil()
val props: PropertiesUtil = PropertiesManager.getUtil
val log: Logger = LoggerFactory.getLogger(this.getClass)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// parallelism
env.setParallelism(props.parallelism)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
/**
 * Checkpoint settings
 */
// Enable checkpointing and set the interval between checkpoints (in ms, default 500 ms); checkpointing is disabled by default
env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)
// Checkpoint timeout, 10 minutes by default
env.getCheckpointConfig.setCheckpointTimeout(600000)
/** Minimum pause between two checkpoints, to prevent a backlog of checkpoints when large state makes each checkpoint slow;
 * otherwise the job keeps triggering checkpoints and spends a large share of its resources on them, hurting overall performance (in ms) */
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60000)
// By default only one checkpoint can be in flight at a time;
// allowing several concurrent checkpoints can improve overall checkpointing throughput
// env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
/** Externalized checkpoints:
 * checkpoint data is not cleaned up when the job stops, but kept in external storage, so the job can also be restored from it */
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
/** Whether to prefer the latest checkpoint for recovery even when a more recent savepoint exists */
env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
// Number of tolerable checkpoint failures; the default 0 means no checkpoint failure is tolerated
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
/**
 * Restart strategy
 */
// Restart up to 3 times, waiting 10000 ms after each failure
// env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(3, TimeUnit.MINUTES), Time.of(30, TimeUnit.SECONDS)))
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))
/**
 * Read the sync-table configuration (database, table) from Doris table A.
 * Doris FE speaks the MySQL protocol, so the plain MySQL JDBC driver works here.
 */
val inputMysql = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://%s:%d/%s".format(props.doris_host, props.doris_port, props.sync_config_db))
.setUsername(props.doris_user)
.setPassword(props.doris_password)
.setQuery("select member_id,sync_table from %s.%s".format(props.sync_config_db, props.sync_config_table))
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
.finish()).uid("inputMysql")
val databaseName: DataStream[String] = inputMysql.map(line => line.getField(0).toString).uid("databaseName")
// Fuzzy listening: "<database>.*" matches every table in the database
val tableName: DataStream[String] = inputMysql.map(line => line.getField(0).toString + ".*").uid("tableName")
val producer = KafkaUtil.getProducer
val databaseIter = databaseName.executeAndCollect().asScala
val databaseList = databaseIter.toSet.mkString(",")
val tableIter = tableName.executeAndCollect().asScala
val tableList = tableIter.toSet.mkString(",")
println("databaseList:", databaseList)
println("tableList:", tableList)
val customConverterConfigs = new java.util.HashMap[String, Object] {
put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric")
}
/**
*
* mysql source for doris
*/
val mySqlSource = MySqlSource.builder[String]()
.hostname(props.rds_host)
.port(props.rds_port)
.databaseList(databaseList)
.tableList(tableList)
.username(props.rds_user)
.password(props.rds_password)
.serverId("11110")
.splitSize(props.split_size)
.fetchSize(props.fetch_size)
// .startupOptions(StartupOptions.latest())
// Full snapshot first, then incremental reads
.startupOptions(StartupOptions.initial())
.includeSchemaChanges(true)
// Newly discovered tables are added to the sync job; they must be matched by tableList
.scanNewlyAddedTableEnabled(true)
.deserializer(new JsonDebeziumDeserializationSchema(false, customConverterConfigs)).build()
val dataStreamSource: DataStreamSource[String] = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
val ddlSqlStream: DataStream[String] = dataStreamSource.filter(line => line.contains("historyRecord") && !line.contains("CHANGE COLUMN")).uid("ddlSqlStream")
val dmlStream: DataStream[String] = dataStreamSource.filter(line => !line.contains("historyRecord") && !line.contains("CHANGE COLUMN")).uid("dmlStream")
val ddlDataStream = FlinkCDCSyncETL.ddlFormat(ddlSqlStream)
val dmlDataStream = FlinkCDCSyncETL.binLogETL(dmlStream)
// ddlDataStream.print()
// The Kafka producer is used to notify the analysis tasks after data has been synced
val dorisStreamLoad = new DorisStreamLoad(props, producer)
ddlDataStream.addSink(new SinkSchema(props)).name("ALTER TABLE TO DORIS").uid("SinkSchema")
dmlDataStream.addSink(new SinkDoris(dorisStreamLoad)).name("Data TO DORIS").uid("SinkDoris")
env.execute("Flink CDC Mysql To Doris With Initial")
}
case class dataLine(merge_type: String, db: String, table: String, data: String)
}
FlinkCDCBinLogETL.scala
package com.zbkj.util
import net.sf.json.JSONObject
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.java.tuple.Tuple4
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.windowing.time.Time
import scala.collection.mutable.ArrayBuffer
import scala.util.matching.Regex
object FlinkCDCSyncETL {
def binLogETL(dataStreamSource: DataStream[String]): DataStream[org.apache.flink.api.java.tuple.Tuple4[String, String, String, String]] = {
/**
 * Map each binlog record type to the corresponding Doris load (merge) type
 */
val tupleData: DataStream[org.apache.flink.api.java.tuple.Tuple4[String, String, String, String]] = dataStreamSource.map(line => {
var data: JSONObject = null
var mergetype = "APPEND"
val lineObj = JSONObject.fromObject(line)
val source = lineObj.getJSONObject("source")
val db = source.getString("db")
val table = source.getString("table")
if ("d" == lineObj.getString("op")) {
data = lineObj.getJSONObject("before")
mergetype = "DELETE"
} else if ("u" == lineObj.getString("op")) {
data = lineObj.getJSONObject("after")
mergetype = "MERGE"
} else if ("c" == lineObj.getString("op")) {
data = lineObj.getJSONObject("after")
} else if ("r" == lineObj.getString("op")) {
data = lineObj.getJSONObject("after")
mergetype = "APPEND"
}
new Tuple4[String, String, String, String](mergetype, db, table, data.toString)
}).returns(TypeInformation.of(new TypeHint[Tuple4[String, String, String, String]] {}))
/**
 * Window aggregation: rows that share the same merge type, db and table are concatenated into one long string
 */
val byKeyData: DataStream[org.apache.flink.api.java.tuple.Tuple4[String, String, String, String]] = tupleData.keyBy(0, 1, 2)
.timeWindow(Time.milliseconds(1000))
.reduce((itemFirst, itemSecond) => new Tuple4(itemFirst.f0, itemFirst.f1, itemFirst.f2, itemFirst.f3 + "=-=-=" + itemSecond.f3))
byKeyData
}
def ddlFormat(ddlDataStream: DataStream[String]): DataStream[String] = {
val ddlStrDataStream: DataStream[String] = ddlDataStream.map(line => {
try {
val lineObj = JSONObject.fromObject(line)
val historyRecord = JSONObject.fromObject(lineObj.getString("historyRecord"))
val tableChangesArray = historyRecord.getJSONArray("tableChanges")
val tableChanges = JSONObject.fromObject(tableChangesArray.getJSONObject(0))
val tableChangeType = tableChanges.getString("type")
var ddlSql = ""
val table = tableChanges.optJSONObject("table")
val primaryKeyColumnNames = table.getString("primaryKeyColumnNames").replace("[", "").replace("]", "").replace("\"", "")
val columnsArray = table.getJSONArray("columns")
// CREATE TABLE conversion
if (tableChangeType == "CREATE") {
val tableName = tableChanges.getString("id").replace("\"", "")
val columnsArrayBuffer = ArrayBuffer[String]()
columnsArray.forEach(line => {
val columnJson = JSONObject.fromObject(line)
val name = columnJson.getString("name")
val typeName = columnJson.getString("typeName")
val length = columnJson.optInt("length", 1)
val scale = columnJson.optInt("scale", 2)
val lastColumnType = matchColumnType(typeName, length, scale)
val lastColumn = s"$name $lastColumnType"
columnsArrayBuffer.+=(lastColumn)
})
// Reorder the columns so that the primary key columns come first, avoiding the Doris error "Key columns should be a ordered prefix of the scheme"
val keys = primaryKeyColumnNames.split(",")
for (indexOfCol <- 0 until keys.length) {
val col = keys(indexOfCol)
var columnFormat = ""
columnsArrayBuffer.foreach(column => {
if (column.startsWith(col)) {
columnFormat = column
}
})
val index = columnsArrayBuffer.indexOf(columnFormat)
columnsArrayBuffer.remove(index)
columnsArrayBuffer.insert(indexOfCol, columnFormat)
}
val header = s"CREATE TABLE IF NOT EXISTS $tableName ("
val end = s""") UNIQUE KEY($primaryKeyColumnNames) DISTRIBUTED BY HASH($primaryKeyColumnNames) BUCKETS 10 PROPERTIES ("replication_allocation" = "tag.location.default: 1")"""
ddlSql = header + columnsArrayBuffer.mkString(",") + end
} else if (tableChangeType == "ALTER") {
var ddl = historyRecord.getString("ddl").replace("\r\n", " ")
println(ddl)
if (ddl.startsWith("RENAME")) {
ddl = ddl.replace("`", "")
val arr = ddl.split(" ")
ddlSql = s"ALTER TABLE ${arr(2)} RENAME ${arr(4)}"
} else if (ddl.contains("DROP COLUMN")) {
ddlSql = ddl
} else {
val dbTableName = tableChanges.getString("id").replace("\"", "")
val addColName = ddl.split(" ")(5).replace("`", "")
var colTpe = ""
columnsArray.forEach(line => {
val columnJson = JSONObject.fromObject(line)
if (columnJson.getString("name") == addColName) {
val typeName = columnJson.getString("typeName")
val length = columnJson.optInt("length", 1)
val scale = columnJson.optInt("scale", 2)
colTpe = matchColumnType(typeName, length, scale)
}
})
if (ddl.contains("ADD COLUMN")) {
ddlSql = s"ALTER TABLE $dbTableName ADD COLUMN $addColName $colTpe"
} else {
ddlSql = s"ALTER TABLE $dbTableName MODIFY COLUMN $addColName $colTpe"
}
}
}
println(ddlSql)
ddlSql
}
catch {
case ex: Exception => println(ex)
"select 1"
}
})
ddlStrDataStream
}
def showCapital(x: Option[String]): String = x match {
case Some(s) => s
case None => "?"
}
def matchColumnType(columnType: String, length: Int, scale: Int): String = {
var returnColumnType = "VARCHAR(255)"
columnType match {
case "INT UNSIGNED" => returnColumnType = s"INT($length)"
case "INT" => returnColumnType = s"INT($length)"
case "TINYINT" => returnColumnType = s"TINYINT($length)"
case "VARCHAR" => returnColumnType = s"VARCHAR(${length * 3})"
case "BIGINT" => returnColumnType = s"BIGINT(${length})"
case "TINYTEXT" => returnColumnType = s"TINYTEXT"
case "LONGTEXT" => returnColumnType = s"STRING"
case "TEXT" => returnColumnType = s"STRING"
case "DECIMAL" => returnColumnType = s"DECIMAL($length,$scale)"
case "VARBINARY" => returnColumnType = s"STRING"
case "TIMESTAMP" => returnColumnType = s"STRING"
case "ENUM" => returnColumnType = s"TINYINT"
case "MEDIUMINT" => returnColumnType = s"INT"
case "SMALLINT" => returnColumnType = s"SMALLINT"
case "MEDIUMTEXT" => returnColumnType = s"STRING"
case _ => returnColumnType = s"STRING"
}
returnColumnType
}
}
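A quick way to sanity-check the type mapping above (assumption: the FlinkCDCSyncETL object from this post is on the classpath, in the same package):
object ColumnTypeSketch {
  def main(args: Array[String]): Unit = {
    // Expected output shown in the trailing comments
    println(FlinkCDCSyncETL.matchColumnType("VARCHAR", 50, 0))   // VARCHAR(150)
    println(FlinkCDCSyncETL.matchColumnType("DECIMAL", 10, 2))   // DECIMAL(10,2)
    println(FlinkCDCSyncETL.matchColumnType("LONGTEXT", 0, 0))   // STRING
  }
}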
DorisStreamLoad.scala
package com.zbkj.util
import net.sf.json.JSONObject
import net.sf.json.JSONArray
import org.apache.http.HttpHeaders
import org.apache.http.client.methods.HttpPut
import org.apache.http.entity.StringEntity
import org.apache.http.entity.BufferedHttpEntity
import org.apache.http.impl.client.{DefaultRedirectStrategy, HttpClientBuilder, HttpClients}
import org.apache.http.util.EntityUtils
import org.slf4j.{Logger, LoggerFactory}
import org.apache.commons.codec.binary.Base64
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.io.IOException
import java.nio.charset.StandardCharsets
import java.util.UUID
class DorisStreamLoad(props: PropertiesUtil,producer:KafkaProducer[String, String]) extends Serializable {
lazy val httpClientBuilder: HttpClientBuilder = HttpClients.custom.setRedirectStrategy(new DefaultRedirectStrategy() {
override protected def isRedirectable(method: String): Boolean = {
// If the connection target is FE, you need to deal with the 307 redirect.
true
}
})
def loadJson(jsonData: String, mergeType: String, db: String, table: String): Unit = try {
val loadUrlPattern = "http://%s/api/%s/%s/_stream_load?"
val arr = jsonData.split("=-=-=")
val jsonArray = new JSONArray()
for (line <- arr) {
try {
val js = JSONObject.fromObject(line)
jsonArray.add(js)
} catch {
case e: Exception =>
println(e)
println(line)
}
}
val jsonArrayStr = jsonArray.toString()
val client = httpClientBuilder.build
val loadUrlStr = String.format(loadUrlPattern, props.doris_load_host, db, table)
try {
val put = new HttpPut(loadUrlStr)
put.removeHeaders(HttpHeaders.CONTENT_LENGTH)
put.removeHeaders(HttpHeaders.TRANSFER_ENCODING)
put.setHeader(HttpHeaders.EXPECT, "100-continue")
put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader)
val label = UUID.randomUUID.toString
// You can set stream load related properties in the Header, here we set label and column_separator.
put.setHeader("label", label)
put.setHeader("merge_type", mergeType)
// put.setHeader("two_phase_commit", "true")
put.setHeader("column_separator", ",")
put.setHeader("format", "json")
put.setHeader("strip_outer_array", "true")
put.setHeader("exec_mem_limit", "6442450944")
val entity = new StringEntity(jsonArrayStr, "UTF-8")
put.setEntity(entity)
try {
val response = client.execute(put)
try {
var loadResult = ""
if (response.getEntity != null) {
loadResult = EntityUtils.toString(response.getEntity)
}
val statusCode = response.getStatusLine.getStatusCode
if (statusCode != 200) {
throw new IOException("Stream load failed. status: %s load result: %s".format(statusCode, loadResult))
}
} finally if (response != null) {
response.close()
}
}
}
finally
if (client != null) client.close()
}
/**
* Construct authentication information, the authentication method used by doris here is Basic Auth
*
*/
def basicAuthHeader: String = {
val tobeEncode = props.doris_user + ":" + props.doris_password
val encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8))
"Basic " + new String(encoded)
}
}
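A hedged usage sketch for the loader above (assumptions: PropertiesManager and KafkaUtil from this project are on the classpath; member_1001 and table_b are hypothetical names):
object StreamLoadUsageSketch {
  def main(args: Array[String]): Unit = {
    PropertiesManager.initUtil()
    val props = PropertiesManager.getUtil
    val producer = KafkaUtil.getProducer            // only used downstream to notify analysis tasks
    val loader = new DorisStreamLoad(props, producer)
    // Two rows joined with the "=-=-=" separator, exactly as binLogETL's window reduce emits them,
    // are imported into member_1001.table_b with a single Stream Load request.
    loader.loadJson("""{"id":1,"name":"小明"}=-=-={"id":2,"name":"小紅"}""", "APPEND", "member_1001", "table_b")
  }
}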
SinkDoris.scala
package com.zbkj.util
import org.apache.flink.api.java.tuple.Tuple4
import com.zbkj.mysql2doris.FlinkCDCMysql2Doris.dataLine
import net.sf.json.JSONObject
import org.apache.flink.streaming.api.functions.sink.SinkFunction
class SinkDoris(dorisStreamLoad:DorisStreamLoad) extends SinkFunction[Tuple4[String, String, String, String]] {
// val dorisStreamLoad:DorisStreamLoadT=null
/**
 * A connection could be opened in open() so it does not have to be created and released on every invoke().
 */
// def open(parameters: Configuration): Unit = {
// super
// super.open(parameters);
//
// }
/**
 * invoke() is called once per element to perform the load
 */
override def invoke(value:Tuple4[String, String, String, String]): Unit = {
dorisStreamLoad.loadJson(value.f3,value.f0,value.f1,value.f2)
val producer = KafkaUtil.getProducer
val json = new JSONObject()
json.put("db",value.f2)
json.put("table",value.f3)
KafkaUtil.sendKafkaMsg(producer, json.toString, "sync_table")
}
}
SinkSchema.scala
package com.zbkj.util
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import java.sql.{Connection, DriverManager, PreparedStatement}
class SinkSchema(props:PropertiesUtil) extends RichSinkFunction[String] {
var conn: Connection = _
var ps : PreparedStatement = _
var mysqlPool: MysqlPool = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
mysqlPool = MysqlManager.getMysqlPool
conn = mysqlPool.getConnection
conn.setAutoCommit(false)
}
override def close(): Unit = {
super.close()
if (conn != null) {
conn.close()
}
if (ps != null) {
ps.close()
}
}
override def invoke(sql: String, context: SinkFunction.Context): Unit = {
super.invoke(sql, context)
if (sql.nonEmpty) {
ps = conn.prepareStatement(sql)
try {
ps.execute()
}catch {
case ex:Exception=>println(ex)
}
conn.commit()
}
// conn.close()
}
}
PropertiesUtil.scala
package com.zbkj.util
import java.io.FileInputStream
import java.util.Properties
/**
* propertiesUtil
*
*/
class PropertiesUtil extends Serializable {
private val props = new Properties()
var doris_host = ""
var doris_port = 0
var doris_user = ""
var doris_password = ""
var database_list = ""
var table_list = ""
var mysql_host = ""
var mysql_port = 0
var mysql_user = ""
var mysql_password = ""
var doris_load_host = ""
var rds_host = ""
var rds_port = 0
var rds_user = ""
var rds_password = ""
var rds_database = ""
// var sync_database_select_sql = ""
// var sync_table_select_sql = ""
// var sync_config_host = ""
// var sync_config_port = 0
// var sync_config_user = ""
// var sync_config_password = ""
var sync_config_db = ""
var sync_config_table = ""
var sync_redis_table = ""
var address_table = ""
var parallelism = 0
var split_size = 0
var fetch_size = 0
var bootstrap_servers = ""
var topic = ""
var group_id = ""
var offset_mode = ""
// redis
var redis_max_total: Int = 0
var redis_max_idle: Int = 0
var redis_min_idle: Int = 0
var redis_host = ""
var redis_port: Int = 0
var redis_timeout: Int = 0
var redis_password = ""
var redis_db_index: Int = 0
var prefix = "0"
def init(filePath: String): Unit = {
props.load(new FileInputStream(filePath))
// doris
doris_host = props.getProperty("doris.host")
doris_port = props.getProperty("doris.port").toInt
doris_user = props.getProperty("doris.user")
doris_password = props.getProperty("doris.password")
database_list = props.getProperty("database.list")
table_list = props.getProperty("table.list")
mysql_host = props.getProperty("mysql.host")
mysql_port = props.getProperty("mysql.port").toInt
mysql_user = props.getProperty("mysql.user")
mysql_password = props.getProperty("mysql.password")
doris_load_host = props.getProperty("doris.load.host")
rds_host = props.getProperty("rds.host")
rds_port = props.getProperty("rds.port").toInt
rds_user = props.getProperty("rds.user")
rds_password = props.getProperty("rds.password")
rds_database = props.getProperty("rds.database")
sync_config_db = props.getProperty("sync.config.db")
sync_config_table = props.getProperty("sync.config.table")
sync_redis_table = props.getProperty("sync.redis.table")
address_table = props.getProperty("address.table")
parallelism = props.getProperty("parallelism").toInt
split_size = props.getProperty("split.size").toInt
fetch_size = props.getProperty("fetch.size").toInt
bootstrap_servers = props.getProperty("bootstrap.servers")
topic = props.getProperty("topic")
group_id = props.getProperty("group.id")
offset_mode = props.getProperty("offset.mode")
// redis
redis_max_total = props.getProperty("redis.max.total").toInt
redis_max_idle = props.getProperty("redis.max.idle").toInt
redis_min_idle = props.getProperty("redis.min.idle").toInt
redis_host = props.getProperty("redis.redis.host")
redis_port = props.getProperty("redis.redis.port").toInt
redis_timeout = props.getProperty("redis.redis.timeout").toInt
redis_password = props.getProperty("redis.password")
redis_db_index = props.getProperty("redis.db.index").toInt
prefix = props.getProperty("redis.key.prefix")
}
def stringToInt(prop: String): Int = {
try {
prop.toInt
} catch {
case ex: Exception => {
0
}
}
}
}
// Lazy singleton: the object is only initialized when it is actually used
object PropertiesManager {
@volatile private var propertiesUtil: PropertiesUtil = _
def getUtil: PropertiesUtil = {
propertiesUtil
}
def initUtil(): Unit = {
var filePath = "config.properties"
// filePath = this.getClass.getResource("/").toString.replace("file:", "") + "config.properties"
filePath = "/opt/flink-1.13.6/job/mysql2doris/config.properties"
if (propertiesUtil == null) {
propertiesUtil = new PropertiesUtil
}
propertiesUtil.init(filePath)
// propertiesUtil.evn = evn
}
}
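For reference, a placeholder sketch of the config.properties read from /opt/flink-1.13.6/job/mysql2doris/ (all hosts, ports, credentials and names below are placeholders, not real values; every key read by init() above must be present):
# Doris FE: doris.port is the MySQL-protocol query port, doris.load.host the HTTP endpoint used for Stream Load
doris.host=doris-fe-host
doris.port=9030
doris.user=doris_user
doris.password=change_me
doris.load.host=doris-fe-host:8030
# static lists read by PropertiesUtil (the job above builds its own lists from Doris table A)
database.list=member_1001
table.list=member_1001.*
# a MySQL instance (as read by PropertiesUtil)
mysql.host=mysql-host
mysql.port=3306
mysql.user=mysql_user
mysql.password=change_me
# source MySQL (RDS) monitored by Flink CDC
rds.host=rds-host
rds.port=3306
rds.user=rds_user
rds.password=change_me
rds.database=rds_db
# Doris database/table that hold the sync configuration (table A)
sync.config.db=sync_config
sync.config.table=table_a
sync.redis.table=sync_redis_table
address.table=address_table
# job tuning
parallelism=2
split.size=8096
fetch.size=1024
# Kafka (used to notify analysis tasks)
bootstrap.servers=kafka-host:9092
topic=sync_table
group.id=mysql2doris
offset.mode=latest
# Redis
redis.max.total=8
redis.max.idle=8
redis.min.idle=0
redis.redis.host=redis-host
redis.redis.port=6379
redis.redis.timeout=3000
redis.password=change_me
redis.db.index=0
redis.key.prefix=0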
If you have any questions, please leave a comment or join the 857 tech community.