⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jmsserversession.java

📁 实现了Jms的服务器源码,支持多种适配器,DB,FTP,支持多种数据库
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
                }
            } else {
                throw new JMSException(
                    "Could not get message for handle " + handle,
                    JMSErrorCodes.FailedToResolveHandle);
            }
        }
    }

    // implementation of InternalMessageListener.onMessage
    public void onMessages(Vector handles) throws Exception {
        _log.error("Illegal to call onMessage");
        Thread.currentThread().dumpStack();
    }

    // implementation of InternalMessageListener.onMessageAvailable
    public void onMessageAvailable(long clientId) throws Exception {
        _listener.onMessageAvailable(clientId);
    }

    /**
     * This will send a null message down the connection to the client to
     * test whether the client endpoint is alive.
     *
     * @return <code>true</code> if it is active, otherwise <code>false</code>
     */
    public boolean isClientEndpointActive() {
        boolean active = true;
        if (_listener != null) {
            try {
                // send the message to the listener.
                _listener.onMessage(null);
            } catch (ClientDisconnectionException exception) {
                _log.info("Failed to verify that session " + _sessionId
                    + " is active.");
                active = false;
                // ignore the exception
            }
        }

        return active;
    }

    /**
     * Set a message listener for the session. This is the channel used
     * to asynchronously deliver messages to consumers created on this
     * session.
     *
     * @param listener the message listener
     */
    public void setMessageListener(JmsMessageListener listener) {
        _listener = listener;
    }

    /**
     * Check whether to enable asynchronous message delivery for a particular
     * consumer
     *
     * @param clientId the id of the client to check
     * @param id the last processed message
     * @param enable <code>true</code> to enable; <code>false</code> to disable
     */
    public void enableAsynchronousDelivery(long clientId, String id,
                                           boolean enable)
        throws JMSException {
        ConsumerEndpoint consumer = getConsumerEndpoint(clientId);
        if (consumer == null) {
            throw new JMSException(clientId + " is not registered");
        }

        if (enable) {
            consumer.setMessageListener(this);
        } else {
            consumer.setMessageListener(null);
        }
    }

    /**
     * Call recover on all registered consumers. This will cause all
     * unacknowledged messages to be redelivered. Before we recover we
     * need to stop messages delivery. We then need to start redelivery
     * when the recovery has been completed
     *
     * @throws JMSException if the session can't be recovered
     */
    public void recover() throws JMSException {
        // stop message delivery
        stop();

        // iterate over the list of consumers recover them
        Iterator consumers = _consumers.values().iterator();
        while (consumers.hasNext()) {
            ((ConsumerEndpoint) consumers.next()).recover();
        }

        // clear the messages in the sent message cache
        _sentMessageCache.clear();

        // restart message delivery
        start();


    }

    /**
     * Commit this session, which will acknowledge all sent messages for
     * all consumers.
     *
     * @throws JMSException - if there are any problems
     */
    public void commit() throws JMSException {
        try {
            _sentMessageCache.acknowledgeAllMessages();
        } catch (OutOfMemoryError exception) {
            String msg =
                "Failed to commit transaction due to out-of-memory error";
            _log.error(msg, exception);
            throw new JMSException(msg);
        }
    }

    /**
     * Abort, will return all unacked messages to their respective endpoints,
     * if they are still active.
     *
     * @throws JMSException - if there are any problems
     */
    public void rollback()
        throws JMSException {
        _sentMessageCache.clear();
    }

    // implementation XAResource.commit
    public void commit(Xid xid, boolean onePhase) throws XAException {
        try {
            ResourceManager.instance().commit(xid, onePhase);
        } catch (ResourceManagerException exception) {
            throw new XAException("Failed in commit " + exception);
        } finally {
            _xid = null;
        }
    }

    // implementation of XAResource.end
    public void end(Xid xid, int flags) throws XAException {
        try {
            ResourceManager.instance().end(xid, flags);
        } catch (ResourceManagerException exception) {
            throw new XAException("Failed in end " + exception);
        } finally {
            _xid = null;
        }
    }

    // implementation of XAResource.forget
    public void forget(Xid xid) throws XAException {
        try {
            ResourceManager.instance().forget(xid);
        } catch (ResourceManagerException exception) {
            throw new XAException("Failed in forget " + exception);
        } finally {
            _xid = null;
        }
    }

    // implementation of XAResource.getTransactionTimeout
    public int getTransactionTimeout() throws XAException {
        try {
            return ResourceManager.instance().getTransactionTimeout();
        } catch (ResourceManagerException exception) {
            throw new XAException("Failed in getTransactionTimeout " +
                exception);
        }
    }

    // implementation of XAResource.isSameRM
    public boolean isSameRM(XAResource xares) throws XAException {
        return true;
    }

    // implementation XAResource.isSame
    public int prepare(Xid xid) throws XAException {
        try {
            return ResourceManager.instance().prepare(xid);
        } catch (ResourceManagerException exception) {
            throw new XAException("Failed in prepare " + exception);
        }
    }

    // implementation of XAResource.prepare
    public Xid[] recover(int flag) throws XAException {
        try {
            return ResourceManager.instance().recover(flag);
        } catch (ResourceManagerException exception) {
            throw new XAException("Failed in recover " + exception);
        }
    }

    // implementation of XAResource.recover
    public void rollback(Xid xid) throws XAException {
        try {
            ResourceManager.instance().rollback(xid);
        } catch (ResourceManagerException exception) {
            throw new XAException("Failed in rollback " + exception);
        } finally {
            // clear the current xid
            _xid = null;
        }
    }

    // implementation of XAResource.rollback
    public boolean setTransactionTimeout(int seconds) throws XAException {
        try {
            return ResourceManager.instance().setTransactionTimeout(seconds);
        } catch (ResourceManagerException exception) {
            throw new XAException("Failed in setTransactionTimeout "
                + exception);
        }
    }

    // implementation of XAResource.setTransactionTimeout
    public void start(Xid xid, int flags) throws XAException {
        try {
            ResourceManager.instance().start(xid, flags);

            // set this as the current xid for this session
            _xid = xid;
        } catch (ResourceManagerException exception) {
            throw new XAException("Failed in start " + exception);
        }
    }

    /**
     * Return the xid that is currently associated with this session or null
     * if this session is currently not part of a global transactions
     *
     * @return Xid
     */
    public Xid getXid() {
        return _xid;
    }

    /**
     * Return the identity of the {@link ResourceManager}. The transaction
     * manager should be the only one to initiating this call.
     *
     * @return the identity of the resource manager
     * @throws XAException - if it cannot retrieve the rid.
     */
    public String getResourceManagerId() throws XAException {
        try {
            return ResourceManager.instance().getResourceManagerId();
        } catch (ResourceManagerException exception) {
            throw new XAException("Failed in getResourceManagerId "
                + exception);
        }
    }

    /**
     * Determines if the session is transacted
     *
     * @return <code>true</code> if the session is transacted
     */
    public boolean isTransacted() {
        return _transacted;
    }

    /**
     * Returns the message acknowledgement mode for the session
     */
    public int getAckMode() {
        return _ackMode;
    }

    /**
     * Returns the consumer endpoint for the supplied client id
     *
     * @param clientId the identity of the consumer endpoint
     * @return the consumer endpoint corresponding to <code>clientId</code>,
     * or <code>null</code> if none exists
     */
    public ConsumerEndpoint getConsumerEndpoint(long clientId) {
        String identity = Long.toString(clientId);
        return (ConsumerEndpoint) _consumers.get(identity);
    }

    /**
     * This method is used to stop and restart the session. Stopping the
     * session should stop all message delivery to session consumers
     *
     * @param stop - true if we need to stop the session, false otherwise
     */
    private void pause(boolean stop) {
        Iterator iter = _consumers.values().iterator();
        while (iter.hasNext()) {
            ((ConsumerEndpoint) iter.next()).setStopped(stop);
        }
    }

    /**
     * Check the delivery mode of the message. If the delivery mode is
     * persistent and the destination is non-administered then change the
     * delivery mode to non-persistent so that it can be processed correctly
     * by the server
     *
     * @param message - the message to check
     * @throws JMSException - propagate JMSException to client
     */
    private void checkDeliveryMode(MessageImpl message) throws JMSException {
        if ((message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT) &&
            (!DestinationManager.instance().isMessageForAdministeredDestination(message))) {
            message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
        }
    }

} //-- JmsServerSession

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -