国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink CDC SQL Oracle to Postgresql與jdbc連接oracle報(bào)錯(cuò)處理

這篇具有很好參考價(jià)值的文章主要介紹了Flink CDC SQL Oracle to Postgresql與jdbc連接oracle報(bào)錯(cuò)處理。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

flink-cdc官網(wǎng):Oracle CDC Connector — CDC Connectors for Apache Flink? documentation

Flink環(huán)境依賴(lài):

(1)下載postgresql jdbc??jar包?
postgresql-42.3.5 和 flink-sql-connector-oracle-cdc-2.2.0.jar將包放到flink 下 lib目錄里面
下載地址https://jdbc.postgresql.org/download.html

flink-connector-jdbc_2.12_1.14.4.jar 包
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc_2.12/1.14.4

(2)以 DBA 身份連接到數(shù)據(jù)庫(kù)
ORACLE_SID=SID
export ORACLE_SID
sqlplus /nolog
  CONNECT sys/password AS SYSDBA

(3)啟用日志歸檔

alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
?	啟用日志歸檔需要重啟數(shù)據(jù)庫(kù),嘗試時(shí)注意
?	歸檔日志會(huì)占用大量磁盤(pán)空間,建議定期清理過(guò)期日志

(4)檢查是否啟用了日志歸檔

-- Should now "Database log mode: Archive Mode"
archive log list;
必須為捕獲的表或數(shù)據(jù)庫(kù)啟用補(bǔ)充日志記錄,以便數(shù)據(jù)更改捕獲已更改數(shù)據(jù)庫(kù)行的之前狀態(tài)。下面說(shuō)明了如何在表/數(shù)據(jù)庫(kù)級(jí)別進(jìn)行配置。
-- Enable supplemental logging for a specific table:
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable supplemental logging for database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

(5)創(chuàng)建具有權(quán)限的 Oracle 用戶(hù)

(5.1)。創(chuàng)建表空間

sqlplus sys/password@host:port/SID AS SYSDBA;
? CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
? exit;


(5.2)。創(chuàng)建用戶(hù)并授予權(quán)限

sqlplus sys/password@host:port/SID AS SYSDBA;
? CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
? GRANT CREATE SESSION TO flinkuser;
? GRANT SET CONTAINER TO flinkuser;
? GRANT SELECT ON V_$DATABASE to flinkuser;
? GRANT FLASHBACK ANY TABLE TO flinkuser;
? GRANT SELECT ANY TABLE TO flinkuser;
? GRANT SELECT_CATALOG_ROLE TO flinkuser;
? GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
? GRANT SELECT ANY TRANSACTION TO flinkuser;
? GRANT LOGMINING TO flinkuser;

? GRANT CREATE TABLE TO flinkuser;
? GRANT LOCK ANY TABLE TO flinkuser;
? GRANT ALTER ANY TABLE TO flinkuser;
? GRANT CREATE SEQUENCE TO flinkuser;

? GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
? GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;

? GRANT SELECT ON V_$LOG TO flinkuser;
? GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
? GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
? GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
? GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
? GRANT SELECT ON V_$LOGFILE TO flinkuser;
? GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
? GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

Flink SQL 客戶(hù)端連接器測(cè)試:

  1. 創(chuàng)建Oracle鏈接器
CREATE TABLE TEST_source (
?????? ID INT,
?????? PRIMARY KEY (ID) NOT ENFORCED
) WITH (
??? 'connector' = 'oracle-cdc',
??? 'hostname' = 'Oracle_IP地址',
??? 'port' = '1521',
??? 'username' = 'flinkuser',
??? 'password' = 'flinkpw',
??? 'database-name' = 'ORA19C',
??? 'schema-name' = 'FLINKUSER',
    'table-name' = 'TEST'
????'debezium.log.mining.strategy'='online_catalog'

);
2.創(chuàng)建postgresql鏈接器接收端
create table flink_cdc_sink1(
ID INT,
primary key(ID) NOT ENFORCED)
with(
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://pg庫(kù)_IP地址:5432/ postgres?currentSchema=public', 
'username' = 'postgres',
'password' = '123456',? 
'table-name' = 'sink1'
);
3.插入數(shù)據(jù) 
insert into flink_cdc_sink1? select ID from? TEST_source;
4.問(wèn)題:數(shù)據(jù)同步不過(guò)去

Flink CDC SQL Oracle to Postgresql與jdbc連接oracle報(bào)錯(cuò)處理

解決方案:檢查flink-connector-jdbc.jar 版本問(wèn)題 替換即可
FLINK Oracle to Postgresql JAVA
1. java編碼
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
?* 測(cè)試 flink cdc 實(shí)時(shí)獲取oracle數(shù)據(jù)變化
?*/
public class FlinkCdcOracleExample {

??? public static void main(String[] args) throws Exception {

??????? EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
??????????????? .useBlinkPlanner()
??????????????? .inStreamingMode()
?????????????? ?.build();
??????? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
??????? env.setParallelism(1);
??????? StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

??????? tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

??????? String sourceDDL ="CREATE TABLE oracle_source (\n" +
??????????????? "???? ID INT, \n" +
??????????????? "???? NAME STRING, \n" +
??????????????? "???? PRIMARY KEY (ID) NOT ENFORCED \n" +
??????????????? "???? ) WITH (\n" +
??????????????? "???? 'connector' = 'oracle-cdc',\n" +
??????????????? "???? 'hostname' = 'Oracle_IP地址',\n" +
??????????????? "???? 'port' = '1521',\n" +
??????????????? "??? ?'username' = 'flinkuser',\n" +
??????????????? "???? 'password' = 'flinkpw',\n" +
??????????????? "???? 'database-name' = 'ORA19C',\n" +
??????????????? "???? 'schema-name' = 'FLINKUSER',\n" +?????????? // 注意這里要大寫(xiě)
??????????????? "???? 'table-name' = 'tablename',\n" +?

"???? 'debezium.log.mining.strategy'='online_catalog'\n"+
??????????????? "???? )";
??????? // 創(chuàng)建一張用于輸出的表
??????? String sinkDDL = "CREATE TABLE outTable (\n" +
??????????????? " id INT,\n" +
??????????????? " name STRING, \n" +
??????????????? " PRIMARY KEY (id) NOT ENFORCED\n" +
??????????????? ") WITH (\n" +
??????????????? " 'connector' = 'jdbc',\n" +
??????????????? " 'url' = 'jdbc:postgresql://PG庫(kù)_IP地址:5432/postgres?currentSchema=public',\n" +
??????????????? " 'username' = 'postgres',\n" +
??????????????? " 'password' = '123456',\n" +
??????????????? " 'table-name' = 'pg_sink'\n" +
??????????????? ")";
??????? /*String transformSQL =
??????????????? "select * from? oracle_source ";*/
??????? String transformSQL =
??????????????? "INSERT INTO outTable " +
??????????????????????? "SELECT ID,NAME " +
??????????????????????? "FROM oracle_source";
??????? //執(zhí)行source表ddl
???? ???tableEnv.executeSql(sourceDDL);
??????? //TableResult tableResult = tableEnv.executeSql("select * from oracle_source");
??????? //tableResult.print();
??????? //執(zhí)行sink表ddl
????? tableEnv.executeSql(sinkDDL);
??????? //執(zhí)行邏輯sql語(yǔ)句
??????? TableResult tableResult = tableEnv.executeSql(transformSQL);
??????? tableResult.print();
??????? env.execute();
??? }
}

返回內(nèi)容 以上代碼是修改后的應(yīng)不會(huì)有下圖報(bào)錯(cuò)

?Flink CDC SQL Oracle to Postgresql與jdbc連接oracle報(bào)錯(cuò)處理

注:報(bào)這個(gè)錯(cuò)誤,但數(shù)據(jù)可以同步過(guò)去

 錯(cuò)誤:可以讀取oracle表內(nèi)的數(shù)據(jù),但jdbc連接postgres 報(bào)錯(cuò),數(shù)據(jù)傳不過(guò)去

Flink CDC SQL Oracle to Postgresql與jdbc連接oracle報(bào)錯(cuò)處理

解決:修改maven依賴(lài)
<dependency>
??? <groupId>com.ververica</groupId>
??? <artifactId>flink-connector-oracle-cdc</artifactId>
??? <version>2.2.0</version>
</dependency>

<dependency>
??? <groupId>org.postgresql</groupId>
??? <artifactId>postgresql</artifactId>
??? <version>42.3.5</version>
</dependency>
flink sql  創(chuàng)建oracle 接收器
create table flink_cdc_sink (
ID INT,
NAME STRING
)with(
'connector' = 'jdbc',
'url' = 'jdbc:oracle:thin:@192.168.58.202:1521:ORA19C',
'username' = 'flinkuser',
 'password' = 'flinkpw', 
'table-name' = 'TEST2',
 'driver' = 'oracle.jdbc.driver.OracleDriver');
報(bào)錯(cuò):

Flink CDC SQL Oracle to Postgresql與jdbc連接oracle報(bào)錯(cuò)處理文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-514038.html

jdbc 連接oracle錯(cuò)誤處理

解決方法:目前flink 1.14不支持jdbc 連接oracle 需要安裝 flink 1.15 處理
Flink 1.15 安裝 需要使用java11?
1.官網(wǎng)下載java 11
https://www.oracle.com/java/technologies/downloads/#java11
2.解壓 jdk tar 
linux>tar -xzvf jdk-11.0.15.1_linux-x64_bin.tar.gz
3.修改環(huán)境配置文件
linux>?vim /etc/profile
# Java11環(huán)境變量配置
JAVA_HOME=/devtools/java/java11/jdk-11.0.15
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=$JAVA_HOME/lib
export JAVA_HOME CLASSPATH PATH
 
# Java8環(huán)境變量配置
JAVA_HOME=/devtools/java/java8/jdk1.8.0_321
PATH=$PATH:$JAVA_HOME/bin:$PATH
CLASSPATH=$JAVA_HOME/lib
export JAVA_HOME PATH CLASSPATH
4.重啟電腦生效
5.下載flink 1.15
linux>Wget https://dlcdn.apache.org/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
6.配置 flink 1.15
linux>vim conf/flink-conf.yaml

jobmanager.rpc.address: jobIP地址
# 配置high-availability mode
high-availability: zookeeper
# JobManager的meta信息放在dfs,在zk上主要會(huì)保存一個(gè)指向dfs路徑的指針 
high-availability.storageDir: hdfs://cluster/flinkha/
# 配置zookeeper quorum(hostname和端口需要依據(jù)對(duì)應(yīng)zk的實(shí)際配置)
high-availability.zookeeper.quorum: IPA:2181,IPB:2181,IPC:2181 
# (可選)設(shè)置zookeeper的root目錄
#high-availability.zookeeper.path.root: /test_dir/test_standalone2_root
# 注釋以下配置
# jobmanager.bind-host: localhost
# taskmanager.bind-host: localhost
#taskmanager.host: localhost
#rest.address: localhost
#rest.bind-address: localhost

#配置yarn 高可用重試次數(shù)
yarn.application-attempts: 10
注意:必須要操作上面的“注釋以下配置” 否則Web UI 訪問(wèn)不了 其余配置一樣,可以參考最上面的搭建。
 
 

到了這里,關(guān)于Flink CDC SQL Oracle to Postgresql與jdbc連接oracle報(bào)錯(cuò)處理的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink Oracle CDC Connector源碼解讀

    Flink Oracle CDC Connector源碼解讀

    flink cdc是在flink的基礎(chǔ)上對(duì)oracle的數(shù)據(jù)進(jìn)行實(shí)時(shí)采集,底層使用的是debezium框架來(lái)實(shí)現(xiàn),debezium使用oracle自帶的logminer技術(shù)來(lái)實(shí)現(xiàn)。logminer的采集需要對(duì)數(shù)據(jù)庫(kù)和采集表添加補(bǔ)充日志,由于oracle18c不支持對(duì)數(shù)據(jù)添加補(bǔ)充日志,所以目前支持的oracle11、12、19三個(gè)版本。 flink oracle

    2024年02月02日
    瀏覽(22)
  • Flink CDC 基于Oracle log archiving 實(shí)時(shí)同步Oracle表到Mysql

    Flink CDC 基于Oracle log archiving 實(shí)時(shí)同步Oracle表到Mysql

    環(huán)境說(shuō)明: flink 1.15.2 Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production mysql 版本:5.7 windows11 IDEA 本地運(yùn)行 先上官網(wǎng)使用說(shuō)明和案例:Oracle CDC Connector — Flink CDC documentation 1. Oracle 開(kāi)啟 log archiving (1).啟用 log archiving ?? ??? ?a:以DBA用戶(hù)連接數(shù)據(jù)庫(kù)? ??

    2024年02月11日
    瀏覽(44)
  • Flink CDC 實(shí)時(shí)抽取 Oracle 數(shù)據(jù)-排錯(cuò)&調(diào)優(yōu)

    Flink CDC 實(shí)時(shí)抽取 Oracle 數(shù)據(jù)-排錯(cuò)&調(diào)優(yōu)

    Flink CDC 于 2021 年 11 月 15 日發(fā)布了最新版本 2.1,該版本通過(guò)引入內(nèi)置 Debezium 組件,增加了對(duì) Oracle 的支持。對(duì)該版本進(jìn)行試用并成功實(shí)現(xiàn)了對(duì) Oracle 的實(shí)時(shí)數(shù)據(jù)捕獲以及性能調(diào)優(yōu),現(xiàn)將試用過(guò)程中的一些關(guān)鍵細(xì)節(jié)進(jìn)行分享。 Oracle:11.2.0.4.0(RAC 部署) Flink:1.13.1 Hadoop:3.2.1

    2024年01月16日
    瀏覽(34)
  • Flink CDC 基于Oracle log archiving 實(shí)時(shí)同步Oracle表到Mysql(無(wú)主鍵)

    環(huán)境說(shuō)明: flink 1.15.2 Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production mysql 版本:5.7 windows11 IDEA 本地運(yùn)行 具體環(huán)境設(shè)置和maven依賴(lài)請(qǐng)看上篇:Flink CDC 基于Oracle log archiving 實(shí)時(shí)同步Oracle表到Mysql_彩虹豆的博客-CSDN博客 現(xiàn)在操作的是源表和目標(biāo)表都無(wú)主鍵數(shù)

    2024年02月15日
    瀏覽(30)
  • 實(shí)例講解C++連接各種數(shù)據(jù)庫(kù),包含SQL Server、MySQL、Oracle、ACCESS、SQLite 和 PostgreSQL、MongoDB 數(shù)據(jù)庫(kù)

    ? C++ 是一種通用的編程語(yǔ)言,可以使用不同的庫(kù)和驅(qū)動(dòng)程序來(lái)連接各種數(shù)據(jù)庫(kù)。以下是一些示例代碼,演示如何使用 C++ 連接 SQL Server、MySQL、Oracle、ACCESS、SQLite 和 PostgreSQL、MongoDB 數(shù)據(jù)庫(kù)。 連接 SQL Server 數(shù)據(jù)庫(kù) 要使用 C++ 連接 SQL Server 數(shù)據(jù)庫(kù),可以使用 Microsoft 的 ADODB 庫(kù)。以

    2024年02月05日
    瀏覽(35)
  • 【現(xiàn)場(chǎng)問(wèn)題】flink-cdc,Oracle2Mysql的坑,Oracle區(qū)分大小寫(xiě)導(dǎo)致

    【現(xiàn)場(chǎng)問(wèn)題】flink-cdc,Oracle2Mysql的坑,Oracle區(qū)分大小寫(xiě)導(dǎo)致

    Column ‘id’ is NOT NULL, however, a null value is being written into it. You can set job configuration ‘table.exec.sink.not-null-enforcer’=‘DROP’ to suppress this exception and drop such records silently 大致意思就是不能插入為空的數(shù)值。 為什么會(huì)報(bào)這個(gè)錯(cuò)誤,我們來(lái)看DML的執(zhí)行語(yǔ)句: insert into t_wx_target select

    2024年02月12日
    瀏覽(25)
  • FLINK CDC postgresql (Stream與SQL)

    FLINK CDC postgresql (Stream與SQL)

    Postgres CDC Connector — CDC Connectors for Apache Flink? documentation flink cdc捕獲postgresql數(shù)據(jù) 1)更改配置文件 需要更改 # 更改wal日志方式為logical # 更改solts最大數(shù)量(默認(rèn)值為10),flink-cdc默認(rèn)一張表占用一個(gè) # 更改wal發(fā)送最大進(jìn)程數(shù)(默認(rèn)值為10),這個(gè)值和上面的solts設(shè)置一樣 # 中斷

    2023年04月27日
    瀏覽(21)
  • Flink CDC Oracle 用戶(hù)權(quán)限不足 ORA-01031: insufficient privileges

    Flink CDC Oracle 用戶(hù)權(quán)限不足 ORA-01031: insufficient privileges

    Flink CDC Oracle用戶(hù)權(quán)限不足 版本:flink1.14.5 、flinkcdc 2.2.1、oracle11g、 場(chǎng)景:flink cdc 實(shí)時(shí)抽取oracle的數(shù)據(jù)表。DBA為了數(shù)據(jù)庫(kù)安全考慮,對(duì)訪問(wèn)用戶(hù)權(quán)限進(jìn)行控制。將oracle的flinkuser用戶(hù)XE下的orders表授權(quán)只讀權(quán)限給readuser用戶(hù)。授權(quán)情況如下: 此時(shí)執(zhí)行flink oracle cdc 任務(wù): taskmange

    2024年02月12日
    瀏覽(25)
  • flink cdc同步Oracle數(shù)據(jù)庫(kù)資料到Doris問(wèn)題集錦

    java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder at com.ververica.cdc.debezium.DebeziumSourceFunction.open(DebeziumSourceFunction.java:218) ~[flink-connector-debezium-2.2.0.jar:2.2.0] at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-co

    2024年02月16日
    瀏覽(22)
  • Flink-CDC——MySQL、SqlSqlServer、Oracle、達(dá)夢(mèng)等數(shù)據(jù)庫(kù)開(kāi)啟日志方法

    目錄 1. 前言 2. 數(shù)據(jù)源安裝與配置 2.1 MySQL 2.1.1 安裝 2.1.2 CDC 配置 2.2 Postgresql 2.2.1 安裝 2.2.2 CDC 配置 2.3 Oracle 2.3.1 安裝 2.3.2 CDC 配置 2.4 SQLServer 2.4.1 安裝 2.4.2 CDC 配置 2.5達(dá)夢(mèng) 2.4.1安裝 2.4.2CDC配置 3. 驗(yàn)證 3.1 Flink版本與CDC版本的對(duì)應(yīng)關(guān)系 3.2 下載相關(guān)包 3.3 添加cdc jar 至lib目錄 3.4 驗(yàn)

    2024年02月05日
    瀏覽(122)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包