📄 destinationmanager.java
字号:
}
/**
* Delete the specfied destination.
*
* @param dest the destination to destroy
*/
protected synchronized void destroyDestinationCache(JmsDestination dest) {
DestinationCache cache = (DestinationCache) _caches.remove(dest);
if (cache != null) {
cache.destroy();
// notify the listeners that a destination has been removed from
// the destination manager
notifyDestinationRemoved(dest, cache);
}
}
/**
* Returns a destination given its name.
*
* @param name the name of the destination
* @return the destination corresponding to <code>name</code> or
* <code>null</code> if none exists
*/
public synchronized JmsDestination getDestination(String name) {
return (JmsDestination) _destinationCache.get(name);
}
/**
* Register the specified DestinationEventListener. If the listener is
* already registered then do not re-register it again.
*
* @param listener the listener to add
*/
void addDestinationEventListener(DestinationEventListener listener) {
synchronized (_listeners) {
if (!_listeners.contains(listener)) {
_listeners.add(listener);
}
}
}
/**
* Remove the specified DestinationEventListener from the list.
*
* @param listener the listener to remove
*/
void removeDestinationEventListener(DestinationEventListener listener) {
synchronized (_listeners) {
_listeners.remove(listener);
}
}
/**
* Create a non-administered destination and cache it. It does not check to
* see whether or not it is an administered destination this must be done
* by the caller
*
* @param destination - the destination to create
*/
public synchronized void createDestination(JmsDestination destination) {
addToDestinationCache(destination);
}
/**
* Create an administered destination using the specified destination. It
* will create the destination in the database and register it with the jndi
* context.
*
* @param dest - the destination
* @return boolean - true if successful
*/
public synchronized boolean createAdministeredDestination(
JmsDestination dest) throws JMSException {
if (_log.isDebugEnabled()) {
_log.debug("createAdministeredDestination(dest=" + dest + ")");
}
boolean success = true;
boolean queue = (dest instanceof JmsQueue) ? true : false;
PersistenceAdapter adapter = DatabaseService.getAdapter();
// check that the destination does not exist. If it exists then return
// false. If it doesn't exists the create it and bind it to the jndi
// context
Connection connection = null;
try {
connection = DatabaseService.getConnection();
if (!adapter.checkDestination(connection, dest.getName())) {
adapter.addDestination(connection, dest.getName(), queue);
dest.setPersistent(true);
// destination was created in persistent store, now create it
// in transient memory and also bind it in the jndi context
addToDestinationCache(dest);
try {
ContextHelper.rebind(getContext(), dest.getName(), dest);
} catch (NamingException exception) {
String msg = "Failed to add destination " + dest.getName()
+ " to JNDI context";
_log.error(msg, exception);
throw new JMSException(msg + ": " +
exception.getMessage());
}
} else {
success = false;
}
connection.commit();
} catch (JMSException exception) {
SQLHelper.rollback(connection);
throw exception;
} catch (Exception exception) { // PersistenceException, SQLException
SQLHelper.rollback(connection);
String msg = "Failed to create administered destination"
+ dest.getName();
_log.error(msg, exception);
throw new JMSException(msg + ": " + exception.getMessage());
} finally {
SQLHelper.close(connection);
}
return success;
}
/**
* Remove the corresponding administered destination from persistent store,
* from transient memory and from the jndi context. This will also remove
* all durable consumers for this topic.
*
* @param dest - the destination to remove
*/
public synchronized void deleteAdministeredDestination(JmsDestination dest)
throws JMSException {
if (_log.isDebugEnabled()) {
_log.debug("deleteAdministeredDestination(dest=" + dest + ")");
}
boolean queue = (dest instanceof JmsQueue) ? true : false;
ConsumerManager consumerMgr = ConsumerManager.instance();
// If we are dealing with a topic then first check that there are
// no active durable consumers for the destination
if (!queue) {
if (consumerMgr.hasActiveDurableConsumers(dest)) {
throw new JMSException(
"Cannot delete the administered destination "
+ dest
+ " since there are active durable consumers.");
}
// no active consumers. Remove all durable consumers to this
// destination
consumerMgr.removeDurableConsumers(dest);
}
// make sure there are not active endpoints
int active = consumerMgr.getEndpointsForDest(dest).size();
if (active > 0) {
throw new JMSException(
"Cannot delete the administered destination"
+ dest
+ " since there are "
+ active
+ " active endpoints.");
}
// unbind it from the jndi context so that now it is unavailable
// to other consumers
try {
getContext().unbind(dest.getName());
} catch (NamingException error) {
_log.error("Failed to remove destination " + dest.getName()
+ " from JNDI", error);
}
// now that we have removed all the durable consumers we can remove
// the administered topic. First delete it from memory and then
// from the persistent store
Connection connection = null;
try {
connection = DatabaseService.getConnection();
DatabaseService.getAdapter().removeDestination(connection,
dest.getName());
destroyDestinationCache(dest);
removeFromDestinationCache(dest);
connection.commit();
} catch (PersistenceException exception) {
SQLHelper.rollback(connection);
String msg = "Failed to remove destination " + dest.getName();
_log.error(msg, exception);
throw new JMSException(msg + ":" + exception.getMessage());
} catch (SQLException exception) {
SQLHelper.rollback(connection);
String msg = "Failed to remove destination " + dest.getName();
_log.error(msg, exception);
throw new JMSException(msg + ":" + exception.getMessage());
} finally {
SQLHelper.close(connection);
}
}
/**
* This method will create the administered destinations specified in the
* configuration. A topic may also have zero or more preconfigured durable
* sunbscribers. An equivalent entity for queues does not exist.
*/
public void registerConfiguredAdministeredDestinations() {
AdministeredDestinations destinations =
ConfigurationManager.getConfig().getAdministeredDestinations();
if (destinations != null) {
// first process the topics
int count = destinations.getAdministeredTopicCount();
for (int index = 0; index < count; index++) {
AdministeredTopic topic = destinations.getAdministeredTopic(
index);
// define a persistent topic destination and then use the
// message manager administrator to add it
JmsTopic destination = new JmsTopic(topic.getName());
destination.setPersistent(true);
try {
createAdministeredDestination(destination);
// register the subscribers for each topic.
int scount = topic.getSubscriberCount();
ConsumerManager mgr = ConsumerManager.instance();
for (int sindex = 0; sindex < scount; sindex++) {
Subscriber subscriber = topic.getSubscriber(sindex);
mgr.createDurableConsumer(destination,
subscriber.getName());
}
} catch (JMSException exception) {
_log.error("Failed to register persistent topic "
+ topic.getName(), exception);
}
}
// next process the queue destinations. QueueDestinations do not
// have any associated durable subscribers
count = destinations.getAdministeredQueueCount();
for (int index = 0; index < count; index++) {
AdministeredQueue queue = destinations.getAdministeredQueue(
index);
// define a persistent topic destination and then use the
// message manager administrator to add it
JmsQueue destination = new JmsQueue(queue.getName());
destination.setPersistent(true);
try {
createAdministeredDestination(destination);
} catch (JMSException exception) {
_log.error("Failed to register persistent queue "
+ queue.getName(), exception);
}
}
}
}
/**
* Invoked when the {@link MessageMgr} receives a non-persistent message
*
* @param destination the message's destination
* @param message the message
* @throws JMSException if the message can't be processed
*/
public synchronized void messageAdded(JmsDestination destination,
MessageImpl message)
throws JMSException {
if (destination instanceof JmsTopic) {
// check to see whether there are active consumers for the
// specified destination. If there are then we need to
// create a destination cache and pass the message to it.
if (ConsumerManager.instance().hasActiveConsumers(destination)) {
if (!destinationExists(destination)) {
createDestination(destination);
}
DestinationCache cache = getDestinationCache(destination);
cache.messageAdded(destination, message);
}
} else {
// destination is a queue. Since the message is non-persistent,
// create the cache and pass the message to it.
if (!destinationExists(destination)) {
createDestination(destination);
}
DestinationCache cache = getDestinationCache(destination);
cache.messageAdded(destination, message);
}
}
/**
* Invoked when the {@link MessageMgr} receives a persistent message
*
* @param connection the database connection
* @param destination the message's destination
* @param message the message
* @throws JMSException if the message can't be processed
* @throws PersistenceException if there is a persistence related problem
*/
public synchronized void persistentMessageAdded(Connection connection,
JmsDestination destination,
MessageImpl message)
throws JMSException, PersistenceException {
DestinationCache cache = getDestinationCache(destination, connection);
cache.persistentMessageAdded(connection, destination, message);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -