📄 jmstopicsession.java
字号:
/**
* Redistribution and use of this software and associated documentation
* ("Software"), with or without modification, are permitted provided
* that the following conditions are met:
*
* 1. Redistributions of source code must retain copyright
* statements and notices. Redistributions must also contain a
* copy of this document.
*
* 2. Redistributions in binary form must reproduce the
* above copyright notice, this list of conditions and the
* following disclaimer in the documentation and/or other
* materials provided with the distribution.
*
* 3. The name "Exolab" must not be used to endorse or promote
* products derived from this Software without prior written
* permission of Exoffice Technologies. For written permission,
* please contact info@exolab.org.
*
* 4. Products derived from this Software may not be called "Exolab"
* nor may "Exolab" appear in their names without prior written
* permission of Exoffice Technologies. Exolab is a registered
* trademark of Exoffice Technologies.
*
* 5. Due credit should be given to the Exolab Project
* (http://www.exolab.org/).
*
* THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
* ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
* NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
* FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
* OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
*
* $Id: JmsTopicSession.java,v 1.25 2004/01/20 14:14:21 tanderson Exp $
*/
package org.exolab.jms.client;
import java.util.HashSet;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.TopicSession;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.jms.TopicPublisher;
import javax.jms.TemporaryTopic;
/**
* Client implementation of the <code>javax.jms.TopicSession</code> interface
*
* @version $Revision: 1.25 $ $Date: 2004/01/20 14:14:21 $
* @author <a href="mailto:jima@exoffice.com">Jim Alateras</a>
*/
class JmsTopicSession
extends JmsSession
implements TopicSession {
/**
 * Names of the durable subscriptions that currently have an active
 * subscriber created through this session. A name is added when a durable
 * subscriber is created and removed when that subscriber is deregistered.
 * The set is used to reject a second subscriber with the same name and to
 * refuse <code>unsubscribe</code> while a subscriber is still active.
 * <p>
 * The reference is never reassigned. The contents are only read and
 * modified from <code>synchronized</code> methods of this class.
 */
private final HashSet _durableNames = new HashSet();
/**
* Construct a new <code>JmsTopicSession</code>.
* <p>
* All three arguments are handed unchanged to the <code>JmsSession</code>
* constructor; this constructor adds no logic of its own.
*
* @param connection the topic connection that owns the session
* @param transacted if <code>true</code>, the session is transacted.
* @param ackMode indicates whether the consumer or the client will
* acknowledge any messages it receives. This parameter will be ignored if
* the session is transacted. Legal values are
* <code>Session.AUTO_ACKNOWLEDGE</code>,
* <code>Session.CLIENT_ACKNOWLEDGE</code> and
* <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
* @throws JMSException if the session cannot be created
*/
public JmsTopicSession(JmsTopicConnection connection, boolean transacted,
int ackMode) throws JMSException {
super(connection, transacted, ackMode);
}
/**
 * Create a topic identity from a topic name.
 * <p>
 * After <code>ensureOpen()</code> has passed, a new <code>JmsTopic</code>
 * is constructed for the supplied name.
 *
 * @param topicName the name of the topic; must be non-null and non-empty
 * @return a topic with the given name
 * @throws JMSException if <code>ensureOpen()</code> fails, or if
 * <code>topicName</code> is <code>null</code> or empty
 */
public synchronized Topic createTopic(String topicName)
        throws JMSException {
    ensureOpen();

    // reject a missing or empty name before constructing anything
    if (topicName == null || topicName.length() == 0) {
        throw new JMSException("Invalid or null topic name specified");
    }
    return new JmsTopic(topicName);
}
/**
* Create a non-durable subscriber for the specified topic.
* <p>
* Equivalent to <code>createSubscriber(topic, null, false)</code>: no
* message selector is supplied and <code>noLocal</code> is
* <code>false</code>.
*
* @param topic the topic to subscribe to
* @return the new subscriber
* @throws JMSException if the subscriber cannot be created
*/
public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
return createSubscriber(topic, null, false);
}
/**
 * Create a non-durable subscriber for the specified topic.
 * <p>
 * The subscriber is constructed with the next consumer id of this session
 * and a <code>null</code> subscription name, and is then registered through
 * <code>addSubscriber</code>.
 *
 * @param topic the topic to subscribe to. Must be a <code>JmsTopic</code>
 * @param selector the message selector to filter messages.
 * May be <code>null</code>
 * @param noLocal if <code>true</code>, inhibits the delivery of messages
 * published by its own connection
 * @return the new subscriber
 * @throws InvalidDestinationException if <code>topic</code> is
 * <code>null</code>, is not a <code>JmsTopic</code>, or is a temporary topic
 * that is not bound to this connection
 * @throws JMSException if <code>ensureOpen()</code> fails or the subscriber
 * cannot be created
 */
public synchronized TopicSubscriber createSubscriber(
        Topic topic, String selector, boolean noLocal) throws JMSException {
    ensureOpen();

    if (topic == null) {
        throw new InvalidDestinationException(
            "Cannot create subscriber: argument 'topic' is null");
    }

    // A Topic implementation that is not a JmsTopic cannot be cast below.
    // Report it as an invalid destination instead of letting a
    // ClassCastException escape to the caller.
    if (!(topic instanceof JmsTopic)) {
        throw new InvalidDestinationException(
            "Cannot create subscriber: unsupported topic implementation "
            + topic.getClass().getName());
    }

    // check to see if the topic is temporary. A temporary topic
    // can only be used within the context of the owning connection
    if (!checkForValidTemporaryDestination((JmsDestination) topic)) {
        throw new InvalidDestinationException(
            "Trying to create a subscriber for a temp topic "
            + "that is not bound to this connection");
    }

    // a null subscription name marks the subscriber as non-durable
    JmsTopicSubscriber subscriber = new JmsTopicSubscriber(
        this, getNextConsumerId(), (JmsTopic) topic, selector, noLocal,
        null);
    addSubscriber(subscriber, null);
    return subscriber;
}
/**
* Create a durable subscriber for the specified topic.
* <p>
* Equivalent to
* <code>createDurableSubscriber(topic, name, null, false)</code>: no
* message selector is supplied and <code>noLocal</code> is
* <code>false</code>.
*
* @param topic the topic to subscribe to
* @param name the durable subscription name
* @return a new durable subscriber
* @throws JMSException if the subscriber can't be created
*/
public TopicSubscriber createDurableSubscriber(Topic topic, String name)
throws JMSException {
return createDurableSubscriber(topic, name, null, false);
}
/**
 * Create a durable subscriber for the specified topic.
 * <p>
 * The supplied topic is cloned and the clone is marked persistent, so the
 * caller's topic instance is not modified. The subscriber is registered
 * through <code>addSubscriber</code>, and only then is the subscription
 * name recorded as active for this session.
 *
 * @param topic the topic to subscribe to. Must be a non-temporary
 * <code>JmsTopic</code>
 * @param name the durable subscription name. Must not be <code>null</code>
 * or blank, and must not already be active in this session
 * @param selector the message selector to filter messages.
 * May be <code>null</code>
 * @param noLocal if <code>true</code>, inhibits the delivery of messages
 * published by its own connection
 * @return a new durable subscriber
 * @throws InvalidDestinationException if <code>topic</code> is
 * <code>null</code>, is not a <code>JmsTopic</code>, or is a temporary topic
 * @throws JMSException if <code>ensureOpen()</code> fails, the name is
 * invalid or already in use, the topic cannot be cloned, or the subscriber
 * can't be created
 */
public synchronized TopicSubscriber createDurableSubscriber(
        Topic topic, String name, String selector, boolean noLocal)
        throws JMSException {
    ensureOpen();

    if (topic == null) {
        throw new InvalidDestinationException(
            "Cannot create durable subscriber: argument 'topic' is null");
    }

    if (name == null || name.trim().length() == 0) {
        throw new JMSException("Invalid subscription name specified");
    }

    // ensure that no durable subscriber has already been created
    // with the same name
    if (_durableNames.contains(name)) {
        throw new JMSException("A durable subscriber already exists "
                               + "with name=" + name);
    }

    // A Topic implementation that is not a JmsTopic cannot be cast below.
    // Report it as an invalid destination instead of letting a
    // ClassCastException escape to the caller.
    if (!(topic instanceof JmsTopic)) {
        throw new InvalidDestinationException(
            "Cannot create durable subscriber: unsupported topic "
            + "implementation " + topic.getClass().getName());
    }
    JmsTopic source = (JmsTopic) topic;

    // check to see if the topic is a temporary topic. You cannot
    // create a durable subscriber for a temporary topic
    if (source.isTemporaryDestination()) {
        throw new InvalidDestinationException(
            "Cannot create a durable subscriber for a temporary topic");
    }

    // since it is a durable subscriber, the persistent flag
    // must be set within the topic. Work on a clone so that the
    // caller's instance is left untouched.
    JmsTopic durable;
    try {
        durable = (JmsTopic) source.clone();
    } catch (CloneNotSupportedException error) {
        // keep the underlying cause available to the caller
        JMSException failure = new JMSException("Failed to clone JmsTopic");
        failure.setLinkedException(error);
        throw failure;
    }
    durable.setPersistent(true);

    JmsTopicSubscriber subscriber = new JmsTopicSubscriber(
        this, getNextConsumerId(), durable, selector, noLocal, name);
    addSubscriber(subscriber, name);

    // record the name only after registration has succeeded
    _durableNames.add(name);

    return subscriber;
}
/**
 * Create a publisher for the specified topic.
 *
 * @param topic the topic to publish to, or <code>null</code> if this is an
 * unidentified producer. A non-null topic must be a <code>JmsTopic</code>
 * and must not be a wildcard topic
 * @return the new publisher
 * @throws InvalidDestinationException if <code>topic</code> is non-null
 * but is not a <code>JmsTopic</code>
 * @throws JMSException if <code>ensureOpen()</code> fails, the topic is a
 * wildcard topic, or the publisher can't be created
 */
public synchronized TopicPublisher createPublisher(Topic topic)
        throws JMSException {
    ensureOpen();

    // null is legal here: it denotes an unidentified producer
    JmsTopic destination = null;
    if (topic != null) {
        // A Topic implementation that is not a JmsTopic cannot be cast.
        // Report it as an invalid destination instead of letting a
        // ClassCastException escape to the caller.
        if (!(topic instanceof JmsTopic)) {
            throw new InvalidDestinationException(
                "Cannot create publisher: unsupported topic implementation "
                + topic.getClass().getName());
        }
        destination = (JmsTopic) topic;

        if (destination.isWildCard()) {
            throw new JMSException(
                "Cannot create a publisher using a wildcard topic");
        }
    }

    JmsTopicPublisher publisher = new JmsTopicPublisher(this, destination);
    addPublisher(publisher);

    return publisher;
}
/**
 * Create a temporary topic. Its lifetime is that of the TopicConnection,
 * unless deleted earlier.
 * <p>
 * The new topic's owning connection is set to the connection returned by
 * <code>getConnection()</code>.
 *
 * @return a new temporary topic
 * @throws JMSException if the topic cannot be created
 */
public synchronized TemporaryTopic createTemporaryTopic()
        throws JMSException {
    ensureOpen();

    final JmsTemporaryTopic temporary = new JmsTemporaryTopic();
    // tie the topic to the connection that owns this session
    temporary.setOwningConnection(getConnection());
    return temporary;
}
/**
 * Unsubscribe a durable subscription.
 * <p>
 * The request is refused while this session still has an active durable
 * subscriber registered under <code>name</code>. Otherwise it is forwarded
 * to the session stub.
 *
 * @param name the durable subscription name. Must not be <code>null</code>
 * or empty
 * @throws InvalidDestinationException if <code>name</code> is
 * <code>null</code> or empty
 * @throws JMSException if <code>ensureOpen()</code> fails, an active
 * subscriber exists for the name, or the durable subscription can't be
 * removed
 */
public synchronized void unsubscribe(String name) throws JMSException {
    ensureOpen();

    if (name == null || name.length() == 0) {
        throw new InvalidDestinationException(
            "Cannot unsubscribe with a null or empty name");
    }

    if (_durableNames.contains(name)) {
        throw new JMSException("Cannot unsubscribe while an active "
                               + "TopicSubscriber exists");
    }

    // The name is known not to be in _durableNames at this point, so
    // there is no local state to clear; only the session stub is called.
    getJmsSessionStub().unsubscribe(name);
}
/**
* Register a subscriber, first through the session stub and then with
* this session's local consumers.
* <p>
* The topic, client id, message selector and noLocal flag passed to the
* stub are all read from <code>subscriber</code>.
*
* @param subscriber the subscriber to register
* @param name durable subscription name, for durable subscribers,
* <code>null</code> otherwise.
* @throws JMSException if the subscriber cannot be registered with the
* server
*/
protected synchronized void addSubscriber(JmsTopicSubscriber subscriber,
String name)
throws JMSException {
// create it on the server. If this call throws, the subscriber is
// not added to the local consumers below.
getJmsSessionStub().createSubscriber(
(JmsTopic) subscriber.getTopic(), name,
subscriber.getClientId(),
subscriber.getMessageSelector(),
subscriber.getNoLocal());
// register locally
addConsumer(subscriber);
}
/**
* Register a publisher, first through the session stub and then with
* this session's local producers.
*
* @param publisher the publisher to register
* @throws JMSException if the publisher cannot be registered with the
* server
*/
protected synchronized void addPublisher(JmsTopicPublisher publisher)
throws JMSException {
// create it via the session stub, using the publisher's own topic. If
// this call throws, the publisher is not added to the local producers.
getJmsSessionStub().createPublisher((JmsTopic) publisher.getTopic());
// register locally
addProducer(publisher);
}
/**
 * Deregister a subscriber.
 * <p>
 * If the session is not closed, the subscriber's message listener is
 * removed and the subscriber is deleted through the session stub. The
 * local clean-up always runs, even when that remote step throws, so a
 * failed delete cannot leave a durable subscription name marked as active
 * in this session.
 *
 * @param subscriber the subscriber to deregister
 * @throws JMSException if the subscriber cannot be deregistered from the
 * server
 */
protected synchronized void removeSubscriber(JmsTopicSubscriber subscriber)
        throws JMSException {
    try {
        if (!isClosed()) {
            // remove the listener before deleting the subscriber. This is
            // the correct order for clean up.
            removeMessageListener(subscriber);
            getJmsSessionStub().deleteSubscriber(subscriber.getClientId());
        }
    } finally {
        // local clean up, performed whether or not the steps above failed
        removeConsumer(subscriber);
        if (subscriber.isDurableSubscriber()) {
            _durableNames.remove(subscriber.getName());
        }
    }
}
/**
* Deregister a publisher.
* <p>
* The publisher is handed to <code>removeProducer</code>; this method
* itself makes no call on the session stub.
*
* @param publisher the publisher to deregister
*/
protected synchronized void removePublisher(JmsTopicPublisher publisher) {
removeProducer(publisher);
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -