提示:文章寫完后,目錄可以自動(dòng)生成,如何生成可參考右邊的幫助文檔
前言
提示:windows環(huán)境安裝失敗,Linux環(huán)境安裝成功(以下并沒有windows安裝示例)
一、安裝java(Kafka必須安裝java,因?yàn)閗afka依賴java核心)
下載地址:鏈接: https://www.oracle.com/java/technologies/downloads/#jdk20-linux
將文件放在Linux目錄中后進(jìn)行解壓:
假設(shè)我把[jdk-20_linux-x64_bin.tar.gz]包放在了/root/src/uap/web/third 目錄下
1、tar -zxvf jdk-20_linux-x64_bin.tar.gz
2、mv jdk.0.20 ./jdk
3、vim /etc/profile
JAVA_HOME=/root/src/uap/web/third/jdk
PATH=/root/src/uap/web/third/jdk/bin:$PATH
export JAVA_HOME
4、source /ect/profile
5、java -version (出現(xiàn)下圖極為成功)
二、安裝以及配置Kafka、zookeeper
1.下載Kafka(無需下載zookeeper,使用kafka自帶的即可)
下載地址:https://kafka.apache.org/downloads
提示:不要下載帶src的那個(gè),具體我也不知道,因?yàn)槲乙彩莻€(gè)小白
假設(shè)我把[kafka_2.12-3.5.1.tgz]包放在了/root/src/uap/web/third 目錄下
1、tar -zxvf kafka_2.12-3.5.1.tgz
2、mv kafka.2.12 ./kafka
3、創(chuàng)建kafka日志文件
mkdir -p ./kafka_data/log/kafka
mkdir -p ./kafka_data/log/zookeeper
mkdir -p ./kafka_data/zookeeper
4、cd ./kafka/config
vim server.properties
listeners=PLAINTEXT://localhost:9092 (34行左右,添加對應(yīng)的host、port)
broker.id=0
port=9092
host.name=192.168.1.241
log.dirs=/root/src/uap/web/third/kafka_data/log/kafka
zookeeper.connect=localhost:2181
wd
vim zookeeper.properties
dataDir=/root/src/uap/web/third/kafka_data/zookeeper
dataLogDir=/root/src/uap/web/third/kafka_data/log/zookeeper
clientPort=2181
maxClientCnxns=100
tickTimes=2000
initLimit=10
syncLimit=5
wd
5、cd ../ 進(jìn)入kafka目錄下
#啟動(dòng)zookeeper
./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
//如果其中報(bào)錯(cuò),大部分應(yīng)該是報(bào)JAVA_HOME 這個(gè)說明你沒有配置 /etc/profile 上面有
./bin/kafka-server-start.sh -daemon ./config/server.properties &
2.配置topid
代碼如下(示例):文章來源:http://www.zghlxwxcb.cn/news/detail-672666.html
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic myt
返回值:Created topic myt. 創(chuàng)建成功/否則失敗
3.安裝PHP的rdkafka,這個(gè)網(wǎng)上教程很多,基本上都是正確的
例如:阿里云開發(fā)者社區(qū),php安裝rdkafka教程
剩下邏輯就直接貼代碼了文章來源地址http://www.zghlxwxcb.cn/news/detail-672666.html
生產(chǎn)者:
public function producer(){
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("mytest");
//獲取數(shù)據(jù)庫數(shù)據(jù),存入kafka中
$wanchk = $this->db->query("SELECT * FROM hf_alarm_wanchk");
foreach ($wanchk as $k => $v){
$topic->produce(RD_KAFKA_PARTITION_UA, 0, array2json($v));
$producer->poll(0);
}
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
$producer->purge(RD_KAFKA_PURGE_F_QUEUE);
$producer->flush(10000);
}
消費(fèi)者:
//這個(gè)代碼需要使用終端運(yùn)行:
// /bin/php -c /etc/php.ini -f /入口文件目錄/index.php (類)consumer (方法)consumer
public function consumer()
{
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'mytest');
$rk = new \RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'broker');
$topicConf->set('auto.offset.reset', 'smallest');
$topic = $rk->newTopic('mytest', $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
$message = $topic->consume(0, 120 * 10000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
}
到了這里,關(guān)于PHP小白搭建Kafka環(huán)境以及初步使用rdkafka的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!