⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jmssession.java

📁 一个java方面的消息订阅发送的源码
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
            // have registered listeners...bit confusing but this is what
            // I believe it should do
            if (_listener != null) {
                _listener.onMessage(message);
            } else {
                // send it to the appropriate consumer
                consumer.onMessage(message);
            }
        } else {
            // consumer no longer active...so drop the message
            _log.error("Received a message for an inactive consumer");
        }
    }

    /**
     * Fetch the next message for the specified consumer.
     * <p>
     * A message is only requested from the server session while the session
     * is not stopped, closing or closed. This session is set on any message
     * that is returned.
     * <p>
     * If the calling thread is interrupted while blocked, the receive is not
     * aborted: the interrupt is recorded, the remaining wait time is
     * recalculated against the original deadline, and the thread's interrupt
     * status is restored before this method returns.
     *
     * @param consumerId the consumer identifier
     * @param wait       the maximum time to wait for a message, in
     *                   milliseconds. If negative (e.g. <code>-1</code>),
     *                   don't wait, if <code>0</code> wait indefinitely,
     *                   otherwise wait the specified time.
     * @return the received message, or <code>null</code>, if no message is
     *         available, or the session is closing or closed
     * @throws JMSException if an error occurs retrieving the message
     */
    public Message retrieveMessage(long consumerId, long wait)
            throws JMSException {

        ensureOpen();

        boolean breakOnNextRead = false;
        boolean interrupted = false;
        long start = System.currentTimeMillis();
        long end = start + wait;
        MessageImpl message = null;
        try {
            while (true) {
                synchronized (_receiveLock) {
                    if (_closing || _closed) {
                        // session is in the process of closing, or has been
                        // closed. Need to return null.
                        break;
                    } else if (!_stopped) {
                        // connection is started. Messages may be returned.
                        // (While stopped no message can be returned, but
                        // receives continue to time out below.)
                        message = (MessageImpl) getServerSession().receive(
                                consumerId, wait);
                    }

                    if (message != null) {
                        message.setSession(this);
                        break;
                    }

                    // the deadline passed during the previous wait and this
                    // was the final read attempt
                    if (breakOnNextRead) {
                        break;
                    }

                    // the client is performing a non blocking read
                    if (wait < 0) {
                        break;
                    }

                    // no message was received. Block until one of the
                    // following occurs:
                    // . a message is received
                    // . the receive times out
                    // . the session is closed
                    if (wait > 0) {
                        try {
                            // wait for a specific period of time
                            _receiveLock.wait(wait);
                        } catch (InterruptedException exception) {
                            // remember the interrupt; it is re-asserted when
                            // the method exits
                            interrupted = true;
                        }
                        // always recompute the remaining time, including
                        // after an interrupt, so that the overall timeout
                        // is never extended
                        long current = System.currentTimeMillis();
                        if (current >= end) {
                            breakOnNextRead = true;
                        } else {
                            wait = end - current;
                        }
                    } else {
                        try {
                            // wait indefinitely
                            _receiveLock.wait();
                        } catch (InterruptedException exception) {
                            interrupted = true;
                        }
                    }
                }
            }
        } finally {
            if (interrupted) {
                // restore the interrupt status swallowed by Object.wait()
                Thread.currentThread().interrupt();
            }
        }

        return message;
    }

    /**
     * Browse up to <code>count</code> messages for a consumer.
     * <p>
     * Verifies that the session is open, then delegates to the server
     * session.
     *
     * @param consumerId the consumer identifier
     * @param count      the maximum number of messages to receive
     * @return a list of {@link MessageImpl} instances
     * @throws JMSException for any JMS error
     */
    public synchronized List browse(long consumerId, int count)
            throws JMSException {
        ensureOpen();
        final ServerSession server = getServerSession();
        final List messages = server.browse(consumerId, count);
        return messages;
    }

    /**
     * Send the specified message to the server.
     * <p>
     * If the session is transacted, a copy of the message is cached locally
     * until the transaction is committed or rolled back: OpenJMS messages are
     * cloned, other implementations are converted. If the session is not
     * transacted, the message is converted if necessary and sent to the
     * server session immediately.
     *
     * @param message the message to send
     * @throws JMSException if the message can't be copied, converted or sent.
     *                      When copying fails, the underlying
     *                      <code>CloneNotSupportedException</code> is
     *                      available as the linked exception.
     */
    protected synchronized void sendMessage(Message message)
            throws JMSException {

        if (_transacted) {
            // if the session is transacted then cache the message locally.
            // and wait for a commit or a rollback
            if (message instanceof MessageImpl) {
                try {
                    message = (Message) ((MessageImpl) message).clone();
                } catch (CloneNotSupportedException error) {
                    // keep the original failure attached so that it is not
                    // lost to the caller
                    JMSException exception =
                            new JMSException(error.getMessage());
                    exception.setLinkedException(error);
                    throw exception;
                }
            } else {
                message = convert(message);
            }
            _messagesToSend.add(message);
        } else {
            if (!(message instanceof MessageImpl)) {
                message = convert(message);
            }
            getServerSession().send((MessageImpl) message);
        }
    }

    /**
     * Accessor for the server session held by this session.
     *
     * @return the server session
     */
    protected ServerSession getServerSession() {
        return this._session;
    }

    /**
     * Accessor for the connection that created this session.
     *
     * @return the owning connection
     */
    protected JmsConnection getConnection() {
        return this._connection;
    }

    /**
     * Creates a new message consumer on the server session, returning its
     * identity.
     * <p>
     * The session must be open, the destination must be a
     * {@link JmsDestination}, and a temporary destination must pass
     * {@link #checkForValidTemporaryDestination}.
     *
     * @param destination the destination to access
     * @param selector    the message selector. May be <code>null</code>
     * @param noLocal     if true, and the destination is a topic, inhibits the
     *                    delivery of messages published by its own connection.
     *                    The behavior for <code>noLocal</code> is not specified
     *                    if the destination is a queue.
     * @return the identifier of the newly created consumer
     * @throws JMSException                if the session fails to create a
     *                                     MessageConsumer due to some internal
     *                                     error.
     * @throws InvalidDestinationException if an invalid destination is
     *                                     specified.
     * @throws InvalidSelectorException    if the message selector is invalid.
     */
    protected long allocateConsumer(Destination destination,
                                      String selector, boolean noLocal)
            throws JMSException {
        ensureOpen();

        // only OpenJMS destinations are supported
        final boolean supported = destination instanceof JmsDestination;
        if (!supported) {
            throw new InvalidDestinationException(
                    "Cannot create MessageConsumer for destination="
                     + destination);
        }

        // a temporary destination can only be used within the context of
        // the owning connection
        final JmsDestination target = (JmsDestination) destination;
        final boolean usable = checkForValidTemporaryDestination(target);
        if (!usable) {
            throw new InvalidDestinationException(
                    "Trying to create a MessageConsumer for a temporary "
                    + "destination that is not bound to this connection");
        }

        return _session.createConsumer(target, selector, noLocal);
    }

    /**
     * Determines if a destination may be used by this session.
     * <p>
     * A non-temporary destination is always valid. A temporary destination
     * is valid only if it reports itself as valid for this session's
     * connection.
     *
     * @param destination the destination to check
     * @return <code>true</code> if the destination is valid
     */
    protected boolean checkForValidTemporaryDestination(
            JmsDestination destination) {
        // non-temporary destinations need no ownership check
        if (!destination.isTemporaryDestination()) {
            return true;
        }

        // a temporary destination must be owned by the session's connection
        final JmsTemporaryDestination temporary =
                (JmsTemporaryDestination) destination;
        return temporary.validForConnection(getConnection());
    }

    /**
     * Register a consumer with this session, keyed on its consumer
     * identifier.
     *
     * @param consumer the consumer to add
     */
    protected void addConsumer(JmsMessageConsumer consumer) {
        final Long key = new Long(consumer.getConsumerId());
        _consumers.put(key, consumer);
    }

    /**
     * Remove a consumer, deregistering it on the server.
     * <p>
     * Unless the consumer is a {@link JmsQueueBrowser}, its message listener
     * is removed first. The consumer is always dropped from this session's
     * set of consumers, even if deregistration fails.
     *
     * @param consumer the consumer to remove
     * @throws JMSException if removal fails
     */
    protected void removeConsumer(JmsMessageConsumer consumer)
            throws JMSException {
        final long id = consumer.getConsumerId();
        final boolean browser = consumer instanceof JmsQueueBrowser;
        try {
            if (!browser) {
                removeMessageListener(consumer);
            }
            _session.removeConsumer(id);
        } finally {
            // forget the consumer locally regardless of the outcome above
            _consumers.remove(new Long(id));
        }
    }

    /**
     * Register a producer with this session.
     *
     * @param producer the producer to add
     */
    protected void addProducer(JmsMessageProducer producer) {
        this._producers.add(producer);
    }

    /**
     * Deregister a producer from this session.
     *
     * @param producer the producer to remove
     */
    protected void removeProducer(JmsMessageProducer producer) {
        this._producers.remove(producer);
    }

    /**
     * Determines if the session has been closed.
     *
     * @return <code>true</code> if the session is closed
     */
    protected final boolean isClosed() {
        return this._closed;
    }

    /**
     * Append a message to the session's message cache. (The original
     * documentation states that cached messages are processed when the
     * run() method is called; that method is not shown here.)
     *
     * @param message the message to add.
     */
    protected void addMessage(Message message) {
        this._messageCache.add(message);
    }

    /**
     * Guard used by operations that require an open session.
     *
     * @throws IllegalStateException if the session is closed
     */
    protected void ensureOpen() throws IllegalStateException {
        final boolean open = !_closed;
        if (open) {
            return;
        }
        throw new IllegalStateException(
                "Cannot perform operation - session has been closed");
    }

    /**
     * Verifies that the session is transactional.
     *
     * @throws IllegalStateException if the session isn't transactional
     */
    private void ensureTransactional() throws IllegalStateException {
        if (!_transacted) {
            // message wording matches the one used by ensureOpen()
            throw new IllegalStateException(
                    "Cannot perform operation - session is not transactional");
        }
    }

    /**
     * Wakes every thread that is currently waiting on the receive lock.
     */
    private void notifyConsumers() {
        final Object lock = _receiveLock;
        // the monitor must be held in order to notify waiters
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    /**
     * Convert a message to its corresponding OpenJMS implementation, using
     * the converter that {@link MessageConverterFactory} creates for it.
     *
     * @param message the message to convert
     * @return the OpenJMS implementation of the message
     * @throws JMSException for any error
     */
    private Message convert(Message message) throws JMSException {
        return MessageConverterFactory.create(message).convert(message);
    }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -