Doris系列
注:大家覺得博客好的話,別忘了點贊收藏呀,本人每周都會更新關(guān)于人工智能和大數(shù)據(jù)相關(guān)的內(nèi)容,內(nèi)容多為原創(chuàng),Python Java Scala SQL 代碼,CV NLP 推薦系統(tǒng)等,Spark Flink Kafka Hbase Hive Flume等等~寫的都是純干貨,各種頂會的論文解讀,一起進(jìn)步。
今天和大家分享一下Doris系列之導(dǎo)入Kafka數(shù)據(jù)操作
#博學(xué)谷IT學(xué)習(xí)技術(shù)支持#
前言
接著上次的Doris系列繼續(xù)和大家分享,上次講了Doris 建表操作,和從Broker Load導(dǎo)入hdfs數(shù)據(jù)操作,今天和大家分享從Routine Load導(dǎo)入kafka數(shù)據(jù)操作。
如上圖,Client 向 FE 提交一個例行導(dǎo)入作業(yè)。
FE 通過 JobScheduler 將一個導(dǎo)入作業(yè)拆分成若干個 Task。每個 Task 負(fù)責(zé)導(dǎo)入指定的一部分?jǐn)?shù)據(jù)。Task 被 TaskScheduler 分配到指定的 BE 上執(zhí)行。
在 BE 上,一個 Task 被視為一個普通的導(dǎo)入任務(wù),通過 Stream Load 的導(dǎo)入機制進(jìn)行導(dǎo)入。導(dǎo)入完成后,向 FE 匯報。
FE 中的 JobScheduler 根據(jù)匯報結(jié)果,繼續(xù)生成后續(xù)新的 Task,或者對失敗的 Task 進(jìn)行重試。
整個例行導(dǎo)入作業(yè)通過不斷的產(chǎn)生新的 Task,來完成數(shù)據(jù)不間斷的導(dǎo)入。
一、Kafka集群使用步驟
Kafka也是Doris一個非常重要的數(shù)據(jù)來源。
1.啟動kafka集群環(huán)境
這里根據(jù)自己的路徑啟動kafka集群環(huán)境
cd /export/servers/kafka_2.12-2.4.1
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
2.創(chuàng)建kafka的topic主題
這里創(chuàng)建一個topic名字是test的kafka消息隊列,設(shè)置1個partitions ,并且只備份1份數(shù)據(jù)。
bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 1 \
--partitions 1 \
--topic test
如果Topic已經(jīng)存在,則可以刪除
bin/kafka-topics.sh --delete --zookeeper node01:2181 --topic test
3.往kafka中插入一批測試數(shù)據(jù)
這里簡單做個小案例,插入2條數(shù)據(jù)。
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
{"id":1,"name":"zhangsan","age":20}
{"id":2,"name":"lisi","age":30}
二、Doris使用步驟
1.創(chuàng)建對應(yīng)表
這里根據(jù)自己kafka生成的數(shù)據(jù)創(chuàng)建對應(yīng)字段和格式的表格
create table student_kafka2
(
id int,
name varchar(50),
age int
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10;
2.創(chuàng)建導(dǎo)入作業(yè)
- student_kafka2為第一步創(chuàng)建的表格名稱
- desired_concurrent_number是并行度相關(guān)的參數(shù)
- strict_mode是否采用嚴(yán)格模式
- format為導(dǎo)入的格式,這里是json
CREATE ROUTINE LOAD test_db.kafka_job_new on student_kafka2
PROPERTIES
(
"desired_concurrent_number"="1",
"strict_mode"="false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list"= "node01:9092,node02:9092,node03:9092",
"kafka_topic" = "test",
"property.group.id" = "test_group_1",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.enable.auto.commit" = "false"
);
select * from student_kafka2;
三、Doris常用的參數(shù)
設(shè)置刪除時是否允許不分區(qū)直接刪除
- SET delete_without_partition = true;
設(shè)置最大內(nèi)存限制
- SET exec_mem_limit = 8589934592;
- SHOW VARIABLES LIKE “%mem_limit%”;
設(shè)置最長查詢時間限制
- SET query_timeout = 600;
- SHOW VARIABLES LIKE “%query_timeout%”;
添加新的含預(yù)聚合的列
- ALTER TABLE table1 ADD COLUMN uv BIGINT SUM DEFAULT ‘0’ after pv;
Broadcast/Shuffle Join 操作,默認(rèn)為Broadcast文章來源:http://www.zghlxwxcb.cn/news/detail-475446.html
- select sum(table1.pv) from table1 join [broadcast] table2 where
table1.siteid = 12; - select sum(table1.pv) from table1 join [shuffle] table2 where
table1.siteid = 12;
總結(jié)
今天主要和大家分享了Doris系列之導(dǎo)入Kafka數(shù)據(jù)操作,如果大家實際工作中需要用到Kafka結(jié)合Doris操作,可以參考一下使用步驟。文章來源地址http://www.zghlxwxcb.cn/news/detail-475446.html
到了這里,關(guān)于Doris系列之導(dǎo)入Kafka數(shù)據(jù)操作的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!