📄 proxyimpl.java
字号:
/*
 * JORAM: Java(TM) Open Reliable Asynchronous Messaging
 * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies
 * Copyright (C) 2004 France Telecom R&D
 * Copyright (C) 1996 - 2004 Bull SA
 * Copyright (C) 1996 - 2000 Dyade
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 *
 * Initial developer(s): Frederic Maistre (INRIA)
 * Contributor(s): ScalAgent Distributed Technologies
 */
package org.objectweb.joram.mom.proxies;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Vector;

import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.DeleteNot;
import fr.dyade.aaa.agent.Notification;
import fr.dyade.aaa.agent.UnknownAgent;
import fr.dyade.aaa.agent.UnknownNotificationException;
import fr.dyade.aaa.agent.Channel;
import fr.dyade.aaa.util.Debug;

import org.objectweb.joram.mom.MomTracing;
import org.objectweb.joram.mom.dest.*;
import org.objectweb.joram.mom.notifications.*;
import org.objectweb.joram.mom.util.MessagePersistenceModule;
import org.objectweb.joram.shared.admin.GetSubscriptions;
import org.objectweb.joram.shared.admin.GetSubscriptionsRep;
import org.objectweb.joram.shared.admin.GetSubscriptionMessageIds;
import org.objectweb.joram.shared.admin.GetSubscriptionMessageIdsRep;
import org.objectweb.joram.shared.admin.GetSubscriptionMessage;
import org.objectweb.joram.shared.admin.GetSubscriptionMessageRep;
import org.objectweb.joram.shared.admin.DeleteSubscriptionMessage;
import org.objectweb.joram.shared.admin.GetSubscription;
import org.objectweb.joram.shared.admin.GetSubscriptionRep;
import org.objectweb.joram.shared.admin.ClearSubscription;
import org.objectweb.joram.shared.client.*;
import org.objectweb.joram.shared.excepts.*;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.joram.shared.messages.MessageSoftRef;

import javax.management.openmbean.CompositeDataSupport;

/**
 * The <code>ProxyImpl</code> class implements the MOM proxy behaviour,
 * basically forwarding client requests to MOM destinations and MOM
 * destinations replies to clients.
 */
public class ProxyImpl implements java.io.Serializable, ProxyImplMBean {

  /**
   * Logger of this class, obtained from <code>Debug</code> under the
   * <code>ProxyImpl</code> class name.
   * NOTE(review): the field is public, static and not final, so any code
   * may reassign it -- confirm whether that is relied upon before
   * tightening it.
   */
  public static Logger logger = Debug.getLogger(ProxyImpl.class.getName());

  /**
   * Identifier of this proxy dead message queue, <code>null</code> for DMQ
   * not set.
   */
  private AgentId dmqId = null;

  /**
   * Threshold value, 0 or negative for no threshold, <code>null</code> for
   * value not set.
   */
  private Integer threshold = null;

  /**
   * Table of the proxy's <code>ClientContext</code> instances.
   * <p>
   * <b>Key:</b> context identifier<br>
   * <b>Value:</b> context
   */
  private Hashtable contexts;

  /**
   * Table holding the <code>ClientSubscription</code> instances.
   * <p>
   * <b>Key:</b> subscription name<br>
   * <b>Value:</b> client subscription
   */
  private Hashtable subsTable;

  /**
   * Table holding the recovered transactions branches.
   * <p>
   * <b>Key:</b> transaction identifier<br>
   * <b>Value:</b> <code>XACnxPrepare</code> instance
   */
  private Hashtable recoveredTransactions;

  /** Counter of message arrivals from topics. */
  private long arrivalsCounter = 0;

  /**
   * The reference of the agent hosting the proxy.
*/ private ProxyAgentItf proxyAgent; /** * Table holding the <code>TopicSubscription</code> instances. * <p> * <b>Key:</b> topic identifier<br> * <b>Value:</b> topic subscription */ private transient Hashtable topicsTable; /** * Table holding the subsriptions' messages. * <p> * <b>Key:</b> message identifier<br> * <b>Value:</b> message */ private transient Hashtable messagesTable; /** * Identifier of the active context. * Value -1 means that there's no active * context. */ private transient int activeCtxId; /** Reference to the active <code>ClientContext</code> instance. */ private transient ClientContext activeCtx; /** * Constructs a <code>ProxyImpl</code> instance. */ public ProxyImpl(ProxyAgentItf proxyAgent) { contexts = new Hashtable(); subsTable = new Hashtable(); this.proxyAgent = proxyAgent; if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, this + ": created."); } /** * Returns a string representation of this user's proxy. */ public String toString() { if (proxyAgent == null) return "ProxyImpl:"; else return "ProxyImpl:" + proxyAgent.getId(); } /** * (Re)initializes the proxy. * * @exception Exception If the proxy state could not be fully retrieved, * leading to an inconsistent state. */ public void initialize(boolean firstTime) throws Exception { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "--- " + this + " (re)initializing..."); topicsTable = new Hashtable(); messagesTable = new Hashtable(); setActiveCtxId(-1); // Re-initializing after a crash or a server stop. 
// Browsing the pre-crash contexts: ClientContext activeCtx; AgentId destId; for (Enumeration ctxIds = contexts.keys(); ctxIds.hasMoreElements();) { activeCtx = (ClientContext) contexts.remove(ctxIds.nextElement()); // Denying the non acknowledged messages: for (Enumeration queueIds = activeCtx.getDeliveringQueues(); queueIds.hasMoreElements();) { destId = (AgentId) queueIds.nextElement(); proxyAgent.sendNot(destId, new DenyRequest(activeCtx.getId())); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Denies messages on queue " + destId.toString()); } // Saving the prepared transactions. Enumeration xids = activeCtx.getTxIds(); Xid xid; XACnxPrepare recoveredPrepare; XACnxPrepare prepare; while (xids.hasMoreElements()) { if (recoveredTransactions == null) recoveredTransactions = new Hashtable(); xid = (Xid) xids.nextElement(); recoveredPrepare = (XACnxPrepare) recoveredTransactions.get(xid); prepare = activeCtx.getTxPrepare(xid); if (recoveredPrepare == null) recoveredTransactions.put(xid, prepare); else { recoveredPrepare.getSendings().addAll(prepare.getSendings()); recoveredPrepare.getAcks().addAll(prepare.getAcks()); } } // Deleting the temporary destinations: for (Enumeration tempDests = activeCtx.getTempDestinations(); tempDests.hasMoreElements();) { destId = (AgentId) tempDests.nextElement(); deleteTemporaryDestination(destId); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Deletes temporary destination " + destId.toString()); } } // Retrieving the subscriptions' messages. Vector messages = MessagePersistenceModule.loadAll(getStringId()); if (subsTable.isEmpty()) { // it is possible because we always save MessageSoftRef // so we must delete all message. 
MessagePersistenceModule.deleteAll(getStringId()); } // Browsing the pre-crash subscriptions: String subName; ClientSubscription cSub; Vector topics = new Vector(); TopicSubscription tSub; for (Enumeration subNames = subsTable.keys(); subNames.hasMoreElements();) { subName = (String) subNames.nextElement(); cSub = (ClientSubscription) subsTable.get(subName); destId = cSub.getTopicId(); if (! topics.contains(destId)) topics.add(destId); // Deleting the non durable subscriptions. if (! cSub.getDurable()) subsTable.remove(subName); // Reinitializing the durable ones. else { cSub.setProxyAgent(proxyAgent); cSub.reinitialize(getStringId(), messagesTable, messages, true); tSub = (TopicSubscription) topicsTable.get(destId); if (tSub == null) { tSub = new TopicSubscription(); topicsTable.put(destId, tSub); } tSub.putSubscription(subName, cSub.getSelector()); } } // Browsing the topics and updating their subscriptions. for (Enumeration topicIds = topics.elements(); topicIds.hasMoreElements();) updateSubscriptionToTopic((AgentId) topicIds.nextElement(), -1, -1); } private void setActiveCtxId(int activeCtxId) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "ProxyImpl.setActiveCtxId(" + activeCtxId + ')'); this.activeCtxId = activeCtxId; } /** * Method processing clients requests. * <p> * Some of the client requests are directly forwarded, some others are * sent to the proxy so that their processing occurs in a transaction. * <p> * A <code>MomExceptionReply</code> wrapping a <tt>DestinationException</tt> * might be sent back if a target destination can't be identified. 
*/ public void reactToClientRequest(int key, AbstractJmsRequest request) { try { if (logger.isLoggable(BasicLevel.DEBUG)) { logger.log(BasicLevel.DEBUG, "--- " + this + " got " + request.getClass().getName() + " with id: " + request.getRequestId() + " through activeCtx: " + key); } if (request instanceof ProducerMessages) reactToClientRequest(key, (ProducerMessages) request); else if (request instanceof ConsumerReceiveRequest) reactToClientRequest(key, (ConsumerReceiveRequest) request); else if (request instanceof ConsumerSetListRequest) reactToClientRequest(key, (ConsumerSetListRequest) request); else if (request instanceof QBrowseRequest) reactToClientRequest(key, (QBrowseRequest) request); else if (request instanceof JmsRequestGroup) reactToClientRequest(key, (JmsRequestGroup) request); else { doReact(key, request); } } // Catching an exception due to an invalid agent identifier to // forward the request to: catch (IllegalArgumentException iE) { DestinationException dE = new DestinationException("Proxy could not forward the request to" + " incorrectly identified destination: " + iE); doReply(key, new MomExceptionReply(request.getRequestId(), dE)); } } /** * Forwards the messages sent by the client in a * <code>ProducerMessages</code> request as a <code>ClientMessages</code> * MOM request directly to a destination, and acknowledges them by sending * a <code>ServerReply</code> back. 
*/
  private void reactToClientRequest(int key, ProducerMessages req) {
    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(BasicLevel.DEBUG,
                 "ProxyImpl.reactToClientRequest(" + key + ',' + req + ')');
    }

    AgentId targetId = AgentId.fromString(req.getTarget());
    ClientMessages clientMsgs =
      new ClientMessages(key, req.getRequestId(), req.getMessages());
    setDmq(clientMsgs);

    if (targetId.getTo() != proxyAgent.getId().getTo()) {
      if (logger.isLoggable(BasicLevel.DEBUG)) {
        logger.log(BasicLevel.DEBUG, " -> remote sending");
      }
      // Remote sending: unless the client asked for an asynchronous send,
      // a SendReplyNot is sent to the proxy agent itself.
      if (!req.getAsyncSend()) {
        proxyAgent.sendNot(proxyAgent.getId(),
                           new SendReplyNot(key, req.getRequestId()));
      }
    } else {
      if (logger.isLoggable(BasicLevel.DEBUG)) {
        logger.log(BasicLevel.DEBUG, " -> local sending");
      }
      // Local sending: the notification is flagged as non persistent and
      // carries the request's asynchronous-send flag when it is set.
      clientMsgs.setPersistent(false);
      if (req.getAsyncSend()) {
        clientMsgs.setAsyncSend(true);
      }
    }

    // In both cases the messages themselves go to the destination.
    proxyAgent.sendNot(targetId, clientMsgs);
  }

  /**
   * Sets the producer's DMQ identifier field of the given notification:
   * this proxy's own dead message queue when one is set, the identifier
   * returned by <code>DeadMQueueImpl.getId()</code> otherwise.
   *
   * @param not  The notification to update.
   */
  private void setDmq(ClientMessages not) {
    not.setDMQId(dmqId != null ? dmqId : DeadMQueueImpl.getId());
  }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -