原文作者:我輩李想
版權聲明:文章原創(chuàng),轉(zhuǎn)載時請務必加上原文超鏈接、作者信息和本聲明。
前言
隊列是一種特殊的線性表,特殊之處在于它只允許在表的前端(front)進行刪除操作,而在表的后端(rear)進行插入操作,和棧一樣,隊列是一種操作受限制的線性表。進行插入操作的端稱為隊尾,進行刪除操作的端稱為隊頭。
消息隊列是一種中間件 ,用于在不同的組件或系統(tǒng)之間傳遞消息(進程間通訊的一種)。 它提供了一種可靠的機制(AMQP)來存儲和傳遞消息,并確保消息的順序性和可靠性。消息隊列需要存儲消息。
Apache Kafka是一個開源消息系統(tǒng),由Scala寫成。是由Apache軟件基金會開發(fā)的一個開源消息系統(tǒng)項目。Kafka是一個分布式消息隊列:生產(chǎn)者、消費者的功能。Kafka對消息保存時根據(jù)Topic進行歸類,發(fā)送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。
一、Kafka安裝
1.下載并安裝Java
Kafka 是基于 Java 開發(fā)的,因此需要先安裝 Java 環(huán)境。如果你已經(jīng)安裝了 Java 環(huán)境,可以跳過這一步。
在命令行中輸入以下命令:
sudo apt-get update
# linux命令行下,安裝jdk
sudo apt-get install openjdk-8-jdk
# 查看安裝結果
java -version
2.下載和解壓 Kafka
下載 Kafka 壓縮包,并解壓到 /opt 目錄下。
cd ~
sudo wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
sudo tar -xzf kafka_2.13-3.4.0.tgz
cd /opt
sudo mv cd ~/kafka_2.13-3.4.0 kafka
3.配置 Kafka
接下來需要更改 Kafka 的配置文件。打開 config/server.properties
文件,并進行以下更改:
sudo vim /opt/kafka/config/server.properties
advertised.listeners=PLAINTEXT://<your-server-IP-address>:9092
listeners=PLAINTEXT://0.0.0.0:9092
確保將 <your-server-IP-address>
替換為實際的服務器 IP 地址。(ifconfig查看本機ip)
4.啟動 Kafka
Kafka 啟動有兩種方式:單機模式和分布式模式。
- 單機模式
在單機模式下,Kafka 只有一個 broker。在命令行中輸入以下命令:
cd /opt/kafka
sudo /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties # 在一個窗口,或后臺運行
sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties # 另起一個窗口,或后臺運行
- 分布式模式
在分布式模式中,Kafka 包含多個 broker。在命令行中輸入以下命令:
首先,編輯 config/server.properties 文件,設置以下屬性:
broker.id=0 # 設置當前 broker 的 id,不能重復
listeners=PLAINTEXT://your.server.ip.address:9092 # 設置監(jiān)聽地址和端口
log.dirs=/tmp/kafka-logs # 設置日志目錄
復制 broker.id=0 的行,修改 broker.id 的值,設置多個 broker 的 id。
接下來,啟動 ZooKeeper:
cd /opt/kafka
sudo /opt/kafka/bin/zookeeper-server-start.sh config/zookeeper.properties
最后,每個 broker 啟動 Kafka:
cd /opt/kafka
sudo /opt/kafka/bin/kafka-server-start.sh config/server.properties
5.創(chuàng)建主題和生產(chǎn)者/消費者
可以使用 Kafka 自帶的命令行工具創(chuàng)建主題、發(fā)送消息和消費消息。
- 創(chuàng)建主題
cd /opt/kafka
sudo /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server your.server.ip.address:9092 --replication-factor 1 --partitions 1 --topic test-topic
- 查看
sudo /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
- 刪除
sudo /opt/kafka/bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test-topic
6.發(fā)布和訂閱消息
現(xiàn)在已經(jīng)成功地部署了 Kafka,并創(chuàng)建了一個 topic??梢允褂靡韵旅钤?topic 中發(fā)布和訂閱消息:
發(fā)布消息:# 另起一個窗口
sudo /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
訂閱消息:# 另起一個窗口
sudo /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
以上是在 Ubuntu 上部署 Kafka 的基本步驟,你可以根據(jù)實際情況進行修改。
二、Kafka+Django生產(chǎn)和消費
confluent-kafka 和kafka-python是python中處理kafka的三方包,這里以confluent-kafka為例。
pip install confluent-kafka
1.Django配置文件
在settings.py中加入配置
KAFKA_SETTINGS = {
'bootstrap.servers': 'localhost:9092', # localhost替換為kafka服務的ip
'group.id': 'mygroup',
'auto.offset.reset': 'earliest',
}
2.通過django命令實現(xiàn)消費
- kafka消費處理kafka_consumer.py文件
from confluent_kafka import Consumer, KafkaError
from django.conf import settings
def kafka_handler():
c = Consumer(settings.KAFKA_SETTINGS)
c.subscribe(['test-topic'])
while True:
msg = c.poll(1.0)
print(111,msg)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print('End of partition reached')
else:
print('Error: {}'.format(msg.error()))
else:
print('Received message: {}'.format(msg.value()))
-
使用django的startapp新增kafka對app
cd ptoject # manage.py的同級目錄 django-admin startapp kafka python manage.py startapp kafka
-
在django配置文件settings中添加kafka至INSTALLED_APPS
INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'kafka' ]
-
自定義django命令
可以參看這個鏈接:https://www.osgeo.cn/django/howto/custom-management-commands.html在kafka下新建 management/commands二級文件夾
其中kafka_consumer是第一步創(chuàng)建的消費處理程序,my_消費是我們自定義的django命令程序。my_消費.py的內(nèi)容如下:
# -*- coding:utf-8 _*-
"""
@author:Administrator
@file: my_消費.py
@time: 2023/7/16 0016 12:32
"""
from django.core.management.base import BaseCommand, CommandError
from kafka_consumer import kafka_handler
class Command(BaseCommand):
help = 'Closes the specified poll for voting'
def handle(self, *args, **options):
kafka_handler()
- 啟動消費命令
切換至虛擬環(huán)境,目錄切換至django項目目錄,及mnange.py的同級目錄。
python manage.py my_消費
3.通過Django生產(chǎn)
- 創(chuàng)建kafka_producer.py生產(chǎn)文件
# -*- coding:utf-8 _*-
"""
@author:Administrator
@file: kafka_producer.py
@time: 2023/7/16 0016 12:28
"""
from confluent_kafka import Producer
from django.conf import settings
def send_message(message):
p = Producer({'bootstrap.servers': settings.KAFKA_SETTINGS.get('bootstrap.servers')})
topic = 'test-topic'
p.produce(topic, message.encode('utf-8'))
p.flush()
if __name__ == '__main__':
send_message('測試')
- 通過django命令生產(chǎn)消息
這里還是用的自定義命令,自定義文件為my_消費.py
# -*- coding:utf-8 _*-
"""
@author:Administrator
@file: my_消費.py
@time: 2023/7/16 0016 12:32
"""
from django.core.management.base import BaseCommand, CommandError
from kafka_producer import send_message
class Command(BaseCommand):
help = 'Closes the specified poll for voting'
def handle(self, *args, **options):
send_message('111')
啟動方式與消費一樣,python manage.py my_生產(chǎn)文章來源:http://www.zghlxwxcb.cn/news/detail-573315.html
- 通過django程序生產(chǎn)消息
kafka_producer文件中的send_message方法可以在程序中被調(diào)用,我們在處理用戶請求時,可以通過異步的方式處理send_message。文章來源地址http://www.zghlxwxcb.cn/news/detail-573315.html
到了這里,關于【Kafka】Ubuntu 部署kafka中間件,實現(xiàn)Django生產(chǎn)和消費的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!