📄 consumermanager.java
字号:
addToConsumerCache(key, destination, false);
}
return endpoint;
}
/**
 * Creates a durable consumer endpoint under a well-known subscription name
 * and registers it with this manager.
 * <p>
 * The request is rejected when an endpoint is already registered under
 * <code>name</code>, or when <code>topic</code> is not a wildcard and the
 * destination manager reports that it does not exist. Otherwise the new
 * endpoint is given the next consumer identifier and stored in the endpoint
 * map under its persistent identifier.
 *
 * @param session  the owning session
 * @param topic    the topic to consume from
 * @param name     the unique subscriber name
 * @param noLocal  if true, and the destination is a topic, inhibits the
 *                 delivery of messages published by its own connection
 * @param selector the message selector. May be <code>null</code>
 * @return the newly registered durable consumer endpoint
 * @throws JMSException if an endpoint is already registered under
 *                      <code>name</code>, or <code>topic</code> is a
 *                      non-wildcard topic that doesn't exist. Presumably
 *                      also raised by the endpoint constructor for an
 *                      invalid <code>selector</code> - not visible here
 */
public synchronized DurableConsumerEndpoint createDurableConsumerEndpoint(
        JmsServerSession session, JmsTopic topic, String name,
        boolean noLocal,
        String selector)
        throws JMSException {
    if (_log.isDebugEnabled()) {
        _log.debug("createDurableConsumerEndpoint(session=" + session
                + ", topic=" + topic + ", name=" + name
                + ", selector=" + selector + ", noLocal=" + noLocal
                + ")");
    }

    // a subscription name may be bound to at most one endpoint at a time
    final DurableConsumerEndpoint registered =
            (DurableConsumerEndpoint) _endpoints.get(name);
    if (registered != null) {
        throw new JMSException(name + " is already registered");
    }

    // wildcard topics are exempt from the existence check
    final boolean concrete = !topic.isWildCard();
    if (concrete && !DestinationManager.instance().destinationExists(topic)) {
        throw new JMSException("Cannot create a durable consumer for "
                + topic);
    }

    // all preconditions hold: build the endpoint and register it
    final DurableConsumerEndpoint created = new DurableConsumerEndpoint(
            getNextConsumerId(), session, topic, name, selector, noLocal,
            _scheduler);
    _endpoints.put(created.getPersistentId(), created);
    return created;
}
/**
 * Determines whether at least one durable consumer entry is recorded
 * against the specified destination.
 * <p>
 * NOTE(review): the check only looks at the durable flag of the entries
 * mapped to the destination; it does not consult the endpoint map -
 * confirm that this is the intended meaning of "active".
 *
 * @param topic the destination to check
 * @return <code>true</code> if a durable entry is mapped to
 *         <code>topic</code>, otherwise <code>false</code>
 */
public synchronized boolean hasActiveDurableConsumers(JmsDestination topic) {
    final List entries = (List) _destToConsumerMap.get(topic);
    if (entries == null) {
        // nothing is recorded against this destination
        return false;
    }
    for (Iterator it = entries.iterator(); it.hasNext();) {
        if (((ConsumerEntry) it.next()).isDurable()) {
            return true;
        }
    }
    return false;
}
/**
 * Creates a queue browser endpoint for the given queue and selector, and
 * registers it with this manager. A browser is responsible for passing
 * back to the client the messages that reside on the queue.
 *
 * @param session  the owning session
 * @param queue    the queue to browse
 * @param selector the message selector. May be <code>null</code>
 * @return the newly registered queue browser endpoint
 * @throws JMSException if the destination check or the endpoint
 *                      construction fails
 */
public synchronized ConsumerEndpoint createQueueBrowserEndpoint(
        JmsServerSession session, JmsQueue queue, String selector)
        throws JMSException {
    // the queue must pass validation before an endpoint is allocated
    checkDestination(queue);

    final ConsumerEndpoint browser = new QueueBrowserEndpoint(
            getNextConsumerId(), session, queue, selector, _scheduler);

    // register the browser under its consumer key, both as a live endpoint
    // and in the consumer cache (presumably flagged as non-durable by the
    // trailing 'false' - confirm against addToConsumerCache)
    final Object browserKey = ConsumerEntry.getConsumerKey(browser);
    _endpoints.put(browserKey, browser);
    addToConsumerCache(browserKey, queue, false);
    return browser;
}
/**
 * Destroys the endpoint associated with the specified durable consumer.
 * <p>
 * The request is silently ignored (apart from debug logging) when no
 * consumer is cached under <code>name</code>, or when the consumer is
 * cached but has no live endpoint, i.e. the subscription is currently
 * inactive.
 *
 * @param name the name of the durable consumer
 * @throws JMSException if the consumer registered under <code>name</code>
 *                      is not a durable subscriber
 */
public synchronized void deleteDurableConsumerEndpoint(String name)
        throws JMSException {
    if (_log.isDebugEnabled()) {
        _log.debug("deleteDurableConsumerEndpoint(name=" + name + ")");
    }

    ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);
    if (entry == null) {
        // ignore since the consumer is not known
        if (_log.isDebugEnabled()) {
            _log.debug("deleteDurableConsumerEndpoint(name=" + name
                    + "): failed to locate consumer");
        }
        return;
    }
    if (!entry.isDurable()) {
        throw new JMSException(name + " is not a durable subscriber");
    }

    // A durable entry can be cached without a live endpoint (an inactive
    // subscription). Passing null on to deleteConsumerEndpoint() would
    // trigger a NullPointerException, so treat it as "nothing to destroy".
    ConsumerEndpoint endpoint = (ConsumerEndpoint) _endpoints.get(name);
    if (endpoint == null) {
        if (_log.isDebugEnabled()) {
            _log.debug("deleteDurableConsumerEndpoint(name=" + name
                    + "): consumer is not active");
        }
        return;
    }
    deleteConsumerEndpoint(endpoint);
}
/**
 * Destroys the specified consumer.
 * <p>
 * Nothing happens (apart from debug logging) unless an endpoint is
 * currently registered under the consumer's key. When one is, it is
 * unregistered only if its identifier equals that of
 * <code>consumer</code>; the supplied consumer is then closed and, unless
 * it is a durable consumer, dropped from the consumer cache.
 *
 * @param consumer the consumer to destroy
 */
public synchronized void deleteConsumerEndpoint(ConsumerEndpoint consumer) {
    if (_log.isDebugEnabled()) {
        _log.debug("deleteConsumerEndpoint(consumer=[Id="
                + consumer.getId() + ", destination="
                + consumer.getDestination() + ")");
    }

    final Object key = ConsumerEntry.getConsumerKey(consumer);
    final ConsumerEndpoint registered = (ConsumerEndpoint) _endpoints.get(key);
    if (registered == null) {
        // the consumer is not active: nothing to tear down
        return;
    }

    // Fix for bug 759752: a different endpoint may have been registered
    // under the same key in the meantime, in which case it must be kept.
    if (consumer.getId() != registered.getId()) {
        if (_log.isDebugEnabled()) {
            _log.debug("Existing endpoint doesn't match that to " +
                    "be deleted - retaining");
        }
    } else {
        _endpoints.remove(key);
    }

    // the supplied consumer is closed whether or not it was unregistered
    consumer.close();

    // durable subscribers keep their cache entry; everything else loses it
    final boolean durable = consumer instanceof DurableConsumerEndpoint;
    if (!durable) {
        removeFromConsumerCache(key);
    }
}
/**
 * Looks up the consumer registered under the given numeric identity.
 * <p>
 * NOTE(review): unlike the mutating operations, this lookup is not
 * synchronized - confirm that the endpoint map tolerates concurrent reads.
 *
 * @param consumerId the identity of the consumer
 * @return the associated consumer, or <code>null</code> if none exists
 */
public ConsumerEndpoint getConsumerEndpoint(long consumerId) {
    // the endpoint map is keyed by the boxed identifier
    final Long key = new Long(consumerId);
    return (ConsumerEndpoint) _endpoints.get(key);
}
/**
 * Looks up the consumer registered under the given persistent identity.
 *
 * @param persistentId the persistent identity of the consumer
 * @return the associated consumer, or <code>null</code> if none exists
 */
public ConsumerEndpoint getConsumerEndpoint(String persistentId) {
    final Object found = _endpoints.get(persistentId);
    return (ConsumerEndpoint) found;
}
/**
 * Checks whether any registered endpoint consumes from the specified
 * destination.
 * <p>
 * An endpoint subscribed to a wildcard topic counts when
 * <code>destination</code> is a topic that the wildcard matches; every
 * other endpoint counts when its destination equals
 * <code>destination</code>.
 *
 * @param destination the destination to check
 * @return <code>true</code> if at least one endpoint consumes from
 *         <code>destination</code>, otherwise <code>false</code>
 * @throws JMSException declared by the signature; nothing in this method
 *                      raises it directly
 */
public boolean hasActiveConsumers(JmsDestination destination)
        throws JMSException {
    // iterate over a snapshot of the registered endpoints
    final Object[] snapshot = _endpoints.values().toArray();
    for (int i = 0; i < snapshot.length; ++i) {
        final JmsDestination target =
                ((ConsumerEndpoint) snapshot[i]).getDestination();

        // wildcard matching only applies when both sides are topics
        final boolean wildcard = (destination instanceof JmsTopic)
                && (target instanceof JmsTopic)
                && ((JmsTopic) target).isWildCard();

        final boolean hit = wildcard
                ? ((JmsTopic) target).match((JmsTopic) destination)
                : target.equals(destination);
        if (hit) {
            return true;
        }
    }
    return false;
}
/**
 * Checks whether an endpoint is currently registered under the given
 * durable consumer name.
 *
 * @param name the consumer name
 * @return <code>true</code> if an endpoint is registered under
 *         <code>name</code>, otherwise <code>false</code>
 */
public boolean isDurableConsumerActive(String name) {
    final Object endpoint = _endpoints.get(name);
    return endpoint != null;
}
/**
 * Returns the destination associated with the specified consumer name, as
 * recorded in the consumer cache.
 *
 * @param name the consumer name
 * @return the destination the consumer is cached against, or
 *         <code>null</code> if no consumer is cached under
 *         <code>name</code>
 */
public JmsDestination getDestinationForConsumerName(String name) {
    final ConsumerEntry cached = (ConsumerEntry) _consumerCache.get(name);
    if (cached == null) {
        return null;
    }
    return cached.getDestination();
}
/**
 * Checks whether a durable consumer is cached under the specified name.
 *
 * @param name the name of the durable consumer
 * @return <code>true</code> if a consumer is cached under
 *         <code>name</code> and is flagged as durable, otherwise
 *         <code>false</code>
 */
public boolean durableConsumerExists(String name) {
    final ConsumerEntry cached = (ConsumerEntry) _consumerCache.get(name);
    return (cached != null) && cached._durable;
}
/**
 * Checks that a consumer is cached under <code>name</code> and that its
 * destination carries the given topic name.
 * <p>
 * NOTE(review): the durable flag of the cached entry is not inspected
 * here - confirm whether callers rely on that.
 *
 * @param topic the name of the topic
 * @param name  the name of the durable consumer
 * @return <code>true</code> if the name/topic pair matches a cached
 *         consumer, otherwise <code>false</code>
 */
public boolean validSubscription(String topic, String name) {
    final ConsumerEntry cached = (ConsumerEntry) _consumerCache.get(name);
    if (cached == null || cached._destination == null) {
        // unknown consumer, or one without a destination
        return false;
    }
    return cached._destination.getName().equals(topic);
}
/**
 * Returns the names of the inactive subscriptions for a topic, i.e. the
 * durable consumer entries mapped to the topic that have no endpoint
 * registered under their key.
 *
 * @param topic the topic
 * @return a list of subscription names, as <code>String</code>s; empty
 *         if there are none
 */
public synchronized List getInactiveSubscriptions(JmsTopic topic) {
    final List names = new ArrayList();
    final List entries = (List) _destToConsumerMap.get(topic);
    if (entries == null) {
        return names;
    }
    for (Iterator it = entries.iterator(); it.hasNext();) {
        final ConsumerEntry candidate = (ConsumerEntry) it.next();
        // skip non-durable entries and those with a live endpoint
        if (!candidate.isDurable()
                || _endpoints.containsKey(candidate.getKey())) {
            continue;
        }
        names.add(candidate.getName());
    }
    return names;
}
/**
* Destroy this manager. This is brutal and final
*/
public synchronized void destroy() {
// clean up all the destinations
Object[] endpoints = _endpoints.values().toArray();
for (int index = 0; index < endpoints.length; index++) {
deleteConsumerEndpoint((ConsumerEndpoint) endpoints[index]);
}
_endpoints.clear();
// remove cache data structures
_consumerCache.clear();
_consumerCache = null;
_destToConsumerMap.clear();
_destToConsumerMap = null;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -