? ? ? ?編寫Scala代碼,使用Flink消費(fèi)Kafka中Topic為order的數(shù)據(jù)并進(jìn)行相應(yīng)的數(shù)據(jù)統(tǒng)計(jì)計(jì)算(訂單信息對應(yīng)表結(jié)構(gòu)order_info,訂單詳細(xì)信息對應(yīng)表結(jié)構(gòu)order_detail(來源類型和來源編號這兩個字段不考慮,所以在實(shí)時數(shù)據(jù)中不會出現(xiàn)),同時計(jì)算中使用order_info或order_detail表中create_time或operate_time取兩者中值較大者作為EventTime,若operate_time為空值或無此列,則使用create_time填充,允許數(shù)據(jù)延遲5s,訂單狀態(tài)分別為1001:創(chuàng)建訂單、1002:支付訂單、1003:取消訂單、1004:完成訂單、1005:申請退回、1006:退回完成。另外對于數(shù)據(jù)結(jié)果展示時,不要采用例如:1.9786518E7的科學(xué)計(jì)數(shù)法)。文章來源地址http://www.zghlxwxcb.cn/news/detail-842946.html
- 使用Flink消費(fèi)Kafka中的數(shù)據(jù),統(tǒng)計(jì)商城實(shí)時訂單實(shí)收金額(需要考慮訂單狀態(tài),若有取消訂單、申請退回、退回完成則不計(jì)入訂單實(shí)收金額,其他狀態(tài)的則累加),將key設(shè)置成totalprice存入Redis中。使用redis cli以get key方式獲取totalprice值,將結(jié)果截圖粘貼至客戶端桌面【Release\任務(wù)D提交結(jié)果.docx】中對應(yīng)的任務(wù)序號下,需兩次截圖,第一次截圖和第二次截圖間隔1分鐘以上,第一次截圖放前面,第二次截圖放后面;
- 在任務(wù)1進(jìn)行的同時,使用側(cè)邊流,監(jiān)控若發(fā)現(xiàn)order_status字段為退回完成, 將key設(shè)置成totalrefundordercount存入Redis中,value存放用戶退款消費(fèi)額。使用redis cli以get key方式獲取totalrefundordercount值,將結(jié)果截圖粘貼至客戶端桌面【Release\任務(wù)D提交結(jié)果.docx】中對應(yīng)的任務(wù)序號下,需兩次截圖,第一次截圖和第二次截圖間隔1分鐘以上,第一次截圖放前面,第二次截圖放后面;
- 在任務(wù)1進(jìn)行的同時,使用側(cè)邊流,監(jiān)控若發(fā)現(xiàn)order_status字段為取消訂單,將數(shù)據(jù)存入MySQL數(shù)據(jù)庫shtd_result的order_info表中,然后在Linux的MySQL命令行中根據(jù)id降序排序,查詢列id、consignee、consignee_tel、final_total_amount、feight_fee,查詢出前5條,將SQL語句復(fù)制粘貼至客戶端桌面【Release\任務(wù)D提交結(jié)果.docx】中對應(yīng)的任務(wù)序號下,將執(zhí)行結(jié)果截圖粘貼至客戶端桌面【Release\任務(wù)D提交結(jié)果.docx】中對應(yīng)的任務(wù)序號下。
使用Flink處理Kafka中的數(shù)據(jù)
package module_d
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.{DecimalFormat, SimpleDateFormat}
import java.time.Duration
import java.util.Properties
/**
* 編寫Scala代碼,使用Flink消費(fèi)Kafka中Topic為order的數(shù)據(jù)并進(jìn)行相應(yīng)的數(shù)據(jù)統(tǒng)計(jì)計(jì)算(訂單信息對應(yīng)表結(jié)構(gòu)order_info,訂單詳細(xì)信息對應(yīng)表結(jié)構(gòu)order_detail(來源類型和來源編號這兩個字段不考慮,所以在實(shí)時數(shù)據(jù)中不會出現(xiàn)),同時計(jì)算中使用order_info或order_detail表中create_time或operate_time取兩者中值較大者作為EventTime,若operate_time為空值或無此屬性,則使用create_time填充,允許數(shù)據(jù)延遲5S,訂單狀態(tài)分別為1001:創(chuàng)建訂單、1002:支付訂單、1003:取消訂單、1004:完成訂單、1005:申請退回、1006:退回完成。另外對于數(shù)據(jù)結(jié)果展示時,不要采用例如:1.9786518E7的科學(xué)計(jì)數(shù)法)。
*/
object task1 {
/**
* 一個流分成四個流
*/
lazy val statusother: OutputTag[String] = new OutputTag[String]("other")
lazy val status1003: OutputTag[String] = new OutputTag[String]("s1003")
lazy val status1005: OutputTag[String] = new OutputTag[String]("s1005")
lazy val status1006: OutputTag[String] = new OutputTag[String]("s1006")
def main(args: Array[String]): Unit = {
/**
* 1、使用Flink消費(fèi)Kafka中的數(shù)據(jù),統(tǒng)計(jì)商城實(shí)時訂單實(shí)收金額(需要考慮訂單狀態(tài),若有取消訂單、申請退回、退回完成則不計(jì)入訂單實(shí)收金額,其他狀態(tài)的則累加),將key設(shè)置成totalprice存入Redis中。使用redis cli以get key方式獲取totalprice值,將結(jié)果截圖粘貼至對應(yīng)報告中,需兩次截圖,第一次截圖和第二次截圖間隔1分鐘以上,第一次截圖放前面,第二次截圖放后面;
*/
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) //并行度
//Kafka配置
val properties = new Properties()
properties.setProperty("bootstrap.servers", "ngc:9092") //集群地址
properties.setProperty("group.id", "g1") //消費(fèi)者組
//原始流
val stream = env.addSource(new FlinkKafkaConsumer[String]("order1", new SimpleStringSchema(), properties).setStartFromLatest())
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5))//允許數(shù)據(jù)延遲5S
.withTimestampAssigner(
new SerializableTimestampAssigner[String] {
override def extractTimestamp(t: String, l: Long): Long = {
val sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss")
if (t.split(",")(11).equals("")) { //如果operate_time為空
sdf.parse(t.split(",")(10)).getTime
} else {
val create_time = sdf.parse(t.split(",")(10)).getTime
val operate_time = sdf.parse(t.split(",")(11)).getTime
math.max(create_time, operate_time)
}
}
}
))
//設(shè)置自定義側(cè)邊流
val streamProcess = stream.process(new MdSplitProcessFunction)
/**
* 1、使用Flink消費(fèi)Kafka中的數(shù)據(jù),統(tǒng)計(jì)商城實(shí)時訂單實(shí)收金額(需要考慮訂單狀態(tài),若有取消訂單、申請退回、
* 退回完成則不計(jì)入訂單實(shí)收金額,其他狀態(tài)的則累加),將key設(shè)置成totalprice存入Redis中。使用redis
* cli以get key方式獲取totalprice值,將結(jié)果截圖粘貼至對應(yīng)報告中,需兩次截圖,第一次截圖和第二次截圖
* 間隔1分鐘以上,第一次截圖放前面,第二次截圖放后面;
*/
val ds1 = streamProcess
.getSideOutput(statusother)
.map(line => line.split(",")(3).toDouble)
.keyBy(_ => true) //聚合到一起
.sum(0)
.map(n=>new DecimalFormat("#.#").format(n))
//redis配置
val conf = new FlinkJedisPoolConfig.Builder()
.setHost("ngc")
.setPort(6378)
.setPassword("123456")
.build()
ds1.addSink(new RedisSink[String](conf, new MyRedisMapper("totalcount")))
/**
* 2、在任務(wù)1進(jìn)行的同時,使用側(cè)邊流,監(jiān)控若發(fā)現(xiàn)order_status字段為退回完成, 將key設(shè)置成totalrefundordercount存入Redis中,value存放用戶退款消費(fèi)額。使用redis cli以get key方式獲取totalrefundordercount值,將結(jié)果截圖粘貼至對應(yīng)報告中,需兩次截圖,第一次截圖和第二次截圖間隔1分鐘以上,第一次截圖放前面,第二次截圖放后面;
*/
val ds2 = streamProcess
.getSideOutput(status1006)
.map(line => line.split(",")(3).toDouble)
.keyBy(_ => true) //聚合到一起
.sum(0)
.map(n=>new DecimalFormat("#.#").format(n))
ds2.addSink(new RedisSink[String](conf, new MyRedisMapper("totalrefundordercount")))
/**
* 3、在任務(wù)1進(jìn)行的同時,使用側(cè)邊流,監(jiān)控若發(fā)現(xiàn)order_status字段為取消訂單,將數(shù)據(jù)存入MySQL數(shù)據(jù)庫shtd_result的order_info表中,然后在Linux的MySQL命令行中根據(jù)id降序排序,查詢列id、consignee、consignee_tel、final_total_amount、feight_fee,查詢出前5條,將SQL語句與執(zhí)行結(jié)果截圖粘貼至對應(yīng)報告中。
*/
val ds3 = streamProcess
.getSideOutput(status1003)
ds3.addSink(new RichSinkFunction[String] {
var conn: Connection = _
var insertStmt: PreparedStatement = _
override def open(parameters: Configuration): Unit = {
conn = DriverManager.getConnection("jdbc:mysql://ngc:3307/shtd_result?useSSL=false", "root", "123456")
insertStmt = conn.prepareStatement("insert into order_info (id,consignee,consignee_tel,final_total_amount,feight_fee) values (?,?,?,?,?)")
}
override def close(): Unit = {
insertStmt.close()
conn.close()
}
override def invoke(value: String, context: SinkFunction.Context): Unit = {
val arr = value.split(",")
insertStmt.setString(1, arr(0))
insertStmt.setString(2, arr(1))
insertStmt.setString(3, arr(2))
insertStmt.setString(4, arr(3))
insertStmt.setString(5, arr(19))
insertStmt.execute()
}
})
ds1.print()
ds2.print()
env.execute("kafka sink test")
}
/**
* 自定義側(cè)邊流配置
*/
class MdSplitProcessFunction extends ProcessFunction[String, String] {
override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
val line = value.split(",")
/**
* 訂單狀態(tài)order_status分別為1001:創(chuàng)建訂單、1002:支付訂單、1003:取消訂單、1004:完成訂單、1005:申請退回、1006:退回完成。
*/
if (line(4).equals("1003")) {
ctx.output(status1003, value)
} else if (line(4).equals("1005")) {
ctx.output(status1005, value)
} else if (line(4).equals("1006")) {
ctx.output(status1006, value)
} else {
ctx.output(statusother, value)
}
}
}
/**
* Redis key——value存儲 也可用RichSinkFunction建立Redis
*/
class MyRedisMapper(key: String) extends RedisMapper[String] {
override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.SET)
override def getValueFromData(data: String): String = data
override def getKeyFromData(data: String): String = key
}
}
文章來源:http://www.zghlxwxcb.cn/news/detail-842946.html
到了這里,關(guān)于大數(shù)據(jù)職業(yè)技能大賽樣題(數(shù)據(jù)采集與實(shí)時計(jì)算:使用Flink處理Kafka中的數(shù)據(jù))的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!