📄 jmssession.java
字号:
public TemporaryQueue createTemporaryQueue() throws JMSException {
ensureOpen();
JmsTemporaryQueue queue = new JmsTemporaryQueue();
queue.setOwningConnection(getConnection());
return queue;
}
/**
* Creates a <code>TemporaryTopic</code> object. Its lifetime will be that
* of the <code>Connection</code> unless it is deleted earlier.
*
* @return a temporary topic identity
* @throws JMSException if the session fails to create a temporary topic due
* to some internal error.
*/
public TemporaryTopic createTemporaryTopic() throws JMSException {
ensureOpen();
JmsTemporaryTopic topic = new JmsTemporaryTopic();
topic.setOwningConnection(getConnection());
return topic;
}
/**
* Unsubscribes a durable subscription that has been created by a client.
* <p/>
* <P>This method deletes the state being maintained on behalf of the
* subscriber by its provider.
* <p/>
* <P>It is erroneous for a client to delete a durable subscription while
* there is an active <code>MessageConsumer</code> or
* <code>TopicSubscriber</code> for the subscription, or while a consumed
* message is part of a pending transaction or has not been acknowledged in
* the session.
*
* @param name the name used to identify this subscription
* @throws JMSException if the session fails to unsubscribe
* to the durable subscription due to
* some internal error.
* @throws InvalidDestinationException if an invalid subscription name is
* specified.
*/
public void unsubscribe(String name) throws JMSException {
ensureOpen();
_session.unsubscribe(name);
}
/**
* Commit all messages done in this transaction
*
* @throws JMSException if the transaction cannot be committed
*/
public synchronized void commit() throws JMSException {
ensureOpen();
ensureTransactional();
// send all the cached messages to the server
getServerSession().send(_messagesToSend);
_messagesToSend.clear();
// commit the session
getServerSession().commit();
}
/**
* Rollback any messages done in this transaction
*
* @throws JMSException if the transaction cannot be rolled back
*/
public synchronized void rollback() throws JMSException {
ensureOpen();
ensureTransactional();
// clear all the cached messages
_messagesToSend.clear();
// rollback the session
getServerSession().rollback();
}
/**
* Close the session. This call will block until a receive or message
* listener in progress has completed. A blocked message consumer receive
* call returns <code>null</code> when this session is closed.
*
* @throws JMSException if the session can't be closed
*/
public synchronized void close() throws JMSException {
if (!_closed) {
_closing = true;
// must stop first before we close
stop();
// wake up any blocking consumers
notifyConsumers();
// go through all the producer and call close on them
// respectively
JmsMessageProducer[] producers =
(JmsMessageProducer[]) _producers.toArray(
new JmsMessageProducer[0]);
for (int i = 0; i < producers.length; ++i) {
JmsMessageProducer producer = producers[i];
producer.close();
}
// go through all the consumer and call close on them
// respectively
JmsMessageConsumer[] consumers =
(JmsMessageConsumer[]) _consumers.values().toArray(
new JmsMessageConsumer[0]);
for (int i = 0; i < consumers.length; ++i) {
JmsMessageConsumer consumer = consumers[i];
consumer.close();
}
// deregister this with the connection
_connection.removeSession(this);
_connection = null;
// clear any cached messages or acks
_messagesToSend.clear();
// issue a close to the remote session. This will release any
// allocated remote resources
getServerSession().close();
_session = null;
// update the session state
_closed = true;
_closing = false;
}
}
/**
* Stop message delivery in this session, and restart sending messages with
* the oldest unacknowledged message
*
* @throws JMSException if the session can't be recovered
*/
public synchronized void recover() throws JMSException {
ensureOpen();
if (!_transacted) {
// let the server handle the recovery
getServerSession().recover();
} else {
throw new IllegalStateException(
"Cannot recover from a transacted session");
}
}
/**
* Returns the message listener associated with the session
*
* @return the message listener associated with the session, or
* <code>null</code> if no listener is registered
* @throws JMSException if the session is closed
*/
public MessageListener getMessageListener() throws JMSException {
ensureOpen();
return _listener;
}
/**
* Sets the session's message listener.
*
* @param listener the session's message listener
* @throws JMSException if the session is closed
*/
public void setMessageListener(MessageListener listener)
throws JMSException {
ensureOpen();
_listener = listener;
}
/**
* Iterates through the list of messages added by an {@link
* JmsConnectionConsumer}, sending them to the registered listener
*/
public void run() {
try {
while (!_messageCache.isEmpty()) {
Message message = (Message) _messageCache.remove(0);
_listener.onMessage(message);
}
} catch (Exception exception) {
_log.error("Error in the Session.run()", exception);
} finally {
// Clear message cache
_messageCache.clear();
}
}
/**
* Set the message listener for a particular consumer.
* <p/>
* If a listener is already registered for the consumer, it will be
* automatically overwritten
*
* @param listener the message listener
* @throws JMSException if the listener can't be set
*/
public void setMessageListener(JmsMessageConsumer listener)
throws JMSException {
ensureOpen();
enableAsynchronousDelivery(listener.getConsumerId(), true);
}
/**
* Remove a message listener
*
* @param listener the message listener to remove
* @throws JMSException if the listener can't be removed
*/
public void removeMessageListener(JmsMessageConsumer listener)
throws JMSException {
ensureOpen();
enableAsynchronousDelivery(listener.getConsumerId(), false);
}
/**
* This will start message delivery to this session. If message delivery has
* already started then this is a no-op.
*
* @throws JMSException if message delivery can't be started
*/
public void start() throws JMSException {
ensureOpen();
if (_stopped) {
getServerSession().start();
_stopped = false;
// wake up any blocking consumers
notifyConsumers();
}
}
/**
* This will stop message delivery to this session. If message delivery has
* already stoped then this is a no-op.
*
* @throws JMSException if message delivery can't be stopped
*/
public void stop() throws JMSException {
ensureOpen();
if (!_stopped) {
getServerSession().stop();
_stopped = true;
// wake up any blocking consumers
notifyConsumers();
}
}
/**
* Acknowledge the specified message. This is only applicable for
* CLIENT_ACKNOWLEDGE sessions. For other session types, the request is
* ignored.
* <p/>
* Acking a message automatically acks all those that have come before it.
*
* @param message the message to acknowledge
* @throws JMSException if the message can't be acknowledged
*/
public void acknowledgeMessage(Message message) throws JMSException {
ensureOpen();
if (_ackMode == Session.CLIENT_ACKNOWLEDGE) {
MessageImpl impl = (MessageImpl) message;
getServerSession().acknowledgeMessage(impl.getConsumerId(),
impl.getAckMessageID());
}
}
/**
* Enable or disable asynchronous message delivery for the specified
* client.
*
* @param consumerId the consumer identifier
* @param enable <code>true</code> to enable; <code>false</code> to
* disable
* @throws JMSException if message delivery cannot be enabled or disabled
*/
public void enableAsynchronousDelivery(long consumerId, boolean enable)
throws JMSException {
ensureOpen();
getServerSession().enableAsynchronousDelivery(consumerId, enable);
}
/**
* Asynchronously deliver a message to a <code>MessageConsumer</code>
*
* @param message the message to deliver
*/
public void onMessage(Message message) {
if (message != null) {
MessageImpl impl = (MessageImpl) message;
impl.setJMSXRcvTimestamp(System.currentTimeMillis());
// dispatch the message;
execute(message);
}
}
/**
* Inform the session that there is a message available for a particular
* consumer
*
* @param consumerId the consumer identity
*/
public void onMessageAvailable(long consumerId) {
// wake up any blocking consumers
notifyConsumers();
}
/**
* This is the called to process messages asynchronously delivered by the
* server. The session is then responsible for delivering it to the
* appropriate registered consumer. If it cannot resolve the consumer then
* it must log an exception
* <p/>
* If the session has a registered listener then all messages will be
* delivered to the session's listener instead of the individual consumer
* message listeners.
*
* @param object received message
*/
public synchronized void execute(Object object) {
// if the session is closed then drop the object
if (_closed) {
_log.error("Received a message for a closed session");
return;
}
MessageImpl message = (MessageImpl) object;
long consumerId = message.getConsumerId();
JmsMessageConsumer consumer =
(JmsMessageConsumer) _consumers.get(new Long(consumerId));
// tag the session that received this message
message.setSession(this);
if (consumer != null) {
// if a listener is defined for the session then send all the
// messages to that listener regardless if any consumers are
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -