国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink CDC系列之:Oracle CDC 導(dǎo)入 Elasticsearch

這篇具有很好參考價(jià)值的文章主要介紹了Flink CDC系列之:Oracle CDC 導(dǎo)入 Elasticsearch。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一、深入理解Flink Oracle CDC Connector

  • Flink CDC系列之:Oracle CDC Connector

二、創(chuàng)建docker-compose.yml文件

version: '2.1'
services:
  oracle:
    image: yuxialuo/oracle-xe-11g-r2-cdc-demo:v1.0
    ports:
      - "1521:1521"
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

該 Docker Compose 中包含的容器有:

  • Oracle: Oracle 11g, 已經(jīng)預(yù)先創(chuàng)建了 products 和 orders表,并插入了一些數(shù)據(jù)
  • Elasticsearch: orders 表將和 products 表進(jìn)行join,join的結(jié)果寫入Elasticsearch中
  • Kibana: 可視化 Elasticsearch 中的數(shù)據(jù)

三、啟動(dòng)容器

在 docker-compose.yml 所在目錄下運(yùn)行如下命令以啟動(dòng)所有容器:

docker-compose up -d

該命令會(huì)以 detached 模式自動(dòng)啟動(dòng) Docker Compose 配置中定義的所有容器。 你可以通過 docker ps 來觀察上述的容器是否正常啟動(dòng)了。 也可以訪問 http://localhost:5601/ 來查看 Kibana 是否運(yùn)行正常。 另外可以通過如下命令停止所有的容器:

docker-compose down

四、下載Flink Oracle CDC的jar包

下載以下 jar 包到 <FLINK_HOME>/lib/:

  • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
  • flink-sql-connector-oracle-cdc-2.4.1.jar

五、啟動(dòng) Flink 集群,再啟動(dòng) SQL CLI

-- Flink SQL
-- checkpoint every 3000 milliseconds                       
Flink SQL> SET execution.checkpointing.interval = 3s;

Flink SQL> CREATE TABLE products (
    ID INT,
    NAME STRING,
    DESCRIPTION STRING,
    PRIMARY KEY (ID) NOT ENFORCED
  ) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = 'localhost',
    'port' = '1521',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'XE',
    'schema-name' = 'flinkuser',  
    'table-name' = 'products'
  );

Flink SQL> CREATE TABLE orders (
   ORDER_ID INT,
   ORDER_DATE TIMESTAMP_LTZ(3),
   CUSTOMER_NAME STRING,
   PRICE DECIMAL(10, 5),
   PRODUCT_ID INT,
   ORDER_STATUS BOOLEAN
 ) WITH (
   'connector' = 'oracle-cdc',
   'hostname' = 'localhost',
   'port' = '1521',
   'username' = 'flinkuser',
   'password' = 'flinkpw',
   'database-name' = 'XE',
   'schema-name' = 'flinkuser',  
   'table-name' = 'orders'
 );

創(chuàng)建elasticsearch

Flink SQL> CREATE TABLE enriched_orders (
   ORDER_ID INT,
   ORDER_DATE TIMESTAMP_LTZ(3),
   CUSTOMER_NAME STRING,
   PRICE DECIMAL(10, 5),
   PRODUCT_ID INT,
   ORDER_STATUS BOOLEAN,
   PRODUCT_NAME STRING,
   PRODUCT_DESCRIPTION STRING,
   PRIMARY KEY (ORDER_ID) NOT ENFORCED
 ) WITH (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://localhost:9200',
     'index' = 'enriched_orders_1'

關(guān)聯(lián)處理后,插入數(shù)據(jù)

Flink SQL> INSERT INTO enriched_orders
 SELECT o.*, p.NAME, p.DESCRIPTION
 FROM orders AS o
 LEFT JOIN products AS p ON o.PRODUCT_ID = p.ID;

六、檢查 ElasticSearch 中的結(jié)果

檢查最終的結(jié)果是否寫入ElasticSearch中, 可以在Kibana看到ElasticSearch中的數(shù)據(jù)

七、在 Oracle 制造一些變更,觀察 ElasticSearch 中的結(jié)果

進(jìn)入Oracle容器中并通過如下的SQL語句對(duì)Oracle數(shù)據(jù)庫進(jìn)行一些修改, 然后就可以看到每執(zhí)行一條SQL語句,Elasticsearch中的數(shù)據(jù)都會(huì)實(shí)時(shí)更新。

docker-compose exec sqlplus flinkuser/flinkpw

插入更新數(shù)據(jù)文章來源地址http://www.zghlxwxcb.cn/news/detail-658774.html

INSERT INTO flinkuser.orders VALUES (10004, to_date('2020-07-30 15:22:00', 'yyyy-mm-dd hh24:mi:ss'), 'Jark', 29.71, 104, 0);

UPDATE flinkuser.orders SET ORDER_STATUS = 1 WHERE ORDER_ID = 10004;

DELETE FROM flinkuser.orders WHERE ORDER_ID = 10004;

到了這里,關(guān)于Flink CDC系列之:Oracle CDC 導(dǎo)入 Elasticsearch的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • flink oracle cdc實(shí)時(shí)同步(超詳細(xì))

    flink oracle cdc實(shí)時(shí)同步(超詳細(xì))

    官方文檔:https://github.com/ververica/flink-cdc-connectors/blob/release-master/docs/content/connectors/oracle-cdc.md 本文參照官方文檔來記錄Oracle CDC 的配置。 在本文開始前,需要先安裝Oracle,有興趣的同學(xué)可以參考博主之前寫的《docker下安裝oracle11g(一次安裝成功)》。 如果要做oracle的實(shí)時(shí)同步

    2024年02月12日
    瀏覽(21)
  • Flink Oracle CDC Connector源碼解讀

    Flink Oracle CDC Connector源碼解讀

    flink cdc是在flink的基礎(chǔ)上對(duì)oracle的數(shù)據(jù)進(jìn)行實(shí)時(shí)采集,底層使用的是debezium框架來實(shí)現(xiàn),debezium使用oracle自帶的logminer技術(shù)來實(shí)現(xiàn)。logminer的采集需要對(duì)數(shù)據(jù)庫和采集表添加補(bǔ)充日志,由于oracle18c不支持對(duì)數(shù)據(jù)添加補(bǔ)充日志,所以目前支持的oracle11、12、19三個(gè)版本。 flink oracle

    2024年02月02日
    瀏覽(23)
  • Flink CDC 實(shí)時(shí)抽取 Oracle 數(shù)據(jù)-排錯(cuò)&調(diào)優(yōu)

    Flink CDC 實(shí)時(shí)抽取 Oracle 數(shù)據(jù)-排錯(cuò)&調(diào)優(yōu)

    Flink CDC 于 2021 年 11 月 15 日發(fā)布了最新版本 2.1,該版本通過引入內(nèi)置 Debezium 組件,增加了對(duì) Oracle 的支持。對(duì)該版本進(jìn)行試用并成功實(shí)現(xiàn)了對(duì) Oracle 的實(shí)時(shí)數(shù)據(jù)捕獲以及性能調(diào)優(yōu),現(xiàn)將試用過程中的一些關(guān)鍵細(xì)節(jié)進(jìn)行分享。 Oracle:11.2.0.4.0(RAC 部署) Flink:1.13.1 Hadoop:3.2.1

    2024年01月16日
    瀏覽(34)
  • Flink CDC 基于Oracle log archiving 實(shí)時(shí)同步Oracle表到Mysql

    Flink CDC 基于Oracle log archiving 實(shí)時(shí)同步Oracle表到Mysql

    環(huán)境說明: flink 1.15.2 Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production mysql 版本:5.7 windows11 IDEA 本地運(yùn)行 先上官網(wǎng)使用說明和案例:Oracle CDC Connector — Flink CDC documentation 1. Oracle 開啟 log archiving (1).啟用 log archiving ?? ??? ?a:以DBA用戶連接數(shù)據(jù)庫? ??

    2024年02月11日
    瀏覽(44)
  • Flink CDC 基于Oracle log archiving 實(shí)時(shí)同步Oracle表到Mysql(無主鍵)

    環(huán)境說明: flink 1.15.2 Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production mysql 版本:5.7 windows11 IDEA 本地運(yùn)行 具體環(huán)境設(shè)置和maven依賴請(qǐng)看上篇:Flink CDC 基于Oracle log archiving 實(shí)時(shí)同步Oracle表到Mysql_彩虹豆的博客-CSDN博客 現(xiàn)在操作的是源表和目標(biāo)表都無主鍵數(shù)

    2024年02月15日
    瀏覽(30)
  • Flink CDC SQL Oracle to Postgresql與jdbc連接oracle報(bào)錯(cuò)處理

    Flink CDC SQL Oracle to Postgresql與jdbc連接oracle報(bào)錯(cuò)處理

    flink-cdc官網(wǎng):Oracle CDC Connector — CDC Connectors for Apache Flink? documentation Flink環(huán)境依賴: (3)啟用日志歸檔 (4)檢查是否啟用了日志歸檔 (5)創(chuàng)建具有權(quán)限的 Oracle 用戶 (5.1)。創(chuàng)建表空間 (5.2)。創(chuàng)建用戶并授予權(quán)限 Flink SQL 客戶端連接器測試: 創(chuàng)建 Oracle 鏈接器 返回內(nèi)容 以上代

    2024年02月11日
    瀏覽(23)
  • Flink CDC Oracle 用戶權(quán)限不足 ORA-01031: insufficient privileges

    Flink CDC Oracle 用戶權(quán)限不足 ORA-01031: insufficient privileges

    Flink CDC Oracle用戶權(quán)限不足 版本:flink1.14.5 、flinkcdc 2.2.1、oracle11g、 場景:flink cdc 實(shí)時(shí)抽取oracle的數(shù)據(jù)表。DBA為了數(shù)據(jù)庫安全考慮,對(duì)訪問用戶權(quán)限進(jìn)行控制。將oracle的flinkuser用戶XE下的orders表授權(quán)只讀權(quán)限給readuser用戶。授權(quán)情況如下: 此時(shí)執(zhí)行flink oracle cdc 任務(wù): taskmange

    2024年02月12日
    瀏覽(25)
  • flink cdc同步Oracle數(shù)據(jù)庫資料到Doris問題集錦

    java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder at com.ververica.cdc.debezium.DebeziumSourceFunction.open(DebeziumSourceFunction.java:218) ~[flink-connector-debezium-2.2.0.jar:2.2.0] at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-co

    2024年02月16日
    瀏覽(22)
  • 【現(xiàn)場問題】flink-cdc,Oracle2Mysql的坑,Oracle區(qū)分大小寫導(dǎo)致

    【現(xiàn)場問題】flink-cdc,Oracle2Mysql的坑,Oracle區(qū)分大小寫導(dǎo)致

    Column ‘id’ is NOT NULL, however, a null value is being written into it. You can set job configuration ‘table.exec.sink.not-null-enforcer’=‘DROP’ to suppress this exception and drop such records silently 大致意思就是不能插入為空的數(shù)值。 為什么會(huì)報(bào)這個(gè)錯(cuò)誤,我們來看DML的執(zhí)行語句: insert into t_wx_target select

    2024年02月12日
    瀏覽(25)
  • Flink系列之:Flink CDC深入了解MySQL CDC連接器

    Flink系列之:Flink CDC深入了解MySQL CDC連接器

    增量快照讀取是一種讀取表快照的新機(jī)制。與舊的快照機(jī)制相比,增量快照具有許多優(yōu)點(diǎn),包括: (1)在快照讀取期間,Source 支持并發(fā)讀取 (2)在快照讀取期間,Source 支持進(jìn)行 chunk 粒度的 checkpoint (3)在快照讀取之前,Source 不需要數(shù)據(jù)庫鎖權(quán)限。 如果希望 source 并行運(yùn)

    2024年02月02日
    瀏覽(30)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包