⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 destinationmanager.java

📁 一个java方面的消息订阅发送的源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
    }

    /**
     * Destroys the cache registered for the specified destination, if there
     * is one, and then notifies the listeners that the destination has been
     * removed. Does nothing when no cache is registered for the destination.
     *
     * @param dest the destination whose cache is to be destroyed
     */
    protected synchronized void destroyDestinationCache(JmsDestination dest) {
        DestinationCache removed = (DestinationCache) _caches.remove(dest);
        if (removed == null) {
            // no cache was registered for this destination
            return;
        }

        removed.destroy();

        // tell the listeners that the destination is no longer managed
        notifyDestinationRemoved(dest, removed);
    }

    /**
     * Looks up a destination by name.
     *
     * @param name the name of the destination to look up
     * @return the destination registered under <code>name</code>, or
     *         <code>null</code> if there is no such destination
     */
    public synchronized JmsDestination getDestination(String name) {
        Object found = _destinationCache.get(name);
        return (JmsDestination) found;
    }

    /**
     * Adds a DestinationEventListener to the set of registered listeners.
     * A listener that is already registered is not added a second time.
     *
     * @param listener the listener to register
     */
    void addDestinationEventListener(DestinationEventListener listener) {
        synchronized (_listeners) {
            if (_listeners.contains(listener)) {
                // already registered; keep a single entry
                return;
            }
            _listeners.add(listener);
        }
    }

    /**
     * Unregisters a DestinationEventListener. The removal is performed while
     * holding the lock on the listener collection.
     *
     * @param listener the listener to unregister
     */
    void removeDestinationEventListener(DestinationEventListener listener) {
        // guard the shared listener collection while it is modified
        synchronized (_listeners) {
            _listeners.remove(listener);
        }
    }

    /**
     * Creates a non-administered destination by adding it to the destination
     * cache. No check is made here as to whether the destination is an
     * administered one; that is the caller's responsibility.
     *
     * @param destination the destination to create
     */
    public synchronized void createDestination(JmsDestination destination) {
        // only the in-memory cache is updated here
        addToDestinationCache(destination);
    }

    /**
     * Create an administered destination using the specified destination.
     * If the persistence adapter reports that no destination with the same
     * name exists, the destination is added to the persistent store, marked
     * persistent, added to the destination cache and bound in the JNDI
     * context. The database work is committed on success and rolled back on
     * any failure.
     *
     * @param dest - the destination
     * @return boolean - <code>true</code> if the destination was created,
     *         <code>false</code> if the persistence adapter reports that a
     *         destination with that name already exists
     * @throws JMSException if the destination cannot be persisted or bound
     *                      in JNDI; the underlying failure is attached as
     *                      the linked exception
     */
    public synchronized boolean createAdministeredDestination(
            JmsDestination dest) throws JMSException {
        if (_log.isDebugEnabled()) {
            _log.debug("createAdministeredDestination(dest=" + dest + ")");
        }

        boolean success = true;
        boolean queue = dest instanceof JmsQueue;
        PersistenceAdapter adapter = DatabaseService.getAdapter();

        // check that the destination does not exist. If it exists then return
        // false. If it doesn't exist then create it and bind it to the jndi
        // context

        Connection connection = null;
        try {
            connection = DatabaseService.getConnection();

            if (!adapter.checkDestination(connection, dest.getName())) {
                adapter.addDestination(connection, dest.getName(), queue);

                dest.setPersistent(true);

                // destination was created in persistent store, now create it
                // in transient memory and also bind it in the jndi context
                // NOTE(review): if the rebind or the commit below fails the
                // database change is rolled back, but the entry added to the
                // destination cache here is not undone - confirm whether the
                // cache should be cleaned up on failure.
                addToDestinationCache(dest);
                try {
                    ContextHelper.rebind(getContext(), dest.getName(), dest);
                } catch (NamingException exception) {
                    String msg = "Failed to add destination " + dest.getName()
                            + " to JNDI context";
                    _log.error(msg, exception);
                    JMSException error = new JMSException(
                            msg + ": " + exception.getMessage());
                    // keep the root cause available to the caller
                    error.setLinkedException(exception);
                    throw error;
                }
            } else {
                success = false;
            }
            connection.commit();
        } catch (JMSException exception) {
            // raised by the JNDI handling above; already logged
            SQLHelper.rollback(connection);
            throw exception;
        } catch (Exception exception) { // PersistenceException, SQLException
            SQLHelper.rollback(connection);
            String msg = "Failed to create administered destination "
                    + dest.getName();
            _log.error(msg, exception);
            JMSException error = new JMSException(
                    msg + ": " + exception.getMessage());
            // keep the root cause available to the caller
            error.setLinkedException(exception);
            throw error;
        } finally {
            SQLHelper.close(connection);
        }

        return success;
    }

    /**
     * Remove the corresponding administered destination from persistent store,
     * from transient memory and from the jndi context. For a topic this also
     * removes all durable consumers, provided none of them is active.
     * <p>
     * A failure to unbind the destination from JNDI is logged but does not
     * stop the removal.
     *
     * @param dest - the destination to remove
     * @throws JMSException if the destination has active durable consumers or
     *                      active endpoints, or if it cannot be removed from
     *                      the persistent store (the underlying failure is
     *                      then attached as the linked exception)
     */
    public synchronized void deleteAdministeredDestination(JmsDestination dest)
            throws JMSException {

        if (_log.isDebugEnabled()) {
            _log.debug("deleteAdministeredDestination(dest=" + dest + ")");
        }

        boolean queue = dest instanceof JmsQueue;
        ConsumerManager consumerMgr = ConsumerManager.instance();

        // If we are dealing with a topic then first check that there are
        // no active durable consumers for the destination
        if (!queue) {
            if (consumerMgr.hasActiveDurableConsumers(dest)) {
                throw new JMSException(
                        "Cannot delete the administered destination "
                        + dest
                        + " since there are active durable consumers.");
            }
            // no active consumers. Remove all durable consumers to this
            // destination
            // NOTE(review): this happens before the active endpoint check
            // below, so the durable consumers are already gone if that check
            // rejects the delete - confirm whether that ordering is intended.
            consumerMgr.removeDurableConsumers(dest);
        }

        // make sure there are no active endpoints
        int active = consumerMgr.getEndpointsForDest(dest).size();
        if (active > 0) {
            throw new JMSException(
                    "Cannot delete the administered destination "
                    + dest
                    + " since there are "
                    + active
                    + " active endpoints.");
        }

        // unbind it from the jndi context so that now it is unavailable
        // to other consumers
        // NOTE(review): the destination stays unbound even if the database
        // removal below fails and is rolled back.
        try {
            getContext().unbind(dest.getName());
        } catch (NamingException error) {
            // best effort: log and carry on with the removal
            _log.error("Failed to remove destination " + dest.getName()
                       + " from JNDI", error);
        }

        // now that we have removed all the durable consumers we can remove
        // the administered destination from the persistent store and from
        // memory
        Connection connection = null;
        try {
            connection = DatabaseService.getConnection();

            DatabaseService.getAdapter().removeDestination(connection,
                                                           dest.getName());
            destroyDestinationCache(dest);
            removeFromDestinationCache(dest);
            connection.commit();
        } catch (PersistenceException exception) {
            SQLHelper.rollback(connection);
            String msg = "Failed to remove destination " + dest.getName();
            _log.error(msg, exception);
            JMSException error = new JMSException(
                    msg + ": " + exception.getMessage());
            // keep the root cause available to the caller
            error.setLinkedException(exception);
            throw error;
        } catch (SQLException exception) {
            SQLHelper.rollback(connection);
            String msg = "Failed to remove destination " + dest.getName();
            _log.error(msg, exception);
            JMSException error = new JMSException(
                    msg + ": " + exception.getMessage());
            // keep the root cause available to the caller
            error.setLinkedException(exception);
            throw error;
        } finally {
            SQLHelper.close(connection);
        }
    }

    /**
     * Creates the administered destinations listed in the configuration.
     * Topics are processed first, each followed by its preconfigured durable
     * subscribers (zero or more); queues are processed afterwards and have no
     * equivalent of durable subscribers. A failure to register one destination
     * is logged and does not prevent the remaining ones from being processed.
     */
    public void registerConfiguredAdministeredDestinations() {
        AdministeredDestinations configured =
                ConfigurationManager.getConfig().getAdministeredDestinations();
        if (configured == null) {
            // nothing has been configured
            return;
        }

        // topics, together with their durable subscribers
        int topicCount = configured.getAdministeredTopicCount();
        for (int t = 0; t < topicCount; t++) {
            AdministeredTopic topic = configured.getAdministeredTopic(t);

            // build a persistent topic destination and register it
            JmsTopic topicDest = new JmsTopic(topic.getName());
            topicDest.setPersistent(true);
            try {
                createAdministeredDestination(topicDest);

                // add a durable consumer for every configured subscriber
                int subscriberCount = topic.getSubscriberCount();
                ConsumerManager consumers = ConsumerManager.instance();
                for (int s = 0; s < subscriberCount; s++) {
                    Subscriber configuredSubscriber = topic.getSubscriber(s);
                    consumers.createDurableConsumer(
                            topicDest, configuredSubscriber.getName());
                }
            } catch (JMSException exception) {
                _log.error("Failed to register persistent topic "
                           + topic.getName(), exception);
            }
        }

        // queues; these have no associated durable subscribers
        int queueCount = configured.getAdministeredQueueCount();
        for (int q = 0; q < queueCount; q++) {
            AdministeredQueue queue = configured.getAdministeredQueue(q);

            // build a persistent queue destination and register it
            JmsQueue queueDest = new JmsQueue(queue.getName());
            queueDest.setPersistent(true);
            try {
                createAdministeredDestination(queueDest);
            } catch (JMSException exception) {
                _log.error("Failed to register persistent queue "
                           + queue.getName(), exception);
            }
        }
    }

    /**
     * Invoked when the {@link MessageMgr} receives a non-persistent message.
     * <p>
     * A message for a topic is only passed on when the topic has active
     * consumers; otherwise it is ignored. A message for any other destination
     * (a queue) is always passed on. The destination is created first if it
     * does not already exist.
     *
     * @param destination the message's destination
     * @param message     the message
     * @throws JMSException if the message can't be processed
     */
    public synchronized void messageAdded(JmsDestination destination,
                                          MessageImpl message)
            throws JMSException {
        if (destination instanceof JmsTopic
                && !ConsumerManager.instance().hasActiveConsumers(
                        destination)) {
            // topic without active consumers: nothing to deliver to
            return;
        }

        // either a queue, or a topic with active consumers: make sure the
        // destination exists and hand the message to its cache
        if (!destinationExists(destination)) {
            createDestination(destination);
        }
        getDestinationCache(destination).messageAdded(destination, message);
    }

    /**
     * Invoked when the {@link MessageMgr} receives a persistent message.
     * The message is handed to the destination cache obtained for the
     * destination using the supplied connection.
     *
     * @param connection  the database connection
     * @param destination the message's destination
     * @param message     the message
     * @throws JMSException         if the message can't be processed
     * @throws PersistenceException if there is a persistence related problem
     */
    public synchronized void persistentMessageAdded(Connection connection,
                                                    JmsDestination destination,
                                                    MessageImpl message)
            throws JMSException, PersistenceException {
        getDestinationCache(destination, connection)
                .persistentMessageAdded(connection, destination, message);
    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -