📄 clientsubscription.java
字号:
/*
 * JORAM: Java(TM) Open Reliable Asynchronous Messaging
 * Copyright (C) 2003 - 2006 ScalAgent Distributed Technologies
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 *
 * Initial developer(s): Frederic Maistre (INRIA)
 * Contributor(s): ScalAgent Distributed Technologies
 */
package org.objectweb.joram.mom.proxies;

import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.Channel;
import org.objectweb.joram.mom.MomTracing;
import org.objectweb.joram.mom.dest.DeadMQueueImpl;
import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.shared.client.ConsumerMessages;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.joram.shared.selectors.Selector;
import org.objectweb.util.monolog.api.BasicLevel;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Vector;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.IOException;

/**
 * The <code>ClientSubscription</code> class holds the data of a client
 * subscription, and the methods managing the delivery and acknowledgement
 * of the messages.
 * <p>
 * The class is serializable; the fields declared <code>transient</code>
 * are not part of the serialized state.
 */
class ClientSubscription implements java.io.Serializable {
  /** The proxy's agent identifier. */
  private AgentId proxyId;
  /** <code>true</code> if the subscription is durable. */
  private boolean durable;
  /** The topic identifier. */
  private AgentId topicId;
  /** The subscription name. */
  private String name;
  /** The selector for filtering messages. */
  private String selector;
  /**
   * Identifier of the subscriber's dead message queue, <code>null</code>
   * if no DMQ is set.
   */
  private AgentId dmqId;
  /**
   * Threshold value: 0 or negative for no threshold, <code>null</code> if
   * the value is not set.
   */
  private Integer threshold;
  /** Maximum number of messages stored for the subscription (-1: no limit). */
  protected int nbMaxMsg = -1;

  /** Vector of identifiers of the messages to deliver. */
  private Vector messageIds;
  /** Table of delivered messages, keyed by message identifier. */
  private Hashtable deliveredIds;
  /** Table keeping the denied messages identifiers. */
  private Hashtable deniedMsgs;

  /** Identifier of the subscription context. */
  private transient int contextId;
  /** Identifier of the subscription request. */
  private transient int subRequestId;
  /**
   * <code>true</code> if the subscriber does not wish to consume
   * messages published in the same context.
   */
  private transient boolean noLocal;
  /**
   * <code>true</code> if the subscription does not filter messages
   * in any way (neither "noLocal" nor a non-empty selector).
   */
  private transient boolean noFiltering;

  /** <code>true</code> if the subscription is active. */
  private transient boolean active;
  /**
   * Identifier of the request requesting messages, either the listener's
   * request, or a "receive" request.
   */
  private transient int requestId;
  /** <code>true</code> if the messages are destined for a listener. */
  private transient boolean toListener;
  /** Expiration time of the "receive" request, if any. */
  private transient long requestExpTime;

  /** Proxy messages table; supplied again through <code>reinitialize</code>. */
  private transient Hashtable messagesTable;

  /** String form of the proxy agent identifier. */
  private transient String proxyStringId;

  /** Proxy agent reference, assigned through <code>setProxyAgent</code>. */
  private transient ProxyAgentItf proxy;

  /**
   * Constructs a <code>ClientSubscription</code> instance.
   *
   * @param proxyId Proxy's identifier.
   * @param contextId Context identifier.
   * @param reqId Request identifier.
* @param durable <code>true</code> for a durable subscription. * @param topicId Topic identifier. * @param name Subscription's name. * @param selector Selector for filtering messages. * @param noLocal <code>true</code> for not consuming messages published * within the same proxy's context. * @param dmqId Identifier of the proxy's dead message queue, if any. * @param threshold Proxy's threshold value, if any. * @param messagesTable Proxy's messages table. */ ClientSubscription(AgentId proxyId, int contextId, int reqId, boolean durable, AgentId topicId, String name, String selector, boolean noLocal, AgentId dmqId, Integer threshold, Hashtable messagesTable) { this.proxyId = proxyId; this.contextId = contextId; this.subRequestId = reqId; this.durable = durable; this.topicId = topicId; this.name = name; this.selector = selector; this.noLocal = noLocal; this.dmqId = dmqId; this.threshold = threshold; this.messagesTable = messagesTable; messageIds = new Vector(); deliveredIds = new Hashtable(); deniedMsgs = new Hashtable(); noFiltering = (! 
noLocal) && (selector == null || selector.equals("")); active = true; requestId = -1; toListener = false; proxyStringId = proxyId.toString(); if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, this + ": created."); }// public String dump() {// StringBuffer buff = new StringBuffer();// buff.append("ClientSubscription (proxyId=");// buff.append(proxyId);// buff.append(",topicId=");// buff.append(topicId);// buff.append(",messageIds=");// buff.append(messageIds);// buff.append(",contextId=");// buff.append(contextId);// buff.append(",subRequestId=");// buff.append(subRequestId);// buff.append(",noLocal=");// buff.append(noLocal);// buff.append(",active=");// buff.append(active);// buff.append(",requestId=");// buff.append(requestId);// buff.append(",toListener=");// buff.append(toListener);// buff.append(",messagesTable=");// buff.append(messagesTable);// buff.append(")");// return buff.toString();// } public String toString() { return "ClientSubscription" + proxyId + name; } /** Returns the subscription's context identifier. */ int getContextId() { return contextId; } /** Returns the identifier of the subscribing request. */ int getSubRequestId() { return subRequestId; } /** Returns the name of the subscription. */ String getName() { return name; } /** Returns the identifier of the subscription topic. */ AgentId getTopicId() { return topicId; } /** Returns the selector. */ String getSelector() { return selector; } /** Returns <code>true</code> if the subscription is durable. */ boolean getDurable() { return durable; } /** Returns <code>true</code> if the subscription is active. */ boolean getActive() { return active; } /** * Returns the maximum number of message for the subscription. * If the limit is unset the method returns -1. * * @return the maximum number of message for subscription if set; * -1 otherwise. */ public int getNbMaxMsg() { return nbMaxMsg; } /** * Sets the maximum number of message for the subscription. 
*
   * @param nbMaxMsg the maximum number of message for subscription (-1 set
   *          no limit).
   */
  public void setNbMaxMsg(int nbMaxMsg) {
    this.nbMaxMsg = nbMaxMsg;
  }

  /**
   * Returns the number of pending messages for the subscription.
   *
   * @return The number of pending message for the subscription.
   */
  int getMessageCount() {
    return messageIds.size();
  }

  /**
   * Returns the list of message's identifiers for the subscription.
   *
   * @return a new array holding the identifiers currently waiting for
   *         delivery; changes to the array do not affect the subscription.
   */
  String[] getMessageIds() {
    String[] res = new String[messageIds.size()];
    messageIds.copyInto(res);
    return res;
  }

  /**
   * Sets the proxy agent reference held by this subscription.
   *
   * @param px the proxy agent.
   */
  void setProxyAgent(ProxyAgentItf px) {
    proxy = px;
  }

  /**
   * Re-initializes the client subscription.
   * <p>
   * Every persisted message whose identifier is still known to this
   * subscription (pending or delivered) gets its acknowledgement counters
   * incremented, and is put into the messages table when this subscription
   * is the first one to reference it.
   *
   * @param proxyStringId  string proxy id.
   * @param messagesTable  Proxy's table where storing the messages.
   * @param persistedMessages  Proxy's persisted messages.
   * @param denyDeliveredMessages  Denies already delivered messages.
   */
  void reinitialize(String proxyStringId,
                    Hashtable messagesTable,
                    Vector persistedMessages,
                    boolean denyDeliveredMessages) {
    if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
      MomTracing.dbgProxy.log(BasicLevel.DEBUG,
                              "ClientSubscription[" + this + "].reinitialize()");

    this.proxyStringId = proxyStringId;
    this.messagesTable = messagesTable;

    // Browsing the persisted messages.
    for (Enumeration e = persistedMessages.elements(); e.hasMoreElements();) {
      Message message = (Message) e.nextElement();
      String msgId = message.getIdentifier();

      // deliveredIds is keyed by message identifier (its keys are what gets
      // denied below), so membership is a key lookup: Hashtable.contains()
      // would scan the values instead.
      if (messageIds.contains(msgId) || deliveredIds.containsKey(msgId)) {
        if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
          MomTracing.dbgProxy.log(BasicLevel.DEBUG,
                                  " -> contains message " + msgId);

        message.acksCounter++;
        message.durableAcksCounter++;

        // First subscription referencing the message: register it in the
        // proxy's table.
        if (message.acksCounter == 1) {
          if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))
            MomTracing.dbgProxy.log(BasicLevel.DEBUG,
                                    " -> messagesTable.put(" + msgId + ')');
          messagesTable.put(msgId, message);
        }
        // NOTE(review): no message.save(...) is issued here; the original
        // author's remark says the message is already saved at this point.
      }
    }

    if (denyDeliveredMessages) {
      // Denying all previously delivered messages:
      deny(deliveredIds.keys());
      deliveredIds.clear();
    }
  }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -