国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka大數(shù)據(jù)采集技術(shù)實(shí)驗(yàn)(未完待續(xù))

這篇具有很好參考價值的文章主要介紹了kafka大數(shù)據(jù)采集技術(shù)實(shí)驗(yàn)(未完待續(xù))。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

Kafka環(huán)境搭建

  1. 下載地址:https://link.zhihu.com/?target=https%3A//kafka.apache.org/downloads
  2. 解壓
  3. 啟動zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

需要注意的是 : " c o n f i g / z o o k e e p e r . p r o p e r t i e s " 目錄和 " / c o n f i g / z o o k e e p e r . p r o p e r t i e s " 目錄是不同的 . 前者指當(dāng)前目錄中 c o n f i g 目錄下的 z o o k e e p e r . p r o p e r t i e s 文件, 后者代表根目錄中 c o n f i g 目錄下的 z o o k e e p e r . p r o p e r t i e s 文件。 \color{red}需要注意的是:\\ "config/zookeeper.properties"目錄和 "/config/zookeeper.properties"目錄是不同的.\\ 前者指當(dāng)前目錄中config目錄下的zookeeper.properties文件,\\ 后者代表根目錄中config目錄下的zookeeper.properties文件。 需要注意的是:"config/zookeeper.properties"目錄和"/config/zookeeper.properties"目錄是不同的.前者指當(dāng)前目錄中config目錄下的zookeeper.properties文件,后者代表根目錄中config目錄下的zookeeper.properties文件。

若啟動不成功,需要將zookeeper.properties中的admin.EnableServer=false修改為admin.EnableServer=true
或者關(guān)閉zookeeper并重新啟動:

bin/zookeeper-server-stop.sh 
  1. 啟動kafka
bin/kafka-server-start.sh config/server.properties
  1. 創(chuàng)建topic
kafka-topics.sh --create --zookeeper cluster1:9092,cluster2: 9092,cluster3: 9092--replication-factor 3 --partitions 1 --topic ljg

若發(fā)生錯誤:”zookeeper is not a recognized option”則將參數(shù)換成“—BOOTSTRAP-SERVER”,即:

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic ljg

上述兩者的區(qū)別是,–zookeeper 和cluster都是老版本的命令參數(shù),新版本可能不再支持。

  1. 創(chuàng)建生產(chǎn)者
kafka-console-producer.sh --broker-list cluster1:9092 --topic ljg

或者:

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic ljg

上述兩者的區(qū)別是,–broker-list 和cluster都是老版本的命令參數(shù),新版本可能不再支持。

  1. 創(chuàng)建消費(fèi)者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ljg

此時生產(chǎn)者即可進(jìn)入等待輸入,并將消息發(fā)送給消費(fèi)者。

kafka大數(shù)據(jù)采集技術(shù)實(shí)驗(yàn)(未完待續(xù)),軟件開發(fā),kafka,大數(shù)據(jù),分布式

2181端口用于管理Kafka集群的元數(shù)據(jù)信息,包括Kafka的配置信息、分區(qū)信息、消費(fèi)者信息等。而9092端口是Kafka Broker的默認(rèn)端口,用于接收和處理生產(chǎn)者和消費(fèi)者的消息,以及進(jìn)行數(shù)據(jù)的存儲和傳輸。

參考鏈接:https://www.cnblogs.com/anquing/p/14523046.html

maven下載安裝、設(shè)置

  1. 下載解壓
  2. 設(shè)置工作目錄
  3. 設(shè)置鏡像
  4. 編譯java項(xiàng)目

mave命令:

mvn clean:清理
mvn compile:編譯主程序
mvn test-compile:編譯測試程序
mvn test:執(zhí)行測試
mvn package:打包
mvn install:安裝

maven項(xiàng)目目錄結(jié)構(gòu):
kafka大數(shù)據(jù)采集技術(shù)實(shí)驗(yàn)(未完待續(xù)),軟件開發(fā),kafka,大數(shù)據(jù),分布式

Hello.java內(nèi)容:

package com.maven.test;


public class hello {
  public String sayHello(String name){
    return "Hello "+name+"!";
  }
}

注意pom.xml中的name 和artifactId字段的區(qū)別。
pom.xml例子:

<?xml version="1.0" ?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.maven.test</groupId>
    <artifactId>hello</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <name>hello</name>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

數(shù)據(jù)庫安裝和操作

安裝mabiadb數(shù)據(jù)庫

yum install mariadb-server 
systemctl start mariadb
systemctl enable mariadb
mysql_secure_installation

創(chuàng)建數(shù)據(jù)表:

mysql -uroot -p
create database kafkaTestDB;
use kafkaTestDB;
create table kafkaTestTable(tickcount varchar(64), value  varchar(64),time varchar(64));

java連接和操作數(shù)據(jù)庫:

package Main;

import java.sql.*;

public class JDBC {
    public static void main(String[] args) throws SQLException, ClassNotFoundException {
//        1.加載驅(qū)動
        Class.forName("com.mysql.cj.jdbc.Driver");
//        2.用戶信息和url
        String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true";
        String username="root";
        String password="root";
//        3.連接成功,數(shù)據(jù)庫對象 Connection
        Connection connection = DriverManager.getConnection(url,username,password);
//        4.執(zhí)行SQL對象Statement,執(zhí)行SQL的對象
        Statement statement = connection.createStatement();
//        5.執(zhí)行SQL的對象去執(zhí)行SQL,返回結(jié)果集
        String sql = "SELECT *FROM studentinfo;";
        ResultSet resultSet = statement.executeQuery(sql);
        while(resultSet.next()){
            System.out.println("SNo="+resultSet.getString("SNo"));
            System.out.println("SName="+resultSet.getString("SName"));
            System.out.println("Birth="+resultSet.getString("Birth"));
            System.out.println("SPNo="+resultSet.getString("SPNo"));
            System.out.println("Major="+resultSet.getString("Major"));
            System.out.println("Grade="+resultSet.getString("Grade"));
            System.out.println("SInstructor="+resultSet.getString("SInstructor"));
            System.out.println("SPwd="+resultSet.getString("SPwd"));
        }
//        6.釋放連接
        resultSet.close();
        statement.close();
        connection.close();
    }
}


JAVA代碼(maven)構(gòu)建生產(chǎn)者消費(fèi)者

工程目錄:

kafka大數(shù)據(jù)采集技術(shù)實(shí)驗(yàn)(未完待續(xù)),軟件開發(fā),kafka,大數(shù)據(jù),分布式

pom.xml:


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.bjtu.kafkaTest</groupId>
    <artifactId>kafkaTest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
 
    <dependencies>
 
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.2.1</version>
        </dependency>
 
        <dependency>
            <groupId> org.apache.cassandra</groupId>
            <artifactId>cassandra-all</artifactId>
            <version>0.8.1</version>
 
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.26</version>
</dependency>

<dependency>
   <groupId>joda-time</groupId>
   <artifactId>joda-time</artifactId>
   <version>2.9.8</version>
</dependency>

<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20180130</version>
</dependency>

    </dependencies>
 
</project>


java源碼:

consumer:

package com.bjtu.kafkaTest;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
import java.util.Arrays;

import java.sql.*;
import java.util.Properties;
import org.json.*;
 




public class ConsumerDemo {

	


    public static void mysqlAccess(String tick,String value,String t)  {

	try{

		Class.forName("com.mysql.cj.jdbc.Driver");

		String url = "jdbc:mysql://localhost:3306/";

		String username="root";
		String password="1234";

		Connection connection = DriverManager.getConnection(url,username,password);

		Statement statement = connection.createStatement();

		String sql = "use kafkaTestDB;";
		ResultSet resultSet = statement.executeQuery(sql);
		//sql = "SELECT *FROM kafkaTestTable;";
		//resultSet = statement.executeQuery(sql);
		//while(resultSet.next()){
		//    System.out.println("tickcount:"+resultSet.getString("tickcount"));
		//}
		sql = "insert into kafkaTestTable values(" + tick +"," + value + "," + t + ")";
		resultSet = statement.executeQuery(sql);
		System.out.println(sql);
	
		resultSet.close();
		statement.close();
		connection.close();
	}
	catch(Exception e){
		System.out.println(e.toString());
	}

    }






    public static void main(String[] args){

	System.out.println("consumer start\r\n");
	Statement statement = null;
	Connection connection = null;
	ResultSet resultSet = null;
	try{

		Class.forName("com.mysql.cj.jdbc.Driver");

		String url = "jdbc:mysql://localhost:3306/";

		String username="root";
		String password="1234";

		connection = DriverManager.getConnection(url,username,password);

		statement = connection.createStatement();

		String sql = "use kafkaTestDB;";
		resultSet = statement.executeQuery(sql);
		//sql = "SELECT *FROM kafkaTestTable;";
		//resultSet = statement.executeQuery(sql);
		//while(resultSet.next()){
		//    System.out.println("tickcount:"+resultSet.getString("tickcount"));
		//}

	}
	catch(Exception e){
		System.out.println(e.toString());
		return;
	}

        Properties properties = new Properties();
      	properties.put("bootstrap.servers", "0.0.0.0:9092");
    
        properties.put("group.id", "zabbix_perf");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        /**
         * earliest
         *   當(dāng)各分區(qū)下有已提交的offset時,從提交的offset開始消費(fèi);無提交的offset時,從頭開始消費(fèi)
         *   latest
         *   當(dāng)各分區(qū)下有已提交的offset時,從提交的offset開始消費(fèi);無提交的offset時,消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù)
         *   none
         *   topic各分區(qū)都存在已提交的offset時,從offset后開始消費(fèi);只要有一個分區(qū)不存在已提交的offset,則拋出異常
         *
         */
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        /**
         * 反序列化
         * 把kafka集群二進(jìn)制消息反序列化指定類型。
         */
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("ljg"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);//100是超時時間

	
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
		

		JSONObject jo = new JSONObject(record.value());
		try{
			String tick = jo.getString("tickcount");
			String value = jo.getString("value");
			String t = jo.getString("time");
			//mysqlAccess(tick,value,t);
			//System.out.println("tick:"+tick + ",value:"+value + ",time:"+t);


			String sql = "insert into kafkaTestTable values(\"" + 
	tick + "\",\"" + value + "\",\"" + t + "\")";
			System.out.println(sql);
			int result = statement.executeUpdate(sql);
			
	

		}catch(Exception e){
			System.out.println(e.toString());
			break;
		}

            }
        }
 

	//try{
	//resultSet.close();
	//statement.close();
	//connection.close();
	//}catch(Exception e){
	//	System.out.println(e.toString());
	//}
    }

}


producer:

package com.bjtu.kafkaTest;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
 
import java.util.Properties;
import java.util.Random;
import java.util.Date;

import java.time.LocalDate;
import org.joda.time.DateTime;

import java.text.SimpleDateFormat;



public class ProducerDemo {

    public static void main(String[] args){

	System.out.println("producer start\r\n");
 
        Properties properties = new Properties();
        /**
         *bootstrap.server用于建立到Kafka集群的初始連接的主機(jī)/端口對的列表,如果有兩臺以上的機(jī)器,逗號分隔
         */
        properties.put("bootstrap.servers", "0.0.0.0:9092");
        /**
         * acks有三種狀態(tài)
         * acks=0 不等待服務(wù)器確認(rèn)直接發(fā)送消息,無法保證服務(wù)器收到消息數(shù)據(jù)
         * acks=1 把消息記錄寫到本地,但不會保證所有的消息數(shù)據(jù)被確認(rèn)記錄的情況下進(jìn)行釋放
         * acks=all 確認(rèn)所有的消息數(shù)據(jù)被同步副本確認(rèn),這樣保證了記錄不會丟失
         *
         */
        properties.put("acks", "all");
        /**
         * 設(shè)置成大于0將導(dǎo)致客戶端重新發(fā)送任何發(fā)送失敗的記錄
         *
         */
        properties.put("retries", 0);
        /**
         *16384字節(jié)是默認(rèn)設(shè)置的批處理的緩沖區(qū)
         */
        properties.put("batch.size", 16384);
 
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        /**
         * 序列化類型。
         * kafka是以鍵值對的形式發(fā)送到kafka集群的,key是可選的,value可以是任意類型,Message再被發(fā)送到kafka之前,Producer需要
         * 把不同類型的消息轉(zhuǎn)化成二進(jìn)制類型。
         */
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(properties);

		Random r = new Random();

            for (int i = 0; i < 1000; i++) {

		int rv = r.nextInt(0x10000000);

		long timestamp = System.currentTimeMillis();
		Date date = new Date(timestamp + rv);
		SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
		String formattedDate = sdf.format(date);
		//System.out.println("格式化后的日期:" + formattedDate);


                String msg = "{\"tickcount\":\"" + formattedDate + "\",\"value\":\"" +formattedDate +"\",\"time\":\"" 			+ formattedDate + "\"}" ;

                producer.send(new ProducerRecord<String, String>("ljg", msg));

                System.out.println("Sent:" + msg);

		//Thread.sleep(1);
            }
        } catch (Exception e) {
            e.printStackTrace();
 
        } finally {
            producer.close();
        }
 
	System.out.println("producer end\r\n");
    }
}

安裝依賴包:

mvn idea:module

編譯運(yùn)行:

kafka大數(shù)據(jù)采集技術(shù)實(shí)驗(yàn)(未完待續(xù)),軟件開發(fā),kafka,大數(shù)據(jù),分布式

執(zhí)行:

mvn exec:java -Dexec.mainClass="com.bjtu.kafkaTest.ConsumerDemo"


mvn exec:java -Dexec.mainClass="com.bjtu.kafkaTest.ProducerDemo"

參考鏈接:
https://www.cnblogs.com/qqran/p/14772713.html文章來源地址http://www.zghlxwxcb.cn/news/detail-861878.html

到了這里,關(guān)于kafka大數(shù)據(jù)采集技術(shù)實(shí)驗(yàn)(未完待續(xù))的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【flume實(shí)時采集mysql數(shù)據(jù)庫的數(shù)據(jù)到kafka】

    【flume實(shí)時采集mysql數(shù)據(jù)庫的數(shù)據(jù)到kafka】

    最近做了flume實(shí)時采集mysql數(shù)據(jù)到kafka的實(shí)驗(yàn),做個筆記,防止忘記 ?。?!建議從頭看到尾,因?yàn)橐恍┖唵蔚臇|西我在前面提了,后面沒提。 Kafka搭建:https://blog.csdn.net/cjwfinal/article/details/120803013 flume搭建:https://blog.csdn.net/cjwfinal/article/details/120441503?spm=1001.2014.3001.5502 編寫配置

    2024年02月03日
    瀏覽(25)
  • Kafka數(shù)據(jù)采集至Elasticsearch的Filebeat配置

    Filebeat是一種輕量級的開源日志數(shù)據(jù)采集器,用于將各種日志數(shù)據(jù)發(fā)送到Elasticsearch等目標(biāo)存儲系統(tǒng)。本文將介紹如何使用Filebeat配置將Kafka中的數(shù)據(jù)采集并發(fā)送至Elasticsearch。 在開始之前,請確保已經(jīng)安裝并配置了Filebeat、Kafka和Elasticsearch。以下是配置的步驟: 配置Filebeat 打開

    2024年02月08日
    瀏覽(29)
  • 大數(shù)據(jù)之使用Flume監(jiān)聽端口采集數(shù)據(jù)流到Kafka

    大數(shù)據(jù)之使用Flume監(jiān)聽端口采集數(shù)據(jù)流到Kafka

    前言 題目: 一、讀題分析 二、處理過程?? 1.先在Kafka中創(chuàng)建符合題意的Kafka的topic ?創(chuàng)建符合題意的Kafka的topic 2.寫出Flume所需要的配置文件 3.啟動腳本然后啟動Flume監(jiān)聽端口數(shù)據(jù)并傳到Kafka 啟動flume指令 啟動腳本,觀察Flume和Kafka的變化 三、重難點(diǎn)分析 總結(jié)? ????????本題

    2024年02月08日
    瀏覽(27)
  • Kafka數(shù)據(jù)流的實(shí)時采集與統(tǒng)計機(jī)制

    隨著大數(shù)據(jù)時代的到來,實(shí)時數(shù)據(jù)處理成為了眾多企業(yè)和組織的關(guān)注焦點(diǎn)。為了滿足這一需求,Apache Kafka成為了一個廣泛采用的分布式流處理平臺。Kafka以其高吞吐量、可擴(kuò)展性和容錯性而聞名,被廣泛應(yīng)用于日志收集、事件驅(qū)動架構(gòu)和實(shí)時分析等場景。 在本文中,我們將探

    2024年02月07日
    瀏覽(28)
  • spark DStream從不同數(shù)據(jù)源采集數(shù)據(jù)(RDD 隊(duì)列、文件、diy 采集器、kafka)(scala 編程)

    目錄 1. RDD隊(duì)列 2 textFileStream 3 DIY采集器 4 kafka數(shù)據(jù)源【重點(diǎn)】 ? ? ? ?a、使用場景:測試 ? ? ? ?b、實(shí)現(xiàn)方式: 通過ssc.queueStream(queueOfRDDs)創(chuàng)建DStream,每一個推送這個隊(duì)列的RDD,都會作為一個DStream處理 ? ? 1. 自定義采集器 ? ? 2. 什么情況下需要自定采集器呢? ? ? ? ? ?比

    2024年02月07日
    瀏覽(48)
  • filebeat采集日志數(shù)據(jù)到kafka(一)(filebeat->kafka->logstash->es)

    filebeat采集日志數(shù)據(jù)到kafka(一)(filebeat->kafka->logstash->es)

    一、filebeat安裝 filebeat-kafka版本適配 1、安裝包下載 https://www.elastic.co/cn/downloads/past-releases#filebeat 解壓 2、新建yml配置文件,如test.yml 3、Filebeat啟動和停止 啟動:./filebeat -c test.yml 停止:kill -9 PID

    2024年02月16日
    瀏覽(25)
  • (二十八)大數(shù)據(jù)實(shí)戰(zhàn)——Flume數(shù)據(jù)采集之kafka數(shù)據(jù)生產(chǎn)與消費(fèi)集成案例

    (二十八)大數(shù)據(jù)實(shí)戰(zhàn)——Flume數(shù)據(jù)采集之kafka數(shù)據(jù)生產(chǎn)與消費(fèi)集成案例

    本節(jié)內(nèi)容我們主要介紹一下flume數(shù)據(jù)采集和kafka消息中間鍵的整合。通過flume監(jiān)聽nc端口的數(shù)據(jù),將數(shù)據(jù)發(fā)送到kafka消息的first主題中,然后在通過flume消費(fèi)kafka中的主題消息,將消費(fèi)到的消息打印到控制臺上。集成使用flume作為kafka的生產(chǎn)者和消費(fèi)者。關(guān)于nc工具、flume以及kafka的

    2024年02月09日
    瀏覽(20)
  • 大數(shù)據(jù)采集技術(shù)與預(yù)處理學(xué)習(xí)一:大數(shù)據(jù)概念、數(shù)據(jù)預(yù)處理、網(wǎng)絡(luò)數(shù)據(jù)采集

    大數(shù)據(jù)采集技術(shù)與預(yù)處理學(xué)習(xí)一:大數(shù)據(jù)概念、數(shù)據(jù)預(yù)處理、網(wǎng)絡(luò)數(shù)據(jù)采集

    目錄 大數(shù)據(jù)概念: 1.數(shù)據(jù)采集過程中會采集哪些類型的數(shù)據(jù)? 2.非結(jié)構(gòu)化數(shù)據(jù)采集的特點(diǎn)是什么? 3.請闡述傳統(tǒng)的數(shù)據(jù)采集與大數(shù)據(jù)采集的區(qū)別? ???????????????4.大數(shù)據(jù)采集的數(shù)據(jù)源有哪些?針對不同的數(shù)據(jù)源,我們可以采用哪些不同的方法和工具? 數(shù)據(jù)

    2024年01月25日
    瀏覽(30)
  • 【數(shù)倉】通過Flume+kafka采集日志數(shù)據(jù)存儲到Hadoop

    【數(shù)倉】通過Flume+kafka采集日志數(shù)據(jù)存儲到Hadoop

    【數(shù)倉】基本概念、知識普及、核心技術(shù) 【數(shù)倉】數(shù)據(jù)分層概念以及相關(guān)邏輯 【數(shù)倉】Hadoop軟件安裝及使用(集群配置) 【數(shù)倉】Hadoop集群配置常用參數(shù)說明 【數(shù)倉】zookeeper軟件安裝及集群配置 【數(shù)倉】kafka軟件安裝及集群配置 【數(shù)倉】flume軟件安裝及配置 【數(shù)倉】flum

    2024年03月17日
    瀏覽(29)
  • 大數(shù)據(jù)的關(guān)鍵技術(shù)之——大數(shù)據(jù)采集

    大數(shù)據(jù)的關(guān)鍵技術(shù)之——大數(shù)據(jù)采集

    本文目錄: 一、寫在前面的話 二、大數(shù)據(jù)采集概念 三、大數(shù)據(jù)采集步驟 3.1、大數(shù)據(jù)采集步驟(總體角度) 3.2、大數(shù)據(jù)采集步驟(數(shù)據(jù)集角度) 3.3、大數(shù)據(jù)采集步驟(數(shù)據(jù)集角度) 四、數(shù)據(jù)源與數(shù)據(jù)類型的關(guān)系 4.1、大數(shù)據(jù)體系數(shù)據(jù) 4.2、數(shù)據(jù)源與數(shù)據(jù)類型的關(guān)系 五、大數(shù)據(jù)

    2024年02月08日
    瀏覽(20)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包