⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 topicimpl.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
    if (adminRequest instanceof GetSubscriberIds) {      doReact((GetSubscriberIds)adminRequest,              not.getReplyTo(),              not.getRequestMsgId(),              not.getReplyMsgId());    }  }  private void doReact(GetSubscriberIds request,                       AgentId replyTo,                       String requestMsgId,                       String replyMsgId) {    GetSubscriberIdsRep reply =       new GetSubscriberIdsRep(getSubscriberIds());    replyToTopic(reply, replyTo, requestMsgId, replyMsgId);  }  /**   * Returns the list of unique identifiers of all subscribers. Each user   * appears once even if there is multiples subscriptions, the different   * subscriptions can be enumerate through the proxy MBean.   *   * @return the list of unique identifiers of all subscribers.   */  public String[] getSubscriberIds() {    String[] res = new String[subscribers.size()];    for (int i = 0; i < res.length; i++) {      AgentId aid = (AgentId)subscribers.elementAt(i);      res[i] = aid.toString();    }    return res;  }  private void replyToTopic(    org.objectweb.joram.shared.admin.AdminReply reply,    AgentId replyTo,    String requestMsgId,    String replyMsgId) {    Message message = Message.create();    message.setCorrelationId(requestMsgId);    message.setTimestamp(System.currentTimeMillis());    message.setDestination(replyTo.toString(),                            Topic.TOPIC_TYPE);    message.setIdentifier(replyMsgId);    try {      message.setObject(reply);      Vector messages = new Vector();      messages.add(message);      ClientMessages clientMessages =         new ClientMessages(-1, -1, messages);      Channel.sendTo(replyTo, clientMessages);    } catch (Exception exc) {      if (MomTracing.dbgDestination.isLoggable(BasicLevel.ERROR))        MomTracing.dbgDestination.log(BasicLevel.ERROR, "", exc);      throw new Error(exc.getMessage());    }  }  /**   * The <code>DestinationImpl</code> class calls this method for passing   * notifications 
which have been partly processed, so that they are   * specifically processed by the <code>TopicImpl</code> class.   */  protected void specialProcess(Notification not)  {    if (not instanceof SetRightRequest)      doProcess((SetRightRequest) not);    else if (not instanceof ClientMessages)      doProcess((ClientMessages) not);    else if (not instanceof UnknownAgent)      doProcess((UnknownAgent) not);    else if (not instanceof DeleteNot)      doProcess((DeleteNot) not);  }    /**   * Method specifically processing a <code>SetRightRequest</code> instance.   * <p>   * When a reader is removed, deleting this reader's subscription if any,   * and sending an <code>ExceptionReply</code> notification to the client.   */  protected void doProcess(SetRightRequest not)  {    // If the request does not unset a reader, doing nothing.    if (not.getRight() != -READ)      return;    AgentId user = not.getClient();    AccessException exc = new AccessException("READ right removed.");    // Identified user: removing it.    if (user != null) {      // state change, so save.      setSave();      subscribers.remove(user);      selectors.remove(user);      Channel.sendTo(user, new ExceptionReply(exc));    }    // Free reading right removed: removing all non readers.    else {      for (Enumeration subs = subscribers.elements();            subs.hasMoreElements();) {        user = (AgentId) subs.nextElement();        if (! isReader(user)) {          // state change, so save.          setSave();          subscribers.remove(user);          selectors.remove(user);          Channel.sendTo(user, new ExceptionReply(exc));        }      }    }  }  /**   * Method specifically processing a <code>ClientMessages</code> instance.   * <p>   * This method may forward the messages to the topic father if any, or   * to the cluster fellows if any.It may finally send   * <code>TopicMsgsReply</code> instances to the valid subscribers.   
*/  protected void doProcess(ClientMessages not) {    // Forwarding the messages to the father or the cluster fellows, if any:    forwardMessages(not);    // Processing the messages:    processMessages(not);  }  /**   * Method specifically processing an <code>UnknownAgent</code> instance.   * <p>   * This method notifies the administrator of the failing cluster or   * hierarchy building request, if needed, or removes the subscriptions of   * the deleted client, if any, or sets the father identifier to null if it   * comes from a deleted father.   */  protected void doProcess(UnknownAgent uA)  {    AgentId agId = uA.agent;    Notification not = uA.not;    // Deleted topic was requested to join the cluster: notifying the    // requester:    String info = null;    if (not instanceof ClusterTest) {      ClusterTest cT = (ClusterTest) not;      info = strbuf.append("Topic [").append(agId)        .append("] can't join cluster as it does not exist").toString();      strbuf.setLength(0);      Channel.sendTo(cT.requester, new AdminReply(cT.request, false, info));    } else if (not instanceof FatherTest) {    // Deleted topic was requested as a father: notifying the requester:      FatherTest fT = (FatherTest) not;      info = strbuf.append("Topic [").append(agId)        .append("] can't join hierarchy as it does not exist").toString();      strbuf.setLength(0);      Channel.sendTo(fT.requester, new AdminReply(fT.request, false, info));    } else {      // state change, so save.      setSave();      // Removing the deleted client's subscriptions, if any.      subscribers.remove(agId);      selectors.remove(agId);      // Removing the father identifier, if needed.      if (fatherId != null && agId.equals(fatherId)) {        // state change, so save.        setSave();        fatherId = null;      }    }  }  /**   * Method specifically processing a   * <code>fr.dyade.aaa.agent.DeleteNot</code> instance.   
* <p>   * <code>UnknownAgent</code> notifications are sent to each subscriber   * and <code>UnclusterNot</code> notifications to the cluster   * fellows.   */  protected void doProcess(DeleteNot not)  {    AgentId clientId;    Vector subs;    SubscribeRequest sub;    // For each subscriber...    for (int i = 0; i < subscribers.size(); i++) {      clientId = (AgentId) subscribers.get(i);      Channel.sendTo(clientId, new UnknownAgent(destId, null));    }    // For each cluster fellow if any...    if (friends != null) {      AgentId topicId;      while (! friends.isEmpty()) {        // state change, so save.        setSave();        topicId = (AgentId) friends.remove(0);        Channel.sendTo(topicId, new UnclusterNot());      }    }  }  /**   * Actually forwards a vector of messages to the father or the cluster   * fellows, if any.   */  protected void forwardMessages(ClientMessages messages)  {    if (friends != null && ! friends.isEmpty()) {      AgentId topicId;      for (int i = 0; i < friends.size(); i++) {        topicId = (AgentId) friends.get(i);        Channel.sendTo(topicId, new TopicForwardNot(messages, false));        if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))          MomTracing.dbgDestination.log(BasicLevel.DEBUG, "Messages "                                        + "forwarded to fellow "                                        + topicId.toString());      }     }    else if (fatherId != null) {      Channel.sendTo(fatherId, new TopicForwardNot(messages, true));      if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))        MomTracing.dbgDestination.log(BasicLevel.DEBUG, "Messages "                                      + "forwarded to father "                                      + fatherId.toString());    }  }  /**   * Actually processes the distribution of the received messages to the   * valid subscriptions by sending a <code>TopicMsgsReply</code> notification   * to the valid subscribers.   
*/
  protected void processMessages(ClientMessages not)
  {
    /*
     * Overview: for every subscriber, builds the vector of messages it
     * must receive (all of them when it has no selector, the matching ones
     * otherwise) and, when that vector is not empty, sends it wrapped in a
     * TopicMsgsReply. The received/delivered counters are updated along
     * the way.
     *
     * NOTE(review): alreadySentLocally is not declared in this method;
     * presumably it is a field reset elsewhere before each call -- verify.
     */
    Vector messages = not.getMessages();
    AgentId subscriber;
    boolean local;
    String selector;
    Vector deliverables;
    Message message;

    // Count every received message, whether or not it ends up delivered.
    nbMsgsReceiveSinceCreation = nbMsgsReceiveSinceCreation + messages.size();

    setNoSave();
    // Declared outside the loop and only ever set to true: once a
    // non-local subscriber has been served, every later reply in this call
    // is flagged persistent as well.
    // NOTE(review): confirm this carry-over is intended.
    boolean persistent = false;

    // Browsing the subscribers.
    for (Enumeration subs = subscribers.elements(); subs.hasMoreElements();) {
      subscriber = (AgentId) subs.nextElement();
      // A subscriber is local when it lives on this agent server.
      local = (subscriber.getTo() == AgentServer.getServerId());
      selector = (String) selectors.get(subscriber);

      // Current subscriber does not filter messages: all messages will be
      // sent.
      if (selector == null || selector.equals("")) {
        // Subscriber not local, or no other sending occurred locally:
        // directly sending the messages. Note that the incoming vector
        // itself is handed over here, not a copy.
        if (! local) {
          deliverables = messages;
          persistent = true;
        } else if (! alreadySentLocally) {
          deliverables = messages;
          alreadySentLocally = true;
        }
        // A local sending already occurred: cloning the messages.
        else {
          deliverables = new Vector();
          for (Enumeration msgs = messages.elements(); msgs.hasMoreElements();)
            deliverables.add(((Message) msgs.nextElement()).clone());
        }
      }
      // Current subscriber filters messages; sending the matching messages.
      else {
        deliverables = new Vector();
        for (int i = 0; i < messages.size(); i++) {
          message = (Message) messages.get(i);

          if (Selector.matches(message, selector)) {
            // Subscriber not local, or no other sending occurred locally:
            // directly sending the message.
            if (! local) {
              deliverables.add(message);
              persistent = true;
            } else if (! alreadySentLocally) {
              // The flag flips on the first matching message, so the
              // remaining matches for this same local subscriber take the
              // cloning branch below.
              deliverables.add(message);
              alreadySentLocally = true;
            }
            // A local sending already occurred: cloning the message.
            else
              deliverables.add(message.clone());
          }
        }
      }
      // There are messages to send.
      if (! deliverables.isEmpty()) {
        TopicMsgsReply topicMsgsReply = new TopicMsgsReply(deliverables);
        topicMsgsReply.setPersistent(persistent);
        Channel.sendTo(subscriber, topicMsgsReply);
        // Delivered counter grows by the number of messages in this reply.
        nbMsgsDeliverSinceCreation = nbMsgsDeliverSinceCreation + deliverables.size();
      }
    }
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -