Akka

一个多线程处理框架,非常轻量的事件驱动处理。异步、非阻塞、高性能的事件驱动编程模型。

每个 actor 处理自己的消息

缺点是代码比较难理解,异步编程

使用场景

适用于复杂的消息处理,比如有很多类型的消息,每个消息对应怎么处理,消息直接处理还有顺序、依赖、循环。

MapReduce 操作

实际生产中,存在很多消息类型,可以以内部类的方式创建消息entity,避免混淆,缺点是代码不直观

其次,如果需要查询数据库等操作,应该一开始就把全部数据都查回来,避免每个子操作再查询。

角色介绍

  • ActorSystem 创建Actor系统,创建者
  • ActorRef:actor 引用持有者,用于 actor 之间相互发送时需要使用 ActorRef.tell 消息
  • AbstractActor:actor 具体的实现,

domo

  1. 创建 ActorSystem,这里可以预先创建好各个 xxxActor,也可以等到具体使用时在创建。先创建后续就选择使用,没必要重复创建。

    public class WordCountAkka {
        public static void main(String[] args) {
            //1、创建Actor系统,名字为wordcount
            ActorSystem actorSystem = ActorSystem.create("wordcount");
            try {
                //2、创建SplitActor,用于拆分每行的单词
                ActorRef splitActor = actorSystem.actorOf((Props.create(SplitActor.class)), "SplitActor");
                //2.1、创建CountActor,用于统计单词的次数
                ActorRef countActor = actorSystem.actorOf((Props.create(CountActor.class)), "CountActor");
    
                //3、创建消息
                //TODO 接收的消息串,可以修改为从控制台输入,本次就直接写死了
                Message msg = new Message("Hello Akka Akka Hello ni hao aa hao");
                //4、给SplitActor发消息
                splitActor.tell(msg, ActorRef.noSender());
    
                //5、按回车退出应用
                System.out.println(">>> Press ENTER to exit <<<");
                System.in.read();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                actorSystem.terminate();
            }
        }
    }
    
  2. 分别实现 Actor SplitActor

    public class SplitActor extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder().match(Message.class, t -> {
                System.out.println(self() + "  收到来自于 " + sender() + " 的消息: " + t);
                //按照空格拆分数据
                String[] words = String.valueOf(t.getContent()).toLowerCase().split("\\W+");
                //封装消息请求给CountActor
                Message msg = new Message(words);
                System.out.println(self() + "  发送消息 : " + Arrays.toString(words));
                //根据路径查找下一个处理者
                // ActorSelection countActorRef = getContext().actorSelection("/user/CountActor");
    
                ActorRef countActorRef = getContext().actorOf((Props.create(CountActor.class)), "CountActor");
                //将消息发给下一个处理者CountActor
                countActorRef.tell(msg, self());
            }).build();
    
        }
    }
    

    CountActor

    public class CountActor extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder().match(Message.class, t -> {
                //收到消息
                String[] words = (String[]) t.getContent();
                System.out.println(self() + " 收到来自于 " + sender() + " 的消息: " + Arrays.toString(words));
    
                //统计处理
                Map conutMap = new HashMap<>();
                for (String word : words) {
                    Integer num = (Integer) conutMap.get(word);
                    conutMap.put(word, num == null ? 1 : num + 1);
                }
    
               ActorSelection actorSelection = getContext().actorSelection("/user/CountActor");
               // System.out.println(self() + " 每个单词出现次数的统计结果为 : " + conutMap);
    
               Result result = new Result();
               result.info = self() + " 每个单词出现次数的统计结果为 : " + conutMap.entrySet();
               actorSelection.tell(result, ActorRef.noSender());
            }).match(Result.class, t ->{
                // 处理再次调用
                System.out.println(t.info);
                                   }).build();
    
        }
    
        static class Result {
            public String info;
        }
    
    }
    

    消息类 Message

    public class Message {
    
        private Object content;
    
        public Message(Object content) {
            this.content = content;
        }
    
        public Object getContent() {
            return content;
        }
    
        public void setContent(Object content) {
            this.content = content;
        }
    }
    
  3. 执行完毕关闭资源 actorSystem.terminate();

  4. 打印结果,里面有具体的路径,可以再选择的时候填上

    Actor[akka://wordcount/user/SplitActor#-82947062]  收到来自于 Actor[akka://wordcount/deadLetters] 的消息: com.example.learn.Message@6b4c11ac
    Actor[akka://wordcount/user/SplitActor#-82947062]  发送消息 : [hello, akka, akka, hello, ni, hao, aa, hao]
    Actor[akka://wordcount/user/SplitActor/CountActor#-656262900] 收到来自于 Actor[akka://wordcount/user/SplitActor#-82947062] 的消息: [hello, akka, akka, hello, ni, hao, aa, hao]
    Actor[akka://wordcount/user/SplitActor/CountActor#-656262900] 每个单词出现次数的统计结果为 : [aa=1, hao=2, hello=2, ni=1, akka=2]
    

有限状态自动机

事件驱动和观察者模式本质一样,事件驱动是观察者模式的经典实现。

AKKA-FSM 优势:

  1. 高性能且易于进行业务逻辑设计和理解,因其基于akka的高性能特性加上FMS思想。

  2. 功能丰富,支持异常处理、超时处理、状态机监控、事件追踪、滚动事件日志、定时器等辅助功能。

AKKA-FSM 劣势:

  • 底层代码由 scale 实现,源码无法阅读。

  • 中文文档非常少英文文档丰富,需要具备良好的英文文档阅读能力

官方文档

依赖

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <akka.version>2.6.19</akka.version>
    <scala.binary.version>2.13</scala.binary.version>
  </properties>
    
    <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-bom_${scala.binary.version}</artifactId>
        <version>2.6.19</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
   
   <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-actor-typed_2.13</artifactId>
      <version>${akka.version}</version>
    </dependency>
        <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-testkit_${scala.binary.version}</artifactId>
    </dependency>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-actor_${scala.binary.version}</artifactId>
    </dependency>

异常处理,用于处理fsm内部异常捕获

@Slf4j
public class ExceptionActor  extends AbstractActor {

    @Override
    public SupervisorStrategy supervisorStrategy() {
        //以下的duration和retries暂未发掘其用处
        return new OneForOneStrategy(
                2,
                Duration.ofMillis(1000),
                DeciderBuilder
                        .matchAny(new FI.Apply<Throwable, SupervisorStrategy.Directive>() {
                            @Override
                            public SupervisorStrategy.Directive apply(Throwable throwable) throws Exception {
                                ActorRef actorRef = context().child("system").get();
                                if(actorRef != null){
                                    //给对话流状态机发送异常事件
                                    context().child("system").get().tell(throwable, self());
                                    return SupervisorStrategy.resume();
                                }else{
                                    return SupervisorStrategy.stop();
                                }
                            }
                        })
                        .build());
    }

    // #strategy
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(
                        Props.class,
                            props -> {
                                //将对话流状态机的actor纳入监管
       getSender().tell(getContext().actorOf(props, "system"), getSelf());
                        })
                .build();
    }
}

全局的异常处理直接匹配事件 matchEvent(Throwable.class 对应处理即可。

上次更新时间: 2024/5/7 05:59:02