54 CHEN

Java中使用akka手记三 Cluster详例

一个例子

message

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21

public interface TransformationMessages {
  
    public static class TransformationJob implements Serializable {
      private final String text;
  //......
    }
  
    public static class TransformationResult implements Serializable {
      private final String text;
  //.....
    }
  
    public static class JobFailed implements Serializable {
      private final String reason;
      private final TransformationJob job;
  //....
    }
  
    public static final String BACKEND_REGISTRATION = "BackendRegistration";
  
  }

backend处理逻辑

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34

public class TransformationBackend extends UntypedActor {
  
    Cluster cluster = Cluster.get(getContext().system());
  //...  
    @Override
    public void onReceive(Object message) {
      if (message instanceof TransformationJob) {
        TransformationJob job = (TransformationJob) message;
        getSender().tell(new TransformationResult(job.getText().toUpperCase()),
            getSelf());
  
      } else if (message instanceof CurrentClusterState) {
        CurrentClusterState state = (CurrentClusterState) message;
        for (Member member : state.getMembers()) {
          if (member.status().equals(MemberStatus.up())) {
            register(member);
          }
        }
  
      } else if (message instanceof MemberUp) {
        MemberUp mUp = (MemberUp) message;
        register(mUp.member());
  
      } else {
        unhandled(message);
      }
    }
  
    void register(Member member) {
      if (member.hasRole("frontend"))
        getContext().actorSelection(member.address() + "/user/frontend").tell(
            BACKEND_REGISTRATION, getSelf());
    }
  }

frontend节点

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30

public class TransformationFrontend extends UntypedActor {
    List<ActorRef> backends = new ArrayList<ActorRef>();
    int jobCounter = 0;
    @Override
    public void onReceive(Object message) {
      if ((message instanceof TransformationJob) && backends.isEmpty()) {
        TransformationJob job = (TransformationJob) message;
        getSender().tell(
            new JobFailed("Service unavailable, try again later", job),
            getSender());
  
      } else if (message instanceof TransformationJob) {
        TransformationJob job = (TransformationJob) message;
        jobCounter++;
        backends.get(jobCounter % backends.size())
            .forward(job, getContext());
  
      } else if (message.equals(BACKEND_REGISTRATION)) {
        getContext().watch(getSender());
        backends.add(getSender());
  
      } else if (message instanceof Terminated) {
        Terminated terminated = (Terminated) message;
        backends.remove(terminated.getActor());
  
      } else {
        unhandled(message);
      }
    }
  }

运行TransformationApp

1 2 3 4 5 6 7 8 9 10 11 12

system.scheduler().schedule(interval, interval, new Runnable() {
        public void run() {
          ask(frontend,
              new TransformationJob("hello-" + counter.incrementAndGet()),
              timeout).onSuccess(new OnSuccess<Object>() {
            public void onSuccess(Object result) {
              System.out.println(result);
            }
          }, ec);
        }
  
      }, ec);

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25

public void onReceive(Object message) {
      if ((message instanceof TransformationJob) && backends.isEmpty()) {
        TransformationJob job = (TransformationJob) message;
        getSender().tell(
            new JobFailed("Service unavailable, try again later", job),
            getSender());
  
      } else if (message instanceof TransformationJob) {
        TransformationJob job = (TransformationJob) message;
        jobCounter++;
        backends.get(jobCounter % backends.size())
                .forward(job, getContext());
  
      } else if (message.equals(BACKEND_REGISTRATION)) {
        getContext().watch(getSender());
        backends.add(getSender());
  
      } else if (message instanceof Terminated) {
        Terminated terminated = (Terminated) message;
        backends.remove(terminated.getActor());
  
      } else {
        unhandled(message);
      }
    }

1 2 3 4 5

void register(Member member) {
      if (member.hasRole("frontend"))
        getContext().actorSelection(member.address() + "/user/frontend").tell(
            BACKEND_REGISTRATION, getSelf());
    }

代码

原创文章如转载,请注明:转载自五四陈科学院[http://www.54chen.com]

Posted by 54chen java,akka

« java中使用akka手记二 cluster java中使用akka手记四 用法速查 »