
📄 queueimpl.java

📁 A JMS message-oriented middleware, similar to openJMS, distributed under ObjectWeb.
💻 JAVA
📖 Page 1 of 3
/*
 * JORAM: Java(TM) Open Reliable Asynchronous Messaging
 * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies
 * Copyright (C) 1996 - 2000 Dyade
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
 * USA.
 *
 * Initial developer(s): Frederic Maistre (INRIA)
 * Contributor(s): ScalAgent Distributed Technologies
 */
package org.objectweb.joram.mom.dest;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Vector;
import java.util.Properties;

import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.AgentServer;
import fr.dyade.aaa.agent.Channel;
import fr.dyade.aaa.agent.DeleteNot;
import fr.dyade.aaa.agent.Notification;
import fr.dyade.aaa.agent.UnknownAgent;
import fr.dyade.aaa.agent.UnknownNotificationException;
import fr.dyade.aaa.util.Debug;

import org.objectweb.joram.mom.MomTracing;
import org.objectweb.joram.mom.notifications.*;
import org.objectweb.joram.mom.notifications.AdminReply;
import org.objectweb.joram.mom.util.MessagePersistenceModule;
import org.objectweb.joram.shared.admin.*;
import org.objectweb.joram.shared.excepts.*;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.joram.shared.selectors.*;

import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

/**
 * The <code>QueueImpl</code> class implements the MOM queue behaviour,
 * basically storing messages and delivering them upon clients' requests.
 */
public class QueueImpl extends DestinationImpl implements QueueImplMBean {

  public static Logger logger = Debug.getLogger(QueueImpl.class.getName());

  /** Period for running tasks at regular intervals: cleaning, load-balancing, etc. */
  protected long period = -1;

  /**
   * Returns the period value of this queue, -1 if not set.
   *
   * @return the period value of this queue; -1 if not set.
   */
  public long getPeriod() {
    return period;
  }

  /**
   * Sets or unsets the period for this queue.
   *
   * @param period The period value to be set or -1 for unsetting previous
   *               value.
   */
  public void setPeriod(long period) {
    this.period = period;
  }

  /**
   * Threshold above which messages are considered as undeliverable because
   * constantly denied; 0 stands for no threshold, <code>null</code> for value
   * not set.
   */
  private Integer threshold = null;

  /**
   * Returns the threshold value of this queue, -1 if not set.
   *
   * @return the threshold value of this queue; -1 if not set.
   */
  public int getThreshold() {
    if (threshold == null)
      return -1;
    else
      return threshold.intValue();
  }

  /**
   * Sets or unsets the threshold for this queue.
   *
   * @param threshold The threshold value to be set (-1 for unsetting
   *                  previous value).
   */
  public void setThreshold(int threshold) {
    if (threshold < 0)
      this.threshold = null;
    else
      this.threshold = new Integer(threshold);
  }

  /** <code>true</code> if all the stored messages have the same priority. */
  private boolean samePriorities;

  /** Common priority value. */
  private int priority;

  /** Table keeping the messages' consumers identifiers. */
  protected Hashtable consumers;

  /** Table keeping the messages' consumers contexts. */
  protected Hashtable contexts;

  /** Counter of messages arrivals. */
  protected long arrivalsCounter = 0;

  /**
   * Returns the number of messages received since creation time.
   *
   * @return The number of received messages.
   */
  public int getMessageCounter() {
    // Note: as written, this returns the current size of the pending
    // messages list, not the value of arrivalsCounter.
    if (messages != null) {
      return messages.size();
    }
    return 0;
  }

  /** Vector holding the requests before reply or expiry. */
  protected Vector requests;

  /**
   * Cleans the waiting request list.
   * Removes every request whose expiration time is earlier than the
   * given time.
   *
   * @param currentTime The current time.
   */
  protected void cleanWaitingRequest(long currentTime) {
    int index = 0;
    while (index < requests.size()) {
      if (! ((ReceiveRequest) requests.get(index)).isValid(currentTime)) {
        // Request expired: removing it
        requests.remove(index);
        // It's not really necessary to save its state, in case of failure
        // a similar work will be done at restart.
      } else {
        index++;
      }
    }
  }

  /**
   * Returns the number of waiting requests in the queue.
   *
   * @return The number of waiting requests.
   */
  public int getWaitingRequestCount() {
    if (requests != null) {
      cleanWaitingRequest(System.currentTimeMillis());
      return requests.size();
    }
    return 0;
  }

  /** <code>true</code> if the queue is currently receiving messages. */
  protected transient boolean receiving = false;

  /** Vector holding the messages before delivery. */
  protected transient Vector messages;

  /**
   * Cleans the pending messages list.
   * Removes every message whose expiration time is earlier than the
   * given time.
   *
   * @param currentTime The current time.
   * @return The expired messages, or <code>null</code> if none expired.
   */
  protected ClientMessages cleanPendingMessage(long currentTime) {
    int index = 0;
    ClientMessages deadMessages = null;
    Message message = null;
    while (index < messages.size()) {
      message = (Message) messages.get(index);
      if (! message.isValid(currentTime)) {
        messages.remove(index);
        message.delete();
        message.expired = true;
        if (deadMessages == null)
          deadMessages = new ClientMessages();
        deadMessages.addMessage(message);
        if (logger.isLoggable(BasicLevel.DEBUG))
          logger.log(BasicLevel.DEBUG,
                     "Removes expired message " + message.getIdentifier());
      } else {
        index++;
      }
    }
    return deadMessages;
  }

  /**
   * Returns the number of pending messages in the queue.
   *
   * @return The number of pending messages.
   */
  public int getPendingMessageCount() {
    if (messages != null) {
      return messages.size();
    }
    return 0;
  }

  /** Table holding the delivered messages before acknowledgement. */
  protected transient Hashtable deliveredMsgs;

  /**
   * Returns the number of messages delivered and waiting for
   * acknowledgement.
   *
   * @return The number of messages delivered.
   */
  public int getDeliveredMessageCount() {
    if (deliveredMsgs != null) {
      return deliveredMsgs.size();
    }
    return 0;
  }

  /** Maximum number of messages stored in the queue (-1 for no limit). */
  protected int nbMaxMsg = -1;

  /**
   * Returns the maximum number of messages for the destination.
   * If the limit is unset the method returns -1.
   *
   * @return the maximum number of messages for the destination if set;
   *         -1 otherwise.
   */
  public int getNbMaxMsg() {
    return nbMaxMsg;
  }

  /**
   * Sets the maximum number of messages for the destination.
   *
   * @param nbMaxMsg the maximum number of messages (-1 sets no limit).
   */
  public void setNbMaxMsg(int nbMaxMsg) {
    // state change, so save.
    setSave();
    this.nbMaxMsg = nbMaxMsg;
  }

  /**
   * Constructs a <code>QueueImpl</code> instance.
   *
   * @param destId   Identifier of the agent hosting the queue.
   * @param adminId  Identifier of the administrator of the queue.
   * @param prop     The initial set of properties.
   */
  public QueueImpl(AgentId destId, AgentId adminId, Properties prop) {
    super(destId, adminId, prop);

    try {
      if (prop != null)
        period = Long.valueOf(prop.getProperty("period")).longValue();
    } catch (NumberFormatException exc) {
      // Also reached when the "period" property is absent.
      period = -1L;
    }

    consumers = new Hashtable();
    contexts = new Hashtable();
    requests = new Vector();
  }

  /**
   * Returns a string representation of this destination.
   */
  public String toString() {
    return "QueueImpl:" + destId.toString();
  }

  /**
   * Distributes the received notifications to the appropriate reactions.
   *
   * @exception UnknownNotificationException  When receiving an unexpected
   *              notification.
   */
  public void react(AgentId from, Notification not)
              throws UnknownNotificationException {
    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                 "QueueImpl.react(" + from + ',' + not + ')');

    int reqId = -1;
    if (not instanceof AbstractRequest)
      reqId = ((AbstractRequest) not).getRequestId();

    try {
      if (not instanceof SetThreshRequest)
        doReact(from, (SetThreshRequest) not);
      else if (not instanceof SetNbMaxMsgRequest)
        doReact(from, (SetNbMaxMsgRequest) not);
      else if (not instanceof Monit_GetPendingMessages)
        doReact(from, (Monit_GetPendingMessages) not);
      else if (not instanceof Monit_GetPendingRequests)
        doReact(from, (Monit_GetPendingRequests) not);
      else if (not instanceof Monit_GetNbMaxMsg)
        doReact(from, (Monit_GetNbMaxMsg) not);
      else if (not instanceof ReceiveRequest)
        doReact(from, (ReceiveRequest) not);
      else if (not instanceof BrowseRequest)
        doReact(from, (BrowseRequest) not);
      else if (not instanceof AcknowledgeRequest)
        doReact(from, (AcknowledgeRequest) not);
      else if (not instanceof DenyRequest)
        doReact(from, (DenyRequest) not);
      else if (not instanceof AbortReceiveRequest)
        doReact(from, (AbortReceiveRequest) not);
      else if (not instanceof DestinationAdminRequestNot)
        doReact(from, (DestinationAdminRequestNot) not);
      else if (not instanceof WakeUpNot)
        doReact((WakeUpNot) not);
      else
        super.react(from, not);
    }
    // MOM Exceptions are sent to the requester.
    catch (MomException exc) {
      if (logger.isLoggable(BasicLevel.WARN))
        logger.log(BasicLevel.WARN, exc);

      if (not instanceof AbstractRequest) {
        AbstractRequest req = (AbstractRequest) not;
        Channel.sendTo(from, new ExceptionReply(req, exc));
      }
    }
  }

  /**
   * Wakes up and cleans the queue.
   */
  protected void doReact(WakeUpNot not) {
    long current = System.currentTimeMillis();
    cleanWaitingRequest(current);
    // Cleaning the possible expired messages.
    ClientMessages deadMessages = cleanPendingMessage(current);
    // If needed, sending the dead messages to the DMQ:
    if (deadMessages != null)
      sendToDMQ(deadMessages, null);
  }

  /**
   * Method implementing the reaction to a <code>SetThreshRequest</code>
   * instance setting the threshold value for this queue.
   *
   * @exception AccessException  If the requester is not the administrator.
   */
  protected void doReact(AgentId from, SetThreshRequest req)
    throws AccessException {
    if (! isAdministrator(from))
      throw new AccessException("ADMIN right not granted");

    // state change, so save.
    setSave();

    threshold = req.getThreshold();

    String info = strbuf.append("Request [").append(req.getClass().getName())
      .append("], sent to Queue [").append(destId)
      .append("], successful [true]: threshold [")
      .append(threshold).append("] set").toString();
    strbuf.setLength(0);
    Channel.sendTo(from, new AdminReply(req, true, info));

    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG, info);
  }

  /**
   * Method implementing the reaction to a <code>SetNbMaxMsgRequest</code>
   * instance setting the NbMaxMsg value for this queue.
   *
   * @exception AccessException  If the requester is not the administrator.
   */
  protected void doReact(AgentId from, SetNbMaxMsgRequest req)
    throws AccessException {
    if (! isAdministrator(from))
      throw new AccessException("ADMIN right not granted");

    nbMaxMsg = req.getNbMaxMsg();

    String info = strbuf.append("Request [").append(req.getClass().getName())
      .append("], sent to Queue [").append(destId)
      .append("], successful [true]: nbMaxMsg [")
      .append(nbMaxMsg).append("] set").toString();
    strbuf.setLength(0);
    Channel.sendTo(from, new AdminReply(req, true, info));

    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG, info);
  }

  /**
   * Overrides this <code>DestinationImpl</code> method for sending back
   * the threshold along with the DMQ id.
   *
   * @exception AccessException  If the requester is not the administrator.
   */
  protected void doReact(AgentId from, Monit_GetDMQSettings not)
                 throws AccessException {
    if (! isAdministrator(from))
      throw new AccessException("ADMIN right not granted");

    String id = null;
    if (dmqId != null)
      id = dmqId.toString();
    Channel.sendTo(from, new Monit_GetDMQSettingsRep(not, id, threshold));
  }

  /**
   * Method implementing the reaction to a
   * <code>Monit_GetPendingMessages</code> notification requesting the
   * number of pending messages.
   *
   * @exception AccessException  If the requester is not the administrator.
   */
  protected void doReact(AgentId from, Monit_GetPendingMessages not)
                 throws AccessException {
    if (! isAdministrator(from))
      throw new AccessException("ADMIN right not granted");

    // Cleaning the possible expired messages.
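
The listing stops here, at the end of page 1 of 3. The expiry sweep that cleanWaitingRequest and cleanPendingMessage both rely on can be tried in isolation with the sketch below. It is a standalone illustration under stated assumptions, not JORAM code: the class ExpirySweepSketch, the Entry type, its expiresAt field (with 0 taken to mean "never expires") and the sweep helper are all hypothetical names, and a plain ArrayList stands in for the Vector used in the source.

```java
import java.util.ArrayList;
import java.util.List;

/** Standalone sketch of the expiry sweep; every name here is hypothetical. */
class ExpirySweepSketch {

  /** Hypothetical stand-in for a queued message or receive request. */
  static class Entry {
    final String id;
    final long expiresAt; // absolute time in ms; 0 means "never expires" (assumption)

    Entry(String id, long expiresAt) {
      this.id = id;
      this.expiresAt = expiresAt;
    }

    boolean isValid(long currentTime) {
      return expiresAt == 0 || expiresAt > currentTime;
    }
  }

  /**
   * Removes expired entries in place and returns them, or null if none
   * expired (the same lazy allocation as in cleanPendingMessage).
   */
  static List<Entry> sweep(List<Entry> pending, long currentTime) {
    List<Entry> dead = null;
    int index = 0;
    while (index < pending.size()) {
      Entry entry = pending.get(index);
      if (!entry.isValid(currentTime)) {
        // Removal shifts later elements left, so the index is not advanced.
        pending.remove(index);
        if (dead == null)
          dead = new ArrayList<>();
        dead.add(entry);
      } else {
        index++;
      }
    }
    return dead;
  }

  public static void main(String[] args) {
    List<Entry> pending = new ArrayList<>();
    pending.add(new Entry("a", 100));
    pending.add(new Entry("b", 0));
    pending.add(new Entry("c", 500));

    List<Entry> dead = sweep(pending, 200);
    System.out.println("expired: " + dead.size() + ", remaining: " + pending.size());
  }
}
```

Advancing the index only when nothing was removed is what keeps the loop from skipping the element that slides into the freed slot; an Iterator with remove() would be an equivalent choice.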
