Kafka environment setup
- Download: https://link.zhihu.com/?target=https%3A//kafka.apache.org/downloads
- Extract the archive
- Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
Note: "config/zookeeper.properties" and "/config/zookeeper.properties" are different paths. The former is the zookeeper.properties file in the config directory under the current directory; the latter is the zookeeper.properties file in the config directory under the root directory.
If ZooKeeper fails to start, change admin.enableServer=false to admin.enableServer=true in zookeeper.properties,
or stop ZooKeeper and start it again:
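The relative versus absolute distinction can be checked with a few lines of Java. This is a minimal sketch, assuming a Unix-like system; the class name PathKinds and the install directory /opt/kafka are made up for illustration.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class PathKinds {
    // Resolves a path the way a process started in workingDir would see it.
    // Path.resolve returns the argument unchanged when it is already absolute.
    static Path resolveFrom(String workingDir, String path) {
        return Paths.get(workingDir).resolve(path).normalize();
    }

    public static void main(String[] args) {
        String kafkaHome = "/opt/kafka"; // hypothetical install directory
        // Relative: looked up under the current directory.
        System.out.println(resolveFrom(kafkaHome, "config/zookeeper.properties"));
        // Absolute: looked up under the filesystem root, wherever the command is run.
        System.out.println(resolveFrom(kafkaHome, "/config/zookeeper.properties"));
    }
}
```

This is why the start scripts are normally run from the Kafka installation directory: the relative config path only resolves correctly there.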
bin/zookeeper-server-stop.sh
- Start Kafka
bin/kafka-server-start.sh config/server.properties
- Create a topic
kafka-topics.sh --create --zookeeper cluster1:2181,cluster2:2181,cluster3:2181 --replication-factor 3 --partitions 1 --topic ljg
If you get the error "zookeeper is not a recognized option", replace that option with --bootstrap-server, that is:
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic ljg
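The difference between the two commands: --zookeeper, together with the cluster host names, is the older form of the option. Newer Kafka versions use --bootstrap-server instead and may no longer accept --zookeeper.
- Create a producer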
kafka-console-producer.sh --broker-list cluster1:9092 --topic ljg
Or:
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic ljg
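The difference between the two commands: --broker-list, together with the cluster host names, is the older form of the option. Newer Kafka versions use --bootstrap-server instead and may no longer accept --broker-list.
- Create a consumer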
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ljg
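The producer now waits for input. Each line typed into it is sent to the topic and delivered to the consumer.
Port 2181 is the ZooKeeper port. It is used to manage the Kafka cluster's metadata, including configuration, partition information, and consumer information. Port 9092 is the default port of the Kafka broker, which receives and handles messages from producers and consumers and stores and transfers the data.
Reference: https://www.cnblogs.com/anquing/p/14523046.html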
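Because the two ports are easy to mix up, a small check on a bootstrap.servers-style string can catch entries that point at ZooKeeper instead of a broker. This is only a sketch: the class and method names are hypothetical, and it assumes both services run on their default ports.

```java
import java.util.ArrayList;
import java.util.List;

public class BootstrapServers {
    static final int ZOOKEEPER_DEFAULT_PORT = 2181;

    // Splits "host1:9092,host2:9092" and returns the entries that use
    // ZooKeeper's default port, which a Kafka client should not connect to.
    static List<String> suspiciousEntries(String servers) {
        List<String> suspicious = new ArrayList<>();
        for (String entry : servers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                continue; // no port given, nothing to check
            }
            int port = Integer.parseInt(trimmed.substring(colon + 1).trim());
            if (port == ZOOKEEPER_DEFAULT_PORT) {
                suspicious.add(trimmed);
            }
        }
        return suspicious;
    }

    public static void main(String[] args) {
        // The second entry points at ZooKeeper's default port and gets reported.
        System.out.println(suspiciousEntries("cluster1:9092, cluster2:2181"));
    }
}
```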
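Maven download, installation, and configuration
- Download and extract
- Set the working directory
- Configure a mirror
- Compile the Java project
Maven commands:
mvn clean: remove previous build output
mvn compile: compile the main sources
mvn test-compile: compile the test sources
mvn test: run the tests
mvn package: build the package
mvn install: install the package into the local repository
Maven project directory layout:
Contents of Hello.java: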
package com.maven.test;
public class Hello {
public String sayHello(String name){
return "Hello "+name+"!";
}
}
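Before adding a JUnit test, a plain main method is enough to confirm what sayHello returns. The sketch below is self-contained: it carries its own copy of the Hello class, and the class name HelloCheck is made up for illustration.

```java
public class HelloCheck {
    // Local copy of the Hello class so this file compiles on its own.
    static class Hello {
        public String sayHello(String name) {
            return "Hello " + name + "!";
        }
    }

    public static void main(String[] args) {
        String greeting = new Hello().sayHello("maven");
        // Fail loudly if the greeting is not what we expect.
        if (!greeting.equals("Hello maven!")) {
            throw new AssertionError("unexpected greeting: " + greeting);
        }
        System.out.println(greeting); // prints "Hello maven!"
    }
}
```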
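Note the difference between the name and artifactId fields in pom.xml: artifactId is part of the project's coordinates (groupId:artifactId:version) and determines the file name of the built artifact, while name is only a human-readable display name.
Example pom.xml: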
<?xml version="1.0" ?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.maven.test</groupId>
<artifactId>hello</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>hello</name>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
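One practical consequence of the artifactId field: by default Maven names the packaged file from artifactId and version, and the name field plays no part. A small sketch of that rule, with a hypothetical helper name and jar packaging assumed because the pom.xml above does not set packaging:

```java
public class ArtifactName {
    // Maven's default output file name is <artifactId>-<version>.<packaging>.
    static String defaultFileName(String artifactId, String version, String packaging) {
        return artifactId + "-" + version + "." + packaging;
    }

    public static void main(String[] args) {
        // Values taken from the example pom.xml; "jar" is Maven's default packaging.
        System.out.println(defaultFileName("hello", "0.0.1-SNAPSHOT", "jar"));
        // prints "hello-0.0.1-SNAPSHOT.jar"
    }
}
```

With the pom.xml above, mvn package would therefore be expected to write target/hello-0.0.1-SNAPSHOT.jar.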
Database installation and operation
Install the MariaDB database
yum install mariadb-server
systemctl start mariadb
systemctl enable mariadb
mysql_secure_installation
Create the table:
mysql -uroot -p
create database kafkaTestDB;
use kafkaTestDB;
create table kafkaTestTable(tickcount varchar(64), value varchar(64),time varchar(64));
Connecting to and operating on the database from Java:
package Main;
import java.sql.*;
public class JDBC {
public static void main(String[] args) throws SQLException, ClassNotFoundException {
// 1. Load the driver
Class.forName("com.mysql.cj.jdbc.Driver");
// 2. User credentials and URL
String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true";
String username="root";
String password="root";
// 3. On success this returns the database connection object (Connection)
Connection connection = DriverManager.getConnection(url,username,password);
// 4. Create the Statement object that executes SQL
Statement statement = connection.createStatement();
// 5. Use the Statement to run the query, which returns a result set
String sql = "SELECT * FROM studentinfo;";
ResultSet resultSet = statement.executeQuery(sql);
while(resultSet.next()){
System.out.println("SNo="+resultSet.getString("SNo"));
System.out.println("SName="+resultSet.getString("SName"));
System.out.println("Birth="+resultSet.getString("Birth"));
System.out.println("SPNo="+resultSet.getString("SPNo"));
System.out.println("Major="+resultSet.getString("Major"));
System.out.println("Grade="+resultSet.getString("Grade"));
System.out.println("SInstructor="+resultSet.getString("SInstructor"));
System.out.println("SPwd="+resultSet.getString("SPwd"));
}
// 6. Release the connection
resultSet.close();
statement.close();
connection.close();
}
}
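The same JDBC steps can be written so that resources are closed automatically and values are passed as parameters. The following is a sketch, not the original author's code: it targets the kafkaTestTable created earlier, the class and helper names are hypothetical, and the credentials are placeholders.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class JdbcInsertSketch {
    // Puts the database name in the URL so no separate "use" statement is needed.
    static String jdbcUrl(String host, int port, String database) {
        return "jdbc:mysql://" + host + ":" + port + "/" + database
                + "?useUnicode=true&characterEncoding=utf8";
    }

    // Inserts one row with placeholders; the driver handles quoting and escaping.
    static int insertRow(Connection connection, String tick, String value, String time)
            throws SQLException {
        String sql = "insert into kafkaTestTable values(?, ?, ?)";
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            ps.setString(1, tick);
            ps.setString(2, value);
            ps.setString(3, time);
            return ps.executeUpdate(); // number of rows inserted
        }
    }

    public static void main(String[] args) {
        String url = jdbcUrl("localhost", 3306, "kafkaTestDB");
        // try-with-resources closes the connection even when an exception is thrown.
        try (Connection connection = DriverManager.getConnection(url, "root", "root")) {
            int rows = insertRow(connection, "1", "42", "20240101000000");
            System.out.println("rows inserted: " + rows);
        } catch (SQLException e) {
            // Also reached when no MySQL driver is on the classpath or the server is down.
            System.out.println("database not available: " + e.getMessage());
        }
    }
}
```

Running it requires the mysql-connector-java dependency on the classpath and a reachable MariaDB instance; without them it only prints the error message.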
Building the producer and consumer in Java (Maven)
Project directory:
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bjtu.kafkaTest</groupId>
<artifactId>kafkaTest</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>0.8.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.8</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20180130</version>
</dependency>
</dependencies>
</project>
Java source code:
consumer:
package com.bjtu.kafkaTest;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.sql.*;
import java.util.Properties;
import org.json.*;
public class ConsumerDemo {
public static void mysqlAccess(String tick,String value,String t) {
try{
Class.forName("com.mysql.cj.jdbc.Driver");
// Select the database in the URL instead of issuing a separate "use" statement.
String url = "jdbc:mysql://localhost:3306/kafkaTestDB";
String username="root";
String password="1234";
// Placeholders let the driver quote the varchar values.
String sql = "insert into kafkaTestTable values(?, ?, ?)";
// try-with-resources closes the statement and the connection automatically.
try (Connection connection = DriverManager.getConnection(url,username,password);
PreparedStatement statement = connection.prepareStatement(sql)) {
statement.setString(1, tick);
statement.setString(2, value);
statement.setString(3, t);
// An INSERT must be run with executeUpdate, not executeQuery.
statement.executeUpdate();
System.out.println(sql);
}
}
catch(Exception e){
System.out.println(e.toString());
}
}
public static void main(String[] args){
System.out.println("consumer start\r\n");
Statement statement = null;
Connection connection = null;
ResultSet resultSet = null;
try{
Class.forName("com.mysql.cj.jdbc.Driver");
String url = "jdbc:mysql://localhost:3306/";
String username="root";
String password="1234";
connection = DriverManager.getConnection(url,username,password);
statement = connection.createStatement();
String sql = "use kafkaTestDB;";
// "use" returns no result set, so run it with execute() rather than executeQuery().
statement.execute(sql);
//sql = "SELECT *FROM kafkaTestTable;";
//resultSet = statement.executeQuery(sql);
//while(resultSet.next()){
// System.out.println("tickcount:"+resultSet.getString("tickcount"));
//}
}
catch(Exception e){
System.out.println(e.toString());
return;
}
Properties properties = new Properties();
properties.put("bootstrap.servers", "0.0.0.0:9092");
properties.put("group.id", "zabbix_perf");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
/**
* earliest
* If a partition has a committed offset, consume from it; if not, consume from the beginning
* latest
* If a partition has a committed offset, consume from it; if not, consume only records newly produced to that partition
* none
* If every partition of the topic has a committed offset, consume from those offsets; if any partition has none, throw an exception
*
*/
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "30000");
/**
* Deserialization
* Converts the binary messages from the Kafka cluster into the specified types.
*/
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList("ljg"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);// 100 ms poll timeout
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
// Parsing happens inside the try block so that malformed JSON is caught here.
// The parameterized insert leaves quoting and escaping to the driver.
try (PreparedStatement insert = connection.prepareStatement(
"insert into kafkaTestTable values(?, ?, ?)")) {
JSONObject jo = new JSONObject(record.value());
insert.setString(1, jo.getString("tickcount"));
insert.setString(2, jo.getString("value"));
insert.setString(3, jo.getString("time"));
int result = insert.executeUpdate();
}catch(Exception e){
System.out.println(e.toString());
break;
}
}
}
//try{
//resultSet.close();
//statement.close();
//connection.close();
//}catch(Exception e){
// System.out.println(e.toString());
//}
}
}
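The auto.offset.reset comment in the consumer describes a small decision rule, which can be modelled without a broker. The sketch below is a simplified model of that rule only, not Kafka's implementation: the class and method names are hypothetical, and it ignores the case where a committed offset is out of range.

```java
public class OffsetResetSketch {
    // committed == null means the group has no committed offset for the partition.
    // beginning and end stand for the partition's first and next-to-be-written offsets.
    static long startingOffset(Long committed, String policy, long beginning, long end) {
        if (committed != null) {
            return committed; // every policy resumes from a committed offset
        }
        switch (policy) {
            case "earliest":
                return beginning; // read the partition from the start
            case "latest":
                return end;       // read only records produced from now on
            case "none":
                // The real consumer raises an exception in this situation as well.
                throw new IllegalStateException("no committed offset and auto.offset.reset=none");
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(42L, "latest", 0, 100));    // committed offset wins
        System.out.println(startingOffset(null, "earliest", 0, 100)); // falls back to the beginning
        System.out.println(startingOffset(null, "latest", 0, 100));   // falls back to the end
    }
}
```

With "earliest", as configured above, a consumer group that has never committed will replay the whole topic on its first run.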
producer:
package com.bjtu.kafkaTest;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
import java.util.Date;
import java.time.LocalDate;
import org.joda.time.DateTime;
import java.text.SimpleDateFormat;
public class ProducerDemo {
public static void main(String[] args){
System.out.println("producer start\r\n");
Properties properties = new Properties();
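/**
* bootstrap.servers is the list of host/port pairs used to establish the initial connection to the Kafka cluster; with two or more machines, separate them with commas
*/
properties.put("bootstrap.servers", "0.0.0.0:9092");
/**
* acks has three settings
* acks=0 the producer does not wait for any acknowledgement from the server, so there is no guarantee the server received the message
* acks=1 the leader writes the record to its local log and replies without waiting for all followers to acknowledge it
* acks=all the leader waits until all in-sync replicas have acknowledged the record, which protects it from being lost
*
*/
properties.put("acks", "all");
/**
* A value greater than 0 makes the client resend any record whose send failed
*
*/
properties.put("retries", 0);
/**
* 16384 bytes is the default batch size
*/
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
/**
* Serializer types.
* Messages are sent to the Kafka cluster as key-value pairs; the key is optional and the value can be of any type. Before a message is sent to Kafka, the producer must
* convert it to binary.
*/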
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = null;
try {
producer = new KafkaProducer<String, String>(properties);
Random r = new Random();
for (int i = 0; i < 1000; i++) {
int rv = r.nextInt(0x10000000);
long timestamp = System.currentTimeMillis();
Date date = new Date(timestamp + rv);
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
String formattedDate = sdf.format(date);
//System.out.println("formatted date: " + formattedDate);
String msg = "{\"tickcount\":\"" + formattedDate + "\",\"value\":\"" +formattedDate +"\",\"time\":\"" + formattedDate + "\"}" ;
producer.send(new ProducerRecord<String, String>("ljg", msg));
System.out.println("Sent:" + msg);
//Thread.sleep(1);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// producer is still null if the KafkaProducer constructor threw.
if (producer != null) producer.close();
}
System.out.println("producer end\r\n");
}
}
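The message payload the producer sends can be built and checked separately from Kafka. This is a sketch under stated assumptions: it uses java.time instead of SimpleDateFormat, formats in UTC rather than the machine's default time zone, and the class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class MessageFormatSketch {
    // DateTimeFormatter is immutable and thread-safe, so one shared instance is enough.
    private static final DateTimeFormatter STAMP =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(ZoneOffset.UTC);

    // Builds the same three-field JSON payload as ProducerDemo for a given timestamp.
    static String buildMessage(long epochMillis) {
        String stamp = STAMP.format(Instant.ofEpochMilli(epochMillis));
        return String.format(
                "{\"tickcount\":\"%s\",\"value\":\"%s\",\"time\":\"%s\"}",
                stamp, stamp, stamp);
    }

    public static void main(String[] args) {
        System.out.println(buildMessage(System.currentTimeMillis()));
    }
}
```

Keeping the payload construction in its own method makes it possible to verify the JSON shape that ConsumerDemo expects (tickcount, value, time) without a running broker.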
Install dependencies:
mvn idea:module
Compile and run:
Run:
mvn exec:java -Dexec.mainClass="com.bjtu.kafkaTest.ConsumerDemo"
mvn exec:java -Dexec.mainClass="com.bjtu.kafkaTest.ProducerDemo"
References:
https://www.cnblogs.com/qqran/p/14772713.html