⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 queueimpl.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
  }

  /**
   * Method specifically processing a <code>SetRightRequest</code> instance.
   * <p>
   * When a reader is removed, and receive requests of this reader are still
   * on the queue, they are replied to by an <code>ExceptionReply</code>.
   * <p>
   * Nothing is done unless the request's right equals <code>-READ</code>.
   * A <code>null</code> client means the free reading right was removed: in
   * that case every pending request whose requester is not a reader is
   * rejected.
   *
   * @param not  The right-setting request to process.
   */
  protected void doProcess(SetRightRequest not)
  {
    // If the request does not unset a reader, doing nothing.
    if (not.getRight() != -READ)
      return;

    AgentId user = not.getClient();

    ReceiveRequest request;
    AccessException exc;
    ExceptionReply reply;

    // Free reading right has been removed; replying to the non readers
    // requests.
    if (user == null) {
      for (int i = 0; i < requests.size(); i++) {
        request = (ReceiveRequest) requests.get(i);

        if (! isReader(request.requester)) {
          exc = new AccessException("Free READ access removed");
          reply = new ExceptionReply(request, exc);
          Channel.sendTo(request.requester, reply);
          // state change, so save.
          setSave();
          // Removing shifts the following requests down by one: stepping the
          // index back so that the next request is not skipped.
          requests.remove(i);
          i--;
        }
      }
    }
    // Reading right of a given user has been removed; replying to its
    // requests.
    else {
      for (int i = 0; i < requests.size(); i++) {
        request = (ReceiveRequest) requests.get(i);

        if (user.equals(request.requester)) {
          exc = new AccessException("READ right removed");
          reply = new ExceptionReply(request, exc);
          Channel.sendTo(request.requester, reply);
          // state change, so save.
          setSave();
          // Same index compensation as above.
          requests.remove(i);
          i--;
        }
      }
    }
  }

  /**
   * Method specifically processing a <code>ClientMessages</code> instance.
   * <p>
   * This method stores the messages and launches a delivery sequence.
   * <p>
   * The <code>receiving</code> flag is raised for the duration of the call so
   * that the insertion code appends new arrivals instead of re-ordering them;
   * it is always lowered again, even if storing or delivering fails.
   *
   * @param not  The messages sent by a client.
   */
  protected void doProcess(ClientMessages not)
  {
    receiving = true;
    try {
      Message msg;
      // Storing each received message:
      for (Enumeration msgs = not.getMessages().elements();
           msgs.hasMoreElements();) {

        // Wrapping the arrival counter before it overflows.
        if (arrivalsCounter == Long.MAX_VALUE)
          arrivalsCounter = 0;

        msg = (Message) msgs.nextElement();

        if (not.isPersistent()) {
          // state change, so save.
          setSave();
        }

        msg.order = arrivalsCounter++;
        storeMessage(msg);
      }

      // Launching a delivery sequence:
      deliverMessages(0);
    } finally {
      // Never leaving the flag raised: a stale "true" would make later
      // denied or recovered messages be appended as if they were new arrivals.
      receiving = false;
    }
  }

  /**
   * Method specifically processing an <code>UnknownAgent</code> instance.
   * <p>
   * The specific processing is done when a <code>QueueMsgReply</code> was
   * sent to a requester which does not exist anymore. In that case, the
   * messages sent to this requester and not yet acknowledged are marked as
   * "denied" for delivery to an other requester, and a new delivery sequence
   * is launched. Messages considered as undeliverable are removed and sent to
   * the DMQ.
   *
   * @param uA  The notification carrying the unknown agent's identifier and
   *            the notification that could not be delivered to it.
   */
  protected void doProcess(UnknownAgent uA)
  {
    AgentId client = uA.agent;
    Notification not = uA.not;

    // If the notification is not a delivery, doing nothing.
    if (! (not instanceof QueueMsgReply))
      return;

    String msgId;
    Message msg;
    AgentId consId;
    ClientMessages deadMessages = null;

    // Entries are removed from deliveredMsgs inside the loop below, so the
    // identifiers are copied first: enumerating the table while it is being
    // structurally modified is not a supported usage.
    Vector deliveredIds = new Vector();
    for (Enumeration keys = deliveredMsgs.keys(); keys.hasMoreElements();)
      deliveredIds.add(keys.nextElement());

    for (Enumeration e = deliveredIds.elements(); e.hasMoreElements();) {
      msgId = (String) e.nextElement();
      msg = (Message) deliveredMsgs.get(msgId);
      consId = (AgentId) consumers.get(msgId);

      // Delivered message has been delivered to the deleted client:
      // denying it. A message without a registered consumer is left alone.
      if (consId != null && consId.equals(client)) {
        deliveredMsgs.remove(msgId);
        msg.denied = true;

        // state change, so save.
        setSave();
        consumers.remove(msgId);
        contexts.remove(msgId);

        // If message considered as undeliverable, adding it to the
        // vector of dead messages:
        if (isUndeliverable(msg)) {
          msg.delete();
          msg.undeliverable = true;
          if (deadMessages == null)
            deadMessages = new ClientMessages();
          deadMessages.addMessage(msg);
        }
        // Else, putting it back into the deliverables vector:
        else
          storeMessage(msg);

        if (logger.isLoggable(BasicLevel.WARN))
          logger.log(BasicLevel.WARN,
                     "Message " + msg.getIdentifier() + " denied.");
      }
    }

    // Sending dead messages to the DMQ, if needed:
    if (deadMessages != null)
      sendToDMQ(deadMessages, null);

    // Launching a delivery sequence:
    deliverMessages(0);
  }

  /**
   * Method specifically processing a
   * <code>fr.dyade.aaa.agent.DeleteNot</code> instance.
   * <p>
   * <code>ExceptionReply</code> replies are sent to the pending receivers,
   * and the remaining messages are sent to the DMQ and deleted.
   *
   * @param not  The deletion notification.
   */
  protected void doProcess(DeleteNot not)
  {
    // Building the exception to send to the pending receivers:
    DestinationException exc = new DestinationException("Queue " + destId
                                                        + " is deleted.");
    ReceiveRequest rec;
    ExceptionReply excRep;

    // Sending it to the pending receivers:
    cleanWaitingRequest(System.currentTimeMillis());
    for (int i = 0; i < requests.size(); i++) {
      rec = (ReceiveRequest) requests.elementAt(i);
      excRep = new ExceptionReply(rec, exc);

      if (logger.isLoggable(BasicLevel.DEBUG))
        logger.log(BasicLevel.DEBUG,
                   "Requester " + rec.requester +
                   " notified of the queue deletion.");

      Channel.sendTo(rec.requester, excRep);
    }

    // Sending the remaining messages to the DMQ, if needed:
    if (! messages.isEmpty()) {
      Message msg;
      ClientMessages deadMessages = new ClientMessages();
      while (! messages.isEmpty()) {
        msg = (Message) messages.remove(0);
        msg.deletedDest = true;
        deadMessages.addMessage(msg);
      }
      sendToDMQ(deadMessages, null);
    }

    // Deleting the messages:
    MessagePersistenceModule.deleteAll(getDestinationId());
  }

  /**
   * Actually stores a message in the deliverables vector.
   * <p>
   * The message is persisted only if it was really queued. When the queue is
   * full the message is diverted to the DMQ instead, and persisting it under
   * this destination would leave behind a copy of a message the queue never
   * held.
   *
   * @param message  The message to store.
   */
  protected final synchronized void storeMessage(Message message) {
    if (! insertIntoDeliverables(message)) {
      // Rejected and already sent to the DMQ: nothing to persist here.
      return;
    }

    // Persisting the message.
    message.save(getDestinationId());

    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                 "Message " + message.getIdentifier() + " stored.");
  }

  /**
   * Adds a message to the deliverables vector without persisting it.
   * <p>
   * If the queue already holds <code>nbMaxMsg</code> messages (and that
   * limit is greater than -1), the message is sent to the DMQ instead of
   * being queued.
   *
   * @param message  The message to add.
   */
  protected final synchronized void addMessage(Message message) {
    insertIntoDeliverables(message);
  }

  /**
   * Inserts a message at its proper position in the deliverables vector, or
   * diverts it to the DMQ when the queue is full.
   * <p>
   * Only called from <code>storeMessage</code> and <code>addMessage</code>,
   * which are both synchronized on this instance.
   *
   * @param message  The message to insert.
   * @return <code>true</code> if the message was queued, <code>false</code>
   *         if it was sent to the DMQ because the queue is full.
   */
  private boolean insertIntoDeliverables(Message message) {
    nbMsgsReceiveSinceCreation++;

    // Queue full: the message goes to the DMQ.
    if (nbMaxMsg > -1 && nbMaxMsg <= messages.size()) {
      ClientMessages deadMessages = new ClientMessages();
      deadMessages.addMessage(message);
      sendToDMQ(deadMessages, null);
      return false;
    }

    // Tracking whether every queued message shares the same priority, which
    // allows the cheaper insertion path below.
    if (messages.isEmpty()) {
      samePriorities = true;
      priority = message.getPriority();
    } else if (samePriorities && priority != message.getPriority()) {
      samePriorities = false;
    }

    if (samePriorities) {
      // Constant priorities: no need to insert the message according to
      // its priority.
      if (receiving) {
        // Message being received: adding it at the end of the queue.
        messages.add(message);
      } else {
        // Denying or recovery: adding the message according to its original
        // arrival order.
        long currentO;
        int i = 0;
        for (Enumeration e = messages.elements(); e.hasMoreElements();) {
          currentO = ((Message) e.nextElement()).order;
          if (currentO > message.order) break;
          i++;
        }
        messages.insertElementAt(message, i);
      }
    } else {
      // Non constant priorities: inserting the message according to its
      // priority.
      Message currentMsg;
      int currentP;
      long currentO;
      int i = 0;
      for (Enumeration e = messages.elements(); e.hasMoreElements();) {
        currentMsg = (Message) e.nextElement();
        currentP = currentMsg.getPriority();
        currentO = currentMsg.order;

        if (! receiving && currentP == message.getPriority()) {
          // Message denied or recovered, priorities are equal: inserting the
          // message according to its original arrival order.
          if (currentO > message.order) break;
        } else if (currentP < message.getPriority()) {
          // Current priority lower than the message to store: inserting it.
          break;
        }
        i++;
      }
      messages.insertElementAt(message, i);
    }
    return true;
  }

  /**
   * Actually tries to answer the pending "receive" requests.
   * <p>
   * The method may send <code>QueueMsgReply</code> replies to clients.
   * <p>
   * Requests are examined in order starting at <code>index</code>. For each
   * one the deliverable messages are scanned from the beginning, and every
   * message accepted by the request's selector and by
   * <code>checkDelivery</code> is moved into the reply, until the reply holds
   * as many messages as the request asks for. A request that obtained at
   * least one message is removed and answered; otherwise it stays pending.
   *
   * @param index  Index where starting to "browse" the requests.
   */
  protected void deliverMessages(int index) {
    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG, "QueueImpl.deliverMessages(" + index + ')');

    ReceiveRequest notRec = null;
    int j = 0;
    Message msg;
    QueueMsgReply notMsg;
    ClientMessages deadMessages = null;

    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG, " -> requests = " + requests + ')');

    long current = System.currentTimeMillis();
    cleanWaitingRequest(current);
    // Cleaning the possible expired messages.
    deadMessages = cleanPendingMessage(current);

    // Processing each request as long as there are deliverable messages:
    while (! messages.isEmpty() && index < requests.size()) {
      notRec = (ReceiveRequest) requests.get(index);
      notMsg = new QueueMsgReply(notRec);

      // Checking the deliverable messages:
      while (j < messages.size()) {
        msg = (Message) messages.get(j);

        // If selector matches, sending the message:
        if (Selector.matches(msg, notRec.getSelector())
            && checkDelivery(msg)) {
          // The message is removed at position j, so j is deliberately not
          // advanced: the next candidate now sits at the same index.
          messages.remove(j);
          msg.deliveryCount++;
          notMsg.addMessage(msg);

          if (isLocal(notRec.requester)) {
            notMsg.setPersistent(false);
          }

          nbMsgsDeliverSinceCreation++;

          // use in sub class see ClusterQueueImpl
          messageDelivered(msg.getIdentifier());

          if (logger.isLoggable(BasicLevel.DEBUG))
            logger.log(BasicLevel.DEBUG,
                       "Message " + msg.getIdentifier() + " to " +
                       notRec.requester + " as reply to " + notRec.getRequestId());

          // Removing the message if request in auto ack mode:
          if (notRec.getAutoAck())
            msg.delete();
          // Else, putting the message in the delivered messages table:
          else {
            if (notMsg.isPersistent()) {
              // state change, so save.
              setSave();
            }
            consumers.put(msg.getIdentifier(), notRec.requester);
            contexts.put(msg.getIdentifier(),
                         new Integer(notRec.getClientContext()));
            deliveredMsgs.put(msg.getIdentifier(), msg);
          }

          // The reply holds as many messages as requested: stopping here.
          if (notMsg.getSize() == notRec.getMessageCount()) {
            break;
          }
        } else {
          // If message delivered or selector does not match: going on
          j++;
        }
      }

      // Next request:
      if (notMsg.getSize() > 0) {
        // Answered request: removing it, so the next one slides to "index".
        requests.remove(index);
        Channel.sendTo(notRec.requester, notMsg);
      } else {
        index++;
      }
      // Each request scans the deliverable messages from the start.
      j = 0;
    }

    // If needed, sending the dead messages to the DMQ:
    if (deadMessages != null)
      sendToDMQ(deadMessages, null);
  }

  /**
   * Hook consulted by <code>deliverMessages</code> for each message accepted
   * by a request's selector, to decide whether it may be delivered.
   * <p>
   * This implementation accepts every message.
   *
   * @param msg  The candidate message.
   * @return <code>true</code> if the message may be delivered.
   */
  protected boolean checkDelivery(Message msg) {
    return true;
  }

  /**
   * Called by <code>deliverMessages</code> for each message added to a
   * reply, before that reply is sent. Does nothing here; overload this
   * method to process a specific treatment.
   *
   * @param msgId  Identifier of the message being delivered.
   */
  protected void messageDelivered(String msgId) {}

  /**
   * Hook intended to be called after a message has been removed (invalid).
   * Does nothing here; overload this method to process a specific treatment.
   * It is not invoked from this section of the class.
   *
   * @param msgId  Identifier of the removed message.
   */
  protected void messageRemoved(String msgId) {}

  /**
   * Returns <code>true</code> if a given message is considered as
   * undeliverable, because its delivery count has reached the queue's
   * threshold, if any, or the server's default threshold value (if any).
   * <p>
   * The comparison is "greater than or equal" rather than strict equality:
   * <code>readObject</code> puts messages back for delivery without checking
   * the threshold, so a delivery count may step past it, and a strict
   * equality test would then never match again.
   *
   * @param message  The message to check.
   * @return <code>true</code> if the delivery count has reached the
   *         applicable threshold, <code>false</code> otherwise or when no
   *         threshold is set.
   */
  protected boolean isUndeliverable(Message message)
  {
    if (threshold != null)
      return message.deliveryCount >= threshold.intValue();
    else if (DeadMQueueImpl.threshold != null)
      return message.deliveryCount >= DeadMQueueImpl.threshold.intValue();
    return false;
  }

  /**
   * Deserializes a <code>QueueImpl</code> instance.
   * <p>
   * The message containers are rebuilt from the persisted messages: a
   * message with no registered consumer is put back for delivery, a message
   * held by a local consumer is denied and put back for delivery, and any
   * other message is restored in the delivered messages table.
   *
   * @param in  The stream to read the instance from.
   * @throws IOException             if the default deserialization fails.
   * @throws ClassNotFoundException  if a serialized class cannot be found.
   */
  private void readObject(java.io.ObjectInputStream in)
               throws IOException, ClassNotFoundException {
    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG, "QueueImpl.readObject()");

    in.defaultReadObject();

    receiving = false;
    messages = new Vector();
    deliveredMsgs = new Hashtable();

    // Retrieving the persisted messages, if any.
    Vector persistedMsgs = MessagePersistenceModule.loadAll(getDestinationId());

    if (persistedMsgs != null) {
      Message persistedMsg;
      AgentId consId;
      while (! persistedMsgs.isEmpty()) {
        persistedMsg = (Message) persistedMsgs.remove(0);
        consId = (AgentId) consumers.get(persistedMsg.getIdentifier());
        if (consId == null) {
          // Never delivered: back into the deliverables vector.
          addMessage(persistedMsg);
        } else if (isLocal(consId)) {
          // Held by a local consumer: denying it.
          if (logger.isLoggable(BasicLevel.DEBUG))
            logger.log(BasicLevel.DEBUG,
                       " -> deny " + persistedMsg.getIdentifier());
          consumers.remove(persistedMsg.getIdentifier());
          contexts.remove(persistedMsg.getIdentifier());
          addMessage(persistedMsg);
        } else {
          // Held by a non local consumer: still considered as delivered.
          deliveredMsgs.put(persistedMsg.getIdentifier(), persistedMsg);
        }
      }
    }
  }

  /**
   * Restores the state written by <code>writeBag</code>: the
   * <code>receiving</code> flag, the deliverable messages and the delivered
   * messages table. Every restored deliverable message is then saved under
   * this destination's identifier.
   *
   * @param in  The stream to read the state from.
   * @throws IOException             if reading from the stream fails.
   * @throws ClassNotFoundException  if a serialized class cannot be found.
   */
  public void readBag(ObjectInputStream in)
    throws IOException, ClassNotFoundException {
    receiving = in.readBoolean();
    messages = (Vector)in.readObject();
    deliveredMsgs = (Hashtable)in.readObject();

    for (int i = 0; i < messages.size(); i++) {
      Message message = (Message)messages.elementAt(i);
      message.save(getDestinationId());
    }
  }

  /**
   * Writes the <code>receiving</code> flag, the deliverable messages and the
   * delivered messages table, in the order expected by <code>readBag</code>.
   *
   * @param out  The stream to write the state to.
   * @throws IOException  if writing to the stream fails.
   */
  public void writeBag(ObjectOutputStream out)
    throws IOException {
    out.writeBoolean(receiving);
    out.writeObject(messages);
    out.writeObject(deliveredMsgs);
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -