⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jmssession.java

📁 一个java方面的消息订阅发送的源码
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
    public TemporaryQueue createTemporaryQueue() throws JMSException {
        ensureOpen();

        JmsTemporaryQueue queue = new JmsTemporaryQueue();
        queue.setOwningConnection(getConnection());
        return queue;
    }

    /**
     * Creates a <code>TemporaryTopic</code> object. Its lifetime will be that
     * of the <code>Connection</code> unless it is deleted earlier.
     *
     * @return a temporary topic identity
     * @throws JMSException if the session fails to create a temporary topic due
     *                      to some internal error.
     */
    public TemporaryTopic createTemporaryTopic() throws JMSException {
        ensureOpen();

        JmsTemporaryTopic topic = new JmsTemporaryTopic();
        topic.setOwningConnection(getConnection());
        return topic;
    }

    /**
     * Unsubscribes a durable subscription that has been created by a client.
     * <p/>
     * <P>This method deletes the state being maintained on behalf of the
     * subscriber by its provider.
     * <p/>
     * <P>It is erroneous for a client to delete a durable subscription while
     * there is an active <code>MessageConsumer</code> or
     * <code>TopicSubscriber</code> for the subscription, or while a consumed
     * message is part of a pending transaction or has not been acknowledged in
     * the session.
     *
     * @param name the name used to identify this subscription
     * @throws JMSException                if the session fails to unsubscribe
     *                                     to the durable subscription due to
     *                                     some internal error.
     * @throws InvalidDestinationException if an invalid subscription name is
     *                                     specified.
     */
    public void unsubscribe(String name) throws JMSException {
        ensureOpen();
        _session.unsubscribe(name);
    }

    /**
     * Commit all messages done in this transaction
     *
     * @throws JMSException if the transaction cannot be committed
     */
    public synchronized void commit() throws JMSException {
        ensureOpen();
        ensureTransactional();

        // send all the cached messages to the server
        getServerSession().send(_messagesToSend);
        _messagesToSend.clear();

        // commit the session
        getServerSession().commit();
    }

    /**
     * Rollback any messages done in this transaction
     *
     * @throws JMSException if the transaction cannot be rolled back
     */
    public synchronized void rollback() throws JMSException {
        ensureOpen();
        ensureTransactional();

        // clear all the cached messages
        _messagesToSend.clear();

        // rollback the session
        getServerSession().rollback();
    }

    /**
     * Close the session. This call will block until a receive or message
     * listener in progress has completed. A blocked message consumer receive
     * call returns <code>null</code> when this session is closed.
     *
     * @throws JMSException if the session can't be closed
     */
    public synchronized void close() throws JMSException {
        if (!_closed) {
            _closing = true;

            // must stop first before we close
            stop();

            // wake up any blocking consumers
            notifyConsumers();

            // go through all the producer and call close on them
            // respectively
            JmsMessageProducer[] producers =
               (JmsMessageProducer[]) _producers.toArray(
                       new JmsMessageProducer[0]);
            for (int i = 0; i < producers.length; ++i) {
                JmsMessageProducer producer = producers[i];
                producer.close();
            }

            // go through all the consumer and call close on them
            // respectively
            JmsMessageConsumer[] consumers =
                    (JmsMessageConsumer[]) _consumers.values().toArray(
                            new JmsMessageConsumer[0]);
            for (int i = 0; i < consumers.length; ++i) {
                JmsMessageConsumer consumer = consumers[i];
                consumer.close();
            }

            // deregister this with the connection
            _connection.removeSession(this);
            _connection = null;

            // clear any cached messages or acks
            _messagesToSend.clear();

            // issue a close to the remote session. This will release any
            // allocated remote resources
            getServerSession().close();
            _session = null;

            // update the session state
            _closed = true;
            _closing = false;
        }
    }

    /**
     * Stop message delivery in this session, and restart sending messages with
     * the oldest unacknowledged message
     *
     * @throws JMSException if the session can't be recovered
     */
    public synchronized void recover() throws JMSException {
        ensureOpen();
        if (!_transacted) {
            // let the server handle the recovery
            getServerSession().recover();
        } else {
            throw new IllegalStateException(
                    "Cannot recover from a transacted session");
        }
    }

    /**
     * Returns the message listener associated with the session
     *
     * @return the message listener associated with the session, or
     *         <code>null</code> if no listener is registered
     * @throws JMSException if the session is closed
     */
    public MessageListener getMessageListener() throws JMSException {
        ensureOpen();
        return _listener;
    }

    /**
     * Sets the session's message listener.
     *
     * @param listener the session's message listener
     * @throws JMSException if the session is closed
     */
    public void setMessageListener(MessageListener listener)
            throws JMSException {
        ensureOpen();
        _listener = listener;
    }

    /**
     * Iterates through the list of messages added by an {@link
     * JmsConnectionConsumer}, sending them to the registered listener
     */
    public void run() {
        try {
            while (!_messageCache.isEmpty()) {
                Message message = (Message) _messageCache.remove(0);
                _listener.onMessage(message);
            }
        } catch (Exception exception) {
            _log.error("Error in the Session.run()", exception);
        } finally {
            // Clear message cache
            _messageCache.clear();
        }
    }

    /**
     * Set the message listener for a particular consumer.
     * <p/>
     * If a listener is already registered for the consumer, it will be
     * automatically overwritten
     *
     * @param listener the message listener
     * @throws JMSException if the listener can't be set
     */
    public void setMessageListener(JmsMessageConsumer listener)
            throws JMSException {
        ensureOpen();
        enableAsynchronousDelivery(listener.getConsumerId(), true);
    }

    /**
     * Remove a message listener
     *
     * @param listener the message listener to remove
     * @throws JMSException if the listener can't be removed
     */
    public void removeMessageListener(JmsMessageConsumer listener)
            throws JMSException {

        ensureOpen();
        enableAsynchronousDelivery(listener.getConsumerId(), false);
    }

    /**
     * This will start message delivery to this session. If message delivery has
     * already started then this is a no-op.
     *
     * @throws JMSException if message delivery can't be started
     */
    public void start() throws JMSException {
        ensureOpen();
        if (_stopped) {
            getServerSession().start();
            _stopped = false;

            // wake up any blocking consumers
            notifyConsumers();
        }
    }

    /**
     * This will stop message delivery to this session. If message delivery has
     * already stoped then this is a no-op.
     *
     * @throws JMSException if message delivery can't be stopped
     */
    public void stop() throws JMSException {
        ensureOpen();
        if (!_stopped) {
            getServerSession().stop();
            _stopped = true;

            // wake up any blocking consumers
            notifyConsumers();
        }
    }

    /**
     * Acknowledge the specified message. This is only applicable for
     * CLIENT_ACKNOWLEDGE sessions. For other session types, the request is
     * ignored.
     * <p/>
     * Acking a message automatically acks all those that have come before it.
     *
     * @param message the message to acknowledge
     * @throws JMSException if the message can't be acknowledged
     */
    public void acknowledgeMessage(Message message) throws JMSException {
        ensureOpen();
        if (_ackMode == Session.CLIENT_ACKNOWLEDGE) {
            MessageImpl impl = (MessageImpl) message;
            getServerSession().acknowledgeMessage(impl.getConsumerId(),
                                                   impl.getAckMessageID());
        }
    }

    /**
     * Enable or disable asynchronous message delivery for the specified
     * client.
     *
     * @param consumerId the consumer identifier
     * @param enable     <code>true</code> to enable; <code>false</code> to
     *                   disable
     * @throws JMSException if message delivery cannot be enabled or disabled
     */
    public void enableAsynchronousDelivery(long consumerId, boolean enable)
            throws JMSException {

        ensureOpen();
        getServerSession().enableAsynchronousDelivery(consumerId, enable);
    }

    /**
     * Asynchronously deliver a message to a <code>MessageConsumer</code>
     *
     * @param message the message to deliver
     */
    public void onMessage(Message message) {
        if (message != null) {
            MessageImpl impl = (MessageImpl) message;
            impl.setJMSXRcvTimestamp(System.currentTimeMillis());

            // dispatch the message;
            execute(message);
        }
    }

    /**
     * Inform the session that there is a message available for a particular
     * consumer
     *
     * @param consumerId the consumer identity
     */
    public void onMessageAvailable(long consumerId) {
        // wake up any blocking consumers
        notifyConsumers();
    }

    /**
     * This is the called to process messages asynchronously delivered by the
     * server. The session is then responsible for delivering it to the
     * appropriate registered consumer. If it cannot resolve the consumer then
     * it must log an exception
     * <p/>
     * If the session has a registered listener then all messages will be
     * delivered to the session's listener instead of the individual consumer
     * message listeners.
     *
     * @param object received message
     */
    public synchronized void execute(Object object) {
        // if the session is closed then drop the object
        if (_closed) {
            _log.error("Received a message for a closed session");
            return;
        }

        MessageImpl message = (MessageImpl) object;
        long consumerId = message.getConsumerId();
        JmsMessageConsumer consumer =
                (JmsMessageConsumer) _consumers.get(new Long(consumerId));

        // tag the session that received this message
        message.setSession(this);
        if (consumer != null) {
            // if a listener is defined for the session then send all the
            // messages to that listener regardless if any consumers are

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -