国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【Kafka】Ubuntu 部署kafka中間件,實現(xiàn)Django生產(chǎn)和消費

這篇具有很好參考價值的文章主要介紹了【Kafka】Ubuntu 部署kafka中間件,實現(xiàn)Django生產(chǎn)和消費。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

原文作者:我輩李想
版權聲明:文章原創(chuàng),轉(zhuǎn)載時請務必加上原文超鏈接、作者信息和本聲明。


前言

隊列是一種特殊的線性表,特殊之處在于它只允許在表的前端(front)進行刪除操作,而在表的后端(rear)進行插入操作,和棧一樣,隊列是一種操作受限制的線性表。進行插入操作的端稱為隊尾,進行刪除操作的端稱為隊頭。

消息隊列是一種中間件 ,用于在不同的組件或系統(tǒng)之間傳遞消息(進程間通訊的一種)。 它提供了一種可靠的機制(AMQP)來存儲和傳遞消息,并確保消息的順序性和可靠性。消息隊列需要存儲消息。

Apache Kafka是一個開源消息系統(tǒng),由Scala寫成。是由Apache軟件基金會開發(fā)的一個開源消息系統(tǒng)項目。Kafka是一個分布式消息隊列:生產(chǎn)者、消費者的功能。Kafka對消息保存時根據(jù)Topic進行歸類,發(fā)送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。

一、Kafka安裝

1.下載并安裝Java

Kafka 是基于 Java 開發(fā)的,因此需要先安裝 Java 環(huán)境。如果你已經(jīng)安裝了 Java 環(huán)境,可以跳過這一步。

在命令行中輸入以下命令:

sudo apt-get update
# linux命令行下,安裝jdk
sudo apt-get install openjdk-8-jdk
# 查看安裝結果
java -version

2.下載和解壓 Kafka

下載 Kafka 壓縮包,并解壓到 /opt 目錄下。

cd ~
sudo wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
sudo tar -xzf kafka_2.13-3.4.0.tgz
cd /opt
sudo mv cd ~/kafka_2.13-3.4.0 kafka

3.配置 Kafka

接下來需要更改 Kafka 的配置文件。打開 config/server.properties 文件,并進行以下更改:

sudo vim /opt/kafka/config/server.properties

advertised.listeners=PLAINTEXT://<your-server-IP-address>:9092
listeners=PLAINTEXT://0.0.0.0:9092

確保將 <your-server-IP-address> 替換為實際的服務器 IP 地址。(ifconfig查看本機ip)

4.啟動 Kafka

Kafka 啟動有兩種方式:單機模式和分布式模式。

  • 單機模式

在單機模式下,Kafka 只有一個 broker。在命令行中輸入以下命令:

cd /opt/kafka
sudo /opt/kafka/bin/zookeeper-server-start.sh  /opt/kafka/config/zookeeper.properties  # 在一個窗口,或后臺運行
sudo /opt/kafka/bin/kafka-server-start.sh  /opt/kafka/config/server.properties  # 另起一個窗口,或后臺運行
  • 分布式模式

在分布式模式中,Kafka 包含多個 broker。在命令行中輸入以下命令:

首先,編輯 config/server.properties 文件,設置以下屬性:

broker.id=0 # 設置當前 broker 的 id,不能重復
listeners=PLAINTEXT://your.server.ip.address:9092 # 設置監(jiān)聽地址和端口
log.dirs=/tmp/kafka-logs # 設置日志目錄

復制 broker.id=0 的行,修改 broker.id 的值,設置多個 broker 的 id。

接下來,啟動 ZooKeeper:

cd /opt/kafka
sudo /opt/kafka/bin/zookeeper-server-start.sh config/zookeeper.properties

最后,每個 broker 啟動 Kafka:

cd /opt/kafka
sudo /opt/kafka/bin/kafka-server-start.sh config/server.properties

5.創(chuàng)建主題和生產(chǎn)者/消費者

可以使用 Kafka 自帶的命令行工具創(chuàng)建主題、發(fā)送消息和消費消息。

  • 創(chuàng)建主題
cd /opt/kafka
sudo /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server your.server.ip.address:9092 --replication-factor 1 --partitions 1 --topic test-topic
  • 查看
sudo /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  • 刪除
sudo /opt/kafka/bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test-topic

6.發(fā)布和訂閱消息

現(xiàn)在已經(jīng)成功地部署了 Kafka,并創(chuàng)建了一個 topic??梢允褂靡韵旅钤?topic 中發(fā)布和訂閱消息:

發(fā)布消息:# 另起一個窗口

sudo /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic 

訂閱消息:# 另起一個窗口

sudo /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

以上是在 Ubuntu 上部署 Kafka 的基本步驟,你可以根據(jù)實際情況進行修改。

二、Kafka+Django生產(chǎn)和消費

confluent-kafka 和kafka-python是python中處理kafka的三方包,這里以confluent-kafka為例。

pip install confluent-kafka

1.Django配置文件

在settings.py中加入配置

KAFKA_SETTINGS = {
    'bootstrap.servers': 'localhost:9092', # localhost替換為kafka服務的ip
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
}

2.通過django命令實現(xiàn)消費

  1. kafka消費處理kafka_consumer.py文件
from confluent_kafka import Consumer, KafkaError
from django.conf import settings

def kafka_handler():
    c = Consumer(settings.KAFKA_SETTINGS)
    c.subscribe(['test-topic'])

    while True:
        msg = c.poll(1.0)
		print(111,msg)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('End of partition reached')
            else:
                print('Error: {}'.format(msg.error()))
        else:
            print('Received message: {}'.format(msg.value()))

  1. 使用django的startapp新增kafka對app

     cd ptoject  #  manage.py的同級目錄
     django-admin startapp kafka
     python manage.py startapp kafka
    
  2. 在django配置文件settings中添加kafka至INSTALLED_APPS

     INSTALLED_APPS = [
         'django.contrib.admin',
         'django.contrib.auth',
         'django.contrib.contenttypes',
         'django.contrib.sessions',
         'django.contrib.messages',
         'django.contrib.staticfiles',
         'kafka'
     ]
    
  3. 自定義django命令
    可以參看這個鏈接:https://www.osgeo.cn/django/howto/custom-management-commands.html

    在kafka下新建 management/commands二級文件夾
    【Kafka】Ubuntu 部署kafka中間件,實現(xiàn)Django生產(chǎn)和消費,Python開始入門,中間件,ubuntu,kafka

其中kafka_consumer是第一步創(chuàng)建的消費處理程序,my_消費是我們自定義的django命令程序。my_消費.py的內(nèi)容如下:

# -*- coding:utf-8 _*-
"""
@author:Administrator
@file: my_消費.py
@time: 2023/7/16 0016  12:32
"""
from django.core.management.base import BaseCommand, CommandError

from kafka_consumer import kafka_handler


class Command(BaseCommand):
    help = 'Closes the specified poll for voting'

    def handle(self, *args, **options):
        kafka_handler()
  1. 啟動消費命令

切換至虛擬環(huán)境,目錄切換至django項目目錄,及mnange.py的同級目錄。

	python manage.py my_消費

【Kafka】Ubuntu 部署kafka中間件,實現(xiàn)Django生產(chǎn)和消費,Python開始入門,中間件,ubuntu,kafka

【Kafka】Ubuntu 部署kafka中間件,實現(xiàn)Django生產(chǎn)和消費,Python開始入門,中間件,ubuntu,kafka

3.通過Django生產(chǎn)

【Kafka】Ubuntu 部署kafka中間件,實現(xiàn)Django生產(chǎn)和消費,Python開始入門,中間件,ubuntu,kafka

  1. 創(chuàng)建kafka_producer.py生產(chǎn)文件
# -*- coding:utf-8 _*-
"""
@author:Administrator
@file: kafka_producer.py
@time: 2023/7/16 0016  12:28
"""
from confluent_kafka import Producer
from django.conf import settings


def send_message(message):
    p = Producer({'bootstrap.servers': settings.KAFKA_SETTINGS.get('bootstrap.servers')})
    topic = 'test-topic'
    p.produce(topic, message.encode('utf-8'))
    p.flush()


if __name__ == '__main__':
    send_message('測試')
  1. 通過django命令生產(chǎn)消息
    這里還是用的自定義命令,自定義文件為my_消費.py
# -*- coding:utf-8 _*-
"""
@author:Administrator
@file: my_消費.py
@time: 2023/7/16 0016  12:32
"""
from django.core.management.base import BaseCommand, CommandError

from kafka_producer import send_message


class Command(BaseCommand):
    help = 'Closes the specified poll for voting'

    def handle(self, *args, **options):
        send_message('111')

【Kafka】Ubuntu 部署kafka中間件,實現(xiàn)Django生產(chǎn)和消費,Python開始入門,中間件,ubuntu,kafka
啟動方式與消費一樣,python manage.py my_生產(chǎn)

  1. 通過django程序生產(chǎn)消息

kafka_producer文件中的send_message方法可以在程序中被調(diào)用,我們在處理用戶請求時,可以通過異步的方式處理send_message。文章來源地址http://www.zghlxwxcb.cn/news/detail-573315.html

到了這里,關于【Kafka】Ubuntu 部署kafka中間件,實現(xiàn)Django生產(chǎn)和消費的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 中間件 kafka

    Kafka(Apache Kafka)是一個非常流行的開源分布式流數(shù)據(jù)平臺。它最初由LinkedIn開發(fā),后來捐贈給了Apache基金會,并成為頂級項目。Kafka被設計用于處理實時數(shù)據(jù)流,具有高吞吐量、可擴展性和持久性。 Kafka 的主要特點和用途包括: 發(fā)布-訂閱模型: Kafka 提供了一種發(fā)布-訂閱(

    2024年02月13日
    瀏覽(21)
  • Kafka消息中間件(Kafka與MQTT區(qū)別)

    Kafka消息中間件(Kafka與MQTT區(qū)別)

    Kafka是一個分布式流處理平臺,它可以快速地處理大量的數(shù)據(jù)流。Kafka的核心原理是基于 發(fā)布/訂閱 模式的消息隊列。Kafka允許多個生產(chǎn)者將數(shù)據(jù)寫入主題(topic)中,同時也允許多個消費者從主題中讀取數(shù)據(jù)。 Kafka重要原理 Kafka的設計原則之一是高可用性和可擴展性,因此它

    2024年02月03日
    瀏覽(30)
  • 消息中間件(二)——kafka

    消息中間件(二)——kafka

    在大數(shù)據(jù)中,會使用到大量的數(shù)據(jù)。面對這些海量的數(shù)據(jù),我們一是需要做到能夠 收集 這些數(shù)據(jù),其次是要能夠 分析和處理 這些海量數(shù)據(jù)。在此過程中,需要一套消息系統(tǒng)。 Kafka專門為分 布式高吞吐量 系統(tǒng)設計。作為一個消息代理的替代品,Kafka往往做的比其他消息中間

    2024年02月07日
    瀏覽(29)
  • 消息中間件 —— 初識Kafka

    消息中間件 —— 初識Kafka

    1.1.1、為什么要有消息隊列? 1.1.2、消息隊列 消息 Message 網(wǎng)絡中的兩臺計算機或者兩個通訊設備之間傳遞的數(shù)據(jù)。例如說:文本、音樂、視頻等內(nèi)容。 隊列 Queue 一種特殊的線性表(數(shù)據(jù)元素首尾相接),特殊之處在于只允許在首部刪除元素和在尾部追加元素(FIFO)。 入隊、出

    2024年02月13日
    瀏覽(24)
  • 中間件(三)- Kafka(二)

    中間件(三)- Kafka(二)

    6.1 Kafka的高效讀寫 順序?qū)懘疟P Kafka的producer生產(chǎn)數(shù)據(jù),需要寫入到log文件中,寫的過程是追加到文件末端,順序?qū)懙姆绞剑倬W(wǎng)有數(shù)據(jù)表明,同樣的磁盤,順序?qū)懩軌虻?00M/s,而隨機寫只有200K/s,這與磁盤的機械結構有關,順序?qū)懼钥?,是因為其省去了大量磁頭尋址的時

    2024年02月07日
    瀏覽(22)
  • 大數(shù)據(jù)中間件——Kafka

    大數(shù)據(jù)中間件——Kafka

    Kafka安裝配置 首先我們把kafka的安裝包上傳到虛擬機中: 解壓到對應的目錄并修改對應的文件名: 首先我們來到kafka的config目錄,我們第一個要修改的文件就是server.properties文件,修改內(nèi)容如下: 主要修改三個部分,一個是唯一標識id,kafka的文件存儲路徑,一個是zookeeper的節(jié)

    2024年02月07日
    瀏覽(23)
  • 消息中間件之Kafka(一)

    消息中間件之Kafka(一)

    高性能的消息中間件,在大數(shù)據(jù)的業(yè)務場景下性能比較好,kafka本身不維護消息位點,而是交由Consumer來維護,消息可以重復消費,并且內(nèi)部使用了零拷貝技術,性能比較好 Broker持久化消息時采用了MMAP的技術,Consumer拉取消息時使用的sendfile技術 Kafka是最初由Linkedin公司開發(fā),

    2024年01月20日
    瀏覽(53)
  • 消息中間件之Kafka(二)

    消息中間件之Kafka(二)

    1.1 為什么要對topic下數(shù)據(jù)進行分區(qū)存儲? 1.commit log文件會受到所在機器的文件系統(tǒng)大小的限制,分區(qū)之后可以將不同的分區(qū)放在不同的機器上, 相當于對數(shù)據(jù)做了分布式存儲,理論上一個topic可以處理任意數(shù)量的數(shù)據(jù) 2.提高并行度 1.2 如何在多個partition中保證順序消費? 方案一

    2024年01月21日
    瀏覽(29)
  • 【Java面試丨消息中間件】Kafka

    【Java面試丨消息中間件】Kafka

    1. 介紹 使用kafka在消息的收發(fā)過程都有可能會出現(xiàn)消息丟失 (1)生產(chǎn)者發(fā)送消息到broker丟失 (2)消息在broker中存儲丟失 (3)消費者從broker接收消息丟失 2. 生產(chǎn)者發(fā)送消息到broker丟失 設置異步發(fā)送:同步發(fā)送會發(fā)生阻塞,一般使用異步發(fā)送方式發(fā)送消息 消息重試:由于網(wǎng)

    2024年02月11日
    瀏覽(30)
  • Kafka、RabbitMQ、RocketMQ中間件的對比

    Kafka、RabbitMQ、RocketMQ中間件的對比

    消息中間件現(xiàn)在有不少,網(wǎng)上很多文章都對其做過對比,在這我對其做進一步總結與整理。 ? ? RocketMQ 淘寶內(nèi)部的交易系統(tǒng)使用了淘寶自主研發(fā)的Notify消息中間件,使用Mysql作為消息存儲媒介,可完全水平擴容,為了進一步降低成本,我們認為存儲部分可以進一步優(yōu)化,201

    2024年02月05日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包