resourcemanager.java

来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 1,362 行 · 第 1/4 页

JAVA
1,362
字号
        // ????        try {            logTransactionState(xid, TransactionState.PREPARED);        } catch (Exception exception) {            throw new XAException("Error processing prepare : " + exception);        }        return XAResource.XA_OK;    }    /**     * Inform the resource manager to roll back work done on behalf of a     * transaction branch     *     * @throws XAException - if there is a problem completing the call     */    public synchronized Xid[] recover(int flag)            throws XAException {        Xid[] result = new Xid[0];        if ((flag == XAResource.TMNOFLAGS) ||                (flag == XAResource.TMSTARTRSCAN) ||                (flag == XAResource.TMENDRSCAN)) {            LinkedList xids = new LinkedList();            Iterator iter = _activeTransactions.keySet().iterator();            while (iter.hasNext()) {                Xid xid = (Xid) iter.next();                LinkedList list = (LinkedList) _activeTransactions.get(xid);                if (list.size() > 1) {                    // need at least a start in the chain.                    Object last = list.getLast();                    if ((last instanceof StateTransactionLogEntry)                            &&                            (((StateTransactionLogEntry) last).getState()                            .isPrepared())) {                        xids.add(xid);                    }                }            }            result = (Xid[]) xids.toArray();        }        return result;    }    /**     * Set the current transaction timeout value for this XAResource instance.     
*     * @throws XAException - if there is a problem completing the call     */    public synchronized void rollback(Xid id)            throws XAException {        //check the xid is not null        if (id == null) {            throw new XAException(XAException.XAER_NOTA);        }        // covert to our internal representation of an xid        ExternalXid xid = new ExternalXid(id);        // check to see that the xid actually exists        if (!isTransactionActive(xid)) {            throw new XAException(XAException.XAER_PROTO);        }        // process the data in that transaction. If it was a published message        // then drop it. If it was a consumed message then return it back to        // the destination.        try {            // retrieve a list of recrods for the specified global transaction            // and process them. Ignore the state records and only process the            // data records, which are of type TransacitonalObjectWrapper.            Object[] records = getTransactionRecords(xid, _rid);            for (int index = 0; index < records.length; index++) {                if (records[index] instanceof TransactionalObjectWrapper) {                    TransactionalObjectWrapper wrapper =                            (TransactionalObjectWrapper) records[index];                    if (wrapper.isPublishedMessage()) {                        // we don't need to process these messages since the                        // global transaction has been rolled back.                    
} else if (wrapper.isReceivedMessage()) {                        ReceivedMessageWrapper rmsg_wrapper =                                (ReceivedMessageWrapper) wrapper;                        MessageHandle handle =                                (MessageHandle) rmsg_wrapper.getObject();                        JmsDestination dest = handle.getDestination();                        DestinationCache cache =                                _destinations.getDestinationCache(dest);                        cache.returnMessageHandle(handle);                    }                } else {                    // ignore since it is a state records.                }            }        } catch (Exception exception) {            throw new XAException("Failed in ResourceManager.rollback : " +                                  exception.toString());        } finally {            // and now mark the transaction as closed            try {                logTransactionState(xid, TransactionState.CLOSED);            } catch (Exception exception) {                throw new XAException(                        "Error processing rollback : " + exception);            }        }    }    /**     * Start work on behalf of a transaction branch specified in xid If TMJOIN     * is specified, the start is for joining a transaction previously seen by     * the resource manager     *     * @throws XAException - if there is a problem completing the call     */    public synchronized boolean setTransactionTimeout(int seconds)            throws XAException {        _txExpiryTime = seconds;        return true;    }    // implementation of XAResource.start    public synchronized void start(Xid id, int flags)            throws XAException {        //check the xid is not null        if (id == null) {            throw new XAException(XAException.XAER_NOTA);        }        // covert to our internal representation of an xid        ExternalXid xid = new ExternalXid(id);        // check that the flags are valid for 
this method        if ((flags != XAResource.TMNOFLAGS) ||                (flags != XAResource.TMJOIN) ||                (flags != XAResource.TMRESUME)) {            throw new XAException(XAException.XAER_PROTO);        }        switch (flags) {            case XAResource.TMNOFLAGS:                // check to see that the xid does not already exist                if (isTransactionActive(xid)) {                    throw new XAException(XAException.XAER_DUPID);                }                // otherwise log the start of the transaction                try {                    logTransactionState(xid, TransactionState.OPENED);                } catch (Exception exception) {                    throw new XAException(                            "Error processing start : " + exception);                }                break;            case XAResource.TMJOIN:            case XAResource.TMRESUME:                // joining a transaction previously seen by the resource                // manager                if (!isTransactionActive(xid)) {                    throw new XAException(XAException.XAER_PROTO);                }                break;        }    }    /**     * Return the resource manager identity     *     * @return the resource manager identity     */    public String getResourceManagerId() {        return _rid;    }    /**     * Create the next {@link TransactionLog} and add it to the list of managed     * transaction logs.     * <p/>     * The method will throw ResourceManagerException if there is a problem     * completing the request.     
*     * @throws ResourceManagerException     */    protected TransactionLog createNextTransactionLog()            throws ResourceManagerException {        TransactionLog newlog = null;        synchronized (_logs) {            try {                // get the last log number                long last = 1;                if (!_logs.isEmpty()) {                    last                            = getSequenceNumber(                                    ((TransactionLog) _logs.last()).getName());                }                // now that we have the last log number, increment it and use                // it to build the name of the next log file.                String name = _logDirectory                        + System.getProperty("file.separator") +                        RM_LOGFILE_PREFIX + Long.toString(++last)                        + RM_LOGFILE_EXTENSION;                // create a transaction log and add it to the collection                newlog = new TransactionLog(name, true);                _logs.add(newlog);            } catch (TransactionLogException exception) {                throw new ResourceManagerException(                        "Error in createNextTransactionLog " + exception);            }        }        return newlog;    }    /**     * Build a list of all log files in the specified log directory     *     * @throws IllegalArgumentException - if the directory does not exist.     
*/    protected void buildLogFileList() {        File dir = new File(_logDirectory);        if ((!dir.exists()) ||                (!dir.isDirectory())) {            throw new IllegalArgumentException(_logDirectory +                                               " is not a directory");        }        try {            File[] list = dir.listFiles(new FilenameFilter() {                // implementation of FilenameFilter.accept                public boolean accept(File dir, String name) {                    boolean result = false;                    if ((name.startsWith(RM_LOGFILE_PREFIX)) &&                            (name.endsWith(RM_LOGFILE_EXTENSION))) {                        result = true;                    }                    return result;                }            });            // add the files to the list            synchronized (_logs) {                for (int index = 0; index < list.length; index++) {                    _logs.add(new TransactionLog(list[index].getPath(), false));                }            }        } catch (Exception exception) {            // replace this with the exception strategy            exception.printStackTrace();        }    }    /**     * This method will process all the transaction logs, in the log diretory     * and call recover on each of them.     
*     * @throws ResourceManagerException - if there is a problem recovering     */    private synchronized void recover()            throws ResourceManagerException {        try {            if (!_logs.isEmpty()) {                Iterator iter = _logs.iterator();                while (iter.hasNext()) {                    TransactionLog log = (TransactionLog) iter.next();                    HashMap records = log.recover();                }            }        } catch (Exception exception) {            throw new ResourceManagerException("Error in recover " +                                               exception.toString());        }    }    /**     * Retrieve the transaction log for the specified transaction id     *     * @param txid - the transaction identity     * @return TransactionLog     * @throws TransactionLogException  - if there is tx log exception     * @throws ResourceManagerException - if there is a resource problem.     */    private TransactionLog getTransactionLog(ExternalXid txid)            throws TransactionLogException, ResourceManagerException {        TransactionLog log = (TransactionLog) _tridToLogCache.get(txid);        if (log == null) {            log = getCurrentTransactionLog();            addTridLogEntry(txid, log);        }        return log;    }    /**     * Get the current transaction log. It will check the last transaction log     * opened by the resource manager and determine whether there is space     * enough to process another transaction.     
* <p/>
     * If there is space enough then it will return that transaction log,
     * otherwise it will create a new transaction log for the resource
     *
     * @return TransactionLog - the transaction log to use
     * @throws ResourceManagerException
     * @throws TransactionLogException
     */
    private TransactionLog getCurrentTransactionLog()
            throws TransactionLogException, ResourceManagerException {
        synchronized (_logs) {
            TransactionLog latest = null;
            if (!_logs.isEmpty()) {
                latest = (TransactionLog) _logs.last();
            }

            // no log yet, or the latest one has grown past the size limit
            if ((latest == null) || (latest.size() > _logFileSize)) {
                return createNextTransactionLog();
            }

            return latest;
        }
    }

    /**
     * Add an entry to the trid log cache table for the specified trid and
     * transaction log mapping.
     *
     * @param trid - the transaction identifier
     * @param log  - the transaction log
     */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?