一、INSERT INTO
-- Create a partitioned Iceberg table. With Flink + Iceberg the partition
-- column must also appear in the schema (unlike Hive-style DDL, where it
-- may only appear in the PARTITIONED BY clause).
CREATE TABLE `stu` (
    id INT,
    name STRING,
    age INT
) PARTITIONED BY (age);

-- Explicit column list so the statement survives schema evolution.
-- NOTE(review): '殺sheng' looks like a mojibake'd sample value — confirm
-- the intended name before relying on this test data.
INSERT INTO stu (id, name, age)
VALUES (3, '殺sheng', 16),
       (4, '鳴人', 19);
二、INSERT OVERWRITE
僅支持Flink的Batch模式
-- INSERT OVERWRITE is only available in Flink's batch execution mode.
SET execution.runtime-mode = batch;

-- Replace the entire content of the (unpartitioned) table.
INSERT OVERWRITE sample VALUES (1,'a');

-- Replace only the data='a' partition of the fully-qualified table.
INSERT OVERWRITE `hive_catalog`.`default`.`sample`
PARTITION(data='a')
SELECT 6;
三、UPSERT
當將數據寫入v2表格時,Iceberg支持基于主鍵的UPSERT。有兩種方法可以啟用upsert。
建表時(shí)指定
-- v2 Iceberg table with upsert enabled: writes with an existing key
-- update the row instead of appending a duplicate.
CREATE TABLE `hive_catalog`.`test`.`sample5`(
    -- Flink SQL only supports `PRIMARY KEY ... NOT ENFORCED` as a
    -- constraint; a column-level UNIQUE is not valid DDL, and the primary
    -- key declared below already guarantees uniqueness of `id`.
    `id` INT COMMENT 'unique id',
    `data` STRING NOT NULL,
    PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
    -- Row-level updates/deletes require Iceberg format v2.
    'format-version'='2',
    'write.upsert.enabled'='true'
);
UPSERT模式下,如果對表進行分區,則分區字段必須是主鍵。
-- Seed two rows keyed by id.
INSERT INTO sample5 VALUES (1, 'a');
INSERT INTO sample5 VALUES (2, 'b');

-- Pretty-print query results in the SQL client.
SET sql-client.execution.result-mode=tableau;

SELECT * FROM sample5;

-- Same key as an existing row: with write.upsert.enabled='true' this
-- updates row id=2 rather than appending a duplicate.
INSERT INTO sample5 VALUES (2, 'c');
四、查詢Batch模式
Batch模式:
-- Bounded scan: read the table's current snapshot, then terminate.
SET execution.runtime-mode = batch;
SELECT * FROM sample;
五、查詢Streaming模式
Streaming模式:
-- Unbounded scan: keep the job running and watch for new snapshots.
SET execution.runtime-mode = streaming;
-- Required so the /*+ OPTIONS(...) */ dynamic table hints below are honored.
SET table.dynamic-table-options.enabled=true;
SET sql-client.execution.result-mode=tableau;

-- Read all records from the current snapshot, then keep emitting the
-- incremental data of every snapshot committed after it.
SELECT * FROM sample /*+ OPTIONS('streaming'='true','monitor-interval'='1s')*/;

-- Read only the incremental data committed after the given snapshot id
-- (the start snapshot itself is excluded).
SELECT * FROM sample /*+ OPTIONS('streaming'='true','monitor-interval'='1s','start-snapshot-id'='384023852058202')*/;
六、讀取Kafka流插入到iceberg表中
下載flink-connector-kafka:
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1
創建iceberg表:
-- Target Iceberg table for the Kafka pipeline.
CREATE TABLE `hive_catalog`.`test`.`sample5`(
    -- Flink SQL has no column-level UNIQUE constraint; the primary key
    -- declared below already covers `id`.
    `id` INT COMMENT 'unique id',
    `data` STRING NOT NULL,
    PRIMARY KEY(`id`) NOT ENFORCED
);
-- NOTE(review): for true UPSERT semantics this table would also need
-- 'format-version'='2' and 'write.upsert.enabled'='true' (as in the
-- earlier sample5 definition) — confirm which variant is intended.
創建kafka topic對應的表:
-- Kafka-backed source table; each JSON record maps to one row.
CREATE TABLE default_catalog.default_database.kafka (
    id INT,
    data STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'testKafkaTopic',
    -- The universal Flink Kafka connector talks to brokers directly and
    -- does not use ZooKeeper, so 'properties.zookeeper.connect' (which the
    -- original listed, pointing at hadoop1:2101) is unnecessary and is
    -- merely passed through as an unknown client property.
    'properties.bootstrap.servers' = 'hadoop1:9092',
    'properties.group.id' = 'iceberg',
    -- Re-read the topic from the beginning on (re)start.
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
流式讀取:
-- Pretty-print results and run the pipeline as an unbounded streaming job.
SET sql-client.execution.result-mode=tableau;
SET execution.runtime-mode = streaming;
插入數據
-- Continuously stream every Kafka record into the Iceberg table.
-- Target fixed to `hive_catalog`.`test`.`sample5` to match the DDL above
-- (the database `test1` is never created in this article).
INSERT INTO hive_catalog.test.sample5
SELECT * FROM default_catalog.default_database.kafka;
查詢數據:
-- Tail the Iceberg table: new Kafka records appear as soon as the next
-- snapshot is committed and picked up by the monitor interval.
SELECT *
FROM sample5 /*+ OPTIONS('streaming'='true','monitor-interval'='1s')*/;
topic有最新數據時候,就能源源不斷查詢到最新數據。
到了這里,關(guān)于Iceberg從入門到精通系列之十:flink sql往Iceberg表插入數(shù)據(jù),Batch模式和Streaming模式查詢數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!