国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

使用Kafka和CDC將數(shù)據(jù)從MongoDB Atlas流式傳輸?shù)絊ingleStore Kai

在本文中,我們將了解如何通過(guò)連接Apache Kafka代理到MongoDB Atlas并使用CDC解決方案將數(shù)據(jù)從MongoDB Atlas流式傳輸?shù)絊ingleStore Kai。同時(shí),我們還將使用Metabase創(chuàng)建一個(gè)簡(jiǎn)單的SingleStore Kai分析儀表板。

本文末尾附上所使用的代碼本文的出處鏈接。

https://github.com/VeryFatBoy/adtech-kafka-cdc

簡(jiǎn)介

CDC是一種追蹤數(shù)據(jù)庫(kù)或系統(tǒng)中發(fā)生的更改的方法。SingleStore現(xiàn)在提供與MongoDB配合使用的CDC解決方案。

為了演示CDC解決方案,我們將使用Kafka代理將數(shù)據(jù)流式傳輸?shù)組ongoDB Atlas集群,并使用CDC管道將數(shù)據(jù)從MongoDB Atlas傳播到SingleStore Kai。我們還將使用Metabase創(chuàng)建一個(gè)簡(jiǎn)單的分析儀表板。

下圖顯示了系統(tǒng)的高級(jí)架構(gòu)。

高級(jí)架構(gòu)展示圖片

以后會(huì)發(fā)布重點(diǎn)介紹使用CDC解決方案的其他場(chǎng)景的文章。

MongoDB Atlas

我們將在M0 Sandbox中使用MongoDB Atlas。在Database Access下,我們將配置一個(gè)具有atlasAdmin權(quán)限的管理員用戶。在Network Access下,我們將臨時(shí)允許從任何地方(IP地址0.0.0.0/0)訪問(wèn)。我們將記錄用戶名、密碼和主機(jī)。

Apache Kafka

我們將配置一個(gè)Kafka代理將數(shù)據(jù)流式傳輸?shù)組ongoDB Atlas。我們將使用Jupyter Notebook來(lái)實(shí)現(xiàn)這一目標(biāo)。

首先,我們需要安裝一些庫(kù):

!pip install pymongo kafka-python --quiet

接下來(lái),我們將連接到MongoDB Atlas和Kafka代理:

from kafka import KafkaConsumer
from pymongo import MongoClient

try:
    client = MongoClient("mongodb+srv://<username>:<password>@<host>/?retryWrites=true&w=majority")
    db = client.adtech
    print("連接成功")
except:
    print("無(wú)法連接")

consumer = KafkaConsumer(
    "ad_events",
    bootstrap_servers = ["public-kafka.memcompute.com:9092"]
)

我們將使用之前從MongoDB Atlas保存的值替換`<username>`、`<password>`和`<host>`。

首先,我們將加載100條記錄到MongoDB Atlas中:

MAX_ITERATIONS = 100

for iteration, message in enumerate(consumer, start = 1):
    if iteration > MAX_ITERATIONS:
        break

    try:
        record = message.value.decode("utf-8")
        user_id, event_name, advertiser, campaign, gender, income, page_url, region, country = map(str.strip, record.split("\t"))

        events_record = {
            "user_id": int(user_id),
            "event_name": event_name,
            "advertiser": advertiser,
            "campaign": int(campaign.split()[0]),
            "gender": gender,
            "income": income,
            "page_url": page_url,
            "region": region,
            "country": country
        }

        db.events.insert_one(events_record)
    except Exception as e:
        print(f"Iteration {iteration}: 無(wú)法插入數(shù)據(jù) - {str(e)}")

數(shù)據(jù)應(yīng)成功加載,并且我們應(yīng)該看到一個(gè)名為`adtech`的數(shù)據(jù)庫(kù)和一個(gè)名為`events`的集合。集合中的文檔應(yīng)具有類似以下示例的結(jié)構(gòu):

_id: ObjectId('64ec906d0e8c0f7bcf72a8ed')
user_id: 3857963415
event_name: "Impression"
advertiser: "Sherwin-Williams"
campaign: 13
gender: "Female"
income: "25k and below",
page_url: "/2013/02/how-to-make-glitter-valentines-heart-boxes.html/"
region: "Michigan"
country: "US"

這些文檔表示廣告活動(dòng)事件。events集合存儲(chǔ)了有關(guān)廣告商、廣告活動(dòng)以及用戶的各種人口統(tǒng)計(jì)信息,如性別和收入。

SingleStore Kai

SingleStore Kai是一個(gè)實(shí)時(shí)分析平臺(tái),可以處理大規(guī)模的數(shù)據(jù)和查詢。我們將使用CDC pipeline將數(shù)據(jù)從MongoDB Atlas傳播到SingleStore Kai。

之前的文章展示了創(chuàng)建免費(fèi)SingleStoreDB Cloud帳戶的步驟。我們將使用以下設(shè)置:

工作區(qū)組名稱:CDC Demo Group 

云提供商:AWS

地區(qū):US East 1 (N. Virginia)

工作區(qū)名稱:cdc-demo

大小:S-00

設(shè)置:

  • 選擇 SingleStore Kai 一旦工作區(qū)可用,我們會(huì)記下密碼和主機(jī)信息。從CDC Demo Group > Overview > Workspaces > cdc-demo > Connect > Connect Directly > SQL IDE > Host可以找到主機(jī)信息。我們稍后在Metabase中會(huì)需要這些信息。

  • 我們還會(huì)通過(guò)配置CDC Demo Group > Firewall臨時(shí)允許從任何地方訪問(wèn)。

從左側(cè)導(dǎo)航欄中,我們會(huì)選擇DEVELOP > SQL Editor來(lái)創(chuàng)建一個(gè)adtech數(shù)據(jù)庫(kù)和鏈接,如下所示:

CREATE DATABASE IF NOT EXISTS adtech;
USE adtech;

DROP LINK adtech.link;

CREATE LINK adtech.link AS MONGODB
CONFIG '{"mongodb.hosts": "<primary>:27017, <secondary>:27017, <secondary>:27017",
        "collection.include.list": "adtech.*",
        "mongodb.ssl.enabled": "true",
        "mongodb.authsource": "admin",
        "mongodb.members.auto.discover": "false"}'
CREDENTIALS '{"mongodb.user": "<username>",
            "mongodb.password": "<password>"}';

CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK adtech.link '*' FORMAT AVRO;

我們會(huì)用之前從MongoDB Atlas保存的值替換<username>和<password>。我們還需要用MongoDB Atlas中每個(gè)地址的完整地址替換<primary>,<secondary>和<secondary>的值。

現(xiàn)在我們來(lái)檢查是否有任何表,如下所示:

SHOW TABLES;

這應(yīng)該會(huì)顯示一個(gè)名為events的表:

+------------------+
| Tables_in_adtech |
+------------------+
| events           |
+------------------+

我們來(lái)檢查表的結(jié)構(gòu):

DESCRIBE events;

輸出應(yīng)該如下所示:

+-------+------+------+------+---------+-------+
| Field | Type | Null | Key  | Default | Extra |
+-------+------+------+------+---------+-------+
| _id   | text | NO   | UNI  | NULL    |       |
| _more | JSON | NO   |      | NULL    |       |
+-------+------+------+------+---------+-------+

接下來(lái),我們來(lái)檢查是否有任何pipelines:

SHOW PIPELINES;

這將顯示一個(gè)名為events的pipeline,目前處于停止?fàn)顟B(tài):

+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Stopped | False     |
+---------------------+---------+-----------+

現(xiàn)在我們來(lái)啟動(dòng)events pipeline:

START ALL PIPELINES;

狀態(tài)應(yīng)該變?yōu)镽unning:

+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Running | False     |
+---------------------+---------+-----------+

如果我們現(xiàn)在運(yùn)行以下命令:

SELECT COUNT(*) FROM events;

它應(yīng)該返回結(jié)果100:

+----------+
| COUNT(*) |
+----------+
|      100 |
+----------+

我們檢查events表中的一行,如下所示:

SELECT * FROM events LIMIT 1;

輸出應(yīng)類似于以下內(nèi)容:

+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| _id                                  | _more                                                                                                                                                                                                                                                                   |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| {"$oid": "64ec906d0e8c0f7bcf72a8f7"} | {"_id":{"$oid":"64ec906d0e8c0f7bcf72a8f7"},"advertiser":"Wendys","campaign":13,"country":"US","event_name":"Click","gender":"Female","income":"75k - 99k","page_url":"/2014/05/flamingo-pop-bridal-shower-collab-with.html","region":"New Mexico","user_id":3857963416} |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

成功連接到MongoDB Atlas并將所有100條記錄復(fù)制到SingleStore Kai的CDC解決方案。

現(xiàn)在,讓我們使用Metabase創(chuàng)建一個(gè)儀表板。

Metabase

在之前的文章中,我們?cè)敿?xì)介紹了安裝、配置和創(chuàng)建與Metabase的連接的方法。我們將使用稍微變化的查詢來(lái)創(chuàng)建可視化。

1. 事件總數(shù)SQL

SELECT COUNT(*) FROM events;

2. 按地區(qū)統(tǒng)計(jì)事件SQL

SELECT _more::country AS `events.country`, COUNT(_more::country) AS 'events.countofevents'
FROM adtech.events AS events
GROUP BY 1;

3. 前5個(gè)廣告商的事件數(shù)量SQL

SELECT _more::advertiser AS `events.advertiser`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (_more::advertiser LIKE '%Subway%' OR _more::advertiser LIKE '%McDonals%' OR _more::advertiser LIKE '%Starbucks%' OR _more::advertiser LIKE '%Dollar General%' OR _more::advertiser LIKE '%YUM! Brands%' OR _more::advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;

4. 廣告訪客按性別和收入分類SQL

SELECT *
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank, RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering, CASE
        WHEN xx.z___min_rank = xx.z___rank THEN 1
        ELSE 0
      END AS z__is_highest_ranked_cell
    FROM (SELECT *, Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rank
        FROM (SELECT *, RANK() OVER (ORDER BY CASE
                WHEN bb.z__pivot_col_rank = 1 THEN (CASE
                    WHEN bb.`events.count` IS NOT NULL THEN 0
                    ELSE 1
                  END)
                ELSE 2
              END, CASE
                WHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count`
                ELSE NULL
              END DESC, bb.`events.count` DESC, bb.z__pivot_col_rank, bb.`events.income`) AS z___rank
            FROM (SELECT *, DENSE_RANK() OVER (ORDER BY CASE
                    WHEN ww.`events.gender` IS NULL THEN 1
                    ELSE 0
                  END, ww.`events.gender`) AS z__pivot_col_rank
                FROM (SELECT _more::gender AS `events.gender`, _more::income AS `events.income`, COUNT(*) AS `events.count`
                    FROM adtech.events AS events
                    WHERE (_more::income <> 'unknown' OR _more::income IS NULL)
                    GROUP BY 1, 2) ww) bb
            WHERE bb.z__pivot_col_rank <= 16384) aa) xx) zz
WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1) AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
ORDER BY zz.z___pivot_row_rank;

下圖顯示了在AdTech儀表板上調(diào)整大小和位置的圖表示例。

AdTech儀表板圖表示例

我們將設(shè)置自動(dòng)刷新選項(xiàng)為1分鐘。

如果我們?cè)贛ongoDB Atlas中使用Jupyter筆記本加載更多數(shù)據(jù),只需更改MAX_ITERATIONS,我們將看到數(shù)據(jù)傳播到SingleStore Kai,并在AdTech儀表板中反映出新的數(shù)據(jù)。文章來(lái)源地址http://www.zghlxwxcb.cn/article/666.html

到此這篇關(guān)于使用Kafka和CDC將數(shù)據(jù)從MongoDB Atlas流式傳輸?shù)絊ingleStore Kai的文章就介紹到這了,更多相關(guān)內(nèi)容可以在右上角搜索或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

原文地址:http://www.zghlxwxcb.cn/article/666.html

如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)聯(lián)系站長(zhǎng)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • SingleStore Kai for MongoDB 的 6 個(gè)主要功能

    SingleStore Kai for MongoDB將實(shí)時(shí)分析引入JSON文檔,通過(guò)將MongoDB查詢轉(zhuǎn)換為在SingleStoreDB上執(zhí)行的SQL語(yǔ)句來(lái)實(shí)現(xiàn)。無(wú)需對(duì)模式、數(shù)據(jù)或查詢進(jìn)行任何更改。 在Facebook上分享 在Twitter上分享 在LinkedIn上分享 在Reddit上分享 通過(guò)電子郵件分享 打印資源。 如今,世界上積累的大部分?jǐn)?shù)據(jù)都

    2024年02月15日
    瀏覽(15)
  • 流式計(jì)算中的多線程處理:如何使用Kafka實(shí)現(xiàn)高效的實(shí)時(shí)數(shù)據(jù)處理

    作者:禪與計(jì)算機(jī)程序設(shè)計(jì)藝術(shù) Apache Kafka 是 Apache Software Foundation 下的一個(gè)開(kāi)源項(xiàng)目,是一個(gè)分布式的、高吞吐量的、可擴(kuò)展的消息系統(tǒng)。它最初由 LinkedIn 開(kāi)發(fā)并于 2011 年發(fā)布。與其他一些類似產(chǎn)品相比,Kafka 有著更強(qiáng)大的功能和活躍的社區(qū)支持。因此,越來(lái)越多的人開(kāi)始使

    2024年02月12日
    瀏覽(22)
  • skywalking agent使用kafka數(shù)據(jù)傳輸

    skywalking agent使用kafka數(shù)據(jù)傳輸

    安裝Zookeeper 下載相應(yīng)版本的zookeeper 解壓文件 進(jìn)入conf目錄下,復(fù)制zoo_sample.cfg文件,這個(gè)是官方提供的配置樣例,我們修改復(fù)制的文件名稱未zoo.cfg。 進(jìn)入bin目錄,啟動(dòng)zookeeper 安裝Kafka 下載對(duì)應(yīng)版本的kafka 解壓文件 修改config/server.properties文件 啟動(dòng)kafka 啟動(dòng)項(xiàng)目 服務(wù)層 修改

    2024年02月15日
    瀏覽(20)
  • 使用 Python 流式傳輸來(lái)自 OpenAI API 的響應(yīng):分步指南

    使用 Python 流式傳輸來(lái)自 OpenAI API 的響應(yīng):分步指南

    OpenAI API 提供了大量可用于執(zhí)行各種 NLP 任務(wù)的尖端 AI 模型。但是,在某些情況下,僅向 OpenAI 發(fā)出 API 請(qǐng)求可能還不夠,例如需要實(shí)時(shí)更新時(shí)。這就是服務(wù)器發(fā)送事件 (SSE) 發(fā)揮作用的地方。 SSE 是一種簡(jiǎn)單有效的技術(shù),用于將數(shù)據(jù)從服務(wù)器實(shí)時(shí)流式傳輸?shù)娇蛻舳恕?如何在 W

    2023年04月19日
    瀏覽(39)
  • Spark寫入kafka(批數(shù)據(jù)和流式)

    Spark寫入kafka(批數(shù)據(jù)和流式)

    寫入kafka基礎(chǔ) kafka寫入策略 寫入kafka應(yīng)答響應(yīng)級(jí)別

    2024年01月25日
    瀏覽(18)
  • 大數(shù)據(jù)-Storm流式框架(六)---Kafka介紹

    大數(shù)據(jù)-Storm流式框架(六)---Kafka介紹

    Kafka是一個(gè)分布式的消息隊(duì)列系統(tǒng)(Message Queue)。 官網(wǎng):Apache Kafka 消息和批次 kafka的數(shù)據(jù)單元稱為 消息 。消息可以看成是數(shù)據(jù)庫(kù)表的一行或一條記錄。 消息由 字節(jié)數(shù)組 組成,kafka中消息沒(méi)有特別的格式或含義。 消息有可選的 鍵 ,也是一個(gè)字節(jié)數(shù)組,沒(méi)有特殊的含義。當(dāng)消

    2024年02月08日
    瀏覽(18)
  • Spark讀取kafka(流式和批數(shù)據(jù))
  • 【天衍系列 05】Flink集成KafkaSink組件:實(shí)現(xiàn)流式數(shù)據(jù)的可靠傳輸 & 高效協(xié)同

    【天衍系列 05】Flink集成KafkaSink組件:實(shí)現(xiàn)流式數(shù)據(jù)的可靠傳輸 & 高效協(xié)同

    Flink版本: 本文主要是基于Flink1.14.4 版本 導(dǎo)言: Apache Flink 作為流式處理領(lǐng)域的先鋒,為實(shí)時(shí)數(shù)據(jù)處理提供了強(qiáng)大而靈活的解決方案。其中,KafkaSink 是 Flink 生態(tài)系統(tǒng)中的關(guān)鍵組件之一,扮演著將 Flink 處理的數(shù)據(jù)可靠地發(fā)送到 Kafka 主題的角色。本文將深入探討 KafkaSink 的工作

    2024年02月20日
    瀏覽(50)
  • Debezium系列之:基于debezium將mysql數(shù)據(jù)庫(kù)數(shù)據(jù)更改流式傳輸?shù)?Elasticsearch和PostgreSQL數(shù)據(jù)庫(kù)

    Debezium系列之:基于debezium將mysql數(shù)據(jù)庫(kù)數(shù)據(jù)更改流式傳輸?shù)?Elasticsearch和PostgreSQL數(shù)據(jù)庫(kù)

    基于 Debezium 的端到端數(shù)據(jù)流用例,將數(shù)據(jù)流式傳輸?shù)?Elasticsearch 服務(wù)器,以利用其出色的功能對(duì)我們的數(shù)據(jù)進(jìn)行全文搜索。 同時(shí)把數(shù)據(jù)流式傳輸?shù)?PostgreSQL 數(shù)據(jù)庫(kù),通過(guò) SQL 查詢語(yǔ)言來(lái)優(yōu)化對(duì)數(shù)據(jù)的訪問(wèn)。 下面的圖表顯示了數(shù)據(jù)如何流經(jīng)我們的分布式系統(tǒng)。首先,Debezium M

    2024年02月13日
    瀏覽(20)
  • 4大企業(yè)實(shí)例解析:為何MongoDB Atlas成為AI服務(wù)構(gòu)建的首選

    4大企業(yè)實(shí)例解析:為何MongoDB Atlas成為AI服務(wù)構(gòu)建的首選

    隨著人工智能和生成式AI技術(shù)的迅猛發(fā)展,眾多企業(yè)和機(jī)構(gòu)正積極利用自然語(yǔ)言處理(NLP)、大型語(yǔ)言模型(LLM)等前沿技術(shù),打造出一系列AI驅(qū)動(dòng)的產(chǎn)品、服務(wù)和應(yīng)用程序。 本文將展示四家已在AI創(chuàng)新領(lǐng)域取得顯著成效的企業(yè),以及他們與MongoDB的緊密合作。這些企業(yè)選擇了

    2024年04月10日
    瀏覽(19)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包