国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka/Spark-01消費topic到寫出到topic

這篇具有很好參考價值的文章主要介紹了Kafka/Spark-01消費topic到寫出到topic。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

1 Kafka的工具類

1.1 從kafka消費數(shù)據(jù)的方法

  1. 消費者代碼
  def getKafkaDStream(ssc : StreamingContext , topic: String  , groupId:String  ) ={
    consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)

    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), consumerConfigs))
    kafkaDStream
  }
  1. 注意點
  • consumerConfigs是定義的可變的map的類型的,具體如下
private val consumerConfigs: mutable.Map[String, Object] = mutable.Map[String,Object](
    // kafka集群位置

    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS),

    // kv反序列化器
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    // groupId
    // offset提交  自動 手動
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
    //自動提交的時間間隔
    //ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG
    // offset重置  "latest"  "earliest"
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"
    // .....
  )
  • consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)是為了不限制groupId特意寫的傳參

  • 是使用自帶的kafka工具類createDirectStream方法去消費kafak 的數(shù)據(jù),詳細(xì)參數(shù)解釋如下

在`KafkaUtils.createDirectStream`方法中,后續(xù)傳遞的參數(shù)的含義如下:

1. `ssc`:這是一個`StreamingContext`對象,用于指定Spark Streaming的上下文。
2. `LocationStrategies.PreferConsistent`:這是一個位置策略,用于指定Kafka消費者的位置策略。`PreferConsistent`表示優(yōu)先選擇分區(qū)分布均勻的消費者。
3. `ConsumerStrategies.Subscribe[String, String]`:這是一個消費者策略,用于指定Kafka消費者的訂閱策略。`Subscribe[String, String]`表示按照指定的泛型主題字符串?dāng)?shù)組訂閱消息,鍵和值的類型都為`String`。
4. `Array(topic)`:這是一個字符串?dāng)?shù)組,用于指定要訂閱的Kafka主題。
5. `consumerConfigs`:這是一個`java.util.Properties`類型的對象,其中配置了一些Kafka消費者的屬性。

總之,在`KafkaUtils.createDirectStream`方法中,這些參數(shù)組合被用于創(chuàng)建一個Kafka直連流(Direct Stream),該流可以直接從Kafka主題中消費消息,并將其轉(zhuǎn)換為`InputDStream[ConsumerRecord[String, String]]`類型的DStream。

Kafka/Spark-01消費topic到寫出到topic,kafka,spark,大數(shù)據(jù)

  • Subscribe傳參需要指定泛型,這邊指定string,表示指定主題的鍵和值的類型,即Array(topic), consumerConfigs傳參是string

Kafka/Spark-01消費topic到寫出到topic,kafka,spark,大數(shù)據(jù)

  • 最后方法返回一個kafkaDStream

1.2 kafka的生產(chǎn)數(shù)據(jù)的方法

  1. 生產(chǎn)者代碼
  • 創(chuàng)建與配置
/**
    * 生產(chǎn)者對象
    */
  val producer : KafkaProducer[String,String] = createProducer()

  /**
    * 創(chuàng)建生產(chǎn)者對象
    */
  def createProducer():KafkaProducer[String,String] = {
    val producerConfigs: util.HashMap[String, AnyRef] = new util.HashMap[String,AnyRef]
    //生產(chǎn)者配置類 ProducerConfig
    //kafka集群位置
    //producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092")
    //producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyPropsUtils("kafka.bootstrap-servers"))
    producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS))
    //kv序列化器
    producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")
    producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")
    //acks
    producerConfigs.put(ProducerConfig.ACKS_CONFIG , "all")
    //batch.size  16kb
    //linger.ms   0
    //retries
    //冪等配置
    producerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG , "true")

    val producer: KafkaProducer[String, String] = new KafkaProducer[String,String](producerConfigs)
    producer
  }
  • 生產(chǎn)方法
  /**
    * 生產(chǎn)(按照默認(rèn)的黏性分區(qū)策略)
    */
  def send(topic : String  , msg : String ):Unit = {
    producer.send(new ProducerRecord[String,String](topic , msg ))
  }

  /**或者!
    * 生產(chǎn)(按照key進(jìn)行分區(qū))
    */
  def send(topic : String  , key : String ,  msg : String ):Unit = {
    producer.send(new ProducerRecord[String,String](topic , key ,  msg ))
  }
  • 關(guān)閉生產(chǎn)
/**
    * 關(guān)閉生產(chǎn)者對象
    */
  def close():Unit = {
    if(producer != null ) producer.close()
  }

  /**
    * 刷寫 ,將緩沖區(qū)的數(shù)據(jù)刷寫到磁盤
    *
    */
  def flush(): Unit ={
    producer.flush()
  }

2 消費數(shù)據(jù)

2.1 消費到數(shù)據(jù)

單純的使用返回的ConsumerRecord不支持序列化,沒有實現(xiàn)序列化接口

Kafka/Spark-01消費topic到寫出到topic,kafka,spark,大數(shù)據(jù)

因此需要轉(zhuǎn)換成通用的jsonobject對象

//3. 處理數(shù)據(jù)
    //3.1 轉(zhuǎn)換數(shù)據(jù)結(jié)構(gòu)
    val jsonObjDStream: DStream[JSONObject] = offsetRangesDStream.map(
      consumerRecord => {
        //獲取ConsumerRecord中的value,value就是日志數(shù)據(jù)
        val log: String = consumerRecord.value()
        //轉(zhuǎn)換成Json對象
        val jsonObj: JSONObject = JSON.parseObject(log)
        //返回
        jsonObj
      }
    )

2.2 數(shù)據(jù)分流發(fā)送到對應(yīng)topic

  1. 提取錯誤數(shù)據(jù)并發(fā)送到對應(yīng)的topic中
jsonObjDStream.foreachRDD(
      rdd => {

        rdd.foreachPartition(
          jsonObjIter => {
            for (jsonObj <- jsonObjIter) {
              //分流過程
              //分流錯誤數(shù)據(jù)
              val errObj: JSONObject = jsonObj.getJSONObject("err")
              if(errObj != null){
                //將錯誤數(shù)據(jù)發(fā)送到 DWD_ERROR_LOG_TOPIC
                MyKafkaUtils.send(DWD_ERROR_LOG_TOPIC ,  jsonObj.toJSONString )
              }else{
                  
              }
            }
          }
        }	
  1. 將公共字段和頁面數(shù)據(jù)發(fā)送到DWD_PAGE_DISPLAY_TOPIC
    Kafka/Spark-01消費topic到寫出到topic,kafka,spark,大數(shù)據(jù)
else{
                // 提取公共字段
                val commonObj: JSONObject = jsonObj.getJSONObject("common")
                val ar: String = commonObj.getString("ar")
                val uid: String = commonObj.getString("uid")
                val os: String = commonObj.getString("os")
                val ch: String = commonObj.getString("ch")
                val isNew: String = commonObj.getString("is_new")
                val md: String = commonObj.getString("md")
                val mid: String = commonObj.getString("mid")
                val vc: String = commonObj.getString("vc")
                val ba: String = commonObj.getString("ba")
                //提取時間戳
                val ts: Long = jsonObj.getLong("ts")
                // 頁面數(shù)據(jù)
                val pageObj: JSONObject = jsonObj.getJSONObject("page")
                if(pageObj != null ){
                  //提取page字段
                  val pageId: String = pageObj.getString("page_id")
                  val pageItem: String = pageObj.getString("item")
                  val pageItemType: String = pageObj.getString("item_type")
                  val duringTime: Long = pageObj.getLong("during_time")
                  val lastPageId: String = pageObj.getString("last_page_id")
                  val sourceType: String = pageObj.getString("source_type")

                  //封裝成PageLog,這邊還寫了bean實體類去接收
                  var pageLog =
                    PageLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,ts)
                  //發(fā)送到DWD_PAGE_LOG_TOPIC
                  MyKafkaUtils.send(DWD_PAGE_LOG_TOPIC , JSON.toJSONString(pageLog , new SerializeConfig(true)))//scala中bean沒有set和get方法,這邊是直接操作字段
                }

  1. 其他曝光、事件、啟動數(shù)據(jù)如下
                  //提取曝光數(shù)據(jù)
                  val displaysJsonArr: JSONArray = jsonObj.getJSONArray("displays")
                  if(displaysJsonArr != null && displaysJsonArr.size() > 0 ){
                    for(i <- 0 until displaysJsonArr.size()){
                      //循環(huán)拿到每個曝光
                      val displayObj: JSONObject = displaysJsonArr.getJSONObject(i)
                      //提取曝光字段
                      val displayType: String = displayObj.getString("display_type")
                      val displayItem: String = displayObj.getString("item")
                      val displayItemType: String = displayObj.getString("item_type")
                      val posId: String = displayObj.getString("pos_id")
                      val order: String = displayObj.getString("order")

                      //封裝成PageDisplayLog
                      val pageDisplayLog =
                        PageDisplayLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,displayType,displayItem,displayItemType,order,posId,ts)
                      // 寫到 DWD_PAGE_DISPLAY_TOPIC
                      MyKafkaUtils.send(DWD_PAGE_DISPLAY_TOPIC , JSON.toJSONString(pageDisplayLog , new SerializeConfig(true)))
                    }
                  }
                  //提取事件數(shù)據(jù)(課下完成)
                  val actionJsonArr: JSONArray = jsonObj.getJSONArray("actions")
                  if(actionJsonArr != null && actionJsonArr.size() > 0 ){
                    for(i <- 0 until actionJsonArr.size()){
                      val actionObj: JSONObject = actionJsonArr.getJSONObject(i)
                      //提取字段
                      val actionId: String = actionObj.getString("action_id")
                      val actionItem: String = actionObj.getString("item")
                      val actionItemType: String = actionObj.getString("item_type")
                      val actionTs: Long = actionObj.getLong("ts")

                      //封裝PageActionLog
                      var pageActionLog =
                        PageActionLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,actionId,actionItem,actionItemType,actionTs,ts)
                      //寫出到DWD_PAGE_ACTION_TOPIC
                      MyKafkaUtils.send(DWD_PAGE_ACTION_TOPIC , JSON.toJSONString(pageActionLog , new SerializeConfig(true)))
                    }
                  }
                }
                // 啟動數(shù)據(jù)(課下完成)
                val startJsonObj: JSONObject = jsonObj.getJSONObject("start")
                if(startJsonObj != null ){
                  //提取字段
                  val entry: String = startJsonObj.getString("entry")
                  val loadingTime: Long = startJsonObj.getLong("loading_time")
                  val openAdId: String = startJsonObj.getString("open_ad_id")
                  val openAdMs: Long = startJsonObj.getLong("open_ad_ms")
                  val openAdSkipMs: Long = startJsonObj.getLong("open_ad_skip_ms")

                  //封裝StartLog
                  var startLog =
                    StartLog(mid,uid,ar,ch,isNew,md,os,vc,ba,entry,openAdId,loadingTime,openAdMs,openAdSkipMs,ts)
                  //寫出DWD_START_LOG_TOPIC
                  MyKafkaUtils.send(DWD_START_LOG_TOPIC , JSON.toJSONString(startLog ,new SerializeConfig(true)))

2.3 精確一次消費

  1. 背景

發(fā)送kafka的是自動提交,如果提交有誤,會出現(xiàn)漏消費或者重復(fù)消費文章來源地址http://www.zghlxwxcb.cn/news/detail-705632.html

  1. 相關(guān)語義
  • 至少一次消費:數(shù)據(jù)不會丟失,但存在數(shù)據(jù)重復(fù)
  • 最多一次消費:數(shù)據(jù)不會重復(fù),但可能丟失數(shù)據(jù)
  • 精確一次消費:不多不少一次消費

到了這里,關(guān)于Kafka/Spark-01消費topic到寫出到topic的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • kafka如何動態(tài)消費新增topic主題

    kafka如何動態(tài)消費新增topic主題

    一、解決痛點 使用spring-kafka客戶端,每次新增topic主題,都需要硬編碼客戶端并重新發(fā)布服務(wù),操作麻煩耗時長。kafkaListener雖可以支持通配符消費topic,缺點是并發(fā)數(shù)需要手動改并且重啟服務(wù) 。對于業(yè)務(wù)邏輯相似場景,創(chuàng)建新主題動態(tài)監(jiān)聽可以用kafka-batch-starter組件 二、組件

    2023年04月21日
    瀏覽(34)
  • 自定義kafka客戶端消費topic

    使用自定義的KafkaConsumer給spring進(jìn)行管理,之后在注入topic的set方法中,開單線程主動訂閱和讀取該topic的消息。 后端服務(wù)不需要啟動時就開始監(jiān)聽消費,而是根據(jù)啟動的模塊或者用戶自定義監(jiān)聽需要監(jiān)聽或者停止的topic 使用的spring集成2.1.8.RELEASE的版本,在@KafkaListener注解中沒

    2024年02月02日
    瀏覽(19)
  • Flink實現(xiàn)同時消費多個kafka topic,并輸出到多個topic

    Flink實現(xiàn)同時消費多個kafka topic,并輸出到多個topic

    1)代碼使用的 flink版本為1.16.1 ,舊版本的依賴及api可能不同,同時使用了hutool的JSON工具類,兩者均可自行更換; 2)本次編寫的兩個方案,均只適用于 數(shù)據(jù)源topic來自同一個集群 ,且kafka消費組相同,暫未研究flink的connect算子join多條流 代碼涉及 Hadoop相關(guān)環(huán)境 ,若無該環(huán)境

    2023年04月20日
    瀏覽(93)
  • Kafka某Topic的部分partition無法消費問題

    Kafka某Topic的部分partition無法消費問題

    今天同事反饋有個topic出現(xiàn)積壓。于是上kfk管理平臺查看該topic對應(yīng)的group。發(fā)現(xiàn)6個分區(qū)中有2個不消費,另外4個消費也較慢,總體lag在增長。查看服務(wù)器日志,日志中有rebalance 12?retry 。。。Exception,之后改消費線程停止。 查閱相關(guān)rebalance資料: ? 分析Rebalance?可能是 Consu

    2024年02月12日
    瀏覽(26)
  • Kafka系列:查看Topic列表、消息消費情況、模擬生產(chǎn)者消費者

    Kafka系列:查看Topic列表、消息消費情況、模擬生產(chǎn)者消費者

    執(zhí)行topic刪除命令時,出現(xiàn)提示 這條命令其實并不執(zhí)行刪除動作,僅僅是在zookeeper上標(biāo)記該topic要被刪除而已,同時也提醒用戶一定要提前打開delete.topic.enable開關(guān),否則刪除動作是不會執(zhí)行的。 解決辦法: a)在server.properties中設(shè)置delete.topic.enable參數(shù)為ture b)如下操作: 1.登

    2023年04月26日
    瀏覽(30)
  • Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費者

    Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費者

    1.創(chuàng)建安裝目錄/usr/local/kafka mkdir /usr/local/kafka 2.進(jìn)入安裝包目錄 cd?/usr/local/kafka? 3.下載安裝包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解壓安裝包 tar -zxvf kafka_2.12-3.3.1.tgz 5.進(jìn)入cd kafka_2.12-3.3.1目錄 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    瀏覽(29)
  • Kafka3.1部署和Topic主題數(shù)據(jù)生產(chǎn)與消費

    Kafka3.1部署和Topic主題數(shù)據(jù)生產(chǎn)與消費

    本章節(jié)主要講述Kafka3.1X版本在Windows11主機(jī)下部署以及JAVA對Kafka應(yīng)用: 1.安裝JDK配置環(huán)境變量 2.Zookeeper(zookeeper-3.7.1) zk 部署后的目錄位置:D:setupapache-zookeeper-3.7.1 3.安裝Kafka3.1X 3.1 下載包(kafka_2.12-3.1.2.tgz) Kafka 3.2、 解壓并進(jìn)入Kafka目錄: 根目錄:D:setupkafka3.1.2 3、 編輯

    2024年02月09日
    瀏覽(26)
  • Kafka系列之:記錄一次Kafka Topic分區(qū)擴(kuò)容,但是下游flink消費者沒有自動消費新的分區(qū)的解決方法

    Kafka系列之:記錄一次Kafka Topic分區(qū)擴(kuò)容,但是下游flink消費者沒有自動消費新的分區(qū)的解決方法

    生產(chǎn)環(huán)境Kafka集群壓力大,Topic讀寫壓力大,消費的lag比較大,因此通過擴(kuò)容Topic的分區(qū),增大Topic的讀寫性能 理論上下游消費者應(yīng)該能夠自動消費到新的分區(qū),例如flume消費到了新的分區(qū),但是實際情況是存在flink消費者沒有消費到新的分區(qū) 出現(xiàn)無法消費topic新的分區(qū)這種情況

    2024年02月14日
    瀏覽(64)
  • Kafka - 主題Topic與消費者消息Offset日志記錄機(jī)制

    Kafka - 主題Topic與消費者消息Offset日志記錄機(jī)制

    可以根據(jù)業(yè)務(wù)類型,分發(fā)到不同的Topic中,對于每一個Topic,下面可以有多個分區(qū)(Partition)日志文件: kafka 下的Topic的多個分區(qū),每一個分區(qū)實質(zhì)上就是一個隊列,將接收到的消息暫時存儲到隊列中,根據(jù)配置以及消息消費情況來對隊列消息刪除。 可以這么來理解Topic,Partitio

    2024年02月03日
    瀏覽(23)
  • kafka2.x常用命令:創(chuàng)建topic,查看topic列表、分區(qū)、副本詳情,刪除topic,測試topic發(fā)送與消費

    kafka2.x常用命令:創(chuàng)建topic,查看topic列表、分區(qū)、副本詳情,刪除topic,測試topic發(fā)送與消費

    原創(chuàng)/朱季謙 接觸kafka開發(fā)已經(jīng)兩年多,也看過關(guān)于kafka的一些書,但一直沒有怎么對它做總結(jié),借著最近正好在看《Apache Kafka實戰(zhàn)》一書,同時自己又搭建了三臺kafka服務(wù)器,正好可以做一些總結(jié)記錄。 本文主要是記錄如何在kafka集群服務(wù)器上創(chuàng)建topic,查看topic列表、分區(qū)、

    2024年02月03日
    瀏覽(31)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包