📄 messagemgr.java
字号:
*/
public void add(Connection connection, MessageImpl message)
throws JMSException {
JmsDestination destination =
(JmsDestination) message.getJMSDestination();
// if the message's delivery mode is PERSISTENT, and the destination
// is also persistent, then then process it accordingly, otherwise use
// the non-persistent quality of service
if (message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT
&& DestinationManager.instance().isPersistent(destination)) {
addPersistentMessage(connection, message);
} else {
addNonPersistentMessage(message);
}
}
/**
* Processes a non-persistent message
*
* @param message - the message to add
* @throws JMSException if the message cannot be processed
*/
private void addNonPersistentMessage(MessageImpl message)
throws JMSException {
// notify all registered listeners that a new message has arrived
// for the specified destination.
JmsDestination destination = (JmsDestination) message.getJMSDestination();
MessageManagerEventListener listener =
(MessageManagerEventListener) _listeners.get(destination);
if (listener != null) {
// if there is a registered destination cache then let the cache
// process it.
listener.messageAdded(destination, message);
} else {
// let the DestinationManager handle the message
DestinationManager.instance().messageAdded(destination, message);
}
}
/**
* Add a persistent message
*
* @param message the message to add
* @throws JMSException if the message cannot be processed
*/
private void addPersistentMessage(MessageImpl message) throws JMSException {
JmsDestination destination =
(JmsDestination) message.getJMSDestination();
Connection connection = null;
// do all persistent work in this block
try {
connection = DatabaseService.getConnection();
// add the message to the database
DatabaseService.getAdapter().addMessage(connection, message);
// notify all listeners that a persistent message has arrived
notifyOnAddPersistentMessage(connection, destination, message);
// commit the work
connection.commit();
} catch (Exception exception) {
SQLHelper.rollback(connection);
_log.error("Failed to make message persistent", exception);
throw new JMSException("Failed to make message persistent: " +
exception.toString());
} finally {
SQLHelper.close(connection);
}
}
/**
* This method is used to process persistent messages published through the
* resource manager.
*
* @param connection - the database connection to use.
* @param message - the message to process
* @throws JMSException - if the message cannot be processed
*/
private void addPersistentMessage(Connection connection,
MessageImpl message) throws JMSException {
JmsDestination destination = (JmsDestination) message.getJMSDestination();
try {
// notify all listeners that a persistent message has arrived
notifyOnAddPersistentMessage(connection, destination, message);
} catch (PersistenceException exception) {
throw new JMSException("Failed in addPersistentMessage : "
+ exception.toString());
} catch (Exception exception) {
throw new JMSException("Failed in addPersistentMessage : "
+ exception.toString());
}
}
/**
* Prepares a message prior to it being passed through the system.
* This
*
* @param message the message
* @throws JMSException - if the message is invalid or cannot be prep'ed
*/
public void prepare(MessageImpl message)
throws JMSException {
if (message == null) {
throw new JMSException("Null message");
}
Destination destination = message.getJMSDestination();
if (destination == null) {
throw new InvalidDestinationException("Message has no destination");
}
if (!(destination instanceof JmsDestination)) {
throw new InvalidDestinationException(
"Destination not a JmsDestination");
}
// mark the message as accepted and attach a sequence number
message.setAcceptedTime((new Date()).getTime());
message.setSequenceNumber(++_sequenceNumberGenerator);
message.setReadOnly(true);
}
/**
* Resolves a destination given its name
*
* @param name the name of the destination
* @return JmsDestination if an active destination exists for the given
* name, else it returns <tt>null</tt>
*/
public JmsDestination resolve(String name) {
return DestinationManager.instance().getDestination(name);
}
/**
* Resolves a consumer given its destination and an identity. Should look
* removing t from here.
*
* @param destination the destination
* @param consumerId the consumer identifier
* @return ConsumerIfc if an active consumer exists for the given
* name, else it returns <tt>null</tt>
*/
public ConsumerEndpoint resolveConsumer(JmsDestination destination,
String consumerId) {
return ConsumerManager.instance().getConsumerEndpoint(consumerId);
}
/**
* Stop/start a consumer. When stopped, the consumer will not receive
* messages until the consumer is re-started. This is invoked when the
* underlying connection is stopped or started
*
* @param consumer the consumer to stop/start
* @param stop when <tt>true</tt> stop the consumer else start it.
*/
public void setStopped(ConsumerEndpoint consumer, boolean stop)
throws JMSException {
// need to implement this for the consumer
}
/**
* Add a message listener for a specific destination to be informed when
* messages, for the destination are added or removed from the queue. More
* than one listener can be registered per desitnation and the same listener
* can be registered for multiple destinations.
* <p/>
* If a listener is already registered for a particuler destination then it
* fails silently.
*
* @param destination - what messgaes to listen for
* @param listener - a JmsMessageListener instance
*/
public void addEventListener(JmsDestination destination,
MessageManagerEventListener listener) {
if ((destination != null) &&
(listener != null)) {
synchronized (_listeners) {
if (!_listeners.containsKey(destination)) {
_listeners.put(destination, listener);
}
}
}
}
/**
* Remove the listener for the specified destination. If one is not
* registered then ignore it.
*
* @param destination - destination that it listens for
* @param listener - listener for that destination.
*/
public void removeEventListener(JmsDestination destination,
MessageManagerEventListener listener) {
if ((destination != null) &&
(listener != null)) {
synchronized (_listeners) {
if (_listeners.containsKey(destination)) {
_listeners.remove(destination);
}
}
}
}
/**
* Notify the listeners registered for the destination that a persistent
* message has been added to the message manager.
*
* @param connection the database connection to use.
* @param destination the message destination
* @param message the message that was added
* @throws JMSException is a processing error occured
* @throws PersistenceException if a persistence error occured
*/
private void notifyOnAddPersistentMessage(Connection connection,
JmsDestination destination,
MessageImpl message)
throws JMSException, PersistenceException {
MessageManagerEventListener listener =
(MessageManagerEventListener) _listeners.get(destination);
if (listener != null) {
// if there is a registered destination cache then let the cache
// process it.
listener.persistentMessageAdded(connection, destination, message);
} else {
// let the DestinationManager handle the message
DestinationManager.instance().persistentMessageAdded(connection,
destination,
message);
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -