📄 xaresourcemngr.java
字号:
* @exception XAException If the specified transaction is in an * incompatible state with the commit request, * or if the request fails. */ synchronized void commit(Xid xid) throws XAException { if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log(BasicLevel.DEBUG, "--- " + this + ": commit(" + xid + ")"); try { if (getStatus(xid) != PREPARED) throw new XAException("Can't commit non prepared transaction."); XAContext xaC = (XAContext) transactions.get(xid); if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log(BasicLevel.DEBUG, "--- " + this + ": commits transaction " + xid.toString()); cnx.syncRequest(new XACnxCommit(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId())); transactions.remove(xid); Session session = (Session) sessionTable.get(xid); if (session != null) session.setTransacted(false); } catch (JMSException exc) { setStatus(xid, ROLLBACK_ONLY); throw new XAException("Commit request failed: " + exc); } catch (XAException exc) { setStatus(xid, ROLLBACK_ONLY); throw exc; } } /** * Notifies the RM that a transaction is rolled back. * * @exception XAException If the specified transaction is in an * incompatible state with the rollback request, * or if the request fails. */ synchronized void rollback(Xid xid) throws XAException { if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log(BasicLevel.DEBUG, "--- " + this + ": rollback(" + xid + ")"); try { XAContext xaC = (XAContext) transactions.get(xid); if (xaC == null) throw new XAException("Unknown transaction."); if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log(BasicLevel.DEBUG, "--- " + this + ": rolls back transaction " + xid.toString()); Enumeration targets; String target; MessageAcks acks; XACnxRollback rollbackRequest; targets = xaC.deliveries.keys(); rollbackRequest = new XACnxRollback(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId()); while (targets.hasMoreElements()) { target = (String) targets.nextElement(); acks = (MessageAcks) xaC.deliveries.remove(target); rollbackRequest.add(target, acks.getIds(), acks.getQueueMode()); } // Sending to the proxy: cnx.syncRequest(rollbackRequest); transactions.remove(xid); Session session = (Session) sessionTable.get(xid); if (session != null) { session.setTransacted(false); sessionTable.remove(xid); } } catch (JMSException exc) { setStatus(xid, ROLLBACK_ONLY); throw new XAException("Rollback request failed: " + exc); } catch (XAException exc) { setStatus(xid, ROLLBACK_ONLY); throw exc; } } /** * Notifies the RM to recover the prepared transactions. * * @exception XAException If the specified flag is invalid, or if the * request fails. */ synchronized Xid[] recover(int flag) throws XAException { if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log(BasicLevel.DEBUG, "--- " + this + ": recovers transactions."); if (flag == XAResource.TMSTARTRSCAN || flag == XAResource.TMENDRSCAN) throw new XAException("Non supported recovery flag: " + flag); try { XACnxRecoverReply reply = (XACnxRecoverReply) cnx.syncRequest(new XACnxRecoverRequest()); Xid[] xids = new Xid[reply.getSize()]; for (int i = 0; i < reply.getSize(); i++) { xids[i] = new XidImpl(reply.getBranchQualifier(i), reply.getFormatId(i), reply.getGlobalTransactionId(i)); } return xids; } catch (Exception exc) { throw new XAException("Recovery request failed: " + exc.getMessage()); } } /** * Sets the status of a transaction. * * @exception XAException If the transaction is unknown. */ private void setStatus(Xid xid, int status) throws XAException { XAContext xac = (XAContext) transactions.get(xid); if (xac == null) throw new XAException("Unknown transaction."); xac.status = status; } /** * Gets the status of a transaction. * * @exception XAException If the transaction is unknown. */ private int getStatus(Xid xid) throws XAException { XAContext xac = (XAContext) transactions.get(xid); if (xac == null) throw new XAException("Unknown transaction."); return xac.status; } /** Resource managers are equal if they belong to the same connection. */ public boolean equals(Object o) { if (! (o instanceof XAResourceMngr)) return false; XAResourceMngr other = (XAResourceMngr) o; if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log(BasicLevel.DEBUG, this + ": equals other = " + other.cnx + ", this.cnx = " + cnx + ", equals = " + cnx.equals(other.cnx)); return cnx.equals(other.cnx); }}/** * Utility class holding a resource's state during transaction progress. */class XAContext{ /** The transaction status. */ int status; /** * Table holding the <code>ProducerMessages</code> produced in the * transaction. * <p> * <b>Key:</b> destination name<br> * <b>Object:</b> <code>ProducerMessages</code> */ Hashtable sendings; /** * Table holding the identifiers of the messages delivered per * destination or subscription, in the transaction. * <p> * <b>Key:</b> destination or subscription name<br> * <b>Object:</b> corresponding <code>MessageAcks</code> instance */ Hashtable deliveries; /** * Constructs an <code>XAContext</code> instance. */ XAContext() { sendings = new Hashtable(); deliveries = new Hashtable(); } /** * Adds new sendings performed by the resumed transaction. */ void addSendings(Hashtable newSendings) { String newDest; ProducerMessages newPM; ProducerMessages storedPM; Vector msgs; // Browsing the destinations for which messages have been produced: Enumeration newDests = newSendings.keys(); while (newDests.hasMoreElements()) { newDest = (String) newDests.nextElement(); newPM = (ProducerMessages) newSendings.remove(newDest); storedPM = (ProducerMessages) sendings.get(newDest); // If messages haven't already been produced for this destination, // storing the new ProducerMessages object: if (storedPM == null) sendings.put(newDest, newPM); // Else, adding the newly produced messages to the existing // ProducerMessages: else { msgs = newPM.getMessages(); for (int i = 0; i < msgs.size(); i++) storedPM.addMessage((org.objectweb.joram.shared.messages.Message) msgs.get(i)); } } } /** * Adds new deliveries occured within the resumed transaction. */ void addDeliveries(Hashtable newDeliveries) { String newName; MessageAcks newAcks; MessageAcks storedAcks; // Browsing the destinations or subscriptions to which messages will have // to be acknowledged: Enumeration newNames = newDeliveries.keys(); while (newNames.hasMoreElements()) { newName = (String) newNames.nextElement(); newAcks = (MessageAcks) newDeliveries.remove(newName); storedAcks = (MessageAcks) deliveries.get(newName); // If there are no messages to acknowledge for this destination or // subscription, storing the new vector: if (storedAcks == null) deliveries.put(newName, newAcks); // Else, adding the new ids to the stored ones: else storedAcks.addIds(newAcks.getIds()); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -