⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jmsserversession.java

📁 实现了Jms的服务器源码,支持多种适配器,DB,FTP,支持多种数据库
💻 JAVA
📖 第 1 页 / 共 4 页
字号:

        // Retrieve the destination from the destination manager and use
        // it to create the consumer
        ConsumerEndpoint consumer =
            ConsumerManager.instance().createQueueBrowserEndpoint(this,
                clientId, queue, selector);

        // if the session is stopped then we should also stop the
        // consumer, so that it doesn't deliver messages and then
        // cache it for future reference.
        consumer.setStopped(_stopped);
        _consumers.put(Long.toString(clientId), consumer);
    }

    /**
     * Removes the receiver registered under the given identity from this
     * session and hands its consumer endpoint to the consumer manager for
     * deletion.
     *
     * @param clientId the identity of the receiver
     * @throws JMSException if no receiver is registered with this session
     *                      under <code>clientId</code>
     */
    public void deleteReceiver(long clientId) throws JMSException {
        if (_log.isDebugEnabled()) {
            _log.debug("deleteReceiver(clientId=" + clientId + ") [sessionId="
                + _sessionId + "]");
        }

        // consumers are keyed by the decimal string form of the client id
        final String key = Long.toString(clientId);
        final ConsumerEndpoint endpoint =
            (ConsumerEndpoint) _consumers.remove(key);

        if (endpoint != null) {
            // the entry is already gone from the session; now destroy the
            // endpoint itself
            ConsumerManager.instance().deleteConsumerEndpoint(endpoint);
            return;
        }

        // nothing was registered under this id
        throw new JMSException("No receiver with id " + clientId);
    }

    /**
     * Delete the sender with the specified identity from the session.
     * <p>
     * This implementation is a no-op: the body is empty, so the identity is
     * ignored and no exception is raised, although the signature still
     * declares {@link JMSException}.
     *
     * @param clientId the identity of the sender; ignored
     * @throws JMSException declared by the signature but never thrown by
     *                      this implementation
     */
    public void deleteSender(long clientId) throws JMSException {
        // no-op: this method performs no work for a sender
    }

    /**
     * Removes the queue browser registered under the given identity from
     * this session and hands its consumer endpoint to the consumer manager
     * for deletion.
     *
     * @param clientId the identity of the browser
     * @throws JMSException if no browser is registered with this session
     *                      under <code>clientId</code>
     */
    public void deleteBrowser(long clientId) throws JMSException {
        // browsers share the session's consumer table, keyed by the decimal
        // string form of the client id
        final String key = Long.toString(clientId);
        final ConsumerEndpoint browser =
            (ConsumerEndpoint) _consumers.remove(key);

        if (browser == null) {
            throw new JMSException("No browser with id " + clientId);
        }

        // the table entry is gone; destroy the endpoint itself
        final ConsumerManager manager = ConsumerManager.instance();
        manager.deleteConsumerEndpoint(browser);
    }

    /**
     * Create a subscriber endpoint for this session. A subscriber is a message
     * consumer specific to the topic message model and is associated with a
     * topic. The endpoint is created through the {@link ConsumerManager},
     * configured from this session's settings, and then added to the table of
     * consumers managed by this session under <code>clientId</code>.
     * <p>
     * The kind of subscriber depends on <code>name</code>:
     * <ul>
     * <li><code>null</code> - a non-durable subscriber is created;</li>
     * <li>non-empty - a durable subscriber is created. If a durable consumer
     *     of that name is already known but is not a valid subscription for
     *     <code>topic</code>, it is unsubscribed and re-created against
     *     <code>topic</code>;</li>
     * <li>empty - rejected with a <code>JMSException</code>.</li>
     * </ul>
     * <p>
     * You cannot create more than one subscriber for the same destination.
     * Currently this is not checked here.
     *
     * @param topic    subscriber destination; must not be <code>null</code>
     * @param name     durable subscription name, or <code>null</code> for a
     *                 non-durable subscriber; must not be empty
     * @param clientId the client session allocated identifier of the
     *                 consumer
     * @param selector the selector to filter messages. This may be
     *                 <code>null</code>.
     * @param noLocal  true to inhibit consumption of messages published on
     *                 this connection
     * @throws JMSException if <code>topic</code> is <code>null</code>, if
     *                      <code>name</code> is empty, or if the subscriber
     *                      cannot be created
     */
    public void createSubscriber(JmsTopic topic, String name, long clientId,
                                 String selector, boolean noLocal)
        throws JMSException {

        if (_log.isDebugEnabled()) {
            _log.debug("createSubscriber(topic=" + topic + ", name=" + name
                + ", clientId=" + clientId + ", selector=" + selector
                + ", noLocal=" + noLocal + ") [sessionId="
                + _sessionId + "]");
        }

        // check to ensure that the method's preconditions have been met
        if (topic == null) {
            throw new JMSException("Cannot create subscriber for null topic");
        }

        ConsumerEndpoint consumer;

        if (name == null) {
            // Create a non-durable subscriber for the specified destination
            // using the required selector.
            consumer = ConsumerManager.instance().createConsumerEndpoint(this,
                clientId, topic, selector);
        } else {
            // a durable subscription needs a usable name; the name is
            // non-null here, so the failure being reported is emptiness
            if (name.length() == 0) {
                throw new JMSException("Name in createSubscriber was empty");
            }

            ConsumerManager manager = ConsumerManager.instance();

            if (!manager.durableConsumerExists(name)) {
                // the durable consumer is not known yet, so create it
                manager.createDurableConsumer(topic, name);
            } else if (!manager.validSubscription(topic.getName(), name)) {
                // the durable consumer exists but was registered under a
                // different topic: delete the existing entry and recreate
                // it against the new topic
                unsubscribe(name);
                manager.createDurableConsumer(topic, name);
            }

            // attempt to create the durable consumer endpoint; per the
            // original author's note this throws if a durable subscriber
            // with the specified name is already active
            consumer = manager.createDurableConsumerEndpoint(this,
                topic, name, clientId, selector);
        }

        // both kinds of endpoint take the same settings from this session
        consumer.setConnectionId(_connection.hashCode());
        consumer.setTransacted(_transacted);
        consumer.setAckMode(_ackMode);
        consumer.setNoLocal(noLocal);

        // once the consumer has been created, set it to the same state
        // as the session and add it to the table of consumers to manage
        consumer.setStopped(_stopped);
        _consumers.put(Long.toString(clientId), consumer);
    }

    /**
     * Create a publisher for the specified topic.
     * <p>
     * This implementation is a no-op: the body is empty, so the topic is
     * ignored and no exception is raised. The original author left open the
     * question of whether state should be kept to record that a publisher
     * has been created.
     *
     * @param topic the publisher destination; ignored
     * @throws JMSException declared by the signature but never thrown by
     *                      this implementation
     */
    public void createPublisher(JmsTopic topic)
        throws JMSException {
        // no-op: this method performs no work for a publisher
    }

    /**
     * Removes the subscriber registered under the given identity from this
     * session and hands its consumer endpoint to the consumer manager for
     * deletion.
     * <p>
     * NOTE(review): this method itself only touches the session's consumer
     * table and calls <code>deleteConsumerEndpoint</code>; whether any
     * persistent subscription data is affected depends on that call and
     * should be confirmed against the consumer manager.
     *
     * @param clientId the client identity
     * @throws JMSException if no consumer is registered with this session
     *                      under <code>clientId</code>
     */
    public void deleteSubscriber(long clientId) throws JMSException {
        if (_log.isDebugEnabled()) {
            _log.debug("deleteSubscriber(clientId=" + clientId
                + ") [sessionId=" + _sessionId + "]");
        }

        // look up and detach the endpoint registered for this client id
        final String key = Long.toString(clientId);
        final ConsumerEndpoint subscriber =
            (ConsumerEndpoint) _consumers.remove(key);

        if (subscriber != null) {
            // the table entry is gone; destroy the endpoint itself
            ConsumerManager.instance().deleteConsumerEndpoint(subscriber);
            return;
        }

        // nothing was registered under this id; the message identifies the
        // session by its hash code together with the client id
        throw new JMSException("Failed to close consumer with id " +
            "[" + hashCode() + ":" + clientId + "]");
    }

    /**
     * Delete the publisher associated with the specified topic from the
     * session.
     * <p>
     * This implementation is a no-op: the body is empty, so the topic is
     * ignored and no exception is raised, although the signature still
     * declares {@link JMSException}.
     *
     * @param topic the publisher destination; ignored
     * @throws JMSException declared by the signature but never thrown by
     *                      this implementation
     */
    public void deletePublisher(JmsTopic topic) throws JMSException {
        // no-op: this method performs no work for a publisher
    }

    /**
     * Unsubscribe a durable subscription by asking the consumer manager to
     * remove the durable consumer registered under <code>name</code>. A
     * durable subscriber is uniquely identifiable and the same subscriber
     * cannot be associated with more than one topic.
     *
     * @param name the name used to uniquely identify the subscription
     * @throws InvalidDestinationException if no durable consumer exists
     *                                     under <code>name</code>
     * @throws JMSException if the durable consumer is still active, or
     *                      for any other problem
     */
    public void unsubscribe(String name) throws JMSException {
        if (_log.isDebugEnabled()) {
            _log.debug("unsubscribe(name=" + name + ") [sessionId="
                + _sessionId + "]");
        }

        final ConsumerManager consumers = ConsumerManager.instance();

        // an unknown name cannot be unsubscribed
        if (!consumers.durableConsumerExists(name)) {
            throw new InvalidDestinationException(name +
                " is not a durable subscriber name");
        }

        // refuse to remove a subscription that is still in use
        if (consumers.isDurableConsumerActive(name)) {
            throw new JMSException("Failed to unsubscribe subscriber "
                + name + " since is still active");
        }

        // known and inactive: safe to remove
        consumers.removeDurableConsumer(name);
    }

    /**
     * Stop message delivery to this session. This simply delegates to
     * {@link #stop()}.
     *
     * @throws JMSException if there is any problem completing the request
     */
    public void stopMessageDelivery() throws JMSException {
        // all of the work is done by stop()
        stop();
    }

    /**
     * Start message delivery to this session. This simply delegates to
     * {@link #start()}.
     *
     * @throws JMSException if there is any problem completing the request
     */
    public void startMessageDelivery() throws JMSException {
        // all of the work is done by start()
        start();
    }

    /**
     * Check if the specified message handle is held in this session's sent
     * message cache, which the original author describes as the session's
     * list of unacked messages.
     *
     * @param handle the handle to query
     * @return <code>true</code> if the sent message cache reports that it
     *         holds the handle; <code>false</code> otherwise
     */
    public boolean containsUnackedHandle(MessageHandle handle) {
        // the answer comes straight from the sent message cache
        return _sentMessageCache.handleInCache(handle);
    }

    // implementation of InternalMessageListener.onMessage
    public void onMessage(MessageHandle handle, boolean ignore)
        throws Exception {

        if ((handle != null) &&
            (_listener != null)) {
            MessageImpl message = handle.getMessage();
            MessageImpl m = null;
            if (message != null) {
                m = (MessageImpl) message.clone();
                m.setClientId(handle.getClientId());
                m.setJMSRedelivered(handle.getDelivered());

                // if the session is transacted or the acknowledge mode
                // is CLIENT_ACKNOWLEDGE then send the handle to the cache
                // before we send the message to the listener. This will
                // enable clients to ack the message while in the onMessage
                // method
                if ((_transacted) ||
                    (_ackMode == Session.CLIENT_ACKNOWLEDGE)) {
                    _sentMessageCache.process(handle);
                }

                try {
                    // send the message to the listener.
                    _listener.onMessage(m);

                    // if the session is not transacted and the acknowledge
                    // mode is not CLIENT_ACKNOWLEDGE then process it through
                    // the sent message cache now.
                    if ((!_transacted) &&
                        (_ackMode != Session.CLIENT_ACKNOWLEDGE)) {
                        _sentMessageCache.process(handle);
                    }
                } catch (ClientDisconnectionException exception) {
                    // close all resources and rethrow it
                    close();
                    throw exception;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -