📄 xaresourcemngr.java
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2004 - ScalAgent Distributed Technologies * Copyright (C) 2004 - Bull SA * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA. * * Initial developer(s): Frederic Maistre (Bull SA) * Contributor(s): ScalAgent Distributed Technologies */package org.objectweb.joram.client.jms;import javax.jms.JMSException;import javax.transaction.xa.XAException;import javax.transaction.xa.XAResource;import javax.transaction.xa.Xid;import java.util.Enumeration;import java.util.Hashtable;import java.util.Vector;import org.objectweb.joram.shared.client.ProducerMessages;import org.objectweb.joram.shared.client.SessAckRequest;import org.objectweb.joram.shared.client.XACnxPrepare;import org.objectweb.joram.shared.client.XACnxCommit;import org.objectweb.joram.shared.client.XACnxRecoverReply;import org.objectweb.joram.shared.client.XACnxRecoverRequest;import org.objectweb.joram.shared.client.XACnxRollback;import org.objectweb.util.monolog.api.BasicLevel;/** * Utility class used by XA connections for managing XA resources. */public class XAResourceMngr { /** Transaction active. */ public static final int STARTED = 0; /** Transaction suspended. */ public static final int SUSPENDED = 1; /** Transaction successful. */ public static final int SUCCESS = 2; /** Failed transaction. */ public static final int ROLLBACK_ONLY = 3; /** Prepared transaction. */ public static final int PREPARED = 4; /** * The table of known transactions. * <p> * <b>Key:</b> transaction identifier<br> * <b>Object:</b> <code>XAContext</code> instance */ private Hashtable transactions; /** The connection this manager belongs to. */ Connection cnx; /** table of Session (key Xid). */ Hashtable sessionTable; /** * Creates a <code>XAResourceMngr</code> instance. * * @param cnx The connection this manager belongs to. */ public XAResourceMngr(Connection cnx) { this.cnx = cnx; transactions = new Hashtable(); sessionTable = new Hashtable(); if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log(BasicLevel.DEBUG, " XAResourceMngr cnx = " + cnx); } /** * Notifies the RM that a transaction is starting. * * @exception XAException If the specified transaction is already known by * the RM in an incompatible state with the start * request. */ synchronized void start(Xid xid, int flag, Session sess) throws XAException { if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log(BasicLevel.DEBUG, " XAResourceMngr start(" + xid + ", " + flag + ", " + sess +")"); sess.setTransacted(true); // for XAResource.TMRESUME sessionTable.put(xid,sess); // New transaction. if (flag == XAResource.TMNOFLAGS) { if (transactions.containsKey(xid)) throw new XAException("Can't start transaction already known by RM."); transactions.put(xid, new XAContext()); if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log(BasicLevel.DEBUG, "--- " + this + ": involved in transaction " + xid.toString()); } // Resumed transaction. else if (flag == XAResource.TMRESUME) { if (! transactions.containsKey(xid)) throw new XAException("Can't resume unknown transaction."); if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log(BasicLevel.DEBUG, "--- " + this + ": resumes transaction " + xid.toString()); } // Already known transaction. else if (flag == XAResource.TMJOIN) { if (! transactions.containsKey(xid)) throw new XAException("Can't join unknown transaction."); if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log(BasicLevel.DEBUG, "--- " + this + ": joins transaction " + xid.toString()); } else throw new XAException("Invalid flag: " + flag); setStatus(xid, STARTED); } /** * Notifies the RM that a transaction is ended. * * @exception XAException If the specified transaction is in an * incompatible state with the end request. */ synchronized void end(Xid xid, int flag, Session sess) throws XAException { boolean saveResourceState = true; if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log(BasicLevel.DEBUG, "--- " + this + ": end(" + xid + ", " + flag + ", " + sess + ")"); if (flag == XAResource.TMSUSPEND) { if (getStatus(xid) != STARTED) throw new XAException("Can't suspend non started transaction."); setStatus(xid, SUSPENDED); } else { if (getStatus(xid) != STARTED && getStatus(xid) != SUSPENDED) throw new XAException("Can't end non active or non " + "suspended transaction."); // No need to save the resource's state as it has already been done // when suspending it. if (getStatus(xid) == SUSPENDED) saveResourceState = false; if (flag == XAResource.TMSUCCESS) setStatus(xid, SUCCESS); else if (flag == XAResource.TMFAIL) setStatus(xid, ROLLBACK_ONLY); else throw new XAException("Invalid flag: " + flag); } if (saveResourceState) { XAContext xaC = (XAContext) transactions.get(xid); xaC.addSendings(sess.sendings); xaC.addDeliveries(sess.deliveries); } Session session = (Session) sessionTable.get(xid); if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log(BasicLevel.DEBUG, "--- " + this + ": end(...) session=" + session); if (session != null) { session.setTransacted(false); sessionTable.remove(xid); } } /** * Notifies the RM that a transaction is prepared. * * @exception XAException If the specified transaction is in an * incompatible state with the prepare request, * or if the request fails. */ synchronized void prepare(Xid xid) throws XAException { if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log(BasicLevel.DEBUG, "--- " + this + ": prepare(" + xid + ")"); try { if (getStatus(xid) == ROLLBACK_ONLY) throw new XAException("Can't prepare resource in ROLLBACK_ONLY state."); XAContext xaC = (XAContext) transactions.get(xid); if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log(BasicLevel.DEBUG, "--- " + this + ": prepares transaction " + xid.toString()); Enumeration targets; String target; Vector pMs = new Vector(); MessageAcks acks; Vector sessAcks = new Vector(); // Getting all the ProducerMessages to send: targets = xaC.sendings.keys(); while (targets.hasMoreElements()) { target = (String) targets.nextElement(); pMs.add(xaC.sendings.remove(target)); } // Getting all the SessAckRequest to send: targets = xaC.deliveries.keys(); while (targets.hasMoreElements()) { target = (String) targets.nextElement(); acks = (MessageAcks) xaC.deliveries.remove(target); sessAcks.add(new SessAckRequest(target, acks.getIds(), acks.getQueueMode())); } // Sending to the proxy: cnx.syncRequest(new XACnxPrepare(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId(), pMs, sessAcks)); setStatus(xid, PREPARED); } catch (JMSException exc) { setStatus(xid, ROLLBACK_ONLY); throw new XAException("Prepare request failed: " + exc); } catch (XAException exc) { setStatus(xid, ROLLBACK_ONLY); throw exc; } } /** * Notifies the RM that a transaction is commited. *
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -