📄 destinationmanager.java
字号:
}
}
return cache;
}
/**
 * Report whether a destination cache is currently registered for the
 * specified destination.
 *
 * @param dest - destination used as the lookup key
 * @return boolean - true if the cache map contains an entry keyed by
 * <code>dest</code>
 */
public boolean hasDestinationCache(JmsDestination dest) {
    final boolean cached = _caches.containsKey(dest);
    return cached;
}
/**
 * Register a non-administered destination by handing it to
 * <code>addToDestinationCache</code>. No check is made here as to
 * whether the destination is an administered one; the caller is
 * responsible for that.
 *
 * @param destination - the destination to create
 */
public synchronized void createDestination(JmsDestination destination) {
    this.addToDestinationCache(destination);
}
/**
 * Create an administered destination using the specified destination.
 * <p>
 * If the persistence adapter reports no destination with the same name,
 * the destination is added to the persistent store, added to the
 * in-memory destination cache, flagged as persistent and bound under its
 * name in the JNDI context. If a destination with that name is already
 * recorded, nothing is created.
 *
 * @param dest - the destination
 * @return boolean - true if the destination was created; false if a
 * destination with the same name already exists in the persistent store
 * @throws JMSException - if the destination cannot be bound in JNDI or
 * the persistent store cannot be updated. The database transaction is
 * rolled back before the exception is thrown, and the underlying failure
 * is attached as the linked exception.
 */
public synchronized boolean createAdministeredDestination(
        JmsDestination dest) throws JMSException {
    if (_log.isDebugEnabled()) {
        _log.debug("createAdministeredDestination(dest=" + dest + ")");
    }

    boolean success = true;
    boolean queue = (dest instanceof JmsQueue);
    PersistenceAdapter adapter = DatabaseService.getAdapter();

    // Check that the destination does not exist. If it exists then return
    // false. If it doesn't exist then create it and bind it to the JNDI
    // context.
    Connection connection = null;
    try {
        connection = DatabaseService.getConnection();

        if (!adapter.checkDestination(connection, dest.getName())) {
            adapter.addDestination(connection, dest.getName(), queue);

            // Destination was created in the persistent store, now create
            // it in transient memory and also bind it in the JNDI context.
            // NOTE(review): the cache entry added here is not removed if
            // the rebind or the commit below fails - confirm whether the
            // in-memory state should be rolled back as well.
            addToDestinationCache(dest);
            try {
                dest.setPersistent(true);
                ContextHelper.rebind(getContext(), dest.getName(), dest);
            } catch (NamingException exception) {
                String msg = "Failed to add destination " + dest.getName()
                    + " to JNDI context";
                _log.error(msg, exception);
                JMSException error = new JMSException(
                    msg + ": " + exception.getMessage());
                // keep the root cause available to the caller
                error.setLinkedException(exception);
                throw error;
            }
        } else {
            success = false;
        }
        connection.commit();
    } catch (JMSException exception) {
        SQLHelper.rollback(connection);
        throw exception;
    } catch (Exception exception) { // PersistenceException, SQLException
        SQLHelper.rollback(connection);
        String msg = "Failed to create administered destination "
            + dest.getName();
        _log.error(msg, exception);
        JMSException error = new JMSException(
            msg + ": " + exception.getMessage());
        // keep the root cause available to the caller
        error.setLinkedException(exception);
        throw error;
    } finally {
        SQLHelper.close(connection);
    }

    return success;
}
/**
 * Remove the corresponding administered destination from the jndi
 * context, from persistent store and from transient memory. For a topic
 * this will also remove all durable consumers of the topic.
 * <p>
 * The removal is refused if the topic has active durable consumers or if
 * the destination still has active endpoints once unreferenced endpoints
 * have been cleaned up. A failure to unbind the name from JNDI is logged
 * and does not abort the removal.
 *
 * @param dest - the destination to remove
 * @throws JMSException - if there are active durable consumers or active
 * endpoints for the destination, or if the destination cannot be removed
 * from the persistent store (in which case the database transaction is
 * rolled back and the underlying failure is attached as the linked
 * exception)
 */
public synchronized void deleteAdministeredDestination(JmsDestination dest)
        throws JMSException {
    if (_log.isDebugEnabled()) {
        _log.debug("deleteAdministeredDestination(dest=" + dest + ")");
    }

    boolean queue = (dest instanceof JmsQueue);
    ConsumerManager consumerMgr = ConsumerManager.instance();

    // If we are dealing with a topic then first check that there are
    // no active durable consumers for the destination
    if (!queue) {
        if (consumerMgr.hasActiveDurableConsumers(dest)) {
            throw new JMSException(
                "Cannot delete the administered destination " + dest
                + " since there are active durable consumers.");
        }
        // no active consumers. Remove all durable consumers to this
        // destination
        consumerMgr.removeDurableConsumers(dest);
    }

    // make sure there are no active endpoints, but first clear
    // unreferenced endpoints.
    consumerMgr.cleanUnreferencedEndpoints(dest);
    int active = consumerMgr.getEndpointsForDest(dest).size();
    if (active > 0) {
        throw new JMSException(
            "Cannot delete the administered destination " + dest
            + " since there are " + active + " active endpoints.");
    }

    // unbind it from the jndi context so that now it is unavailable
    // to other consumers. Failure here is logged only (best effort).
    try {
        getContext().unbind(dest.getName());
    } catch (NamingException error) {
        _log.error("Failed to remove destination " + dest.getName()
            + " from JNDI", error);
    }

    // now that we have removed all the durable consumers we can remove
    // the administered destination. It is deleted from the persistent
    // store first, then from memory, and finally the transaction is
    // committed.
    Connection connection = null;
    try {
        connection = DatabaseService.getConnection();

        DatabaseService.getAdapter().removeDestination(connection,
            dest.getName());
        destroyDestinationCache(dest);
        removeFromDestinationCache(dest);
        connection.commit();
    } catch (PersistenceException exception) {
        SQLHelper.rollback(connection);
        String msg = "Failed to remove destination " + dest.getName();
        _log.error(msg, exception);
        JMSException error = new JMSException(
            msg + ":" + exception.getMessage());
        // keep the root cause available to the caller
        error.setLinkedException(exception);
        throw error;
    } catch (SQLException exception) {
        SQLHelper.rollback(connection);
        String msg = "Failed to remove destination " + dest.getName();
        _log.error(msg, exception);
        JMSException error = new JMSException(
            msg + ":" + exception.getMessage());
        // keep the root cause available to the caller
        error.setLinkedException(exception);
        throw error;
    } finally {
        SQLHelper.close(connection);
    }
}
/**
 * Return an iterator over the values currently held in the destination
 * cache map maintained by the destination manager. This includes all
 * types of destinations.
 * <p>
 * NOTE(review): despite the method name, the elements are presumably
 * {@link JmsDestination} objects rather than name strings - confirm
 * against the code that populates the map.
 *
 * @return Iterator - iterator over the values of the destination map
 */
public Iterator destinationNames() {
    final Iterator known = _destinationCache.values().iterator();
    return known;
}
/**
 * Return an iterator over the values of the cache map, i.e. the
 * destination caches that are currently registered in memory. This
 * covers all destination types (temporary, transient, administered).
 * <p>
 * NOTE(review): the elements are presumably {@link DestinationCache}
 * objects - confirm against the code that populates the map.
 *
 * @return Iterator - iterator over the registered cache objects
 */
public Iterator destinations() {
    final Iterator active = _caches.values().iterator();
    return active;
}
/**
 * Create the administered destinations listed in the configuration.
 * <p>
 * Topics are processed first. Each topic may declare zero or more
 * preconfigured durable subscribers; a durable consumer is created for
 * every subscriber name that the consumer manager does not already know.
 * Queues are processed afterwards and have no equivalent of durable
 * subscribers.
 * <p>
 * A failure for one destination is logged and does not stop the
 * remaining destinations from being processed. If creating a topic
 * fails, its subscribers are not registered.
 */
public void registerConfiguredAdministeredDestinations() {
    AdministeredDestinations configured =
        ConfigurationManager.getConfig().getAdministeredDestinations();
    if (configured == null) {
        // nothing configured, nothing to register
        return;
    }

    // topics, together with their durable subscribers
    final int topicCount = configured.getAdministeredTopicCount();
    for (int t = 0; t < topicCount; t++) {
        AdministeredTopic topicConfig = configured.getAdministeredTopic(t);

        // build a persistent topic destination for the configured name
        JmsTopic topic = new JmsTopic(topicConfig.getName());
        topic.setPersistent(true);

        try {
            createAdministeredDestination(topic);

            // register the subscribers declared for this topic
            final int subscriberCount = topicConfig.getSubscriberCount();
            ConsumerManager consumers = ConsumerManager.instance();
            for (int s = 0; s < subscriberCount; s++) {
                String subscriberName =
                    topicConfig.getSubscriber(s).getName();
                if (consumers.exists(subscriberName)) {
                    // a consumer with this name is already present
                    continue;
                }
                consumers.createDurableConsumer(topic, subscriberName);
            }
        } catch (JMSException exception) {
            _log.error("Failed to register persistent topic "
                + topicConfig.getName(), exception);
        }
    }

    // queues; these have no associated durable subscribers
    final int queueCount = configured.getAdministeredQueueCount();
    for (int q = 0; q < queueCount; q++) {
        AdministeredQueue queueConfig = configured.getAdministeredQueue(q);

        // build a persistent queue destination for the configured name
        JmsQueue queue = new JmsQueue(queueConfig.getName());
        queue.setPersistent(true);

        try {
            createAdministeredDestination(queue);
        } catch (JMSException exception) {
            _log.error("Failed to register persistent queue "
                + queueConfig.getName(), exception);
        }
    }
}
/**
 * Implementation of MessageManagerEventListener.messageAdded.
 * <p>
 * A message for a topic is only processed when the consumer manager
 * reports active consumers for that topic. Any other destination is
 * assumed to be a queue and is always processed. Processing means making
 * sure the destination is registered, obtaining its destination cache
 * and passing the message to that cache.
 * <p>
 * Any exception raised along the way is logged and reported as
 * <code>false</code> rather than propagated.
 *
 * @param destination - the destination the message was added to
 * @param message - the message that was added
 * @return boolean - the value returned by the destination cache, or
 * false if the message was not passed to a cache or an exception occurred
 */
public synchronized boolean messageAdded(JmsDestination destination, MessageImpl message) {
    try {
        // Short-circuit: the consumer manager is only consulted for
        // topics; non-topic destinations are handled unconditionally.
        boolean handle = !(destination instanceof JmsTopic)
            || ConsumerManager.instance().hasActiveConsumers(destination);
        if (!handle) {
            return false;
        }

        if (!destinationExists(destination)) {
            createDestination(destination);
        }
        return createDestinationCache(destination)
            .messageAdded(destination, message);
    } catch (Exception exception) {
        _log.error("Exception in DestinationManager.messageAdded",
            exception);
        return false;
    }
}
/**
 * Implementation of MessageManagerEventListener.messageRemoved.
 * Intentionally empty: neither argument is used.
 *
 * @param destination - the destination the message was removed from
 * @param message - the message that was removed
 */
public void messageRemoved(JmsDestination destination, MessageImpl message) {
    // Removing a non-persistent message when the associated destination
    // is not active is a no-op.
}
/**
 * Implementation of MessageManagerEventListener.persistentMessageAdded.
 * <p>
 * For a topic with active consumers (as reported by the consumer
 * manager) the destination cache is created and asked to process the
 * message. In every other case - a topic without active consumers, or a
 * non-topic destination, which is assumed to be a queue - nothing is
 * done here and <code>false</code> is returned.
 *
 * @param connection - the database connection, passed through to the
 * destination cache
 * @param destination - the destination the message was added to
 * @param message - the persistent message that was added
 * @return boolean - the value returned by the destination cache, or
 * false if the message was not passed to a cache
 * @throws PersistenceException - if any exception is raised while
 * processing the message; the original exception is logged and its
 * string form is included in the message
 */
public synchronized boolean persistentMessageAdded(Connection connection,
        JmsDestination destination, MessageImpl message)
        throws PersistenceException {
    boolean result = false;

    try {
        if (destination instanceof JmsTopic) {
            // The cache for this destination is inactive. Determine if
            // there are any active wildcard consumers for this
            // destination. If there are then create the destination
            // cache and let it handle the message.
            ConsumerManager manager = ConsumerManager.instance();
            if (manager.hasActiveConsumers(destination)) {
                DestinationCache cache = createDestinationCache(connection,
                    destination);
                result = cache.persistentMessageAdded(connection,
                    destination, message);
            }
            // Without active consumers there is nothing to do here: the
            // message is handled by the MessageMgr when it enters the
            // system.
        }
        // Non-topic destinations are assumed to be queues. Since the
        // message is persistent the cache need not be activated; the
        // persistent handle is created by the MessageMgr when the
        // message enters the system.
    } catch (Exception exception) {
        // log through the class logger and rethrow as a
        // PersistenceException
        _log.error("Exception in DestinationManager.persistentMessageAdded",
            exception);
        throw new PersistenceException(
            "Exception in DestinationManager.persistentMessageAdded " +
            exception.toString());
    }

    return result;
}
// implementation of MessageManagerEventListener.persistentMessageRemoved
public void persistentMessageRemoved(Connection connection,
JmsDestination destination, MessageImpl message)
throws PersistenceException {
try {
if (destination instanceof JmsTopic) {
// this is a persistent message so we need to retrieve the
// set of durable subscribers for this.
Vector names =
ConsumerManager.instance().getDurableConsumersForDest(
(JmsTopic) destination);
// for each durable consumer we need to destory that handle
while (names.size() > 0) {
String name = (String) names.remove(0);
MessageHandleFactory.destroyPersistentHandle(connection,
destination, name, message);
}
} else {
// assume it is a queue and destroy the handle.
MessageHandleFactory.destroyPersistentHandle(connection,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -