📄 consumermanager.java
字号:
*
* @param session the owning session
* @param topic consumer for this topic
* @param name the unique subscriber name
* @param clientId the remote client identity
* @param selector the message selector. May be <code>null</code>
* @return the durable consumer endpoint
* @throws JMSException if a durable consumer is already active with
* the same <code>name</code>, or the <code>topic</code> doesn't exist,
* or <code>selector</code> is an invalid selector
*/
public synchronized DurableConsumerEndpoint createDurableConsumerEndpoint(
JmsServerSession session, JmsTopic topic, String name,
long clientId, String selector)
throws JMSException {
if (_log.isDebugEnabled()) {
_log.debug("createDurableConsumerEndpoint(session=[sessionId="
+ session.getSessionId() + "], topic=" + topic
+ ", name=" + name + ", clientId=" + clientId
+ ", selector=" + selector);
}
// check that the durable subscriber is not already registered. If it
// is registered then check to see whether the client is still active.
DurableConsumerEndpoint endpoint =
(DurableConsumerEndpoint) _endpoints.get(name);
if (endpoint != null) {
if (endpoint.getSession().isClientEndpointActive()) {
throw new JMSException(name + " is already registered");
} else {
// client endpoint must have been lost
if (_log.isDebugEnabled()) {
_log.debug("Closing session for inactive durable " +
"consumer [name=" + name + "]");
}
endpoint.getSession().close();
}
}
// check that the destination actually exists, if the topic
// is not a wildcard
if (!topic.isWildCard() &&
!DestinationManager.instance().destinationExists(topic)) {
throw new JMSException("Cannot create a durable consumer for "
+ topic);
}
// if we get this far then we need to create the durable consumer
endpoint = new DurableConsumerEndpoint(session, clientId, topic, name,
selector, _scheduler);
_endpoints.put(endpoint.getPersistentId(), endpoint);
return endpoint;
}
/**
* Check whether there are active durable consumers for the specified
* destination.
*
* @param topic the destination to check
* @param <code>true</code> if there is at least one active consumer
*/
public synchronized boolean hasActiveDurableConsumers(
JmsDestination topic) {
boolean result = false;
Vector consumers = (Vector) _destToConsumerMap.get(topic);
if (consumers != null) {
Enumeration iterator = consumers.elements();
while (iterator.hasMoreElements()) {
ConsumerEntry entry = (ConsumerEntry) iterator.nextElement();
if (entry._durable) {
result = true;
break;
}
}
}
return result;
}
/**
* Create a browser for the specified destination and the selector. A
* browser is responsible for passing all messages back to the client
* that reside on the queue
*
* @param session the owning session
* @param clientId the remote client identity, which is session scoped
* @param queue the queue destination cache
* @param selector optional filter
* @return the queue browser endpoint
*/
public synchronized ConsumerEndpoint createQueueBrowserEndpoint(
JmsServerSession session, long clientId, JmsQueue queue,
String selector)
throws JMSException {
ConsumerEndpoint consumer = null;
if (queue != null) {
consumer = new QueueBrowserEndpoint(session, clientId,
queue, selector, _scheduler);
} else {
throw new JMSException("Cannot create a browser for a null queue");
}
String id = consumer.getPersistentId();
_endpoints.put(id, consumer);
addToConsumerCache(id, queue, false);
return consumer;
}
/**
* Destroy the endpoint associated with the specified durable consumer
*
* @param name - name of the durable consumer
* @exception JMSException - if itt cannot complete the request
*/
public synchronized void deleteDurableConsumerEndpoint(String name)
throws JMSException {
if (_log.isDebugEnabled()) {
_log.debug("deleteDurableConsumerEndpoint(name=" + name + ")");
}
ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);
if (entry != null) {
if (entry._durable) {
deleteConsumerEndpoint((ConsumerEndpoint) _endpoints.get(name));
} else {
throw new JMSException(name + " is not a durable subscriber");
}
} else {
// ignore since the consumer is not active
if (_log.isDebugEnabled()) {
_log.debug("deleteDurableConsumerEndpoint(name=" + name
+ "): failed to locate consumer");
}
}
}
/**
* Destroy the specified consumer
*
* @param consumer the consumer to destroy
*/
public synchronized void deleteConsumerEndpoint(
ConsumerEndpoint consumer) {
if (_log.isDebugEnabled()) {
_log.debug("deleteConsumerEndpoint(consumer=[clientId="
+ consumer.getClientId() + ", destination="
+ consumer.getDestination() + ", session=[sessionId="
+ consumer.getSession().getSessionId() + "]])");
}
String id = consumer.getPersistentId();
// if the consumer is currently active then delete it
ConsumerEndpoint existing = (ConsumerEndpoint) _endpoints.get(id);
if (existing != null) {
// unregister itself from all the caches
consumer.unregister();
// remove it from the list of active endpoints
// As a fix for bug 759752, only remove the consumer if it
// matches the existing one
if (consumer.getId().equals(existing.getId())) {
_endpoints.remove(id);
} else {
if (_log.isDebugEnabled()) {
_log.debug("Existing endpoint doesn't match that to " +
"be deleted - retaining");
}
}
// close the endpoint
consumer.close();
// remove it from the consumer cache if and only if it is a
// non-durable subscriber
if (!(consumer instanceof DurableConsumerEndpoint)) {
try {
removeFromConsumerCache(id);
} catch (JMSException exception) {
_log.debug("Failed to remove " + id + " from the cache",
exception);
}
}
}
}
/**
* Return the consumer with the specified identity
*
* @param id - identity of the consumer
* @return Consumer - associated consumer object or null
*/
public ConsumerEndpoint getConsumerEndpoint(String id) {
return (ConsumerEndpoint) _endpoints.get(id);
}
/**
* Return a list of {@link ConsumerEndpoint} objects, both transient and
* durable that are currently active.
*
* @return Iterator - iterator of {@link ConsumerEndpoint} objects
*/
public Iterator consumerEndpoints() {
return _endpoints.values().iterator();
}
/**
* Return a list of consumer names id's currently active in the
* consumer manager.
*
* @return Iterator - iterator of {#link String} ids
*/
public Iterator consumerIds() {
return _endpoints.keySet().iterator();
}
/**
* Check whether a consumer, with the specified identity actually
* exists.
*
* @param id - identity of the consumer
* @return boolean - true if one exists
*/
public boolean exists(String id) {
return (getConsumerEndpoint(id) != null);
}
/**
* Check whether there is an active consumer for the specified
* destination
*
* @param destination - the destination to check
* @return boolean - true if it exists
*/
public boolean hasActiveConsumers(JmsDestination destination)
throws JMSException {
boolean result = false;
Object[] endpoints = _endpoints.values().toArray();
for (int index = 0; index < endpoints.length; index++) {
ConsumerEndpoint endpoint = (ConsumerEndpoint) endpoints[index];
JmsDestination endpoint_dest = endpoint.getDestination();
if ((destination instanceof JmsTopic) &&
(endpoint_dest instanceof JmsTopic) &&
(((JmsTopic) endpoint_dest).isWildCard())) {
if (((JmsTopic) endpoint_dest).match((JmsTopic) destination)) {
result = true;
break;
}
} else {
if (endpoint_dest.equals(destination)) {
result = true;
break;
}
}
}
return result;
}
/**
* Check whether a particular durable consumer is active
*
* @param name - the consumer name
* @return boolean - true if active
*/
public boolean isDurableConsumerActive(String name) {
return (_endpoints.get(name) != null);
}
/**
* Return the destination assoicated with the specified durable
* consumer.
*
* @param name - consumer name
* @return JmsDestination - the destination is it registered under or null
*/
public JmsDestination getDestinationForConsumerName(String name) {
ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);
return (entry != null) ? entry._destination : null;
}
/**
* Check if the specified durable consumer exists
*
* @param name - the name of the durable consumer
* @return boolean - true if successful
*/
public boolean durableConsumerExists(String name) {
boolean result = false;
ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);
if ((entry != null) &&
(entry._durable)) {
result = true;
}
return result;
}
/**
* This method will check that the name-destination pair actually
* are valid and exist as a DurableConsumer entity
*
* @param topic - the name of the topic
* @param name - the name of the durable consumer
* @return boolean - true if valid and false otherwise
*/
public boolean validSubscription(String topic, String name) {
boolean result = false;
ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);
if ((entry != null) &&
(entry._destination != null) &&
(entry._destination.getName().equals(topic))) {
result = true;
}
return result;
}
// implementation of DestinationCacheEventListener.messageAdded
synchronized public boolean messageAdded(MessageImpl message) {
return false;
}
// implementation of DestinationCacheEventListener.messageRemoved
synchronized public boolean messageRemoved(MessageImpl message) {
return false;
}
// implementation of DestinationCacheEventListener.persistentMessageAdded
synchronized public boolean persistentMessageAdded(Connection connection,
MessageImpl message)
throws PersistenceException {
try {
JmsDestination dest = (JmsDestination) message.getJMSDestination();
// check to ensure that the message is not of type queue. If it
// is then we don't want to process it here. All the processing
// for non-resident queues are done at the cache level
if (dest instanceof JmsQueue) {
return false;
}
// for each durable consumer we need to add a persistent handle
Vector names = getDurableConsumersForDest((JmsTopic) dest);
while (names.size() > 0) {
String name = (String) names.remove(0);
MessageHandleFactory.createPersistentHandle(connection, dest,
name, message);
}
} catch (JMSException exception) {
// rethrow as a PersistenceException...but does it make sense
throw new PersistenceException(
"Failed to create persistent handle", exception);
}
// return false, even though we processed it to force the
// message manager not to keep the message in memory.
return false;
}
// implementation of DestinationCacheEventListener.persistentMessageRemoved
synchronized public boolean persistentMessageRemoved(Connection connection,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -