JVM并发 使用Akka构建演员应用程序 超越基础,构建使用actor交互的应用程序

“ JVM并发:与Akka异步执行 ”将您介绍给演员模型和Akka框架和运行时。建筑演员应用与建立传统的线性应用不同。通过线性应用,您可以考虑控制流程和完成目标所涉及的步骤顺序。为了有效利用演员模型,您可以将应用程序分解为独立的状态和行为(演员),并脚本化这些捆绑(消息)之间的交互。这两个组件 - 演员和消息 - 是您的应用程序的构建块。

如果你正确地组成你的演员和消息,你最终会遇到大多数事情异步发生的系统。异步操作比线性方法更难理解,但它在可扩展性方面是有利的。高度异步的程序能够更好地使用增加的系统资源(例如,内存和处理器)来更快地完成特定的任务或并行处理更多的任务实例。使用Akka,您甚至可以通过使用远程处理来分散多个系统来扩展这种可扩展性。

关于本系列

现在,多核系统是普遍存在的,并行编程必须比以往更广泛地应用。但并发可能难以正确实现,并且您需要新的工具来帮助您使用它。许多基于JVM的语言正在开发这种类型的工具,Scala在这一领域尤其活跃。本系列文章介绍了一些用于Java和Scala语言的并行编程的新方法。

在这篇文章中,您将更多地了解在演员和消息方面构建系统。两个示例应用程序中的第一个显示了Akka中演员和消息的工作原理。第二个更详细的例子说明了规划和可视化演员系统的结构。这两个示例都使用Scala代码,但是Java开发人员很容易理解(有关帮助,请参阅本系列的前一篇文章,与Akka进行Scala和Java编程的并行示例)。

认识Stars

最后一篇文章中的例子使用:

  • 演员直接由主要应用程序启动演员系统

  • 只有一种类型的演员

  • 演员之间极少的互动

对于第一个示例应用程序,我使用一个稍微更复杂的结构,我逐个评论。清单1显示了整个应用程序。

清单1.代 Star小号

1234五67891011121314151617181920212223242526272829三十313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899import scala.concurrent.duration._import scala.util.Randomimport akka.actor._import akka.util._object Stars1 extends App { import Star._ val starBaseLifetime = 5000 millis val starVariableLifetime = 2000 millis val starBaseSpawntime = 2000 millis val starVariableSpawntime = 1000 millis object Namer { case object GetName case class SetName(name: String) def props(names: Array[String]): Props = Props(new Namer(names)) } class Namer(names: Array[String]) extends Actor { import context.dispatcher import Namer._ context.setReceiveTimeout(starBaseSpawntime + starVariableSpawntime) def receive = { case GetName => { val name = ... sender ! SetName(name) } case ReceiveTimeout => { println("Namer receive timeout, shutting down system") system shutdown } } } object Star { case class Greet(peer: ActorRef) case object AskName case class TellName(name: String) case object Spawn case object IntroduceMe case object Die def props(greeting: String, gennum: Int, parent: String) = Props(new Star(greeting, gennum, parent)) } class Star(greeting: String, gennum: Int, parent: String) extends Actor { import context.dispatcher var myName: String = "" var starsKnown = Map[String, ActorRef]() val random = Random val namer = context actorSelection namerPath namer ! Namer.GetName def scaledDuration(base: FiniteDuration, variable: FiniteDuration) = base + variable * random.nextInt(1000) / 1000 val killtime = scaledDuration(starBaseLifetime, starVariableLifetime) val killer = scheduler.scheduleOnce(killtime, self, Die) val spawntime = scaledDuration(starBaseSpawntime, starVariableSpawntime) val spawner = scheduler.schedule(spawntime, 1 second, self, Spawn) if (gennum > 1) scheduler.scheduleOnce(1 second, context.parent, IntroduceMe) def receive = { case Namer.SetName(name) => { myName = name println(s"$name is the ${gennum}th generation child of $parent") context become named } } def named: Receive = { case Greet(peer) => peer ! AskName case AskName => sender ! TellName(myName) case TellName(name) => { println(s"$myName says: '$greeting, $name'") starsKnown += name -> sender } case Spawn => { println(s"$myName says: A star is born!") context.actorOf(props(greeting, gennum + 1, myName)) } case IntroduceMe => starsKnown.foreach { case (name, ref) => ref ! Greet(sender) } case Die => { println(s"$myName says: 'I'd like to thank the Academy...'") context stop self } } } val namerPath = "/user/namer" val system = ActorSystem("actor-demo-scala") val scheduler = system.scheduler system.actorOf(Namer.props(Array("Bob", "Alice", "Rock", "Paper", "Scissors", "North", "South", "East", "West", "Up", "Down")), "namer") val star1 = system.actorOf(props("Howya doing", 1, "Nobody")) val star2 = system.actorOf(props("Happy to meet you", 1, "Nobody")) Thread sleep 500 star1 ! Greet(star2) star2 ! Greet(star1)}

此应用程序创建一个具有两个actor类型的actor系统: NamerStar。该Namer演员是一个单独的,有效名的中央目录。该Star演员得到了他们的(屏幕)的名字 Namer,然后打印出问候消息给其他 StarS,因为在最后一的例子。但他们也产生 Star了他们随后介绍给Star他们所知道的孩子; 和Star演员能最终死亡。

清单2是运行此应用程序时可能会看到的输出示例。

清单2.应用程序输出

1234五67891011121314151617181920212223242526272829三十3132333435363738394041424344454647484950515253545556Bob is the 1th generation child of NobodyAlice is the 1th generation child of NobodyBob says: 'Howya doing, Alice'Alice says: 'Happy to meet you, Bob'Bob says: A star is born!Rock is the 2th generation child of BobAlice says: A star is born!Paper is the 2th generation child of AliceBob says: A star is born!Scissors is the 2th generation child of BobAlice says: 'Happy to meet you, Rock'Alice says: A star is born!North is the 2th generation child of AliceBob says: 'Howya doing, Paper'Rock says: 'Howya doing, Paper'Bob says: A star is born!South is the 2th generation child of BobAlice says: 'Happy to meet you, Scissors'Paper says: 'Happy to meet you, Scissors'Alice says: A star is born!East is the 2th generation child of AliceBob says: 'Howya doing, North'Rock says: 'Howya doing, North'Scissors says: 'Howya doing, North'Paper says: A star is born!West is the 3th generation child of PaperRock says: A star is born!Up is the 3th generation child of RockBob says: A star is born!Down is the 2th generation child of BobAlice says: 'Happy to meet you, South'North says: 'Happy to meet you, South'Paper says: 'Happy to meet you, South'Scissors says: A star is born!Bob-Bob is the 3th generation child of ScissorsAlice says: A star is born!Bob-Alice is the 2th generation child of AliceScissors says: 'Howya doing, East'Rock says: 'Howya doing, East'Bob says: 'Howya doing, East'South says: 'Howya doing, East'North says: A star is born!Bob-Rock is the 3th generation child of NorthPaper says: A star is born!Bob-Paper is the 3th generation child of PaperBob says: 'I'd like to thank the Academy...'Scissors says: 'Howya doing, West'South says: 'Howya doing, West'Alice says: A star is born!Bob-Scissors is the 2th generation child of AliceNorth says: A star is born!Bob-North is the 3th generation child of NorthPaper says: A star is born!Bob-South is the 3th generation child of PaperAlice says: 'I'd like to thank the Academy...'Namer receive timeout, shutting down system

几代 Star小号

与一些现实世界的演员不同,Star演员不会以戏剧和公开的方式制作后代; 相反,他们每次收到Spawn消息时都会静静地弹出一个孩子。他们对这一事件的兴奋之情是简单的诞生公告“ A star is born!再次,与现实世界的演员不同,骄傲的新父母Star甚至不能宣布他们的新孩子的名字,而是由命名机构决定的。在新生儿Star被命名后,Namer将以“ Ted is the 2th generation child of Bob。” 的形式打印小孩的姓名和细节。

A Star的死亡是由Star接收到一个Die消息触发的,响应该消息打印消息“” I'd like to thank the Academy...。在Star 随后执行该context stop self语句,告诉控制阿卡演员方面,它是做和应该被关闭。然后,上下文处理所有清理工作,并从系统中删除actor。

改变角色

现实世界的演员可以扮演很多不同的角色。通过改变消息处理方法,Akka演员也可以承担不同的角色。您可以在Staractor中看到这一点,默认receive 方法仅处理SetName消息,所有其他消息都由该named方法处理。在SetName消息 的处理中,切换发生在context become named声明中。这个角色改变的意图是,Star除非它被命名,否则它不能执行任何操作,并且在命名之后,它永远不能被重命名。

您可以始终以单一receive方法处理所有消息处理 ,但这通常会使基于当前actor状态的条件语句的凌乱代码。receive对于不同的状态使用单独的 方法可以保持代码的清洁和直接。一般来说,任何时候你有一个不同的消息适合的actor状态,你应该倾向于使用一种新的 receive方法来表示该状态。

您需要注意,当您更改演员角色时,不要排除处理有效的消息。例如,如果Star演员被允许在任何时候重命名,清单1中的named方法 将需要处理 消息。任何未被actor当前方法处理的消息有效地被删除(实际上,默认情况下发送到一个死信邮箱,但是丢失到用户角色)。SetNamereceive

作为更改消息处理程序的替代方法,您还可以将当前消息处理程序推送到堆栈中,并使用双参数形式设置新消息处理程序become(named, false)。然后,您可以最终通过context unbecome调用恢复原始处理程序 。您可以根据需要将呼叫嵌套become/ unbecome以这种方式深入,但是您必须小心代码最终执行 unbecome匹配become。任何不匹配的 becomes表示内存泄漏。

Namer演员

Namer演员通过在其构造函数名称的字符串数组。每次收到一条GetName消息时,它将在消息中返回数组中的下一个名称,SetName当其用完简单名称时,将连接到连字符名称。Namer演员的要点是为演员分配名称(理想的是唯一的名称)Star,所以Namer在这个系统中没有理由拥有多个 实例。启动actor系统的应用程序代码直接创建此单例实例,以便每个实例可供使用Star

因为应用程序创建Namer单身人士,所以可以通过ActorRef这个演员到每个人StarStar演员可以把它传递给他们的孩子。但是,Akka给你一种更清洁的方法来处理这种知名演员。actor初始化中的val namer = context actorSelection namerPath行 通过其在actor系统中的路径来Star查找Namer演员 - 在这种情况下 /user/namer。(/user前缀适用于所有用户创建的actors,并且namerNamer在使用actor创建 时设置的名称system.actorOf。)该namer值对应用程序中包含的所有actors都是可见的,因此可以在需要时直接使用。

计划消息

该清单1例使用几个预定的消息来促使各个角色。该Star演员在初始化过程中创建两个或三个预定的消息。该 val killer = scheduler.scheduleOnce(killtime, self, Die) 声明创建一个一次性消息调度程序,Star通过Die在舞台上的时间结束时发送消息来触发死亡 。该 val spawner = scheduler.schedule(spawntime, 1 second, self, Spawn) 声明创建一个重复的调度程序,Spawn 在初始延迟后以1秒的间隔发送消息,以填充新的一代Star

Star仅当Star另一个的后代Star (而不是由actor系统外部的应用程序代码创建)时,才使用第三种类型的调度消息。该 if (gennum > 1) scheduler.scheduleOnce(1 second, context.parent, IntroduceMe) 语句StarStar初始化之后创建一个待发送到父级的消息 ,如果新的Star是第二代或更高版本。当父母Star接收到这个消息时,它会发送一个 Greet消息给对方,Star它被引入,要求这些已知的Stars向孩子介绍自己。

Namer演员也使用计划的消息,这一次在接收超时的形式。该 context.setReceiveTimeout(starBaseSpawntime + starVariableSpawntime) 语句将超时设置为星的最大生成时间。每次演员接收到消息时,这个超时将由上下文重置,以便仅当指定的时间过去而没有接收到任何消息时才会触发该超时。Star不断创建Star发送消息的新孩子 Namer,所以只有当所有的Star演员都不在时才会发生超时。如果超时确实发生,请通过关闭整个actor系统来Namer处理结果 ReceiveTimeout消息(在akka.actor包中定义 )。

尖锐的读者可能会想知道Namer超时是如何发生的。的寿命Star总是将是至少5秒,每Star开始产卵的孩子 Star通过在时间s这是一个最大3秒老的-所以它看起来像应该有一个不断增长的过剩 StarS(有点像在电视真人秀)。那么这是如何工作的?答案在于Akka 演员监督模式和亲子关系。

演员家庭

Akka根据亲属执行行为者的监督层次。当一个演员创造另一个演员时,创造的演员成为原演员的下属。这意味着家长演员负责其小孩演员(我们常常希望看到适用于现实世界的演员的原则)。这个责任主要是关于失败的处理,但它确实对行动者的工作有一些影响。

监督层次是清单1演员系统关闭的原因。因为层次结构要求父actor可用,终止父actor将自动终止其所有子演员。在清单1中,只有两个Staractor最初由应用程序创建(它始终接收到名称BobAlice)。所有其他人Star都是由这两个初始 Star者之一或其子孙之一创建 Star的。所以当这些根中Star的每一个终止时,它都需要它的所有后代。两者都终止后,不Star存在。没有任何Stars来生成孩子Star,没有请求的名称去 Namer,所以Namer超时最终被触发,系统关闭。

更复杂的演员系统

清单1中看到一个简单的actor系统如何工作的例子。但真正的应用系统通常具有更多类型的演员(通常在数十或数百)以及演员之间更复杂的相互作用。设计和组织复杂演员系统的最佳方式之一是指定演员之间的消息流。

对于一个更复杂的例子,我扩展了清单1 应用程序来实现一个简单的电影制作模型。该模型使用四个主要actor类型和两个专用辅助actor类型:

  • Star:参与电影的演员

  • Scout一个人才队伍找新Star

  • Academy:一个跟踪所有活动Star的单例注册表

  • Director:电影制作人

  • CastingAssistant:助理 Director一个电影

  • ProductionAssistant助理 Director一个电影

Star以s 清单1中, Star本应用演员寿命有限。当一个人Director开始制作一部电影时,它会获得一个Star在电影中投射的当前活动的列表。首先, Director需要得到Star承诺的电影,然后使电影毕竟Star是承诺。如果Star电影中的任何一个在电影完成之前退出业务(或者以演员的名义去世),电影将失败。

图示消息

该清单1应用程序太简单了,我可以解释散文演员互动。这个更复杂的新应用程序需要更好的呈现交互的方式。消息传递图是显示这些交互的好方法。图1显示了在Scout找到新的Star(或在actor术语中创建一个Star)和新Star注册的过程中涉及到的交互序列 Academy

图1. Star创建和初始化

JVM并发 使用Akka构建演员应用程序 超越基础,构建使用actor交互的应用程序

以下是添加一个消息(和创建步骤)的顺序 Star

  1. FindTalent(从SchedulerScout):触发器添加一个Star

  2. GetName(从ScoutAcademy):分配名称Star

  3. GiveName(回复Academy):供应分配名称。

  4. actorOf():用提供的名称Scout创建新 Star演员。

  5. Register(来自StarAcademy):注册StarAcademy

该消息序列被设计为可扩展和灵活。每个消息都可以被隔离处理,所以actor不需要改变它们的内部状态来处理消息交换。(Academy 单例改变状态,但这是交换的整个目的的一部分。)由于没有内部状态改变,您不需要强制执行严格的消息序列。例如,您可以 通过发送FindTalentStar条消息来创建多个GetName消息 Academy。您甚至可以FindTalent在完成上一次Star创建之前连续处理多个 邮件。您还可以将任意数量的Scoutactors 添加 到系统中,并使其独立运行,而不会发生冲突。

制作电影是一个比创建新的电影更复杂的过程 Star,涉及更多的状态变化和潜在的故障条件。图2显示了制作电影时涉及的主要应用程序消息:

图2.制作电影

以下是制作电影所涉及的消息序列,主要看待一切顺利的道路,没有任何故障:

  1. MakeMovie(从SchedulerDirector):触发以启动电影。

  2. PickStars(从DirectorAcademy):选择Star要在动画中执行的动作。

  3. StarsPickedPickFailure(响应 Academy):如果足够Star的可用于制作电影,则Academy选择所需的号码并在StarsPicked消息中发回列表; 否则,Academy发送PickFailure 响应。

  4. actorOf()Director创建一个 CastingAssistant演员来处理电影。

  5. OfferRole(在电影中的CastingAssistant每一个Star):CastingAssistant提供角色 Star

  6. AcceptRoleRejectRole(每个响应 Star):Star如果已经提交了另一个角色但拒绝提供的角色,否则拒绝接受。

  7. AllSignedCastingFailureCastingAssistant对父母):当所有人Star都接受他们的角色时,他们CastingAssistant的工作已经完成,所以它通过Director 一个AllSigned消息将成功传递给了父母; 如果不可能施放Star(特别是如果死亡), CastingAssistant则将失败传递给父母。无论哪种方式,CastingAssistant完成并可以终止。

  8. actorOf()Director创建一个 ProductionAssistant演员来处理拍摄电影。

  9. ProductionComplete(从SchedulerProductionAssistant):在所需时间过后触发完成电影。

  10. ProductionCompleteProductionFailureProductionAssistant到父母):当定时器触发完成电影时ProductionAssistant ,向其父母报告电影已完成。

  11. RoleCompleteProductionAssistant对于Star电影中的每个 ):ProductionAssistant还需要通知每个Star电影完成,以便它们可用于其他电影。

该消息序列在一些演员中使用状态改变作为处理的一部分。Star需要在可用和正在致力于影片之间改变状态。CastingAssistant 演员需要跟踪哪些Star已经在电影中接受角色,所以他们知道他们还需要招募哪些角色。但是Director演员不需要改变状态,因为他们只响应他们收到的消息(包括他们的孩子演员的消息)。ProductionAssistant演员也不需要改变状态,因为他们只需要在电影终止时通知其他演员。

如果您将其功能合并到演员中,您可以避免使用单独CastingAssistant和 演员。尽管如此,消除其他演员变得 更加复杂,在这种情况下,将功能分成其他演员更有意义。当您考虑处理故障时尤其如此。ProductionAssistantDirectorDirector

处理失败

应用程序的一个重要方面不在图1和图2中的消息流中 。 Star寿命有限,所以处理这些人的所有行为者都Star需要意识到何时死亡。特别是,如果Star为电影选择的电影在电影完成之前死亡,则电影必然会失败。

在Akka演员系统中的失败处理使用父母监督,从而将失败的条件传递给演员的层次结构。故障通常在JVM中表示为异常,所以Akka使用自然处理异常来检测何时发生故障。如果一个actor没有在自己的代码中处理异常,Akka将通过终止actor并将故障传递给父actor来处理未捕获的异常。父母然后可以处理失败,或者自己无法对其父级进行处理。

Akka的内置故障处理在诸如I / O相关故障等条件下工作良好,但对于电影制作系统而言,异常将是一个不必要的并发症。在这种情况下需要的是监视其他演员,幸运的是,Akka提供了一个简单的方法。通过使用DeathWatch演员系统的组件,演员可以自己注册观看任何其他演员。注册后,观看演员Terminated如果观看的演员死亡,则收到系统消息。(为了避免任何竞赛条件,如果被观看的演员在手表开始之前已经死亡,则该Terminated消息立即出现在观看演员的邮箱中。)

DeathWatch通过调用该context.watch()方法激活,该 方法ActorRef 使actor被监视。Terminated当感兴趣的演员死亡时产生的消息是电影制作示例所需的故障处理。

Star 创建代码

清单3显示了启动应用程序并创建新Star的代码,与图1所示的消息流匹配。

清单3. Star 创建代码

1234五67891011121314151617181920212223242526272829三十313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899object Stars2 extends App { object Scout { case object FindTalent val starBaseLifetime = 7 seconds val starVariableLifetime = 3 seconds val findBaseTime = 1 seconds val findVariableTime = 3 seconds def props(): Props = Props(new Scout())} class Scout extends Actor { import Scout._ import Academy._ import context.dispatcher val random = Random scheduleFind def scheduleFind = { val nextTime = scaledDuration(findBaseTime, findVariableTime) scheduler.scheduleOnce(nextTime, self, FindTalent) } def scaledDuration(base: FiniteDuration, variable: FiniteDuration) = base + variable * random.nextInt(1000) / 1000 def receive = { case FindTalent => academy ! GetName case GiveName(name) => { system.actorOf(Star.props(name, scaledDuration(starBaseLifetime, starVariableLifetime)), name) println(s"$name has been discovered") scheduleFind } } } object Academy { case object GetName case class GiveName(name: String) case class Register(name: String) ... def props(names: Array[String]): Props = Props(new Academy(names)) } class Academy(names: Array[String]) extends Actor { import Academy._ var nextNameIndex = 0 val nameIndexLimit = names.length * (names.length + 1) val liveStars = Buffer[(ActorRef, String)]() ... def receive = { case GetName => { val name = if (nextNameIndex < names.length) names(nextNameIndex) else { val first = nextNameIndex / names.length - 1 val second = nextNameIndex % names.length names(first) + "-" + names(second) } sender ! GiveName(name) nextNameIndex = (nextNameIndex + 1) % nameIndexLimit } case Register(name) => { liveStars += ((sender, name)) context.watch(sender) println(s"Academy now tracking ${liveStars.size} stars") } case Terminated(ref) => { val star = (liveStars.find(_._1 == ref)).get liveStars -= star println(s"${star._2} has left the businessnAcademy now tracking ${liveStars.size} Stars") } ... } } } object Star { ... def props(name: String, lifespan: FiniteDuration) = Props(new Star(name, lifespan)) } class Star(name: String, lifespan: FiniteDuration) extends Actor { import Star._ import context.dispatcher academy ! Academy.Register(name) scheduler.scheduleOnce(lifespan, self, PoisonPill) } ... val system = ActorSystem("actor-demo-scala") val scheduler = system.scheduler val academy = system.actorOf(Academy.props(Array("Bob", "Alice", "Rock", "Paper", "Scissors", "North", "South", "East", "West", "Up", "Down")), "Academy") system.actorOf(Scout.props(), "Sam") system.actorOf(Scout.props(), "Dean") system.actorOf(Director.props("Astro"), "Astro") system.actorOf(Director.props("Cosmo"), "Cosmo") Thread sleep 15000 system.shutdown}

的清单3代码大多采用相同阿卡的功能的清单1 Star榜样,随着加入的DeathWatch-activating context.watch()由作出呼叫Academy演员在处理Register从一个新的消息Star。该Academy演员记录无论是ActorRef与每个名字Star,当一个Terminated 被处理的消息,它使用ActorRef查找并删除Star已经死了。这样Buffer (基本上是一个ArrayList)的现场直播Star

主要的应用代码首先创建单例Academy 演员,然后创建一对Scouts,最后创建一对 Directors。该应用程序允许演员系统运行15秒,然后关闭系统并退出。

开始一部电影

清单4显示了制作电影所涉及的代码的第一部分:铸造Stars以参与电影。该代码与图2消息流的顶部部分相匹配,包括Schedulera DirectorAcademyactor 之间的交互 。

清单4.电影制作代码

1234五67891011121314151617181920212223242526272829三十31323334353637383940414243444546474849505152535455565758596061object Stars2 extends App { ... object Director { case object MakeMovie val starCountBase = 2 val starCountVariable = 4 val productionTime = 3 seconds val recoveryTime = 3 seconds def props(name: String) = Props(new Director(name)) } class Director(name: String) extends Actor { import Academy._ import Director._ import ProductionAssistant._ import context.dispatcher val random = Random def makeMovie = { val numstars = random.nextInt(starCountVariable) + starCountBase academy ! PickStars(numstars) } def retryMovie = scheduler.scheduleOnce(recoveryTime, self, MakeMovie) makeMovie def receive = { case MakeMovie => makeMovie case PickFailure => retryMovie case StarsPicked(stars) => { println(s"$name wants to make a movie with ${stars.length} actors") context.actorOf(CastingAssistant.props(name, stars.map(_._1)), name + ":Casting") context become casting } } ... } ... object Academy { ... case class PickStars(count: Int) case object PickFailure case class StarsPicked(ref: List[(ActorRef, String)]) def props(names: Array[String]): Props = Props(new Academy(names)) } class Academy(names: Array[String]) extends Actor { ... def pickStars(n: Int): Seq[(ActorRef, String)] = ... def receive = { ... case PickStars(n) => { if (liveStars.size < n) sender ! PickFailure else sender ! StarsPicked(pickStars(n).toList) } } }

清单4代码的开头给出了 Director对象和actor定义的一部分,显示了通过向... Scheduler发送MakeMovie消息触发的电影制作的开始DirectorDirectorMakeMovie收到此消息时, 开始制作电影 ,请求 使用消息Academy分配Star电影PickStarsAcademy处理PickStars消息的代码(如清单4所示)发送回 PickFailure(如果没有足够Star的可用)或StarsPicked消息。如果Director 接收到PickFailure消息,则会安排另一个尝试以供稍后使用。如果Director收到一条StarsPicked消息,它会启动一个 CastingAssistant演员,其中列出StarAcademy电影中的角色选择的列表,然后更改状态以处理来自该影片的响应CastingAssistant。从这一点开始,清单5从 Director演员的投射Receive方法开始。

清单5. CastingAssistant 操作

1234五67891011121314151617181920212223242526272829三十3132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 class Director(name: String) extends Actor { ... def casting: Receive = { case CastingAssistant.AllSigned(stars) => { println(s"$name cast ${stars.length} actors for movie, starting production") context.actorOf(ProductionAssistant.props(productionTime, stars), name + ":Production") context become making } case CastingAssistant.CastingFailure => { println(s"$name failed casting a movie") retryMovie context become receive } } ... }object CastingAssistant { case class AllSigned(stars: List[ActorRef]) case object CastingFailure val retryTime = 1 second def props(dirname: String, stars: List[ActorRef]) = Props(new CastingAssistant(dirname, stars)) } class CastingAssistant(dirname: String, stars: List[ActorRef]) extends Actor { import CastingAssistant._ import Star._ import context.dispatcher var signed = Set[ActorRef]() stars.foreach { star => { star ! OfferRole context.watch(star) } } def receive = { case AcceptRole => { signed += sender println(s"Signed star ${signed.size} of ${stars.size} for director $dirname") if (signed.size == stars.size) { context.parent ! AllSigned(stars) context.stop(self) } } case RejectRole => scheduler.scheduleOnce(retryTime, sender, OfferRole) case Terminated(ref) => { context.parent ! CastingFailure stars.foreach { _ ! Star.CancelOffer } context.stop(self) } } } object Star { case object OfferRole case object AcceptRole case object RejectRole case object CancelOffer case object RoleComplete ... } class Star(name: String, lifespan: FiniteDuration) extends Actor { ... var acceptedOffer: ActorRef = null scheduler.scheduleOnce(lifespan, self, PoisonPill) def receive = { case OfferRole => { sender ! AcceptRole acceptedOffer = sender context become booked } } def booked: Receive = { case OfferRole => sender ! RejectRole case CancelOffer => if (sender == acceptedOffer) context become receive case RoleComplete => context become receive } }

Director创建CastingAssistant与列表ActorRefS表示Star在电影中被投秒。CastingAssistant首先发送OfferRole 给每个这些Stars,并且还注册自己作为每个观察者StarCastingAssistant然后等待 来自每个的一个AcceptRole或一个RejectRole消息Star,或Terminated 来自演员系统的消息,报告其中一个的消息 Star

如果从演员的每个角色CastingAssistant收到,它 会向其父母发送一条消息。该消息包括s 的列表作为方便,因为这需要传递下一个处理步骤。AcceptRoleStarAllSignedDirectorStar actorRef

如果从任何CastingAssistant一个接收到RejectRole消息Star,它会OfferRole在延迟后安排向同一个演员发送 。(星星通常是无法访问的,所以如果你希望他们在你的电影中,你需要继续询问,直到他们接受。)

如果CastingAssistant收到Terminated消息,则表示Star为电影选择的其中一个已经死亡。在这个令人遗憾的情况下,这些CastingAssistant报告回到了 CastingFailure父母Director身上,结束了自己。在结束之前,它会向其列表中的CancelOffer 每个发送一条消息Star,以便任何Star已经承诺在电影中角色的任何人 都被释放以承担其他角色。

你可能会问,为什么CastingAssistant发送 CancelOffer消息到 每一个Star -甚至从其中的那些 AcceptRole消息尚未处理。原因是Star列表中有可能发送了一个 AcceptRole,但是当Terminated邮件处理时它仍然在邮箱中 。在分布式演员系统的一般情况下,也可能 Star已经接受了该AcceptRole消息,但该消息仍在运行中或已丢失。将 CancelOffer消息发送到每个都Star使得故障处理在任何情况下更清洁,并且如果在电影中没有接受角色,则Star忽略CancelOffer消息很容易 。

清单6显示了电影制作过程的最后一部分:ProductionAssistant演员的操作(与图2的右下角相匹配)。这部分很简单,因为ProductionAssistant只需要处理 SchedulerProductionComplete消息或 Terminated消息。

清单6. ProductionAssistant 操作

1234五67891011121314151617181920212223242526272829三十31323334353637383940class Director(name: String) extends Actor { ... def making: Receive = { case m: ProductionAssistant.ProductionEnd => { m match { case ProductionComplete => println(s"$name made a movie!") case ProductionFailed => println(s"$name failed making a movie") } makeMovie context become receive } }}object ProductionAssistant { sealed trait ProductionEnd case object ProductionComplete extends ProductionEnd case object ProductionFailed extends ProductionEnd def props(time: FiniteDuration, stars: List[ActorRef]) = Props(new ProductionAssistant(time, stars))}class ProductionAssistant(time: FiniteDuration, stars: List[ActorRef]) extends Actor { import ProductionAssistant._ import context.dispatcher stars.foreach { star => context.watch(star) } scheduler.scheduleOnce(time, self, ProductionComplete) def endProduction(end: ProductionEnd) = { context.parent ! end stars.foreach { star => star ! Star.RoleComplete } context.stop(self) } def receive = { case ProductionComplete => endProduction(ProductionComplete) case Terminated(ref) => endProduction(ProductionFailed) }}

如果ProductionAssistant收到ProductionComplete来自该 消息的消息Scheduler,则可以向父级报告成功Director。如果首先收到Terminated消息,则必须报告失败。无论哪种方式,它也通过告诉所有Star参与电影的工作完成清理 。

清单7是运行此程序时看到的输出示例,电影制作结果以粗体显示。

清单7.输出示例

1234五67891011121314151617181920212223242526272829三十31323334353637383940414243444546474849505152535455565758Bob has been discoveredAcademy now tracking 1 starsAlice has been discoveredAcademy now tracking 2 starsRock has been discoveredAcademy now tracking 3 starsPaper has been discoveredAcademy now tracking 4 starsCosmo wants to make a movie with 4 actorsAstro wants to make a movie with 3 actorsSigned star 1 of 4 for director CosmoSigned star 2 of 4 for director CosmoSigned star 3 of 4 for director CosmoSigned star 4 of 4 for director CosmoCosmo cast 4 actors for movie, starting productionScissors has been discoveredAcademy now tracking 5 starsCosmo made a movie!Cosmo wants to make a movie with 4 actorsSigned star 1 of 4 for director CosmoSigned star 2 of 4 for director CosmoSigned star 3 of 4 for director CosmoSigned star 4 of 4 for director CosmoCosmo cast 4 actors for movie, starting productionNorth has been discoveredAcademy now tracking 6 starsSouth has been discoveredAcademy now tracking 7 starsCosmo failed making a movieAstro failed casting a movieBob has left the businessAcademy now tracking 6 StarsCosmo wants to make a movie with 3 actorsSigned star 1 of 3 for director CosmoSigned star 2 of 3 for director CosmoSigned star 3 of 3 for director CosmoCosmo cast 3 actors for movie, starting productionEast has been discoveredAcademy now tracking 7 starsWest has been discoveredAcademy now tracking 8 starsAlice has left the businessAcademy now tracking 7 StarsRock has left the businessAcademy now tracking 6 StarsUp has been discoveredAcademy now tracking 7 starsAstro wants to make a movie with 2 actorsSigned star 1 of 2 for director AstroSigned star 2 of 2 for director AstroAstro cast 2 actors for movie, starting productionCosmo made a movie!Cosmo wants to make a movie with 3 actorsSigned star 1 of 3 for director CosmoSigned star 2 of 3 for director CosmoSigned star 3 of 3 for director CosmoCosmo cast 3 actors for movie, starting productionDown has been discoveredAcademy now tracking 8 stars

在列表中点附近的双重故障显示了一个有趣的输出序列。首先来了 Cosmo failed making a movie,然后 Astro failed casting a movie,然后 Bob has left the business。这些行显示一个终止而产生的相互作用StarBob。在这种情况下,Bob已经接受了电影中的一个角色由正在取得Cosmo和生产已经开始,所以 CosmoProductionAssistant收到的Terminated消息并未能电影的制作。 Bob也被选定为电影中的一个角色被提出Astro但尚未接受该角色(因为 Bob已经承诺Cosmo的电影),这样 AstroCastingAssistant收到的 Terminated消息并未能电影的铸造。第三条消息是由Academy收到消息的时间生成的Terminated

结论

构建演员和消息交流只会让您的演员系统发挥作用。在某些时候,您需要跟踪演员行为不正之处。actor系统的异步性质使得更难确定有问题的交互。如何追踪和调试演员互动是一个值得本身就是整篇文章的话题。

相关内容推荐