1. Background:
CDC data records the full change history of each row. When a CDC stream is written into a conventional database, each primary key ends up holding only one (the latest) row. Multiple versions can be retained with special tricks, but that obviously bloats the data.
Among data lakes, Hudi (0.13.1) cannot retain the full changelog: its compaction mechanism removes all older versions. Iceberg supports time travel, so you can query the table state as of some point in time, but it cannot enumerate the change history of a single record.
So for now this has to be implemented manually.
The idea is actually simple: use the original primary key together with the CDC ts_ms field as the primary key of the new table. Note, however, that a row may change many times while usually only the last few versions are needed, so older change records must be deleted periodically. ts_ms is the time at which the change was actually produced in the source log, as recorded in the CDC event; see the Debezium documentation for details. If the original table has a composite primary key (several columns combined), it is best to concatenate those columns into a single string, which simplifies later joins.
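As a sketch of this layout (the table and column names below are illustrative, not taken from any real system), a composite source key such as (ORDER_ID, ITEM_ID) can be flattened into one string and paired with ts_ms:

```sql
-- Hypothetical changelog table: flattened source key + CDC ts_ms as the PK.
CREATE TABLE TEST.TEST_CHANGELOG (
    PK      VARCHAR   NOT NULL,  -- e.g. ORDER_ID || '_' || ITEM_ID from the source row
    TS      TIMESTAMP NOT NULL,  -- Debezium ts_ms
    PAYLOAD VARCHAR,             -- the changed row, serialized
    CONSTRAINT pk PRIMARY KEY (PK, TS)
);

-- One UPSERT per CDC event; repeated changes to the same source row
-- produce distinct (PK, TS) rows instead of overwriting each other.
UPSERT INTO TEST.TEST_CHANGELOG (PK, TS, PAYLOAD)
VALUES ('o001_i002', TO_TIMESTAMP('2020-01-01 10:00:00'), '{"qty":3}');
```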
Approach of this article:
CDC --write--> Phoenix + periodic deletion of old version records
The CDC ingestion itself is skipped here; SQL is used to simulate the writes.
2. Deleting old version records in Phoenix (demo)
phoenix doc
bin/sqlline.py www.xx.com:2181
-- Create the Phoenix table directly
CREATE TABLE TEST.TEST_VERSION (
    ID   VARCHAR   NOT NULL,
    TS   TIMESTAMP NOT NULL,
    NAME VARCHAR,
    CONSTRAINT my_pk PRIMARY KEY (ID, TS)
) VERSIONS=5;
Checking in hbase shell, the backing HBase table has already been created by Phoenix:
hbase(main):032:0> desc "TEST:TEST_VERSION"
Table TEST:TEST_VERSION is ENABLED
TEST:TEST_VERSION, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.phoenix.coprocessor.ScanRegionObserver|805306366|', coprocessor$2 => '|org.apache.phoenix.coprocessor.UngroupedAggregateRe
gionObserver|805306366|', coprocessor$3 => '|org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver|805306366|', coprocessor$4 => '|org.apache.phoenix.coprocessor.ServerCachingEndpointImpl|80
5306366|', coprocessor$5 => '|org.apache.phoenix.hbase.index.Indexer|805306366|index.builder=org.apache.phoenix.index.PhoenixIndexBuilder,org.apache.hadoop.hbase.index.codec.class=org.apache.phoenix
.index.PhoenixIndexCodec', METADATA => {'OWNER' => 'dcetl'}}
COLUMN FAMILIES DESCRIPTION
{NAME => '0', VERSIONS => '5', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'FAST_DIFF', T
TL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'NONE', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPE
N => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
-- Insert data into the table through Phoenix
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 10:00:00'),'zhangsan');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:00:00'),'lisi');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 12:00:00'),'wangwu');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 13:00:00'),'zhaoliu');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 14:00:00'),'liuqi');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 15:00:00'),'sunba');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 07:00:00'),'sunyang');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 08:00:00'),'chaoyang');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 09:00:00'),'xuri');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 09:30:00'),'chenxi');
-- Query to confirm the inserts
SELECT * FROM TEST.TEST_VERSION;
Assume from here on that each primary key should keep only its 3 newest versions; everything older needs to be deleted.
We now need something like a ROW_NUMBER window function to rank the versions under each primary key, but Phoenix has no window functions, only aggregate functions.
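For comparison, this is what the intent would look like with a window function in an engine that has one (e.g. Spark SQL or Hive); it is not valid Phoenix SQL and is shown only to pin down the semantics:

```sql
-- NOT runnable in Phoenix: ROW_NUMBER() is a window function.
-- Selects every row ranked beyond the 3 newest versions of its key,
-- i.e. exactly the rows that must be deleted.
SELECT ID, TS
FROM (
    SELECT ID, TS,
           ROW_NUMBER() OVER (PARTITION BY ID ORDER BY TS DESC) AS RN
    FROM TEST.TEST_VERSION
) T
WHERE RN > 3;
```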
Phoenix imposes quite a few restrictions on SQL, for example:
(1) Non-equi join conditions are not supported, e.g. on a.id > s.id, nor are array-comparison joins such as on a.id = ARRAY[1,2,3]. Both fail with: Error: Does not support non-standard or non-equi correlated-subquery conditions. (state=,code=0)
(2) Non-equi conditions in WHERE EXISTS subqueries are likewise unsupported: select ... from A where exists (select 1 from B where A.id > B.id) fails with the same error.
(3) There are no window functions.
(4) DELETE FROM does not support JOIN.
In the end, the following functions turn out to be usable:
(1) NTH_VALUE: returns the N-th value of a sorted group, typed like the original value.
(2) FIRST_VALUES and LAST_VALUES: return the first or last N values of a sorted partition, as an ARRAY.
The official docs show these with a global group, e.g. FIRST_VALUES(name, 3) WITHIN GROUP (ORDER BY salary DESC); in practice they need to be combined with GROUP BY.
With these, the rows to delete can be identified:
-- Option 1: use NTH_VALUE to obtain the cut-off threshold
SELECT A.ID, A.TS FROM TEST.TEST_VERSION A
INNER JOIN (
    SELECT ID, NTH_VALUE(TS,3) WITHIN GROUP (ORDER BY TS DESC) THRES
    FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID = Z.ID
WHERE A.TS < Z.THRES;

-- Option 2: use FIRST_VALUES to obtain an ARRAY of the newest timestamps
SELECT A.ID, A.TS FROM TEST.TEST_VERSION A
INNER JOIN (
    SELECT ID, FIRST_VALUES(TS,3) WITHIN GROUP (ORDER BY TS DESC) TSS
    FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID = Z.ID
WHERE A.TS < ALL(Z.TSS);
Phoenix does support row subqueries (official example below), which lets us work around the missing DELETE ... JOIN.
Row subqueries
A subquery can return multiple fields in one row, which is considered returning a row constructor. The row constructor on both sides of the operator (IN/NOT IN, EXISTS/NOT EXISTS or comparison operator) must contain the same number of values, like in the below example:
SELECT column1, column2
FROM t1
WHERE (column1, column2) IN
(SELECT column3, column4
FROM t2
WHERE column5 = 'nowhere');
This query returns all pairs of (column1, column2) that can match any pair of (column3, column4) in the second table after being filtered by the condition column5 = 'nowhere'.
The final SQL that deletes all old versions except the N (here 3) newest:
-- NTH_VALUE variant
DELETE FROM TEST.TEST_VERSION
WHERE (ID,TS) IN (
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A
INNER JOIN (
SELECT ID,NTH_VALUE(TS,3) WITHIN GROUP (ORDER BY TS DESC) THRES FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < Z.THRES
);
-- FIRST_VALUES variant
DELETE FROM TEST.TEST_VERSION
WHERE (ID,TS) IN (
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A
INNER JOIN (
SELECT ID,FIRST_VALUES(TS,3) WITHIN GROUP (ORDER BY TS DESC) TSS FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < ALL(Z.TSS)
);
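As a quick sanity check (not part of the original demo), count the remaining versions per key; after the delete each ID should have at most 3 rows:

```sql
-- Expect at most 3 rows per ID after the cleanup:
SELECT ID, COUNT(*) AS VERSION_CNT
FROM TEST.TEST_VERSION
GROUP BY ID;
```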
After the delete, rk001 retains only its 13:00, 14:00, and 15:00 versions, and rk002 retains 08:00, 09:00, and 09:30.
3. Further exploration
3.1 Exploring Phoenix's ROW_TIMESTAMP
Phoenix's ROW_TIMESTAMP feature exists to make data retrieval faster via metadata; it cannot surface HBase's multi-version cells in Phoenix. Test case below.
Create the table in Phoenix and insert data:
CREATE TABLE TEST.TEST_ROW_TIMESTAMP (
    ID   VARCHAR   NOT NULL,
    TS   TIMESTAMP NOT NULL,
    NAME VARCHAR,
    CONSTRAINT my_pk PRIMARY KEY (ID, TS ROW_TIMESTAMP)
) VERSIONS=5;
UPSERT INTO TEST.TEST_ROW_TIMESTAMP(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 09:30:00'),'windows');
UPSERT INTO TEST.TEST_ROW_TIMESTAMP(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 10:30:00'),'mac');
UPSERT INTO TEST.TEST_ROW_TIMESTAMP(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:30:00'),'linux');
Query the table in hbase shell:
hbase(main):050:0> desc 'TEST:TEST_ROW_TIMESTAMP'
Table TEST:TEST_ROW_TIMESTAMP is ENABLED
TEST:TEST_ROW_TIMESTAMP, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.phoenix.coprocessor.ScanRegionObserver|805306366|', coprocessor$2 => '|org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver|805306366|', coprocessor$3
=> '|org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver|805306366|', coprocessor$4 => '|org.apache.phoenix.coprocessor.ServerCachingEndpointImpl|805306366|', coprocessor$5 => '|org.apache.phoenix.hbase.index.Indexer|805306366|index.b
uilder=org.apache.phoenix.index.PhoenixIndexBuilder,org.apache.hadoop.hbase.index.codec.class=org.apache.phoenix.index.PhoenixIndexCodec', METADATA => {'OWNER' => 'dcetl'}}
COLUMN FAMILIES DESCRIPTION
{NAME => '0', VERSIONS => '5', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'FAST_DIFF', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICAT
ION_SCOPE => '0', BLOOMFILTER => 'NONE', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
1 row(s)
Took 0.0235 seconds
hbase(main):049:0> scan 'TEST:TEST_ROW_TIMESTAMP'
ROW COLUMN+CELL
rk001\x00\x80\x00\x01o`p\xC1\xC0\x00\x00\x00\x00 column=0:\x00\x00\x00\x00, timestamp=1577871000000, value=x
rk001\x00\x80\x00\x01o`p\xC1\xC0\x00\x00\x00\x00 column=0:\x80\x0B, timestamp=1577871000000, value=windows
rk001\x00\x80\x00\x01o`\xA7\xB0@\x00\x00\x00\x00 column=0:\x00\x00\x00\x00, timestamp=1577874600000, value=x
rk001\x00\x80\x00\x01o`\xA7\xB0@\x00\x00\x00\x00 column=0:\x80\x0B, timestamp=1577874600000, value=mac
rk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x00 column=0:\x00\x00\x00\x00, timestamp=1577878200000, value=x
rk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x00 column=0:\x80\x0B, timestamp=1577878200000, value=linux
3 row(s)
Took 0.0072 seconds
As the scan shows, we hoped HBase would hold a single row with multiple cell versions, but there are actually multiple rows: the timestamp has been encoded into the HBase rowkey. Phoenix did not use HBase's multi-version mechanism when creating the table.
3.2 Mismatched Phoenix and HBase table schemas
First create the HBase table:
create 'TEST:TEST_DIF_TS',{NAME => 'COLS', VERSIONS => 3}
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','zhangsan'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189085000
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','lisi'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189090000
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','wangwu'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189095000
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','zhaoliu'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189105000
get 'TEST:TEST_DIF_TS','001',{COLUMN=>'COLS:NAME',VERSIONS=>3}
# Result:
COLUMN CELL
COLS:NAME timestamp=1695784642879, value=zhaoliu
COLS:NAME timestamp=1695784642857, value=wangwu
COLS:NAME timestamp=1695784642830, value=lisi
Then create the Phoenix table:
CREATE TABLE TEST.TEST_DIF_TS (
    ID   VARCHAR   NOT NULL,
    TS   TIMESTAMP NOT NULL,
    NAME VARCHAR,
    CONSTRAINT my_pk PRIMARY KEY (ID, TS)
);
UPSERT INTO TEST.TEST_DIF_TS(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:30:00'),'XXX');
0: jdbc:phoenix:...> select * from TEST.TEST_DIF_TS;
+--------+--------------------------+-------+
| ID | TS | NAME |
+--------+--------------------------+-------+
| rk001 | 2020-01-01 11:30:00.000 | XXX |
+--------+--------------------------+-------+
Now scan the HBase table again:
hbase(main):004:0> scan 'TEST:TEST_DIF_TS'
ROW COLUMN+CELL
001 column=COLS:NAME, timestamp=1695784642879, value=zhaoliu
001 column=COLS:TS, timestamp=1695784643741, value=1695189105000
rk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x00   column=0:\x00\x00\x00\x00, timestamp=1695786568345, value=x
rk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x00   column=0:\x80\x0B, timestamp=1695786568345, value=XXX
Phoenix only sees the rows it inserted itself, while HBase sees both: Phoenix filters out any rows that do not match its own table schema. When storing data, Phoenix concatenates all of its primary key columns to form the HBase rowkey.
References:
Phoenix in practice: a summary of common Phoenix SQL syntax
Mapping HBase tables in Phoenix
Phoenix usage explained
An introduction to Phoenix and how to use it
Creating mapping tables and creating, deleting, and rebuilding indexes in Phoenix