📄 jmsserversession.java
字号:
public void acknowledgeMessage(long clientId, String id)
    throws JMSException {
    // Wrap the raw string identifier in a MessageId, then ask the sent
    // message cache to acknowledge that message on behalf of the client.
    MessageId messageId = new MessageId(id);
    _sentMessageCache.acknowledgeMessage(messageId, clientId);
}
/**
 * Send the specified message to the server.
 * <p>
 * The message is passed to <code>checkDeliveryMode</code> and stamped with
 * a connection identity taken from <code>_connection.hashCode()</code>. If
 * a global transaction is in progress (<code>_xid</code> is non-null) the
 * message is logged with the resource manager; otherwise it is added to
 * the message manager and the publish count is incremented.
 *
 * @param message the message to send
 * @throws JMSException if the message is <code>null</code> or can't be
 * sent. When the failure was caused by an unexpected exception, that
 * exception is available through
 * {@link JMSException#getLinkedException()}
 */
public void sendMessage(Message message) throws JMSException {
    if (message == null) {
        throw new JMSException("Message is null");
    }
    try {
        // cast once; a message of any other implementation raises a
        // ClassCastException that is reported by the generic handler below
        MessageImpl impl = (MessageImpl) message;

        // check the delivery mode of the message
        checkDeliveryMode(impl);

        // set the connection identity and then let the message manager
        // process it
        impl.setConnectionId(_connection.hashCode());

        // if there is a global transaction currently in process then
        // we must send the message to the resource manager, otherwise
        // send it directly to the message manager
        if (_xid != null) {
            ResourceManager.instance().logPublishedMessage(_xid, impl);
        } else {
            MessageMgr.instance().add(impl);
            _publishCount++;
        }
    } catch (JMSException exception) {
        _log.error("Failed to process message", exception);
        throw exception;
    } catch (OutOfMemoryError exception) {
        // an Error cannot be attached with setLinkedException(Exception),
        // so only the message is propagated here
        String msg =
            "Failed to process message due to out-of-memory error";
        _log.error(msg, exception);
        throw new JMSException(msg);
    } catch (Exception exception) {
        String msg = "Failed to process message";
        _log.error(msg, exception);
        // keep the root cause reachable for the caller instead of
        // dropping it
        JMSException error = new JMSException(msg);
        error.setLinkedException(exception);
        throw error;
    }
}
/**
 * Send the specified messages to the server.
 * <p>
 * Messages are removed from the head of <code>messages</code> one at a
 * time, so the supplied vector is emptied as they are processed.
 * Processing stops at the first <code>null</code> element (which is also
 * removed); any elements after it are left in the vector unsent. Each
 * message is handled the same way as in
 * {@link #sendMessage(Message)}: it goes to the resource manager when a
 * global transaction is in progress (<code>_xid</code> is non-null) and to
 * the message manager otherwise.
 *
 * @param messages the messages to send; elements must be
 * <code>MessageImpl</code> instances
 * @throws JMSException if <code>messages</code> is <code>null</code> or a
 * message can't be sent. When the failure was caused by an unexpected
 * exception, that exception is available through
 * {@link JMSException#getLinkedException()}
 */
public void sendMessages(Vector messages) throws JMSException {
    if (messages == null) {
        throw new JMSException("No messages to send");
    }
    MessageImpl message = null;
    while ((messages.size() > 0) &&
        ((message = (MessageImpl) messages.remove(0)) != null)) {
        try {
            // check the delivery mode of the message
            checkDeliveryMode(message);

            // set the connection identity and then let the message
            // manager process it
            message.setConnectionId(_connection.hashCode());

            // if there is a global transaction in progress then send the
            // message to the resource manager, otherwise send it to the
            // message manager
            if (_xid != null) {
                ResourceManager.instance().logPublishedMessage(_xid,
                    message);
            } else {
                MessageMgr.instance().add(message);
                _publishCount++;
            }
        } catch (JMSException exception) {
            _log.error("Failed to process message", exception);
            throw exception;
        } catch (OutOfMemoryError exception) {
            // an Error cannot be attached with
            // setLinkedException(Exception), so only the message is
            // propagated here
            String msg =
                "Failed to process message due to out-of-memory error";
            _log.error(msg, exception);
            throw new JMSException(msg);
        } catch (Exception exception) {
            String msg = "Failed to process messages";
            _log.error(msg, exception);
            // keep the root cause reachable for the caller instead of
            // dropping it
            JMSException error = new JMSException(msg);
            error.setLinkedException(exception);
            throw error;
        }
    }
}
/**
 * Return the next message for the specified client. The <code>wait</code>
 * parameter gives the number of milliseconds to wait for a message before
 * returning. If <code>wait</code> is 0 then do not wait at all. If
 * <code>wait</code> is -1 then wait indefinitely for the next message.
 * <p>
 * The message returned is a clone of the one referenced by the consumer's
 * handle. Whenever a handle is obtained it is recorded in the sent message
 * cache and, if a global transaction is in progress, logged with the
 * resource manager - even when the message itself could not be cloned.
 *
 * @param clientId the client identity
 * @param wait number of ms to wait
 * @return the next message or <code>null</code> if there is no message
 * @throws JMSException if no consumer is registered for
 * <code>clientId</code>, or the received message can't be logged with the
 * resource manager
 */
public Message receiveMessage(long clientId, long wait)
    throws JMSException {
    ConsumerEndpoint endpoint = getConsumerEndpoint(clientId);
    if (endpoint == null) {
        throw new JMSException(
            "Can't receive message: no consumer registered with "
            + "identifier " + clientId + " on session " + _sessionId);
    }

    // ask the consumer for a handle; nothing to do if none is available
    MessageHandle handle = endpoint.receiveMessage(wait);
    if (handle == null) {
        return null;
    }

    // copy the underlying message so the client gets its own instance
    MessageImpl result = null;
    MessageImpl source = handle.getMessage();
    if (source != null) {
        try {
            MessageImpl copy = (MessageImpl) source.clone();
            copy.setJMSRedelivered(handle.getDelivered());
            copy.setClientId(handle.getClientId());
            _consumeCount++;
            result = copy;
        } catch (Exception exception) {
            // a failed copy is logged and reported to the caller as
            // "no message"
            _log.error(exception);
            result = null;
        }
    }

    // record the handle in the sent message cache and, when part of a
    // global transaction, also with the resource manager for recovery
    _sentMessageCache.process(handle);
    if (_xid != null) {
        try {
            ResourceManager.instance().logReceivedMessage(
                _xid, endpoint.getId(), handle);
        } catch (Exception exception) {
            _log.error(exception);
            JMSException failure = new JMSException(
                "Error in receiveMessage");
            failure.setLinkedException(exception);
            throw failure;
        }
    }
    return result;
}
/**
 * Return up to count messages from the endpoint with the specified
 * client identity. The client must be a QueueBrowser.
 * <p>
 * Each returned element is a clone of the message referenced by a handle
 * obtained from the browser endpoint. Handles without a message, and
 * messages that fail to clone, are skipped, so fewer messages than
 * handles may be returned.
 *
 * @param clientId the client identity
 * @param count the maximum number of messages to retrieve
 * @return a vector of the messages retrieved; empty if there are none
 * @throws JMSException if the endpoint does not exist, or is not a
 * {@link QueueBrowserEndpoint}
 */
public Vector receiveMessages(long clientId, int count)
    throws JMSException {
    ConsumerEndpoint consumer = getConsumerEndpoint(clientId);
    if (consumer == null) {
        throw new JMSException(
            "Can't receive messages: no consumer registered with "
            + "identifier " + clientId + " on session " + _sessionId);
    }
    if (!(consumer instanceof QueueBrowserEndpoint)) {
        // the word "identifier" used to be emitted twice in this message
        throw new JMSException(
            "Can't receive messages: consumer with identifier "
            + clientId + " is not a QueueBrowser");
    }

    // we have a valid consumer, now we need to retrieve up to count
    // handles
    Vector handles = ((QueueBrowserEndpoint) consumer).receiveMessages(
        count);
    Vector messages = new Vector();

    // process the handles; the loop is a no-op for an empty vector
    int max = handles.size();
    for (int index = 0; index < max; index++) {
        MessageHandle handle = (MessageHandle) handles.elementAt(index);
        MessageImpl orig = handle.getMessage();
        if (orig != null) {
            try {
                MessageImpl message = (MessageImpl) orig.clone();
                message.setJMSRedelivered(handle.getDelivered());
                message.setClientId(handle.getClientId());
                messages.addElement(message);
            } catch (Exception exception) {
                // a message that can't be copied is logged and left out
                // of the result
                _log.error(exception);
            }
        }
    }
    return messages;
}
/**
 * Create an administered queue, through the destination manager.
 *
 * @param queue administered queue to create
 * @throws JMSException if <code>queue</code> is <code>null</code> or the
 * destination manager reports that the queue can't be created
 */
public void createQueue(JmsQueue queue) throws JMSException {
    // reject a null argument with the declared exception type rather than
    // letting it surface as a NullPointerException
    if (queue == null) {
        throw new JMSException("Cannot create queue: queue is null");
    }
    if (!DestinationManager.instance().createAdministeredDestination(
        queue)) {
        throw new JMSException("Failed to create queue: " +
            queue.getName());
    }
}
/**
 * Create an administered topic, through the destination manager.
 *
 * @param topic administered topic to create
 * @throws JMSException if <code>topic</code> is <code>null</code> or the
 * destination manager reports that the topic can't be created
 */
public void createTopic(JmsTopic topic) throws JMSException {
    // reject a null argument with the declared exception type rather than
    // letting it surface as a NullPointerException
    if (topic == null) {
        throw new JMSException("Cannot create topic: topic is null");
    }
    if (!DestinationManager.instance().createAdministeredDestination(
        topic)) {
        throw new JMSException("Failed to create topic: " +
            topic.getName());
    }
}
/**
 * Create a receiver endpoint for this session. A receiver is a message
 * consumer specific to the queue message model and is associated with a
 * queue.
 * <p>
 * The new endpoint inherits the session's acknowledgement mode,
 * transacted flag and stopped state, and is registered with the session
 * under <code>clientId</code>.
 *
 * @param queue the receiver destination
 * @param clientId the client session allocated identifier of the
 * consumer
 * @param selector the selector to filter messages. May be
 * <code>null</code>
 * @throws JMSException if <code>queue</code> is <code>null</code> or the
 * receiver can't be created
 */
public void createReceiver(JmsQueue queue, long clientId, String selector)
    throws JMSException {
    if (_log.isDebugEnabled()) {
        String trace = "createReceiver(queue=" + queue + ", clientId="
            + clientId + ", selector=" + selector
            + ") [sessionId=" + _sessionId + "]";
        _log.debug(trace);
    }

    if (queue == null) {
        throw new JMSException("Cannot create receiver for null queue");
    }

    // have the consumer manager build the endpoint for this destination
    ConsumerManager manager = ConsumerManager.instance();
    ConsumerEndpoint endpoint = manager.createConsumerEndpoint(
        this, clientId, queue, selector);

    // propagate the session's settings to the new endpoint
    endpoint.setAckMode(_ackMode);
    endpoint.setConnectionId(_connection.hashCode());
    endpoint.setTransacted(_transacted);

    // a stopped session must not deliver messages, so the endpoint takes
    // on the session's stopped state before it is cached
    endpoint.setStopped(_stopped);

    String key = Long.toString(clientId);
    _consumers.put(key, endpoint);
}
/**
 * This is a no-op: the method body is empty, so the supplied queue is
 * ignored and no state is changed.
 *
 * @param queue the queue a sender is requested for; not used
 * @throws JMSException declared by the signature, but never thrown by
 * this implementation
 */
public void createSender(JmsQueue queue) throws JMSException {
}
/**
* Create a queue browser for this session. This allows clients to browse
* a queue without removing any messages.
* <p>
*
* You cannot create more than one queue browser for the same queue
* in a single session.
*
* @param queue queue to browse
* @param clientId the client identity
* @param selector message selector. This may be null
* @throws JMSException.
*/
public void createBrowser(JmsQueue queue, long clientId, String selector)
throws JMSException {
if (_log.isDebugEnabled()) {
_log.debug("createBrowser(queue=" + queue + ", clientId="
+ clientId + ", selector=" + selector
+ ") [sessionId=" + _sessionId + "]");
}
// check to see that we have a valid queue
if (queue == null) {
throw new JMSException("Cannot create browser for null queue");
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -