⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 destinationmanager.java

📁 实现了Jms的服务器源码,支持多种适配器,DB,FTP,支持多种数据库
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
            }
        }

        return cache;
    }

    /**
     * Report whether a destination cache is currently registered for the
     * given destination.
     *
     * @param dest - the destination to look up
     * @return boolean - true if an entry is registered under this destination
     */
    public boolean hasDestinationCache(JmsDestination dest) {
        final boolean registered = _caches.containsKey(dest);
        return registered;
    }

    /**
     * Register a non-administered destination with this manager.
     * <p>
     * No check is made here as to whether the destination is actually an
     * administered one; callers are responsible for making that distinction
     * before invoking this method.
     *
     * @param destination - the destination to register
     */
    public synchronized void createDestination(JmsDestination destination) {
        // nothing more to do than record it in the destination cache
        this.addToDestinationCache(destination);
    }

    /**
     * Create an administered destination using the specified destination.
     * If no destination of that name exists in the persistent store, it is
     * added to the store, registered in the destination cache, marked
     * persistent and bound into the jndi context under its name. The
     * database work is committed on success and rolled back on failure.
     *
     * @param dest - the destination
     * @return boolean - true if the destination was created; false if a
     *         destination with the same name already exists in the
     *         persistent store
     * @throws JMSException - if the destination cannot be persisted or
     *         cannot be bound in the jndi context. The underlying failure
     *         is attached as the linked exception.
     */
    public synchronized boolean createAdministeredDestination(
        JmsDestination dest) throws JMSException {
        if (_log.isDebugEnabled()) {
            _log.debug("createAdministeredDestination(dest=" + dest + ")");
        }

        boolean success = true;
        boolean queue = (dest instanceof JmsQueue);
        PersistenceAdapter adapter = DatabaseService.getAdapter();

        // check that the destination does not exist. If it exists then return
        // false. If it doesn't exist then create it and bind it to the jndi
        // context

        Connection connection = null;
        try {

            connection = DatabaseService.getConnection();

            if (!adapter.checkDestination(connection, dest.getName())) {
                adapter.addDestination(connection, dest.getName(), queue);

                // destination was created in persistent store, now create it
                // in transient memory and also bind it in the jndi context
                // NOTE(review): the cache entry added here is not undone if
                // the jndi bind or the commit below fails - confirm whether
                // callers rely on that.
                addToDestinationCache(dest);
                try {
                    dest.setPersistent(true);
                    ContextHelper.rebind(getContext(), dest.getName(), dest);
                } catch (NamingException exception) {
                    String msg = "Failed to add destination " + dest.getName()
                        + " to JNDI context";
                    _log.error(msg, exception);
                    JMSException failure = new JMSException(msg + ": " +
                        exception.getMessage());
                    // keep the root cause reachable for callers
                    failure.setLinkedException(exception);
                    throw failure;
                }
            } else {
                success = false;
            }
            connection.commit();
        } catch (JMSException exception) {
            // already in its final form (raised by the jndi bind above)
            SQLHelper.rollback(connection);
            throw exception;
        } catch (Exception exception) { // PersistenceException, SQLException
            SQLHelper.rollback(connection);
            String msg = "Failed to create administered destination "
                + dest.getName();
            _log.error(msg, exception);
            JMSException failure = new JMSException(msg + ": " +
                exception.getMessage());
            failure.setLinkedException(exception);
            throw failure;
        } finally {
            SQLHelper.close(connection);
        }

        return success;
    }

    /**
     * Remove the corresponding administered destination from persistent
     * store, from transient memory and from the jndi context. For a topic
     * this will also remove all durable consumers for that topic.
     * <p>
     * The request is refused if the topic has active durable consumers, or
     * if endpoints remain on the destination after unreferenced endpoints
     * have been cleaned up. A failure to unbind the name from the jndi
     * context is logged and does not abort the removal.
     *
     * @param dest - the destination to remove
     * @throws JMSException - if the destination is still in use, or if it
     *         cannot be removed from the persistent store. In the latter
     *         case the underlying failure is attached as the linked
     *         exception.
     */
    public synchronized void deleteAdministeredDestination(JmsDestination dest)
        throws JMSException {

        if (_log.isDebugEnabled()) {
            _log.debug("deleteAdministeredDestination(dest=" + dest + ")");
        }

        boolean queue = (dest instanceof JmsQueue);
        ConsumerManager consumerMgr = ConsumerManager.instance();

        // If we are dealing with a topic then first check that there are
        // no active durable consumers for the destination
        if (!queue) {
            if (consumerMgr.hasActiveDurableConsumers(dest)) {
                throw new JMSException(
                    "Cannot delete the administered destination " + dest
                    + " since there are active durable consumers.");
            }
            // no active consumers. Remove all durable consumers to this
            // destination
            // NOTE(review): this happens before the active endpoint check
            // below, so the durable consumers stay removed even when that
            // check rejects the deletion - confirm this is intended.
            consumerMgr.removeDurableConsumers(dest);
        }

        // make sure there are not active endpoints, but first clear
        // unreferenced endpoints.
        consumerMgr.cleanUnreferencedEndpoints(dest);
        int active = consumerMgr.getEndpointsForDest(dest).size();
        if (active > 0) {
            throw new JMSException(
                "Cannot delete the administered destination " + dest
                + " since there are " + active + " active endpoints.");
        }

        // unbind it from the jndi context so that now it is unavailable
        // to other consumers. This is best effort: a failure is only logged.
        try {
            getContext().unbind(dest.getName());
        } catch (NamingException error) {
            _log.error("Failed to remove destination " + dest.getName()
                + " from JNDI", error);
        }

        // now that we have removed all the durable consumers we can remove
        // the administered destination. First delete it from the persistent
        // store, then from memory, and finally commit
        Connection connection = null;
        try {
            connection = DatabaseService.getConnection();

            DatabaseService.getAdapter().removeDestination(connection,
                dest.getName());
            destroyDestinationCache(dest);
            removeFromDestinationCache(dest);
            connection.commit();
        } catch (PersistenceException exception) {
            SQLHelper.rollback(connection);
            String msg = "Failed to remove destination " + dest.getName();
            _log.error(msg, exception);
            JMSException failure =
                new JMSException(msg + ":" + exception.getMessage());
            // keep the root cause reachable for callers
            failure.setLinkedException(exception);
            throw failure;
        } catch (SQLException exception) {
            SQLHelper.rollback(connection);
            String msg = "Failed to remove destination " + dest.getName();
            _log.error(msg, exception);
            JMSException failure =
                new JMSException(msg + ":" + exception.getMessage());
            failure.setLinkedException(exception);
            throw failure;
        } finally {
            SQLHelper.close(connection);
        }
    }

    /**
     * Iterate over everything currently held in this manager's destination
     * cache, whatever the type of destination.
     *
     * @return Iterator - iterator over the values of the destination cache
     *         (documented as {@link JmsDestination} objects)
     */
    public Iterator destinationNames() {
        Iterator known = _destinationCache.values().iterator();
        return known;
    }

    /**
     * Iterate over the {@link DestinationCache} objects currently registered
     * with this manager. All destination types are included (temporary,
     * transient and administered).
     *
     * @return Iterator - iterator over the registered DestinationCache
     *         objects
     */
    public Iterator destinations() {
        Iterator active = _caches.values().iterator();
        return active;
    }

    /**
     * Create the administered destinations listed in the configuration.
     * Topics are handled first, then queues. A configured topic may carry
     * zero or more preconfigured durable subscribers, and a durable consumer
     * is created for each subscriber name that does not already exist.
     * Queues have no equivalent.
     * <p>
     * A failure while registering one destination is logged and does not
     * stop the remaining destinations from being processed.
     */
    public void registerConfiguredAdministeredDestinations() {
        AdministeredDestinations configured =
            ConfigurationManager.getConfig().getAdministeredDestinations();
        if (configured == null) {
            // no administered destinations in the configuration
            return;
        }

        // topics, together with their preconfigured durable subscribers
        final int topicCount = configured.getAdministeredTopicCount();
        for (int t = 0; t < topicCount; t++) {
            AdministeredTopic topic = configured.getAdministeredTopic(t);

            // build a persistent topic and register it as administered
            JmsTopic destination = new JmsTopic(topic.getName());
            destination.setPersistent(true);
            try {
                createAdministeredDestination(destination);

                final int subscriberCount = topic.getSubscriberCount();
                ConsumerManager consumers = ConsumerManager.instance();
                for (int s = 0; s < subscriberCount; s++) {
                    Subscriber subscriber = topic.getSubscriber(s);
                    if (consumers.exists(subscriber.getName())) {
                        // a consumer of that name is already there
                        continue;
                    }
                    consumers.createDurableConsumer(destination,
                        subscriber.getName());
                }
            } catch (JMSException exception) {
                _log.error("Failed to register persistent topic "
                    + topic.getName(), exception);
            }
        }

        // queues, which have no associated durable subscribers
        final int queueCount = configured.getAdministeredQueueCount();
        for (int q = 0; q < queueCount; q++) {
            AdministeredQueue queue = configured.getAdministeredQueue(q);

            // build a persistent queue and register it as administered
            JmsQueue destination = new JmsQueue(queue.getName());
            destination.setPersistent(true);
            try {
                createAdministeredDestination(destination);
            } catch (JMSException exception) {
                _log.error("Failed to register persistent queue "
                    + queue.getName(), exception);
            }
        }
    }

    /**
     * Implementation of MessageManagerEventListener.messageAdded.
     * <p>
     * A message for a topic is only processed when the consumer manager
     * reports active consumers for it. Any other destination is assumed to
     * be a queue and is always processed. Processing means making sure the
     * destination is registered, obtaining its destination cache and handing
     * the message to that cache.
     * <p>
     * Any exception raised along the way is logged and reported as false.
     *
     * @param destination - the destination the message was added to
     * @param message - the message that was added
     * @return boolean - the result reported by the destination cache, or
     *         false if the message was not passed to a cache or an error
     *         occurred
     */
    public synchronized boolean messageAdded(JmsDestination destination, MessageImpl message) {
        try {
            boolean topic = (destination instanceof JmsTopic);
            if (topic
                && !ConsumerManager.instance().hasActiveConsumers(destination)) {
                // nobody is listening on this topic, nothing to do
                return false;
            }

            // topic with active consumers, or a queue: make sure the
            // destination is known and let its cache take the message
            if (!destinationExists(destination)) {
                createDestination(destination);
            }
            DestinationCache target = createDestinationCache(destination);
            return target.messageAdded(destination, message);
        } catch (Exception exception) {
            _log.error("Exception in DestinationManager.messageAdded",
                exception);
            return false;
        }
    }

    /**
     * Implementation of MessageManagerEventListener.messageRemoved.
     * <p>
     * Intentionally does nothing: removing a non-persistent message while
     * the associated destination is not active is a no-op.
     *
     * @param destination - the destination the message was removed from
     * @param message - the message that was removed
     */
    public void messageRemoved(JmsDestination destination,
                               MessageImpl message) {
        // deliberately empty
    }

    /**
     * Implementation of MessageManagerEventListener.persistentMessageAdded.
     * <p>
     * Only a topic with active consumers is processed here: its destination
     * cache is obtained using the supplied connection and the message is
     * handed to that cache. In every other case (a topic without active
     * consumers, or any non-topic destination, assumed to be a queue) this
     * method does nothing and returns false. According to the comments
     * previously held in those branches, such messages are handled by the
     * MessageMgr when the message enters the system.
     *
     * @param connection - the database connection to use
     * @param destination - the destination the message was added to
     * @param message - the message that was added
     * @return boolean - the result reported by the destination cache, or
     *         false if the message was not passed to a cache
     * @throws PersistenceException - if any exception is raised while
     *         processing the message. The original exception is logged and
     *         its string form is included in the message.
     */
    public synchronized boolean persistentMessageAdded(Connection connection,
                                                       JmsDestination destination, MessageImpl message)
        throws PersistenceException {

        boolean result = false;

        try {
            if (destination instanceof JmsTopic
                && ConsumerManager.instance().hasActiveConsumers(destination)) {
                // create the destination cache and let it process the
                // message
                DestinationCache cache = createDestinationCache(connection,
                    destination);
                result = cache.persistentMessageAdded(connection,
                    destination, message);
            }
        } catch (Exception exception) {
            // log through the class logger, then rethrow as a
            // PersistenceException
            String msg = "Exception in DestinationManager.persistentMessageAdded";
            _log.error(msg, exception);
            throw new PersistenceException(msg + " " + exception.toString());
        }

        return result;
    }

    // implementation of MessageManagerEventListener.persistentMessageRemoved
    public void persistentMessageRemoved(Connection connection,
                                         JmsDestination destination, MessageImpl message)
        throws PersistenceException {
        try {
            if (destination instanceof JmsTopic) {
                // this is a persistent message so we need to retrieve the
                // set of durable subscribers for this.
                Vector names =
                    ConsumerManager.instance().getDurableConsumersForDest(
                        (JmsTopic) destination);

                // for each durable consumer we need to destory that handle
                while (names.size() > 0) {
                    String name = (String) names.remove(0);
                    MessageHandleFactory.destroyPersistentHandle(connection,
                        destination, name, message);
                }
            } else {
                // assume it is a queue and destroy the handle.
                MessageHandleFactory.destroyPersistentHandle(connection,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -