⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 connection.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies * Copyright (C) 1996 - 2000 Dyade * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. *  * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * Lesser General Public License for more details. *  * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 * USA. * * Initial developer(s): Frederic Maistre (INRIA) * Contributor(s): ScalAgent Distributed Technologies */package org.objectweb.joram.client.jms;import java.util.Vector;import javax.jms.IllegalStateException;import javax.jms.InvalidDestinationException;import javax.jms.InvalidSelectorException;import javax.jms.JMSException;import javax.jms.JMSSecurityException;import org.objectweb.joram.client.jms.connection.RequestChannel;import org.objectweb.joram.client.jms.connection.RequestMultiplexer;import org.objectweb.joram.client.jms.connection.Requestor;import org.objectweb.joram.shared.client.AbstractJmsReply;import org.objectweb.joram.shared.client.AbstractJmsRequest;import org.objectweb.joram.shared.client.CnxCloseRequest;import org.objectweb.joram.shared.client.CnxConnectReply;import org.objectweb.joram.shared.client.CnxConnectRequest;import org.objectweb.joram.shared.client.CnxStartRequest;import org.objectweb.joram.shared.client.CnxStopRequest;import org.objectweb.joram.shared.client.ConsumerSubRequest;import org.objectweb.util.monolog.api.BasicLevel;import org.objectweb.util.monolog.api.Logger;import fr.dyade.aaa.util.Debug;/** * Implements the <code>javax.jms.Connection</code> interface. */public class Connection implements javax.jms.Connection {    public static Logger logger =     Debug.getLogger(Connection.class.getName());    /**   * Status of the connection.   */  private static class Status {    /**     * Status of the connection when it is stopped.     * This is the initial status.     */    public static final int STOP = 0;    /**     * Status of the connection when it is started.     */    public static final int START = 1;    /**     * Status of the conenction when it is closed.     */    public static final int CLOSE = 2;    private static final String[] names = {      "STOP", "START", "CLOSE"};    public static String toString(int status) {      return names[status];    }  }  /**   * The request multiplexer used to communicate   * with the user proxy.   */  private RequestMultiplexer mtpx;  /**   * The requestor used to communicate   * with the user proxy.   */  private Requestor requestor;    /** Connection meta data. */  private ConnectionMetaData metaData = null;  /** Sessions counter. */  private int sessionsC = 0;  /** Messages counter. */  private int messagesC = 0;  /** Subscriptions counter. */  private int subsC = 0;  /** Client's agent proxy identifier. */  String proxyId;  /** Connection key. */  private int key;  /** The factory's parameters. */  private FactoryParameters factoryParameters;  /**   * Status of the connection.   * STOP, START, CLOSE   */  private int status;  /** Vector of the connection's sessions. */  private Vector sessions;  /** Vector of the connection's consumers. */  private Vector cconsumers;    /**   * Used to synchronize the method close()   */  private Closer closer;  /**   * Creates a <code>Connection</code> instance.   *   * @param factoryParameters  The factory parameters.   * @param connectionImpl  The actual connection to wrap.   *   * @exception JMSSecurityException  If the user identification is incorrect.   * @exception IllegalStateException  If the server is not listening.   */  public Connection(FactoryParameters factoryParameters,                    RequestChannel requestChannel)     throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         "Connection.<init>(" +         factoryParameters + ',' + requestChannel + ')');    this.factoryParameters = factoryParameters;    mtpx = new RequestMultiplexer(this,                                  requestChannel,                                  factoryParameters.cnxPendingTimer);    if (factoryParameters.multiThreadSync) {      mtpx.setMultiThreadSync(factoryParameters.multiThreadSyncDelay);    }        requestor = new Requestor(mtpx);    sessions = new Vector();    cconsumers = new Vector();        closer = new Closer();        setStatus(Status.STOP);    // Requesting the connection key and proxy identifier:    CnxConnectRequest req = new CnxConnectRequest();    CnxConnectReply rep =       (CnxConnectReply) requestor.request(req);    proxyId = rep.getProxyId();    key = rep.getCnxKey();        mtpx.setDemultiplexerDaemonName(toString());  }  private String newTrace(String trace) {    return "Connection[" + proxyId + ':' + key + ']' + trace;  }  private void setStatus(int status) {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         newTrace(".setStatus(" +                  Status.toString(status) + ')'));    this.status = status;  }  boolean isStopped() {    return (status == Status.STOP);  }  /** String image of the connection. */  public String toString() {    return "Cnx:" + proxyId + ':' + key;  }  final long getTxPendingTimer() {    return factoryParameters.txPendingTimer;  }    final boolean getAsyncSend() {    return factoryParameters.asyncSend;  }    final int getQueueMessageReadMax() {    return factoryParameters.queueMessageReadMax;  }    final int getTopicAckBufferMax() {    return factoryParameters.topicAckBufferMax;  }    final int getTopicActivationThreshold() {    return factoryParameters.topicActivationThreshold;  }  final int getTopicPassivationThreshold() {    return factoryParameters.topicPassivationThreshold;  }    /**   * Specializes this Object method; returns <code>true</code> if the   * parameter is a <code>Connection</code> instance sharing the same   * proxy identifier and connection key.   */  public boolean equals(Object obj) {    return (obj instanceof Connection)      && toString().equals(obj.toString());  }    /**   * Checks if the connecion is closed. If true   * raises an IllegalStateException.   */  final protected synchronized void checkClosed()     throws IllegalStateException {    if (status == Status.CLOSE ||        mtpx.isClosed())       throw new IllegalStateException(        "Forbidden call on a closed connection.");  }  /**   * API method.   *    * @exception IllegalStateException  If the connection is closed.   * @exception InvalidSelectorException  If the selector syntax is wrong.   * @exception InvalidDestinationException  If the target destination does   *              not exist.   * @exception JMSException  If the method fails for any other reason.   */  public synchronized javax.jms.ConnectionConsumer      createConnectionConsumer(        javax.jms.Destination dest,         String selector,        javax.jms.ServerSessionPool sessionPool,        int maxMessages)     throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         newTrace(".createConnectionConsumer(" +                  dest + ',' + selector + ',' +                  sessionPool + ',' + maxMessages + ')'));    checkClosed();    return createConnectionConsumer(        dest, null, selector, sessionPool, maxMessages);  }  /**   * API method.   *    * @exception IllegalStateException  If the connection is closed.   * @exception InvalidSelectorException  If the selector syntax is wrong.   * @exception InvalidDestinationException  If the target topic does   *              not exist.   * @exception JMSException  If the method fails for any other reason.   */  public javax.jms.ConnectionConsumer      createDurableConnectionConsumer(javax.jms.Topic topic,                                       String subName,                                      String selector,                                      javax.jms.ServerSessionPool sessPool,                                      int maxMessages)     throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         newTrace(".createDurableConnectionConsumer(" +                  topic + ',' + subName + ',' + selector + ',' +                  sessPool + ',' + maxMessages + ')'));    checkClosed();    if (subName == null)       throw new JMSException("Invalid subscription name: " + subName);    return createConnectionConsumer(        (Destination) topic, subName, selector, sessPool, maxMessages);  }    private synchronized javax.jms.ConnectionConsumer    createConnectionConsumer(        javax.jms.Destination dest,         String subName,        String selector,        javax.jms.ServerSessionPool sessionPool,        int maxMessages)     throws JMSException {    checkClosed();        try {      org.objectweb.joram.shared.selectors.Selector.checks(selector);    }    catch (org.objectweb.joram.shared.excepts.SelectorException sE) {      throw new InvalidSelectorException("Invalid selector syntax: " + sE);    }    if (sessionPool == null)      throw new JMSException("Invalid ServerSessionPool parameter: "                             + sessionPool);    if (maxMessages <= 0)      throw new JMSException("Invalid maxMessages parameter: " + maxMessages);        boolean queueMode;    String targetName;    boolean durable;        if (dest instanceof Queue) {      queueMode = true;      targetName = ((Destination) dest).getName();      durable = false;    } else {      queueMode = false;      if (subName == null) {        targetName = nextSubName();        durable = false;      } else {        targetName = subName;        durable = true;      }      requestor.request(new ConsumerSubRequest(((Destination) dest).getName(),          targetName, selector, false, durable));    }        MultiSessionConsumer msc =      new MultiSessionConsumer(          queueMode,          durable,          selector,          targetName,          sessionPool,          factoryParameters.queueMessageReadMax,          factoryParameters.topicActivationThreshold,          factoryParameters.topicPassivationThreshold,          factoryParameters.topicAckBufferMax,          mtpx,          this,          maxMessages);        msc.start();        cconsumers.addElement(msc);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -