当前位置: 代码迷 >> 综合 >> 快学Scala- Scala Actor 并发编程
  详细解决方案

快学Scala- Scala Actor 并发编程

热度:97   发布时间:2023-11-07 04:41:35.0

1、scala并发编程介绍
来自学习资料
Scala中的Actor能够实现并行编程的强大功能,它是基于事件模型的并发机制,Scala是运用消息(message)的发送、接收来实现多线程的。使用Scala能够更容易地实现多线程应用的开发。

与java的并发编程比较
对于Java,我们都知道它的多线程实现需要对共享资源(变量、对象等)使用synchronized 关键字进行代码块同步、对象锁互斥等等。而且,常常一大块的try…catch语句块中加上wait方法、notify方法、notifyAll方法是让人很头疼的。原因就在于Java中多数使用的是可变状态的对象资源,对这些资源进行共享来实现多线程编程的话,控制好资源竞争与防止对象状态被意外修改是非常重要的,而对象状态的不变性也是较难以保证的。 而在Scala中,我们可以通过复制不可变状态的资源(即对象,Scala中一切都是对象,连函数、方法也是)的一个副本,再基于Actor的消息发送、接收机制进行并行编程

Actor方法执行顺序
1.首先调用start()方法启动Actor
2.调用start()方法后其act()方法会被执行
3.向Actor发送消息

发送消息的方式
! 发送异步消息,没有返回值
!? 发送同步消息,等待返回值
!! 发送异步消息,返回值是Future[Any]
2、案例
注:我这里用的是Scala Actor是scala 2.10.6版本,也是为了学akka做准备
Scala在2.11.x版本中将Akka加入其中,作为其默认的Actor,老版本的Actor已经废弃
1、案例1
首先回顾使用java来写个多线程的demo,然后使用scala来实现
java版本

public class JActorDemo {
    public static void main(String[] args){Thread t1 = new Thread(new JMyActor1());Thread t2 = new Thread(new JMyActor2());t1.start();t2.start();}
}
class JMyActor1 implements Runnable{
    @Overridepublic void run() {for (int i = 1; i <= 10; i++) {try {System.out.println("thread-1:" + i);Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}}
}
class JMyActor2 implements Runnable{
    @Overridepublic void run() {for (int i = 1; i <= 10; i++) {try {System.out.println("thread-2:" + i);Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}}
}
输出
thread-1:1
thread-2:1
thread-2:2
thread-1:2
thread-1:3
thread-2:3
thread-2:4
thread-1:4
thread-1:5
thread-2:5
thread-2:6
thread-1:6
thread-1:7
thread-2:7
thread-2:8
thread-1:8
thread-2:9
thread-1:9
thread-1:10
thread-2:10

scala版本,对比java那么就算继承的类不同,重写的方法不同但是功能类似

/*** 1.首先调用start()方法启动Actor* 2.调用start()方法后其act()方法会被执行* 3.向Actor发送消息* Created by 12706 on 2017/11/27.*/
class MyActor1 extends Actor{
    //重写Actor的act方法override def act(): Unit = {for (i <- 1 to 10) {println("thread-1:" + i)Thread.sleep(500)}}
}
class MyActor2 extends Actor{
    override def act(): Unit = {for (i <- 1 to 10) {println("thread-2:" + i)Thread.sleep(500)}}
}
object ActorDemo1 {
    def main(args: Array[String]): Unit = {val ma1 = new MyActor1ma1.start()val ma2 = new MyActor2ma2.start()}
}
输出统java版本一致

这两个Actor是并行执行的,act()方法中的for循环执行完成后actor程序就退出了

2、案例2
写个actor,可以不停地接收消息

class MessageActor extends Actor{
    override def act(): Unit = {//在act()方法中加入了while (true) 循环,就可以不停的接收消息while (true) {//接收消息receive {case "start" => {println("开始接收消息")Thread.sleep(1000)}case "continue" => {println("接收消息中")Thread.sleep(1000)}case "stop" => {println("停止接收消息")Thread.sleep(1000)}//其他情况则退出否则会一直阻塞case _ => exit()}}}
}
object MessageActor {def main(args: Array[String]): Unit = {val ma = new MessageActorma.start()//发送异步消息,发送异步消息(向actor发送start串),发送完了继续往下走,但是一个actor对象接收到消息执行的过程是同步的按顺序执行//!,!?,!!三种发送方式ma ! "start"ma ! "continue"ma ! "stop"ma ! "break"}
}

3、案例3
还是不停接收消息,但是改变了方式(不同于案例2)
使用了react而不是recieve,react方式会复用线程,比receive更高效。而react 如果要反复执行消息处理,react外层要用loop,不能用while。

/***使用了react而不是recieve,react方式会复用线程,比receive更高效。* 而react 如果要反复执行消息处理,react外层要用loop,不能用while。* Created by 12706 on 2017/11/27.*/
class LoopMessageActor extends Actor{
    override def act(): Unit = {loop {react {case "start" => {println("开始接收消息")Thread.sleep(1000)}case "continue" => {println("接收消息中")Thread.sleep(1000)}case "stop" => {println("停止接收消息")Thread.sleep(1000)}//其他情况则退出否则会一直阻塞case _ => exit()}}}
}
object LoopMessageActor {
    def main(args: Array[String]): Unit = {val lma = new LoopMessageActorlma.start()lma ! "start"lma ! "continue"lma ! "stop"lma ! "break"}
}

4、结合case class发送消息

case class SynMessage(id : Int, name : String)
case class AsyMessage(id : Int, name : String)
case class ResultContainer(id : Int, name : String)class ApplyActor extends Actor{
    override def act(): Unit = {loop {react {case SynMessage(id,name) => {println(s"同步消息: $id,$name")Thread.sleep(3000)//返回结果给发送者sender ! ResultContainer(id,name)}case AsyMessage(id, name) => {println(s"异步消息: $id,$name")Thread.sleep(3000)}}}}
}
object ApplyActor {
    def main(args: Array[String]): Unit = {val actor = new ApplyActoractor.start()//发送AsyMessage消息actor ! AsyMessage(9527,"异步")//发送SynMessage消息,接收返回结果Futureval result = actor !! SynMessage(7259,"同步")//判断是否返回了结果,使用Future的isSet方法(java中则是isDone方法,返回true则表示拿到了结果)println(result.isSet)//Future的apply()方法会构建一个异步操作且在未来某一个时刻返回一个值//Future的值不知道什么时候可用,所以需要一种机制来获取异步操作的结果,// 一种是不停的查看Future的完成状态,另一个采用阻塞的方式,scala提供了第二种方式的支持//到这进入阻塞,主程序不会往下走了,直到取到Future的值val message = result.apply()//再次判断println(result.isSet)println(message)}
}
输出
false
异步消息: 9527,异步//到这停顿3秒
同步消息: 7259,同步//到这停顿3秒
true
ResultContainer(7259,同步)