Flink CDC official docs: Oracle CDC Connector — CDC Connectors for Apache Flink documentation
Flink environment dependencies:
(1) Download the PostgreSQL JDBC driver jar postgresql-42.3.5.jar (https://jdbc.postgresql.org/download.html), flink-sql-connector-oracle-cdc-2.2.0.jar, and flink-connector-jdbc_2.12-1.14.4.jar (https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc_2.12/1.14.4), and place them in Flink's lib directory.
(2) Connect to the database as DBA
ORACLE_SID=SID
export ORACLE_SID
sqlplus /nolog
CONNECT sys/password AS SYSDBA
(3) Enable log archiving
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
Note: enabling log archiving requires a database restart, so plan carefully before trying it.
Note: archived logs consume a lot of disk space; clean up expired logs regularly.
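Expired archive logs are typically purged with RMAN rather than SQL*Plus. A minimal sketch; the 7-day retention window here is an illustrative choice, not part of the original setup:

```sql
-- Run inside RMAN (rman target /), not sqlplus
-- Reconcile the catalog with what is actually on disk
CROSSCHECK ARCHIVELOG ALL;
-- Remove catalog entries for logs already deleted outside RMAN
DELETE EXPIRED ARCHIVELOG ALL;
-- Delete archive logs older than 7 days (adjust the window to your recovery needs)
DELETE ARCHIVELOG ALL COMPLETED BEFORE 'SYSDATE-7';
```

Keep the retention window at least as long as your backup and downstream-CDC lag requirements, since LogMiner reads from these archived logs.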
(4) Check whether log archiving is enabled
-- Should now show "Database log mode: Archive Mode"
archive log list;
Supplemental logging must be enabled for the captured tables or for the whole database so that change data capture can record the before-state of changed rows. The following shows how to configure it at the table and database level.
-- Enable supplemental logging for a specific table:
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable supplemental logging for database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
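Whether the database-level setting took effect can be verified from v$database; a quick sanity check:

```sql
-- Expect SUPPLEMENTAL_LOG_DATA_MIN = 'YES' after the ALTER DATABASE statement above
SELECT supplemental_log_data_min FROM v$database;
```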
(5) Create an Oracle user with the required privileges
(5.1) Create a tablespace
sqlplus sys/password@host:port/SID AS SYSDBA;
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
(5.2) Create the user and grant privileges
sqlplus sys/password@host:port/SID AS SYSDBA;
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE TO flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
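To confirm the grants landed, the standard data-dictionary views can be queried as SYSDBA; a sketch:

```sql
-- System privileges granted directly to the user
SELECT privilege FROM dba_sys_privs WHERE grantee = 'FLINKUSER';
-- Roles granted to the user (should include SELECT_CATALOG_ROLE and EXECUTE_CATALOG_ROLE)
SELECT granted_role FROM dba_role_privs WHERE grantee = 'FLINKUSER';
```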
Flink SQL client connector test:
1. Create the Oracle CDC source connector
CREATE TABLE TEST_source (
    ID INT,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = '<oracle-host>',
    'port' = '1521',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'ORA19C',
    'schema-name' = 'FLINKUSER',
    'table-name' = 'TEST',
    'debezium.log.mining.strategy' = 'online_catalog'
);
2. Create the PostgreSQL JDBC sink connector
CREATE TABLE flink_cdc_sink1 (
    ID INT,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://<pg-host>:5432/postgres?currentSchema=public',
    'username' = 'postgres',
    'password' = '123456',
    'table-name' = 'sink1'
);
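The JDBC sink only maps to an existing PostgreSQL table; it does not create one. A minimal sketch of the target table, assuming the single-column layout from the DDL above:

```sql
-- Run in PostgreSQL before starting the Flink job
CREATE TABLE public.sink1 (
    id INT PRIMARY KEY
);
```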
3. Insert the data
INSERT INTO flink_cdc_sink1 SELECT ID FROM TEST_source;
4. Problem: data does not sync to the sink
Fix: check the flink-connector-jdbc jar version and replace it with one matching your Flink version.
Flink Oracle to PostgreSQL (Java)
1. Java code
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Test: use Flink CDC to capture Oracle data changes in real time.
 */
public class FlinkCdcOracleExample {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        String sourceDDL = "CREATE TABLE oracle_source (\n" +
                "  ID INT,\n" +
                "  NAME STRING,\n" +
                "  PRIMARY KEY (ID) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'oracle-cdc',\n" +
                "  'hostname' = '<oracle-host>',\n" +
                "  'port' = '1521',\n" +
                "  'username' = 'flinkuser',\n" +
                "  'password' = 'flinkpw',\n" +
                "  'database-name' = 'ORA19C',\n" +
                "  'schema-name' = 'FLINKUSER',\n" +   // note: the schema name must be uppercase
                "  'table-name' = 'tablename',\n" +
                "  'debezium.log.mining.strategy' = 'online_catalog'\n" +
                ")";

        // Create the output (sink) table
        String sinkDDL = "CREATE TABLE outTable (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:postgresql://<pg-host>:5432/postgres?currentSchema=public',\n" +
                "  'username' = 'postgres',\n" +
                "  'password' = '123456',\n" +
                "  'table-name' = 'pg_sink'\n" +
                ")";

        String transformSQL =
                "INSERT INTO outTable " +
                        "SELECT ID, NAME " +
                        "FROM oracle_source";

        // Run the source DDL
        tableEnv.executeSql(sourceDDL);
        // Run the sink DDL
        tableEnv.executeSql(sinkDDL);
        // Submit the INSERT job
        TableResult tableResult = tableEnv.executeSql(transformSQL);
        tableResult.print();
        // Note: no env.execute() here. executeSql() already submits the job, and calling
        // env.execute() with no DataStream operators defined throws an exception.
    }
}
The code above has already been fixed and should no longer produce the error shown below.
Note: even when this error is reported, the data may still sync over.
Error: the Oracle table can be read, but the JDBC connection to Postgres fails and data is not written.
Fix: adjust the Maven dependencies
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-oracle-cdc</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.3.5</version>
</dependency>
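These two dependencies cover the CDC source and the PostgreSQL driver; the JDBC sink additionally needs flink-connector-jdbc on the classpath. A sketch, assuming Flink 1.14.4 with Scala 2.12 (the artifact linked earlier in this article); match the version to your own Flink installation:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
```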
Flink SQL side: create an Oracle JDBC sink
CREATE TABLE flink_cdc_sink (
    ID INT,
    NAME STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:oracle:thin:@192.168.58.202:1521:ORA19C',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'table-name' = 'TEST2',
    'driver' = 'oracle.jdbc.driver.OracleDriver'
);
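As with the PostgreSQL sink, the JDBC connector expects TEST2 to already exist on the Oracle side. A hypothetical layout matching the columns above (the VARCHAR2 length is an illustrative assumption):

```sql
-- Run as flinkuser in Oracle before starting the job
CREATE TABLE FLINKUSER.TEST2 (
    ID NUMBER PRIMARY KEY,
    NAME VARCHAR2(100)
);
```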
Error:
Handling the JDBC-to-Oracle connection error. Fix: the Flink 1.14 JDBC connector does not support Oracle, so install Flink 1.15. Flink 1.15 requires Java 11.
1. Download Java 11 from the official site
https://www.oracle.com/java/technologies/downloads/#java11
2. Extract the JDK tarball
linux> tar -xzvf jdk-11.0.15.1_linux-x64_bin.tar.gz
3. Edit the environment configuration file
linux> vim /etc/profile
# Java 11 environment variables (Flink 1.15 needs Java 11; keep only one JAVA_HOME active)
JAVA_HOME=/devtools/java/java11/jdk-11.0.15
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=$JAVA_HOME/lib
export JAVA_HOME CLASSPATH PATH
# Java 8 environment variables (comment this block out when running Flink 1.15,
# otherwise it overrides the Java 11 settings above)
# JAVA_HOME=/devtools/java/java8/jdk1.8.0_321
# PATH=$JAVA_HOME/bin:$PATH
# CLASSPATH=$JAVA_HOME/lib
# export JAVA_HOME PATH CLASSPATH
4. Run source /etc/profile (or reboot) for the changes to take effect
5. Download Flink 1.15
linux> wget https://dlcdn.apache.org/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
6. Configure Flink 1.15
linux> vim conf/flink-conf.yaml
jobmanager.rpc.address: <jobmanager-host>
# Configure the high-availability mode
high-availability: zookeeper
# JobManager metadata is stored in DFS; ZooKeeper mainly keeps a pointer to the DFS path
high-availability.storageDir: hdfs://cluster/flinkha/
# ZooKeeper quorum (hostnames and ports must match the actual ZK deployment)
high-availability.zookeeper.quorum: IPA:2181,IPB:2181,IPC:2181
# (optional) ZooKeeper root directory
#high-availability.zookeeper.path.root: /test_dir/test_standalone2_root
# Comment out the following settings
# jobmanager.bind-host: localhost
# taskmanager.bind-host: localhost
#taskmanager.host: localhost
#rest.address: localhost
#rest.bind-address: localhost
# Number of YARN application attempts for HA
yarn.application-attempts: 10
Note: you must comment out the settings listed above ("Comment out the following settings"), otherwise the Web UI cannot be reached. The rest of the configuration is unchanged; refer to the setup at the top.
This concludes the walkthrough of Flink CDC SQL Oracle to PostgreSQL and handling the JDBC-to-Oracle connection error.