⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jmssession.java

📁 实现了Jms的服务器源码,支持多种适配器,DB,FTP,支持多种数据库
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
     * otherwise wait the specified time.
     * @return the received message, or <code>null</code>, if no message is
     * available
     * @throws JMSException if an error occurs retrieving the message
     */
    public Message retrieveMessage(long clientId, long wait)
        throws JMSException {

        ensureOpen();

        // Interpretation of 'wait', as implemented below:
        //   wait > 0  : block for at most that many milliseconds
        //   wait == 0 : block until a message arrives or the session closes
        //   wait < 0  : non-blocking read; a single attempt is made
        boolean breakOnNextRead = false;

        // Set when the thread is interrupted while blocked. The receive
        // keeps waiting (as it always has), but the interrupt status is
        // restored before returning so that callers can observe it.
        boolean interrupted = false;

        long start = System.currentTimeMillis();
        // Absolute deadline for a timed receive. Saturate instead of
        // overflowing when a very large timeout is supplied.
        long end = (wait > 0 && start > Long.MAX_VALUE - wait)
            ? Long.MAX_VALUE : start + wait;
        MessageImpl message = null;
        try {
            while (true) {
                synchronized (_receiveLock) {
                    if (_closing || _closed) {
                        // session is in the process of closing, or has been
                        // closed. Need to return null.
                        break;
                    }
                    if (!_stopped) {
                        // connection is started. Messages may be returned.
                        message = (MessageImpl)
                            getJmsSessionStub().receiveMessage(clientId, wait);
                    }
                    // when the connection is stopped no message can be
                    // returned, but receives continue to time out

                    if (message != null) {
                        message.setSession(this);
                        break;
                    }

                    // exit if the timeout has already expired, or if the
                    // client is performing a non blocking read
                    if (breakOnNextRead || wait < 0) {
                        break;
                    }

                    // no message was received. Block until one of the
                    // following occurs:
                    // . the lock is notified
                    // . the receive times out
                    // . the thread is interrupted
                    if (wait > 0) {
                        try {
                            // wait for a specific period of time
                            _receiveLock.wait(wait);
                        } catch (InterruptedException ignored) {
                            interrupted = true;
                        }
                        // Recompute the remaining time even after an
                        // interrupt, so the overall deadline is honoured
                        // rather than restarting the full timeout.
                        long current = System.currentTimeMillis();
                        if (current >= end) {
                            // deadline reached: make one final read attempt
                            breakOnNextRead = true;
                        } else {
                            wait = end - current;
                        }
                    } else {
                        try {
                            // wait indefinitely
                            _receiveLock.wait();
                        } catch (InterruptedException ignored) {
                            interrupted = true;
                        }
                    }
                }
            }
        } finally {
            if (interrupted) {
                // restore the interrupt status that wait() cleared
                Thread.currentThread().interrupt();
            }
        }

        return message;
    }

    /**
     * Retrieve a batch of at most <code>count</code> messages for the given
     * client by delegating to the remote session stub. According to the
     * original documentation this is intended to be invoked only through a
     * {@link JmsQueueBrowser}.
     *
     * @param clientId the client identifier, scoped to the session
     * @param count the maximum number of messages to retrieve
     * @return the messages returned by the stub
     * @throws JMSException if the session is closed or the messages can't
     * be retrieved
     */
    public synchronized Vector retrieveMessages(long clientId, int count)
        throws JMSException {
        // reject the call if the session has already been closed
        ensureOpen();

        final JmsSessionStubIfc stub = getJmsSessionStub();
        return stub.receiveMessages(clientId, count);
    }

    /**
     * Release local resources used by this session object.
     * <p>
     * If the session is not already closed, it is flagged as closing, any
     * blocked synchronous receivers are woken, every registered producer and
     * consumer is destroyed, the session is removed from its connection, and
     * the cached outgoing messages and the stub reference are released.
     * <p>
     * The session is always left in the closed state, even if destroying a
     * producer or consumer fails; the failure is still propagated to the
     * caller.
     *
     * @throws JMSException - if there is a problem completing this request
     */
    public void destroy() throws JMSException {
        if (!_closed) {
            _closing = true;

            try {
                // wake up any blocking consumers
                notifyConsumers();

                // go through all the producer and call close on them
                // respectively
                Enumeration producers = getProducers();
                while (producers.hasMoreElements()) {
                    JmsMessageProducer producer =
                        (JmsMessageProducer) producers.nextElement();
                    producer.destroy();
                }

                // go through all the consumer and call close on them
                // respectively
                Enumeration consumers = getConsumers();
                while (consumers.hasMoreElements()) {
                    JmsMessageConsumer consumer =
                        (JmsMessageConsumer) consumers.nextElement();
                    consumer.destroy();
                }
            } finally {
                // The clean-up below must run even when a producer or
                // consumer fails to shut down; otherwise the session would
                // remain stuck in the closing state indefinitely.
                try {
                    // deregister this with the connection
                    _connection.removeSession(this);
                } finally {
                    _connection = null;

                    // clear any cached messages or acks
                    _messagesToSend.clear();

                    // simply release the reference to the server session
                    _stub = null;

                    // update the session state
                    _closed = true;
                    _closing = false;
                }
            }
        }
    }

    /**
     * Send the specified message to the server.
     * <p>
     * For a transacted session the message is not sent immediately: a copy
     * (a clone of a {@link MessageImpl}, or a converted instance of any
     * other message) is appended to the local list of pending messages, to
     * await a commit or a rollback. Otherwise the message is converted if
     * necessary, passed to the session stub, and the publish count is
     * incremented.
     *
     * @param message the message to send
     * @throws JMSException if the message can't be copied, converted or
     * sent. When copying fails, the underlying
     * <code>CloneNotSupportedException</code> is attached as the linked
     * exception.
     */
    protected synchronized void sendMessage(Message message)
        throws JMSException {

        if (_transacted) {
            // if the session is transacted then cache the message locally.
            // and wait for a commit or a rollback
            if (message instanceof MessageImpl) {
                try {
                    message = (Message) ((MessageImpl) message).clone();
                } catch (CloneNotSupportedException error) {
                    // keep the original failure attached so that the root
                    // cause is not lost
                    JMSException failure =
                        new JMSException(error.getMessage());
                    failure.setLinkedException(error);
                    throw failure;
                }
            } else {
                message = convert(message);
            }
            _messagesToSend.addElement(message);
        } else {
            if (!(message instanceof MessageImpl)) {
                message = convert(message);
            }
            getJmsSessionStub().sendMessage(message);
            _publishCount++;
        }
    }

    /**
     * Accessor for the remote session stub held by this session. The
     * reference is cleared by {@link #destroy}, so <code>null</code> is
     * returned once the session has been destroyed.
     *
     * @return the remote stub, or <code>null</code> if it has been released
     */
    protected JmsSessionStubIfc getJmsSessionStub() {
        return this._stub;
    }

    /**
     * Accessor for the connection that owns this session. The reference is
     * cleared by {@link #destroy}, so <code>null</code> is returned once the
     * session has been destroyed.
     *
     * @return the owning connection, or <code>null</code> if it has been
     * released
     */
    protected JmsConnection getConnection() {
        return this._connection;
    }

    /**
     * Determine whether a destination may be used by this session.
     * <ul>
     * <li>A non-temporary destination is always valid.</li>
     * <li>A temporary destination is valid only if it reports itself as
     * valid for this session's connection.</li>
     * </ul>
     *
     * @param destination the destination to check
     * @return <code>true</code> if the destination is valid
     */
    protected boolean checkForValidTemporaryDestination(
        JmsDestination destination) {
        // short-circuit: the cast and the ownership check are only
        // evaluated for temporary destinations
        return !destination.isTemporaryDestination()
            || ((JmsTemporaryDestination) destination)
                .validForConnection(getConnection());
    }

    /**
     * Enumerate the producers currently registered with this session.
     *
     * @return an enumeration over the session's producers
     */
    protected Enumeration getProducers() {
        final Enumeration registered = _producers.elements();
        return registered;
    }

    /**
     * Enumerate the consumers currently registered with this session.
     *
     * @return an enumeration over the session's consumers
     */
    protected Enumeration getConsumers() {
        final Enumeration registered = _consumers.elements();
        return registered;
    }

    /**
     * Advance the session's consumer id seed by one and return the new
     * value, for allocation to a new consumer.
     *
     * @return the newly allocated consumer identifier
     */
    protected long getNextConsumerId() {
        _consumerIdSeed += 1;
        return _consumerIdSeed;
    }

    /**
     * Register a consumer with this session, keyed on its client
     * identifier.
     *
     * @param consumer the consumer to add
     */
    protected void addConsumer(JmsMessageConsumer consumer) {
        final Long key = new Long(consumer.getClientId());
        _consumers.put(key, consumer);
    }

    /**
     * Deregister a consumer from this session. The entry is looked up by
     * the consumer's client identifier.
     *
     * @param consumer the consumer to remove
     */
    protected void removeConsumer(JmsMessageConsumer consumer) {
        final Long key = new Long(consumer.getClientId());
        _consumers.remove(key);
    }

    /**
     * Register a producer with this session by appending it to the list of
     * managed producers.
     *
     * @param producer the producer to add
     */
    protected void addProducer(JmsMessageProducer producer) {
        this._producers.addElement(producer);
    }

    /**
     * Deregister a producer from this session's list of managed producers.
     *
     * @param producer the producer to remove
     */
    protected void removeProducer(JmsMessageProducer producer) {
        this._producers.remove(producer);
    }

    /**
     * Report the current value of the session's closed flag.
     *
     * @return <code>true</code> if the session is closed
     */
    protected final boolean isClosed() {
        return this._closed;
    }

    /**
     * Append a message to the session's message cache. Per the original
     * documentation, cached messages are processed when run() is invoked
     * (that method is not part of this section - confirm before relying on
     * it).
     *
     * @param message the message to add.
     */
    protected void addMessage(Message message) {
        this._messageCache.addElement(message);
    }

    /**
     * Guard used by operations that require an open session.
     *
     * @throws IllegalStateException if the session is closed
     */
    protected void ensureOpen() throws IllegalStateException {
        if (!_closed) {
            // session is still open - nothing to report
            return;
        }
        throw new IllegalStateException(
            "Cannot perform operation - session has been closed");
    }

    /**
     * Verifies that the session is transactional
     *
     * @throws IllegalStateException if the session isn't transactional
     */
    private void ensureTransactional() throws IllegalStateException {
        if (!_transacted) {
            // message wording kept consistent with ensureOpen()
            throw new IllegalStateException(
                "Cannot perform operation - session is not transactional");
        }
    }

    /**
     * Wake every thread currently waiting on the receive lock, i.e. any
     * synchronous consumers blocked in a receive.
     */
    private void notifyConsumers() {
        final Object lock = _receiveLock;
        // the monitor must be held in order to notify on it
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    /**
     * Translate a message into its OpenJMS implementation, using the
     * converter that {@link MessageConverterFactory} creates for it.
     *
     * @param message the message to convert
     * @return the OpenJMS implementation of the message
     * @throws JMSException for any error
     */
    private Message convert(Message message) throws JMSException {
        return MessageConverterFactory.create(message).convert(message);
    }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -