1 Kafka的工具類
1.1 從kafka消費數(shù)據(jù)的方法
- 消費者代碼
def getKafkaDStream(ssc : StreamingContext , topic: String , groupId:String ) ={
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Array(topic), consumerConfigs))
kafkaDStream
}
- 注意點
- consumerConfigs是定義的可變的map的類型的,具體如下
private val consumerConfigs: mutable.Map[String, Object] = mutable.Map[String,Object](
// kafka集群位置
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS),
// kv反序列化器
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
// groupId
// offset提交 自動 手動
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
//自動提交的時間間隔
//ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG
// offset重置 "latest" "earliest"
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"
// .....
)
-
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)是為了不限制groupId特意寫的傳參
-
是使用自帶的kafka工具類createDirectStream方法去消費kafak 的數(shù)據(jù),詳細(xì)參數(shù)解釋如下
在`KafkaUtils.createDirectStream`方法中,后續(xù)傳遞的參數(shù)的含義如下:
1. `ssc`:這是一個`StreamingContext`對象,用于指定Spark Streaming的上下文。
2. `LocationStrategies.PreferConsistent`:這是一個位置策略,用于指定Kafka消費者的位置策略。`PreferConsistent`表示優(yōu)先選擇分區(qū)分布均勻的消費者。
3. `ConsumerStrategies.Subscribe[String, String]`:這是一個消費者策略,用于指定Kafka消費者的訂閱策略。`Subscribe[String, String]`表示按照指定的泛型主題字符串?dāng)?shù)組訂閱消息,鍵和值的類型都為`String`。
4. `Array(topic)`:這是一個字符串?dāng)?shù)組,用于指定要訂閱的Kafka主題。
5. `consumerConfigs`:這是一個`java.util.Properties`類型的對象,其中配置了一些Kafka消費者的屬性。
總之,在`KafkaUtils.createDirectStream`方法中,這些參數(shù)組合被用于創(chuàng)建一個Kafka直連流(Direct Stream),該流可以直接從Kafka主題中消費消息,并將其轉(zhuǎn)換為`InputDStream[ConsumerRecord[String, String]]`類型的DStream。
- Subscribe傳參需要指定泛型,這邊指定string,表示指定主題的鍵和值的類型,即Array(topic), consumerConfigs傳參是string
- 最后方法返回一個kafkaDStream
1.2 kafka的生產(chǎn)數(shù)據(jù)的方法
- 生產(chǎn)者代碼
- 創(chuàng)建與配置
/**
* 生產(chǎn)者對象
*/
val producer : KafkaProducer[String,String] = createProducer()
/**
* 創(chuàng)建生產(chǎn)者對象
*/
def createProducer():KafkaProducer[String,String] = {
val producerConfigs: util.HashMap[String, AnyRef] = new util.HashMap[String,AnyRef]
//生產(chǎn)者配置類 ProducerConfig
//kafka集群位置
//producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092")
//producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyPropsUtils("kafka.bootstrap-servers"))
producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS))
//kv序列化器
producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")
producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")
//acks
producerConfigs.put(ProducerConfig.ACKS_CONFIG , "all")
//batch.size 16kb
//linger.ms 0
//retries
//冪等配置
producerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG , "true")
val producer: KafkaProducer[String, String] = new KafkaProducer[String,String](producerConfigs)
producer
}
- 生產(chǎn)方法
/**
* 生產(chǎn)(按照默認(rèn)的黏性分區(qū)策略)
*/
def send(topic : String , msg : String ):Unit = {
producer.send(new ProducerRecord[String,String](topic , msg ))
}
/**或者!
* 生產(chǎn)(按照key進(jìn)行分區(qū))
*/
def send(topic : String , key : String , msg : String ):Unit = {
producer.send(new ProducerRecord[String,String](topic , key , msg ))
}
- 關(guān)閉生產(chǎn)
/**
* 關(guān)閉生產(chǎn)者對象
*/
def close():Unit = {
if(producer != null ) producer.close()
}
/**
* 刷寫 ,將緩沖區(qū)的數(shù)據(jù)刷寫到磁盤
*
*/
def flush(): Unit ={
producer.flush()
}
2 消費數(shù)據(jù)
2.1 消費到數(shù)據(jù)
單純的使用返回的ConsumerRecord不支持序列化,沒有實現(xiàn)序列化接口
因此需要轉(zhuǎn)換成通用的jsonobject對象文章來源:http://www.zghlxwxcb.cn/news/detail-705632.html
//3. 處理數(shù)據(jù)
//3.1 轉(zhuǎn)換數(shù)據(jù)結(jié)構(gòu)
val jsonObjDStream: DStream[JSONObject] = offsetRangesDStream.map(
consumerRecord => {
//獲取ConsumerRecord中的value,value就是日志數(shù)據(jù)
val log: String = consumerRecord.value()
//轉(zhuǎn)換成Json對象
val jsonObj: JSONObject = JSON.parseObject(log)
//返回
jsonObj
}
)
2.2 數(shù)據(jù)分流發(fā)送到對應(yīng)topic
- 提取錯誤數(shù)據(jù)并發(fā)送到對應(yīng)的topic中
jsonObjDStream.foreachRDD(
rdd => {
rdd.foreachPartition(
jsonObjIter => {
for (jsonObj <- jsonObjIter) {
//分流過程
//分流錯誤數(shù)據(jù)
val errObj: JSONObject = jsonObj.getJSONObject("err")
if(errObj != null){
//將錯誤數(shù)據(jù)發(fā)送到 DWD_ERROR_LOG_TOPIC
MyKafkaUtils.send(DWD_ERROR_LOG_TOPIC , jsonObj.toJSONString )
}else{
}
}
}
}
- 將公共字段和頁面數(shù)據(jù)發(fā)送到DWD_PAGE_DISPLAY_TOPIC
else{
// 提取公共字段
val commonObj: JSONObject = jsonObj.getJSONObject("common")
val ar: String = commonObj.getString("ar")
val uid: String = commonObj.getString("uid")
val os: String = commonObj.getString("os")
val ch: String = commonObj.getString("ch")
val isNew: String = commonObj.getString("is_new")
val md: String = commonObj.getString("md")
val mid: String = commonObj.getString("mid")
val vc: String = commonObj.getString("vc")
val ba: String = commonObj.getString("ba")
//提取時間戳
val ts: Long = jsonObj.getLong("ts")
// 頁面數(shù)據(jù)
val pageObj: JSONObject = jsonObj.getJSONObject("page")
if(pageObj != null ){
//提取page字段
val pageId: String = pageObj.getString("page_id")
val pageItem: String = pageObj.getString("item")
val pageItemType: String = pageObj.getString("item_type")
val duringTime: Long = pageObj.getLong("during_time")
val lastPageId: String = pageObj.getString("last_page_id")
val sourceType: String = pageObj.getString("source_type")
//封裝成PageLog,這邊還寫了bean實體類去接收
var pageLog =
PageLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,ts)
//發(fā)送到DWD_PAGE_LOG_TOPIC
MyKafkaUtils.send(DWD_PAGE_LOG_TOPIC , JSON.toJSONString(pageLog , new SerializeConfig(true)))//scala中bean沒有set和get方法,這邊是直接操作字段
}
- 其他曝光、事件、啟動數(shù)據(jù)如下
//提取曝光數(shù)據(jù)
val displaysJsonArr: JSONArray = jsonObj.getJSONArray("displays")
if(displaysJsonArr != null && displaysJsonArr.size() > 0 ){
for(i <- 0 until displaysJsonArr.size()){
//循環(huán)拿到每個曝光
val displayObj: JSONObject = displaysJsonArr.getJSONObject(i)
//提取曝光字段
val displayType: String = displayObj.getString("display_type")
val displayItem: String = displayObj.getString("item")
val displayItemType: String = displayObj.getString("item_type")
val posId: String = displayObj.getString("pos_id")
val order: String = displayObj.getString("order")
//封裝成PageDisplayLog
val pageDisplayLog =
PageDisplayLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,displayType,displayItem,displayItemType,order,posId,ts)
// 寫到 DWD_PAGE_DISPLAY_TOPIC
MyKafkaUtils.send(DWD_PAGE_DISPLAY_TOPIC , JSON.toJSONString(pageDisplayLog , new SerializeConfig(true)))
}
}
//提取事件數(shù)據(jù)(課下完成)
val actionJsonArr: JSONArray = jsonObj.getJSONArray("actions")
if(actionJsonArr != null && actionJsonArr.size() > 0 ){
for(i <- 0 until actionJsonArr.size()){
val actionObj: JSONObject = actionJsonArr.getJSONObject(i)
//提取字段
val actionId: String = actionObj.getString("action_id")
val actionItem: String = actionObj.getString("item")
val actionItemType: String = actionObj.getString("item_type")
val actionTs: Long = actionObj.getLong("ts")
//封裝PageActionLog
var pageActionLog =
PageActionLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,actionId,actionItem,actionItemType,actionTs,ts)
//寫出到DWD_PAGE_ACTION_TOPIC
MyKafkaUtils.send(DWD_PAGE_ACTION_TOPIC , JSON.toJSONString(pageActionLog , new SerializeConfig(true)))
}
}
}
// 啟動數(shù)據(jù)(課下完成)
val startJsonObj: JSONObject = jsonObj.getJSONObject("start")
if(startJsonObj != null ){
//提取字段
val entry: String = startJsonObj.getString("entry")
val loadingTime: Long = startJsonObj.getLong("loading_time")
val openAdId: String = startJsonObj.getString("open_ad_id")
val openAdMs: Long = startJsonObj.getLong("open_ad_ms")
val openAdSkipMs: Long = startJsonObj.getLong("open_ad_skip_ms")
//封裝StartLog
var startLog =
StartLog(mid,uid,ar,ch,isNew,md,os,vc,ba,entry,openAdId,loadingTime,openAdMs,openAdSkipMs,ts)
//寫出DWD_START_LOG_TOPIC
MyKafkaUtils.send(DWD_START_LOG_TOPIC , JSON.toJSONString(startLog ,new SerializeConfig(true)))
2.3 精確一次消費
- 背景
發(fā)送kafka的是自動提交,如果提交有誤,會出現(xiàn)漏消費或者重復(fù)消費文章來源地址http://www.zghlxwxcb.cn/news/detail-705632.html
- 相關(guān)語義
- 至少一次消費:數(shù)據(jù)不會丟失,但存在數(shù)據(jù)重復(fù)
- 最多一次消費:數(shù)據(jù)不會重復(fù),但可能丟失數(shù)據(jù)
- 精確一次消費:不多不少一次消費
到了這里,關(guān)于Kafka/Spark-01消費topic到寫出到topic的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!