📄 queueimpl.java
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies * Copyright (C) 1996 - 2000 Dyade * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA. * * Initial developer(s): Frederic Maistre (INRIA) * Contributor(s): ScalAgent Distributed Technologies */package org.objectweb.joram.mom.dest;import java.io.IOException;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.util.Enumeration;import java.util.Hashtable;import java.util.Vector;import java.util.Properties;import fr.dyade.aaa.agent.AgentId;import fr.dyade.aaa.agent.AgentServer;import fr.dyade.aaa.agent.Channel;import fr.dyade.aaa.agent.DeleteNot;import fr.dyade.aaa.agent.Notification;import fr.dyade.aaa.agent.UnknownAgent;import fr.dyade.aaa.agent.UnknownNotificationException;import fr.dyade.aaa.util.Debug;import org.objectweb.joram.mom.MomTracing;import org.objectweb.joram.mom.notifications.*;import org.objectweb.joram.mom.notifications.AdminReply;import org.objectweb.joram.mom.util.MessagePersistenceModule;import org.objectweb.joram.shared.admin.*;import org.objectweb.joram.shared.excepts.*;import org.objectweb.joram.shared.messages.Message;import org.objectweb.joram.shared.selectors.*;import org.objectweb.util.monolog.api.BasicLevel;import org.objectweb.util.monolog.api.Logger;/** * The <code>QueueImpl</code> class implements the MOM queue behaviour, * basically storing messages and delivering them upon clients requests. */public class QueueImpl extends DestinationImpl implements QueueImplMBean { public static Logger logger = Debug.getLogger(QueueImpl.class.getName()); /** period to run task at regular interval: cleaning, load-balancing, etc. */ protected long period = -1; /** * Returns the period value of this queue, -1 if not set. * * @return the period value of this queue; -1 if not set. */ public long getPeriod() { return period; } /** * Sets or unsets the period for this queue. * * @param period The period value to be set or -1 for unsetting previous * value. */ public void setPeriod(long period) { this.period = period; } /** * Threshold above which messages are considered as undeliverable because * constantly denied; 0 stands for no threshold, <code>null</code> for value * not set. */ private Integer threshold = null; /** * Returns the threshold value of this queue, -1 if not set. * * @return the threshold value of this queue; -1 if not set. */ public int getThreshold() { if (threshold == null) return -1; else return threshold.intValue(); } /** * Sets or unsets the threshold for this queue. * * @param The threshold value to be set (-1 for unsetting previous value). */ public void setThreshold(int threshold) { if (threshold < 0) this.threshold = null; else this.threshold = new Integer(threshold); } /** <code>true</code> if all the stored messages have the same priority. */ private boolean samePriorities; /** Common priority value. */ private int priority; /** Table keeping the messages' consumers identifiers. */ protected Hashtable consumers; /** Table keeping the messages' consumers contexts. */ protected Hashtable contexts; /** Counter of messages arrivals. */ protected long arrivalsCounter = 0; /** * Returns the number of messages received since creation time. * * @return The number of received messages. */ public int getMessageCounter() { if (messages != null) { return messages.size(); } return 0; } /** Vector holding the requests before reply or expiry. */ protected Vector requests; /** * Cleans the waiting request list. * Removes all request that the expiration time is less than the time * given in parameter. * * @param currentTime The current time. */ protected void cleanWaitingRequest(long currentTime) { int index = 0; while (index < requests.size()) { if (! ((ReceiveRequest) requests.get(index)).isValid(currentTime)) { // Request expired: removing it requests.remove(index); // It's not really necessary to save its state, in case of failure // a similar work will be done at restart. } else { index++; } } } /** * Returns the number of waiting requests in the queue. * * @return The number of waiting requests. */ public int getWaitingRequestCount() { if (requests != null) { cleanWaitingRequest(System.currentTimeMillis()); return requests.size(); } return 0; } /** <code>true</code> if the queue is currently receiving messages. */ protected transient boolean receiving = false; /** Vector holding the messages before delivery. */ protected transient Vector messages; /** * Cleans the pending messages list. * Removes all messages that the expiration time is less than the time * given in parameter. * * @param currentTime The current time. * @return A vector of all expired messages. */ protected ClientMessages cleanPendingMessage(long currentTime) { int index = 0; ClientMessages deadMessages = null; Message message = null; while (index < messages.size()) { message = (Message) messages.get(index); if (! message.isValid(currentTime)) { messages.remove(index); message.delete(); message.expired = true; if (deadMessages == null) deadMessages = new ClientMessages(); deadMessages.addMessage(message); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Removes expired message " + message.getIdentifier()); } else { index++; } } return deadMessages; } /** * Returns the number of pending messages in the queue. * * @return The number of pending messages. */ public int getPendingMessageCount() { if (messages != null) { return messages.size(); } return 0; } /** Table holding the delivered messages before acknowledgement. */ protected transient Hashtable deliveredMsgs; /** * Returns the number of messages delivered and waiting for acknowledge. * * @return The number of messages delivered. */ public int getDeliveredMessageCount() { if (deliveredMsgs != null) { return deliveredMsgs.size(); } return 0; } /** nb Max of Message store in queue (-1 no limit). */ protected int nbMaxMsg = -1; /** * Returns the maximum number of message for the destination. * If the limit is unset the method returns -1. * * @return the maximum number of message for subscription if set; * -1 otherwise. */ public int getNbMaxMsg() { return nbMaxMsg; } /** * Sets the maximum number of message for the destination. * * @param nbMaxMsg the maximum number of message (-1 set no limit). */ public void setNbMaxMsg(int nbMaxMsg) { // state change, so save. setSave(); this.nbMaxMsg = nbMaxMsg; } /** * Constructs a <code>QueueImpl</code> instance. * * @param destId Identifier of the agent hosting the queue. * @param adminId Identifier of the administrator of the queue. * @param prop The initial set of properties. */ public QueueImpl(AgentId destId, AgentId adminId, Properties prop) { super(destId, adminId, prop); try { if (prop != null) period = Long.valueOf(prop.getProperty("period")).longValue(); } catch (NumberFormatException exc) { period = -1L; } consumers = new Hashtable(); contexts = new Hashtable(); requests = new Vector(); } /** * Returns a string representation of this destination. */ public String toString() { return "QueueImpl:" + destId.toString(); } /** * Distributes the received notifications to the appropriate reactions. * * @exception UnknownNotificationException When receiving an unexpected * notification. */ public void react(AgentId from, Notification not) throws UnknownNotificationException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "QueueImpl.react(" + from + ',' + not + ')'); int reqId = -1; if (not instanceof AbstractRequest) reqId = ((AbstractRequest) not).getRequestId(); try { if (not instanceof SetThreshRequest) doReact(from, (SetThreshRequest) not); else if (not instanceof SetNbMaxMsgRequest) doReact(from, (SetNbMaxMsgRequest) not); else if (not instanceof Monit_GetPendingMessages) doReact(from, (Monit_GetPendingMessages) not); else if (not instanceof Monit_GetPendingRequests) doReact(from, (Monit_GetPendingRequests) not); else if (not instanceof Monit_GetNbMaxMsg) doReact(from, (Monit_GetNbMaxMsg) not); else if (not instanceof ReceiveRequest) doReact(from, (ReceiveRequest) not); else if (not instanceof BrowseRequest) doReact(from, (BrowseRequest) not); else if (not instanceof AcknowledgeRequest) doReact(from, (AcknowledgeRequest) not); else if (not instanceof DenyRequest) doReact(from, (DenyRequest) not); else if (not instanceof AbortReceiveRequest) doReact(from, (AbortReceiveRequest) not); else if (not instanceof DestinationAdminRequestNot) doReact(from, (DestinationAdminRequestNot) not); else if (not instanceof WakeUpNot) doReact((WakeUpNot) not); else super.react(from, not); } // MOM Exceptions are sent to the requester. catch (MomException exc) { if (logger.isLoggable(BasicLevel.WARN)) logger.log(BasicLevel.WARN, exc); if (not instanceof AbstractRequest) { AbstractRequest req = (AbstractRequest) not; Channel.sendTo(from, new ExceptionReply(req, exc)); } } } /** * wake up, and cleans the queue. */ protected void doReact(WakeUpNot not) { long current = System.currentTimeMillis(); cleanWaitingRequest(current); // Cleaning the possible expired messages. ClientMessages deadMessages = cleanPendingMessage(current); // If needed, sending the dead messages to the DMQ: if (deadMessages != null) sendToDMQ(deadMessages, null); } /** * Method implementing the reaction to a <code>SetThreshRequest</code> * instance setting the threshold value for this queue. * * @exception AccessException If the requester is not the administrator. */ protected void doReact(AgentId from, SetThreshRequest req) throws AccessException { if (! isAdministrator(from)) throw new AccessException("ADMIN right not granted"); // state change, so save. setSave(); threshold = req.getThreshold(); String info = strbuf.append("Request [").append(req.getClass().getName()) .append("], sent to Queue [").append(destId) .append("], successful [true]: threshold [") .append(threshold).append("] set").toString(); strbuf.setLength(0); Channel.sendTo(from, new AdminReply(req, true, info)); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, info); } /** * Method implementing the reaction to a <code>SetNbMaxMsgRequest</code> * instance setting the NbMaxMsg value for this queue. * * @exception AccessException If the requester is not the administrator. */ protected void doReact(AgentId from, SetNbMaxMsgRequest req) throws AccessException { if (! isAdministrator(from)) throw new AccessException("ADMIN right not granted"); nbMaxMsg = req.getNbMaxMsg(); String info = strbuf.append("Request [").append(req.getClass().getName()) .append("], sent to Queue [").append(destId) .append("], successful [true]: nbMaxMsg [") .append(nbMaxMsg).append("] set").toString(); strbuf.setLength(0); Channel.sendTo(from, new AdminReply(req, true, info)); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, info); } /** * Overrides this <code>DestinationImpl</code> method for sending back * the threshold along with the DMQ id. * * @exception AccessException If the requester is not the administrator. */ protected void doReact(AgentId from, Monit_GetDMQSettings not) throws AccessException { if (! isAdministrator(from)) throw new AccessException("ADMIN right not granted"); String id = null; if (dmqId != null) id = dmqId.toString(); Channel.sendTo(from, new Monit_GetDMQSettingsRep(not, id, threshold)); } /** * Method implementing the reaction to a * <code>Monit_GetPendingMessages</code> notification requesting the * number of pending messages. * * @exception AccessException If the requester is not the administrator. */ protected void doReact(AgentId from, Monit_GetPendingMessages not) throws AccessException { if (! isAdministrator(from)) throw new AccessException("ADMIN right not granted"); // Cleaning the possible expired messages.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -