resourcemanager.java

来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 1,362 行 · 第 1/4 页

JAVA
1,362
字号
                    TransactionLog log = getCurrentTransactionLog();                    addTridLogEntry(txid, log);                    log.logTransactionState(txid, _txExpiryTime * 1000, _rid,                                            state);                    // cache the transaction state                    _activeTransactions.put(txid, new LinkedList());                }                break;            case TransactionState.PREPARED_ORD:                // cache the transaction state                LinkedList list = (LinkedList) _activeTransactions.get(txid);                if (list != null) {                    list.add(state);                } else {                    throw new ResourceManagerException("Trasaction " + txid +                                                       " is not active.");                }                break;            case TransactionState.CLOSED_ORD:                {                    TransactionLog log = getTransactionLog(txid);                    log.logTransactionState(txid, _txExpiryTime * 1000, _rid,                                            state);                    removeTridLogEntry(txid, log);                    // check whether this log has anymore open transactions                    synchronized (_cacheLock) {                        if ((_logToTridCache.get(log) == null) &&                                (!isCurrentTransactionLog(log))) {                            log.close();                            // now check if gc mode is GC_SYNCHRONOUS. If it is                            // remove the log file                            if (_gcMode == GC_SYNCHRONOUS) {                                try {                                    log.destroy();                                } catch (TransactionLogException exception) {                                    exception.printStackTrace();                                }                            }                        }                    }                    // we also want to remove this entry from the list                    // of active transactions                    _activeTransactions.remove(txid);                }                break;            default:                throw new ResourceManagerException("Cannot process tx state " +                                                   state);        }    }    /**     * Add an {@link DataTransactionLogEntry} using the specified txid, rid and     * data     *     * @param txid - the transaction identifier     * @param rid  - the resource identifier     * @throws TransactionLogException  - error adding the entry     * @throws ResourceManagerException - error getting the trnasaction log     */    synchronized void logTransactionData(ExternalXid txid, String rid,                                         Object data)            throws ResourceManagerException, TransactionLogException {        getTransactionLog(txid).logTransactionData(txid, _txExpiryTime * 1000,                                                   rid, data);        // we also want to add this to the transaction data for that        // txid        LinkedList list = (LinkedList) _activeTransactions.get(txid);        if (list != null) {            list.add(data);        } else {            throw new ResourceManagerException("Trasaction " + txid +                                               " is not active.");        }    }    /**     * This is the entry point for the garbage collection callback. It scans     * through the each transaction log file and determines whether it can be     * garbage collected. If it can then it simply destroys the corresponding     * TransactionLog.     */    public void garbageCollect() {        try {            int gcfiles = 0;            // if there are no transaction log files then return            if (_logs.size() == 0) {                return;            }            TreeSet copy = null;            synchronized (_logs) {                copy = new TreeSet(_logs);            }            // remove the current log file, since this is likely to be the            // current log file            copy.remove(_logs.last());            // process each of the remaining log files            while (copy.size() > 0) {                TransactionLog log = (TransactionLog) copy.first();                copy.remove(log);                if (log.canGarbageCollect()) {                    // destroy the log                    log.destroy();                    // remove it from the log cache                    synchronized (_logs) {                        _logs.remove(log);                    }                    // increment the number of garbafe collected files                    ++gcfiles;                }            }            // print an informative message            _log.info("[RMGC] Collected " + gcfiles + " files.");        } catch (Exception exception) {            exception.printStackTrace();        }    }    /**     * Ensure that a transaction with the specified xid is currently active. If     * this is the case then commit the transaction based onb the value of the     * onePhase flag.     * <p/>     * This will have the effect of passing all messages through     *     * @param id       - the xa transaction identity     * @param onePhase - treu if it is a one phase commit     * @throws XAException - if there is a problem completing the call     */    public synchronized void commit(Xid id, boolean onePhase)            throws XAException {        // check that the xid is not null        if (id == null) {            throw new XAException(XAException.XAER_NOTA);        }        // covert to our internal representation of an xid        ExternalXid xid = new ExternalXid(id);        // check to see that the transaction is active and open. We should        // not be allowed to commit a committed transaction.        if (!isTransactionActive(xid)) {            throw new XAException(XAException.XAER_PROTO);        }        // process all the messages associated with this global transaction        // If a message has been  published then sent it to the message mgr        // for processing. If a message has been consumed then remove it        // from the list of unconsumed messages.        try {            // retrieve a list of recrods for the specified global transaction            // and process them. Ignore the state records and only process the            // data records, which are of type TransacitonalObjectWrapper.            Object[] records = getTransactionRecords(xid, _rid);            for (int index = 0; index < records.length; index++) {                if (records[index] instanceof TransactionalObjectWrapper) {                    TransactionalObjectWrapper wrapper =                            (TransactionalObjectWrapper) records[index];                    if (wrapper.isPublishedMessage()) {                        // send the published message to the message manager                        MessageImpl message = (MessageImpl) wrapper.getObject();                        _messages.add(message);                    } else if (wrapper.isReceivedMessage()) {                        // if it is a received message handle then simply                        // delete it and mark it as acknowledged                        MessageHandle handle = ((ReceivedMessageWrapper) (wrapper)).getMessageHandle();                        handle.destroy();                    }                } else {                    // ignore since it is a state records.                }            }        } catch (Exception exception) {            _log.error(exception, exception);            throw new XAException("Failed in ResourceManager.commit : " +                                  exception.toString());        } finally {            // and now mark the transaction as closed            try {                logTransactionState(xid, TransactionState.CLOSED);            } catch (Exception exception) {                throw new XAException("Error processing commit : " + exception);            }        }    }    /**     * Ends the work performed on behalf of a transaction branch. The resource     * manager disassociates the XA resource from the transaction branch     * specified and let the transaction be completedCommits an XA transaction     * that is in progress.     *     * @param id    - the xa transaction identity     * @param flags - one of TMSUCCESS, TMFAIL, or TMSUSPEND     * @throws XAException - if there is a problem completing the call     */    public synchronized void end(Xid id, int flags)            throws XAException {        //check the xid is not null        if (id == null) {            throw new XAException(XAException.XAER_NOTA);        }        // covert to our internal representation of an xid        ExternalXid xid = new ExternalXid(id);        // check that the flags are valid for this method        if ((flags != XAResource.TMSUSPEND) ||                (flags != XAResource.TMSUCCESS) ||                (flags != XAResource.TMFAIL)) {            throw new XAException(XAException.XAER_PROTO);        }        switch (flags) {            case XAResource.TMFAIL:                // check that the transaction exists                if (!isTransactionActive(xid)) {                    throw new XAException(XAException.XAER_PROTO);                }                // do not process that associated data, simply rollback                rollback(xid);                break;            case XAResource.TMSUSPEND:                // check that the transaction is opened                if (!isTransactionActive(xid)) {                    throw new XAException(XAException.XAER_PROTO);                }                break;            case XAResource.TMSUCCESS:                // nothing to do here but check that the resource manager is                // in a consistent state wrt to this xid. The xid should not                // be active if it received the commit, forget etc.                if (isTransactionActive(xid)) {                    throw new XAException(XAException.XAER_PROTO);                }                break;        }    }    /**     * Tell the resource manager to forget about a heuristically completed     * transaction branch.     *     * @param id - the xa transaction identity     * @throws XAException - if there is a problem completing the call     */    public synchronized void forget(Xid id)            throws XAException {        //check the xid is not null        if (id == null) {            throw new XAException(XAException.XAER_NOTA);        }        // covert to our internal representation of an xid        ExternalXid xid = new ExternalXid(id);        // check to see that the xid actually exists        if (!isTransactionActive(xid)) {            throw new XAException(XAException.XAER_PROTO);        }        // call rollback to complete the work        rollback(id);    }    /**     * Return the transaction timeout for this instance of the resource     * manager.     *     * @return int - the timeout in seconds     * @throws XAException - if there is a problem completing the call     */    public synchronized int getTransactionTimeout()            throws XAException {        return _txExpiryTime;    }    /**     * Ask the resource manager to prepare for a transaction commit of the     * transaction specified in xid     *     * @param xares     * @return int - XA_RDONLY or XA_OK     * @throws XAException - if there is a problem completing the call     */    public synchronized boolean isSameRM(XAResource xares)            throws XAException {        boolean result = false;        if ((xares == this) ||                ((xares instanceof ResourceManager) &&                (((ResourceManager) xares)._rid.equals(_rid)))) {            result = true;        }        return result;    }    /**     * Obtain a list of prepared transaction branches from a resource manager.     * The transaction manager calls this method during recovery to obtain the     * list of transaction branches that are currently in prepared or     * heuristically completed states.     *     * @throws XAException - if there is a problem completing the call     */    public synchronized int prepare(Xid id)            throws XAException {        //check the xid is not null        if (id == null) {            throw new XAException(XAException.XAER_NOTA);        }        // covert to our internal representation of an xid        ExternalXid xid = new ExternalXid(id);        // check to see that the xid actually exists        if (!isTransactionActive(xid)) {            throw new XAException(XAException.XAER_PROTO);        }        // can a prepare for the same resource occur multiple times

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?