我用 Java 写了一个 Akka ZeroMQ 的示例,但是看不到效果:pubSocket 发送的消息,subSocket 收不到。请高手解答一下。
代码如下:
package com.hantek.akka.zeromq;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.text.SimpleDateFormat;
import java.util.Date;
import scala.Serializable;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.serialization.Serialization;
import akka.serialization.SerializationExtension;
import akka.util.Duration;
import akka.zeromq.Bind;
import akka.zeromq.Connect;
import akka.zeromq.Frame;
import akka.zeromq.Listener;
import akka.zeromq.Subscribe;
import akka.zeromq.ZMQMessage;
import akka.zeromq.ZeroMQ;
import akka.zeromq.ZeroMQExtension;
public class TestZeroMQ1 {
/**
 * Timer message. {@code HealthProbe} schedules it to itself once per second and
 * publishes a fresh set of health samples each time it is received.
 */
public static final Object TICK = "TICK";
/**
 * Immutable heap-usage sample.
 *
 * <p>The class is serializable with a pinned {@code serialVersionUID}; the visible
 * producer ({@code HealthProbe}) passes instances to a serializer and publishes the
 * resulting bytes.
 */
public static class Heap implements Serializable {

    private static final long serialVersionUID = 2807062186812700532L;

    /** When the sample was taken; the visible caller supplies {@code System.currentTimeMillis()}. */
    public final long timestamp;

    /** Used heap; the visible caller supplies {@code MemoryUsage.getUsed()}. */
    public final long used;

    /** Maximum heap; the visible caller supplies {@code MemoryUsage.getMax()}. */
    public final long max;

    /**
     * Creates a sample from already-measured values; nothing is validated or converted.
     *
     * @param timestamp time of the sample
     * @param used      used heap at that time
     * @param max       maximum heap at that time
     */
    public Heap(long timestamp, long used, long max) {
        this.timestamp = timestamp;
        this.used = used;
        this.max = max;
    }
}
/**
 * Immutable system-load sample.
 *
 * <p>The class is serializable with a pinned {@code serialVersionUID}; the visible
 * producer ({@code HealthProbe}) passes instances to a serializer and publishes the
 * resulting bytes.
 */
public static class Load implements Serializable {

    private static final long serialVersionUID = 3321800432516503175L;

    /** When the sample was taken; the visible caller supplies {@code System.currentTimeMillis()}. */
    public final long timestamp;

    /** Load average; the visible caller supplies {@code OperatingSystemMXBean.getSystemLoadAverage()}. */
    public final double loadAverage;

    /**
     * Creates a sample from already-measured values; nothing is validated or converted.
     *
     * @param timestamp   time of the sample
     * @param loadAverage load average at that time
     */
    public Load(long timestamp, double loadAverage) {
        this.timestamp = timestamp;
        this.loadAverage = loadAverage;
    }
}
/**
 * Actor that publishes JVM health samples through a ZeroMQ publisher socket.
 *
 * <p>The socket is bound to {@code tcp://127.0.0.1:1237}. On start the actor schedules
 * {@code TICK} to itself (first after one second, then every second). For each tick it
 * sends two two-frame messages to the socket: topic {@code health.heap} with a serialized
 * {@link Heap}, then topic {@code health.load} with a serialized {@link Load}.
 *
 * <p>The tenth tick terminates the JVM with exit status 1 before anything is published
 * for that tick.
 */
public static class HealthProbe
        extends UntypedActor {

    ActorRef pubSocket = ZeroMQExtension.get(getContext().system()).newPubSocket(new Bind("tcp://127.0.0.1:1237"));
    MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
    OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
    Serialization ser = SerializationExtension.get(getContext().system());
    /** Number of ticks received so far. */
    int i = 0;

    /** Announces start-up on stdout and schedules the recurring {@code TICK} to this actor. */
    @Override
    public void preStart() {
        System.out.println("_____pubSocket____开始");
        Duration oneSecond = Duration.parse("1 second");
        getContext().system().scheduler().schedule(oneSecond, oneSecond, getSelf(), TICK);
    }

    /** Intentionally empty: preStart must not run again, so the tick is only scheduled once. */
    @Override
    public void postRestart(Throwable reason) {
        // don't call preStart, only schedule once
    }

    /**
     * Publishes one heap sample and one load sample per {@code TICK}; every other
     * message is passed to {@code unhandled} and reported on stdout.
     *
     * @param message the received message
     */
    @Override
    public void onReceive(Object message) {
        if (!message.equals(TICK)) {
            unhandled(message);
            System.out.println("_____pubSocket__UNHAND :" + message);
            return;
        }

        i += 1;
        if (i == 10) {
            // Stop the whole JVM once the tenth tick arrives.
            System.exit(1);
        }

        MemoryUsage heapUsage = memory.getHeapMemoryUsage();
        long now = System.currentTimeMillis();

        publish("health.heap", new Heap(now, heapUsage.getUsed(), heapUsage.getMax()));
        publish("health.load", new Load(now, os.getSystemLoadAverage()));

        System.out.println(i + "_____pubSocket__TIKE__onReceive :" + message);
    }

    /**
     * Serializes {@code sample} with the serializer registered for its class and sends
     * it to the publisher socket as a two-frame message: topic first, payload second.
     */
    private void publish(String topic, Object sample) {
        byte[] payload = ser.serializerFor(sample.getClass()).toBinary(sample);
        pubSocket.tell(new ZMQMessage(new Frame(topic), new Frame(payload)));
    }
}
public static class Logger
extends UntypedActor {