⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 resourcemanager.java

📁 实现了Jms的服务器源码,支持多种适配器,DB,FTP,支持多种数据库
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
        return _gcMode;
    }

    /**
     * Report whether garbage collection has been switched off for this
     * resource manager.
     *
     * @return boolean - <code>true</code> when the gc mode equals
     * <code>GC_DISABLED</code>, <code>false</code> for any other mode
     */
    public boolean gcDisabled() {
        // the comparison already yields the boolean we need
        return _gcMode == GC_DISABLED;
    }

    /**
     * Record a message published under a global transaction, so that it can
     * be passed through the system when that transaction commits.
     * <p>
     * The message is first handed to
     * <code>MessageMgr.checkAndPrepareMessage</code>; if that call throws,
     * nothing is added to the transaction log.
     *
     * @param xid - the global transaction identity
     * @param message - the message published
     * @throws TransactionLogException - error adding the entry
     * @throws ResourceManagerException - error getting the transaction log
     * @throws JMSException - if there is an issue with prep'ing the message
     */
    public synchronized void logPublishedMessage(Xid xid, MessageImpl message)
        throws TransactionLogException, ResourceManagerException, JMSException {
        // prepare the message before anything is written to the log
        MessageMgr.instance().checkAndPrepareMessage(message);

        // convert to our internal representation of an xid and log the
        // wrapped message against it
        final ExternalXid txid = new ExternalXid(xid);
        logTransactionData(txid, _rid, createPublishedMessageWrapper(message));
    }

    /**
     * Record that a message handle was delivered to a consumer within the
     * specified global transaction. The message will be acknowledged when the
     * global transaction commits. Alternatively, if the global transaction is
     * rolled back the message handle will be returned to the destination.
     *
     * @param xid - the global transaction identity
     * @param id - the consumer receiving this message
     * @param handle - the handle of the message received
     * @throws TransactionLogException - error adding the entry
     * @throws ResourceManagerException - error getting the transaction log
     */
    public synchronized void logReceivedMessage(Xid xid, String id, MessageHandle handle)
        throws TransactionLogException, ResourceManagerException {
        // convert to our internal representation of an xid and log the
        // wrapped (consumer id, handle) pair against it
        final ExternalXid txid = new ExternalXid(xid);
        logTransactionData(txid, _rid, createReceivedMessageWrapper(id, handle));
    }

    /**
     * Record a state change for the specified global transaction.
     * <p>
     * What happens depends on the ordinal of <code>state</code>:
     * <ul>
     * <li>OPENED - the transaction is associated with the current transaction
     *     log, the state is written to that log and an empty record list is
     *     cached for the transaction.</li>
     * <li>PREPARED - the state is appended to the cached record list only; it
     *     is not written to a transaction log by this method.</li>
     * <li>CLOSED - the state is written to the log holding the transaction,
     *     the transaction is dissociated from that log and dropped from the
     *     cache. A log that is left without open transactions, and is not the
     *     current log, is closed and, in GC_SYNCHRONOUS mode, destroyed.</li>
     * </ul>
     *
     * @param xid - the transaction identifier
     * @param state - the transaction log state
     * @throws TransactionLogException - error adding the entry
     * @throws ResourceManagerException - error getting the transaction log,
     * the transaction is not active (PREPARED), or the state is not one of
     * the three handled above
     */
    public synchronized void logTransactionState(Xid xid, TransactionState state)
        throws TransactionLogException, ResourceManagerException {
        // convert to our internal representation of an xid
        final ExternalXid txid = new ExternalXid(xid);

        switch (state.getOrd()) {
            case TransactionState.OPENED_ORD: {
                // bind the transaction to the current log and record the state
                TransactionLog current = getCurrentTransactionLog();
                addTridLogEntry(txid, current);
                current.logTransactionState(txid, _txExpiryTime * 1000, _rid,
                    state);

                // start caching the records of this transaction
                _activeTransactions.put(txid, new LinkedList());
                break;
            }

            case TransactionState.PREPARED_ORD: {
                // only transactions that are already cached can be prepared
                LinkedList records = (LinkedList) _activeTransactions.get(txid);
                if (records == null) {
                    throw new ResourceManagerException("Trasaction " + txid +
                        " is not active.");
                }
                records.add(state);
                break;
            }

            case TransactionState.CLOSED_ORD: {
                // record the state, then detach the transaction from its log
                TransactionLog owner = getTransactionLog(txid);
                owner.logTransactionState(txid, _txExpiryTime * 1000, _rid,
                    state);
                removeTridLogEntry(txid, owner);

                synchronized (_cacheLock) {
                    // a log with no remaining open transactions that is not
                    // the current log is no longer needed
                    boolean noOpenTransactions =
                        (_logToTridCache.get(owner) == null);
                    if (noOpenTransactions && !isCurrentTransactionLog(owner)) {
                        owner.close();

                        // in synchronous gc mode the log file is removed
                        // right away; a failure to do so is only reported
                        if (_gcMode == GC_SYNCHRONOUS) {
                            try {
                                owner.destroy();
                            } catch (TransactionLogException exception) {
                                exception.printStackTrace();
                            }
                        }
                    }
                }

                // the transaction is no longer active
                _activeTransactions.remove(txid);
                break;
            }

            default:
                throw new ResourceManagerException("Cannot process tx state " +
                    state);
        }
    }

    /**
     * Add an {@link DataTransactionLogEntry} using the specified txid,
     * rid and data.
     * <p>
     * The entry is written to the transaction log first; afterwards the data
     * is appended to the cached record list of the transaction. If no such
     * list exists the method fails, but the log entry has already been
     * written at that point.
     *
     * @param txid - the transaction identifier
     * @param rid - the resource identifier
     * @param data - the data to record against the transaction
     * @throws TransactionLogException - error adding the entry
     * @throws ResourceManagerException - error getting the transaction log,
     * or the transaction is not active
     */
    synchronized void logTransactionData(ExternalXid txid, String rid,
                                         Object data)
        throws ResourceManagerException, TransactionLogException {
        // write the entry to the log that holds this transaction
        getTransactionLog(txid).logTransactionData(txid, _txExpiryTime * 1000,
            rid, data);

        // mirror the entry in the in-memory records of the transaction
        LinkedList records = (LinkedList) _activeTransactions.get(txid);
        if (records == null) {
            throw new ResourceManagerException("Trasaction " + txid +
                " is not active.");
        }
        records.add(data);
    }

    /**
     * This is the entry point for the garbage collection callback. It scans
     * through each transaction log file and determines whether it can
     * be garbage collected. If it can then it simply destroys the
     * corresponding TransactionLog and removes it from the log cache.
     * <p>
     * The set of logs is snapshotted while holding the <code>_logs</code>
     * monitor, and both the emptiness check and the selection of the log to
     * spare are made against that snapshot. The last log of the snapshot is
     * never collected. Any exception raised while collecting is reported and
     * ends the scan; it is not propagated to the caller.
     */
    public void garbageCollect() {
        try {
            // take a consistent snapshot under the same lock that guards
            // modification of _logs, so the set is never read while another
            // thread is changing it
            TreeSet candidates;
            synchronized (_logs) {
                // if there are no transaction log files then return
                if (_logs.isEmpty()) {
                    return;
                }
                candidates = new TreeSet(_logs);
            }

            // spare the last log of the snapshot, since this is likely to be
            // the current log file
            candidates.remove(candidates.last());

            // process each of the remaining log files, oldest first
            int gcfiles = 0;
            while (!candidates.isEmpty()) {
                TransactionLog log = (TransactionLog) candidates.first();
                candidates.remove(log);
                if (log.canGarbageCollect()) {
                    // destroy the log
                    log.destroy();

                    // remove it from the log cache
                    synchronized (_logs) {
                        _logs.remove(log);
                    }

                    // increment the number of garbage collected files
                    ++gcfiles;
                }
            }

            // print an informative message
            _log.info("[RMGC] Collected " + gcfiles + " files.");
        } catch (Exception exception) {
            // NOTE(review): consider routing this through _log instead of
            // stderr once the logger's error API is confirmed
            exception.printStackTrace();
        }
    }

    /**
     * Ensure that a transaction with the specified xid is currently active
     * and, if so, commit it.
     * <p>
     * Every data record logged for the transaction is processed on a single
     * database connection: published messages are handed to the message
     * manager, received message handles are destroyed. The connection is
     * committed when all records were processed and rolled back when any of
     * them fails. Whatever the outcome, the connection is closed and the
     * transaction is then marked as closed.
     * <p>
     * If processing fails, that failure is the one reported to the caller,
     * even if marking the transaction as closed fails as well.
     *
     * @param id - the xa transaction identity
     * @param onePhase - true if it is a one phase commit; this implementation
     * does not consult the flag
     * @throws XAException - XAER_NOTA if <code>id</code> is null, XAER_PROTO
     * if the transaction is not active, or a message-only exception if
     * processing the records or closing the transaction fails
     */
    public synchronized void commit(Xid id, boolean onePhase)
        throws XAException {
        // check that the xid is not null
        if (id == null) {
            throw new XAException(XAException.XAER_NOTA);
        }

        // convert to our internal representation of an xid
        ExternalXid xid = new ExternalXid(id);

        // check to see that the transaction is active and open. We should
        // not be allowed to commit a committed transaction.
        if (!isTransactionActive(xid)) {
            throw new XAException(XAException.XAER_PROTO);
        }

        // the first failure encountered; thrown once all cleanup is done so
        // that a later cleanup failure cannot replace it
        XAException failure = null;

        // process all the messages associated with this global transaction.
        // If a message has been published then send it to the message mgr
        // for processing. If a message has been consumed then remove it
        // from the list of unconsumed messages.
        Connection connection = null;
        try {
            // get a connection to the database
            connection = DatabaseService.getConnection();

            // retrieve the list of records for the specified global
            // transaction and process them. State records are ignored; only
            // data records, which are of type TransactionalObjectWrapper,
            // are processed.
            Object[] records = getTransactionRecords(xid, _rid);
            for (int index = 0; index < records.length; index++) {
                if (!(records[index] instanceof TransactionalObjectWrapper)) {
                    // ignore since it is a state record
                    continue;
                }

                TransactionalObjectWrapper wrapper =
                    (TransactionalObjectWrapper) records[index];
                if (wrapper.isPublishedMessage()) {
                    // send the published message to the message manager
                    MessageMgr.instance().add(connection,
                        (MessageImpl) wrapper.getObject());
                } else if (wrapper.isReceivedMessage()) {
                    // if it is a received message handle then simply
                    // delete it and mark it as acknowledged
                    MessageHandle handle =
                        ((ReceivedMessageWrapper) (wrapper)).getMessageHandle();
                    if (handle instanceof PersistentMessageHandle) {
                        MessageHandleFactory.destroyPersistentHandle(connection,
                            (PersistentMessageHandle) handle);
                    } else {
                        handle.destroy();
                    }
                }
            }
            connection.commit();
        } catch (Exception exception) {
            // undo any partial database work, whatever the kind of failure,
            // rather than closing the connection in an undefined state
            if (connection != null) {
                try {
                    connection.rollback();
                } catch (Exception nested) {
                    // ignore, the original failure is reported below
                }
            }
            failure = new XAException("Failed in ResourceManager.commit : " +
                exception.toString());
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception nested) {
                    // ignore
                }
            }

            // and now mark the transaction as closed. A failure here is only
            // reported when nothing went wrong earlier, and it is recorded
            // rather than thrown so that this finally block never discards
            // an exception that is already propagating.
            try {
                logTransactionState(xid, TransactionState.CLOSED);
            } catch (Exception exception) {
                if (failure == null) {
                    failure = new XAException("Error processing commit : " +
                        exception);
                }
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Ends the work performed on behalf of a transaction branch. The resource
     * manager disassociates the XA resource from the transaction branch
     * specified and lets the transaction be completed.
     * <p>
     * The transaction must still be active for every accepted flag, since
     * {@link #commit} is only permitted on an active transaction. With TMFAIL
     * the transaction is rolled back immediately; with TMSUSPEND and
     * TMSUCCESS nothing beyond the consistency check is done.
     *
     * @param id - the xa transaction identity
     * @param flags - one of TMSUCCESS, TMFAIL, or TMSUSPEND
     * @throws XAException - XAER_NOTA if <code>id</code> is null, XAER_PROTO
     * if <code>flags</code> is not one of the three accepted values or the
     * transaction is not active, or whatever the rollback raises for TMFAIL
     */
    public synchronized void end(Xid id, int flags)
        throws XAException {
        // check the xid is not null
        if (id == null) {
            throw new XAException(XAException.XAER_NOTA);
        }

        // convert to our internal representation of an xid
        ExternalXid xid = new ExternalXid(id);

        // check that the flags are valid for this method: reject only a
        // value that matches none of the accepted flags. (Joining the three
        // inequality tests with OR is true for every value and would reject
        // every call.)
        if ((flags != XAResource.TMSUSPEND) &&
            (flags != XAResource.TMSUCCESS) &&
            (flags != XAResource.TMFAIL)) {
            throw new XAException(XAException.XAER_PROTO);
        }

        // whichever flag was passed, the transaction has to be active: a
        // transaction that already received commit, rollback or forget is no
        // longer active and must not be ended again
        if (!isTransactionActive(xid)) {
            throw new XAException(XAException.XAER_PROTO);
        }

        // on failure do not process the associated data, simply rollback
        if (flags == XAResource.TMFAIL) {
            rollback(xid);
        }
    }

    /**
     * Tell the resource manager to forget about a heuristically completed
     * transaction branch.
     * <p>
     * This implementation requires the transaction to be active and then
     * completes the work by delegating to rollback.
     *
     * @param id - the xa transaction identity
     * @throws XAException - XAER_NOTA if <code>id</code> is null, XAER_PROTO
     * if the transaction is not active, or whatever the rollback raises
     */
    public synchronized void forget(Xid id)
        throws XAException {
        // a null xid cannot identify any transaction
        if (id == null) {
            throw new XAException(XAException.XAER_NOTA);
        }

        // look the transaction up under our internal representation of the
        // xid; only a transaction that actually exists can be forgotten
        final boolean active = isTransactionActive(new ExternalXid(id));
        if (!active) {
            throw new XAException(XAException.XAER_PROTO);
        }

        // call rollback to complete the work
        rollback(id);
    }

    /**
     * Return the transaction timeout for this instance of the resource
     * manager.
     *
     * @return int - the timeout in seconds
     * @throws XAException - if there is a problem completing the call
     */
    public synchronized int getTransactionTimeout()
        throws XAException {
        return _txExpiryTime;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -