📄 jmssession.java
字号:
* otherwise wait the specified time.
* @return the received message, or <code>null</code>, if no message is
* available
* @throws JMSException if an error occurs retrieving the message
*/
public Message retrieveMessage(long clientId, long wait)
    throws JMSException {
    // Meaning of 'wait' as implemented below:
    //   wait <  0 : non-blocking read - poll the stub once and return
    //   wait == 0 : block until a message arrives or the session closes
    //   wait >  0 : block for at most 'wait' milliseconds
    ensureOpen();

    final long start = System.currentTimeMillis();
    // Saturate the deadline instead of letting start + wait overflow into
    // a negative value, which would look like an immediate timeout.
    final long end = (wait > 0 && start > Long.MAX_VALUE - wait)
        ? Long.MAX_VALUE
        : start + wait;

    // set once the deadline has passed: one final read is attempted and
    // the loop then exits whether or not a message was returned
    boolean breakOnNextRead = false;
    // remembers that an interrupt was consumed while waiting so that the
    // thread's interrupt status can be restored before returning
    boolean interrupted = false;
    MessageImpl message = null;

    try {
        while (true) {
            synchronized (_receiveLock) {
                if (_closing || _closed) {
                    // session is in the process of closing, or has been
                    // closed. Need to return null.
                    break;
                }
                if (!_stopped) {
                    // connection is started. Messages may be returned.
                    // While stopped, no read is attempted but the receive
                    // still times out via the wait below.
                    message = (MessageImpl) getJmsSessionStub()
                        .receiveMessage(clientId, wait);
                }
                if (message != null) {
                    message.setSession(this);
                    break;
                }
                if (breakOnNextRead || wait < 0) {
                    // either the deadline has expired, or the client is
                    // performing a non-blocking read
                    break;
                }

                // no message was received. Block until one of the
                // following occurs:
                // . the lock is notified (see notifyConsumers())
                // . the receive times out
                // . the thread is interrupted
                try {
                    if (wait > 0) {
                        _receiveLock.wait(wait);
                    } else {
                        _receiveLock.wait();
                    }
                } catch (InterruptedException ignored) {
                    // keep waiting as before, but do not lose the
                    // interrupt: it is re-asserted in the finally block
                    interrupted = true;
                }

                if (wait > 0) {
                    // Recompute the remaining time after every wake-up,
                    // including interrupts, so the overall timeout is
                    // never extended beyond the deadline.
                    long remaining = end - System.currentTimeMillis();
                    if (remaining <= 0) {
                        breakOnNextRead = true;
                    } else {
                        wait = remaining;
                    }
                }
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
    return message;
}
/**
 * Retrieves at most <code>count</code> messages for the given client by
 * delegating to the remote session stub. Intended to be invoked only on
 * behalf of a {@link JmsQueueBrowser}.
 *
 * @param clientId the client identity, scoped to this session
 * @param count the maximum number of messages to retrieve
 * @return the messages returned by the stub
 * @throws JMSException if the session is closed, or the messages can't
 * be retrieved
 */
public synchronized Vector retrieveMessages(long clientId, int count)
    throws JMSException {
    // reject the call outright if the session has already been closed
    ensureOpen();
    final JmsSessionStubIfc stub = getJmsSessionStub();
    return stub.receiveMessages(clientId, count);
}
/**
 * Releases the local resources held by this session. Does nothing if the
 * session is already closed.
 *
 * @throws JMSException if destroying a producer or consumer fails
 */
public void destroy() throws JMSException {
    if (_closed) {
        // already destroyed - nothing left to release
        return;
    }

    // flag the session as closing, then wake any threads waiting on the
    // receive lock so that they can observe the flag
    _closing = true;
    notifyConsumers();

    // destroy every producer registered with this session
    for (Enumeration iter = getProducers(); iter.hasMoreElements();) {
        ((JmsMessageProducer) iter.nextElement()).destroy();
    }

    // destroy every consumer registered with this session
    for (Enumeration iter = getConsumers(); iter.hasMoreElements();) {
        ((JmsMessageConsumer) iter.nextElement()).destroy();
    }

    // deregister from the owning connection and drop the reference to it
    _connection.removeSession(this);
    _connection = null;

    // discard any messages still cached for sending
    _messagesToSend.clear();

    // drop the reference to the remote session stub
    _stub = null;

    // the session is now fully closed
    _closed = true;
    _closing = false;
}
/**
 * Send the specified message to the server.
 * <p>
 * Messages that are not {@link MessageImpl} instances are first converted
 * to the OpenJMS implementation. If the session is transacted, a private
 * copy of the message is cached locally instead of being sent; otherwise
 * the message is passed to the remote stub and the publish count is
 * incremented.
 *
 * @param message the message to send
 * @throws JMSException if the message can't be cloned, converted or sent
 */
protected synchronized void sendMessage(Message message)
    throws JMSException {
    Message outgoing;
    if (!(message instanceof MessageImpl)) {
        // foreign implementation: convert() produces a new message, so
        // no additional copy is needed in the transacted case
        outgoing = convert(message);
    } else if (_transacted) {
        // cache a copy, so that changes the client makes to the message
        // after this call do not affect what is held locally
        try {
            outgoing = (Message) ((MessageImpl) message).clone();
        } catch (CloneNotSupportedException error) {
            // keep the original exception reachable for diagnostics
            JMSException failure = new JMSException(error.getMessage());
            failure.setLinkedException(error);
            throw failure;
        }
    } else {
        outgoing = message;
    }

    if (_transacted) {
        // hold the message locally rather than sending it now
        _messagesToSend.addElement(outgoing);
    } else {
        getJmsSessionStub().sendMessage(outgoing);
        _publishCount++;
    }
}
/**
 * Returns the remote session stub held by this session. The reference is
 * cleared by {@link #destroy}, so this returns <code>null</code> once the
 * session has been destroyed.
 *
 * @return the remote stub, or <code>null</code> if it has been released
 */
protected JmsSessionStubIfc getJmsSessionStub() {
    return this._stub;
}
/**
 * Returns the connection that owns this session. The reference is cleared
 * by {@link #destroy}, so this returns <code>null</code> once the session
 * has been destroyed.
 *
 * @return the owning connection, or <code>null</code> if it has been
 * released
 */
protected JmsConnection getConnection() {
    return this._connection;
}
/**
 * Determines whether a destination may be used with this session.
 * <ul>
 * <li>A destination that is not temporary is always valid.</li>
 * <li>A temporary destination is valid only if it reports itself as
 * valid for this session's connection.</li>
 * </ul>
 *
 * @param destination the destination to check
 * @return <code>true</code> if the destination is valid
 */
protected boolean checkForValidTemporaryDestination(
    JmsDestination destination) {
    if (!destination.isTemporaryDestination()) {
        // non-temporary destinations need no ownership check
        return true;
    }

    // temporary destination: defer to its own check against the
    // connection that owns this session
    JmsTemporaryDestination temporary =
        (JmsTemporaryDestination) destination;
    return temporary.validForConnection(getConnection());
}
/**
 * Returns the producers currently registered with this session.
 *
 * @return an enumeration over the registered producers
 */
protected Enumeration getProducers() {
    final Enumeration registered = _producers.elements();
    return registered;
}
/**
 * Returns the consumers currently registered with this session.
 *
 * @return an enumeration over the registered consumers
 */
protected Enumeration getConsumers() {
    final Enumeration registered = _consumers.elements();
    return registered;
}
/**
 * Advances the consumer id seed by one and returns the new value, for
 * allocation to a new consumer. This method performs no synchronization
 * of its own.
 *
 * @return the next consumer identifier for this session
 */
protected long getNextConsumerId() {
    _consumerIdSeed += 1;
    return _consumerIdSeed;
}
/**
 * Registers a consumer with this session, keyed on its client id.
 *
 * @param consumer the consumer to register
 */
protected void addConsumer(JmsMessageConsumer consumer) {
    final Long key = new Long(consumer.getClientId());
    _consumers.put(key, consumer);
}
/**
 * Deregisters a consumer from this session. The consumer is looked up by
 * its client id.
 *
 * @param consumer the consumer to deregister
 */
protected void removeConsumer(JmsMessageConsumer consumer) {
    final Long key = new Long(consumer.getClientId());
    _consumers.remove(key);
}
/**
 * Registers a producer with this session.
 *
 * @param producer the producer to register
 */
protected void addProducer(JmsMessageProducer producer) {
    this._producers.addElement(producer);
}
/**
 * Deregisters a producer from this session.
 *
 * @param producer the producer to deregister
 */
protected void removeProducer(JmsMessageProducer producer) {
    this._producers.remove(producer);
}
/**
 * Reports whether this session has been closed.
 *
 * @return <code>true</code> if the session is closed, otherwise
 * <code>false</code>
 */
protected final boolean isClosed() {
    return this._closed;
}
/**
 * Appends a message to this session's message cache.
 * NOTE(review): the cached messages are presumably processed by the
 * session's run() method, which is not visible here - confirm.
 *
 * @param message the message to cache
 */
protected void addMessage(Message message) {
    this._messageCache.addElement(message);
}
/**
 * Guards operations that require an open session.
 *
 * @throws IllegalStateException if the session has been closed
 */
protected void ensureOpen() throws IllegalStateException {
    if (!_closed) {
        // session is still usable
        return;
    }
    throw new IllegalStateException(
        "Cannot perform operation - session has been closed");
}
/**
 * Verifies that the session is transactional.
 *
 * @throws IllegalStateException if the session isn't transactional
 */
private void ensureTransactional() throws IllegalStateException {
    if (!_transacted) {
        // message wording kept parallel to the one used by ensureOpen()
        throw new IllegalStateException(
            "Cannot perform operation - session is not transactional");
    }
}
/**
 * Wakes every thread currently waiting on the receive lock, such as
 * callers blocked in {@link #retrieveMessage}.
 */
private void notifyConsumers() {
    // notifyAll() may only be invoked while holding the lock's monitor
    synchronized (this._receiveLock) {
        this._receiveLock.notifyAll();
    }
}
/**
 * Converts a message to its corresponding OpenJMS implementation, using
 * the converter that {@link MessageConverterFactory} creates for it.
 *
 * @param message the message to convert
 * @return the converted message
 * @throws JMSException for any error
 */
private Message convert(Message message) throws JMSException {
    return MessageConverterFactory.create(message).convert(message);
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -