📄 connection.java
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2001 - ScalAgent Distributed Technologies * Copyright (C) 1996 - Dyade * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA. * * Initial developer(s): Frederic Maistre (INRIA) * Contributor(s): Nicolas Tachker (ScalAgent) */package com.scalagent.kjoram;import com.scalagent.kjoram.excepts.IllegalStateException;import com.scalagent.kjoram.excepts.*;import com.scalagent.kjoram.jms.*;import com.scalagent.kjoram.*;import com.scalagent.kjoram.util.StoppedQueueException;import java.util.*;public class Connection{ /** Actual connection linking the client and the JORAM platform. */ private ConnectionItf connectionImpl; /** Client's agent proxy identifier. */ private String proxyId; /** Connection key. */ private int key; /** Connection meta data. */ private ConnectionMetaData metaData = null; /** The connection's exception listener, if any. */ private ExceptionListener excListener = null; /** Requests counter. */ private int requestsC = 0; /** Sessions counter. */ private int sessionsC = 0; /** Messages counter. */ private int messagesC = 0; /** Subscriptions counter. */ private int subsC = 0; /** Timer for closing pending sessions. */ private com.scalagent.kjoram.util.Timer sessionsTimer = null; /** The factory's parameters. */ FactoryParameters factoryParameters; /** Driver listening to asynchronous deliveries. */ Driver driver; /** <code>true</code> if the connection is started. */ boolean started = false; /** <code>true</code> if the connection is closing. */ boolean closing = false; /** <code>true</code> if the connection is closed. */ boolean closed = false; /** Vector of the connection's sessions. */ Vector sessions; /** Vector of the connection's consumers. */ Vector cconsumers; /** * Table holding requests related objects, either locks of synchronous * requests, or asynchronous consumers. */ Hashtable requestsTable; /** * Table holding the server replies to synchronous requests. */ Hashtable repliesTable; String name = null; /** * Creates a <code>Connection</code> instance. * * @param factoryParameters The factory parameters. * @param connectionImpl The actual connection to wrap. * * @exception JMSSecurityException If the user identification is incorrect. * @exception IllegalStateException If the server is not listening. */ public Connection(FactoryParameters factoryParameters, ConnectionItf connectionImpl) throws JMSException { try { this.factoryParameters = factoryParameters; sessions = new Vector(); requestsTable = new Hashtable(); repliesTable = new Hashtable(); this.connectionImpl = connectionImpl; name = connectionImpl.getUserName(); // Creating and starting the connection's driver: driver = connectionImpl.createDriver(this); driver.start(); // Requesting the connection key and proxy identifier: CnxConnectRequest req = new CnxConnectRequest(); CnxConnectReply rep = (CnxConnectReply) syncRequest(req); proxyId = rep.getProxyId(); key = rep.getCnxKey(); // Transactions will be scheduled; creating a timer. if (factoryParameters.txPendingTimer != 0) sessionsTimer = new com.scalagent.kjoram.util.Timer(); if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": opened."); } // Connection could not be established: catch (JMSException jE) { JoramTracing.log(JoramTracing.ERROR, jE); throw jE; } } public String getUserName() { return name; } /** String image of the connection. */ public String toString() { return "Cnx:" + proxyId + "-" + key; } /** * Specializes this Object method; returns <code>true</code> if the * parameter is a <code>Connection</code> instance sharing the same * proxy identifier and connection key. */ public boolean equals(Object obj) { return (obj instanceof Connection) && toString().equals(obj.toString()); } /** * API method. * * @exception IllegalStateException If the connection is closed. * @exception InvalidSelectorException If the selector syntax is wrong. * @exception InvalidDestinationException If the target destination does * not exist. * @exception JMSException If the method fails for any other reason. */ public ConnectionConsumer createConnectionConsumer(Destination dest, String selector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed" + " connection."); return new ConnectionConsumer(this, (Destination) dest, selector, sessionPool, maxMessages); } /** * API method. * * @exception IllegalStateException If the connection is closed. * @exception InvalidSelectorException If the selector syntax is wrong. * @exception InvalidDestinationException If the target topic does * not exist. * @exception JMSException If the method fails for any other reason. */ public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subName, String selector, ServerSessionPool sessPool, int maxMessages) throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed" + " connection."); return new ConnectionConsumer(this, (Topic) topic, subName, selector, sessPool, maxMessages); } /** * API method. * * @exception IllegalStateException If the connection is closed. * @exception JMSException In case of an invalid acknowledge mode. */ public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed" + " connection."); return new Session(this, transacted, acknowledgeMode); } /** * API method. * * @exception IllegalStateException If the connection is closed. */ public void setExceptionListener(ExceptionListener listener) throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed" + " connection."); this.excListener = listener; } /** * API method. * * @exception IllegalStateException If the connection is closed. */ public ExceptionListener getExceptionListener() throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed" + " connection."); return excListener; } /** * Passes an asynchronous exception to the exception listener, if any. * * @param jE The asynchronous JMSException. */ synchronized void onException(JMSException jE) { if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.WARN, this + ": " + jE); if (excListener != null) excListener.onException(jE); } /** * API method. * * @exception IllegalStateException Systematically thrown. */ public void setClientID(String clientID) throws JMSException { throw new IllegalStateException("ClientID is already set by the" + " provider."); } /** * API method. * * @exception IllegalStateException If the connection is closed. */ public String getClientID() throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed" + " connection."); return proxyId; } /** * API method. * * @exception IllegalStateException If the connection is closed. */ public ConnectionMetaData getMetaData() throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed" + " connection."); if (metaData == null) metaData = new ConnectionMetaData(); return metaData; } /** * API method for starting the connection. * * @exception IllegalStateException If the connection is closed or broken. */ public void start() throws JMSException { // If closed, throwing an exception: if (closed) throw new IllegalStateException("Forbidden call on a closed" + " connection."); // Ignoring the call if the connection is started: if (started) return; if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, "--- " + this + ": starting..."); // Starting the sessions: Session session; for (int i = 0; i < sessions.size(); i++) { session = (Session) sessions.elementAt(i); session.repliesIn.start(); session.start(); } // Sending a start request to the server: asyncRequest(new CnxStartRequest()); started = true; if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": started."); } /** * API method for stopping the connection; even if the connection appears * to be broken, stops the sessions. * * @exception IllegalStateException If the connection is closed or broken. */ public void stop() throws JMSException { IllegalStateException isE = null; // If closed, throwing an exception: if (closed) throw new IllegalStateException("Forbidden call on a closed" + " connection."); // Ignoring the call if the connection is already stopped: if (! started) return; if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": stopping..."); // Sending a synchronous "stop" request to the server: try { syncRequest(new CnxStopRequest());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -