📄 connection.java
字号:
return msc; } /** * API method. * * @exception IllegalStateException If the connection is closed. * @exception JMSException In case of an invalid acknowledge mode. */ public synchronized javax.jms.Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, newTrace(".createSession(" + transacted + ',' + acknowledgeMode + ')')); checkClosed(); Session session = new Session( this, transacted, acknowledgeMode, mtpx); addSession(session); return session; } /** * Called here and by sub-classes. */ protected synchronized void addSession(Session session) { sessions.addElement(session); if (status == Status.START) { session.start(); } } /** * API method. * * @exception IllegalStateException If the connection is closed. */ public synchronized void setExceptionListener( javax.jms.ExceptionListener listener) throws JMSException { checkClosed(); mtpx.setExceptionListener(listener); } /** * API method. * * @exception IllegalStateException If the connection is closed. */ public javax.jms.ExceptionListener getExceptionListener() throws JMSException { checkClosed(); return mtpx.getExceptionListener(); } /** * API method. * * @exception IllegalStateException Systematically thrown. */ public void setClientID(String clientID) throws JMSException { throw new IllegalStateException("ClientID is already set by the" + " provider."); } /** * API method. * * @exception IllegalStateException If the connection is closed. */ public String getClientID() throws JMSException { checkClosed(); return proxyId; } /** * API method. * * @exception IllegalStateException If the connection is closed. */ public javax.jms.ConnectionMetaData getMetaData() throws JMSException { checkClosed(); if (metaData == null) metaData = new ConnectionMetaData(); return metaData; } /** * API method for starting the connection. * * @exception IllegalStateException If the connection is closed or broken. 
*/ public synchronized void start() throws JMSException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, newTrace(".start()")); checkClosed(); // Ignoring the call if the connection is started: if (status == Status.START) return; if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "--- " + this + ": starting..."); // Starting the sessions: for (int i = 0; i < sessions.size(); i++) { Session session = (Session) sessions.elementAt(i); session.start(); } // Sending a start request to the server: mtpx.sendRequest(new CnxStartRequest()); setStatus(Status.START); } /** * API method for stopping the connection; even if the connection appears * to be broken, stops the sessions. * * @exception IllegalStateException If the connection is closed or broken. */ public void stop() throws JMSException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, newTrace(".stop()")); checkClosed(); synchronized (this) { if (status == Status.STOP) return; } // At this point, the server won't deliver messages anymore, // the connection just waits for the sessions to have finished their // processings. // Must go out of the synchronized block in order to enable // the message listeners to use the connection. // As a csq, the connection stop is reentrant. Several // threads can enter this method during the stopping stage. for (int i = 0; i < sessions.size(); i++) { Session session = (Session) sessions.get(i); session.stop(); } synchronized (this) { if (status == Status.STOP) return; // Sending a synchronous "stop" request to the server: requestor.request(new CnxStopRequest()); // Set the status as STOP as the following operations // (Session.stop) can't fail. setStatus(Status.STOP); } } /** * API method for closing the connection; even if the connection appears * to be broken, closes the sessions. * * @exception JMSException Actually never thrown. 
*/
  public void close() throws JMSException {
    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(BasicLevel.DEBUG, newTrace(".close()"));
    }
    closer.close();
  }

  /**
   * Serializes the closure of the connection.
   * The connection's own monitor cannot be used for that purpose because
   * the connection has to stay accessible to other threads while it is
   * being closed, hence this second lock.
   */
  class Closer {
    synchronized void close() {
      doClose();
    }
  }

  /**
   * Performs the actual closure: closes the sessions, then the connection
   * consumers, sends the close request, closes the multiplexer and finally
   * sets the CLOSE status. Returns immediately when the status is already
   * CLOSE. A JMSException raised by a session, a consumer or the close
   * request is only logged at DEBUG level.
   */
  void doClose() {
    synchronized (this) {
      if (status == Status.CLOSE) {
        return;
      }
    }

    // Sessions first, working on a copy of the list which is emptied
    // right away.
    Vector pendingSessions = (Vector) sessions.clone();
    sessions.clear();
    int sessionCount = pendingSessions.size();
    for (int idx = 0; idx < sessionCount; idx++) {
      Session toClose = (Session) pendingSessions.elementAt(idx);
      try {
        toClose.close();
      } catch (JMSException closeFailure) {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
          logger.log(BasicLevel.DEBUG, "", closeFailure);
        }
      }
    }

    // Then the connection consumers, same way.
    Vector pendingConsumers = (Vector) cconsumers.clone();
    cconsumers.clear();
    int consumerCount = pendingConsumers.size();
    for (int idx = 0; idx < consumerCount; idx++) {
      MultiSessionConsumer toClose =
        (MultiSessionConsumer) pendingConsumers.elementAt(idx);
      try {
        toClose.close();
      } catch (JMSException closeFailure) {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
          logger.log(BasicLevel.DEBUG, "", closeFailure);
        }
      }
    }

    // Close request; a failure here does not prevent the closure.
    try {
      CnxCloseRequest closeRequest = new CnxCloseRequest();
      requestor.request(closeRequest);
    } catch (JMSException requestFailure) {
      if (logger.isLoggable(BasicLevel.DEBUG)) {
        logger.log(BasicLevel.DEBUG, "", requestFailure);
      }
    }

    mtpx.close();

    synchronized (this) {
      setStatus(Status.CLOSE);
    }
  }

  /**
   * Used by OutboundConnection in the connector layer.
   * When a connection is put back in a pool,
   * it must be cleaned up.
*/ public void cleanup() { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, newTrace(".cleanup()")); // Closing the sessions: // Session session; Vector sessionsToClose = (Vector)sessions.clone(); sessions.clear(); for (int i = 0; i < sessionsToClose.size(); i++) { Session session = (Session) sessionsToClose.elementAt(i); try { session.close(); } catch (JMSException exc) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "", exc); } } mtpx.cleanup(); } /** Returns a new session identifier. */ synchronized String nextSessionId() { if (sessionsC == Integer.MAX_VALUE) sessionsC = 0; sessionsC++; return "c" + key + "s" + sessionsC; } /** Returns a new message identifier. */ synchronized String nextMessageId() { if (messagesC == Integer.MAX_VALUE) messagesC = 0; messagesC++; return "ID:" + proxyId.substring(1) + "c" + key + "m" + messagesC; } /** Returns a new subscription name. */ synchronized String nextSubName() { if (subsC == Integer.MAX_VALUE) subsC = 0; subsC++; return "c" + key + "sub" + subsC; } /** * Called by Session. */ synchronized void closeSession(Session session) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, newTrace(".closeSession(" + session + ')')); sessions.removeElement(session); } /** * Called by MultiSessionConsumer. * Synchronized with run(). */ synchronized void closeConnectionConsumer( MultiSessionConsumer cc) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, newTrace(".closeConnectionConsumer(" + cc + ')')); cconsumers.removeElement(cc); } synchronized AbstractJmsReply syncRequest( AbstractJmsRequest request) throws JMSException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, newTrace(".syncRequest(" + request + ')')); return requestor.request(request); } /** * Called by temporary destinations deletion. 
*/
  synchronized void checkConsumers(String agentId) throws JMSException {
    // The size is re-read on every pass, as the list is a live one.
    int index = 0;
    while (index < sessions.size()) {
      Session current = (Session) sessions.elementAt(index);
      current.checkConsumers(agentId);
      index++;
    }
  }

  /**
   * Gives sub-classes access to the request multiplexer of this connection.
   */
  protected final RequestMultiplexer getRequestMultiplexer() {
    return mtpx;
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -