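In this article, we'll see how to connect an Apache Kafka broker to MongoDB Atlas and then use a CDC solution to stream data from MongoDB Atlas to SingleStore Kai. We'll also use Metabase to create a simple analytics dashboard for SingleStore Kai.
The code used in this article is available on GitHub: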
https://github.com/VeryFatBoy/adtech-kafka-cdc
Introduction
Change Data Capture (CDC) is a way of tracking the changes that occur in a database or system. SingleStore now offers a CDC solution that works with MongoDB.
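To make the idea concrete, here is a minimal sketch of how a consumer might replay change events to keep a replica in sync. It is an illustration only, not how SingleStore implements CDC; the event shape (`op`, `key`, `doc`) and the function name `apply_change` are assumptions made up for this example.

```python
# Minimal illustration of change data capture: a source emits change events,
# and a consumer replays them in order to keep a replica in sync.
# The event shape ("op", "key", "doc") is hypothetical, chosen for this sketch.

def apply_change(replica, event):
    """Apply one change event to the replica (a dict keyed by document id)."""
    op = event["op"]
    if op in ("insert", "update"):
        replica[event["key"]] = event["doc"]
    elif op == "delete":
        replica.pop(event["key"], None)
    else:
        raise ValueError(f"unknown operation: {op}")


change_log = [
    {"op": "insert", "key": 1, "doc": {"event_name": "Impression"}},
    {"op": "insert", "key": 2, "doc": {"event_name": "Click"}},
    {"op": "update", "key": 1, "doc": {"event_name": "Click"}},
    {"op": "delete", "key": 2},
]

replica = {}
for event in change_log:
    apply_change(replica, event)

print(replica)
```

Replaying the log in order is what matters here: the replica ends up reflecting the latest state of each document without ever re-reading the whole source.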
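To demonstrate the CDC solution, we'll use a Kafka broker to stream data to a MongoDB Atlas cluster, and then use a CDC pipeline to propagate the data from MongoDB Atlas to SingleStore Kai. We'll also create a simple analytics dashboard using Metabase.
The figure below shows the high-level architecture of the system.
Future articles will focus on other scenarios that use the CDC solution.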
MongoDB Atlas
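We'll use MongoDB Atlas in an M0 Sandbox. Under Database Access, we'll configure an admin user with atlasAdmin privileges. Under Network Access, we'll temporarily allow access from anywhere (IP address 0.0.0.0/0). We'll make a note of the username, password, and host.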
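If the username or password contains reserved characters such as `@` or `/`, they need to be percent-encoded before being placed in a MongoDB connection string. The sketch below shows one way to do this with the standard library. The helper name `build_atlas_uri` and the sample credentials and host are hypothetical; only the URI layout comes from this article.

```python
from urllib.parse import quote_plus


def build_atlas_uri(username, password, host):
    """Build a mongodb+srv URI, percent-encoding the credentials."""
    return (
        f"mongodb+srv://{quote_plus(username)}:{quote_plus(password)}"
        f"@{host}/?retryWrites=true&w=majority"
    )


# Placeholder values for illustration only, not real credentials.
uri = build_atlas_uri("admin", "p@ss/word", "cluster0.example.mongodb.net")
print(uri)
```

The resulting string can be passed to `MongoClient` in place of a hand-edited URI.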
Apache Kafka
We'll configure a Kafka broker to stream data to MongoDB Atlas, using a Jupyter Notebook to do so.
First, we need to install some libraries:
!pip install pymongo kafka-python --quiet
Next, we'll connect to MongoDB Atlas and the Kafka broker:
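from kafka import KafkaConsumer
from pymongo import MongoClient

try:
    client = MongoClient("mongodb+srv://<username>:<password>@<host>/?retryWrites=true&w=majority")
    # MongoClient connects lazily, so ping the server to verify the connection
    client.admin.command("ping")
    db = client.adtech
    print("Connected successfully")
except Exception as e:
    print(f"Could not connect: {e}")

# Subscribe to the ad_events topic on the public Kafka broker
consumer = KafkaConsumer(
    "ad_events",
    bootstrap_servers = ["public-kafka.memcompute.com:9092"]
)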
We'll replace `<username>`, `<password>`, and `<host>` with the values we saved earlier from MongoDB Atlas.
To start, we'll load 100 records into MongoDB Atlas:
MAX_ITERATIONS = 100

for iteration, message in enumerate(consumer, start = 1):
    if iteration > MAX_ITERATIONS:
        break
    try:
        # Each message is a tab-separated line of nine fields
        record = message.value.decode("utf-8")
        user_id, event_name, advertiser, campaign, gender, income, page_url, region, country = map(str.strip, record.split("\t"))
        events_record = {
            "user_id": int(user_id),
            "event_name": event_name,
            "advertiser": advertiser,
            "campaign": int(campaign.split()[0]),
            "gender": gender,
            "income": income,
            "page_url": page_url,
            "region": region,
            "country": country
        }
        db.events.insert_one(events_record)
    except Exception as e:
        print(f"Iteration {iteration}: Could not insert data - {str(e)}")
The data should load successfully, and we should see a database named `adtech` with a collection named `events`. The documents in the collection should have a structure similar to the following example:
_id: ObjectId('64ec906d0e8c0f7bcf72a8ed')
user_id: 3857963415
event_name: "Impression"
advertiser: "Sherwin-Williams"
campaign: 13
gender: "Female"
income: "25k and below"
page_url: "/2013/02/how-to-make-glitter-valentines-heart-boxes.html/"
region: "Michigan"
country: "US"
These documents represent ad campaign events. The events collection stores details of the advertiser, the campaign, and various demographic information about the user, such as gender and income.
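The parsing step used by the loader can be isolated into a small function and exercised without a Kafka broker. The following is a sketch under assumptions: the function name `parse_event` is hypothetical, and the sample line is reconstructed from the example document above rather than captured from the real topic.

```python
# Field order as unpacked in the loading loop
FIELDS = ("user_id", "event_name", "advertiser", "campaign", "gender",
          "income", "page_url", "region", "country")


def parse_event(record):
    """Turn one tab-separated line into a document ready for insert_one."""
    values = [value.strip() for value in record.split("\t")]
    if len(values) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(values)}")
    doc = dict(zip(FIELDS, values))
    doc["user_id"] = int(doc["user_id"])
    # Keep only the leading number of the campaign field, as the loader does
    doc["campaign"] = int(doc["campaign"].split()[0])
    return doc


# Hypothetical sample line, rebuilt from the example document
sample = "\t".join([
    "3857963415", "Impression", "Sherwin-Williams", "13", "Female",
    "25k and below",
    "/2013/02/how-to-make-glitter-valentines-heart-boxes.html/",
    "Michigan", "US",
])

doc = parse_event(sample)
print(doc["user_id"], doc["campaign"], doc["income"])
```

Checking the field count up front gives a clearer error than a failed tuple unpacking when a malformed message arrives.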
SingleStore Kai
SingleStore Kai is a real-time analytics platform that can handle data and queries at scale. We'll use a CDC pipeline to propagate the data from MongoDB Atlas to SingleStore Kai.
A previous article showed the steps to create a free SingleStoreDB Cloud account. We'll use the following settings:
Workspace Group Name: CDC Demo Group
Cloud Provider: AWS
Region: US East 1 (N. Virginia)
Workspace Name: cdc-demo
Size: S-00
Settings: SingleStore Kai selected
Once the workspace is available, we'll make a note of the password and host. The host is available from CDC Demo Group > Overview > Workspaces > cdc-demo > Connect > Connect Directly > SQL IDE > Host. We'll need this information later for Metabase.
We'll also temporarily allow access from anywhere by configuring CDC Demo Group > Firewall.
From the left navigation pane, we'll select DEVELOP > SQL Editor to create an adtech database and a link, as follows:
CREATE DATABASE IF NOT EXISTS adtech;
USE adtech;

-- Only needed when re-creating the link; skip on a first run
DROP LINK adtech.link;

CREATE LINK adtech.link AS MONGODB
CONFIG '{"mongodb.hosts": "<primary>:27017, <secondary>:27017, <secondary>:27017",
         "collection.include.list": "adtech.*",
         "mongodb.ssl.enabled": "true",
         "mongodb.authsource": "admin",
         "mongodb.members.auto.discover": "false"}'
CREDENTIALS '{"mongodb.user": "<username>",
              "mongodb.password": "<password>"}';

CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK adtech.link '*' FORMAT AVRO;
We'll replace <username> and <password> with the values we saved earlier from MongoDB Atlas. We'll also need to replace <primary>, <secondary>, and <secondary> with the full address of each node, as shown in MongoDB Atlas.
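Hand-editing a JSON string inside a SQL statement is easy to get wrong, so the CONFIG value can also be generated. The sketch below builds it with `json.dumps`, using only the keys shown in the link definition above. The helper name `build_link_config` and the host names are hypothetical placeholders.

```python
import json


def build_link_config(hosts, database="adtech"):
    """Return the JSON text for the CONFIG clause of CREATE LINK."""
    return json.dumps({
        "mongodb.hosts": ", ".join(f"{host}:27017" for host in hosts),
        "collection.include.list": f"{database}.*",
        "mongodb.ssl.enabled": "true",
        "mongodb.authsource": "admin",
        "mongodb.members.auto.discover": "false",
    })


# Placeholder node addresses; use the real ones shown in MongoDB Atlas
hosts = [
    "shard-00-00.example.mongodb.net",
    "shard-00-01.example.mongodb.net",
    "shard-00-02.example.mongodb.net",
]

config = build_link_config(hosts)
print(config)
```

The printed text can be pasted between the single quotes after CONFIG.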
Now we'll check whether any tables exist, as follows:
SHOW TABLES;
This should show a single table named events:
+------------------+
| Tables_in_adtech |
+------------------+
| events           |
+------------------+
Let's check the structure of the table:
DESCRIBE events;
The output should be as follows:
+-------+------+------+------+---------+-------+
| Field | Type | Null | Key  | Default | Extra |
+-------+------+------+------+---------+-------+
| _id   | text | NO   | UNI  | NULL    |       |
| _more | JSON | NO   |      | NULL    |       |
+-------+------+------+------+---------+-------+
Next, let's check whether any pipelines exist:
SHOW PIPELINES;
This will show a single pipeline named events, which is currently stopped:
+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Stopped | False     |
+---------------------+---------+-----------+
Now let's start the events pipeline:
START ALL PIPELINES;
The state should change to Running when SHOW PIPELINES is run again:
+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Running | False     |
+---------------------+---------+-----------+
If we now run the following command:
SELECT COUNT(*) FROM events;
It should return 100:
+----------+
| COUNT(*) |
+----------+
|      100 |
+----------+
Let's inspect one row of the events table, as follows:
SELECT * FROM events LIMIT 1;
The output should be similar to the following, shown here one column per line:
_id: {"$oid": "64ec906d0e8c0f7bcf72a8f7"}
_more: {"_id":{"$oid":"64ec906d0e8c0f7bcf72a8f7"},"advertiser":"Wendys","campaign":13,"country":"US","event_name":"Click","gender":"Female","income":"75k - 99k","page_url":"/2014/05/flamingo-pop-bridal-shower-collab-with.html","region":"New Mexico","user_id":3857963416}
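Because the pipeline stores each document as JSON in the `_more` column, client code that reads a row has to parse that column to reach individual fields. The sketch below does this for the sample row shown in this section. The helper name `flatten_row` is hypothetical, and the row is hard-coded here rather than fetched from SingleStore Kai.

```python
import json

# The _more value from the sample row in this section
more = (
    '{"_id":{"$oid":"64ec906d0e8c0f7bcf72a8f7"},"advertiser":"Wendys",'
    '"campaign":13,"country":"US","event_name":"Click","gender":"Female",'
    '"income":"75k - 99k",'
    '"page_url":"/2014/05/flamingo-pop-bridal-shower-collab-with.html",'
    '"region":"New Mexico","user_id":3857963416}'
)


def flatten_row(more_json):
    """Parse the _more column and replace the {"$oid": ...} wrapper with its string."""
    doc = json.loads(more_json)
    doc["_id"] = doc["_id"]["$oid"]
    return doc


row = flatten_row(more)
print(row["_id"], row["advertiser"], row["event_name"], row["user_id"])
```

Within SQL, the same fields are reached with the `_more::field` syntax used in the Metabase queries later in this article.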
The CDC solution has successfully connected to MongoDB Atlas and replicated all 100 records to SingleStore Kai.
Now, let's create a dashboard using Metabase.
Metabase
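A previous article described in detail how to install and configure Metabase and how to create a connection to it. We'll create the visualizations using slight variations of the queries from that article.
1. Total Number of Events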
SELECT COUNT(*) FROM events;
2. Events by Region (grouped by country)
SELECT _more::country AS `events.country`,
    COUNT(_more::country) AS `events.countofevents`
FROM adtech.events AS events
GROUP BY 1;
3. Events by Top 5 Advertisers
SELECT _more::advertiser AS `events.advertiser`,
    COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (_more::advertiser LIKE '%Subway%'
    -- '%McDonald%' matches both "McDonalds" and "McDonald's"
    OR _more::advertiser LIKE '%McDonald%'
    OR _more::advertiser LIKE '%Starbucks%'
    OR _more::advertiser LIKE '%Dollar General%'
    OR _more::advertiser LIKE '%YUM! Brands%'
    OR _more::advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;
4. Ad Visitors by Gender and Income
SELECT * FROM (
    SELECT *,
        DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank,
        RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering,
        CASE WHEN xx.z___min_rank = xx.z___rank THEN 1 ELSE 0 END AS z__is_highest_ranked_cell
    FROM (
        SELECT *,
            MIN(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rank
        FROM (
            SELECT *,
                RANK() OVER (ORDER BY
                    CASE WHEN bb.z__pivot_col_rank = 1 THEN (CASE WHEN bb.`events.count` IS NOT NULL THEN 0 ELSE 1 END) ELSE 2 END,
                    CASE WHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count` ELSE NULL END DESC,
                    bb.`events.count` DESC,
                    bb.z__pivot_col_rank,
                    bb.`events.income`) AS z___rank
            FROM (
                SELECT *,
                    DENSE_RANK() OVER (ORDER BY CASE WHEN ww.`events.gender` IS NULL THEN 1 ELSE 0 END, ww.`events.gender`) AS z__pivot_col_rank
                FROM (
                    SELECT _more::gender AS `events.gender`,
                        _more::income AS `events.income`,
                        COUNT(*) AS `events.count`
                    FROM adtech.events AS events
                    WHERE (_more::income <> 'unknown' OR _more::income IS NULL)
                    GROUP BY 1, 2
                ) ww
            ) bb
            WHERE bb.z__pivot_col_rank <= 16384
        ) aa
    ) xx
) zz
WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1)
    AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
ORDER BY zz.z___pivot_row_rank;
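Most of the fourth query is ranking logic that orders pivot rows and columns. The innermost SELECT does the real work: it counts events per gender and income, excluding the income value 'unknown'. The sketch below reproduces just that aggregation in Python and arranges it as an income-by-gender table. The function name `pivot_counts` and the sample events are hypothetical, and the ranking and row limits of the SQL are deliberately left out.

```python
from collections import Counter


def pivot_counts(events):
    """Count events per (income, gender), skipping income == 'unknown'."""
    counts = Counter(
        (event.get("income"), event["gender"])
        for event in events
        if event.get("income") != "unknown"
    )
    genders = sorted({gender for _, gender in counts})
    table = {}
    for (income, gender), n in counts.items():
        # One row per income, one column per gender, missing cells set to 0
        table.setdefault(income, dict.fromkeys(genders, 0))[gender] = n
    return table


# Made-up sample events for illustration
events = [
    {"gender": "Female", "income": "25k and below"},
    {"gender": "Female", "income": "25k and below"},
    {"gender": "Male", "income": "25k and below"},
    {"gender": "Male", "income": "75k - 99k"},
    {"gender": "Female", "income": "unknown"},
]

table = pivot_counts(events)
for income, row in table.items():
    print(income, row)
```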
The figure below shows an example of the charts sized and positioned on the AdTech dashboard.
We'll set the auto-refresh option to 1 minute.
If we load more data into MongoDB Atlas from the Jupyter Notebook, simply by changing MAX_ITERATIONS, we'll see the data propagate to SingleStore Kai and the new data reflected in the AdTech dashboard.
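When loading further batches, one alternative to the enumerate-and-break loop is `itertools.islice`, which stops after exactly the requested number of messages. The loop used earlier pulls one extra message from the consumer before it breaks. The sketch below is a variation, not the code from the notebook: the function name `load_batch` is hypothetical, and a plain iterator stands in for the Kafka consumer so the example runs on its own.

```python
from itertools import islice


def load_batch(messages, insert, max_messages):
    """Insert at most max_messages items; return how many were stored."""
    loaded = 0
    for message in islice(messages, max_messages):
        try:
            insert(message)
            loaded += 1
        except Exception as exc:
            print(f"Skipped message: {exc}")
    return loaded


# Stand-ins for the Kafka consumer and db.events.insert_one
source = iter(range(1000))
stored = []

count = load_batch(source, stored.append, 250)
print(count, len(stored))
```

In the notebook, `messages` would be the `consumer` object and `insert` a function that parses a message and calls `db.events.insert_one`.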