📄 jmssession.java
字号:
// send all the cached messages to the server
getJmsSessionStub().sendMessages(_messagesToSend);
_publishCount += _messagesToSend.size();
_messagesToSend.clear();
// commit the session
getJmsSessionStub().commit();
}
/**
* Rollback any messages done in this transaction
*
* @throws JMSException if the transaction cannot be rolled back
*/
public synchronized void rollback() throws JMSException {
ensureOpen();
ensureTransactional();
// clear all the cached messages
_messagesToSend.clear();
// rollback the session
getJmsSessionStub().rollback();
}
/**
* Close the session. This call will block until a receive or message
* listener in progress has completed. A blocked message consumer receive
* call returns <code>null</code> when this session is closed.
*
* @throws JMSException if the session can't be closed
*/
public synchronized void close() throws JMSException {
if (!_closed) {
_closing = true;
// signal the stub that we are preparing to close the
// connection.
getJmsSessionStub().beforeClose();
// must stop first before we close
stop();
// wake up any blocking consumers
notifyConsumers();
// go through all the producer and call close on them
// respectively
Enumeration producers = getProducers();
while (producers.hasMoreElements()) {
JmsMessageProducer producer =
(JmsMessageProducer) producers.nextElement();
producer.close();
}
// go through all the consumer and call close on them
// respectively
Enumeration consumers = getConsumers();
while (consumers.hasMoreElements()) {
JmsMessageConsumer consumer =
(JmsMessageConsumer) consumers.nextElement();
consumer.close();
}
// deregister this with the connection
_connection.removeSession(this);
_connection = null;
// clear any cached messages or acks
_messagesToSend.clear();
// issue a close to the remote session. This will release any
// allocated remote resources
getJmsSessionStub().close();
_stub = null;
// update the session state
_closed = true;
_closing = false;
}
}
/**
* Stop message delivery in this session, and restart sending messages with
* the oldest unacknowledged message
*
* @throws JMSException if the session can't be recovered
*/
public synchronized void recover() throws JMSException {
ensureOpen();
if (!_transacted) {
// let the server handle the recovery
getJmsSessionStub().recover();
} else {
throw new IllegalStateException(
"Cannot recover from a transacted session");
}
}
/**
* Returns the message listener associated with the session
*
* @return the message listener associated with the session, or
* <code>null</code> if no listener is registered
* @throws JMSException if the session is closed
*/
public MessageListener getMessageListener() throws JMSException {
ensureOpen();
return _listener;
}
/**
* Sets the session's message listener.
*
* @param listener the session's message listener
* @throws JMSException if the session is closed
*/
public void setMessageListener(MessageListener listener)
throws JMSException {
ensureOpen();
_listener = listener;
}
/**
* Iterates through the list of messages added by an
* {@link JmsConnectionConsumer}, sending them to the registered listener
*/
public void run() {
try {
while (!_messageCache.isEmpty()) {
Message message = (Message) _messageCache.remove(0);
_listener.onMessage(message);
}
} catch (Exception exception) {
_log.error("Error in the Session.run()", exception);
} finally {
// Clear message cache
_messageCache.clear();
}
}
/**
* Set the message listener for a particular consumer.
* <p>
* If a listener is already registered for the consumer, it will be
* automatically overwritten
*
* @param listener the message listener
* @throws JMSException if the listener can't be set
*/
public void setMessageListener(JmsMessageConsumer listener)
throws JMSException {
ensureOpen();
enableAsynchronousDelivery(listener.getClientId(),
listener.getLastMessageDelivered(), true);
}
/**
* Remove a message listener
*
* @param listener the message listener to remove
* @throws JMSException if the listener can't be removed
*/
public void removeMessageListener(JmsMessageConsumer listener)
throws JMSException {
ensureOpen();
enableAsynchronousDelivery(listener.getClientId(),
listener.getLastMessageDelivered(), false);
}
/**
* This will start message delivery to this session. If message delivery
* has already started then this is a no-op.
*
* @throws JMSException if message delivery can't be started
*/
public void start() throws JMSException {
ensureOpen();
if (_stopped) {
getJmsSessionStub().startMessageDelivery();
_stopped = false;
// wake up any blocking consumers
notifyConsumers();
}
}
/**
* This will stop message delivery to this session. If message delivery
* has already stoped then this is a no-op.
*
* @throws JMSException if message delivery can't be stopped
*/
public void stop() throws JMSException {
ensureOpen();
if (!_stopped) {
getJmsSessionStub().stopMessageDelivery();
_stopped = true;
// wake up any blocking consumers
notifyConsumers();
}
}
/**
* Acknowledge the specified message. This is only applicable for
* CLIENT_ACKNOWLEDGE sessions. For other session types, the request
* is ignored.
* <p>
* Acking a message automatically acks all those that have come
* before it.
*
* @param message the message to acknowledge
* @throws JMSException if the message can't be acknowledged
*/
public void acknowledgeMessage(Message message) throws JMSException {
ensureOpen();
if (_ackMode == Session.CLIENT_ACKNOWLEDGE) {
MessageImpl impl = (MessageImpl) message;
getJmsSessionStub().acknowledgeMessage(impl.getClientId(),
impl.getAckMessageID());
}
}
/**
* Enable or disable asynchronous message delivery for the specified
* client.
*
* @param clientId - the client identity
* @param id - the last message delivered asynchronously
* @param enable - <code>true</code> to enable; <code>false</code> to
* disable
* @throws JMSException if message delivery cannot be enabled or disabled
*/
public void enableAsynchronousDelivery(long clientId, String id,
boolean enable)
throws JMSException {
ensureOpen();
getJmsSessionStub().enableAsynchronousDelivery(clientId, id, enable);
}
/**
* Asynchronously deliver a message to a <code>MessageConsumer</code>
*
* @param message the message to deliver
*/
public void onMessage(Message message) {
if (message != null) {
MessageImpl impl = (MessageImpl) message;
impl.setJMSXRcvTimestamp(System.currentTimeMillis());
// dispatch the message;
execute(message);
}
}
/**
* Asynchronously deliver a set of message to a
* <code>MessageConsumer</code>
*
* @param messages the messages to deliver
*/
public void onMessages(Vector messages) {
while (messages.size() > 0) {
onMessage((Message) messages.remove(0));
}
}
/**
* Inform the session that there is a message available
* for the message consumer with the specified identity
*
* @param clientId the identity of the client
*/
public void onMessageAvailable(long clientId) {
// wake up any blocking consumers
notifyConsumers();
}
/**
* This is the called to process messages asynchronously delivered by the
* server. The session is then responsible for delivering it to the
* appropriate registered consumer. If it cannot resolve the consumer then
* it must log an exception
* <p>
* If the session has a registered listener then all messages will be
* delivered to the session's listener instead of the individual consumer
* message listeners.
*
* @param object received message
*/
public synchronized void execute(Object object) {
// if the session is closed then drop the object
if (_closed) {
_log.error("Received a message for a closed session");
return;
}
MessageImpl message = (MessageImpl) object;
long clientId = message.getClientId();
JmsMessageConsumer consumer =
(JmsMessageConsumer) _consumers.get(new Long(clientId));
// tag the session that received this message
message.setSession(this);
if (consumer != null) {
// if a listener is defined for the session then send all the
// messages to that listener regardless if any consumers are
// have registered listeners...bit confusing but this is what
// I believe it should do
if (_listener != null) {
_listener.onMessage(message);
} else {
// send it to the appropriate consumer
consumer.onMessage(message);
}
} else {
// consumer no longer active...so drop the message
_log.error("Received a message for an inactive consumer");
}
}
/**
* Returns the session identifier
*
* @return the session identifier
*/
public String getSessionId() {
return _sessionId;
}
/**
* Return the acknowledgement mode for the session
*
* @return the acknowledgement mode for the session
*/
public int getAckMode() {
return _ackMode;
}
/**
* Fetch the next message for this client. If the session's ackMode is
* client acknowledge then set the session for the message, othwerwise
* ack the message before returning it.
*
* @param clientId the consumer identififer.
* @param wait the maximum time to wait for a message, in milliseconds.
* If <code>-1</code>, don't wait, if <code>0</code> wait indefinitely,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -