📄 connection.java
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies * Copyright (C) 1996 - 2000 Dyade * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA. * * Initial developer(s): Frederic Maistre (INRIA) * Contributor(s): ScalAgent Distributed Technologies */package org.objectweb.joram.client.jms;import java.util.Vector;import javax.jms.IllegalStateException;import javax.jms.InvalidDestinationException;import javax.jms.InvalidSelectorException;import javax.jms.JMSException;import javax.jms.JMSSecurityException;import org.objectweb.joram.client.jms.connection.RequestChannel;import org.objectweb.joram.client.jms.connection.RequestMultiplexer;import org.objectweb.joram.client.jms.connection.Requestor;import org.objectweb.joram.shared.client.AbstractJmsReply;import org.objectweb.joram.shared.client.AbstractJmsRequest;import org.objectweb.joram.shared.client.CnxCloseRequest;import org.objectweb.joram.shared.client.CnxConnectReply;import org.objectweb.joram.shared.client.CnxConnectRequest;import org.objectweb.joram.shared.client.CnxStartRequest;import org.objectweb.joram.shared.client.CnxStopRequest;import org.objectweb.joram.shared.client.ConsumerSubRequest;import org.objectweb.util.monolog.api.BasicLevel;import org.objectweb.util.monolog.api.Logger;import fr.dyade.aaa.util.Debug;/** * Implements the <code>javax.jms.Connection</code> interface. */public class Connection implements javax.jms.Connection { public static Logger logger = Debug.getLogger(Connection.class.getName()); /** * Status of the connection. */ private static class Status { /** * Status of the connection when it is stopped. * This is the initial status. */ public static final int STOP = 0; /** * Status of the connection when it is started. */ public static final int START = 1; /** * Status of the conenction when it is closed. */ public static final int CLOSE = 2; private static final String[] names = { "STOP", "START", "CLOSE"}; public static String toString(int status) { return names[status]; } } /** * The request multiplexer used to communicate * with the user proxy. */ private RequestMultiplexer mtpx; /** * The requestor used to communicate * with the user proxy. */ private Requestor requestor; /** Connection meta data. */ private ConnectionMetaData metaData = null; /** Sessions counter. */ private int sessionsC = 0; /** Messages counter. */ private int messagesC = 0; /** Subscriptions counter. */ private int subsC = 0; /** Client's agent proxy identifier. */ String proxyId; /** Connection key. */ private int key; /** The factory's parameters. */ private FactoryParameters factoryParameters; /** * Status of the connection. * STOP, START, CLOSE */ private int status; /** Vector of the connection's sessions. */ private Vector sessions; /** Vector of the connection's consumers. */ private Vector cconsumers; /** * Used to synchronize the method close() */ private Closer closer; /** * Creates a <code>Connection</code> instance. * * @param factoryParameters The factory parameters. * @param connectionImpl The actual connection to wrap. * * @exception JMSSecurityException If the user identification is incorrect. * @exception IllegalStateException If the server is not listening. */ public Connection(FactoryParameters factoryParameters, RequestChannel requestChannel) throws JMSException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "Connection.<init>(" + factoryParameters + ',' + requestChannel + ')'); this.factoryParameters = factoryParameters; mtpx = new RequestMultiplexer(this, requestChannel, factoryParameters.cnxPendingTimer); if (factoryParameters.multiThreadSync) { mtpx.setMultiThreadSync(factoryParameters.multiThreadSyncDelay); } requestor = new Requestor(mtpx); sessions = new Vector(); cconsumers = new Vector(); closer = new Closer(); setStatus(Status.STOP); // Requesting the connection key and proxy identifier: CnxConnectRequest req = new CnxConnectRequest(); CnxConnectReply rep = (CnxConnectReply) requestor.request(req); proxyId = rep.getProxyId(); key = rep.getCnxKey(); mtpx.setDemultiplexerDaemonName(toString()); } private String newTrace(String trace) { return "Connection[" + proxyId + ':' + key + ']' + trace; } private void setStatus(int status) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, newTrace(".setStatus(" + Status.toString(status) + ')')); this.status = status; } boolean isStopped() { return (status == Status.STOP); } /** String image of the connection. */ public String toString() { return "Cnx:" + proxyId + ':' + key; } final long getTxPendingTimer() { return factoryParameters.txPendingTimer; } final boolean getAsyncSend() { return factoryParameters.asyncSend; } final int getQueueMessageReadMax() { return factoryParameters.queueMessageReadMax; } final int getTopicAckBufferMax() { return factoryParameters.topicAckBufferMax; } final int getTopicActivationThreshold() { return factoryParameters.topicActivationThreshold; } final int getTopicPassivationThreshold() { return factoryParameters.topicPassivationThreshold; } /** * Specializes this Object method; returns <code>true</code> if the * parameter is a <code>Connection</code> instance sharing the same * proxy identifier and connection key. */ public boolean equals(Object obj) { return (obj instanceof Connection) && toString().equals(obj.toString()); } /** * Checks if the connecion is closed. If true * raises an IllegalStateException. */ final protected synchronized void checkClosed() throws IllegalStateException { if (status == Status.CLOSE || mtpx.isClosed()) throw new IllegalStateException( "Forbidden call on a closed connection."); } /** * API method. * * @exception IllegalStateException If the connection is closed. * @exception InvalidSelectorException If the selector syntax is wrong. * @exception InvalidDestinationException If the target destination does * not exist. * @exception JMSException If the method fails for any other reason. */ public synchronized javax.jms.ConnectionConsumer createConnectionConsumer( javax.jms.Destination dest, String selector, javax.jms.ServerSessionPool sessionPool, int maxMessages) throws JMSException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, newTrace(".createConnectionConsumer(" + dest + ',' + selector + ',' + sessionPool + ',' + maxMessages + ')')); checkClosed(); return createConnectionConsumer( dest, null, selector, sessionPool, maxMessages); } /** * API method. * * @exception IllegalStateException If the connection is closed. * @exception InvalidSelectorException If the selector syntax is wrong. * @exception InvalidDestinationException If the target topic does * not exist. * @exception JMSException If the method fails for any other reason. */ public javax.jms.ConnectionConsumer createDurableConnectionConsumer(javax.jms.Topic topic, String subName, String selector, javax.jms.ServerSessionPool sessPool, int maxMessages) throws JMSException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, newTrace(".createDurableConnectionConsumer(" + topic + ',' + subName + ',' + selector + ',' + sessPool + ',' + maxMessages + ')')); checkClosed(); if (subName == null) throw new JMSException("Invalid subscription name: " + subName); return createConnectionConsumer( (Destination) topic, subName, selector, sessPool, maxMessages); } private synchronized javax.jms.ConnectionConsumer createConnectionConsumer( javax.jms.Destination dest, String subName, String selector, javax.jms.ServerSessionPool sessionPool, int maxMessages) throws JMSException { checkClosed(); try { org.objectweb.joram.shared.selectors.Selector.checks(selector); } catch (org.objectweb.joram.shared.excepts.SelectorException sE) { throw new InvalidSelectorException("Invalid selector syntax: " + sE); } if (sessionPool == null) throw new JMSException("Invalid ServerSessionPool parameter: " + sessionPool); if (maxMessages <= 0) throw new JMSException("Invalid maxMessages parameter: " + maxMessages); boolean queueMode; String targetName; boolean durable; if (dest instanceof Queue) { queueMode = true; targetName = ((Destination) dest).getName(); durable = false; } else { queueMode = false; if (subName == null) { targetName = nextSubName(); durable = false; } else { targetName = subName; durable = true; } requestor.request(new ConsumerSubRequest(((Destination) dest).getName(), targetName, selector, false, durable)); } MultiSessionConsumer msc = new MultiSessionConsumer( queueMode, durable, selector, targetName, sessionPool, factoryParameters.queueMessageReadMax, factoryParameters.topicActivationThreshold, factoryParameters.topicPassivationThreshold, factoryParameters.topicAckBufferMax, mtpx, this, maxMessages); msc.start(); cconsumers.addElement(msc);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -