destinationmanagerimpl.java
来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 788 行 · 第 1/2 页
JAVA
788 行
* Returns all destinations. * * @return a list of JmsDestination instances. * @throws JMSException for any JMS error */ public List getDestinations() throws JMSException { synchronized (_lock) { return new ArrayList(_destinations.values()); } } /** * Returns a map of all destinations that match the specified topic. * <p/> * If the topic represents a wildcard then it may match none, one or more * destinations. * * @param topic the topic * @return a map of topics to DestinationCache instances */ public Map getTopicDestinationCaches(JmsTopic topic) { HashMap result = new HashMap(); synchronized (_lock) { Iterator iter = _caches.keySet().iterator(); while (iter.hasNext()) { JmsDestination dest = (JmsDestination) iter.next(); if ((dest instanceof JmsTopic) && (topic.match((JmsTopic) dest))) { result.put(dest, _caches.get(dest)); } } } return result; } /** * Perform any garbage collection on this resource. This will have the * effect of releasing system resources. If the 'aggressive' flag is set to * true then the garbage collection should do more to release memory related * resources since it is called when the application memory is low. * * @param aggressive <code>true</code> for aggressive garbage collection */ public void collectGarbage(boolean aggressive) { int gcCaches = 0; int gcDestinations = 0; DestinationCache[] caches; synchronized (_lock) { caches = (DestinationCache[]) _caches.values().toArray( new DestinationCache[0]); } for (int index = 0; index < caches.length; index++) { DestinationCache cache = caches[index]; if (cache.canDestroy()) { if (_log.isDebugEnabled()) { _log.debug("Garbage collecting destination cache=" + cache); } destroyDestinationCache(cache); gcCaches++; } else { // the cache is active, so issue a garbage collection // request on it cache.collectGarbage(aggressive); } } // get rid of non-persistent destinations, without associated caches. synchronized (_lock) { JmsDestination[] destinations = (JmsDestination[]) _destinations.values().toArray( new JmsDestination[0]); for (int i = 0; i < destinations.length; ++i) { JmsDestination dest = destinations[i]; if (!dest.getPersistent() && !_caches.containsKey(dest)) { gcDestinations++; _destinations.remove(dest.getName()); } } // log the information _log.info("DMGC Collected " + gcCaches + " caches, " + _caches.size() + " remaining."); _log.info("DMGC Collected " + gcDestinations + " destinations, " + _destinations.size() + " remaining."); } } /** * Start the service. * * @throws ServiceException if the service fails to start */ protected void doStart() throws ServiceException { if (_consumers == null) { throw new ServiceException( "ConsumerManager hasn't been initialised"); } init(); _collector.register(this); } /** * Stop the service. * * @throws ServiceException if the service fails to stop */ protected void doStop() throws ServiceException { _collector.unregister(this); JmsDestination[] destinations; synchronized (_lock) { destinations = (JmsDestination[]) _caches.keySet().toArray( new JmsDestination[0]); } for (int index = 0; index < destinations.length; index++) { destroyDestinationCache(destinations[index]); } _caches.clear(); _destinations.clear(); // remove all the listeners synchronized (_listeners) { _listeners.clear(); } } /** * Initialises the destination manager. * * @throws ServiceException if the service cannot be initialised */ protected void init() throws ServiceException { Enumeration iter; try { _database.begin(); Connection connection = _database.getConnection(); // return a list of JmsDestination objects. iter = _database.getAdapter().getAllDestinations(connection); _database.commit(); } catch (PersistenceException exception) { _log.error(exception, exception); rollback(); throw new ServiceException("Failed to get destinations", exception); } while (iter.hasMoreElements()) { // add each destination to the cache JmsDestination dest = (JmsDestination) iter.nextElement(); addToDestinations(dest); } } /** * Determines if a destination exists. * * @param name the destination name * @return <code>true</code> if the destination exists, otherwise * <code>false */ protected boolean exists(String name) { return getDestination(name) != null; } /** * Delete the specfied destination. * * @param cache the destination to destroy */ protected void destroyDestinationCache(DestinationCache cache) { destroyDestinationCache(cache.getDestination()); } /** * Delete the specfied destination. * * @param dest the destination to destroy */ protected void destroyDestinationCache(JmsDestination dest) { synchronized (_lock) { DestinationCache cache = (DestinationCache) _caches.remove(dest); if (cache != null) { // deregister the cache from message manager. _messages.removeEventListener(dest); // notify the listeners that a cache has been removed from // the destination manager notifyCacheRemoved(cache); cache.destroy(); } } } /** * Create a persistent destination. * * @param destination the destination to create * @throws JMSException if the destination cannot be created */ private void createPersistentDestination(JmsDestination destination) throws JMSException { if (_log.isDebugEnabled()) { _log.debug("createPersistentDestination(destination=" + destination + ")"); } boolean queue = (destination instanceof JmsQueue) ? true : false; PersistenceAdapter adapter = _database.getAdapter(); // check that the destination does not exist. If it exists then return // false. If it doesn't exists the create it and bind it to the jndi // context try { _database.begin(); Connection connection = _database.getConnection(); adapter.addDestination(connection, destination.getName(), queue); _database.commit(); } catch (Exception exception) { // JMSException, PersistenceException cleanup("Failed to create persistent destination " + destination.getName(), exception); } } /** * Notify the list of {@link DestinationEventListener} objects that the * specified destination has been added. * * @param destination the added destination * @throws JMSException if a listener fails to be notified */ private void notifyDestinationAdded(JmsDestination destination) throws JMSException { DestinationEventListener[] listeners = getListeners(); for (int i = 0; i < listeners.length; ++i) { listeners[i].destinationAdded(destination); } } /** * Notify the list of {@link DestinationEventListener} objects that the * specified destination has been removed. * * @param destination the added destination * @throws JMSException if a listeners fails to be notified */ private void notifyDestinationRemoved(JmsDestination destination) throws JMSException { DestinationEventListener[] listeners = getListeners(); for (int i = 0; i < listeners.length; ++i) { listeners[i].destinationRemoved(destination); } } /** * Notify the list of {@link DestinationEventListener} objects that the * specified message cache has been added. * * @param cache the added cache */ private void notifyCacheAdded(DestinationCache cache) { JmsDestination destination = cache.getDestination(); DestinationEventListener[] listeners = getListeners(); for (int i = 0; i < listeners.length; ++i) { listeners[i].cacheAdded(destination, cache); } } /** * Notify the list of {@link DestinationEventListener} objects that the * specified message cache has been removed. * * @param cache the added cache */ private void notifyCacheRemoved(DestinationCache cache) { JmsDestination destination = cache.getDestination(); DestinationEventListener[] listeners = getListeners(); for (int i = 0; i < listeners.length; ++i) { listeners[i].cacheRemoved(destination, cache); } } /** * Add the specified destination to the destination cache. * * @param destination the destination to add */ private void addToDestinations(JmsDestination destination) { synchronized (_lock) { if (!_destinations.containsKey(destination.getName())) { _destinations.put(destination.getName(), destination); } } } /** * Remove the specified destination from the cache. * * @param destination the destination to remove */ private void removeFromDestinations(JmsDestination destination) { synchronized (_lock) { _destinations.remove(destination.getName()); } } /** * Returns a destination given its name. * * @param name the name of the destination * @return the destination corresponding to <code>name</code> * @throws InvalidDestinationException if the named destination doesn't * exist */ private JmsDestination getExistingDestination(String name) throws InvalidDestinationException { JmsDestination destination = getDestination(name); if (destination == null) { throw new InvalidDestinationException( "Destination does not exist:" + name); } return destination; } /** * Ensures that the specified destination isn't a wildcard. * * @param destination the destination to check * @throws InvalidDestinationException if the destination is a wildcard */ private void checkWildcard(JmsDestination destination) throws InvalidDestinationException { if (destination instanceof JmsTopic && ((JmsTopic) destination).isWildCard()) { throw new InvalidDestinationException( "Wildcarded topics cannot be managed: " + destination.getName()); } } /** * Returns the registered {@link DestinationEventListener}s. * * @return the registered {@link DestinationEventListener}s. */ private DestinationEventListener[] getListeners() { synchronized (_listeners) { return (DestinationEventListener[]) _listeners.toArray( new DestinationEventListener[0]); } } /** * Rollback the current transaction, logging any error. */ private void rollback() { try { _database.rollback(); } catch (PersistenceException exception) { _log.error(exception, exception); } } /** * Cleanup a failed transaction, and propagate the exception as a * JMSException. * * @param message the message to log * @param exception the exception propagate * @throws JMSException <code>exception</code> if it is an instance of * JMSException, else a new JMSException containing * <code>message</code> */ private void cleanup(String message, Exception exception) throws JMSException { _log.error(message, exception); rollback(); if (exception instanceof JMSException) { throw (JMSException) exception; } else { throw new JMSException(message + ": " + exception.getMessage()); } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?