📄 jmsserversession.java
字号:
/**
* Browse up to count messages
*
* @param consumerId the consumer identifier
* @param count the maximum number of messages to receive
* @return a list of {@link MessageImpl} instances
* @throws JMSException for any JMS error
*/
public List browse(long consumerId, int count) throws JMSException {
ConsumerEndpoint consumer = getConsumerEndpoint(consumerId);
if (consumer == null) {
throw new JMSException("Can't browse messages: no browser registered with "
+ "identifier "
+ consumerId
+ " on session");
}
if (!(consumer instanceof QueueBrowserEndpoint)) {
throw new JMSException("Can't browse messages: invalid consumer");
}
Vector handles = ((QueueBrowserEndpoint) consumer).receiveMessages(
count);
List messages = new ArrayList(count);
Iterator iterator = handles.iterator();
while (iterator.hasNext()) {
MessageHandle handle = (MessageHandle) iterator.next();
MessageImpl orig = handle.getMessage();
if (orig != null) {
// clone the message to set client specific properties
try {
MessageImpl message = (MessageImpl) orig.clone();
message.setJMSRedelivered(handle.getDelivered());
message.setConsumerId(handle.getConsumerId());
messages.add(message);
} catch (Exception exception) {
_log.error(exception);
}
if (messages.size() == count) {
break;
}
}
}
return messages;
}
/**
* Create a new message consumer
*
* @param destination the destination to consume messages from
* @param selector the message selector. May be <code>null</code>
* @param noLocal if true, and the destination is a topic, inhibits the
* delivery of messages published by its own connection.
* The behavior for <code>noLocal</code> is not specified
* if the destination is a queue.
* @return the identifty of the message consumer
* @throws JMSException for any JMS error
*/
public long createConsumer(JmsDestination destination, String selector,
boolean noLocal) throws JMSException {
if (_log.isDebugEnabled()) {
_log.debug("createConsumer(destination=" + destination
+ ", selector=" + selector + ", noLocal=" + noLocal
+ ") [session=" + this + "]");
}
if (destination == null) {
throw new InvalidDestinationException(
"Cannot create MessageConsumer for null destination");
}
// Retrieve the destination from the destination manager and use
// it to create the consumer
ConsumerEndpoint consumer =
ConsumerManager.instance().createConsumerEndpoint(this,
destination,
selector, noLocal);
final long id = consumer.getId();
consumer.setStopped(_stopped);
_consumers.put(new Long(id), consumer);
return id;
}
/**
* Create a new durable consumer. Durable consumers may only consume from
* non-temporary <code>Topic</code> destinations.
*
* @param topic the non-temporary <code>Topic</code> to subscribe to
* @param name the name used to identify this subscription
* @param selector only messages with properties matching the message
* selector expression are delivered. A value of null or an
* empty string indicates that there is no message selector
* for the message consumer.
* @param noLocal if set, inhibits the delivery of messages published by
* its own connection
* @return the identity of the durable consumer
* @throws JMSException for any JMS error
*/
public long createDurableConsumer(JmsTopic topic, String name,
String selector, boolean noLocal)
throws JMSException {
if (_log.isDebugEnabled()) {
_log.debug("createDurableConsumer(topic=" + topic + ", name="
+ name
+ ", selector=" + selector + ", noLocal=" + noLocal
+ ") [session=" + this + "]");
}
if (topic == null || topic.isTemporaryDestination()) {
throw new InvalidDestinationException("Invalid topic: " + topic);
}
if (name == null) {
throw new InvalidDestinationException("Invalid subscription name");
}
ConsumerManager manager = ConsumerManager.instance();
if (manager.durableConsumerExists(name)) {
// if the durable consumer exists then validate that
// it was the specified topic that it was registered
// under. If it is not registered for the topic then
// we must delete the existing entry and recreate it
// against the new topic
if (!manager.validSubscription(topic.getName(), name)) {
unsubscribe(name);
manager.createDurableConsumer(topic, name);
}
} else {
// the durable consumer does not exist. so create
// it
manager.createDurableConsumer(topic, name);
}
// if a durable subscriber with the specified name is
// already active then this method will throw an exception.
// attempt to create a durable consuinmer
ConsumerEndpoint consumer = manager.createDurableConsumerEndpoint(this,
topic,
name,
noLocal,
selector);
final long id = consumer.getId();
consumer.setStopped(_stopped);
_consumers.put(new Long(id), consumer);
return id;
}
/**
* Create a queue browser for this session. This allows clients to browse a
* queue without removing any messages.
*
* @param queue the queue to browse
* @param selector the message selector. May be <code>null</code>
* @return the identity of the queue browser
* @throws JMSException for any JMS error
*/
public long createBrowser(JmsQueue queue, String selector)
throws JMSException {
if (_log.isDebugEnabled()) {
_log.debug("createBrowser(queue=" + queue + ", selector="
+ selector
+ ") [session=" + this + "]");
}
if (queue == null) {
throw new JMSException("Cannot create QueueBrowser for null queue");
}
ConsumerEndpoint consumer =
ConsumerManager.instance().createQueueBrowserEndpoint(this,
queue,
selector);
final long id = consumer.getId();
consumer.setStopped(_stopped);
_consumers.put(new Long(id), consumer);
return id;
}
/**
* Delete the receiver with the specified identity and clean up all
* associated resources.
*
* @param consumerId the consumer identifier
* @throws JMSException if the consumer cannot be deleted
*/
public void removeConsumer(long consumerId) throws JMSException {
if (_log.isDebugEnabled()) {
_log.debug("removeConsumer(consumerId=" + consumerId
+ ") [session="
+ this + "]");
}
ConsumerEndpoint consumer =
(ConsumerEndpoint) _consumers.remove(new Long(consumerId));
if (consumer == null) {
throw new JMSException("No consuemr with id=" + consumerId);
}
// destroy the consumer endpoint
ConsumerManager.instance().deleteConsumerEndpoint(consumer);
}
/**
* Unsubscribe a durable subscription
*
* @param name the name used to identify the subscription
* @throws JMSException for any JMS error
*/
public void unsubscribe(String name) throws JMSException {
if (_log.isDebugEnabled()) {
_log.debug("unsubscribe(name=" + name + ") [session=" + this + "]");
}
ConsumerManager manager = ConsumerManager.instance();
// check that the durable consumer actually exists. If it doesn't then
// throw an exception
if (!manager.durableConsumerExists(name)) {
throw new InvalidDestinationException(
name + " is not a durable subscriber name");
}
// check that the durable consumer is not active before removing it. If
// it is then throw an exception
if (!manager.isDurableConsumerActive(name)) {
manager.removeDurableConsumer(name);
} else {
throw new JMSException("Failed to unsubscribe subscriber "
+ name + " since is still active");
}
}
/**
* Start the message delivery for the session.
*/
public void start() {
if (_log.isDebugEnabled()) {
_log.debug("start() [session=" + this + "]");
}
if (_stopped) {
pause(false);
_stopped = false;
}
}
/**
* Stop message delivery for the session
*/
public void stop() {
if (_log.isDebugEnabled()) {
_log.debug("stop() [session=" + this + "]");
}
if (!_stopped) {
pause(true);
_stopped = true;
}
}
/**
* Set a message listener for the session. This is the channel used to
* asynchronously deliver messages to consumers created on this session.
*
* @param listener the message listener
*/
public void setMessageListener(JmsMessageListener listener) {
_listener = listener;
}
/**
* Enable or disable asynchronous message delivery for a particular
* consumer
*
* @param consumerId the consumer identifier
* @param enable true to enable; false to disable
* @throws JMSException for any JMS error
*/
public void enableAsynchronousDelivery(long consumerId, boolean enable)
throws JMSException {
ConsumerEndpoint consumer = getConsumerEndpoint(consumerId);
if (consumer == null) {
throw new JMSException(consumerId + " is not registered");
}
if (enable) {
consumer.setMessageListener(this);
} else {
consumer.setMessageListener(null);
}
}
/**
* Close and release any resource allocated to this session.
*
* @throws JMSException if the session cannot be closed
*/
public void close() throws JMSException {
boolean closed = false;
synchronized (this) {
closed = _closed;
if (!closed) {
_closed = true;
}
}
if (!closed) {
if (_log.isDebugEnabled()) {
_log.debug("close() [session=" + this + "]");
}
// reset the listener
setMessageListener(null);
// iterate over the list of consumers and deregister the
// associated endpoints and then remove all the entries
Iterator consumers = _consumers.values().iterator();
while (consumers.hasNext()) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -