⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 queueimpl.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
    ClientMessages deadMessages = cleanPendingMessage(System.currentTimeMillis());    // Sending the dead messages to the DMQ, if needed:    if (deadMessages != null)      sendToDMQ(deadMessages, null);    Channel.sendTo(from, new Monit_GetNumberRep(not, messages.size()));  }  /**   * Method implementing the reaction to a   * <code>Monit_GetPendingRequests</code> notification requesting the   * number of pending requests.   *   * @exception AccessException  If the requester is not the administrator.   */  protected void doReact(AgentId from, Monit_GetPendingRequests not)                 throws AccessException  {    if (! isAdministrator(from))      throw new AccessException("ADMIN right not granted");    Channel.sendTo(from,                   new Monit_GetNumberRep(not, getWaitingRequestCount()));  }  /**   * Method implementing the reaction to a   * <code>Monit_GetNbMaxMsg</code> notification requesting the   * number max of messages in this queue.   *   * @exception AccessException  If the requester is not the administrator.   */  protected void doReact(AgentId from, Monit_GetNbMaxMsg not)    throws AccessException {    if (! isAdministrator(from))      throw new AccessException("ADMIN right not granted");    Channel.sendTo(from, new Monit_GetNbMaxMsgRep(not,nbMaxMsg));  }  /**   * Method implementing the reaction to a <code>ReceiveRequest</code>   * instance, requesting a message.   * <p>   * This method stores the request and launches a delivery sequence.   *   * @exception AccessException  If the sender is not a reader.   */  protected void doReact(AgentId from, ReceiveRequest not)                 throws AccessException  {    // If client is not a reader, sending an exception.    if (! isReader(from))      throw new AccessException("READ right not granted");    String[] toAck = not.getMessageIds();    if (toAck != null) {      for (int i = 0; i < toAck.length; i++) {        acknowledge(toAck[i]);      }    }    long current = System.currentTimeMillis();    cleanWaitingRequest(current);    // Storing the request:    not.requester = from;    not.setExpiration(current);    if (not.isPersistent()) {      // state change, so save.      setSave();    }    requests.add(not);    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(BasicLevel.DEBUG, " -> requests count = " + requests.size());    // Launching a delivery sequence for this request:    int reqIndex = requests.size() - 1;    deliverMessages(reqIndex);        // If the request has not been answered and if it is an immediate    // delivery request, sending a null:    if ((requests.size() - 1) == reqIndex && not.getTimeOut() == -1) {      requests.remove(reqIndex);      QueueMsgReply reply = new QueueMsgReply(not);      if (isLocal(from)) {        reply.setPersistent(false);      }      Channel.sendTo(from, reply);      if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(BasicLevel.DEBUG, "Receive answered by a null.");    }  }  /**   * Method implementing the queue reaction to a <code>BrowseRequest</code>   * instance, requesting an enumeration of the messages on the queue.   * <p>   * The method sends a <code>BrowseReply</code> back to the client. Expired   * messages are sent to the DMQ.   *   * @exception AccessException  If the requester is not a reader.   */  protected void doReact(AgentId from, BrowseRequest not)                 throws AccessException  {    // If client is not a reader, sending an exception.    if (! isReader(from))      throw new AccessException("READ right not granted");    // Building the reply:    BrowseReply rep = new BrowseReply(not);        // Cleaning the possible expired messages.    ClientMessages deadMessages = cleanPendingMessage(System.currentTimeMillis());    // Adding the deliverable messages to it:    int i = 0;    Message message;    while (i < messages.size()) {      message = (Message) messages.get(i);      if (Selector.matches(message, not.getSelector())) {        // Matching selector: adding the message:        rep.addMessage(message);      }      i++;    }    // Sending the dead messages to the DMQ, if needed:    if (deadMessages != null)      sendToDMQ(deadMessages, null);    // Delivering the reply:    Channel.sendTo(from, rep);    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(BasicLevel.DEBUG, "Request answered.");  }  /**   * Method implementing the reaction to an <code>AcknowledgeRequest</code>   * instance, requesting messages to be acknowledged.   */  protected void doReact(AgentId from, AcknowledgeRequest not) {    for (Enumeration ids = not.getIds(); ids.hasMoreElements();) {      String msgId = (String) ids.nextElement();      acknowledge(msgId);    }  }    private void acknowledge(String msgId) {    Message msg = (Message) deliveredMsgs.remove(msgId);    if ((msg != null) && msg.getPersistent()) {      // state change, so save.      setSave();    }    consumers.remove(msgId);    contexts.remove(msgId);    if (msg != null) {      msg.delete();      if (logger.isLoggable(BasicLevel.DEBUG)) {        logger.log(BasicLevel.DEBUG, "Message " + msgId + " acknowledged.");      }    } else if (logger.isLoggable(BasicLevel.WARN)) {      logger.log(BasicLevel.WARN,                 "Message " + msgId + " not found for acknowledgement.");    }  }  /**   * Method implementing the reaction to a <code>DenyRequest</code>   * instance, requesting messages to be denied.   * <p>   * This method denies the messages and launches a delivery sequence.   * Messages considered as undeliverable are sent to the DMQ.   */  protected void doReact(AgentId from, DenyRequest not) {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(BasicLevel.DEBUG,                 "QueueImpl.doReact(" + from + ',' + not + ')');        Enumeration ids = not.getIds();    String msgId;    Message msg;    AgentId consId;    int consCtx;    ClientMessages deadMessages = null;    // If the deny request is empty, the denying is a contextual one: it    // requests the denying of all the messages consumed by the denier in    // the denying context:    if (! ids.hasMoreElements()) {      // Browsing the delivered messages:      for (Enumeration delIds = deliveredMsgs.keys();           delIds.hasMoreElements();) {        msgId = (String) delIds.nextElement();        msg = (Message) deliveredMsgs.get(msgId);        consId = (AgentId) consumers.get(msgId);        consCtx = ((Integer) contexts.get(msgId)).intValue();        if (logger.isLoggable(BasicLevel.DEBUG))          logger.log(BasicLevel.DEBUG,                     " -> deny msg " + msgId + "(consId = " + consId + ')');        // If the current message has been consumed by the denier in the same        // context: denying it.        if (consId.equals(from) && consCtx == not.getClientContext()) {          // state change, so save.          setSave();          consumers.remove(msgId);          contexts.remove(msgId);          deliveredMsgs.remove(msgId);          msg.denied = true;          // If message considered as undeliverable, adding          // it to the vector of dead messages:          if (isUndeliverable(msg)) {            msg.delete();            msg.undeliverable = true;            if (deadMessages == null)              deadMessages = new ClientMessages();            deadMessages.addMessage(msg);          } else {            // Else, putting the message back into the deliverables vector:            storeMessage(msg);          }          if (logger.isLoggable(BasicLevel.DEBUG))            logger.log(BasicLevel.DEBUG, "Message " + msgId + " denied.");        }      }    }    // For a non empty request, browsing the denied messages:    for (ids = not.getIds(); ids.hasMoreElements();) {      msgId = (String) ids.nextElement();      msg = (Message) deliveredMsgs.remove(msgId);      // Message may have already been denied. For example, a proxy may deny      // a message twice, first when detecting a connection failure - and      // in that case it sends a contextual denying -, then when receiving       // the message from the queue - and in that case it also sends an      // individual denying.      if (msg == null) {        if (logger.isLoggable(BasicLevel.ERROR))          logger.log(BasicLevel.ERROR, " -> already denied message " + msgId);        break;      }      msg.denied = true;      if (logger.isLoggable(BasicLevel.DEBUG))          logger.log(BasicLevel.DEBUG, " -> deny " + msgId);      // state change, so save.      setSave();      consumers.remove(msgId);      contexts.remove(msgId);      // If message considered as undeliverable, adding it      // to the vector of dead messages:      if (isUndeliverable(msg)) {        msg.delete();        msg.undeliverable = true;        if (deadMessages == null)          deadMessages = new ClientMessages();        deadMessages.addMessage(msg);      }      // Else, putting the message back into the deliverables vector:      else        storeMessage(msg);      if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(BasicLevel.DEBUG, "Message " + msgId + " denied.");    }    // Sending the dead messages to the DMQ, if needed:    if (deadMessages != null)      sendToDMQ(deadMessages, null);    // Lauching a delivery sequence:    deliverMessages(0);  }  protected void doReact(AgentId from,                          AbortReceiveRequest not) {    for (int i = 0; i < requests.size(); i++) {      ReceiveRequest request = (ReceiveRequest) requests.get(i);      if (request.requester.equals(from) &&          request.getClientContext() == not.getClientContext() &&          request.getRequestId() == not.getAbortedRequestId()) {        if (not.isPersistent()) {          // state change, so save.          setSave();        }        requests.remove(i);        break;      }    }  }  private void doReact(AgentId from,                       DestinationAdminRequestNot not) {    org.objectweb.joram.shared.admin.AdminRequest adminRequest =       not.getRequest();    if (adminRequest instanceof GetQueueMessageIds) {      doReact((GetQueueMessageIds)adminRequest,              not.getReplyTo(),              not.getRequestMsgId(),              not.getReplyMsgId());    } else if (adminRequest instanceof GetQueueMessage) {      doReact((GetQueueMessage)adminRequest,              not.getReplyTo(),              not.getRequestMsgId(),              not.getReplyMsgId());    } else if (adminRequest instanceof DeleteQueueMessage) {      doReact((DeleteQueueMessage)adminRequest,              not.getReplyTo(),              not.getRequestMsgId(),              not.getReplyMsgId());    } else if (adminRequest instanceof ClearQueue) {      doReact((ClearQueue)adminRequest,              not.getReplyTo(),              not.getRequestMsgId(),              not.getReplyMsgId());    }  }  private void doReact(GetQueueMessageIds request,                       AgentId replyTo,                       String requestMsgId,                       String replyMsgId) {    String[] res = new String[messages.size()];    for (int i = 0; i < messages.size(); i++) {      Message msg = (Message)messages.elementAt(i);      res[i] = msg.getIdentifier();    }    GetQueueMessageIdsRep reply =       new GetQueueMessageIdsRep(res);    replyToTopic(reply, replyTo, requestMsgId, replyMsgId);  }  private void doReact(GetQueueMessage request,                       AgentId replyTo,                       String requestMsgId,                       String replyMsgId) {    Message msg = null;    for (int i = 0; i < messages.size(); i++) {      msg = (Message)messages.elementAt(i);      if (msg.getIdentifier().equals(request.getMessageId())) {        break;      }    }    if (msg != null) {      replyToTopic(        new GetQueueMessageRep(msg),        replyTo, requestMsgId, replyMsgId);    } else {      replyToTopic(        new org.objectweb.joram.shared.admin.AdminReply(          false, "Message not found: " + msg.getIdentifier()),        replyTo, requestMsgId, replyMsgId);    }  }  private void doReact(DeleteQueueMessage request,                       AgentId replyTo,                       String requestMsgId,                       String replyMsgId) {    for (int i = 0; i < messages.size(); i++) {      Message msg = (Message)messages.elementAt(i);      if (msg.getIdentifier().equals(request.getMessageId())) {        messages.removeElementAt(i);        ClientMessages deadMessages = new ClientMessages();        deadMessages.addMessage(msg);        sendToDMQ(deadMessages, null);        break;      }    }    replyToTopic(      new org.objectweb.joram.shared.admin.AdminReply(        true, null),      replyTo, requestMsgId, replyMsgId);  }  private void doReact(ClearQueue request,                       AgentId replyTo,                       String requestMsgId,                       String replyMsgId) {    if (messages.size() > 0) {      ClientMessages deadMessages = new ClientMessages();      for (int i = 0; i < messages.size(); i++) {        Message msg = (Message)messages.elementAt(i);        deadMessages.addMessage(msg);      }      sendToDMQ(deadMessages, null);      messages.clear();    }    replyToTopic(      new org.objectweb.joram.shared.admin.AdminReply(        true, null),      replyTo, requestMsgId, replyMsgId);  }  private void replyToTopic(    org.objectweb.joram.shared.admin.AdminReply reply,    AgentId replyTo,    String requestMsgId,    String replyMsgId) {    Message message = Message.create();    message.setCorrelationId(requestMsgId);    message.setTimestamp(System.currentTimeMillis());    message.setDestination(replyTo.toString(),                           Topic.TOPIC_TYPE);    message.setIdentifier(replyMsgId);    try {      message.setObject(reply);      Vector messages = new Vector();      messages.add(message);      ClientMessages clientMessages =         new ClientMessages(-1, -1, messages);      Channel.sendTo(replyTo, clientMessages);    } catch (Exception exc) {      if (logger.isLoggable(BasicLevel.ERROR))        logger.log(BasicLevel.ERROR, "", exc);      throw new Error(exc.getMessage());    }  }  /**   * The <code>DestinationImpl</code> class calls this method for passing   * notifications which have been partly processed, so that they are   * specifically processed by the <code>QueueImpl</code> class.   */  protected void specialProcess(Notification not) {    if (not instanceof SetRightRequest)      doProcess((SetRightRequest) not);    else if (not instanceof ClientMessages)      doProcess((ClientMessages) not);    else if (not instanceof UnknownAgent)      doProcess((UnknownAgent) not);    else if (not instanceof DeleteNot)      doProcess((DeleteNot) not);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -