📄 jmssession.java
字号:
// have registered listeners...bit confusing but this is what
// I believe it should do
if (_listener != null) {
_listener.onMessage(message);
} else {
// send it to the appropriate consumer
consumer.onMessage(message);
}
} else {
// consumer no longer active...so drop the message
_log.error("Received a message for an inactive consumer");
}
}
/**
* Fetch the next message for this client. If the session's ackMode is
* client acknowledge then set the session for the message, otherwise ack
* the message before returning it.
*
* @param consumerId the consumer identifier
* @param wait the maximum time to wait for a message, in
* milliseconds. If <code>-1</code>, don't wait, if
* <code>0</code> wait indefinitely, otherwise wait the
* specified time.
* @return the received message, or <code>null</code>, if no message is
* available
* @throws JMSException if an error occurs retrieving the message
*/
public Message retrieveMessage(long consumerId, long wait)
    throws JMSException {
    ensureOpen();

    // Absolute deadline for a timed receive (only consulted when wait > 0).
    // Saturate instead of overflowing so that a very large timeout is not
    // mistaken for one that has already expired.
    final long start = System.currentTimeMillis();
    final long deadline = (wait > 0 && wait > Long.MAX_VALUE - start)
        ? Long.MAX_VALUE : start + wait;

    // set once the timeout has elapsed: permits one final read, then exit
    boolean lastAttempt = false;
    MessageImpl message = null;

    while (true) {
        // the lock is re-acquired on every pass, as notifyAll() on
        // _receiveLock is what wakes the waits below
        synchronized (_receiveLock) {
            if (_closing || _closed) {
                // session is closing, or has been closed: return null
                break;
            }
            if (!_stopped) {
                // connection is started, so a message may be returned.
                // While stopped, no read is attempted but the receive
                // continues to time out below.
                message = (MessageImpl) getServerSession().receive(
                    consumerId, wait);
            }
            if (message != null) {
                message.setSession(this);
                break;
            }
            if (lastAttempt || wait < 0) {
                // either the timeout expired before this read, or the
                // caller requested a non-blocking read
                break;
            }

            // No message was received. Block until notified, the timeout
            // expires, or the thread is interrupted.
            try {
                if (wait == 0) {
                    // wait indefinitely
                    _receiveLock.wait();
                } else {
                    _receiveLock.wait(wait);
                    long current = System.currentTimeMillis();
                    if (current >= deadline) {
                        lastAttempt = true;
                    } else {
                        // woken early: only wait for the remaining time
                        wait = deadline - current;
                    }
                }
            } catch (InterruptedException interrupted) {
                // Preserve the interrupt for the caller and stop waiting.
                // Looping again would either spin (the next wait() would
                // throw immediately) or restart the full timeout.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    return message;
}
/**
* Browse up to count messages.
*
* @param consumerId the consumer identifier
* @param count the maximum number of messages to receive
* @return a list of {@link MessageImpl} instances
* @throws JMSException for any JMS error
*/
public synchronized List browse(long consumerId, int count)
    throws JMSException {
    // reject the call if the session has been closed
    ensureOpen();

    // delegate to the server session; the result is returned untouched
    final ServerSession server = getServerSession();
    final List browsed = server.browse(consumerId, count);
    return browsed;
}
/**
* Send the specified message to the server.
*
* @param message the message to send
* @throws JMSException if the message can't be sent
*/
protected synchronized void sendMessage(Message message)
    throws JMSException {
    if (_transacted) {
        // Transacted session: cache a private copy of the message locally
        // rather than sending it, pending a commit or a rollback.
        final Message copy;
        if (message instanceof MessageImpl) {
            try {
                copy = (Message) ((MessageImpl) message).clone();
            } catch (CloneNotSupportedException error) {
                // keep the original failure reachable via
                // getLinkedException() instead of discarding it
                JMSException failure = new JMSException(error.getMessage());
                failure.setLinkedException(error);
                throw failure;
            }
        } else {
            // foreign implementation: convert() yields a new message
            copy = convert(message);
        }
        _messagesToSend.add(copy);
    } else {
        // Non-transacted session: send immediately, converting foreign
        // message implementations first.
        final MessageImpl outgoing = (message instanceof MessageImpl)
            ? (MessageImpl) message
            : (MessageImpl) convert(message);
        getServerSession().send(outgoing);
    }
}
/**
* Returns the server session.
*
* @return the server session
*/
protected ServerSession getServerSession() {
    // plain accessor for the _session field
    return this._session;
}
/**
* Return a reference to the connection that created this session.
*
* @return the owning connection
*/
protected JmsConnection getConnection() {
    // plain accessor for the _connection field
    return this._connection;
}
/**
* Creates a new message consumer, returning its identity.
*
* @param destination the destination to access
* @param selector the message selector. May be <code>null</code>
* @param noLocal if true, and the destination is a topic, inhibits the
* delivery of messages published by its own connection.
* The behavior for <code>noLocal</code> is not specified
* if the destination is a queue.
* @throws JMSException if the session fails to create a
* MessageConsumer due to some internal
* error.
* @throws InvalidDestinationException if an invalid destination is
* specified.
* @throws InvalidSelectorException if the message selector is invalid.
*/
protected long allocateConsumer(Destination destination,
                                String selector, boolean noLocal)
    throws JMSException {
    ensureOpen();

    if (destination instanceof JmsDestination) {
        final JmsDestination target = (JmsDestination) destination;

        // a temporary destination may only be consumed from via the
        // connection that owns it
        if (checkForValidTemporaryDestination(target)) {
            return _session.createConsumer(target, selector, noLocal);
        }
        throw new InvalidDestinationException(
            "Trying to create a MessageConsumer for a temporary "
            + "destination that is not bound to this connection");
    }

    // null, or not a JmsDestination
    throw new InvalidDestinationException(
        "Cannot create MessageConsumer for destination="
        + destination);
}
/**
* This method checks the destination. If the destination is not temporary
* then return true. If it is a temporary destination and it is owned by
* this session's connection then it returns true. If it is a temporary
* destination and it is owned by another connection then it returns false.
*
* @param destination the destination to check
* @return <code>true</code> if the destination is valid
*/
protected boolean checkForValidTemporaryDestination(
    JmsDestination destination) {
    // non-temporary destinations are always acceptable
    if (!destination.isTemporaryDestination()) {
        return true;
    }

    // temporary destinations are acceptable only when they are valid for
    // this session's connection
    final JmsTemporaryDestination temporary =
        (JmsTemporaryDestination) destination;
    return temporary.validForConnection(getConnection());
}
/**
* Add a consumer to the list of consumers managed by this session.
*
* @param consumer the consumer to add
*/
protected void addConsumer(JmsMessageConsumer consumer) {
    // consumers are keyed on their boxed consumer identifier
    final Long key = new Long(consumer.getConsumerId());
    _consumers.put(key, consumer);
}
/**
* Remove a consumer, deregistering it on the server.
*
* @param consumer the consumer to remove
* @throws JMSException if removal fails
*/
protected void removeConsumer(JmsMessageConsumer consumer)
    throws JMSException {
    final long id = consumer.getConsumerId();
    final boolean browser = consumer instanceof JmsQueueBrowser;
    try {
        // queue browsers skip the message listener removal
        if (!browser) {
            removeMessageListener(consumer);
        }
        _session.removeConsumer(id);
    } finally {
        // always drop the local entry, even if the calls above throw
        _consumers.remove(new Long(id));
    }
}
/**
* Add a producer to the list of producers managed by this session.
*
* @param producer the producer to add
*/
protected void addProducer(JmsMessageProducer producer) {
    // record the producer in the session's producer collection
    this._producers.add(producer);
}
/**
* Remove the producer from the list of managed producers.
*
* @param producer the producer to remove
*/
protected void removeProducer(JmsMessageProducer producer) {
    // drop the producer from the session's producer collection
    this._producers.remove(producer);
}
/**
* Check if the session is closed.
*
* @return <code>true</code> if the session is closed
*/
protected final boolean isClosed() {
    // reports the current value of the _closed flag
    return this._closed;
}
/**
* Add a message to the message cache. This message will be processed when
* the run() method is called.
*
* @param message the message to add.
*/
protected void addMessage(Message message) {
    // append to the cache; nothing else is done with the message here
    this._messageCache.add(message);
}
/**
* Verifies that the session isn't closed.
*
* @throws IllegalStateException if the session is closed
*/
protected void ensureOpen() throws IllegalStateException {
    // nothing to do while the session is still open
    if (!_closed) {
        return;
    }
    throw new IllegalStateException(
        "Cannot perform operation - session has been closed");
}
/**
* Verifies that the session is transactional.
*
* @throws IllegalStateException if the session isn't transactional
*/
private void ensureTransactional() throws IllegalStateException {
    // reject the call unless the _transacted flag is set
    if (!_transacted) {
        throw new IllegalStateException(
            "Cannot perform operation - session is not transactional");
    }
}
/**
* Notifies any blocking synchronous consumers.
*/
private void notifyConsumers() {
    // wake every thread currently waiting on the receive lock, e.g. those
    // blocked in retrieveMessage()
    final Object monitor = _receiveLock;
    synchronized (monitor) {
        monitor.notifyAll();
    }
}
/**
* Convert a message to its corresponding OpenJMS implementation.
*
* @param message the message to convert
* @return the OpenJMS implementation of the message
* @throws JMSException for any error
*/
private Message convert(Message message) throws JMSException {
    // the factory selects a converter for the message, which then performs
    // the conversion
    return MessageConverterFactory.create(message).convert(message);
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -