zio-actors与akka-actor集成
zio-actors 与 akka-actor 是两种不同实现,分两种情况:
- zio actor 发消息给 akka actor
- akka actor 发消息给 zio actor
依赖
不包括 akka actor 和 zio-actors 依赖,只是集成所需的
"dev.zio" %% "zio-actors-akka-interop" % <VERSION>"
所需的导入如下:
import zio.actors.Actor.Stateful
import zio.actors.{
ActorSystem, ActorRef, Context, Supervisor }
import zio.actors.akka.{
AkkaTypedActor, AkkaTypedActorRefLocal }
import zio.{
IO, Runtime }import akka.actor.typed
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.Scheduler
import akka.util.Timeoutimport scala.concurrent.duration._
本章例子的样例类:
sealed trait TypedMessage[+_]
case class PingToZio(zioReplyToActor: ActorRef[ZioMessage], msg: String) extends TypedMessage[Unit]
case class PingFromZio(zioSenderActor: ActorRef[ZioMessage]) extends TypedMessage[Unit]sealed trait ZioMessage[+_]
case class PongFromAkka(msg: String) extends ZioMessage[Unit]
case class Ping(akkaActor: AkkaTypedActorRefLocal[TypedMessage]) extends ZioMessage[Unit]
基本的 actors 使用需要定义一个Stateful
来描述 actor 的行为。然后通过监督方式、初始状态和提到的Stateful
来完成 actor 的创建。
在 zio actor 与 akka actor 通信
zio actor Stateful
实现如下:
val handler = new Stateful[Any, String, ZioMessage] {
override def receive[A](state: String, msg: ZioMessage[A], context: Context): IO[Throwable, (String, A)] =msg match {
case PongFromAkka(msg) => IO.succeed((msg, ())) // zio actor接收akka actor的消息case Ping(akkaActor) => // akkaActor的类型是AkkaTypedActorRefLocal,而不是 akka actor 的ActorReffor {
self <- context.self[ZioMessage]_ <- akkaActor ! PingFromZio(self) // 把self带上用于收回复} yield (state, ())case _=> IO.fail(new Exception("fail"))}
}
在 akka actor 中 发送消息到 zio actor
akka actor,需要一个行为(behavior)来定义要处理的消息,在这种情况下向 zio actor 发送和接收消息:
object TestBehavior {
lazy val zioRuntime = Runtime.defaultdef apply(): Behavior[TypedMessage[_]] =Behaviors.receiveMessage {
message =>message match {
case PingToZio(zioReplyToActor, msgToZio) => // 在akka 中发消息,需要unsafeRun执行ZIO effectzioRuntime.unsafeRun(zioReplyToActor ! PongFromAkka(msgToZio)) case PingFromZio(zioSenderActor) => zioRuntime.unsafeRun(zioSenderActor ! PongFromAkka("Pong from Akka"))}Behaviors.same}}
主程序
我们已经准备好开始从 zio 向 akka 发送消息,或者通过fire-and-forget
交互模式反过来,但首先我们需要用创建的 akka ActorRef
(或ActorSystem
)创建一个 ZIO 值,可以使用AkkaTypedActor.make
:
for {
akkaSystem <- IO(typed.ActorSystem(TestBehavior(), "akkaSystem")) // akka actor 的 ActorSystemsystem <- ActorSystem("zioSystem") // zio actor 的 ActorSystemakkaActor <- AkkaTypedActor.make(akkaSystem) // 使用interop提供的AkkaTypedActor,对akka actor做一次包装zioActor <- system.make("zioActor", Supervisor.none, "", handler) // 使用zio的ActorSystem创建zio actor_ <- akkaActor ! PingToZio(zioActor, "Ping from Akka") // 发消息给akka actor,并带上zioActor,用于接收回复_ <- zioActor ! Ping(akkaActor) // 发消息给zio actor,并带上akkaActor,用于接收回复
} yield ()
zim 中应用
zim 不涉及到2种 actor 通信,websocket 使用的是 akka actor,而在定时任务处使用了 zio actor,实现一个基于 zio actor 的定时器如下:
object ScheduleStateful {
val stateful: Stateful[Any, Unit, Command] = new Stateful[Any, Unit, Command] {
override def receive[A](state: Unit, msg: Command[A], context: Context): UIO[(Unit, A)] = {
val taskIO = msg match {
case OnlineUserMessage(descr) =>WsService.getConnections.flatMap {
i =>LogUtil.debug(s"${
descr.getOrElse("receive")} Total online user => $i")}case _ => UIO.unit}// 这里返回的类型按照zio-actors官网的写法返回(Unit, A) idea会提示语法错误,目前还不知道是谁的问题,只能强制转换了taskIO.foldM(e => LogUtil.error(s"ScheduleStateful $e").as(() -> "".asInstanceOf[A]),_ => ZIO.succeed(() -> "".asInstanceOf[A]))}}
}
根据Stateful
创建 actor
lazy val scheduleActor: ZIO[Any, Throwable, ActorRef[protocol.Command]] =actorSystem.flatMap(_.make(Constants.SCHEDULE_JOB_ACTOR, zio.actors.Supervisor.none, (), ScheduleStateful.stateful)).provideLayer(Clock.live ++ InfrastructureConfiguration.live)
启动 actor,只需要像使用普通方法一样调用该方法即可:
def scheduleTask: Task[Unit] = {
val task = ZioActorSystemConfiguration.scheduleActor.flatMap(f => f ! OnlineUserMessage(Some("scheduleTask"))) repeat Schedule.secondOfMinute(0)// secondOfMinute类似于Cron的时间表,每分钟的指定秒数重复出现。此处为0秒task.foldM(e => LogUtil.error(s"error => $e").unit,_ => UIO.unit).provideLayer(Clock.live)}
zim 是一个web端即时通讯系统,使用scala2语言,基于zio、tapir、akka,scallikejdbc等库实现。