⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 proxyimpl.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies * Copyright (C) 2004 France Telecom R&D * Copyright (C) 1996 - 2004 Bull SA * Copyright (C) 1996 - 2000 Dyade * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. *  * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * Lesser General Public License for more details. *  * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 * USA. * * Initial developer(s): Frederic Maistre (INRIA) * Contributor(s): ScalAgent Distributed Technologies */package org.objectweb.joram.mom.proxies;import java.io.IOException;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.util.Enumeration;import java.util.Hashtable;import java.util.Vector;import org.objectweb.util.monolog.api.BasicLevel;import org.objectweb.util.monolog.api.Logger;import fr.dyade.aaa.agent.AgentId;import fr.dyade.aaa.agent.DeleteNot;import fr.dyade.aaa.agent.Notification;import fr.dyade.aaa.agent.UnknownAgent;import fr.dyade.aaa.agent.UnknownNotificationException;import fr.dyade.aaa.agent.Channel;import fr.dyade.aaa.util.Debug;import org.objectweb.joram.mom.MomTracing;import org.objectweb.joram.mom.dest.*;import org.objectweb.joram.mom.notifications.*;import org.objectweb.joram.mom.util.MessagePersistenceModule;import org.objectweb.joram.shared.admin.GetSubscriptions;import org.objectweb.joram.shared.admin.GetSubscriptionsRep;import org.objectweb.joram.shared.admin.GetSubscriptionMessageIds;import org.objectweb.joram.shared.admin.GetSubscriptionMessageIdsRep;import org.objectweb.joram.shared.admin.GetSubscriptionMessage;import org.objectweb.joram.shared.admin.GetSubscriptionMessageRep;import org.objectweb.joram.shared.admin.DeleteSubscriptionMessage;import org.objectweb.joram.shared.admin.GetSubscription;import org.objectweb.joram.shared.admin.GetSubscriptionRep;import org.objectweb.joram.shared.admin.ClearSubscription;import org.objectweb.joram.shared.client.*;import org.objectweb.joram.shared.excepts.*;import org.objectweb.joram.shared.messages.Message;import org.objectweb.joram.shared.messages.MessageSoftRef;import javax.management.openmbean.CompositeDataSupport;/** * The <code>ProxyImpl</code> class implements the MOM proxy behaviour, * basically forwarding client requests to MOM destinations and MOM * destinations replies to clients. */ public class ProxyImpl implements java.io.Serializable, ProxyImplMBean {    public static Logger logger =     Debug.getLogger(ProxyImpl.class.getName());  /**   * Identifier of this proxy dead message queue, <code>null</code> for DMQ   * not set.   */  private AgentId dmqId = null;  /**   * Threshold value, 0 or negative for no threshold, <code>null</code> for   * value not set.   */  private Integer threshold = null;   /**   * Table of the proxy's <code>ClientContext</code> instances.   * <p>   * <b>Key:</b> context identifier<br>   * <b>Value:</b> context   */  private Hashtable contexts;  /**   * Table holding the <code>ClientSubscription</code> instances.   * <p>   * <b>Key:</b> subsription name<br>   * <b>Value:</b> client subscription   */  private Hashtable subsTable;  /**   * Table holding the recovered transactions branches.   * <p>   * <b>Key:</b> transaction identifier<br>   * <b>Value:</b> <code>XACnxPrepare</code> instance   */  private Hashtable recoveredTransactions;  /** Counter of message arrivals from topics. */   private long arrivalsCounter = 0;   /** The reference of the agent hosting the proxy. */  private ProxyAgentItf proxyAgent;  /**    * Table holding the <code>TopicSubscription</code> instances.   * <p>   * <b>Key:</b> topic identifier<br>   * <b>Value:</b> topic subscription   */  private transient Hashtable topicsTable;  /**   * Table holding the subsriptions' messages.   * <p>   * <b>Key:</b> message identifier<br>   * <b>Value:</b> message   */  private transient Hashtable messagesTable;  /**    * Identifier of the active context.    * Value -1 means that there's no active   * context.   */  private transient int activeCtxId;  /** Reference to the active <code>ClientContext</code> instance. */  private transient ClientContext activeCtx;    /**   * Constructs a <code>ProxyImpl</code> instance.   */  public ProxyImpl(ProxyAgentItf proxyAgent) {    contexts = new Hashtable();    subsTable = new Hashtable();    this.proxyAgent = proxyAgent;    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(BasicLevel.DEBUG, this + ": created.");  }  /**   * Returns a string representation of this user's proxy.   */  public String toString() {    if (proxyAgent == null)      return "ProxyImpl:";    else      return "ProxyImpl:" + proxyAgent.getId();  }  /**   * (Re)initializes the proxy.   *   * @exception Exception  If the proxy state could not be fully retrieved,   *              leading to an inconsistent state.   */  public void initialize(boolean firstTime)    throws Exception  {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(BasicLevel.DEBUG,                              "--- " + this + " (re)initializing...");     topicsTable = new Hashtable();    messagesTable = new Hashtable();    setActiveCtxId(-1);        // Re-initializing after a crash or a server stop.    // Browsing the pre-crash contexts:    ClientContext activeCtx;    AgentId destId;    for (Enumeration ctxIds = contexts.keys(); ctxIds.hasMoreElements();) {      activeCtx = (ClientContext) contexts.remove(ctxIds.nextElement());      // Denying the non acknowledged messages:      for (Enumeration queueIds = activeCtx.getDeliveringQueues();           queueIds.hasMoreElements();) {        destId = (AgentId) queueIds.nextElement();        proxyAgent.sendNot(destId, new DenyRequest(activeCtx.getId()));        if (logger.isLoggable(BasicLevel.DEBUG))          logger.log(BasicLevel.DEBUG,                                  "Denies messages on queue "                                  + destId.toString());      }      // Saving the prepared transactions.      Enumeration xids = activeCtx.getTxIds();      Xid xid;      XACnxPrepare recoveredPrepare;      XACnxPrepare prepare;      while (xids.hasMoreElements()) {        if (recoveredTransactions == null)          recoveredTransactions = new Hashtable();        xid = (Xid) xids.nextElement();        recoveredPrepare = (XACnxPrepare) recoveredTransactions.get(xid);        prepare = activeCtx.getTxPrepare(xid);        if (recoveredPrepare == null)          recoveredTransactions.put(xid, prepare);        else {          recoveredPrepare.getSendings().addAll(prepare.getSendings());          recoveredPrepare.getAcks().addAll(prepare.getAcks());        }      }      // Deleting the temporary destinations:      for (Enumeration tempDests = activeCtx.getTempDestinations();           tempDests.hasMoreElements();) {        destId = (AgentId) tempDests.nextElement();        deleteTemporaryDestination(destId);          if (logger.isLoggable(BasicLevel.DEBUG))          logger.log(BasicLevel.DEBUG,                                  "Deletes temporary destination "                                  + destId.toString());      }    }    // Retrieving the subscriptions' messages.    Vector messages = MessagePersistenceModule.loadAll(getStringId());    if (subsTable.isEmpty()) {      // it is possible because we always save MessageSoftRef      // so we must delete all message.      MessagePersistenceModule.deleteAll(getStringId());    }        // Browsing the pre-crash subscriptions:    String subName;    ClientSubscription cSub;    Vector topics = new Vector();    TopicSubscription tSub;    for (Enumeration subNames = subsTable.keys();         subNames.hasMoreElements();) {      subName = (String) subNames.nextElement();      cSub = (ClientSubscription) subsTable.get(subName);      destId = cSub.getTopicId();      if (! topics.contains(destId))        topics.add(destId);      // Deleting the non durable subscriptions.      if (! cSub.getDurable())        subsTable.remove(subName);      // Reinitializing the durable ones.      else {        cSub.setProxyAgent(proxyAgent);        cSub.reinitialize(getStringId(),                           messagesTable,                           messages,                          true);        tSub = (TopicSubscription) topicsTable.get(destId);        if (tSub == null) {          tSub = new TopicSubscription();          topicsTable.put(destId, tSub);        }        tSub.putSubscription(subName, cSub.getSelector());      }    }    // Browsing the topics and updating their subscriptions.    for (Enumeration topicIds = topics.elements();         topicIds.hasMoreElements();)      updateSubscriptionToTopic((AgentId) topicIds.nextElement(), -1, -1);  }  private void setActiveCtxId(int activeCtxId) {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         "ProxyImpl.setActiveCtxId(" + activeCtxId + ')');    this.activeCtxId = activeCtxId;  }  /**   * Method processing clients requests.   * <p>   * Some of the client requests are directly forwarded, some others are   * sent to the proxy so that their processing occurs in a transaction.   * <p>   * A <code>MomExceptionReply</code> wrapping a <tt>DestinationException</tt>   * might be sent back if a target destination can't be identified.   */  public void reactToClientRequest(int key, AbstractJmsRequest request)  {    try {      if (logger.isLoggable(BasicLevel.DEBUG)) {        logger.log(BasicLevel.DEBUG,                                "--- " + this                                + " got " + request.getClass().getName()                                + " with id: " + request.getRequestId()                                + " through activeCtx: " + key);      }      if (request instanceof ProducerMessages)        reactToClientRequest(key, (ProducerMessages) request);      else if (request instanceof ConsumerReceiveRequest)        reactToClientRequest(key, (ConsumerReceiveRequest) request);      else if (request instanceof ConsumerSetListRequest)        reactToClientRequest(key, (ConsumerSetListRequest) request);      else if (request instanceof QBrowseRequest)        reactToClientRequest(key, (QBrowseRequest) request);      else if (request instanceof JmsRequestGroup)        reactToClientRequest(key, (JmsRequestGroup) request);      else {        doReact(key, request);         }    }    // Catching an exception due to an invalid agent identifier to    // forward the request to:    catch (IllegalArgumentException iE) {      DestinationException dE =        new DestinationException("Proxy could not forward the request to"                                 + " incorrectly identified destination: "                                 + iE);      doReply(key, new MomExceptionReply(request.getRequestId(), dE));    }  }  /**   * Forwards the messages sent by the client in a   * <code>ProducerMessages</code> request as a <code>ClientMessages</code>   * MOM request directly to a destination, and acknowledges them by sending   * a <code>ServerReply</code> back.   */  private void reactToClientRequest(int key, ProducerMessages req) {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(BasicLevel.DEBUG,                              "ProxyImpl.reactToClientRequest(" +                               key + ',' + req + ')');    AgentId destId = AgentId.fromString(req.getTarget());    ClientMessages not = new ClientMessages(      key,      req.getRequestId(),      req.getMessages());    setDmq(not);            if (destId.getTo() == proxyAgent.getId().getTo()) {      if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(BasicLevel.DEBUG, " -> local sending");      not.setPersistent(false);      if (req.getAsyncSend()) {        not.setAsyncSend(true);      }    } else {      if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(BasicLevel.DEBUG, " -> remote sending");      if (!req.getAsyncSend()) {        proxyAgent.sendNot(proxyAgent.getId(), new SendReplyNot(key, req            .getRequestId()));      }    }        proxyAgent.sendNot(destId, not);  }    private void setDmq(ClientMessages not) {    //  Setting the producer's DMQ identifier field:     if (dmqId != null) {      not.setDMQId(dmqId);    } else {      not.setDMQId(DeadMQueueImpl.getId());    }  }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -