⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xaresourcemngr.java

📁 一个类似于 OpenJMS 的 JMS 消息中间件，发布于 ObjectWeb 旗下。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2004 - ScalAgent Distributed Technologies * Copyright (C) 2004 - Bull SA * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. *  * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * Lesser General Public License for more details. *  * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 * USA. * * Initial developer(s): Frederic Maistre (Bull SA) * Contributor(s): ScalAgent Distributed Technologies */package org.objectweb.joram.client.jms;import javax.jms.JMSException;import javax.transaction.xa.XAException;import javax.transaction.xa.XAResource;import javax.transaction.xa.Xid;import java.util.Enumeration;import java.util.Hashtable;import java.util.Vector;import org.objectweb.joram.shared.client.ProducerMessages;import org.objectweb.joram.shared.client.SessAckRequest;import org.objectweb.joram.shared.client.XACnxPrepare;import org.objectweb.joram.shared.client.XACnxCommit;import org.objectweb.joram.shared.client.XACnxRecoverReply;import org.objectweb.joram.shared.client.XACnxRecoverRequest;import org.objectweb.joram.shared.client.XACnxRollback;import org.objectweb.util.monolog.api.BasicLevel;/** * Utility class used by XA connections for managing XA resources. */public class XAResourceMngr {  /** Transaction active. */  public static final int STARTED = 0;  /** Transaction suspended. */  public static final int SUSPENDED = 1;  /** Transaction successful. 
*/  public static final int SUCCESS = 2;  /** Failed transaction. */  public static final int ROLLBACK_ONLY = 3;  /** Prepared transaction. */  public static final int PREPARED = 4;  /**   * The table of known transactions.   * <p>   * <b>Key:</b> transaction identifier<br>   * <b>Object:</b> <code>XAContext</code> instance   */  private Hashtable transactions;  /** The connection this manager belongs to. */  Connection cnx;  /** table of Session (key Xid). */  Hashtable sessionTable;  /**   * Creates a <code>XAResourceMngr</code> instance.   *   * @param cnx   The connection this manager belongs to.   */  public XAResourceMngr(Connection cnx) {    this.cnx = cnx;    transactions = new Hashtable();    sessionTable = new Hashtable();    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))      JoramTracing.dbgClient.log(BasicLevel.DEBUG,                                 " XAResourceMngr cnx = " + cnx);  }  /**   * Notifies the RM that a transaction is starting.   *   * @exception XAException  If the specified transaction is already known by   *                         the RM in an incompatible state with the start   *                         request.   */  synchronized void start(Xid xid, int flag, Session sess)     throws XAException {    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))      JoramTracing.dbgClient.log(BasicLevel.DEBUG,                                 " XAResourceMngr start(" + xid +                                 ", " + flag +                                 ", " + sess +")");    sess.setTransacted(true); // for XAResource.TMRESUME    sessionTable.put(xid,sess);    // New transaction.    
if (flag == XAResource.TMNOFLAGS) {      if (transactions.containsKey(xid))        throw new XAException("Can't start transaction already known by RM.");      transactions.put(xid, new XAContext());      if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))        JoramTracing.dbgClient.log(BasicLevel.DEBUG,                                   "--- "                                   + this                                   + ": involved in transaction "                                   + xid.toString());     }    // Resumed transaction.    else if (flag == XAResource.TMRESUME) {      if (! transactions.containsKey(xid))        throw new XAException("Can't resume unknown transaction.");      if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))        JoramTracing.dbgClient.log(BasicLevel.DEBUG,                                   "--- "                                   + this                                   + ": resumes transaction "                                   + xid.toString());     }    // Already known transaction.    else if (flag == XAResource.TMJOIN) {      if (! transactions.containsKey(xid))        throw new XAException("Can't join unknown transaction.");      if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))        JoramTracing.dbgClient.log(BasicLevel.DEBUG,                                   "--- "                                   + this                                   + ": joins transaction "                                   + xid.toString());     }    else      throw new XAException("Invalid flag: " + flag);    setStatus(xid, STARTED);  }   /**   * Notifies the RM that a transaction is ended.   *   * @exception XAException  If the specified transaction is in an   *                         incompatible state with the end request.   
*/  synchronized void end(Xid xid, int flag, Session sess)     throws XAException {    boolean saveResourceState = true;    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))      JoramTracing.dbgClient.log(BasicLevel.DEBUG,                                 "--- "                                 + this                                 + ": end(" + xid                                 + ", " + flag                                 + ", " + sess + ")");         if (flag == XAResource.TMSUSPEND) {      if (getStatus(xid) != STARTED)        throw new XAException("Can't suspend non started transaction.");      setStatus(xid, SUSPENDED);    }    else {      if (getStatus(xid) != STARTED && getStatus(xid) != SUSPENDED)        throw new XAException("Can't end non active or non "                              + "suspended transaction.");      // No need to save the resource's state as it has already been done      // when suspending it.      if (getStatus(xid) == SUSPENDED)        saveResourceState = false;      if (flag == XAResource.TMSUCCESS)        setStatus(xid, SUCCESS);      else if (flag == XAResource.TMFAIL)        setStatus(xid, ROLLBACK_ONLY);      else        throw new XAException("Invalid flag: " + flag);    }    if (saveResourceState) {      XAContext xaC = (XAContext) transactions.get(xid);      xaC.addSendings(sess.sendings);      xaC.addDeliveries(sess.deliveries);    }    Session session = (Session) sessionTable.get(xid);    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))      JoramTracing.dbgClient.log(BasicLevel.DEBUG,                                 "--- "                                 + this                                 + ": end(...) session="                                  + session);    if (session != null) {      session.setTransacted(false);      sessionTable.remove(xid);    }  }  /**    * Notifies the RM that a transaction is prepared.   
*   * @exception XAException  If the specified transaction is in an   *                         incompatible state with the prepare request,   *                         or if the request fails.   */  synchronized void prepare(Xid xid)     throws XAException {    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))      JoramTracing.dbgClient.log(BasicLevel.DEBUG,                                 "--- "                                 + this                                 + ": prepare(" + xid + ")");         try {      if (getStatus(xid) == ROLLBACK_ONLY)        throw new XAException("Can't prepare resource in ROLLBACK_ONLY state.");      XAContext xaC = (XAContext) transactions.get(xid);      if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))        JoramTracing.dbgClient.log(BasicLevel.DEBUG,                                   "--- "                                   + this                                   + ": prepares transaction "                                   + xid.toString());       Enumeration targets;          String target;      Vector pMs = new Vector();      MessageAcks acks;      Vector sessAcks = new Vector();      // Getting all the ProducerMessages to send:      targets = xaC.sendings.keys();      while (targets.hasMoreElements()) {        target = (String) targets.nextElement();        pMs.add(xaC.sendings.remove(target));      }      // Getting all the SessAckRequest to send:      targets = xaC.deliveries.keys();      while (targets.hasMoreElements()) {        target = (String) targets.nextElement();        acks = (MessageAcks) xaC.deliveries.remove(target);        sessAcks.add(new SessAckRequest(target, acks.getIds(),                                        acks.getQueueMode()));      }      // Sending to the proxy:      cnx.syncRequest(new XACnxPrepare(xid.getBranchQualifier(),                                        xid.getFormatId(),                                        xid.getGlobalTransactionId(),                                 
       pMs,                                        sessAcks));      setStatus(xid, PREPARED);    } catch (JMSException exc) {      setStatus(xid, ROLLBACK_ONLY);      throw new XAException("Prepare request failed: " + exc);    } catch (XAException exc) {      setStatus(xid, ROLLBACK_ONLY);      throw exc;    }  }  /**    * Notifies the RM that a transaction is commited.   *

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -