博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Akka 实践之(三) 远程部署
阅读量:2491 次
发布时间:2019-05-11

本文共 7549 字,大约阅读时间需要 25 分钟。

Akka 可以支持客户端和服务端远程部署,这点对与高并发分布式调用非常有利。 在上篇的代码基础上,新增2个类,LookupActor和LookupApplication.java

1、首先看一下类图

2、 源码

在本例中,将远程创建CalculatorActor 对象,然后通过指定path的方式创建这个对象的远程Actor(其实我认为是这个对象的代理)

    final String path = "akka.tcp://CalculatorSystem@127.0.0.1:8552/user/calculator";    final ActorRef actor = system.actorOf( Props.create(LookupActor.class, path), "lookupActor");
LookupActor和 LookupApplication的源码如下,重点关注是如何创建远程对象,以及如何获取对远程对象的使用。

package com.cwqsolo.study.akka.demo;import static java.util.concurrent.TimeUnit.SECONDS;import scala.concurrent.duration.Duration;import akka.actor.ActorRef;import akka.actor.ActorIdentity;import akka.actor.Identify;import akka.actor.Terminated;import akka.actor.UntypedActor;import akka.actor.ReceiveTimeout;import akka.japi.Procedure;public class LookupActor extends UntypedActor {  private final String path;  private ActorRef calculator = null;  public LookupActor(String path) {    this.path = path;    sendIdentifyRequest();  }  private void sendIdentifyRequest() {    getContext().actorSelection(path).tell(new Identify(path), getSelf());    getContext()        .system()        .scheduler()        .scheduleOnce(Duration.create(3, SECONDS), getSelf(),            ReceiveTimeout.getInstance(), getContext().dispatcher(), getSelf());  }  @Override  public void onReceive(Object message) throws Exception {    if (message instanceof ActorIdentity) {      calculator = ((ActorIdentity) message).getRef();      if (calculator == null) {        System.out.println("Remote actor not available: " + path);      } else {        getContext().watch(calculator);        getContext().become(active, true);      }    } else if (message instanceof ReceiveTimeout) {      sendIdentifyRequest();    } else {      System.out.println("Not ready yet");    }  }  Procedure active = new Procedure() {    @Override    public void apply(Object message) {      if (message instanceof Op.MathOp) {        // send message to server actor        calculator.tell(message, getSelf());      } else if (message instanceof Op.AddResult) {        Op.AddResult result = (Op.AddResult) message;        System.out.printf("Add result: %d + %d = %d\n", result.getN1(),            result.getN2(), result.getResult());      } else if (message instanceof Op.SubtractResult) {        Op.SubtractResult result = (Op.SubtractResult) message;        System.out.printf("Sub result: %d - %d = %d\n", result.getN1(),            result.getN2(), result.getResult());      } else if (message instanceof Terminated) {        System.out.println("Calculator terminated");        sendIdentifyRequest();        getContext().unbecome();                      } else if (message instanceof ReceiveTimeout) {        // ignore      } else {        unhandled(message);      }    }  };}
可以看到LookupActor 与上节中的CreationActor类没有什么区别,重点是LookupApplication类源码。

package com.cwqsolo.study.akka.demo;import static java.util.concurrent.TimeUnit.SECONDS;import java.util.Random;//import scala.concurrent.duration.Duration;//import akka.actor.ActorRef;//import akka.actor.ActorSystem;//import akka.actor.Props;import com.typesafe.config.ConfigFactory;import java.util.Arrays;  import java.util.concurrent.Callable;    import scala.concurrent.*;import scala.concurrent.duration.Duration;import akka.actor.ActorRef;  import akka.actor.ActorSystem;  import akka.actor.Props;  public class LookupApplication {  public static void main(String[] args) {    if (args.length == 0 || args[0].equals("Calculator"))      startRemoteCalculatorSystem();    if (args.length == 0 || args[0].equals("Lookup"))      startRemoteLookupSystem();       }  public static void startRemoteCalculatorSystem() {    final ActorSystem system = ActorSystem.create("CalculatorSystem",        ConfigFactory.load(("calculator")));    final ActorRef calActor = system.actorOf(Props.create(CalculatorActor.class), "calculator");    //calActor.    System.out.println("Started CalculatorSystem");  }  public static void startRemoteLookupSystem() {    final ActorSystem system = ActorSystem.create("LookupSystem",        ConfigFactory.load("remotelookup"));    final String path = "akka.tcp://CalculatorSystem@127.0.0.1:8552/user/calculator";    final ActorRef actor = system.actorOf(        Props.create(LookupActor.class, path), "lookupActor");    System.out.println("Started LookupSystem");    final Random r = new Random();    system.scheduler().schedule(Duration.create(1, SECONDS),        Duration.create(1, SECONDS), new Runnable() {          @Override          public void run() {            if (r.nextInt(100) % 2 == 0) {              actor.tell(new Op.Add(r.nextInt(100), r.nextInt(100)), null);            } else {              actor.tell(new Op.Subtract(r.nextInt(100), r.nextInt(100)), null);            }          }        }, system.dispatcher());  }}
标红代码这里要注意,这里是关键,而且可以看到,这里用到了配置文件(一开始,这里没有搞懂,运行起来都是无法找到远程对象)

配置文件有3个

common.conf

[root@archive akka]# cat  common.confakka {    actor {      provider = "akka.remote.RemoteActorRefProvider"    }      remote {       enabled-transports = ["akka.remote.netty.tcp"]      netty.tcp {        hostname = "127.0.0.1"      }    }  }
[root@archive akka]# cat  calculator.confinclude "common"    akka {    # LISTEN on tcp port 2552    remote.netty.tcp.port = 8552  }  

[root@archive akka]# cat  remotelookup.conf include "common"akka {  remote.netty.tcp.port = 2553}
从上面的配置文件可以看到, calculator对象在8552端口进行监听,  lookup对象通过remotelookup创建了 2553端口的监听,lookup对象通过path 获取

到calculator对象的引用(代理),然后就可以进行对象间的调用。

3、执行结果

[root@archive akka]# sh  ./start.sh

java version "1.7.0_80"
Java(TM) SE Runtime Environment (build 1.7.0_80-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode)
[INFO] [10/27/2016 16:48:26.956] [main] [Remoting] Starting remoting
[INFO] [10/27/2016 16:48:27.234] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://CalculatorSystem@127.0.0.1:8552]
[INFO] [10/27/2016 16:48:27.236] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://CalculatorSystem@127.0.0.1:8552]
Started CalculatorSystem
[INFO] [10/27/2016 16:48:27.311] [main] [Remoting] Starting remoting
[INFO] [10/27/2016 16:48:27.339] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://LookupSystem@127.0.0.1:2553]
[INFO] [10/27/2016 16:48:27.339] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://LookupSystem@127.0.0.1:2553]
Started LookupSystem
Calculating 9 - 61
Sub result: 9 - 61 = -52
Calculating 51 - 86
Sub result: 51 - 86 = -35
Calculating 64 - 30
Sub result: 64 - 30 = 34
Calculating 38 + 71
Add result: 38 + 71 = 109
Calculating 51 - 73
Sub result: 51 - 73 = -22
Calculating 29 + 12
Add result: 29 + 12 = 41

4、注意内容

4.1 因为引入了配置文件,需要使用google的配置文件类protobuf-java-2.5.0.jar

4.2 打包运行

      我是eclipse环境中调试后,放centos执行的,一开始把代码和依赖库打包在一起,执行会报错。最后是将依赖库单独在centos上建一个lib,只打包代码部分,这样执行才没有问题。

    

[root@archive akka]# Exception in thread "main" com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.version'        at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)        at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145)
    或者是这样

   

Java HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode)[root@archive akka]# Exception in thread "main" java.lang.NoClassDefFoundError: scala/concurrent/ExecutionContext        at java.lang.Class.getDeclaredMethods0(Native Method)        at java.lang.Class.privateGetDeclaredMethods(Class.java:2625)        at java.lang.Class.getMethod0(Class.java:2866)        at java.lang.Class.getMethod(Class.java:1676)
     最后通过只打包自己的代码, lib库单独放置的方式解决
 

你可能感兴趣的文章
消息队列2
查看>>
C++ 线程同步之临界区CRITICAL_SECTION
查看>>
测试—自定义消息处理
查看>>
MFC中关于虚函数的一些问题
查看>>
根据图层名获取图层和图层序号
查看>>
规范性附录 属性值代码
查看>>
提取面狭长角
查看>>
Arcsde表空间自动增长
查看>>
Arcsde报ora-29861: 域索引标记为loading/failed/unusable错误
查看>>
记一次断电恢复ORA-01033错误
查看>>
C#修改JPG图片EXIF信息中的GPS信息
查看>>
从零开始的Docker ELK+Filebeat 6.4.0日志管理
查看>>
How it works(1) winston3源码阅读(A)
查看>>
How it works(2) autocannon源码阅读(A)
查看>>
How it works(3) Tilestrata源码阅读(A)
查看>>
How it works(12) Tileserver-GL源码阅读(A) 服务的初始化
查看>>
uni-app 全局变量的几种实现方式
查看>>
echarts 为例讲解 uni-app 如何引用 npm 第三方库
查看>>
uni-app跨页面、跨组件通讯
查看>>
springmvc-helloworld(idea)
查看>>