📄 consumermanager.java
字号:
_wildcardConsumers.clear();
_wildcardConsumers = null;
// reset the singleton
_instance = null;
}
/**
 * Returns the names of the durable subscribers for the specified topic.
 * <p/>
 * Entries registered directly against the topic are reported first,
 * followed by durable wildcard subscriptions whose topic matches
 * <code>dest</code>. Each name is whatever the entry's
 * <code>getName()</code> returns.
 *
 * @param dest the topic to query
 * @return a new list holding the name of each durable subscriber
 */
public synchronized List getDurableConsumersForDest(JmsTopic dest) {
    final List result = new ArrayList();

    // subscribers registered directly against the topic
    final List direct = (List) _destToConsumerMap.get(dest);
    if (direct != null) {
        for (Iterator i = direct.iterator(); i.hasNext();) {
            final ConsumerEntry candidate = (ConsumerEntry) i.next();
            if (candidate.isDurable()) {
                result.add(candidate.getName());
            }
        }
    }

    // wildcard subscribers are cached separately; include those whose
    // topic pattern matches the requested topic
    for (Iterator i = _wildcardConsumers.keySet().iterator(); i.hasNext();) {
        final ConsumerEntry candidate = (ConsumerEntry) i.next();
        if (!candidate.isDurable()) {
            continue;
        }
        final JmsDestination pattern = candidate.getDestination();
        if (pattern instanceof JmsTopic
                && ((JmsTopic) pattern).match(dest)) {
            result.add(candidate.getName());
        }
    }
    return result;
}
/**
 * Collects every {@link ConsumerEndpoint} whose destination equals the
 * specified destination.
 *
 * @param dest the destination to query
 * @return a new list of the matching endpoints; empty if none match
 */
public synchronized List getEndpointsForDest(JmsDestination dest) {
    final LinkedList matches = new LinkedList();
    for (Iterator i = _endpoints.values().iterator(); i.hasNext();) {
        final ConsumerEndpoint candidate = (ConsumerEndpoint) i.next();
        if (!dest.equals(candidate.getDestination())) {
            continue;
        }
        matches.add(candidate);
    }
    return matches;
}
/**
 * Registers a consumer in the in-memory caches.
 * <p/>
 * Nothing is changed if <code>key</code> is already cached. Otherwise a
 * new entry is stored under <code>key</code> and then indexed either in
 * the wildcard cache (wildcard topics) or in the per-destination list.
 *
 * @param key     a key to identify the consumer
 * @param dest    the destination it is subscribed to. It can be a wildcard
 * @param durable indicates whether it is a durable subscription
 */
private synchronized void addToConsumerCache(Object key, JmsDestination dest,
                                             boolean durable) {
    if (_log.isDebugEnabled()) {
        _log.debug("addToConsumerCache(key=" + key + ", dest=" + dest
                   + ", durable=" + durable + ")");
    }

    // already known: leave the existing registration untouched
    if (_consumerCache.containsKey(key)) {
        return;
    }

    final ConsumerEntry created = new ConsumerEntry(key, dest, durable);
    _consumerCache.put(key, created);

    // wildcard topic subscriptions are kept apart from the
    // per-destination lists
    final boolean wildcard =
            dest instanceof JmsTopic && ((JmsTopic) dest).isWildCard();
    if (wildcard) {
        _wildcardConsumers.put(created, dest);
        return;
    }

    // reverse mapping: destination -> entries subscribed to it
    List registered = (List) _destToConsumerMap.get(dest);
    if (registered == null) {
        registered = new ArrayList();
        _destToConsumerMap.put(dest, registered);
    }
    registered.add(created);
}
/**
 * Removes a consumer from the in-memory caches.
 * <p/>
 * The entry is dropped from the key cache and then from whichever index
 * holds it: the wildcard cache or the per-destination list. A
 * per-destination list that becomes empty is removed as well.
 *
 * @param key the consumer key
 */
private synchronized void removeFromConsumerCache(Object key) {
    if (_log.isDebugEnabled()) {
        _log.debug("removeFromConsumerCache(key=" + key + ")");
    }

    final ConsumerEntry removed = (ConsumerEntry) _consumerCache.remove(key);
    if (removed == null) {
        if (_log.isDebugEnabled()) {
            _log.debug("removeFromConsumerCache(key=" + key +
                       "): consumer not found");
        }
        return;
    }

    final JmsDestination target = removed.getDestination();
    if (target instanceof JmsTopic && ((JmsTopic) target).isWildCard()) {
        // wildcard subscriptions live only in the wildcard cache
        _wildcardConsumers.remove(removed);
        return;
    }

    final List registered = (List) _destToConsumerMap.get(target);
    if (registered == null) {
        return;
    }
    registered.remove(removed);
    if (registered.isEmpty()) {
        // no subscribers left for the destination: drop its list
        _destToConsumerMap.remove(target);
    }
}
/**
 * Drops the destination-to-consumers mapping for the specified
 * destination, if one exists.
 * <p/>
 * Only <code>_destToConsumerMap</code> is modified here; the key cache and
 * the wildcard cache are not touched by this method.
 *
 * @param destination - destination to remove.
 */
private synchronized void removeFromConsumerCache(
        JmsDestination destination) {
    // removing an absent key is a no-op, so no prior lookup is required
    _destToConsumerMap.remove(destination);
}
/**
 * Initialises the consumer manager.
 * <p/>
 * Obtains the scheduler, loads the durable consumers known to the
 * persistence adapter, and caches each consumer-to-destination mapping in
 * memory. A consumer whose destination is neither registered with
 * {@link DestinationManager} nor a wildcard topic is logged as an error and
 * skipped.
 *
 * @throws ServiceException if the manager can't be initialised
 */
private void init() throws ServiceException {
    _scheduler = Scheduler.instance();
    Connection connection = null;
    try {
        connection = DatabaseService.getConnection();
        PersistenceAdapter adapter = DatabaseService.getAdapter();

        // maps each durable consumer name to the name of its destination
        HashMap map = adapter.getAllDurableConsumers(connection);

        // complete the transaction only once the read has been done, so
        // that the connection is not closed with work still pending
        connection.commit();

        Iterator iter = map.keySet().iterator();
        // cache an entry for each durable consumer
        while (iter.hasNext()) {
            String consumer = (String) iter.next();
            String deststr = (String) map.get(consumer);
            JmsDestination dest =
                    DestinationManager.instance().getDestination(deststr);
            if (dest == null) {
                // this may be a wildcard subscription, which is not
                // registered as a destination in its own right
                dest = new JmsTopic(deststr);
                if (!((JmsTopic) dest).isWildCard()) {
                    dest = null;
                }
            }
            // instanceof is false for null, so no separate null test
            if (consumer != null && dest instanceof JmsTopic) {
                // cache the consumer-destination mapping in memory.
                addToConsumerCache(consumer, dest, true);
            } else {
                // the subscription cannot be resolved; report and continue
                _log.error("Failure in ConsumerManager.init : " + consumer +
                           ":" + dest);
            }
        }
    } catch (ServiceException exception) {
        SQLHelper.rollback(connection);
        throw exception;
    } catch (Exception exception) {
        SQLHelper.rollback(connection);
        throw new ServiceException("Failed to initialise ConsumerManager",
                                   exception);
    } finally {
        SQLHelper.close(connection);
    }
}
/**
 * Advances the consumer identifier seed by one and returns the new value.
 *
 * @return the incremented seed, for use as a consumer identifier
 */
private synchronized long getNextConsumerId() {
    _consumerIdSeed++;
    return _consumerIdSeed;
}
/**
 * Verifies that a destination is valid.
 * <p/>
 * A destination is valid if it is persistent and is registered with {@link
 * DestinationManager}, or non-persistent. If it is non-persistent and not
 * yet registered, it will be registered. A destination that is already
 * registered must have the same persistence setting as the supplied one.
 *
 * @param destination the destination to check
 * @throws InvalidDestinationException if the destination is invalid
 */
private void checkDestination(JmsDestination destination)
        throws InvalidDestinationException {
    final DestinationManager manager = DestinationManager.instance();
    final String name = destination.getName();
    final JmsDestination existing = manager.getDestination(name);
    if (existing == null) {
        if (destination.getPersistent()) {
            throw new InvalidDestinationException(
                    "No persistent destination with name=" + name
                    + " exists");
        }
        // non-persistent destinations can be registered dynamically
        manager.createDestination(destination);
    } else if (existing.getPersistent() != destination.getPersistent()) {
        // the supplied destination must have the same properties as the
        // existing one
        throw new InvalidDestinationException(
                "Mismatched destination properties for destination "
                + "with name=" + name);
    }
}
/**
 * Helper class used to maintain consumer information.
 * <p/>
 * Instances are immutable. Equality and hash code are based solely on the
 * consumer key, so an entry can safely be used as a key in hash-based
 * collections.
 */
private static final class ConsumerEntry {

    /**
     * An identifier for the consumer.
     */
    private final Object _key;

    /**
     * Indicates whether this entry is for a durable subscriber
     */
    private final boolean _durable;

    /**
     * The destination that the consumer is actually subscribed to
     */
    private final JmsDestination _destination;

    /**
     * Construct an instance of this class using the specified key,
     * destination and durable subscriber indicator
     *
     * @param key         an identifier for the consumer
     * @param destination the destination consumer is subscribed to
     * @param durable     indicates whether it is a durable subscription
     */
    public ConsumerEntry(Object key, JmsDestination destination,
                         boolean durable) {
        _key = key;
        _destination = destination;
        _durable = durable;
    }

    /**
     * Two entries are equal if their keys are equal.
     *
     * @param obj the object to compare with
     * @return <code>true</code> if <code>obj</code> is a
     *         <code>ConsumerEntry</code> with an equal key
     */
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        // instanceof also rejects null
        if (!(obj instanceof ConsumerEntry)) {
            return false;
        }
        final Object other = ((ConsumerEntry) obj)._key;
        return (other == null) ? (_key == null) : other.equals(_key);
    }

    /**
     * Returns a hash code derived from the key, consistent with
     * {@link #equals}.
     *
     * @return the hash code of the key, or <code>0</code> if the key is
     *         <code>null</code>
     */
    public int hashCode() {
        return (_key != null) ? _key.hashCode() : 0;
    }

    /**
     * Returns the consumer key.
     *
     * @return the key supplied at construction
     */
    public Object getKey() {
        return _key;
    }

    /**
     * Returns the key as a name.
     *
     * @return the key if it is a <code>String</code>, otherwise
     *         <code>null</code>
     */
    public String getName() {
        return (_key instanceof String) ? (String) _key : null;
    }

    /**
     * Returns the destination the consumer is subscribed to.
     *
     * @return the destination supplied at construction
     */
    public JmsDestination getDestination() {
        return _destination;
    }

    /**
     * Indicates whether the entry is for a durable subscriber.
     *
     * @return the durable flag supplied at construction
     */
    public boolean isDurable() {
        return _durable;
    }

    /**
     * Helper to return a key for identifying {@link ConsumerEndpoint}
     * instances. This returns the consumer's persistent identifier if it
     * has one; if not, it returns its transient identifier wrapped in a
     * <code>Long</code>.
     *
     * @param consumer the consumer
     * @return a key for identifying <code>consumer</code>
     */
    public static Object getConsumerKey(ConsumerEndpoint consumer) {
        final String id = consumer.getPersistentId();
        if (id != null) {
            return id;
        }
        return new Long(consumer.getId());
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -