📄 messagemgr.java
字号:
} catch (Exception ignore) {
// no-op
}
}
}
}
/**
 * Processes a persistent message published through the resource manager.
 * <p>
 * The destination is read from the message itself. If it is present, the
 * listener registered for that destination (or the
 * {@link DestinationManager} when none is registered) is notified via
 * {@link #notifyOnAddPersistentMessage} and the processed-message counter
 * is incremented. If the destination is missing the problem is logged and
 * the method returns without throwing.
 *
 * @param connection - the database connection to use.
 * @param message - the message to process
 * @throws JMSException - if the message cannot be processed; the
 *                        underlying failure is attached as the linked
 *                        exception
 */
protected void addPersistentMessage(Connection connection,
                                    MessageImpl message)
    throws JMSException {
    JmsDestination destination = (JmsDestination) message.getJMSDestination();
    if (destination == null) {
        // shouldn't really get here, since the message should have been
        // checked and prepared before being passed to this routine.
        _log.error("Can't locate destination for message");
        return;
    }

    try {
        // notify all listeners that a persistent message has arrived
        notifyOnAddPersistentMessage(connection, destination, message);
        _messagesProcessed++;
    } catch (Exception exception) {
        // Persistence failures and all other failures are reported with
        // the same message, so a single handler covers both. Keep the
        // original exception reachable instead of flattening it to text.
        JMSException error = new JMSException(
            "Failed in addPersistentMessage : " + exception.toString());
        error.setLinkedException(exception);
        throw error;
    }
}
/**
 * Look up the message that a handle refers to.
 * <p>
 * A handle whose destination is a {@link JmsTopic} is resolved through the
 * consumer endpoint named by the handle (obtained from the
 * {@link ConsumerManager}). Any other handle is resolved through the
 * {@link DestinationCache} registered for the handle's destination.
 *
 * @param handle - the handle to resolve, may be null
 * @return MessageImpl - the associated message, or null if the handle is
 *                       null or no endpoint/cache exists for it
 */
MessageImpl getMessage(MessageHandle handle) {
    if (handle == null) {
        return null;
    }

    if (handle.getDestination() instanceof JmsTopic) {
        // topic handle: the owning consumer endpoint holds the message
        TopicConsumerEndpoint topicEndpoint = (TopicConsumerEndpoint)
            ConsumerManager.instance().getConsumerEndpoint(
                handle.getConsumerName());
        return (topicEndpoint == null)
            ? null
            : topicEndpoint.getMessage(handle);
    }

    // anything else is treated as a queue: ask the destination cache
    DestinationCache queueCache =
        DestinationManager.instance().getDestinationCache(
            handle.getDestination());
    return (queueCache == null) ? null : queueCache.getMessage(handle);
}
/**
 * Validates a message and prepares it for processing, without actually
 * passing it through the system. It is used by the {@link ResourceManager}
 * to process incoming messages.
 * <p>
 * Validation happens first, so a rejected message is left untouched and
 * does not consume a sequence number. A valid message is stamped with the
 * current time as its accepted time, given the next sequence number and
 * marked read-only.
 *
 * @param message - the message
 * @throws JMSException - if the message is null or has no destination
 */
void checkAndPrepareMessage(MessageImpl message)
    throws JMSException {
    if (message == null) {
        throw new JMSException("checkAndPrepareMessage failed for null message");
    }
    if (message.getJMSDestination() == null) {
        throw new JMSException("Null destination specified in message");
    }

    // mark the message as accepted and attach a sequence number
    message.setAcceptedTime(System.currentTimeMillis());
    message.setSequenceNumber(++_sequenceNumberGenerator);
    message.setReadOnly(true);
}
/**
 * Reports whether the given consumer currently has a non-zero message
 * count.
 *
 * @param consumer - the consumer to check
 * @return boolean - true if the consumer's message count is greater than
 *                   zero
 * @throws JMSException - if the consumer is null
 */
public boolean hasMessages(ConsumerEndpoint consumer) throws JMSException {
    if (consumer != null) {
        return consumer.getMessageCount() > 0;
    }
    throw new JMSException(
        "Can't call hasMessages with null consumer");
}
/**
 * Returns an iterator over the destinations known to the
 * {@link DestinationManager}.
 *
 * @return Iterator - the iterator supplied by the destination manager
 */
public Iterator getDestinations() {
    final Iterator destinations = DestinationManager.instance().destinations();
    return destinations;
}
/**
 * Returns an iterator over the consumers held by the destination cache of
 * the given destination.
 *
 * @param destination - the destination whose consumers are wanted
 * @return Iterator - the consumers reported by the destination's cache, or
 *                    null if the destination has no cache
 * @throws JMSException - if the destination is null
 */
public Iterator getConsumers(JmsDestination destination)
    throws JMSException {
    // a null destination is a caller error
    if (destination == null) {
        throw new JMSException("destination is null in getConsumer");
    }

    DestinationCache cache =
        DestinationManager.instance().getDestinationCache(destination);
    if (cache == null) {
        return null;
    }
    return cache.getConsumers();
}
/**
 * Resolves a destination from its name by delegating to the
 * {@link DestinationManager}.
 *
 * @param name the name of the destination
 * @return JmsDestination whatever the destination manager returns for
 *         the given name
 */
public JmsDestination resolve(String name) {
    final JmsDestination resolved =
        DestinationManager.instance().destinationFromString(name);
    return resolved;
}
/**
 * Resolves a consumer endpoint from its identity by delegating to the
 * {@link ConsumerManager}. The destination argument is accepted but not
 * used in the lookup.
 * <p>
 * TODO (carried over from the original comment): look at removing this
 * method from here.
 *
 * @param destination the destination (currently ignored)
 * @param id the identity of the consumer
 * @return ConsumerEndpoint whatever the consumer manager returns for the
 *         given identity
 */
public ConsumerEndpoint resolveConsumer(JmsDestination destination,
                                        String id) {
    final ConsumerEndpoint endpoint =
        ConsumerManager.instance().getConsumerEndpoint(id);
    return endpoint;
}
/**
 * Stop/start a consumer. The intended contract is that a stopped consumer
 * does not receive messages until it is re-started, and that this is
 * invoked when the underlying connection is stopped or started.
 * <p>
 * NOTE: this method is currently not implemented; the body is empty, so
 * calling it has no effect and it never throws.
 *
 * @param consumer the consumer to stop/start (currently unused)
 * @param stop when <tt>true</tt> stop the consumer
 *             else start it (currently unused)
 * @throws JMSException declared for the future implementation; never
 *                      thrown by the present empty body
 */
public void setStopped(ConsumerEndpoint consumer, boolean stop)
throws JMSException {
// TODO: need to implement this for the consumer
}
/**
 * Registers a listener to be informed when messages for a destination are
 * added or removed.
 * <p>
 * The registry holds one listener per destination: if the destination
 * already has a listener the call is silently ignored. The call is also
 * silently ignored when either argument is null. The same listener may be
 * registered for several destinations.
 *
 * @param destination - the destination whose messages to listen for
 * @param listener - the listener to register
 */
public void addEventListener(JmsDestination destination,
                             MessageManagerEventListener listener) {
    if (destination == null || listener == null) {
        return;
    }

    synchronized (_listeners) {
        boolean alreadyRegistered = _listeners.containsKey(destination);
        if (!alreadyRegistered) {
            _listeners.put(destination, listener);
        }
    }
}
/**
 * Removes the listener registered for the specified destination. If none
 * is registered, or either argument is null, the call does nothing.
 * <p>
 * The listener argument is only checked for null; whichever listener is
 * currently registered for the destination is the one removed.
 *
 * @param destination - destination whose listener should be removed
 * @param listener - listener for that destination (null-checked only)
 */
public void removeEventListener(JmsDestination destination,
                                MessageManagerEventListener listener) {
    if (destination == null || listener == null) {
        return;
    }

    synchronized (_listeners) {
        boolean registered = _listeners.containsKey(destination);
        if (registered) {
            _listeners.remove(destination);
        }
    }
}
/**
 * Reports that a message has been added for a destination.
 * <p>
 * If a listener is registered for the destination it receives the
 * notification; otherwise the notification goes to the
 * {@link DestinationManager}.
 *
 * @param destination - destination the message was added for
 * @param message - message that was added
 * @return boolean - the value returned by whichever recipient handled the
 *                   notification
 * @throws JMSException - for any processing error
 */
boolean notifyOnAddMessage(JmsDestination destination,
                           MessageImpl message) throws JMSException {
    MessageManagerEventListener registered =
        (MessageManagerEventListener) _listeners.get(destination);

    if (registered == null) {
        // nobody registered for this destination: fall back to the
        // destination manager
        return DestinationManager.instance().messageAdded(destination,
                                                          message);
    }

    // a listener is registered for this destination, so it handles it
    return registered.messageAdded(destination, message);
}
/**
 * Reports that a message has been removed for a destination. There may be
 * several reasons why this has happened (e.g. the message has expired,
 * has been purged or has been consumed).
 * <p>
 * If a listener is registered for the destination it receives the
 * notification; otherwise the notification goes to the
 * {@link DestinationManager}.
 *
 * @param destination - destination the message was removed from
 * @param message - message that was removed
 * @throws JMSException for any processing error
 */
void notifyOnRemoveMessage(JmsDestination destination,
                           MessageImpl message) throws JMSException {
    MessageManagerEventListener registered =
        (MessageManagerEventListener) _listeners.get(destination);

    if (registered == null) {
        // no active listener, so hand it to the destination manager
        DestinationManager.instance().messageRemoved(destination, message);
        return;
    }

    // forward the notification to the active listener
    registered.messageRemoved(destination, message);
}
/**
 * Reports that a persistent message has been added for a destination.
 * <p>
 * If a listener is registered for the destination it receives the
 * notification; otherwise the notification goes to the
 * {@link DestinationManager}.
 *
 * @param connection - the database connection to use.
 * @param destination - destination the message was added for
 * @param message - message that was added
 * @return boolean - the value returned by whichever recipient handled the
 *                   notification
 * @throws JMSException - if a processing error occurred
 * @throws PersistenceException - if a persistence error occurred
 */
boolean notifyOnAddPersistentMessage(Connection connection,
                                     JmsDestination destination,
                                     MessageImpl message)
    throws JMSException, PersistenceException {
    MessageManagerEventListener registered =
        (MessageManagerEventListener) _listeners.get(destination);

    if (registered == null) {
        // nobody registered for this destination: fall back to the
        // destination manager
        return DestinationManager.instance().persistentMessageAdded(
            connection, destination, message);
    }

    // a listener is registered for this destination, so it handles it
    return registered.persistentMessageAdded(connection,
                                             destination, message);
}
/**
 * Reports that a persistent message has been removed for a destination.
 * There may be several reasons why this has happened (e.g. the message
 * has expired, has been purged or has been consumed).
 * <p>
 * If a listener is registered for the destination it receives the
 * notification; otherwise the notification goes to the
 * {@link DestinationManager}.
 *
 * @param connection - the database connection to use
 * @param destination - destination the message was removed from
 * @param message - message that was removed
 * @throws JMSException - for any processing problem
 * @throws PersistenceException - for any persistence related problem
 */
void notifyOnRemovePersistentMessage(Connection connection,
                                     JmsDestination destination,
                                     MessageImpl message)
    throws JMSException, PersistenceException {
    MessageManagerEventListener registered =
        (MessageManagerEventListener) _listeners.get(destination);

    if (registered == null) {
        // no active listener, so hand it to the destination manager
        DestinationManager.instance().persistentMessageRemoved(connection,
            destination, message);
        return;
    }

    // forward the notification to the active listener
    registered.persistentMessageRemoved(connection, destination,
                                        message);
}
/**
 * Returns the configured maximum size of the cache.
 *
 * @return int - the current value of the maximum-size setting
 */
public int getMaximumSize() {
    final int maximumSize = _maximumSize;
    return maximumSize;
}
/**
 * Notify the destruction of a handle.
 * <p>
 * The message the handle refers to is deleted from whichever component
 * holds it:
 * 1. if the handle's destination is a {@link JmsTopic}, the consumer
 *    endpoint named by the handle is asked to delete it
 * 2. otherwise the {@link DestinationCache} for the handle's destination
 *    is asked to delete it
 * Nothing happens when the handle is null or when no endpoint/cache is
 * found.
 *
 * @param handle - the handle that was destroyed, may be null
 */
public void handleDestroyed(MessageHandle handle) {
    if (handle == null) {
        return;
    }

    boolean forTopic = handle.getDestination() instanceof JmsTopic;
    if (!forTopic) {
        // not a topic: the destination cache owns the message
        DestinationCache queueCache =
            DestinationManager.instance().getDestinationCache(
                handle.getDestination());
        if (queueCache != null) {
            queueCache.deleteMessage(handle);
        }
        return;
    }

    // topic: the owning consumer endpoint holds the message
    TopicConsumerEndpoint topicEndpoint = (TopicConsumerEndpoint)
        ConsumerManager.instance().getConsumerEndpoint(
            handle.getConsumerName());
    if (topicEndpoint != null) {
        topicEndpoint.deleteMessage(handle);
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -