📄 managedconnectionimpl.java
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2004 - ScalAgent Distributed Technologies * Copyright (C) 2004 - Bull SA * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA. * * Initial developer(s): Frederic Maistre (Bull SA) * Contributor(s): Nicolas Tachker (Bull SA) * Florent Benoit (Bull SA) */package org.objectweb.joram.client.connector;import javax.jms.JMSException;import javax.jms.Session;import javax.jms.XAConnection;import javax.jms.XAQueueConnection;import javax.jms.XASession;import javax.jms.XATopicConnection;import javax.resource.ResourceException;import javax.resource.spi.CommException;import javax.resource.spi.ConnectionEvent;import javax.resource.spi.ConnectionEventListener;import javax.resource.spi.ConnectionRequestInfo;import javax.resource.spi.IllegalStateException;import javax.resource.spi.LocalTransactionException;import javax.resource.spi.ManagedConnectionMetaData;import javax.resource.spi.ResourceAdapterInternalException;import javax.resource.spi.SecurityException;import javax.transaction.xa.XAResource;import java.io.PrintWriter;import java.util.Vector;import org.objectweb.util.monolog.api.BasicLevel;/** * A <code>ManagedConnectionImpl</code> instance wraps a physical connection * to an underlying JORAM server, and provides "handles" for handling this * physical connection. */public class ManagedConnectionImpl implements javax.resource.spi.ManagedConnection, javax.resource.spi.LocalTransaction, javax.jms.ExceptionListener{ /** Central adapter authority. */ private JoramAdapter ra; /** Physical connection to the JORAM server. */ private XAConnection cnx = null; /** Vector of connection handles. */ private Vector handles; /** Vector of condition event listeners. */ private Vector listeners; /** <code>true</code> if a local transaction has been started. */ private boolean startedLocalTx = false; /** Connection meta data. */ private ManagedConnectionMetaDataImpl metaData = null; /** Out stream for error logging and tracing. */ private PrintWriter out = null; /** <code>true</code> if the connection is valid. */ private boolean valid = false; /** hashCode */ private int hashCode = -1; /** Underlying JORAM server host name. */ String hostName; /** Underlying JORAM server port number. */ int serverPort; /** Messaging mode (PTP or PubSub or Unified). */ String mode; /** User identification. */ String userName; /** * Unique session for the use of managed components, involved in local or * distributed transactions. */ Session session = null; /** * Creates a <code>ManagedConnectionImpl</code> instance wrapping a * physical connection to the underlying JORAM server. * * @param ra Central adapter authority. * @param cnx Physical connection to the JORAM server. * @param hostName JORAM server host name. * @param serverPort JORAM server port number. * @param userName User identification. */ ManagedConnectionImpl(JoramAdapter ra, XAConnection cnx, String hostName, int serverPort, String userName) { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, "ManagedConnectionImpl(" + ra + ", " + cnx + ", " + hostName + ", " + serverPort + ", " + userName + ")"); this.ra = ra; this.cnx = cnx; this.hostName = hostName; this.serverPort = serverPort; this.userName = userName; if (cnx instanceof XAQueueConnection) mode = "PTP"; else if (cnx instanceof XATopicConnection) mode = "PubSub"; else mode = "Unified"; try { cnx.setExceptionListener(this); } catch (JMSException exc) {} handles = new Vector(); listeners = new Vector(); valid = true; hashCode = -1; ra.addProducer(this); } /** * Returns a new <code>OutboundConnection</code> instance for handling the * physical connection. * * @exception CommException If the wrapped physical connection is lost. */ public Object getConnection(javax.security.auth.Subject subject, ConnectionRequestInfo cxRequestInfo) throws ResourceException { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " getConnection(" + subject + ", " + cxRequestInfo + ")"); if (! isValid()) { out.print("Physical connection to the underlying JORAM server has been lost."); throw new CommException("Physical connection to the underlying " + "JORAM server has been lost."); } OutboundConnection handle; if (cnx instanceof XAQueueConnection) handle = new OutboundQueueConnection(this, (XAQueueConnection) cnx); else if (cnx instanceof XATopicConnection) handle = new OutboundTopicConnection(this, (XATopicConnection) cnx); else handle = new OutboundConnection(this, cnx); handles.add(handle); if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " getConnection handles = " + handles); return handle; } /** * Dissociates a given connection handle and associates it to this * managed connection. * * @exception CommException If the wrapped physical connection is lost. * @exception ResourceException If the provided handle is invalid. */ public void associateConnection(Object connection) throws ResourceException { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " associateConnection(" + connection + ")"); if (! isValid()) { out.print("Physical connection to the underlying JORAM server has been lost."); throw new CommException("Physical connection to the underlying " + "JORAM server has been lost."); } if (! (connection instanceof OutboundConnection)) { out.print("The provided connection handle is not a JORAM handle."); throw new ResourceException("The provided connection handle is not " + "a JORAM handle."); } OutboundConnection newConn = (OutboundConnection) connection; newConn.managedCx = this; newConn.xac = (org.objectweb.joram.client.jms.XAConnection) cnx; } /** Adds a connection event listener. */ public void addConnectionEventListener(ConnectionEventListener listener) { listeners.add(listener); } /** Removes a connection event listener. */ public void removeConnectionEventListener(ConnectionEventListener listener) { listeners.remove(listener); } /** * Provides a <code>XAResource</code> instance for managing distributed * transactions. * * @exception CommException If the physical connection * is lost. * @exception IllegalStateException If the managed connection is * involved in a local * transaction. * @exception ResourceAdapterInternalException If the XA resource can't be * retrieved. */ public XAResource getXAResource() throws ResourceException { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " getXAResource()"); if (! isValid()) { out.print("Physical connection to the underlying JORAM server has been lost."); throw new CommException("Physical connection to the underlying " + "JORAM server has been lost."); } try { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " getXAResource session = " + session); if (session == null) { OutboundConnection outboundCnx = null; for (java.util.Enumeration e = handles.elements(); e.hasMoreElements(); ) { outboundCnx = (OutboundConnection) e.nextElement(); if (outboundCnx.cnxEquals(cnx)) break; } if (outboundCnx == null) outboundCnx = (OutboundConnection) getConnection(null,null); if (outboundCnx != null) { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " getXAResource outboundCnx = " + outboundCnx + "\n outboundCnx.sess = " + outboundCnx.sessions); OutboundSession outboundSession = null; if (outboundCnx.sessions.size() > 0) { outboundSession = (OutboundSession) outboundCnx.sessions.get(0); if (!(outboundSession.sess instanceof XASession)) { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " getXAResource outboundSession.sess = " + outboundSession.sess); // getXARessourceManager (create by XAConnection) org.objectweb.joram.client.jms.XAResourceMngr xaResourceMngr = null; if (cnx instanceof org.objectweb.joram.client.jms.XAConnection) { xaResourceMngr = ((org.objectweb.joram.client.jms.XAConnection) cnx).getXAResourceMngr(); } else if (cnx instanceof org.objectweb.joram.client.jms.XAQueueConnection) { xaResourceMngr = ((org.objectweb.joram.client.jms.XAQueueConnection) cnx).getXAResourceMngr(); } else if (cnx instanceof org.objectweb.joram.client.jms.XATopicConnection) { xaResourceMngr = ((org.objectweb.joram.client.jms.XATopicConnection) cnx).getXAResourceMngr(); } if (xaResourceMngr == null) xaResourceMngr = new org.objectweb.joram.client.jms.XAResourceMngr( (org.objectweb.joram.client.jms.Connection) outboundCnx.xac); if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " getXAResource xaResourceMngr = " + xaResourceMngr); org.objectweb.joram.client.jms.Session sess = (org.objectweb.joram.client.jms.Session) outboundSession.sess; // set Session transacted = true sess.setTransacted(true); session = (Session) new org.objectweb.joram.client.jms.XASession( (org.objectweb.joram.client.jms.Connection) outboundCnx.xac, sess, xaResourceMngr); if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " getXAResource session = " + session); } } else { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " getXAResource createXASession"); session = cnx.createXASession(); } } } else if (session instanceof org.objectweb.joram.client.jms.XASession) { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " getXAResource session is XASession and not null"); // set Session transacted = true ((org.objectweb.joram.client.jms.XASession)session).getDelegateSession().setTransacted(true); // TODO // cnx.sessions.add((org.objectweb.joram.client.jms.Session) session); } else if (! (session instanceof javax.jms.XASession)) { out.print("Managed connection not involved in a local transaction."); throw new IllegalStateException("Managed connection not involved " + "in a local transaction."); } if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " getXAResource return = " + ((XASession) session).getXAResource()); return ((XASession) session).getXAResource(); } catch (JMSException exc) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -