⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xaresourcemngr.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
   * @exception XAException  If the specified transaction is in an   *                         incompatible state with the commit request,   *                         or if the request fails.   */  synchronized void commit(Xid xid)     throws XAException {    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))      JoramTracing.dbgClient.log(BasicLevel.DEBUG,                                 "--- "                                 + this                                 + ": commit(" + xid + ")");     try {      if (getStatus(xid) != PREPARED)        throw new XAException("Can't commit non prepared transaction.");      XAContext xaC = (XAContext) transactions.get(xid);      if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))        JoramTracing.dbgClient.log(BasicLevel.DEBUG,                                   "--- "                                   + this                                   + ": commits transaction "                                   + xid.toString());       cnx.syncRequest(new XACnxCommit(xid.getBranchQualifier(),                                       xid.getFormatId(),                                       xid.getGlobalTransactionId()));       transactions.remove(xid);      Session session = (Session) sessionTable.get(xid);      if (session != null)        session.setTransacted(false);    } catch (JMSException exc) {      setStatus(xid, ROLLBACK_ONLY);      throw new XAException("Commit request failed: " + exc);    } catch (XAException exc) {      setStatus(xid, ROLLBACK_ONLY);      throw exc;    }  }  /**    * Notifies the RM that a transaction is rolled back.   *   * @exception XAException  If the specified transaction is in an   *                         incompatible state with the rollback request,   *                         or if the request fails.   */  synchronized void rollback(Xid xid)    throws XAException {    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))      JoramTracing.dbgClient.log(BasicLevel.DEBUG,                                 "--- "                                 + this                                 + ": rollback(" + xid + ")");    try {      XAContext xaC = (XAContext) transactions.get(xid);      if (xaC == null)        throw new XAException("Unknown transaction.");      if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))        JoramTracing.dbgClient.log(BasicLevel.DEBUG,                                   "--- "                                   + this                                   + ": rolls back transaction "                                   + xid.toString());       Enumeration targets;       String target;      MessageAcks acks;      XACnxRollback rollbackRequest;        targets = xaC.deliveries.keys();        rollbackRequest = new XACnxRollback(xid.getBranchQualifier(),                                           xid.getFormatId(),                                           xid.getGlobalTransactionId());      while (targets.hasMoreElements()) {        target = (String) targets.nextElement();        acks = (MessageAcks) xaC.deliveries.remove(target);        rollbackRequest.add(target, acks.getIds(), acks.getQueueMode());      }      // Sending to the proxy:      cnx.syncRequest(rollbackRequest);      transactions.remove(xid);      Session session = (Session) sessionTable.get(xid);      if (session != null) {        session.setTransacted(false);        sessionTable.remove(xid);      }    } catch (JMSException exc) {      setStatus(xid, ROLLBACK_ONLY);      throw new XAException("Rollback request failed: " + exc);    } catch (XAException exc) {      setStatus(xid, ROLLBACK_ONLY);      throw exc;    }  }  /**    * Notifies the RM to recover the prepared transactions.   *   * @exception XAException  If the specified flag is invalid, or if the   *                         request fails.   */  synchronized Xid[] recover(int flag) throws XAException  {    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))      JoramTracing.dbgClient.log(BasicLevel.DEBUG,                                 "--- "                                 + this                                 + ": recovers transactions.");    if (flag == XAResource.TMSTARTRSCAN || flag == XAResource.TMENDRSCAN)      throw new XAException("Non supported recovery flag: " + flag);    try {      XACnxRecoverReply reply =        (XACnxRecoverReply) cnx.syncRequest(new XACnxRecoverRequest());      Xid[] xids = new Xid[reply.getSize()];      for (int i = 0; i < reply.getSize(); i++) {        xids[i] = new XidImpl(reply.getBranchQualifier(i),                              reply.getFormatId(i),                              reply.getGlobalTransactionId(i));      }      return xids;    }    catch (Exception exc) {      throw new XAException("Recovery request failed: " + exc.getMessage());    }  }  /**    * Sets the status of a transaction.   *   * @exception XAException  If the transaction is unknown.   */  private void setStatus(Xid xid, int status) throws XAException  {    XAContext xac = (XAContext) transactions.get(xid);    if (xac == null)      throw new XAException("Unknown transaction.");    xac.status = status;  }  /**    * Gets the status of a transaction.   *   * @exception XAException  If the transaction is unknown.   */  private int getStatus(Xid xid) throws XAException  {    XAContext xac = (XAContext) transactions.get(xid);    if (xac == null)      throw new XAException("Unknown transaction.");    return xac.status;  }  /** Resource managers are equal if they belong to the same connection. */  public boolean equals(Object o) {    if (! (o instanceof XAResourceMngr))      return false;    XAResourceMngr other = (XAResourceMngr) o;    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))      JoramTracing.dbgClient.log(BasicLevel.DEBUG,                                 this + ": equals other = " + other.cnx +                                  ", this.cnx = " + cnx +                                 ", equals = " + cnx.equals(other.cnx));        return cnx.equals(other.cnx);  }}/** * Utility class holding a resource's state during transaction progress. */class XAContext{  /** The transaction status. */  int status;  /**   * Table holding the <code>ProducerMessages</code> produced in the   * transaction.   * <p>   * <b>Key:</b> destination name<br>   * <b>Object:</b> <code>ProducerMessages</code>   */  Hashtable sendings;  /**    * Table holding the identifiers of the messages delivered per   * destination or subscription, in the transaction.   * <p>   * <b>Key:</b> destination or subscription name<br>   * <b>Object:</b> corresponding <code>MessageAcks</code> instance   */  Hashtable deliveries;  /**   * Constructs an <code>XAContext</code> instance.   */  XAContext()  {    sendings = new Hashtable();    deliveries = new Hashtable();  }  /**   * Adds new sendings performed by the resumed transaction.   */  void addSendings(Hashtable newSendings)  {    String newDest;    ProducerMessages newPM;    ProducerMessages storedPM;    Vector msgs;    // Browsing the destinations for which messages have been produced:    Enumeration newDests = newSendings.keys();    while (newDests.hasMoreElements()) {      newDest = (String) newDests.nextElement();      newPM = (ProducerMessages) newSendings.remove(newDest);      storedPM = (ProducerMessages) sendings.get(newDest);      // If messages haven't already been produced for this destination,      // storing the new ProducerMessages object:      if (storedPM == null)        sendings.put(newDest, newPM);      // Else, adding the newly produced messages to the existing      // ProducerMessages:      else {        msgs = newPM.getMessages();        for (int i = 0; i < msgs.size(); i++)          storedPM.addMessage((org.objectweb.joram.shared.messages.Message)                              msgs.get(i));      }    }  }  /**   * Adds new deliveries occured within the resumed transaction.   */  void addDeliveries(Hashtable newDeliveries)  {    String newName;    MessageAcks newAcks;    MessageAcks storedAcks;    // Browsing the destinations or subscriptions to which messages will have    // to be acknowledged:    Enumeration newNames = newDeliveries.keys();    while (newNames.hasMoreElements()) {      newName = (String) newNames.nextElement();      newAcks = (MessageAcks) newDeliveries.remove(newName);      storedAcks = (MessageAcks) deliveries.get(newName);      // If there are no messages to acknowledge for this destination or       // subscription, storing the new vector:      if (storedAcks == null)        deliveries.put(newName, newAcks);      // Else, adding the new ids to the stored ones:      else        storedAcks.addIds(newAcks.getIds());    }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -