国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

大數(shù)據(jù)-Storm流式框架(二)--wordcount案例

這篇具有很好參考價值的文章主要介紹了大數(shù)據(jù)-Storm流式框架(二)--wordcount案例。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

大數(shù)據(jù)-Storm流式框架(二)--wordcount案例,Storm,大數(shù)據(jù),storm

一、編寫wordcount案例

1、新建java項目

2、添加storm的jar包

storm軟件包中l(wèi)ib目錄下的所有jar包

3、編寫java類

WordCountTopology.java
package com.bjsxt.storm.wc;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

public class WordCountTopology {

    public static void main(String[] args) {
        // 拓撲封裝了計算邏輯
        TopologyBuilder builder = new TopologyBuilder();
        // 設置lineSpout:該spout負責向拓撲發(fā)送句子
        builder.setSpout("lineSpout", new LineSpout());
        // 設置切分閃電,該閃電處理從水龍頭lineSpout通過隨機分組發(fā)送過來的元組
        builder.setBolt("splitBolt", new SplitBolt())
            .shuffleGrouping("lineSpout");
        // 定義一個計數(shù)閃電,該閃電從splitBolt閃電通過按字段分組的方式分發(fā)過來的元組
        // 按照元組中word的值進行分組。要保證相同的單詞一定發(fā)送給同一個閃電。
        builder.setBolt("countBolt", new CountBolt())
                .fieldsGrouping("splitBolt", new Fields("word"));

        // 通過建造者創(chuàng)建一個拓撲的實例
        StormTopology wordCountTopology = builder.createTopology();

        // 本地模擬集群
        LocalCluster cluster = new LocalCluster();

        Config config = new Config();

        // 將拓撲提交到本地模擬集群
        cluster.submitTopology("wordCountTopology", config, wordCountTopology);

        // 睡眠10s,也就是讓本地模擬集群運行10s
        Utils.sleep(10000);

        // 關閉本地模擬集群
        cluster.shutdown();

    }

}
LineSpout.java
package com.bjsxt.storm.wc;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;

public class LineSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    private String[] lines = {
            "The logic for a realtime application is packaged into a Storm topology",
            "A stream is an unbounded sequence of tuples that is processed and created in parallel in a distributed fashion",
            "A spout is a source of streams in a topology",
            "Bolts can do anything from filtering, functions, aggregations, joins, talking to databases, and more.",
            "A stream grouping defines how that stream should be partitioned among the bolt's tasks.",
            "Storm guarantees that every spout tuple will be fully processed by the topology",
            "Each spout or bolt executes as many tasks across the cluster",
            "Each worker process is a physical JVM and executes a subset of all the tasks for the topology"
    };

    private int index = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // 在該組件在集群中初始化的時候調用一次
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // 由storm的線程不停地調用,以便從數(shù)據(jù)源獲取元組
        // 該方法不需要自己寫循環(huán)和遍歷
        // 該方法不能阻塞
        // 負責從數(shù)據(jù)源獲取元組,向DAG發(fā)送元組
        // 輪詢取出句子
        String lingStr = lines[index % lines.length];
        // 將句子封裝為元組發(fā)射
        collector.emit(new Values(lingStr));
        index++;

        Utils.sleep(10);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 用于聲明元組的結構以及流
//        declarer.declareStream("s1", new Fields("key1", "key2", "key3"));
//        declarer.declareStream("s2", new Fields("key21", "key22"));
        // 發(fā)送元組的時候就有一個字段,是line,它的值是句子
        // 可以將元組想象為map集合,只不過其key是固定的幾個
        declarer.declare(new Fields("line"));
    }
}
SplitBolt.java
package com.bjsxt.storm.wc;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String lineStr = input.getStringByField("line");
        String[] wordStrs = lineStr.split(" ");

        for (String wordStr : wordStrs) {
            // <hello, 1>
            this.collector.emit(new Values(wordStr, 1));
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
CountBolt.java
package com.bjsxt.storm.wc;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        counts = new HashMap<>();
    }

    @Override
    public void execute(Tuple input) {
//        new Fields("word", "count")
        String wordStr = input.getStringByField("word");
        Integer count = input.getIntegerByField("count");

        Integer sum = counts.get(wordStr);

        if (sum == null) {
            counts.put(wordStr, count);
        } else {
            counts.put(wordStr, sum + count);
        }

        counts.forEach((k, v) -> {
            System.out.println(k + "_________" + v);
        });
        System.out.println("========================================");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

4、運行

右鍵運行WordCountTopology

二、Storm整體架構

大數(shù)據(jù)-Storm流式框架(二)--wordcount案例,Storm,大數(shù)據(jù),storm

Storm配置項

說明

java.library.path

Storm本身依賴包的路徑,存在多個時用冒號分隔

storm.local.dir

Storm使用的本地文件系統(tǒng)目錄(必須存在并且storm進程可讀寫)。默認是storm的根目錄下的storm-local。

storm.zookeeper.servers

storm集群對應的zookeeper集群的主機列表

storm.zookeeper.port

storm集群對應的zookeeper集群的服務端口,zookeeper默認端口為2181

storm.zookeeper.root

storm的元數(shù)據(jù)在zookeeper中存儲的根目錄,默認值是/storm

storm.cluster.mode

storm運行模式,local或distributed。集群模式需設置為distributed

storm.messaging.transport

storm的消息傳輸機制,使用netty作為消息傳輸時設置為backtype.storm.messaging.netty.Context

nimbus.host

整個storm集群的nimbus節(jié)點

nimbus.supervisor.timeout.secs

storm中每個被發(fā)射出去的消息處理的超時時間,該時間影響到消息的處理,同時在storm?ui上殺掉一個拓撲時的默認時間(kill動作發(fā)出后多長時間才會真正將該拓撲殺掉)。默認值是60

ui.port

storm自帶UI,以http服務形式支持訪問,此處設置該http服務的端口(非root用戶端口號需要大于1024)

ui.childopts

storm UI進程的java參數(shù)設置(對java進程的約束都可以在此設置,如內存等)

logviewer.port

此處用于設置該Log?Viewer進程的端口(Log?Viewer進程也是http形式,需要運行在每個storm節(jié)點上)。默認值8000

logviewer.childopts

Log Viewer進程的參數(shù)設置

logviewer.appender.name

storm?log4j的appender,設置的名字對應于文件storm/log4j2/cluster.xml中設置的appender,cluster.xml可以控制storm?logger的級別

supervisor.slots.ports

storm的slot,最好設置為OS核數(shù)的整數(shù)倍;同時由于storm是基于內存的實時計算,slot數(shù)不要大于每臺物理機可運行slot個數(shù):(物理內存-虛擬內存)/單個java進程最大可占用內存數(shù)

worker.childopts

storm的worker進程的java限制,有效地設置該參數(shù)能夠在拓撲異常時進行原因分析:

-Xms1024m -Xmx1024m -XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+HeapDumpOnOutOfMemoryError

其中:Xms為單個java進程最小占用內存數(shù),Xmx為最大占用內存數(shù),設置HeapDumpOnOutOfMemoryError的好處是,當內存使用量超過Xmx時,java進程將被JVM殺掉同時會生成java_pid<pid數(shù)字>.hprof文件,使用MemoryAnalyzer分析hprof文件將能分析出內存使用情況從而進行相應的調整、分析是否有內存溢出等情況

storm.messaging.netty.buffer_size

netty傳輸?shù)腷uffer大小,默認為5MB,當spout發(fā)射的消息較大時,此處需要對應調整

storm.messaging.netty.max_retries

這幾個參數(shù)是關于使用netty作為底層消息傳輸時的相關設置,需要重視,否則可能由于bug而引起錯誤:

java.lang.IllegalArgumentException: timeout value is negative

storm.messaging.netty.max_wait_ms

storm.messaging.netty.min_wait_ms

topology.debug

該參數(shù)可以在拓撲中覆蓋,表示該拓撲是否運行于debug模式。運行于debug模式時,storm將記錄拓撲中收發(fā)消息等的詳細信息,線上環(huán)境不建議打開

topology.acker.executors

storm通過acker機制保證消息不丟失,此參數(shù)用于設置每個拓撲的acker數(shù)量,由于acker基本消耗的資源較小,強烈建議將此參數(shù)設置在較低的水平,可以在拓撲中進行覆蓋

topology.max.spout.pending

一個spout任務中處于pending狀態(tài)的最大元組數(shù)量。該配置應用于單個任務,而不是整個spout或拓撲,可在拓撲中進行覆蓋。

此外,storm/log4j2/cluster.xml文件中可以配置storm的日志級別矩陣信息等。

操作系統(tǒng)的配置,其中有兩項需要配置(通過ulimit?-a查看):

1、open?files:當前用戶可以打開的文件描述符數(shù)。

2、max?user processes:當前用戶可以運行的進程數(shù),此參數(shù)太小將引起storm的一個錯誤:

java.lang.OutOfMemoryError: unable to create new native thread

部署注意事項:

  1. 在storm根目錄下有一個lib目錄,存放storm本身依賴的jar包,此處的所有jar會被storm?worker進行啟動時加載,個人編寫的jar包不能放在此處,以免包更新帶來不便
  2. 向storm集群提交拓撲時,建議將該拓撲所有依賴的jar包和業(yè)務源代碼打到一個jar包中(fat?jar),如此則業(yè)務需要的jar包都和拓撲在同一個jar包中,否則當拓撲依賴的jar包更新時需要將該更新包放到所有的storm節(jié)點上。如果是在一個集群中,fat?jar可以保證不同業(yè)務的jar包是獨立的,不會混淆。

nimbus

  1. 接收jar包:提交應用拓撲
  2. 任務分配:將拓撲的任務分配給worker
  3. 資源調度:監(jiān)控各個supervisor節(jié)點的狀態(tài)進行負載均衡等。
  4. Nimbus不需要像supervisor節(jié)點那么高的配置,storm ui也不需要高配置,可以和nimbus節(jié)點運行在同一臺服務器節(jié)點上。

supervisor

  1. 監(jiān)聽nimbus的任務分配,啟動分配到的worker來對相應的任務進行處理。
  2. 監(jiān)控本地的worker進程,如果發(fā)現(xiàn)狀態(tài)不正常會殺死worker并重啟,超過一定次數(shù)后將分配給該錯誤狀態(tài)的worker的任務交還給nimbus進行再次分配。
  3. 刪除本地不再運行的任務

worker

完成拓撲中定義的業(yè)務邏輯,即執(zhí)行拓撲的進程。

一個worker的基本執(zhí)行步驟:

  1. 根據(jù)zookeeper中拓撲的組件分配變化,創(chuàng)建或移除worker到worker的鏈接
  2. 創(chuàng)建executor(執(zhí)行器)的輸入隊列receive-queue-map和輸出隊列transfer-queue
  3. 創(chuàng)建worker的接收線程receive-thread和發(fā)送線程transfer-thread
  4. 根據(jù)組件分配關系創(chuàng)建executor
    1. executor即worker JVM進程中的一個java線程,一般默認每個executor負責執(zhí)行一個task任務
  5. 在executor中執(zhí)行具體的任務(spout或者bolt)來執(zhí)行具體的業(yè)務邏輯。
    1. 檢查需要運行的task信息
    2. 獲取相應的task信息,即spout/bolt信息

每個任務對應一個線程或多個任務對應一個線程

線程稱為executor

executor在worker中運行

worker是一個JVM進程

在supervisor中運行

worker中的數(shù)據(jù)流:

worker中線程間通信使用的是Disruptor,進程間通信可能是netty也可以是zmq。默認使用netty。

數(shù)據(jù)流:

大數(shù)據(jù)-Storm流式框架(二)--wordcount案例,Storm,大數(shù)據(jù),storm

  1. 每個worker綁定一個socket端口作為數(shù)據(jù)的輸入,此端口作為socket的服務器端一直監(jiān)聽運行。
  2. 根據(jù)拓撲的關系,確定需要向外通信的任務所在的worker地址,并同該worker也創(chuàng)建好socket連接,此時該worker是作為socket的客戶端。
  3. receive thread負責將每個executor所需要的數(shù)據(jù)放入對應的receive-queue-map中,然后由executor來獲取自己所需要的數(shù)據(jù),這個過程通過disruptor進行通信。
  4. executor執(zhí)行完操作需要對外發(fā)送數(shù)據(jù)時,首先kryo將數(shù)據(jù)序列化,然后通過disruptor將數(shù)據(jù)放入對外的transfer-queue中。
  5. transfer?thread完成數(shù)據(jù)的發(fā)送工作。
  6. 如果executor需要對外發(fā)送的數(shù)據(jù)接收方和executor在同一個worker節(jié)點,則不需要執(zhí)行序列化操作,調用disruptor的publish方法直接放到接收方的executor對應的隊列中即可。

與MapReduce架構的對比:

大數(shù)據(jù)-Storm流式框架(二)--wordcount案例,Storm,大數(shù)據(jù),storm

提交作業(yè)過程

大數(shù)據(jù)-Storm流式框架(二)--wordcount案例,Storm,大數(shù)據(jù),storm

  1. 客戶端提交拓撲代碼到nimbus的nimbus/inbox目錄下。
  2. nimbus對topology進行校驗、處理
  3. nimbus針對該拓撲建立本地目錄:nimbus/stormdist/topology-id

該目錄下有三個文件:文章來源地址http://www.zghlxwxcb.cn/news/detail-715488.html

    1. stormjar.jar 從nimbus/inbox移動來的topology的jar包
    2. stormcode.ser 對topology對象的序列化
    3. stormconf.ser topology的運行配置信息
  1. nimbus的調度器根據(jù)拓撲的配置計算task,并把task分配到不同的worker上,調度的結果寫入zookeeper的/task節(jié)點下。
  2. zookeeper上建立assignments節(jié)點,存儲task和supervisor中worker的對應關系。
  3. zookeeper上創(chuàng)建workerbeats節(jié)點監(jiān)控worker的心跳。
  4. supervisor去zookeeper上獲取分配的task信息,啟動一個或多個worker來執(zhí)行。
  5. 每個worker上運行多個task,task由executor來執(zhí)行。
  6. worker根據(jù)拓撲信息初始化建立task之間的連接
  7. 相同worker內的task通過DisruptorQueue通信,不同worker間默認采用netty通信

到了這里,關于大數(shù)據(jù)-Storm流式框架(二)--wordcount案例的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包