⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jmsserversession.java

📁 一个java方面的消息订阅发送的源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
    /**
     * Browse up to count messages
     *
     * @param consumerId the consumer identifier
     * @param count      the maximum number of messages to receive
     * @return a list of {@link MessageImpl} instances
     * @throws JMSException for any JMS error
     */
    public List browse(long consumerId, int count) throws JMSException {
        ConsumerEndpoint consumer = getConsumerEndpoint(consumerId);
        if (consumer == null) {
            throw new JMSException("Can't browse messages: no browser registered with "
                                   + "identifier "
                                   + consumerId
                                   + " on session");
        }
        if (!(consumer instanceof QueueBrowserEndpoint)) {
            throw new JMSException("Can't browse messages: invalid consumer");
        }

        Vector handles = ((QueueBrowserEndpoint) consumer).receiveMessages(
                count);
        List messages = new ArrayList(count);

        Iterator iterator = handles.iterator();
        while (iterator.hasNext()) {
            MessageHandle handle = (MessageHandle) iterator.next();
            MessageImpl orig = handle.getMessage();
            if (orig != null) {
                // clone the message to set client specific properties
                try {
                    MessageImpl message = (MessageImpl) orig.clone();
                    message.setJMSRedelivered(handle.getDelivered());
                    message.setConsumerId(handle.getConsumerId());
                    messages.add(message);
                } catch (Exception exception) {
                    _log.error(exception);
                }
                if (messages.size() == count) {
                    break;
                }
            }
        }
        return messages;
    }

    /**
     * Create a new message consumer
     *
     * @param destination the destination to consume messages from
     * @param selector    the message selector. May be <code>null</code>
     * @param noLocal     if true, and the destination is a topic, inhibits the
     *                    delivery of messages published by its own connection.
     *                    The behavior for <code>noLocal</code> is not specified
     *                    if the destination is a queue.
     * @return the identifty of the message consumer
     * @throws JMSException for any JMS error
     */
    public long createConsumer(JmsDestination destination, String selector,
                                 boolean noLocal) throws JMSException {
        if (_log.isDebugEnabled()) {
            _log.debug("createConsumer(destination=" + destination
                       + ", selector=" + selector + ", noLocal=" + noLocal
                       + ") [session=" + this + "]");
        }

        if (destination == null) {
            throw new InvalidDestinationException(
                    "Cannot create MessageConsumer for null destination");
        }

        // Retrieve the destination from the destination manager and use
        // it to create the consumer
        ConsumerEndpoint consumer =
                ConsumerManager.instance().createConsumerEndpoint(this,
                                                                  destination,
                                                                  selector, noLocal);
        final long id = consumer.getId();
        consumer.setStopped(_stopped);
        _consumers.put(new Long(id), consumer);
        return id;
    }

    /**
     * Create a new durable consumer. Durable consumers may only consume from
     * non-temporary <code>Topic</code> destinations.
     *
     * @param topic    the non-temporary <code>Topic</code> to subscribe to
     * @param name     the name used to identify this subscription
     * @param selector only messages with properties matching the message
     *                 selector expression are delivered.  A value of null or an
     *                 empty string indicates that there is no message selector
     *                 for the message consumer.
     * @param noLocal  if set, inhibits the delivery of messages published by
     *                 its own connection
     * @return the identity of the durable consumer
     * @throws JMSException for any JMS error
     */
    public long createDurableConsumer(JmsTopic topic, String name,
                                        String selector, boolean noLocal)
            throws JMSException {
        if (_log.isDebugEnabled()) {
            _log.debug("createDurableConsumer(topic=" + topic + ", name="
                       + name
                       + ", selector=" + selector + ", noLocal=" + noLocal
                       + ") [session=" + this + "]");
        }

        if (topic == null || topic.isTemporaryDestination()) {
            throw new InvalidDestinationException("Invalid topic: " + topic);
        }

        if (name == null) {
            throw new InvalidDestinationException("Invalid subscription name");
        }

        ConsumerManager manager = ConsumerManager.instance();

        if (manager.durableConsumerExists(name)) {
            // if the durable consumer exists then validate that
            // it was the specified topic that it was registered
            // under. If it is not registered for the topic then
            // we must delete the existing entry and recreate it
            // against the new topic
            if (!manager.validSubscription(topic.getName(), name)) {
                unsubscribe(name);
                manager.createDurableConsumer(topic, name);
            }
        } else {
            // the durable consumer does not exist. so create
            // it
            manager.createDurableConsumer(topic, name);
        }

        // if a durable subscriber with the specified name is
        // already active then this method will throw an exception.
        // attempt to create a durable consuinmer
        ConsumerEndpoint consumer = manager.createDurableConsumerEndpoint(this,
                                                                          topic,
                                                                          name,
                                                                          noLocal,
                                                                          selector);
        final long id = consumer.getId();
        consumer.setStopped(_stopped);
        _consumers.put(new Long(id), consumer);
        return id;
    }

    /**
     * Create a queue browser for this session. This allows clients to browse a
     * queue without removing any messages.
     *
     * @param queue    the queue to browse
     * @param selector the message selector. May be <code>null</code>
     * @return the identity of the queue browser
     * @throws JMSException for any JMS error
     */
    public long createBrowser(JmsQueue queue, String selector)
            throws JMSException {
        if (_log.isDebugEnabled()) {
            _log.debug("createBrowser(queue=" + queue + ", selector="
                       + selector
                       + ") [session=" + this + "]");
        }

        if (queue == null) {
            throw new JMSException("Cannot create QueueBrowser for null queue");
        }

        ConsumerEndpoint consumer =
                ConsumerManager.instance().createQueueBrowserEndpoint(this,
                                                                      queue,
                                                                      selector);

        final long id = consumer.getId();
        consumer.setStopped(_stopped);
        _consumers.put(new Long(id), consumer);
        return id;
    }

    /**
     * Delete the receiver with the specified identity and clean up all
     * associated resources.
     *
     * @param consumerId the consumer identifier
     * @throws JMSException if the consumer cannot be deleted
     */
    public void removeConsumer(long consumerId) throws JMSException {
        if (_log.isDebugEnabled()) {
            _log.debug("removeConsumer(consumerId=" + consumerId
                       + ") [session="
                       + this + "]");
        }

        ConsumerEndpoint consumer =
                (ConsumerEndpoint) _consumers.remove(new Long(consumerId));
        if (consumer == null) {
            throw new JMSException("No consuemr with id=" + consumerId);
        }

        // destroy the consumer endpoint
        ConsumerManager.instance().deleteConsumerEndpoint(consumer);
    }

    /**
     * Unsubscribe a durable subscription
     *
     * @param name the name used to identify the subscription
     * @throws JMSException for any JMS error
     */
    public void unsubscribe(String name) throws JMSException {
        if (_log.isDebugEnabled()) {
            _log.debug("unsubscribe(name=" + name + ") [session=" + this + "]");
        }

        ConsumerManager manager = ConsumerManager.instance();

        // check that the durable consumer actually exists. If it doesn't then
        // throw an exception
        if (!manager.durableConsumerExists(name)) {
            throw new InvalidDestinationException(
                    name + " is not a durable subscriber name");
        }

        // check that the durable consumer is not active before removing it. If
        // it is then throw an exception
        if (!manager.isDurableConsumerActive(name)) {
            manager.removeDurableConsumer(name);
        } else {
            throw new JMSException("Failed to unsubscribe subscriber "
                                   + name + " since is still active");
        }
    }

    /**
     * Start the message delivery for the session.
     */
    public void start() {
        if (_log.isDebugEnabled()) {
            _log.debug("start() [session=" + this + "]");
        }

        if (_stopped) {
            pause(false);
            _stopped = false;
        }
    }

    /**
     * Stop message delivery for the session
     */
    public void stop() {
        if (_log.isDebugEnabled()) {
            _log.debug("stop() [session=" + this + "]");
        }
        if (!_stopped) {
            pause(true);
            _stopped = true;
        }
    }

    /**
     * Set a message listener for the session. This is the channel used to
     * asynchronously deliver messages to consumers created on this session.
     *
     * @param listener the message listener
     */
    public void setMessageListener(JmsMessageListener listener) {
        _listener = listener;
    }

    /**
     * Enable or disable asynchronous message delivery for a particular
     * consumer
     *
     * @param consumerId the consumer identifier
     * @param enable     true to enable; false to disable
     * @throws JMSException for any JMS error
     */
    public void enableAsynchronousDelivery(long consumerId, boolean enable)
            throws JMSException {
        ConsumerEndpoint consumer = getConsumerEndpoint(consumerId);
        if (consumer == null) {
            throw new JMSException(consumerId + " is not registered");
        }

        if (enable) {
            consumer.setMessageListener(this);
        } else {
            consumer.setMessageListener(null);
        }
    }

    /**
     * Close and release any resource allocated to this session.
     *
     * @throws JMSException if the session cannot be closed
     */
    public void close() throws JMSException {
        boolean closed = false;

        synchronized (this) {
            closed = _closed;
            if (!closed) {
                _closed = true;
            }
        }

        if (!closed) {
            if (_log.isDebugEnabled()) {
                _log.debug("close() [session=" + this + "]");
            }

            // reset the listener
            setMessageListener(null);

            // iterate over the list of consumers and deregister the
            // associated endpoints and then remove all the entries
            Iterator consumers = _consumers.values().iterator();
            while (consumers.hasNext()) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -