概述
RisingWave是一款分布式SQL流處理數(shù)據(jù)庫,旨在幫助用戶降低實時應(yīng)用的的開發(fā)成本。作為專為云上分布式流處理而設(shè)計的系統(tǒng),RisingWave為用戶提供了與PostgreSQL類似的使用體驗,官方宣稱具備比Flink高出10倍的性能(指throughput)以及更低的成本。RisingWave開發(fā)只需要關(guān)注SQL開發(fā),而不需要像Flink那樣去關(guān)注
- RisingWave與Flink不同的是,RisingWave既可以做流處理也可以存儲;而Flink只是流處理框架,而不能存儲數(shù)據(jù),計算后的數(shù)據(jù)需要存儲到外部系統(tǒng)中。官方宣稱可以完全替代FlinkSQL。
- RisingWave與批數(shù)據(jù)庫不同的是,RisingWave可以做流處理,按預(yù)定義邏輯實時處理數(shù)據(jù),官網(wǎng)宣稱可以做到流批一體,批數(shù)據(jù)庫只能處理批數(shù)據(jù)。
使用場景
RisingWave 的強項是流處理,底層存儲為行存,更加適合對已存儲的數(shù)據(jù)高并發(fā)點查,而并非全表掃描。RisingWave 的主要使用場景包括了監(jiān)控、報警、實時動態(tài)報表、流式 ETL、機器學(xué)習(xí)特征工程等。其已經(jīng)運用到金融交易、制造業(yè)、新媒體、物流等領(lǐng)域。
但是,RisingWave 不適合做分析型隨機查詢。為支持分析型隨機查詢,用戶還需將數(shù)據(jù)導(dǎo)入到實時分析數(shù)據(jù)庫中進(jìn)行操作。不少用戶將 RisingWave 與 ClickHouse、Apache Doris 等實時分析數(shù)據(jù)庫組合使用:他們使用 RisingWave 做流計算,同時使用實時分析數(shù)據(jù)庫進(jìn)行分析型隨機查詢。RisingWave 已經(jīng)支持到sink ClickHouse、Apache Doris等OLTP中,具體可以參考RisingWave Sink
注意:
RisingWave 不支持讀寫事務(wù)處理,但其支持只讀事務(wù)。在生產(chǎn)中,使用 RisingWave 的最佳實踐是將 RisingWave 放在事務(wù)型數(shù)據(jù)庫的下游。RisingWave 通過 CDC 從事務(wù)型數(shù)據(jù)庫中讀取已經(jīng)被序列化過的數(shù)據(jù)。
RisingWave 應(yīng)用
部署
RisingWave 單機試玩模式
docker run -itd \
-p 4566:4566 \
-p 5691:5691 \
--privileged \
--name=risingwave \
risingwavelabs/risingwave:latest playground
RisingWave 單機 Docker Compose 部署模式(測試推薦這種模式部署,以下測試基于此種模式)
clone the risingwave repository.
git clone https://github.com/risingwavelabs/risingwave.git
進(jìn)入docker目錄
cd docker
啟動RisingWave集群
#使用MinIO存儲狀態(tài)后端,standalone模式啟動
export RW_IMAGE=risingwavelabs/risingwave:latest
export ENABLE_TELEMETRY=true
docker compose up -d
安裝postgresql客戶端
由于RisingWave兼容postgresql協(xié)議,所以通過postgresql客戶端可以直接操作RisingWave
安裝postgresql客戶端
yum install -y postgresql
使用 psql 連接
psql -h localhost -p 4566 -d dev -U root
啟動mysql并開啟binlog
- 啟動mysql
# 查看詳細(xì)默認(rèn)配置
docker run -it --rm mysql:5.7 --verbose --help
#啟動mysql server
docker run -d \
--name mysql5.7 \
--restart=always \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=123456 \
-v /data/mysql5.7/data:/var/lib/mysql \#數(shù)據(jù)文件
-v /data/mysql5.7/conf:/etc/mysql/conf.d \#配置文件
-v /data/mysql5.7/log:/var/log \#日志文件
mysql:5.7 \
--character-set-server=utf8mb4 \
--collation-server=utf8mb4_unicode_ci \
--log-bin=/var/lib/mysql/mysql-bin \#開啟binlog配置
--server-id=2 #開啟binlog配置
- 鏈接mysql
docker exec -it mysql5.7 mysql -h127.0.0.1 -P3306 -p’123456’
- 驗證是否開啟 binlog
show variables like ‘%log_bin%’;
- 授權(quán)
--授權(quán)RisingWave作為slave訪問mysql binlog
grant RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT on *.* to 'root'@'%' IDENTIFIED BY '123456';
--grant ALL PRIVILEGES on db01.* to 'root'@'%' IDENTIFIED BY '123456';
flush privileges;
--取消授權(quán),如有需要
REVOKE GRANT OPTION on *.* FROM 'root'@'%';
REVOKE ALL PRIVILEGES on *.* FROM 'root'@'%';
REVOKE ALL PRIVILEGES on db01.* FROM 'root'@'%';
flush privileges;
--查看授權(quán)
show grants for root@'%';
部署kafka
- 啟動kafka
# step-1
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper:latest
# step-2
# 啟動Kafka,將以下的倆個192.168.1.100換為本身的IP地址bash
docker run -d \
--name kafka \
--restart=always \
-p 8092:8092 \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.1.100:2181/kafka \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.100:8092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:8092 \
-t wurstmeister/kafka
- 與kafka交互
#list
docker run -it --rm wurstmeister/kafka kafka-topics.sh --bootstrap-server 192.168.1.100:8092 --list
#create topic
docker run -it --rm wurstmeister/kafka kafka-topics.sh --bootstrap-server 192.168.1.100:8092 --create --replication-factor 1 --partitions 1 --topic test2
#producer
docker run -it --rm wurstmeister/kafka kafka-console-producer.sh --bootstrap-server 192.168.1.100:8092 --topic test1
#consumer
docker run -it --rm wurstmeister/kafka kafka-console-consumer.sh --bootstrap-server 192.168.1.100:8092 --topic test1
- 或通過kcat與kafka交互
docker pull edenhill/kcat:1.7.1
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t test_sink_topic -C -J
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t test_sink_topic -C
RisingWave 使用demo
- 數(shù)據(jù)導(dǎo)出sink demo
-- create table
CREATE TABLE t1 (v1 int, v2 int)
WITH (
connector = 'datagen',
fields.v1.kind = 'sequence',
fields.v1.start = '1',
fields.v2.kind = 'random',
fields.v2.min = '-10',
fields.v2.max = '10',
fields.v2.seed = '1',
datagen.rows.per.second = '10'
) ROW FORMAT JSON;
-- create sink
CREATE SINK test_sink_1
FROM t1
WITH (
properties.bootstrap.server = '192.168.1.100:8092',
topic = 'test_sink_topic',
connector = 'kafka',
primary_key = 'v1'
)
FORMAT UPSERT ENCODE JSON;
查看kafka sink 結(jié)果
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t test_sink_topic -C -J
- 連接器 source
--source 連接器
CREATE SOURCE IF NOT EXISTS source_1 (
v1 integer,
v2 integer,
)
WITH (
connector='kafka',
topic='test_sink_topic',
properties.bootstrap.server='192.168.1.100:8092',
scan.startup.mode='latest',
) FORMAT PLAIN ENCODE JSON;
-- table連接器
CREATE TABLE IF NOT EXISTS table_1 (
v1 integer,
v2 integer,
)
WITH (
connector='kafka',
topic='test_sink_topic',
properties.bootstrap.server='192.168.1.100:8092',
scan.startup.mode='latest',
) FORMAT PLAIN ENCODE JSON;
- Change Data Capture (CDC) 直連 MySQL CDC
--mysql ddl:
create database db01;
use db01;
CREATE TABLE orders (
order_id int(11) NOT NULL AUTO_INCREMENT,
price decimal(11),
PRIMARY KEY (order_id)
);
-- risingwave ddl
CREATE TABLE orders (
order_id int,
price decimal,
PRIMARY KEY (order_id)
) WITH (
connector = 'mysql-cdc',
hostname = '192.168.1.100',
port = '3306',
username = 'root',
password = '123456',
database.name = 'db01',
table.name = 'orders',
);
--mysql dml
insert into orders(price) values(12),(10),(23);
insert into orders(price) values(12),(10);
update orders set price=100 where order_id=1;
delete from orders where order_id=3;
-- risingwave驗證數(shù)據(jù)
select * from orders ;
- 直接導(dǎo)出物化視圖/表數(shù)據(jù) (CREATE SINK FROM)
CREATE TABLE t11 (v1 int, v2 int)
WITH (
connector = 'datagen',
fields.v1.kind = 'sequence',
fields.v1.start = '1',
fields.v2.kind = 'random',
fields.v2.min = '-10',
fields.v2.max = '10',
fields.v2.seed = '1',
datagen.rows.per.second = '10'
) ROW FORMAT JSON;
create materialized view mv_t11 as select count(*) from t11;
CREATE SINK sink1 FROM mv_t11
WITH (
connector='kafka',
properties.bootstrap.server='192.168.1.100:8092',
topic='t_sink1'
)
FORMAT PLAIN ENCODE JSON(
force_append_only='true'
);
check結(jié)果
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink1 -C -J
- 導(dǎo)出 Query 的數(shù)據(jù)(CREATE SINK AS)
CREATE TABLE t11 (v1 int, v2 int)
WITH (
connector = 'datagen',
fields.v1.kind = 'sequence',
fields.v1.start = '1',
fields.v2.kind = 'random',
fields.v2.min = '-10',
fields.v2.max = '10',
fields.v2.seed = '1',
datagen.rows.per.second = '10'
) ROW FORMAT JSON;
CREATE SINK sink2 AS
SELECT
avg(v1) as avg_v1,
avg(v2) as avg_v2
FROM t1
WITH (
connector='kafka',
properties.bootstrap.server='192.168.1.100:8092',
topic='t_sink2'
)
FORMAT PLAIN ENCODE JSON(
force_append_only='true'
);
check結(jié)果
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink1 -C -J
總結(jié)
RisingWave 提供與 PostgreSQL 兼容的標(biāo)準(zhǔn)SQL接口。用戶可以像使用 PostgreSQL 一樣處理數(shù)據(jù)流。屏蔽了實時處理底層需要遇到的一些技術(shù)細(xì)節(jié)(狀態(tài)存儲,數(shù)據(jù)一致性,分布式集群擴展等),供應(yīng)用方快速的開發(fā)實時數(shù)據(jù)流,進(jìn)行流式ETL。具有以下特性:同步的實時性(可以保證實時的新鮮度,doris等OLAP引擎采用異步實時)、強一致性(doris等OLAP引擎僅提供最終一致性)、高可用、高并發(fā)、流處理語義、資源隔離??梢詰?yīng)用在一些數(shù)據(jù)看版,監(jiān)控,實時指標(biāo)等場景。文章來源:http://www.zghlxwxcb.cn/news/detail-832812.html
相關(guān)文章
github 倉庫
官方文檔
中文文檔
創(chuàng)始人知乎主頁
Slack文章來源地址http://www.zghlxwxcb.cn/news/detail-832812.html
到了這里,關(guān)于RisingWave分布式SQL流處理數(shù)據(jù)庫調(diào)研的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!