Scala第二十章節(jié)
章節(jié)目標(biāo)
- 理解Akka并發(fā)編程框架簡(jiǎn)介
- 掌握Akka入門案例
- 掌握Akka定時(shí)任務(wù)代碼實(shí)現(xiàn)
- 掌握兩個(gè)進(jìn)程間通信的案例
- 掌握簡(jiǎn)易版spark通信框架案例
1. Akka并發(fā)編程框架簡(jiǎn)介
1.1 Akka概述
Akka是一個(gè)用于構(gòu)建高并發(fā)、分布式和可擴(kuò)展的基于事件驅(qū)動(dòng)的應(yīng)用工具包。Akka是使用scala開(kāi)發(fā)的庫(kù),同時(shí)可以使用scala和Java語(yǔ)言來(lái)開(kāi)發(fā)基于Akka的應(yīng)用程序。
1.2 Akka特性
- 提供基于異步非阻塞、高性能的事件驅(qū)動(dòng)編程模型
- 內(nèi)置容錯(cuò)機(jī)制,允許Actor在出錯(cuò)時(shí)進(jìn)行恢復(fù)或者重置操作
- 超級(jí)輕量級(jí)的事件處理(每GB堆內(nèi)存幾百萬(wàn)Actor)
- 使用Akka可以在單機(jī)上構(gòu)建高并發(fā)程序,也可以在網(wǎng)絡(luò)中構(gòu)建分布式程序。
1.3 Akka通信過(guò)程
以下圖片說(shuō)明了Akka Actor的并發(fā)編程模型的基本流程:
- 學(xué)生創(chuàng)建一個(gè)ActorSystem
- 通過(guò)ActorSystem來(lái)創(chuàng)建一個(gè)ActorRef(老師的引用),并將消息發(fā)送給ActorRef
- ActorRef將消息發(fā)送給Message Dispatcher(消息分發(fā)器)
- Message Dispatcher將消息按照順序保存到目標(biāo)Actor的MailBox中
- Message Dispatcher將MailBox放到一個(gè)線程中
- MailBox按照順序取出消息,最終將它遞給TeacherActor接受的方法中
2. 創(chuàng)建Actor
Akka中,也是基于Actor來(lái)進(jìn)行編程的。類似于之前學(xué)習(xí)過(guò)的Actor。但是Akka的Actor的編寫、創(chuàng)建方法和之前有一些不一樣。
2.1 API介紹
-
ActorSystem: 它負(fù)責(zé)創(chuàng)建和監(jiān)督Actor
- 在Akka中,ActorSystem是一個(gè)重量級(jí)的結(jié)構(gòu),它需要分配多個(gè)線程.
- 在實(shí)際應(yīng)用中, ActorSystem通常是一個(gè)單例對(duì)象, 可以使用它創(chuàng)建很多Actor.
- 直接使用
context.system
就可以獲取到管理該Actor的ActorSystem的引用.
-
實(shí)現(xiàn)Actor類
- 定義類或者單例對(duì)象繼承Actor(注意:要導(dǎo)入akka.actor包下的Actor)
- 實(shí)現(xiàn)receive方法,receive方法中直接處理消息即可,不需要添加loop和react方法調(diào)用. Akka會(huì)自動(dòng)調(diào)用receive來(lái)接收消息.
- 【可選】還可以實(shí)現(xiàn)preStart()方法, 該方法在Actor對(duì)象構(gòu)建后執(zhí)行,在Actor生命周期中僅執(zhí)行一次.
-
加載Actor
- 要?jiǎng)?chuàng)建Akka的Actor,必須要先獲取創(chuàng)建一個(gè)ActorSystem。需要給ActorSystem指定一個(gè)名稱,并可以去加載一些配置項(xiàng)(后面會(huì)使用到)
- 調(diào)用ActorSystem.actorOf(Props(Actor對(duì)象), “Actor名字”)來(lái)加載Actor.
2.2 Actor Path
每一個(gè)Actor都有一個(gè)Path,這個(gè)路徑可以被外部引用。路徑的格式如下:
Actor類型 | 路徑 | 示例 |
---|---|---|
本地Actor | akka://actorSystem名稱/user/Actor名稱 | akka://SimpleAkkaDemo/user/senderActor |
遠(yuǎn)程Actor | akka.tcp://my-sys@ip地址:port/user/Actor名稱 | akka.tcp://192.168.10.17:5678/user/service-b |
2.3 入門案例
2.3.1 需求
基于Akka創(chuàng)建兩個(gè)Actor,Actor之間可以互相發(fā)送消息。
2.3.2 實(shí)現(xiàn)步驟
- 創(chuàng)建Maven模塊
- 創(chuàng)建并加載Actor
- 發(fā)送/接收消息
2.3.3 創(chuàng)建Maven模塊
使用Akka需要導(dǎo)入Akka庫(kù),這里我們使用Maven來(lái)管理項(xiàng)目, 具體步驟如下:
-
創(chuàng)建Maven模塊.
選中項(xiàng)目, 右鍵 -> new -> Module -> Maven -> Next -> GroupId: com.itheima ArtifactId: akka-demo next -> 設(shè)置"module name"值為"akka-demo" -> finish
-
打開(kāi)pom.xml文件,導(dǎo)入akka Maven依賴和插件.
//1. 直接把資料的pom.xml文件中的內(nèi)容貼過(guò)來(lái)就行了. //2. 源碼目錄在: src/main/scala下 //3. 測(cè)試代碼目錄在: src/test/scala下. //4. 上述的這兩個(gè)文件夾默認(rèn)是不存在的, 需要我們手動(dòng)創(chuàng)建. //5. 創(chuàng)建出來(lái)后, 記得要修改兩個(gè)文件夾的類型. 選中文件夾, 右鍵 -> Mark Directory as -> Source Roots //存放源代碼. Test Source Roots //存放測(cè)試代碼.
2.3.4 創(chuàng)建并加載Actor
到這, 我們已經(jīng)把Maven項(xiàng)目創(chuàng)建起來(lái)了, 后續(xù)我們都會(huì)采用Maven來(lái)管理我們的項(xiàng)目. 接下來(lái), 我們來(lái)實(shí)現(xiàn):
創(chuàng)建并加載Actor, 這里, 我們要?jiǎng)?chuàng)建兩個(gè)Actor:
- SenderActor:用來(lái)發(fā)送消息
- ReceiverActor:用來(lái)接收,回復(fù)消息
具體步驟
-
在src/main/scala文件夾下創(chuàng)建包: com.itheima.akka.demo
-
在該包下創(chuàng)建兩個(gè)Actor(注意: 用object修飾的單例對(duì)象).
-
SenderActor: 表示發(fā)送消息的Actor對(duì)象.
-
ReceiverActor: 表示接收消息的Actor對(duì)象.
-
-
在該包下創(chuàng)建
單例對(duì)象Entrance, 并封裝main方法
, 表示整個(gè)程序的入口. -
把程序啟動(dòng)起來(lái), 如果不報(bào)錯(cuò), 說(shuō)明代碼是沒(méi)有問(wèn)題的.
參考代碼
object SenderActor extends Actor {
/*
細(xì)節(jié):
在Actor并發(fā)編程模型中, 需要實(shí)現(xiàn)act方法, 想要持續(xù)接收消息, 可通過(guò)loop + react實(shí)現(xiàn).
在Akka編程模型中, 需要實(shí)現(xiàn)receive方法, 直接在receive方法中編寫偏函數(shù)處理消息即可.
*/
//重寫receive()方法
override def receive: Receive = {
case x => println(x)
}
}
object ReceiverActor extends Actor{
//重寫receive()方法
override def receive: Receive = {
case x => println(x)
}
}
object Entrance {
def main(args:Array[String]) = {
//1. 實(shí)現(xiàn)一個(gè)Actor Trait, 其實(shí)就是創(chuàng)建兩個(gè)Actor對(duì)象(上述步驟已經(jīng)實(shí)現(xiàn)).
//2. 創(chuàng)建ActorSystem
//兩個(gè)參數(shù)的意思分別是:ActorSystem的名字, 加載配置文件(此處先不設(shè)置)
val actorSystem = ActorSystem("actorSystem",ConfigFactory.load())
//3. 加載Actor
//actorOf方法的兩個(gè)參數(shù)意思是: 1. 具體的Actor對(duì)象. 2.該Actor對(duì)象的名字
val senderActor = actorSystem.actorOf(Props(SenderActor), "senderActor")
val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
}
}
2.3.5 發(fā)送/接收消息
思路分析
- 使用樣例類封裝消息
- SubmitTaskMessage——提交任務(wù)消息
- SuccessSubmitTaskMessage——任務(wù)提交成功消息
- 使用
!
發(fā)送異步無(wú)返回消息.
參考代碼
-
MessagePackage.scala文件中的代碼
/** * 記錄發(fā)送消息的 樣例類. * @param msg 具體的要發(fā)送的信息. */ case class SubmitTaskMessage(msg:String) /** * 記錄 回執(zhí)信息的 樣例類. * @param msg 具體的回執(zhí)信息. */ case class SuccessSubmitTaskMessage(msg:String)
-
Entrance.scala文件中的代碼
//程序主入口. object Entrance { def main(args: Array[String]): Unit = { //1. 創(chuàng)建ActorSystem, 用來(lái)管理所有用戶自定義的Actor. val actorSystem = ActorSystem("actorSystem", ConfigFactory.load()) //2. 通過(guò)ActorSystem, 來(lái)管理我們自定義的Actor(SenderActor, ReceiverActor) val senderActor = actorSystem.actorOf(Props(SenderActor), "senderActor") val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor") //3. 由ActorSystem給 SenderActor發(fā)送一句話"start". senderActor ! "start" } }
-
SenderActor.scala文件中的代碼
object SenderActor extends Actor{ override def receive: Receive = { //1. 接收Entrance發(fā)送過(guò)來(lái)的: start case "start" => { //2. 打印接收到的數(shù)據(jù). println("SenderActor接收到: Entrance發(fā)送過(guò)來(lái)的 start 信息.") //3. 獲取ReceiverActor的具體路徑. //參數(shù): 要獲取的Actor的具體路徑. //格式: akka://actorSystem的名字/user/要獲取的Actor的名字. val receiverActor = context.actorSelection("akka://actorSystem/user/receiverActor") //4. 給ReceiverActor發(fā)送消息: 采用樣例類SubmitTaskMessage receiverActor ! SubmitTaskMessage("我是SenderActor, 我在給你發(fā)消息!...") } //5. 接收ReceiverActor發(fā)送過(guò)來(lái)的回執(zhí)信息. case SuccessSubmitTaskMessage(msg) => println(s"SenderActor接收到回執(zhí)信息: ${msg} ") } }
-
ReceiverActor.scala文件中的代碼
object ReceiverActor extends Actor { override def receive: Receive = { //1. 接收SenderActor發(fā)送過(guò)來(lái)的消息. case SubmitTaskMessage(msg) => { //2. 打印接收到的消息. println(s"ReceiverActor接收到: ${msg}") //3. 給出回執(zhí)信息. sender ! SuccessSubmitTaskMessage("接收任務(wù)成功!. 我是ReceiverActor") } } }
輸出結(jié)果
SenderActor接收到: Entrance發(fā)送過(guò)來(lái)的 start 信息.
ReceiverActor接收到: 我是SenderActor, 我在給你發(fā)消息!...
SenderActor接收到回執(zhí)信息: 接收任務(wù)成功!. 我是ReceiverActor
3. Akka定時(shí)任務(wù)
需求: 如果我們想要使用Akka框架定時(shí)的執(zhí)行一些任務(wù),該如何處理呢?
答: 在Akka中,提供了一個(gè)scheduler對(duì)象來(lái)實(shí)現(xiàn)定時(shí)調(diào)度功能。使用ActorSystem.scheduler.schedule()方法
,就可以啟動(dòng)一個(gè)定時(shí)任務(wù)。
3.1 schedule()方法的格式
-
方式一: 采用
發(fā)送消息
的形式實(shí)現(xiàn).def schedule( initialDelay: FiniteDuration, // 延遲多久后啟動(dòng)定時(shí)任務(wù) interval: FiniteDuration, // 每隔多久執(zhí)行一次 receiver: ActorRef, // 給哪個(gè)Actor發(fā)送消息 message: Any) // 要發(fā)送的消息 (implicit executor: ExecutionContext) // 隱式參數(shù):需要手動(dòng)導(dǎo)入
-
方式二: 采用
自定義方式
實(shí)現(xiàn).def schedule( initialDelay: FiniteDuration, // 延遲多久后啟動(dòng)定時(shí)任務(wù) interval: FiniteDuration // 每隔多久執(zhí)行一次 )(f: ? Unit) // 定期要執(zhí)行的函數(shù),可以將邏輯寫在這里 (implicit executor: ExecutionContext) // 隱式參數(shù):需要手動(dòng)導(dǎo)入
注意: 不管使用上述的哪種方式實(shí)現(xiàn)定時(shí)器, 都需要
導(dǎo)入隱式轉(zhuǎn)換和隱式參數(shù)
, 具體如下://導(dǎo)入隱式轉(zhuǎn)換, 用來(lái)支持 定時(shí)器. import actorSystem.dispatcher //導(dǎo)入隱式參數(shù), 用來(lái)給定時(shí)器設(shè)置默認(rèn)參數(shù). import scala.concurrent.duration._
3.2 案例
需求
- 定義一個(gè)ReceiverActor, 用來(lái)循環(huán)接收消息, 并打印接收到的內(nèi)容.
- 創(chuàng)建一個(gè)ActorSystem, 用來(lái)管理所有用戶自定義的Actor.
- 關(guān)聯(lián)ActorSystem和ReceiverActor.
- 導(dǎo)入隱式轉(zhuǎn)換和隱式參數(shù).
- 通過(guò)定時(shí)器, 定時(shí)(間隔1秒)給ReceiverActor發(fā)送一句話.
- 方式一: 采用發(fā)送消息的形式實(shí)現(xiàn).
- 方式二: 采用自定義方式實(shí)現(xiàn).
參考代碼
//案例: 演示Akka中的定時(shí)器.
object MainActor {
//1. 定義一個(gè)Actor, 用來(lái)循環(huán)接收消息, 并打印.
object ReceiverActor extends Actor {
override def receive: Receive = {
case x => println(x) //不管接收到的是什么, 都打印.
}
}
def main(args: Array[String]): Unit = {
//2. 創(chuàng)建一個(gè)ActorSystem, 用來(lái)管理所有用戶自定義的Actor.
val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
//3. 關(guān)聯(lián)ActorSystem和ReceiverActor.
val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
//4. 導(dǎo)入隱式轉(zhuǎn)換和隱式參數(shù).
//導(dǎo)入隱式轉(zhuǎn)換, 用來(lái)支持 定時(shí)器.
import actorSystem.dispatcher
//導(dǎo)入隱式參數(shù), 用來(lái)給定時(shí)器設(shè)置默認(rèn)參數(shù).
import scala.concurrent.duration._
//5. 通過(guò)定時(shí)器, 定時(shí)(間隔1秒)給ReceiverActor發(fā)送一句話.
//方式一: 通過(guò)定時(shí)器的第一種方式實(shí)現(xiàn), 傳入四個(gè)參數(shù).
//actorSystem.scheduler.schedule(3.seconds, 2.seconds, receiverActor, "你好, 我是種哥, 我有種子你買嗎?...")
//方式二: 通過(guò)定時(shí)器的第二種方式實(shí)現(xiàn), 傳入兩個(gè)時(shí)間, 和一個(gè)函數(shù).
//actorSystem.scheduler.schedule(0 seconds, 2 seconds)(receiverActor ! "新上的種子喲, 你沒(méi)見(jiàn)過(guò)! 嘿嘿嘿...")
//實(shí)際開(kāi)發(fā)寫法
actorSystem.scheduler.schedule(0 seconds, 2 seconds){
receiverActor ! "新上的種子喲, 你沒(méi)見(jiàn)過(guò)! 嘿嘿嘿..."
}
}
}
4. 實(shí)現(xiàn)兩個(gè)進(jìn)程之間的通信
4.1 案例介紹
基于Akka實(shí)現(xiàn)在兩個(gè)進(jìn)程間發(fā)送、接收消息。
- WorkerActor啟動(dòng)后去連接MasterActor,并發(fā)送消息給MasterActor.
- MasterActor接收到消息后,再回復(fù)消息給WorkerActor。
4.2 Worker實(shí)現(xiàn)
步驟文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-847979.html
-
創(chuàng)建一個(gè)Maven模塊,導(dǎo)入依賴和配置文件.
-
創(chuàng)建Maven模塊.
GroupId: com.itheima
ArtifactID: akka-worker
-
把資料下的pom.xml文件中的內(nèi)容復(fù)制到Maven項(xiàng)目akka-worker的pom.xml文件中
-
把資料下的application.conf復(fù)制到 src/main/resources文件夾下.
-
打開(kāi) application.conf配置文件, 修改端口號(hào)為: 9999
-
-
創(chuàng)建啟動(dòng)WorkerActor.
- 在src/main/scala文件夾下創(chuàng)建包: com.itheima.akka
- 在該包下創(chuàng)建 WorkerActor(單例對(duì)象的形式創(chuàng)建).
- 在該包下創(chuàng)建Entrance單例對(duì)象, 里邊定義main方法
-
發(fā)送"setup"消息給WorkerActor,WorkerActor接收打印消息.
-
啟動(dòng)測(cè)試.
參考代碼
-
WorkerActor.scala文件中的代碼
//1. 創(chuàng)建WorkActor, 用來(lái)接收和發(fā)送消息. object WorkerActor extends Actor{ override def receive: Receive = { //2. 接收消息. case x => println(x) } }
-
Entrance.scala文件中的代碼
//程序入口. //當(dāng)前ActorSystem對(duì)象的路徑 akka.tcp://actorSystem@127.0.0.1:9999 object Entrance { def main(args: Array[String]): Unit = { //1. 創(chuàng)建ActorSystem. val actorSystem = ActorSystem("actorSystem", ConfigFactory.load()) //2. 通過(guò)ActorSystem, 加載自定義的WorkActor. val workerActor = actorSystem.actorOf(Props(WorkerActor), "workerActor") //3. 給WorkActor發(fā)送一句話. workerActor ! "setup" } } //啟動(dòng)測(cè)試: 右鍵, 執(zhí)行, 如果打印結(jié)果出現(xiàn)"setup", 說(shuō)明程序執(zhí)行沒(méi)有問(wèn)題.
4.3 Master實(shí)現(xiàn)
步驟
-
創(chuàng)建一個(gè)Maven模塊,導(dǎo)入依賴和配置文件.
-
創(chuàng)建Maven模塊.
GroupId: com.itheima
ArtifactID: akka-master
-
把資料下的pom.xml文件中的內(nèi)容復(fù)制到Maven項(xiàng)目akka-master的pom.xml文件中
-
把資料下的application.conf復(fù)制到 src/main/resources文件夾下.
-
打開(kāi) application.conf配置文件, 修改端口號(hào)為: 8888
-
-
創(chuàng)建啟動(dòng)MasterActor.
- 在src/main/scala文件夾下創(chuàng)建包: com.itheima.akka
- 在該包下創(chuàng)建 MasterActor(單例對(duì)象的形式創(chuàng)建).
- 在該包下創(chuàng)建Entrance單例對(duì)象, 里邊定義main方法
-
WorkerActor發(fā)送"connect"消息給MasterActor
-
MasterActor回復(fù)"success"消息給WorkerActor
-
WorkerActor接收并打印接收到的消息
-
啟動(dòng)Master、Worker測(cè)試
參考代碼
-
MasterActor.scala文件中的代碼
//MasterActor: 用來(lái)接收WorkerActor發(fā)送的數(shù)據(jù), 并給其返回 回執(zhí)信息. //負(fù)責(zé)管理MasterActor的ActorSystem的地址: akka.tcp://actorSystem@127.0.0.1:8888 object MasterActor extends Actor{ override def receive: Receive = { //1. 接收WorkerActor發(fā)送的數(shù)據(jù) case "connect" => { println("MasterActor接收到: connect!...") //2. 給WorkerActor回執(zhí)一句話. sender ! "success" } } }
-
Entrance.scala文件中的代碼
//Master模塊的主入口 object Entrance { def main(args: Array[String]): Unit = { //1. 創(chuàng)建ActorSystem, 用來(lái)管理用戶所有的自定義Actor. val actorSystem = ActorSystem("actorSystem", ConfigFactory.load()) //2. 關(guān)聯(lián)ActorSystem和MasterActor. val masterActor = actorSystem.actorOf(Props(MasterActor), "masterActor") //3. 給masterActor發(fā)送一句話: 測(cè)試數(shù)據(jù), 用來(lái)測(cè)試. //masterActor ! "測(cè)試數(shù)據(jù)" } }
-
WorkerActor.scala文件中的代碼(就修改了第3步)
//WorkerActor: 用來(lái)接收ActorSystem發(fā)送的消息, 并發(fā)送消息給MasterActor, 然后接收MasterActor的回執(zhí)信息. //負(fù)責(zé)管理WorkerActor的ActorSystem的地址: akka.tcp://actorSystem@127.0.0.1:9999 object WorkerActor extends Actor{ override def receive: Receive = { //1. 接收Entrance發(fā)送過(guò)來(lái)的: setup. case "setup" => { println("WorkerActor接收到: Entrance發(fā)送過(guò)來(lái)的指令 setup!.") //2. 獲取MasterActor的引用. val masterActor = context.system.actorSelection("akka.tcp://actorSystem@127.0.0.1:8888/user/masterActor") //3. 給MasterActor發(fā)送一句話. masterActor ! "connect" } //4. 接收MasterActor的回執(zhí)信息. case "success" => println("WorkerActor接收到: success!") } }
5. 案例: 簡(jiǎn)易版spark通信框架
5.1 案例介紹
模擬Spark的Master與Worker通信.
- 一個(gè)Master
- 管理多個(gè)Worker
- 若干個(gè)Worker(Worker可以按需添加)
- 向Master發(fā)送注冊(cè)信息
- 向Master定時(shí)發(fā)送心跳信息
5.2 實(shí)現(xiàn)思路
- 構(gòu)建Master、Worker階段
- 構(gòu)建Master ActorSystem、Actor
- 構(gòu)建Worker ActorSystem、Actor
- Worker注冊(cè)階段
- Worker進(jìn)程向Master注冊(cè)(將自己的ID、CPU核數(shù)、內(nèi)存大小(M)發(fā)送給Master)
- Worker定時(shí)發(fā)送心跳階段
- Worker定期向Master發(fā)送心跳消息
- Master定時(shí)心跳檢測(cè)階段
- Master定期檢查Worker心跳,將一些超時(shí)的Worker移除,并對(duì)Worker按照內(nèi)存進(jìn)行倒序排序
- 多個(gè)Worker測(cè)試階段
- 啟動(dòng)多個(gè)Worker,查看是否能夠注冊(cè)成功,并停止某個(gè)Worker查看是否能夠正確移除
5.3 工程搭建
需求
本項(xiàng)目使用Maven搭建工程.
步驟
- 分別搭建以下幾個(gè)項(xiàng)目, Group ID統(tǒng)一都為: com.itheima, 具體工程名如下:
工程名 | 說(shuō)明 |
---|---|
spark-demo-common | 存放公共的消息、實(shí)體類 |
spark-demo-master | Akka Master節(jié)點(diǎn) |
spark-demo-worker | Akka Worker節(jié)點(diǎn) |
-
導(dǎo)入依賴(資料包中的pom.xml).
注意: master, worker要添加common依賴, 具體如下:
<!--導(dǎo)入spark-demo-common模塊--> <dependency> <groupId>com.itheima</groupId> <artifactId>spark-demo-common</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
-
分別在三個(gè)項(xiàng)目下的src/main, src/test下, 創(chuàng)建scala目錄.
-
導(dǎo)入配置文件(資料包中的application.conf)
- 修改Master的端口為7000
- 修改Worker的端口為8000
5.4 構(gòu)建Master和Worker
需求
分別構(gòu)建Master和Worker,并啟動(dòng)測(cè)試
步驟
- 創(chuàng)建并加載Master Actor
- 創(chuàng)建并加載Worker Actor
- 測(cè)試是否能夠啟動(dòng)成功
參考代碼
-
完成master模塊中的代碼, 即: 在src/main/scala下創(chuàng)建包: com.itheima.spark.master, 包中代碼如下:
-
MasterActor.scala文件中的代碼
//Master: 用來(lái)管理多個(gè)Worker的. //MasterActor的路徑: akka.tcp://actorSystem@127.0.0.1:7000 object MasterActor extends Actor{ override def receive: Receive = { case x => println(x) } }
-
Master.scala文件中的代碼
//程序入口: 相當(dāng)于我們以前寫的MainActor object Master { def main(args: Array[String]): Unit = { //1. 創(chuàng)建ActorSystem. val actorSystem = ActorSystem("actorSystem", ConfigFactory.load()) //2. 通過(guò)ActorSystem, 關(guān)聯(lián)MasterActor. val masterActor = actorSystem.actorOf(Props(MasterActor), "masterActor") //3. 啟動(dòng)程序, 如果不報(bào)錯(cuò), 說(shuō)明代碼沒(méi)有問(wèn)題. } }
-
-
完成worker模塊中的代碼, 即: 在src/main/scala下創(chuàng)建包: com.itheima.spark.worker, 包中代碼如下:
-
WorkerActor.scala文件中的代碼
//WorkerActor的地址: akka.tcp://actorSystem@127.0.0.1:7100 object WorkerActor extends Actor{ override def receive: Receive = { case x => println(x) } }
-
Worker.scala文件中的代碼
//程序入口 object Worker { def main(args: Array[String]): Unit = { //1. 創(chuàng)建ActorSystem. val actorSystem = ActorSystem("actorSystem", ConfigFactory.load()) //2. 通過(guò)ActorSystem, 關(guān)聯(lián)MasterActor. val workerActor = actorSystem.actorOf(Props(WorkerActor), "workerActor") //3. 啟動(dòng)程序, 如果不報(bào)錯(cuò), 說(shuō)明代碼沒(méi)有問(wèn)題. workerActor ! "hello" } }
-
5.5 Worker注冊(cè)階段實(shí)現(xiàn)
需求
在Worker啟動(dòng)時(shí),發(fā)送注冊(cè)消息給Master.
思路分析
- Worker向Master發(fā)送注冊(cè)消息(workerid、cpu核數(shù)、內(nèi)存大?。?
- 隨機(jī)生成CPU核(1、2、3、4、6、8)
- 隨機(jī)生成內(nèi)存大?。?12、1024、2048、4096)(單位M)
- Master保存Worker信息,并給Worker回復(fù)注冊(cè)成功消息
- 啟動(dòng)測(cè)試
具體步驟
-
在spark-demo-common項(xiàng)目的src/main/scala文件夾下創(chuàng)建包: com.itheima.spark.commons
把資料下的MessagePackage.scala和Entities.scala這兩個(gè)文件拷貝到commons包下.
-
在WorkerActor單例對(duì)象中定義一些成員變量, 分別表示:
- masterActorRef: 表示MasterActor的引用.
- workerid: 表示當(dāng)前WorkerActor對(duì)象的id.
- cpu: 表示當(dāng)前WorkerActor對(duì)象的CPU核數(shù).
- mem: 表示當(dāng)前WorkerActor對(duì)象的內(nèi)存大小.
- cup_list: 表示當(dāng)前WorkerActor對(duì)象的CPU核心數(shù)的取值范圍.
- mem_list: 表示當(dāng)前WorkerActor對(duì)象的內(nèi)存大小的取值范圍.
-
在WorkerActor的preStart()方法中, 封裝注冊(cè)信息, 并發(fā)送給MasterActor.
-
在MasterActor中接收WorkerActor提交的注冊(cè)信息, 并保存到雙列集合中…
-
MasterActor給WorkerActor發(fā)送回執(zhí)信息(注冊(cè)成功信息.).
-
在WorkerActor中接收MasterActor回復(fù)的 注冊(cè)成功信息.
參考代碼
-
WorkerActor.scala文件中的代碼
//WorkerActor的地址: akka.tcp://actorSystem@127.0.0.1:7100 object WorkerActor extends Actor { //1 定義成員變量, 記錄MasterActor的引用, 以及WorkerActor提交的注冊(cè)參數(shù)信息. private var masterActorRef: ActorSelection = _ //表示MasterActor的引用. private var workerid:String = _ //表示W(wǎng)orkerActor的id private var cpu:Int = _ //表示W(wǎng)orkerActor的CPU核數(shù) private var mem:Int = _ //表示W(wǎng)orkerActor的內(nèi)存大小. private val cpu_list = List(1, 2, 3, 4, 6, 8) //CPU核心數(shù)的取值范圍 private val mem_list = List(512, 1024, 2048, 4096) //內(nèi)存大小取值范圍 //2. 重寫preStart()方法, 里邊的內(nèi)容: 在Actor啟動(dòng)之前就會(huì)執(zhí)行. override def preStart(): Unit = { //3. 獲取Master的引用. masterActorRef = context.actorSelection("akka.tcp://actorSystem@127.0.0.1:7000/usre/masterActor") //4. 構(gòu)建注冊(cè)消息. workerid = UUID.randomUUID().toString //設(shè)置workerActor的id val r = new Random() cpu = cpu_list(r.nextInt(cpu_list.length)) mem = mem_list(r.nextInt(mem_list.length)) //5. 將WorkerActor的提交信息封裝成 WorkerRegisterMessage對(duì)象. var registerMessage = WorkerRegisterMessage(workerid, cpu, mem) //6. 發(fā)送消息給MasterActor. masterActorRef ! registerMessage } override def receive: Receive = { case x => println(x) } }
-
MasterActor.scala文件中的代碼
//Master: 用來(lái)管理多個(gè)Worker的. //MasterActor的路徑: akka.tcp://actorSystem@127.0.0.1:7000 object MasterActor extends Actor{ //1. 定義一個(gè)可變的Map集合, 用來(lái)保存注冊(cè)成功好的Worker信息. private val regWorkerMap = collection.mutable.Map[String, WorkerInfo]() override def receive: Receive = { case WorkerRegisterMessage(workId, cpu, mem) => { //2. 打印接收到的注冊(cè)信息 println(s"MasterActor: 接收到worker注冊(cè)信息, ${workId}, ${cpu}, ${mem}") //3. 把注冊(cè)成功后的保存信息保存到: workInfo中. regWorkerMap += workId -> WorkerInfo(workId, cpu, mem) //4. 回復(fù)一個(gè)注冊(cè)成功的消息. sender ! RegisterSuccessMessage } } }
-
修改WorkerActor.scala文件中receive()方法的代碼
override def receive: Receive = { case RegisterSuccessMessage => println("WorkerActor: 注冊(cè)成功!") }
5.6 Worker定時(shí)發(fā)送心跳階段
需求
Worker接收到Master返回的注冊(cè)成功信息后,定時(shí)給Master發(fā)送心跳消息。而Master收到Worker發(fā)送的心跳消息后,需要更新對(duì)應(yīng)Worker的最后心跳時(shí)間。
思路分析
- 編寫工具類讀取心跳發(fā)送時(shí)間間隔
- 創(chuàng)建心跳消息
- Worker接收到注冊(cè)成功后,定時(shí)發(fā)送心跳消息
- Master收到心跳消息,更新Worker最后心跳時(shí)間
- 啟動(dòng)測(cè)試
具體步驟
-
在worker的src/main/resources文件夾下的 application.conf文件中添加一個(gè)配置.
worker.heartbeat.interval = 5 //配置worker發(fā)送心跳的周期(單位是 s)
-
在worker項(xiàng)目的com.itheima.spark.work包下創(chuàng)建一個(gè)新的單例對(duì)象: ConfigUtils, 用來(lái)讀取配置文件信息.
-
在WorkerActor的receive()方法中, 定時(shí)給MasterActor發(fā)送心跳信息.
-
Master接收到心跳消息, 更新Worker最后心跳時(shí)間. .
參考代碼
-
worker項(xiàng)目的ConfigUtils.scala文件中的代碼
object ConfigUtils { //1. 獲取配置信息對(duì)象. private val config = ConfigFactory.load() //2. 獲取worker心跳的具體周期 val `worker.heartbeat.interval` = config.getInt("worker.heartbeat.interval") }
-
修改WorkerActor.scala文件的receive()方法中的代碼
override def receive: Receive = { case RegisterSuccessMessage => { //1. 打印接收到的 注冊(cè)成功消息 println("WorkerActor: 接收到注冊(cè)成功消息!") //2. 導(dǎo)入時(shí)間單位隱式轉(zhuǎn)換 和 隱式參數(shù) import scala.concurrent.duration._ import context.dispatcher //3. 定時(shí)給Master發(fā)送心跳消息. context.system.scheduler.schedule(0 seconds, ConfigUtil.`worker.heartbeat.interval` seconds){ //3.1 采用自定義的消息的形式發(fā)送 心跳信息. masterActorRef ! WorkerHeartBeatMessage(workerId, cpu, mem) } } }
-
MasterActor.scala文件中的代碼
object MasterActor extends Actor { //1. 定義一個(gè)可變的Map集合, 用來(lái)保存注冊(cè)成功好的Worker信息. private val regWorkerMap = collection.mutable.Map[String, WorkerInfo]() override def receive: Receive = { //接收注冊(cè)信息. case WorkerRegisterMessage(workId, cpu, mem) => { //2. 打印接收到的注冊(cè)信息 println(s"MasterActor: 接收到worker注冊(cè)信息, ${workId}, ${cpu}, ${mem}") //3. 把注冊(cè)成功后的保存信息保存到: workInfo中. regWorkerMap += workId -> WorkerInfo(workId, cpu, mem, new Date().getTime) //4. 回復(fù)一個(gè)注冊(cè)成功的消息. sender ! RegisterSuccessMessage } //接收心跳消息 case WorkerHeartBeatMessage(workId, cpu, mem) => { //1. 打印接收到的心跳消息. println(s"MasterActor: 接收到${workId}的心跳信息") //2. 更新指定Worker的最后一次心跳時(shí)間. regWorkerMap += workId -> WorkerInfo(workId, cpu, mem, new Date().getTime) //3. 為了測(cè)試代碼邏輯是否OK, 我們可以打印下 regWorkerMap的信息 println(regWorkerMap) } } }
5.7 Master定時(shí)心跳檢測(cè)階段
需求
如果某個(gè)worker超過(guò)一段時(shí)間沒(méi)有發(fā)送心跳,Master需要將該worker從當(dāng)前的Worker集合中移除。可以通過(guò)Akka的定時(shí)任務(wù),來(lái)實(shí)現(xiàn)心跳超時(shí)檢查。
思路分析
- 編寫工具類,讀取檢查心跳間隔時(shí)間間隔、超時(shí)時(shí)間
- 定時(shí)檢查心跳,過(guò)濾出來(lái)大于超時(shí)時(shí)間的Worker
- 移除超時(shí)的Worker
- 對(duì)現(xiàn)有Worker按照內(nèi)存進(jìn)行降序排序,打印可用Worker
具體步驟
-
修改Master的application.conf配置文件, 添加兩個(gè)配置
#配置檢查Worker心跳的時(shí)間周期(單位: 秒)
master.check.heartbeat.interval = 6
#配置worker心跳超時(shí)的時(shí)間(秒)
master.check.heartbeat.timeout = 15 -
在Master項(xiàng)目的com.itheima.spark.master包下創(chuàng)建: ConfigUtils工具類(單例對(duì)象), 用來(lái)讀取配置文件信息.
-
在MasterActor中開(kāi)始檢查心跳(即: 修改MasterActor#preStart中的代碼.).
-
開(kāi)啟Master, 然后開(kāi)啟Worker, 進(jìn)行測(cè)試.
參考代碼
-
Master項(xiàng)目的ConfigUtils.scala文件中的代碼
//針對(duì)Master的工具類. object ConfigUtil { //1. 獲取到配置文件對(duì)象. private val config: Config = ConfigFactory.load() //2. 獲取檢查Worker心跳的時(shí)間周期(單位: 秒) val `master.check.heartbeat.interval` = config.getInt("master.check.heartbeat.interval") //3. 獲取worker心跳超時(shí)的時(shí)間(秒) val `master.check.heartbeat.timeout` = config.getInt("master.check.heartbeat.timeout") }
-
MasterActor.scala文件的preStart()方法中的代碼
//5. 定時(shí)檢查worker的心跳信息 override def preStart(): Unit = { //5.1 導(dǎo)入時(shí)間轉(zhuǎn)換隱式類型 和 定時(shí)任務(wù)隱式變量 import scala.concurrent.duration._ import context.dispatcher //5.2 啟動(dòng)定時(shí)任務(wù). context.system.scheduler.schedule(0 seconds, ConfigUtil.`master.check.heartbeat.interval` seconds) { //5.3 過(guò)濾大于超時(shí)時(shí)間的Worker. val timeOutWorkerMap = regWorkerMap.filter { keyval => //5.3.1 獲取最后一次心跳更新時(shí)間. val lastHeatBeatTime = keyval._2.lastHeartBeatTime //5.3.2 超時(shí)公式: 當(dāng)前系統(tǒng)時(shí)間 - 最后一次心跳時(shí)間 > 超時(shí)時(shí)間(配置文件信息 * 1000) if (new Date().getTime - lastHeatBeatTime > ConfigUtil.`master.check.heartbeat.timeout` * 1000) true else false } //5.4 移除超時(shí)的Worker if(!timeOutWorkerMap.isEmpty) { //如果要被移除的Worker集合不為空, 則移除此 timeOutWorkerMap //注意: 雙列集合是根據(jù)鍵移除元素的, 所以最后的 _._1是在獲取鍵. regWorkerMap --= timeOutWorkerMap.map(_._1) } //5.5 對(duì)worker按照內(nèi)存大小進(jìn)行降序排序, 打印Worker //_._2 獲取所有的WorkInfo對(duì)象. val workerList = regWorkerMap.map(_._2).toList //5.6 按照內(nèi)存進(jìn)行降序排序. val sortedWorkerList = workerList.sortBy(_.mem).reverse //5.7 打印結(jié)果 println("按照內(nèi)存的大小降序排列的Worker列表: ") println(sortedWorkerList) } }
5.8 多個(gè)Worker測(cè)試階段
需求
修改配置文件,啟動(dòng)多個(gè)worker進(jìn)行測(cè)試。
大白話: 啟動(dòng)一個(gè)Worker, 就修改一次Worker項(xiàng)目下的application.conf文件中記錄的端口號(hào), 然后重新開(kāi)啟Worker即可.文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-847979.html
步驟
- 測(cè)試啟動(dòng)新的Worker是否能夠注冊(cè)成功
- 停止Worker,測(cè)試是否能夠從現(xiàn)有列表刪除
-
-
}if (new Date().getTime - lastHeatBeatTime > ConfigUtil.`master.check.heartbeat.timeout` * 1000) true else false
//5.4 移除超時(shí)的Worker
if(!timeOutWorkerMap.isEmpty) {
//如果要被移除的Worker集合不為空, 則移除此 timeOutWorkerMap
//注意: 雙列集合是根據(jù)鍵移除元素的, 所以最后的 _.1是在獲取鍵.
regWorkerMap --= timeOutWorkerMap.map(.1)
}
//5.5 對(duì)worker按照內(nèi)存大小進(jìn)行降序排序, 打印Worker
//.2 獲取所有的WorkInfo對(duì)象.
val workerList = regWorkerMap.map(.2).toList
//5.6 按照內(nèi)存進(jìn)行降序排序.
val sortedWorkerList = workerList.sortBy(.mem).reverse
//5.7 打印結(jié)果
println("按照內(nèi)存的大小降序排列的Worker列表: ")
println(sortedWorkerList)
}
}
-
#### 5.8 多個(gè)Worker測(cè)試階段
**需求**
修改配置文件,啟動(dòng)多個(gè)worker進(jìn)行測(cè)試。
> 大白話: 啟動(dòng)一個(gè)Worker, 就修改一次Worker項(xiàng)目下的application.conf文件中記錄的端口號(hào), 然后重新開(kāi)啟Worker即可.
**步驟**
1. 測(cè)試啟動(dòng)新的Worker是否能夠注冊(cè)成功
2. 停止Worker,測(cè)試是否能夠從現(xiàn)有列表刪除
到了這里,關(guān)于Scala第二十章節(jié)(Akka并發(fā)編程框架、Akka入門案例、Akka定時(shí)任務(wù)代碼實(shí)現(xiàn)、兩個(gè)進(jìn)程間通信的案例以及簡(jiǎn)易版spark通信框架案例)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!