resourcemanager.java
来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 1,362 行 · 第 1/4 页
JAVA
1,362 行
TransactionLog log = getCurrentTransactionLog(); addTridLogEntry(txid, log); log.logTransactionState(txid, _txExpiryTime * 1000, _rid, state); // cache the transaction state _activeTransactions.put(txid, new LinkedList()); } break; case TransactionState.PREPARED_ORD: // cache the transaction state LinkedList list = (LinkedList) _activeTransactions.get(txid); if (list != null) { list.add(state); } else { throw new ResourceManagerException("Trasaction " + txid + " is not active."); } break; case TransactionState.CLOSED_ORD: { TransactionLog log = getTransactionLog(txid); log.logTransactionState(txid, _txExpiryTime * 1000, _rid, state); removeTridLogEntry(txid, log); // check whether this log has anymore open transactions synchronized (_cacheLock) { if ((_logToTridCache.get(log) == null) && (!isCurrentTransactionLog(log))) { log.close(); // now check if gc mode is GC_SYNCHRONOUS. If it is // remove the log file if (_gcMode == GC_SYNCHRONOUS) { try { log.destroy(); } catch (TransactionLogException exception) { exception.printStackTrace(); } } } } // we also want to remove this entry from the list // of active transactions _activeTransactions.remove(txid); } break; default: throw new ResourceManagerException("Cannot process tx state " + state); } } /** * Add an {@link DataTransactionLogEntry} using the specified txid, rid and * data * * @param txid - the transaction identifier * @param rid - the resource identifier * @throws TransactionLogException - error adding the entry * @throws ResourceManagerException - error getting the trnasaction log */ synchronized void logTransactionData(ExternalXid txid, String rid, Object data) throws ResourceManagerException, TransactionLogException { getTransactionLog(txid).logTransactionData(txid, _txExpiryTime * 1000, rid, data); // we also want to add this to the transaction data for that // txid LinkedList list = (LinkedList) _activeTransactions.get(txid); if (list != null) { list.add(data); } else { throw new ResourceManagerException("Trasaction " + txid + " is not active."); } } /** * This is the entry point for the garbage collection callback. It scans * through the each transaction log file and determines whether it can be * garbage collected. If it can then it simply destroys the corresponding * TransactionLog. */ public void garbageCollect() { try { int gcfiles = 0; // if there are no transaction log files then return if (_logs.size() == 0) { return; } TreeSet copy = null; synchronized (_logs) { copy = new TreeSet(_logs); } // remove the current log file, since this is likely to be the // current log file copy.remove(_logs.last()); // process each of the remaining log files while (copy.size() > 0) { TransactionLog log = (TransactionLog) copy.first(); copy.remove(log); if (log.canGarbageCollect()) { // destroy the log log.destroy(); // remove it from the log cache synchronized (_logs) { _logs.remove(log); } // increment the number of garbafe collected files ++gcfiles; } } // print an informative message _log.info("[RMGC] Collected " + gcfiles + " files."); } catch (Exception exception) { exception.printStackTrace(); } } /** * Ensure that a transaction with the specified xid is currently active. If * this is the case then commit the transaction based onb the value of the * onePhase flag. * <p/> * This will have the effect of passing all messages through * * @param id - the xa transaction identity * @param onePhase - treu if it is a one phase commit * @throws XAException - if there is a problem completing the call */ public synchronized void commit(Xid id, boolean onePhase) throws XAException { // check that the xid is not null if (id == null) { throw new XAException(XAException.XAER_NOTA); } // covert to our internal representation of an xid ExternalXid xid = new ExternalXid(id); // check to see that the transaction is active and open. We should // not be allowed to commit a committed transaction. if (!isTransactionActive(xid)) { throw new XAException(XAException.XAER_PROTO); } // process all the messages associated with this global transaction // If a message has been published then sent it to the message mgr // for processing. If a message has been consumed then remove it // from the list of unconsumed messages. try { // retrieve a list of recrods for the specified global transaction // and process them. Ignore the state records and only process the // data records, which are of type TransacitonalObjectWrapper. Object[] records = getTransactionRecords(xid, _rid); for (int index = 0; index < records.length; index++) { if (records[index] instanceof TransactionalObjectWrapper) { TransactionalObjectWrapper wrapper = (TransactionalObjectWrapper) records[index]; if (wrapper.isPublishedMessage()) { // send the published message to the message manager MessageImpl message = (MessageImpl) wrapper.getObject(); _messages.add(message); } else if (wrapper.isReceivedMessage()) { // if it is a received message handle then simply // delete it and mark it as acknowledged MessageHandle handle = ((ReceivedMessageWrapper) (wrapper)).getMessageHandle(); handle.destroy(); } } else { // ignore since it is a state records. } } } catch (Exception exception) { _log.error(exception, exception); throw new XAException("Failed in ResourceManager.commit : " + exception.toString()); } finally { // and now mark the transaction as closed try { logTransactionState(xid, TransactionState.CLOSED); } catch (Exception exception) { throw new XAException("Error processing commit : " + exception); } } } /** * Ends the work performed on behalf of a transaction branch. The resource * manager disassociates the XA resource from the transaction branch * specified and let the transaction be completedCommits an XA transaction * that is in progress. * * @param id - the xa transaction identity * @param flags - one of TMSUCCESS, TMFAIL, or TMSUSPEND * @throws XAException - if there is a problem completing the call */ public synchronized void end(Xid id, int flags) throws XAException { //check the xid is not null if (id == null) { throw new XAException(XAException.XAER_NOTA); } // covert to our internal representation of an xid ExternalXid xid = new ExternalXid(id); // check that the flags are valid for this method if ((flags != XAResource.TMSUSPEND) || (flags != XAResource.TMSUCCESS) || (flags != XAResource.TMFAIL)) { throw new XAException(XAException.XAER_PROTO); } switch (flags) { case XAResource.TMFAIL: // check that the transaction exists if (!isTransactionActive(xid)) { throw new XAException(XAException.XAER_PROTO); } // do not process that associated data, simply rollback rollback(xid); break; case XAResource.TMSUSPEND: // check that the transaction is opened if (!isTransactionActive(xid)) { throw new XAException(XAException.XAER_PROTO); } break; case XAResource.TMSUCCESS: // nothing to do here but check that the resource manager is // in a consistent state wrt to this xid. The xid should not // be active if it received the commit, forget etc. if (isTransactionActive(xid)) { throw new XAException(XAException.XAER_PROTO); } break; } } /** * Tell the resource manager to forget about a heuristically completed * transaction branch. * * @param id - the xa transaction identity * @throws XAException - if there is a problem completing the call */ public synchronized void forget(Xid id) throws XAException { //check the xid is not null if (id == null) { throw new XAException(XAException.XAER_NOTA); } // covert to our internal representation of an xid ExternalXid xid = new ExternalXid(id); // check to see that the xid actually exists if (!isTransactionActive(xid)) { throw new XAException(XAException.XAER_PROTO); } // call rollback to complete the work rollback(id); } /** * Return the transaction timeout for this instance of the resource * manager. * * @return int - the timeout in seconds * @throws XAException - if there is a problem completing the call */ public synchronized int getTransactionTimeout() throws XAException { return _txExpiryTime; } /** * Ask the resource manager to prepare for a transaction commit of the * transaction specified in xid * * @param xares * @return int - XA_RDONLY or XA_OK * @throws XAException - if there is a problem completing the call */ public synchronized boolean isSameRM(XAResource xares) throws XAException { boolean result = false; if ((xares == this) || ((xares instanceof ResourceManager) && (((ResourceManager) xares)._rid.equals(_rid)))) { result = true; } return result; } /** * Obtain a list of prepared transaction branches from a resource manager. * The transaction manager calls this method during recovery to obtain the * list of transaction branches that are currently in prepared or * heuristically completed states. * * @throws XAException - if there is a problem completing the call */ public synchronized int prepare(Xid id) throws XAException { //check the xid is not null if (id == null) { throw new XAException(XAException.XAER_NOTA); } // covert to our internal representation of an xid ExternalXid xid = new ExternalXid(id); // check to see that the xid actually exists if (!isTransactionActive(xid)) { throw new XAException(XAException.XAER_PROTO); } // can a prepare for the same resource occur multiple times
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?