consumermanagerimpl.java
来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 919 行 · 第 1/3 页
JAVA
919 行
} return consumer; } /** * Create a durable consumer. * * @param topic the topic to subscribe to * @param name the subscription name * @param clientID the client identifier. May be <code>null</code>. * @param connectionId the identity of the connection that owns this * consumer * @param noLocal if true, and the destination is a topic, inhibits the * delivery of messages published by its own * connection. * @param selector the message selector. May be <code>null</code> * @return the durable consumer endpoint * @throws InvalidDestinationException if <code>topic</code> is not a * persistent destination * @throws InvalidSelectorException if the selector is not well formed * @throws JMSException if a durable consumer is already * active with the same <code>name</code> */ public synchronized DurableConsumerEndpoint createDurableConsumer( JmsTopic topic, String name, String clientID, long connectionId, boolean noLocal, String selector) throws JMSException { if (_log.isDebugEnabled()) { _log.debug("createDurableConsumer(topic=" + topic + ", name=" + name + ", connectionId=" + connectionId + ", selector=" + selector + ", noLocal=" + noLocal + ")"); } DurableConsumerEndpoint consumer = createInactiveDurableConsumer(topic, name, clientID); consumer.activate(connectionId, selector, noLocal); return consumer; } /** * Create a browser for the specified destination and the selector. A * browser is responsible for passing all messages back to the client that * reside on the queue * * @param queue the queue to browse * @param selector the message selector. May be <code>null</code> * @return the queue browser endpoint * @throws JMSException if the browser can't be created */ public synchronized ConsumerEndpoint createQueueBrowser(JmsQueue queue, String selector) throws JMSException { // ensure that the destination is valid before proceeding getDestination(queue, true); long consumerId = getNextConsumerId(); ConsumerEndpoint consumer = null; try { _database.begin(); QueueDestinationCache cache; cache = (QueueDestinationCache) _destinations.getDestinationCache( queue); consumer = new QueueBrowserEndpoint(consumerId, cache, selector); Object key = ConsumerEntry.getConsumerKey(consumer); _endpoints.put(key, consumer); addConsumerEntry(key, queue, null, false); _database.commit(); } catch (Exception exception) { rethrow("Failed to create browser", exception); } return consumer; } /** * Close a consumer. * * @param consumer the consumer to close */ public synchronized void closeConsumer(ConsumerEndpoint consumer) { if (_log.isDebugEnabled()) { _log.debug("closeConsumerEndpoint(consumer=[Id=" + consumer.getId() + ", destination=" + consumer.getDestination() + ")"); } Object key = ConsumerEntry.getConsumerKey(consumer); ConsumerEndpoint existing = (ConsumerEndpoint) _endpoints.get(key); if (existing != null) { try { _database.begin(); if (consumer.getId() != existing.getId()) { // As a fix for bug 759752, only remove the consumer if it // matches the existing one. // @todo - not sure if this situation can arise any longer _log.error("Existing endpoint doesn't match that to be closed " + "- retaining"); } else if (existing instanceof DurableConsumerEndpoint) { DurableConsumerEndpoint durable = (DurableConsumerEndpoint) existing; if (durable.isActive()) { try { durable.deactivate(); } catch (JMSException exception) { _log.error("Failed to deactivate durable consumer=" + durable, exception); } } } else { _endpoints.remove(key); consumer.close(); removeConsumerEntry(key); } _database.commit(); } catch (PersistenceException exception) { _log.error("Failed to close consumer=" + consumer, exception); rollback(); } } } /** * Return the consumer with the specified identity. * * @param consumerId the identity of the consumer * @return the associated consumer, or <code>null</code> if none exists */ public synchronized ConsumerEndpoint getConsumerEndpoint(long consumerId) { return (ConsumerEndpoint) _endpoints.get(new Long(consumerId)); } /** * Return the consumer with the specified persistent identity. * * @param persistentId the persistent identity of the consumer * @return the associated consumer, or <code>null</code> if none exists */ public synchronized ConsumerEndpoint getConsumerEndpoint( String persistentId) { return (ConsumerEndpoint) _endpoints.get(persistentId); } /** * Determines if there are any active consumers for a destination. * * @param destination the destination * @return <code>true</code> if there is at least one consumer */ public synchronized boolean hasActiveConsumers(JmsDestination destination) { boolean result = false; ConsumerEndpoint[] consumers = getConsumers(); for (int i = 0; i < consumers.length; ++i) { if (consumers[i].canConsume(destination)) { result = true; break; } } return result; } /** * Start the service. * * @throws ServiceException if the service fails to start */ protected void doStart() throws ServiceException { try { _database.begin(); Connection connection = _database.getConnection(); PersistenceAdapter adapter = _database.getAdapter(); // return a list of JmsDestination objects. HashMap map = adapter.getAllDurableConsumers(connection); Iterator iter = map.keySet().iterator(); // Create an endpoint for each durable consumer while (iter.hasNext()) { String consumer = (String) iter.next(); String deststr = (String) map.get(consumer); JmsDestination dest = _destinations.getDestination(deststr); if (dest == null) { // this maybe a wildcard subscription dest = new JmsTopic(deststr); if (!((JmsTopic) dest).isWildCard()) { dest = null; } } if (consumer != null && dest != null && dest instanceof JmsTopic) { // cache the consumer-destination mapping in memory. addDurableConsumer((JmsTopic) dest, consumer, null); } else { // @todo _log.error("Failure in ConsumerManager.init : " + consumer + ":" + dest); } } _database.commit(); } catch (Exception exception) { rollback(); throw new ServiceException("Failed to initialise ConsumerManager", exception); } } /** * Stop the service. */ protected synchronized void doStop() { // clean up all the destinations Object[] endpoints = _endpoints.values().toArray(); for (int index = 0; index < endpoints.length; index++) { closeConsumer((ConsumerEndpoint) endpoints[index]); } _endpoints.clear(); // remove cache data structures _consumers.clear(); _destToConsumerMap.clear(); _wildcardConsumers.clear(); } /** * Create an inactive durable consumer. * <p/> * If the consumer doesn't exist, it will created in the persistent store. * If it does exist, and is inactive, it will be recreated. If it does * exist, but is active, an exception will be raised. * * @param topic the topic to subscribe to * @param name the subscription name * @param clientID the client identifier. May be <code>null</code>. * @return the durable consumer * @throws InvalidDestinationException if <code>topic</code> is not a * persistent destination * @throws JMSException if a durable consumer is already * active with the same <code>name</code>, * or the consumer can't be created */ private DurableConsumerEndpoint createInactiveDurableConsumer( JmsTopic topic, String name, String clientID) throws JMSException { DurableConsumerEndpoint endpoint; if (_log.isDebugEnabled()) { _log.debug("createInactiveDurableConsumer(topic=" + topic + ", name=" + name + ", clientID=" + clientID + ")"); } // check that the destination exists, if the topic is not a wildcard if (!topic.isWildCard()) { topic = (JmsTopic) getDestination(topic, false); } if (name == null || name.length() == 0) { throw new InvalidDestinationException( "Invalid subscription name: " + name); } endpoint = (DurableConsumerEndpoint) _endpoints.get(name); if (endpoint != null) { if (endpoint.isActive()) { throw new JMSException( "Durable subscriber already exists with name: " + name); } if (!endpoint.getDestination().equals(topic)) { // subscribing to a different topic. Need to re-subscribe. unsubscribe(name, clientID); endpoint = null; } } if (endpoint == null) { try { _database.begin(); PersistenceAdapter adapter = _database.getAdapter(); Connection connection = _database.getConnection(); adapter.addDurableConsumer(connection, topic.getName(), name); endpoint = addDurableConsumer(topic, name, clientID); _database.commit(); } catch (Exception exception) { String msg = "Failed to create durable consumer, name=" + name + ", for topic=" + topic.getName(); rethrow(msg, exception); } } return endpoint; } /** * Register an inactive durable consumer. * * @param topic the topic to subscribe to * @param name the subscription name * @param clientID the client identifier. May be <code>null</code>. * @return the durable consumer
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?