📄 destinationmanager.java
字号:
destination, null, message);
}
} catch (PersistenceException exception) {
// catch and rethrow
throw exception;
} catch (Exception exception) {
throw new PersistenceException(
"Exception in DestinationManager.messageRemoved " +
exception.toString());
}
}
/**
 * Implementation of GarbageCollectable.collectGarbage.
 * <p>
 * Works in two passes:
 * <ol>
 * <li>every destination cache that reports {@code canDestroy()} is
 * destroyed; every other cache is asked to garbage collect itself</li>
 * <li>every non-persistent destination that no longer has an associated
 * cache is dropped from the destination cache</li>
 * </ol>
 * The number of collected caches and destinations is logged at info level.
 *
 * @param aggressive - forwarded unchanged to the collectGarbage method of
 *                   each cache that is still active
 */
public synchronized void collectGarbage(boolean aggressive) {
    // NOTE(review): the original comment here said the thread priority
    // should be lowered before continuing; nothing in this method does so.
    int gc_caches = 0;
    int gc_destinations = 0;

    // iterate over a snapshot of the caches rather than the live collection
    Object[] caches = _caches.values().toArray();
    for (int index = 0; index < caches.length; index++) {
        DestinationCache cache = (DestinationCache) caches[index];
        if (cache.canDestroy()) {
            if (_log.isDebugEnabled()) {
                _log.debug("Garbage collecting destination cache="
                           + cache);
            }
            destroyDestinationCache(cache);
            gc_caches++;
        } else {
            // the cache is active, so issue a garbage collection
            // request on it
            cache.collectGarbage(aggressive);
        }
    }

    // get rid of non-administered destinations, without
    // associated caches. Candidates are gathered first so that the
    // destination cache is not modified while it is being iterated.
    Vector to_delete = new Vector();
    Iterator destinations = _destinationCache.values().iterator();
    while (destinations.hasNext()) {
        JmsDestination dest = (JmsDestination) destinations.next();
        if (!dest.getPersistent() && !_caches.containsKey(dest)) {
            to_delete.add(dest);
            gc_destinations++;
        }
    }

    // now delete the actual destinations. This goes through
    // removeFromDestinationCache so that a collected wildcard topic is
    // also dropped from the wildcard list instead of lingering there.
    Iterator doomed = to_delete.iterator();
    while (doomed.hasNext()) {
        removeFromDestinationCache((JmsDestination) doomed.next());
    }

    // log the information
    _log.info("DMGC Collected " + gc_caches + " caches, " + _caches.size()
              + " remaining.");
    _log.info("DMGC Collected " + gc_destinations + " destinations, "
              + _destinationCache.size() + " remaining.");
}
/**
 * Collect every destination with a cache whose topic is matched by the
 * supplied topic.
 * <p>
 * The supplied topic may be a plain topic name or a wildcard, so the result
 * may hold zero, one or several entries. Only keys that are JmsTopic
 * instances are considered.
 *
 * @param topic - topic (or wildcard) to match against
 * @return HashMap - maps each matching destination to its cache; empty if
 *         nothing matches
 */
synchronized HashMap getTopicDestinationCaches(JmsTopic topic) {
    HashMap matches = new HashMap();
    for (Iterator keys = _caches.keySet().iterator(); keys.hasNext();) {
        JmsDestination candidate = (JmsDestination) keys.next();
        if (!(candidate instanceof JmsTopic)) {
            // anything that is not a topic cannot match a topic
            continue;
        }
        if (topic.match((JmsTopic) candidate)) {
            matches.put(candidate, _caches.get(candidate));
        }
    }
    return matches;
}
/**
 * Tear down this manager. This is brutal and final: every destination
 * cache is destroyed, all internal collections are emptied and released,
 * and the singleton reference is reset.
 */
public synchronized void destroy() {
    // destroy each cache, working from a snapshot of the keys
    Object[] keys = _caches.keySet().toArray();
    int position = 0;
    while (position < keys.length) {
        destroyDestinationCache((JmsDestination) keys[position]);
        position++;
    }

    // empty and release the cache map
    _caches.clear();
    _caches = null;

    // empty and release the destination cache
    _destinationCache.clear();
    _destinationCache = null;

    // drop every registered listener
    _listeners.clear();
    _listeners = null;

    _context = null;

    // reset the singleton
    _instance = null;
}
/**
 * Test whether the named destination is an administered destination, i.e.
 * it is present in the destination cache and flagged as persistent.
 *
 * @param name - the name of the destination
 * @return boolean - true if the destination is cached and persistent;
 *         false otherwise, including when no such destination is cached
 */
public boolean isAdministeredDestination(String name) {
    JmsDestination known = (JmsDestination) _destinationCache.get(name);
    return (known != null) && known.getPersistent();
}
/**
 * Test whether the specified message is for an administered destination.
 * <p>
 * The message's destination is looked up by name in the destination cache.
 * If it is not cached the result is false. If it is cached and persistent
 * the result is true. If it is cached, not persistent, and is a topic, the
 * result is true when any wildcard destination both matches the topic and
 * is persistent.
 * <p>
 * A message without a destination, or one whose destination cannot be
 * retrieved, is treated as not administered.
 *
 * @param msg - the message to check
 * @return boolean - true if the message targets an administered
 *         destination, false otherwise
 */
public boolean isMessageForAdministeredDestination(MessageImpl msg) {
    boolean result = false;
    try {
        JmsDestination mdest = (JmsDestination) msg.getJMSDestination();
        if (mdest == null) {
            // no destination has been set on the message, so there is
            // nothing to look up
            return false;
        }

        JmsDestination dest =
            (JmsDestination) _destinationCache.get(mdest.getName());
        if (dest != null) {
            if (dest.getPersistent()) {
                result = true;
            } else if (mdest instanceof JmsTopic) {
                // check if any of the wildcards are administered; a
                // snapshot is used so the list is not iterated live
                Object[] dests = _wildcardDestinations.toArray();
                for (int index = 0; index < dests.length; index++) {
                    JmsTopic adest = (JmsTopic) dests[index];
                    if (adest.match((JmsTopic) mdest)
                        && adest.getPersistent()) {
                        result = true;
                        break;
                    }
                }
            }
        }
    } catch (JMSException exception) {
        // best effort: a message whose destination cannot be read is
        // reported as not administered, but leave a trace of the failure
        if (_log.isDebugEnabled()) {
            _log.debug("Failed to get the destination of the message",
                       exception);
        }
    }
    return result;
}
/**
 * Register the destination in the destination cache, keyed by its name,
 * unless an entry with that name is already present. A newly registered
 * wildcard topic is also recorded in the list of wildcard destinations.
 *
 * @param destination - destination to add
 */
void addToDestinationCache(JmsDestination destination) {
    synchronized (_destinationCache) {
        if (_destinationCache.containsKey(destination.getName())) {
            // already known, nothing to do
            return;
        }
        _destinationCache.put(destination.getName(), destination);

        // wildcard topics are tracked separately as well
        boolean wildcard = (destination instanceof JmsTopic)
            && ((JmsTopic) destination).isWildCard();
        if (wildcard) {
            _wildcardDestinations.add(destination);
        }
    }
}
/**
 * Drop the destination from the destination cache. If an entry was
 * actually removed and the destination is a wildcard topic, it is also
 * removed from the list of wildcard destinations.
 *
 * @param destination - the destination to remove
 */
void removeFromDestinationCache(JmsDestination destination) {
    synchronized (_destinationCache) {
        Object removed = _destinationCache.remove(destination.getName());
        if (removed == null) {
            // nothing was cached under that name
            return;
        }
        if (!(destination instanceof JmsTopic)) {
            // only topics can be wildcards
            return;
        }
        if (((JmsTopic) destination).isWildCard()) {
            _wildcardDestinations.remove(destination);
        }
    }
}
/**
 * Check whether a destination with the same name as the supplied one is
 * present in the destination cache.
 *
 * @param destination - the destination to check
 * @return boolean - true if its name is cached and false otherwise
 */
public boolean destinationExists(JmsDestination destination) {
    boolean known = _destinationCache.containsKey(destination.getName());
    return known;
}
/**
 * Initialises the destination manager.
 * <p>
 * All destinations returned by the database adapter are loaded, added to
 * the destination cache, flagged as persistent and rebound in the JNDI
 * context under their name. The database connection is committed once the
 * destinations have been read, before any of them is bound, and is always
 * closed on exit.
 *
 * @throws ServiceException if the service cannot be initialised, i.e. the
 *                          destinations cannot be loaded or a destination
 *                          cannot be bound in JNDI
 */
protected void init() throws ServiceException {
    final String failure = "Failed to initialise DestinationManager";
    Connection connection = null;
    try {
        connection = DatabaseService.getConnection();

        // return a list of JmsDestination objects.
        Enumeration iter =
            DatabaseService.getAdapter().getAllDestinations(connection);
        connection.commit();

        while (iter.hasMoreElements()) {
            // add each destination to the cache and also bind
            // it to the context
            JmsDestination dest = (JmsDestination) iter.nextElement();
            addToDestinationCache(dest);
            try {
                // for each of the administered destinations rebind it to
                // the jndi context
                dest.setPersistent(true);
                ContextHelper.rebind(getContext(), dest.getName(), dest);
            } catch (NamingException error) {
                throw new ServiceException(
                    "Failed to add destination " + dest.getName()
                    + " to JNDI", error);
            }
        }
    } catch (PersistenceException exception) {
        SQLHelper.rollback(connection);
        _log.error(failure, exception);
        throw exception;
    } catch (SQLException exception) {
        SQLHelper.rollback(connection);
        _log.error(failure, exception);
        throw new ServiceException(failure, exception);
    } finally {
        SQLHelper.close(connection);
    }
}
/**
 * Notify every registered DestinationEventListener that the specified
 * destination has been added. The listener list is locked for the
 * duration of the notification.
 *
 * @param dest  - the destination that was added
 * @param cache - the corresponding cache
 */
private void notifyDestinationAdded(JmsDestination dest,
                                    DestinationCache cache) {
    synchronized (_listeners) {
        for (Iterator it = _listeners.iterator(); it.hasNext();) {
            DestinationEventListener listener =
                (DestinationEventListener) it.next();
            listener.destinationAdded(dest, cache);
        }
    }
}
/**
 * Notify every registered DestinationEventListener that the specified
 * destination has been removed. The listener list is locked for the
 * duration of the notification.
 *
 * @param dest  - the destination that was removed
 * @param cache - the corresponding cache
 */
private void notifyDestinationRemoved(JmsDestination dest,
                                      DestinationCache cache) {
    synchronized (_listeners) {
        for (Iterator it = _listeners.iterator(); it.hasNext();) {
            DestinationEventListener listener =
                (DestinationEventListener) it.next();
            listener.destinationRemoved(dest, cache);
        }
    }
}
/**
 * Return the initial naming context, obtained from NamingHelper using the
 * configuration supplied by the ConfigurationManager.
 * <p>
 * This method does no caching of its own; every call delegates to
 * NamingHelper.
 *
 * @return the root context
 * @throws NamingException if the context cannot be obtained
 */
private static Context getContext() throws NamingException {
    Context root = NamingHelper.getInitialContext(
        ConfigurationManager.getConfig());
    return root;
}
/**
 * This static class is used to maintain information about a
 * destination. Two entries are considered equal when their destinations
 * are equal; the administered flag does not take part in equality.
 */
private static class DestinationEntry {

    /**
     * The destination that this entry pertains to
     */
    public JmsDestination _destination = null;

    /**
     * Indicates whether the destination is administered
     */
    public boolean _administered = false;

    /**
     * Construct an instance of the entry using the specified
     * parameters
     *
     * @param dest - the destination
     * @param administered - true if the destination is administered
     */
    DestinationEntry(JmsDestination dest, boolean administered) {
        _destination = dest;
        _administered = administered;
    }

    /**
     * Compare this entry with another object. Entries are equal when
     * their destinations are equal, or when both have no destination.
     *
     * @param obj - the object to compare with
     * @return boolean - true if obj is a DestinationEntry for an equal
     *         destination
     */
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        // instanceof is false for null, so no separate null check is needed
        if (!(obj instanceof DestinationEntry)) {
            return false;
        }
        JmsDestination other = ((DestinationEntry) obj)._destination;
        if (_destination == null) {
            return other == null;
        }
        return _destination.equals(other);
    }

    /**
     * Return a hash code consistent with {@link #equals}: it is derived
     * from the destination only, so equal entries hash alike.
     *
     * @return int - the destination's hash code, or 0 if there is none
     */
    public int hashCode() {
        return (_destination == null) ? 0 : _destination.hashCode();
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -