📄 consumermanager.java
字号:
MessageImpl message)
throws PersistenceException {
try {
JmsDestination dest = (JmsDestination) message.getJMSDestination();
// check to ensure that the message is not of type queue. If it
// is then we don't want to process it here.
if (dest instanceof JmsQueue) {
return false;
}
Vector consumers = (Vector) _destToConsumerMap.get(dest);
if (consumers != null) {
// now search through the list of inactive durable consumers
// and remove it from their cache
Enumeration entries = consumers.elements();
while (entries.hasMoreElements()) {
ConsumerEntry entry = (ConsumerEntry) entries.nextElement();
if ((entry._durable) &&
(!_endpoints.containsKey(entry._name))) {
MessageHandleFactory.destroyPersistentHandle(connection,
dest, entry._name, message);
}
}
}
} catch (PersistenceException exception) {
throw exception;
} catch (Exception exception) {
// rethrow as a PersistenceException...but does it make sense
throw new PersistenceException(
"Exception in ConsumerManager.persistentMessageRemoved " +
exception.toString());
}
return true;
}
/**
 * Destroy this manager. Every endpoint currently held in the endpoint map
 * is passed to {@code deleteConsumerEndpoint}, the endpoint map and the
 * consumer caches are emptied, the cache fields are set to {@code null}
 * and the singleton reference is reset.
 */
public synchronized void destroy() {
    // iterate over an array copy of the endpoints rather than the live map
    // (presumably because deleteConsumerEndpoint modifies _endpoints -
    // TODO confirm)
    Object[] snapshot = _endpoints.values().toArray();
    int position = 0;
    while (position < snapshot.length) {
        deleteConsumerEndpoint((ConsumerEndpoint) snapshot[position]);
        position++;
    }
    _endpoints.clear();

    // empty each cache structure, then release the reference to it
    _consumerCache.clear();
    _consumerCache = null;

    _destToConsumerMap.clear();
    _destToConsumerMap = null;

    _wildcardConsumers.clear();
    _wildcardConsumers = null;

    // finally forget the singleton instance
    _instance = null;
}
/**
 * Return the names of the durable subscribers for the specified topic.
 * <p>
 * Durable consumers cached directly against the topic are listed first,
 * followed by durable wildcard consumers whose topic matches it.
 *
 * @param dest the topic to query
 * @return Vector - a vector of strings, which denote the consumer names
 */
public synchronized Vector getDurableConsumersForDest(JmsTopic dest) {
    Vector result = new Vector();

    // consumers cached directly against the destination
    Vector direct = (Vector) _destToConsumerMap.get(dest);
    if (direct != null) {
        for (int i = 0; i < direct.size(); i++) {
            ConsumerEntry candidate = (ConsumerEntry) direct.elementAt(i);
            if (candidate._durable) {
                result.add(candidate._name);
            }
        }
    }

    // wildcard consumers are kept in a separate map, keyed by their entry;
    // include every durable one whose topic matches the requested topic
    for (Iterator keys = _wildcardConsumers.keySet().iterator();
         keys.hasNext();) {
        ConsumerEntry wildcard = (ConsumerEntry) keys.next();
        if (!wildcard._durable) {
            continue;
        }
        JmsDestination pattern = wildcard._destination;
        if ((pattern instanceof JmsTopic) &&
            ((JmsTopic) pattern).match(dest)) {
            result.add(wildcard._name);
        }
    }

    return result;
}
/**
 * Clean all the unreferenced endpoints for a specified destination.
 * Unreferenced endpoints usually occur when the client terminates
 * abnormally, leaving some dangling endpoints on the server.
 * <p>
 * For every endpoint on the destination whose session reports that the
 * client endpoint is no longer active, the session is closed. Failures
 * while closing are ignored.
 * <p>
 * NOTE(review): this method is not declared synchronized, unlike the other
 * accessors of the endpoint map in this class - confirm this is intended.
 *
 * @param dest the destination to query
 */
public void cleanUnreferencedEndpoints(JmsDestination dest) {
    // work on an array copy of the endpoint map's values
    Object[] snapshot = _endpoints.values().toArray();
    for (int i = 0; i < snapshot.length; i++) {
        ConsumerEndpoint candidate = (ConsumerEndpoint) snapshot[i];

        // only endpoints on the requested destination are of interest
        if (!dest.equals(candidate.getDestination())) {
            continue;
        }
        // leave sessions with a live client endpoint alone
        if (candidate.getSession().isClientEndpointActive()) {
            continue;
        }

        try {
            candidate.getSession().close();
        } catch (Exception ignore) {
            // best effort: a failure to close is deliberately ignored
        }
    }
}
/**
 * Return a list of {@link ConsumerEndpoint} objects attached to the
 * specified destination. An empty list is returned when no endpoint
 * matches.
 *
 * @param dest the destination to query
 * @return a new list holding the matching endpoints
 */
public synchronized LinkedList getEndpointsForDest(JmsDestination dest) {
    LinkedList matches = new LinkedList();
    for (Iterator cursor = _endpoints.values().iterator();
         cursor.hasNext();) {
        ConsumerEndpoint candidate = (ConsumerEndpoint) cursor.next();
        if (dest.equals(candidate.getDestination())) {
            matches.add(candidate);
        }
    }
    return matches;
}
// implementation of GarbageCollectable.collectGarbage
// Only an aggressive collection is forwarded to the endpoints; a
// non-aggressive request does nothing here.
public synchronized void collectGarbage(boolean aggressive) {
    if (!aggressive) {
        return;
    }

    // forward the request to each endpoint, using an array copy of the map
    Object[] snapshot = _endpoints.values().toArray();
    for (int i = 0; i < snapshot.length; i++) {
        ConsumerEndpoint endpoint = (ConsumerEndpoint) snapshot[i];
        endpoint.collectGarbage(aggressive);
    }
}
/**
 * Add the specified consumer to the cache. Nothing happens when a consumer
 * with the same name is already cached.
 * <p>
 * The consumer is always recorded in the name-keyed cache. A consumer on a
 * wildcard topic is additionally recorded in the wildcard map; any other
 * consumer is appended to the list kept for its destination.
 *
 * @param name - the name of the consumer
 * @param dest - the destination it is subscribed to. It can be a wildcard
 * @param durable - indicates whether it is a durable subscription
 * @throws JMSException declared by the signature; not raised directly by
 * the statements of this method
 */
synchronized void addToConsumerCache(String name, JmsDestination dest,
                                     boolean durable)
    throws JMSException {
    if (_log.isDebugEnabled()) {
        _log.debug("addToConsumerCache(name=" + name + ", dest=" + dest
                   + ", durable=" + durable + ")");
    }

    // a consumer that is already cached is left untouched
    if (_consumerCache.containsKey(name)) {
        return;
    }

    ConsumerEntry cached = new ConsumerEntry(name, dest, durable);
    _consumerCache.put(name, cached);

    boolean wildcard =
        (dest instanceof JmsTopic) && ((JmsTopic) dest).isWildCard();
    if (wildcard) {
        // wildcard consumers live in their own map, keyed by a separate
        // entry instance and mapped to the wildcard destination
        _wildcardConsumers.put(new ConsumerEntry(name, dest, durable),
                               dest);
        return;
    }

    // reverse mapping: destination -> consumers, created on first use
    Vector registered = (Vector) _destToConsumerMap.get(dest);
    if (registered == null) {
        registered = new Vector();
        _destToConsumerMap.put(dest, registered);
    }
    registered.add(cached);
}
/**
 * Remove the specified consumer from the cache and make all necessary
 * adjustments.
 * <p>
 * The consumer is removed from the name-keyed cache. If it was subscribed
 * to a wildcard topic its entry is also removed from the wildcard map;
 * otherwise it is removed from the list kept for its destination, and that
 * list is dropped once it becomes empty. An unknown name is only logged at
 * debug level.
 *
 * @param name - name of consumer to remove
 * @throws JMSException declared by the signature; not raised directly by
 * the statements of this method
 */
synchronized void removeFromConsumerCache(String name)
    throws JMSException {
    if (_log.isDebugEnabled()) {
        _log.debug("removeFromConsumerCache(name=" + name + ")");
    }

    if (_consumerCache.containsKey(name)) {
        ConsumerEntry entry = (ConsumerEntry) _consumerCache.remove(name);
        JmsDestination dest = entry._destination;

        if ((dest instanceof JmsTopic) &&
            (((JmsTopic) dest).isWildCard())) {
            // The wildcard map is keyed by ConsumerEntry, not by consumer
            // name, so removing by the name string can never match. Remove
            // the key(s) equal to the cached entry instead. The key set is
            // scanned with ConsumerEntry.equals so that the removal does
            // not depend on the entry's hash code.
            Iterator keys = _wildcardConsumers.keySet().iterator();
            while (keys.hasNext()) {
                if (entry.equals(keys.next())) {
                    keys.remove();
                }
            }
        } else {
            // remove it from the list kept for its destination
            Vector consumers = (Vector) _destToConsumerMap.get(dest);
            if (consumers != null) {
                consumers.remove(entry);

                // drop the list once the last consumer has gone
                if (consumers.size() == 0) {
                    _destToConsumerMap.remove(dest);
                }
            }
        }
    } else {
        if (_log.isDebugEnabled()) {
            _log.debug("removeFromConsumerCache(name=" + name +
                       "): consumer not found");
        }
    }
}
/**
 * Remove all the consumers for the specified destination from the
 * destination-to-consumers map.
 * <p>
 * NOTE(review): only the destination map is touched here; the name-keyed
 * consumer cache and the wildcard map are left as they are - confirm
 * whether further housekeeping is required.
 *
 * @param destination - destination to remove.
 */
synchronized void removeFromConsumerCache(JmsDestination destination) {
    // removing a destination that is not mapped simply has no effect
    _destToConsumerMap.remove(destination);
}
/**
 * Return the number of consumer endpoints currently held by this manager.
 *
 * @return int - the size of the endpoint map
 */
int getConsumerEndpointCount() {
    final int held = _endpoints.size();
    return held;
}
/**
 * Initialises the consumer manager.
 * <p>
 * Obtains the scheduler, then loads the durable consumers known to the
 * persistence adapter and caches a consumer-to-destination mapping for
 * each of them. A destination string that the destination manager cannot
 * resolve is accepted only if it denotes a wildcard topic. Entries that
 * cannot be cached are logged as errors and skipped.
 *
 * @throws ServiceException if the manager can't be initialised
 */
private void init() throws ServiceException {
    _scheduler = Scheduler.instance();

    Connection connection = null;
    try {
        connection = DatabaseService.getConnection();
        PersistenceAdapter adapter = DatabaseService.getAdapter();
        connection.commit();

        // maps each durable consumer name to its destination string
        HashMap durables = adapter.getAllDurableConsumers(connection);

        for (Iterator names = durables.keySet().iterator();
             names.hasNext();) {
            String consumer = (String) names.next();
            String destName = (String) durables.get(consumer);

            JmsDestination dest =
                DestinationManager.instance().destinationFromString(
                    destName);
            if (dest == null) {
                // unknown to the destination manager: this may be a
                // wildcard subscription, otherwise it stays unresolved
                JmsTopic candidate = new JmsTopic(destName);
                if (candidate.isWildCard()) {
                    dest = candidate;
                }
            }

            // instanceof is false for null, so an unresolved destination
            // is rejected here as well
            boolean usable = (consumer != null) && (dest instanceof JmsTopic);
            if (!usable) {
                // nothing sensible can be cached for this entry
                _log.error(
                    "Failure in ConsumerManager.init : " + consumer +
                    ":" + dest);
                continue;
            }

            // cache the consumer-destination mapping in memory
            addToConsumerCache(consumer, dest, true);
        }
    } catch (ServiceException exception) {
        SQLHelper.rollback(connection);
        throw exception;
    } catch (Exception exception) {
        // any other failure is wrapped, keeping the original as the cause
        SQLHelper.rollback(connection);
        throw new ServiceException("Failed to initialise ConsumerManager",
                                   exception);
    } finally {
        SQLHelper.close(connection);
    }
}
/**
 * This private static class is used to maintain consumer information.
 * <p>
 * Two entries are considered equal when their names are equal; the
 * destination and the durable flag do not take part in the comparison.
 * {@link #hashCode()} is derived from the name as well, so equal entries
 * behave consistently when used as keys of hash-based collections.
 */
private static class ConsumerEntry {

    /**
     * The name of the consumer. This name is either the durable
     * subscriber name or a uniquely generated name.
     */
    String _name = null;

    /**
     * Indicates whether this entry is for a durable subscriber
     */
    boolean _durable = false;

    /**
     * The destination that the consumer is actually subscribed to
     */
    JmsDestination _destination = null;

    /**
     * Construct an instance of this class using the specified
     * name, destination and durable subscriber indicator
     *
     * @param name - the name of the consumer
     * @param destination - the destination the consumer is subscribed to
     * @param durable - indicates whether it is a durable subscription
     */
    ConsumerEntry(String name, JmsDestination destination,
                  boolean durable) {
        _name = name;
        _destination = destination;
        _durable = durable;
    }

    /**
     * Compare this entry with another object. Entries are equal when
     * their names are equal; two entries without a name are equal too.
     *
     * @param obj the object to compare with
     * @return <code>true</code> if obj is a ConsumerEntry with the same
     * name
     */
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        // instanceof also rejects null
        if (!(obj instanceof ConsumerEntry)) {
            return false;
        }
        String otherName = ((ConsumerEntry) obj)._name;
        // null-safe comparison: a missing name must not raise an exception
        return (_name == null) ? (otherName == null)
                               : _name.equals(otherName);
    }

    /**
     * Return a hash code consistent with {@link #equals(Object)}: it is
     * computed from the name only.
     *
     * @return the hash code of the name, or 0 when there is no name
     */
    public int hashCode() {
        return (_name == null) ? 0 : _name.hashCode();
    }

} //-- ConsumerEntry
} //-- ConsumerManager
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -