destinationmanagerimpl.java

来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 788 行 · 第 1/2 页

JAVA
788
字号
     * Returns all destinations.     *     * @return a list of JmsDestination instances.     * @throws JMSException for any JMS error     */    public List getDestinations() throws JMSException {        synchronized (_lock) {            return new ArrayList(_destinations.values());        }    }    /**     * Returns a map of all destinations that match the specified topic.     * <p/>     * If the topic represents a wildcard then it may match none, one or more     * destinations.     *     * @param topic the topic     * @return a map of topics to DestinationCache instances     */    public Map getTopicDestinationCaches(JmsTopic topic) {        HashMap result = new HashMap();        synchronized (_lock) {            Iterator iter = _caches.keySet().iterator();            while (iter.hasNext()) {                JmsDestination dest = (JmsDestination) iter.next();                if ((dest instanceof JmsTopic) &&                        (topic.match((JmsTopic) dest))) {                    result.put(dest, _caches.get(dest));                }            }        }        return result;    }    /**     * Perform any garbage collection on this resource. This will have the     * effect of releasing system resources.  If the 'aggressive' flag is set to     * true then the garbage collection should do more to release memory related     * resources since it is called when the application memory is low.     *     * @param aggressive <code>true</code> for aggressive garbage collection     */    public void collectGarbage(boolean aggressive) {        int gcCaches = 0;        int gcDestinations = 0;        DestinationCache[] caches;        synchronized (_lock) {            caches = (DestinationCache[]) _caches.values().toArray(                    new DestinationCache[0]);        }        for (int index = 0; index < caches.length; index++) {            DestinationCache cache = caches[index];            if (cache.canDestroy()) {                if (_log.isDebugEnabled()) {                    _log.debug("Garbage collecting destination cache="                               + cache);                }                destroyDestinationCache(cache);                gcCaches++;            } else {                // the cache is active, so issue a garbage collection                // request on it                cache.collectGarbage(aggressive);            }        }        // get rid of non-persistent destinations, without associated caches.        synchronized (_lock) {            JmsDestination[] destinations                    = (JmsDestination[]) _destinations.values().toArray(                            new JmsDestination[0]);            for (int i = 0; i < destinations.length; ++i) {                JmsDestination dest = destinations[i];                if (!dest.getPersistent() && !_caches.containsKey(dest)) {                    gcDestinations++;                    _destinations.remove(dest.getName());                }            }            // log the information            _log.info("DMGC Collected " + gcCaches + " caches, "                      + _caches.size()                      + " remaining.");            _log.info("DMGC Collected " + gcDestinations + " destinations, "                      + _destinations.size() + " remaining.");        }    }    /**     * Start the service.     *     * @throws ServiceException if the service fails to start     */    protected void doStart() throws ServiceException {        if (_consumers == null) {            throw new ServiceException(                    "ConsumerManager hasn't been initialised");        }        init();        _collector.register(this);    }    /**     * Stop the service.     *     * @throws ServiceException if the service fails to stop     */    protected void doStop() throws ServiceException {        _collector.unregister(this);        JmsDestination[] destinations;        synchronized (_lock) {            destinations = (JmsDestination[]) _caches.keySet().toArray(                    new JmsDestination[0]);        }        for (int index = 0; index < destinations.length; index++) {            destroyDestinationCache(destinations[index]);        }        _caches.clear();        _destinations.clear();        // remove all the listeners        synchronized (_listeners) {            _listeners.clear();        }    }    /**     * Initialises the destination manager.     *     * @throws ServiceException if the service cannot be initialised     */    protected void init() throws ServiceException {        Enumeration iter;        try {            _database.begin();            Connection connection = _database.getConnection();            // return a list of JmsDestination objects.            iter = _database.getAdapter().getAllDestinations(connection);            _database.commit();        } catch (PersistenceException exception) {            _log.error(exception, exception);            rollback();            throw new ServiceException("Failed to get destinations", exception);        }        while (iter.hasMoreElements()) {            // add each destination to the cache            JmsDestination dest = (JmsDestination) iter.nextElement();            addToDestinations(dest);        }    }    /**     * Determines if a destination exists.     *     * @param name the destination name     * @return <code>true</code> if the destination exists, otherwise     *         <code>false     */    protected boolean exists(String name) {        return getDestination(name) != null;    }    /**     * Delete the specfied destination.     *     * @param cache the destination to destroy     */    protected void destroyDestinationCache(DestinationCache cache) {        destroyDestinationCache(cache.getDestination());    }    /**     * Delete the specfied destination.     *     * @param dest the destination to destroy     */    protected void destroyDestinationCache(JmsDestination dest) {        synchronized (_lock) {            DestinationCache cache = (DestinationCache) _caches.remove(dest);            if (cache != null) {                // deregister the cache from message manager.                _messages.removeEventListener(dest);                // notify the listeners that a cache has been removed from                // the destination manager                notifyCacheRemoved(cache);                cache.destroy();            }        }    }    /**     * Create a persistent destination.     *     * @param destination the destination to create     * @throws JMSException if the destination cannot be created     */    private void createPersistentDestination(JmsDestination destination)            throws JMSException {        if (_log.isDebugEnabled()) {            _log.debug("createPersistentDestination(destination="                       + destination + ")");        }        boolean queue = (destination instanceof JmsQueue) ? true : false;        PersistenceAdapter adapter = _database.getAdapter();        // check that the destination does not exist. If it exists then return        // false. If it doesn't exists the create it and bind it to the jndi        // context        try {            _database.begin();            Connection connection = _database.getConnection();            adapter.addDestination(connection, destination.getName(), queue);            _database.commit();        } catch (Exception exception) { // JMSException, PersistenceException            cleanup("Failed to create persistent destination "                    + destination.getName(), exception);        }    }    /**     * Notify the list of {@link DestinationEventListener} objects that the     * specified destination has been added.     *     * @param destination the added destination     * @throws JMSException if a listener fails to be notified     */    private void notifyDestinationAdded(JmsDestination destination)            throws JMSException {        DestinationEventListener[] listeners = getListeners();        for (int i = 0; i < listeners.length; ++i) {            listeners[i].destinationAdded(destination);        }    }    /**     * Notify the list of {@link DestinationEventListener} objects that the     * specified destination has been removed.     *     * @param destination the added destination     * @throws JMSException if a listeners fails to be notified     */    private void notifyDestinationRemoved(JmsDestination destination)            throws JMSException {        DestinationEventListener[] listeners = getListeners();        for (int i = 0; i < listeners.length; ++i) {            listeners[i].destinationRemoved(destination);        }    }    /**     * Notify the list of {@link DestinationEventListener} objects that the     * specified message cache has been added.     *     * @param cache the added cache     */    private void notifyCacheAdded(DestinationCache cache) {        JmsDestination destination = cache.getDestination();        DestinationEventListener[] listeners = getListeners();        for (int i = 0; i < listeners.length; ++i) {            listeners[i].cacheAdded(destination, cache);        }    }    /**     * Notify the list of {@link DestinationEventListener} objects that the     * specified message cache has been removed.     *     * @param cache the added cache     */    private void notifyCacheRemoved(DestinationCache cache) {        JmsDestination destination = cache.getDestination();        DestinationEventListener[] listeners = getListeners();        for (int i = 0; i < listeners.length; ++i) {            listeners[i].cacheRemoved(destination, cache);        }    }    /**     * Add the specified destination to the destination cache.     *     * @param destination the destination to add     */    private void addToDestinations(JmsDestination destination) {        synchronized (_lock) {            if (!_destinations.containsKey(destination.getName())) {                _destinations.put(destination.getName(), destination);            }        }    }    /**     * Remove the specified destination from the cache.     *     * @param destination the destination to remove     */    private void removeFromDestinations(JmsDestination destination) {        synchronized (_lock) {            _destinations.remove(destination.getName());        }    }    /**     * Returns a destination given its name.     *     * @param name the name of the destination     * @return the destination corresponding to <code>name</code>     * @throws InvalidDestinationException if the named destination doesn't     *                                     exist     */    private JmsDestination getExistingDestination(String name)            throws InvalidDestinationException {        JmsDestination destination = getDestination(name);        if (destination == null) {            throw new InvalidDestinationException(                    "Destination does not exist:" + name);        }        return destination;    }    /**     * Ensures that the specified destination isn't a wildcard.     *     * @param destination the destination to check     * @throws InvalidDestinationException if the destination is a wildcard     */    private void checkWildcard(JmsDestination destination)            throws InvalidDestinationException {        if (destination instanceof JmsTopic                && ((JmsTopic) destination).isWildCard()) {            throw new InvalidDestinationException(                    "Wildcarded topics cannot be managed: "                    + destination.getName());        }    }    /**     * Returns the registered {@link DestinationEventListener}s.     *     * @return the registered {@link DestinationEventListener}s.     */    private DestinationEventListener[] getListeners() {        synchronized (_listeners) {            return (DestinationEventListener[]) _listeners.toArray(                    new DestinationEventListener[0]);        }    }    /**     * Rollback the current transaction, logging any error.     */    private void rollback() {        try {            _database.rollback();        } catch (PersistenceException exception) {            _log.error(exception, exception);        }    }    /**     * Cleanup a failed transaction, and propagate the exception as a     * JMSException.     *     * @param message   the message to log     * @param exception the exception propagate     * @throws JMSException <code>exception</code> if it is an instance of     *                      JMSException, else a new JMSException containing     *                      <code>message</code>     */    private void cleanup(String message, Exception exception)            throws JMSException {        _log.error(message, exception);        rollback();        if (exception instanceof JMSException) {            throw (JMSException) exception;        } else {            throw new JMSException(message + ": " + exception.getMessage());        }    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?