⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jmsserversession.java

📁 一个java方面的消息订阅发送的源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                ConsumerEndpoint consumer = (ConsumerEndpoint) consumers.next();
                ConsumerManager.instance().deleteConsumerEndpoint(consumer);
            }

            // clear the unacked message cache
            _sentMessageCache.clear();

            // clear the consumers
            _consumers.clear();

            // de-register the session from the connection
            _connection.closed(this);
        } else {
            if (_log.isDebugEnabled()) {
                _log.debug("close() [session=" + this +
                           "]: session already closed");
            }
        }
    }

    /**
     * Send the specified message to the client
     *
     * @param handle a handle to the message
     * @throws JMSException    if the message can't be resolved from the handle
     * @throws RemoteException if the message can't be delivered to the client
     */
    public void onMessage(MessageHandle handle) throws JMSException,
            RemoteException {
        if (_listener != null) {
            MessageImpl message = handle.getMessage();
            MessageImpl m = null;

            // get the message. It may be null if it has expired
            if (message != null) {
                try {
                    // the listener is given a copy, never the instance
                    // returned by the handle
                    m = (MessageImpl) message.clone();
                } catch (CloneNotSupportedException exception) {
                    // link the underlying failure to the JMSException so it
                    // stays available through getLinkedException() instead
                    // of surviving only as a string
                    JMSException error =
                            new JMSException(exception.toString());
                    error.setLinkedException(exception);
                    throw error;
                }

                m.setConsumerId(handle.getConsumerId());
                m.setJMSRedelivered(handle.getDelivered());

                // if the session is transacted or the acknowledge mode is
                // CLIENT_ACKNOWLEDGE then send the handle to the cache before
                // we send the message to the listener. This will enable
                // clients to ack the message while in the onMessage method
                if (_transacted || (_ackMode == Session.CLIENT_ACKNOWLEDGE)) {
                    _sentMessageCache.process(handle);
                }

                try {
                    // send the message to the listener.
                    _listener.onMessage(m);

                    // otherwise (neither transacted nor CLIENT_ACKNOWLEDGE)
                    // the handle is processed through the sent message cache
                    // only after the listener call has returned normally
                    if (!_transacted &&
                            (_ackMode != Session.CLIENT_ACKNOWLEDGE)) {
                        _sentMessageCache.process(handle);
                    }
                } catch (RemoteException exception) {
                    // close all resources and rethrow it
                    close();
                    throw exception;
                }
            }
        } else {
            // no listener to deliver to: report it, nothing is delivered
            _log.error("Failed to stop async consumer endpoints?");
        }
    }

    /**
     * Notifies that a message is available for a particular consumer
     *
     * @param consumerId the identity of the message consumer
     * @throws RemoteException if the session can't be notified
     */
    public void onMessageAvailable(long consumerId) throws RemoteException {
        // onMessage() treats a missing listener as a condition to log rather
        // than a crash; do the same here so that a notification arriving
        // while no listener is set does not raise a NullPointerException
        if (_listener != null) {
            _listener.onMessageAvailable(consumerId);
        } else {
            _log.error("onMessageAvailable() [session=" + this
                       + ", consumer=" + consumerId
                       + "]: no listener to notify");
        }
    }

    /**
     * Call recover on all registered consumers. This will cause all
     * unacknowledged messages to be redelivered. Before we recover we need to
     * stop message delivery. We then need to restart delivery once the
     * recovery has been completed
     *
     * @throws JMSException if the session can't be recovered
     */
    public void recover() throws JMSException {
        // stop message delivery
        stop();

        // clear the messages in the sent message cache
        _sentMessageCache.clear();

        // restart message delivery
        // NOTE(review): start() is not reached if stop() or clear() throws -
        // confirm the session cannot be left stopped in that case
        start();
    }

    /**
     * Commit this session, which will acknowledge all sent messages for all
     * consumers.
     *
     * @throws JMSException - if there are any problems
     */
    public void commit() throws JMSException {
        try {
            _sentMessageCache.acknowledgeAllMessages();
        } catch (OutOfMemoryError exception) {
            String msg =
                    "Failed to commit transaction due to out-of-memory error";
            _log.error(msg, exception);

            // JMSException.setLinkedException() only accepts an Exception,
            // so the Error is chained through Throwable.initCause() to keep
            // it reachable from the exception handed to the caller
            JMSException error = new JMSException(msg);
            error.initCause(exception);
            throw error;
        }
    }

    /**
     * Abort, will return all unacked messages to their respective endpoints, if
     * they are still active.
     *
     * @throws JMSException - if there are any problems
     */
    public void rollback() throws JMSException {
        // the only action taken here is clearing the sent message cache
        // NOTE(review): presumably clear() is what hands unacked messages
        // back to their endpoints - confirm against the cache implementation
        _sentMessageCache.clear();
    }

    // implementation of XAResource.start
    /**
     * Delegates to {@link ResourceManager#start} and, when that call
     * succeeds, records <code>xid</code> as the xid currently associated with
     * this session.
     *
     * @param xid   the transaction identifier, passed through unchanged
     * @param flags the flags, passed through unchanged
     * @throws XAException if the resource manager raises a
     *                     <code>ResourceManagerException</code>; the xid of
     *                     this session is left untouched in that case
     */
    public void start(Xid xid, int flags) throws XAException {
        try {
            ResourceManager.instance().start(xid, flags);

            // set this as the current xid for this session
            _xid = xid;
        } catch (ResourceManagerException exception) {
            // keep the original failure as the cause so that its stack
            // trace is not lost
            XAException error =
                    new XAException("Failed in start " + exception);
            error.initCause(exception);
            throw error;
        }
    }

    // implementation of XAResource.prepare
    /**
     * Delegates to {@link ResourceManager#prepare} for the given xid.
     *
     * @param xid the transaction identifier, passed through unchanged
     * @return the value returned by the resource manager
     * @throws XAException if the resource manager raises a
     *                     <code>ResourceManagerException</code>
     */
    public int prepare(Xid xid) throws XAException {
        try {
            return ResourceManager.instance().prepare(xid);
        } catch (ResourceManagerException exception) {
            // keep the original failure as the cause so that its stack
            // trace is not lost
            XAException error =
                    new XAException("Failed in prepare " + exception);
            error.initCause(exception);
            throw error;
        }
    }

    // implementation XAResource.commit
    /**
     * Delegates to {@link ResourceManager#commit}. Whether or not the call
     * succeeds, the xid associated with this session is reset to
     * <code>null</code> afterwards.
     *
     * @param xid      the transaction identifier, passed through unchanged
     * @param onePhase passed through unchanged to the resource manager
     * @throws XAException if the resource manager raises a
     *                     <code>ResourceManagerException</code>
     */
    public void commit(Xid xid, boolean onePhase) throws XAException {
        try {
            ResourceManager.instance().commit(xid, onePhase);
        } catch (ResourceManagerException exception) {
            // keep the original failure as the cause so that its stack
            // trace is not lost
            XAException error =
                    new XAException("Failed in commit " + exception);
            error.initCause(exception);
            throw error;
        } finally {
            // clear the current xid on success and on failure alike
            _xid = null;
        }
    }

    // implementation of XAResource.end
    /**
     * Delegates to {@link ResourceManager#end}. Whether or not the call
     * succeeds, the xid associated with this session is reset to
     * <code>null</code> afterwards.
     *
     * @param xid   the transaction identifier, passed through unchanged
     * @param flags the flags, passed through unchanged
     * @throws XAException if the resource manager raises a
     *                     <code>ResourceManagerException</code>
     */
    public void end(Xid xid, int flags) throws XAException {
        try {
            ResourceManager.instance().end(xid, flags);
        } catch (ResourceManagerException exception) {
            // keep the original failure as the cause so that its stack
            // trace is not lost
            XAException error = new XAException("Failed in end " + exception);
            error.initCause(exception);
            throw error;
        } finally {
            // clear the current xid on success and on failure alike
            _xid = null;
        }
    }

    // implementation of XAResource.forget
    /**
     * Delegates to {@link ResourceManager#forget}. Whether or not the call
     * succeeds, the xid associated with this session is reset to
     * <code>null</code> afterwards.
     *
     * @param xid the transaction identifier, passed through unchanged
     * @throws XAException if the resource manager raises a
     *                     <code>ResourceManagerException</code>
     */
    public void forget(Xid xid) throws XAException {
        try {
            ResourceManager.instance().forget(xid);
        } catch (ResourceManagerException exception) {
            // keep the original failure as the cause so that its stack
            // trace is not lost
            XAException error =
                    new XAException("Failed in forget " + exception);
            error.initCause(exception);
            throw error;
        } finally {
            // clear the current xid on success and on failure alike
            _xid = null;
        }
    }

    // implementation of XAResource.recover
    /**
     * Delegates to {@link ResourceManager#recover}.
     *
     * @param flag the flag, passed through unchanged
     * @return the xids returned by the resource manager
     * @throws XAException if the resource manager raises a
     *                     <code>ResourceManagerException</code>
     */
    public Xid[] recover(int flag) throws XAException {
        try {
            return ResourceManager.instance().recover(flag);
        } catch (ResourceManagerException exception) {
            // keep the original failure as the cause so that its stack
            // trace is not lost
            XAException error =
                    new XAException("Failed in recover " + exception);
            error.initCause(exception);
            throw error;
        }
    }

    // implementation of XAResource.rollback
    /**
     * Delegates to {@link ResourceManager#rollback}. Whether or not the call
     * succeeds, the xid associated with this session is reset to
     * <code>null</code> afterwards.
     *
     * @param xid the transaction identifier, passed through unchanged
     * @throws XAException if the resource manager raises a
     *                     <code>ResourceManagerException</code>
     */
    public void rollback(Xid xid) throws XAException {
        try {
            ResourceManager.instance().rollback(xid);
        } catch (ResourceManagerException exception) {
            // keep the original failure as the cause so that its stack
            // trace is not lost
            XAException error =
                    new XAException("Failed in rollback " + exception);
            error.initCause(exception);
            throw error;
        } finally {
            // clear the current xid
            _xid = null;
        }
    }

    // implementation of XAResource.getTransactionTimeout
    /**
     * Delegates to {@link ResourceManager#getTransactionTimeout}.
     *
     * @return the value returned by the resource manager
     * @throws XAException if the resource manager raises a
     *                     <code>ResourceManagerException</code>
     */
    public int getTransactionTimeout() throws XAException {
        try {
            return ResourceManager.instance().getTransactionTimeout();
        } catch (ResourceManagerException exception) {
            // keep the original failure as the cause so that its stack
            // trace is not lost
            XAException error =
                    new XAException("Failed in getTransactionTimeout " +
                                    exception);
            error.initCause(exception);
            throw error;
        }
    }

    // implementation of XAResource.isSameRM
    public boolean isSameRM(XAResource xares) throws XAException {
        // NOTE(review): the argument is never examined, so every resource
        // passed in is reported as belonging to the same resource manager -
        // confirm that this is intended
        return true;
    }


    // implementation of XAResource.setTransactionTimeout
    /**
     * Delegates to {@link ResourceManager#setTransactionTimeout}.
     *
     * @param seconds the timeout, passed through unchanged
     * @return the value returned by the resource manager
     * @throws XAException if the resource manager raises a
     *                     <code>ResourceManagerException</code>
     */
    public boolean setTransactionTimeout(int seconds) throws XAException {
        try {
            return ResourceManager.instance().setTransactionTimeout(seconds);
        } catch (ResourceManagerException exception) {
            // keep the original failure as the cause so that its stack
            // trace is not lost
            XAException error =
                    new XAException("Failed in setTransactionTimeout "
                                    + exception);
            error.initCause(exception);
            throw error;
        }
    }

    /**
     * Return the xid that is currently associated with this session or null if
     * this session is currently not part of a global transaction
     *
     * @return Xid
     */
    public Xid getXid() {
        // within the visible part of this class, _xid is assigned by
        // start(Xid, int) and reset to null by end(), forget(),
        // commit(Xid, boolean) and rollback(Xid)
        return _xid;
    }

    /**
     * Return the identity of the {@link ResourceManager}. The transaction
     * manager should be the only one initiating this call.
     *
     * @return the identity of the resource manager
     * @throws XAException - if it cannot retrieve the rid.
     */
    public String getResourceManagerId() throws XAException {
        try {
            // the identity is obtained from the resource manager on each call
            return ResourceManager.instance().getResourceManagerId();
        } catch (ResourceManagerException exception) {
            // keep the original failure as the cause so that its stack
            // trace is not lost
            XAException error =
                    new XAException("Failed in getResourceManagerId "
                                    + exception);
            error.initCause(exception);
            throw error;
        }
    }

    /**
     * Determines if the session is transacted
     *
     * @return <code>true</code> if the session is transacted
     */
    public boolean isTransacted() {
        // plain accessor: returns the _transacted field as is
        return _transacted;
    }

    /**
     * Returns the message acknowledgement mode for the session
     */
    public int getAckMode() {
        // plain accessor: returns the _ackMode field as is; onMessage()
        // compares this field against Session.CLIENT_ACKNOWLEDGE
        return _ackMode;
    }

    /**
     * Returns the consumer endpoint given its identifier
     *
     * @param consumerId the consumer identifier
     * @return the consumer endpoint corresponding to <code>consumerId</code>,
     *         or <code>null</code> if none exists
     */
    public ConsumerEndpoint getConsumerEndpoint(long consumerId) {
        // the consumers are looked up by the boxed form of their identifier
        Long key = new Long(consumerId);
        Object endpoint = _consumers.get(key);

        // casting null is harmless, so a missing entry simply yields null
        return (ConsumerEndpoint) endpoint;
    }

    /**
     * This method is used to stop and restart the session. Stopping the session
     * should stop all message delivery to session consumers
     *
     * @param stop - true if we need to stop the session, false otherwise
     */
    private void pause(boolean stop) {
        // apply the same stopped flag to every registered consumer endpoint
        for (Iterator endpoints = _consumers.values().iterator();
             endpoints.hasNext();) {
            ConsumerEndpoint endpoint = (ConsumerEndpoint) endpoints.next();
            endpoint.setStopped(stop);
        }
    }

    /**
     * Check the delivery mode of the message. If the delivery mode is
     * persistent and the destination is non-administered then change the
     * delivery mode to non-persistent so that it can be processed correctly by
     * the server
     *
     * @param message - the message to check
     * @throws JMSException - propagate JMSException to client
     */
    private void checkDeliveryMode(MessageImpl message) throws JMSException {
        // only persistent messages are candidates for a change; the
        // destination manager is not consulted for any other message
        if (message.getJMSDeliveryMode() != DeliveryMode.PERSISTENT) {
            return;
        }

        boolean administered = DestinationManager.instance()
                .isMessageForAdministeredDestination(message);
        if (!administered) {
            // persistent message for a non-administered destination:
            // switch it to non-persistent delivery
            message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
        }
    }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -