// resourcemanager.java (Java)
// Source code for a JMS server implementation; supports multiple adapters
// (DB, FTP) and multiple databases.

    }

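    /**
     * Determine whether the specified resource represents the same resource
     * manager as this instance. Two resources are considered the same if
     * they are the same object or share the same resource manager identity.
     *
     * @param xares - the XAResource to compare against
     * @return boolean - true if both refer to the same resource manager
     * @throws XAException - if there is a problem completing the call
     */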
    public synchronized boolean isSameRM(XAResource xares)
        throws XAException {
        boolean result = false;

        if ((xares == this) ||
            ((xares instanceof ResourceManager) &&
            (((ResourceManager) xares)._rid.equals(_rid)))) {
            result = true;
        }

        return result;
    }

    /**
     * Ask the resource manager to prepare for a transaction commit of the
     * transaction specified in id
     *
     * @param id - the xa transaction identity
     * @return int - XA_OK once the PREPARED state has been logged
     * @throws XAException - if there is a problem completing the call
     */
    public synchronized int prepare(Xid id)
        throws XAException {
        //check the xid is not null
        if (id == null) {
            throw new XAException(XAException.XAER_NOTA);
        }

        // convert to our internal representation of an xid
        ExternalXid xid = new ExternalXid(id);

        // check to see that the xid actually exists
        if (!isTransactionActive(xid)) {
            throw new XAException(XAException.XAER_PROTO);
        }

        // TODO: can a prepare for the same resource occur multiple times?

        try {
            logTransactionState(xid, TransactionState.PREPARED);
        } catch (Exception exception) {
            throw new XAException("Error processing prepare : " + exception);
        }

        return XAResource.XA_OK;
    }

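    /**
     * Obtain a list of prepared transaction branches from a resource manager.
     * The transaction manager calls this method during recovery to obtain the
     * list of transaction branches that are currently in prepared or
     * heuristically completed states.
     *
     * @param flag - One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS
     * @return Xid[] - the set of Xids to recover
     * @throws XAException - if there is a problem completing the call
     */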
    public synchronized Xid[] recover(int flag)
        throws XAException {

        Xid[] result = new Xid[0];

        if ((flag == XAResource.TMNOFLAGS) ||
            (flag == XAResource.TMSTARTRSCAN) ||
            (flag == XAResource.TMENDRSCAN)) {
            LinkedList xids = new LinkedList();
            Iterator iter = _activeTransactions.keySet().iterator();
            while (iter.hasNext()) {
                Xid xid = (Xid) iter.next();
                LinkedList list = (LinkedList) _activeTransactions.get(xid);
                if (list.size() > 1) {
                    // need at least a start in the chain.
                    Object last = list.getLast();
                    if ((last instanceof StateTransactionLogEntry) &&
                        (((StateTransactionLogEntry) last).getState().isPrepared())) {
                        xids.add(xid);
                    }
                }

            }
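            // toArray() with no argument returns Object[], which cannot be
            // cast to Xid[]; pass a typed array instead.
            result = (Xid[]) xids.toArray(new Xid[xids.size()]);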
        }

        return result;
    }

    /**
     * Inform the resource manager to roll back work done on behalf of a
     * transaction branch
     *
     * @param id - the xa transaction identity
     * @throws XAException - if there is a problem completing the call
     */
    public synchronized void rollback(Xid id)
        throws XAException {
        //check the xid is not null
        if (id == null) {
            throw new XAException(XAException.XAER_NOTA);
        }

        // convert to our internal representation of an xid
        ExternalXid xid = new ExternalXid(id);

        // check to see that the xid actually exists
        if (!isTransactionActive(xid)) {
            throw new XAException(XAException.XAER_PROTO);
        }

        // process the data in that transaction. If it was a published message
        // then drop it. If it was a consumed message then return it back to
        // the destination.
        Connection connection = null;
        try {
            // get a connection to the database
            connection = DatabaseService.getConnection();

            // retrieve a list of records for the specified global transaction
            // and process them. Ignore the state records and only process the
            // data records, which are of type TransactionalObjectWrapper.
            Object[] records = getTransactionRecords(xid, _rid);
            for (int index = 0; index < records.length; index++) {
                if (records[index] instanceof TransactionalObjectWrapper) {
                    TransactionalObjectWrapper wrapper =
                        (TransactionalObjectWrapper) records[index];
                    if (wrapper.isPublishedMessage()) {
                        // we don't need to process these messages since the
                        // global transaction has been rolled back.
                    } else if (wrapper.isReceivedMessage()) {
                        // use the ConsumerManager to retrieve an instance of
                        // the ConsumerEndpoint and then return the message
                        // back to the consumer.
                        ReceivedMessageWrapper rmsg_wrapper =
                            (ReceivedMessageWrapper) wrapper;
                        ConsumerEndpoint endpoint =
                            ConsumerManager.instance().getConsumerEndpoint(
                                rmsg_wrapper.getConsumerId());
                        if (endpoint != null) {
                            // the endpoint exists so we must return the message
                            // back to it.
                            endpoint.returnMessage(
                                (MessageHandle) rmsg_wrapper.getObject());
                        } else {
                            // endpoint no longer exists so we can ignore this
                        }
                    }
                } else {
                    // ignore since it is a state record.
                }
            }

            connection.commit();
        } catch (PersistenceException exception) {
            if (connection != null) {
                try {
                    connection.rollback();
                } catch (Exception nested) {
                    // ignore
                }
            }
            throw new XAException("Failed in ResourceManager.rollback : " +
                exception.toString());
        } catch (Exception exception) {
            throw new XAException("Failed in ResourceManager.rollback : " +
                exception.toString());
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception nested) {
                    // ignore
                }
            }

            // and now mark the transaction as closed
            try {
                logTransactionState(xid, TransactionState.CLOSED);
            } catch (Exception exception) {
                throw new XAException("Error processing rollback : " + exception);
            }
        }
    }

    /**
     * Set the current transaction timeout value for this XAResource instance.
     *
     * @param seconds - timeout in seconds
     * @return boolean - true if the new transaction timeout was accepted
     * @throws XAException - if there is a problem completing the call
     */
    public synchronized boolean setTransactionTimeout(int seconds)
        throws XAException {
        _txExpiryTime = seconds;
        return true;
    }

    /**
     * Start work on behalf of a transaction branch specified in id. If TMJOIN
     * is specified, the start is for joining a transaction previously seen by
     * the resource manager.
     *
     * @param id - the xa transaction identity
     * @param flags - One of TMNOFLAGS, TMJOIN, or TMRESUME
     * @throws XAException - if there is a problem completing the call
     */
    public synchronized void start(Xid id, int flags)
        throws XAException {

        //check the xid is not null
        if (id == null) {
            throw new XAException(XAException.XAER_NOTA);
        }

        // convert to our internal representation of an xid
        ExternalXid xid = new ExternalXid(id);

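        // check that the flags are valid for this method. The comparisons
        // must be joined with && since any single value always differs from
        // at least two of the three constants.
        if ((flags != XAResource.TMNOFLAGS) &&
            (flags != XAResource.TMJOIN) &&
            (flags != XAResource.TMRESUME)) {
            throw new XAException(XAException.XAER_PROTO);
        }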

        switch (flags) {
            case XAResource.TMNOFLAGS:
                // check to see that the xid does not already exist
                if (isTransactionActive(xid)) {
                    throw new XAException(XAException.XAER_DUPID);
                }

                // otherwise log the start of the transaction
                try {
                    logTransactionState(xid, TransactionState.OPENED);
                } catch (Exception exception) {
                    throw new XAException("Error processing start : " + exception);
                }
                break;

            case XAResource.TMJOIN:
            case XAResource.TMRESUME:
                // joining a transaction previously seen by the resource
                // manager
                if (!isTransactionActive(xid)) {
                    throw new XAException(XAException.XAER_PROTO);
                }
                break;
        }
    }

    // override ServiceManager.start
    public void start()
        throws ServiceException {
        this.setState(ServiceState.RUNNING);
    }

    // override ServiceManager.stop
    public void stop()
        throws ServiceException {
        this.setState(ServiceState.STOPPED);
    }

    // override ServiceManager.run
    public void run() {
        // do nothing
    }

    /**
     * Return the resource manager identity
     *
     * @return the resource manager identity
     */
    public String getResourceManagerId() {
        return _rid;
    }

    /**
     * Create the next {@link TransactionLog} and add it to the list of
     * managed transaction logs.
     * <p>
     * The method will throw ResourceManagerException if there is a
     * problem completing the request.
     *
     * @throws ResourceManagerException
     */
    protected TransactionLog createNextTransactionLog()
        throws ResourceManagerException {
        TransactionLog newlog = null;

        synchronized (_logs) {
            try {
                // get the last log number
                long last = 1;
                if (!_logs.isEmpty()) {
                    last = getSequenceNumber(((TransactionLog) _logs.last()).getName());
                }

                // now that we have the last log number, increment it and use
                // it to build the name of the next log file.
                String name = _logDirectory + System.getProperty("file.separator") +
                    RM_LOGFILE_PREFIX + Long.toString(++last) + RM_LOGFILE_EXTENSION;

                // create a transaction log and add it to the collection
                newlog = new TransactionLog(name, true);
                _logs.add(newlog);
            } catch (TransactionLogException exception) {
                throw new ResourceManagerException(
                    "Error in createNextTransactionLog " + exception);
            }
        }

        return newlog;
    }

    /**
     * Build a list of all log files in the specified log directory
     *
     * @throws IllegalArgumentException - if the directory does not exist.
     */
    protected void buildLogFileList() {
        File dir = new File(_logDirectory);
        if ((!dir.exists()) ||
            (!dir.isDirectory())) {
            throw new IllegalArgumentException(_logDirectory +
                " is not a directory");
        }

        try {
            File[] list = dir.listFiles(new FilenameFilter() {

                // implementation of FilenameFilter.accept
                public boolean accept(File dir, String name) {
                    boolean result = false;

                    if ((name.startsWith(RM_LOGFILE_PREFIX)) &&
                        (name.endsWith(RM_LOGFILE_EXTENSION))) {
                        result = true;
                    }

                    return result;
                }
            });

            // add the files to the list
            synchronized (_logs) {
                for (int index = 0; index < list.length; index++) {
                    _logs.add(new TransactionLog(list[index].getPath(), false));
                }
            }
        } catch (Exception exception) {
            // replace this with the exception strategy
            exception.printStackTrace();
        }

    }

    /**
     * This method will process all the transaction logs in the log directory
     * and call recover on each of them.
     *
     * @throws ResourceManagerException - if there is a problem recovering
     */
    private synchronized void recover()
        throws ResourceManagerException {
        try {
            if (!_logs.isEmpty()) {
                Iterator iter = _logs.iterator();
                while (iter.hasNext()) {
                    TransactionLog log = (TransactionLog) iter.next();
                    HashMap records = log.recover();
                }
            }
        } catch (Exception exception) {
            throw new ResourceManagerException("Error in recover " +
                exception.toString());
        }
    }

    /**
     * Retrieve the transaction log for the specified transaction id
     *
     * @param txid - the transaction identity
