1. Understanding the Flink Oracle CDC Connector
- Flink CDC Series: Oracle CDC Connector
2. Create the docker-compose.yml file
version: '2.1'
services:
  oracle:
    image: yuxialuo/oracle-xe-11g-r2-cdc-demo:v1.0
    ports:
      - "1521:1521"
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
The containers included in this Docker Compose setup are:
- Oracle: Oracle 11g, with the products and orders tables pre-created and populated with some data
- Elasticsearch: the orders table will be joined with the products table, and the join result will be written to Elasticsearch
- Kibana: used to visualize the data in Elasticsearch
3. Start the containers
Run the following command in the directory containing docker-compose.yml to start all containers:
docker-compose up -d
This command starts all containers defined in the Docker Compose configuration in detached mode. You can run docker ps to check whether the containers started properly, or visit http://localhost:5601/ to verify that Kibana is running. To stop all containers, run the following command:
docker-compose down
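For a quick sanity check while the containers are still running, a minimal sketch from the command line (the endpoints assume the port mappings defined in the compose file above):
# List the running containers; oracle, elasticsearch, and kibana should all be up
docker ps
# Elasticsearch should answer on its HTTP port
curl http://localhost:9200
# Kibana exposes a status API once it has finished starting
curl http://localhost:5601/api/status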
4. Download the Flink Oracle CDC jar packages
Download the following jar packages to <FLINK_HOME>/lib/:
- flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
- flink-sql-connector-oracle-cdc-2.4.1.jar
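If you prefer to fetch them from the command line, a rough sketch, assuming both artifacts are published on Maven Central under their usual coordinates (org.apache.flink for the Elasticsearch connector, com.ververica for the Oracle CDC connector):
# Download both connector jars straight into the Flink lib directory
cd <FLINK_HOME>/lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.4.1/flink-sql-connector-oracle-cdc-2.4.1.jar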
5. Start the Flink cluster, then launch the SQL CLI
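A minimal sketch of these two steps, assuming a standard local standalone Flink distribution under <FLINK_HOME>:
cd <FLINK_HOME>
# Start a local standalone cluster (web UI at http://localhost:8081 by default)
./bin/start-cluster.sh
# Launch the SQL CLI and run the statements below in it
./bin/sql-client.sh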
-- Flink SQL
-- checkpoint every 3000 milliseconds
Flink SQL> SET execution.checkpointing.interval = 3s;
Flink SQL> CREATE TABLE products (
ID INT,
NAME STRING,
DESCRIPTION STRING,
PRIMARY KEY (ID) NOT ENFORCED
) WITH (
'connector' = 'oracle-cdc',
'hostname' = 'localhost',
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'XE',
'schema-name' = 'flinkuser',
'table-name' = 'products'
);
Flink SQL> CREATE TABLE orders (
ORDER_ID INT,
ORDER_DATE TIMESTAMP_LTZ(3),
CUSTOMER_NAME STRING,
PRICE DECIMAL(10, 5),
PRODUCT_ID INT,
ORDER_STATUS BOOLEAN
) WITH (
'connector' = 'oracle-cdc',
'hostname' = 'localhost',
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'XE',
'schema-name' = 'flinkuser',
'table-name' = 'orders'
);
Create the Elasticsearch sink table:
Flink SQL> CREATE TABLE enriched_orders (
ORDER_ID INT,
ORDER_DATE TIMESTAMP_LTZ(3),
CUSTOMER_NAME STRING,
PRICE DECIMAL(10, 5),
PRODUCT_ID INT,
ORDER_STATUS BOOLEAN,
PRODUCT_NAME STRING,
PRODUCT_DESCRIPTION STRING,
PRIMARY KEY (ORDER_ID) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'enriched_orders_1'
);
After joining the two source tables, insert the enriched result into the sink:
Flink SQL> INSERT INTO enriched_orders
SELECT o.*, p.NAME, p.DESCRIPTION
FROM orders AS o
LEFT JOIN products AS p ON o.PRODUCT_ID = p.ID;
6. Check the results in Elasticsearch
Check whether the final results have been written to Elasticsearch; you can view the Elasticsearch data in Kibana.
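Besides Kibana, you can also query the index directly; a minimal sketch with curl (the index name enriched_orders_1 matches the sink table definition above):
# Count the documents written by the Flink job
curl http://localhost:9200/enriched_orders_1/_count
# Inspect the enriched documents themselves
curl 'http://localhost:9200/enriched_orders_1/_search?pretty'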
7. Make some changes in Oracle and observe the results in Elasticsearch
Enter the Oracle container and modify the Oracle database with the SQL statements below; you will then see the data in Elasticsearch update in real time after each statement is executed.
docker-compose exec oracle sqlplus flinkuser/flinkpw
Insert, update, and then delete a row:
INSERT INTO flinkuser.orders VALUES (10004, to_date('2020-07-30 15:22:00', 'yyyy-mm-dd hh24:mi:ss'), 'Jark', 29.71, 104, 0);
UPDATE flinkuser.orders SET ORDER_STATUS = 1 WHERE ORDER_ID = 10004;
DELETE FROM flinkuser.orders WHERE ORDER_ID = 10004;
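To watch the effect of each statement from outside the containers, a rough sketch with curl; it assumes the documents carry the sink table's column names, e.g. ORDER_ID:
# Re-run after each INSERT/UPDATE/DELETE above to see the document appear, change, and disappear
curl 'http://localhost:9200/enriched_orders_1/_search?q=ORDER_ID:10004&pretty'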
到了這里,關(guān)于Flink CDC系列之:Oracle CDC 導(dǎo)入 Elasticsearch的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!