📄 jmstemplate.java
字号:
Message msg = getMessageConverter().toMessage(message, session);
return postProcessor.postProcessMessage(msg);
}
});
}
//-------------------------------------------------------------------------
// Convenience methods for receiving messages
//-------------------------------------------------------------------------
public Message receive() throws JmsException {
checkDefaultDestination();
if (getDefaultDestination() != null) {
return receive(getDefaultDestination());
}
else {
return receive(getDefaultDestinationName());
}
}
public Message receive(final Destination destination) throws JmsException {
return (Message) execute(new SessionCallback() {
public Object doInJms(Session session) throws JMSException {
return doReceive(session, destination, null);
}
}, true);
}
public Message receive(final String destinationName) throws JmsException {
return (Message) execute(new SessionCallback() {
public Object doInJms(Session session) throws JMSException {
Destination destination = resolveDestinationName(session, destinationName);
return doReceive(session, destination, null);
}
}, true);
}
public Message receiveSelected(String messageSelector) throws JmsException {
checkDefaultDestination();
if (getDefaultDestination() != null) {
return receiveSelected(getDefaultDestination(), messageSelector);
}
else {
return receiveSelected(getDefaultDestinationName(), messageSelector);
}
}
public Message receiveSelected(final Destination destination, final String messageSelector) throws JmsException {
return (Message) execute(new SessionCallback() {
public Object doInJms(Session session) throws JMSException {
return doReceive(session, destination, messageSelector);
}
}, true);
}
public Message receiveSelected(final String destinationName, final String messageSelector) throws JmsException {
return (Message) execute(new SessionCallback() {
public Object doInJms(Session session) throws JMSException {
Destination destination = resolveDestinationName(session, destinationName);
return doReceive(session, destination, messageSelector);
}
}, true);
}
/**
* Receive a JMS message.
* @param session the JMS Session to operate on
* @param destination the JMS Destination to receive from
* @param messageSelector the message selector for this consumer (can be <code>null</code>)
* @throws JMSException if thrown by JMS API methods
*/
protected Message doReceive(Session session, Destination destination, String messageSelector)
throws JMSException {
return doReceive(session, createConsumer(session, destination, messageSelector));
}
/**
* Actually receive a JMS message.
* @param session the JMS Session to operate on
* @param consumer the JMS MessageConsumer to send with
* @return the JMS Message received, or <code>null</code> if none
* @throws JMSException if thrown by JMS API methods
*/
protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException {
try {
// Use transaction timeout (if available).
long timeout = getReceiveTimeout();
JmsResourceHolder resourceHolder =
(JmsResourceHolder) TransactionSynchronizationManager.getResource(getConnectionFactory());
if (resourceHolder != null && resourceHolder.hasTimeout()) {
timeout = resourceHolder.getTimeToLiveInMillis();
}
Message message = (timeout >= 0) ?
consumer.receive(timeout) : consumer.receive();
if (session.getTransacted()) {
// Commit necessary - but avoid commit call within a JTA transaction.
if (isSessionLocallyTransacted(session)) {
// Transacted session created by this template -> commit.
JmsUtils.commitIfNecessary(session);
}
}
else if (isClientAcknowledge(session)) {
// Manually acknowledge message, if any.
if (message != null) {
message.acknowledge();
}
}
return message;
}
finally {
JmsUtils.closeMessageConsumer(consumer);
}
}
//-------------------------------------------------------------------------
// Convenience methods for receiving auto-converted messages
//-------------------------------------------------------------------------
public Object receiveAndConvert() throws JmsException {
checkMessageConverter();
return doConvertFromMessage(receive());
}
public Object receiveAndConvert(Destination destination) throws JmsException {
checkMessageConverter();
return doConvertFromMessage(receive(destination));
}
public Object receiveAndConvert(String destinationName) throws JmsException {
checkMessageConverter();
return doConvertFromMessage(receive(destinationName));
}
public Object receiveSelectedAndConvert(String messageSelector) throws JmsException {
checkMessageConverter();
return doConvertFromMessage(receiveSelected(messageSelector));
}
public Object receiveSelectedAndConvert(Destination destination, String messageSelector) throws JmsException {
checkMessageConverter();
return doConvertFromMessage(receiveSelected(destination, messageSelector));
}
public Object receiveSelectedAndConvert(String destinationName, String messageSelector) throws JmsException {
checkMessageConverter();
return doConvertFromMessage(receiveSelected(destinationName, messageSelector));
}
/**
* Extract the content from the given JMS message.
* @param message the JMS Message to convert (can be <code>null</code>)
* @return the content of the message, or <code>null</code> if none
*/
protected Object doConvertFromMessage(Message message) {
if (message != null) {
try {
return getMessageConverter().fromMessage(message);
}
catch (JMSException ex) {
throw convertJmsAccessException(ex);
}
}
return null;
}
//-------------------------------------------------------------------------
// JMS 1.1 factory methods, potentially overridden for JMS 1.0.2
//-------------------------------------------------------------------------
/**
* Fetch an appropriate Connection from the given JmsResourceHolder.
* <p>This implementation accepts any JMS 1.1 Connection.
* @param holder the JmsResourceHolder
* @return an appropriate Connection fetched from the holder,
* or <code>null</code> if none found
*/
protected Connection getConnection(JmsResourceHolder holder) {
return holder.getConnection();
}
/**
* Fetch an appropriate Session from the given JmsResourceHolder.
* <p>This implementation accepts any JMS 1.1 Session.
* @param holder the JmsResourceHolder
* @return an appropriate Session fetched from the holder,
* or <code>null</code> if none found
*/
protected Session getSession(JmsResourceHolder holder) {
return holder.getSession();
}
/**
* Check whether the given Session is locally transacted, that is, whether
* its transaction is managed by this listener container's Session handling
* and not by an external transaction coordinator.
* <p>Note: The Session's own transacted flag will already have been checked
* before. This method is about finding out whether the Session's transaction
* is local or externally coordinated.
* @param session the Session to check
* @return whether the given Session is locally transacted
* @see #isSessionTransacted()
* @see org.springframework.jms.connection.ConnectionFactoryUtils#isSessionTransactional
*/
protected boolean isSessionLocallyTransacted(Session session) {
return isSessionTransacted() &&
!ConnectionFactoryUtils.isSessionTransactional(session, getConnectionFactory());
}
/**
* Create a JMS MessageProducer for the given Session and Destination,
* configuring it to disable message ids and/or timestamps (if necessary).
* <p>Delegates to <code>doCreateProducer</code> for creation of the raw
* JMS MessageProducer, which needs to be specific to JMS 1.1 or 1.0.2.
* @param session the JMS Session to create a MessageProducer for
* @param destination the JMS Destination to create a MessageProducer for
* @return the new JMS MessageProducer
* @throws JMSException if thrown by JMS API methods
* @see #doCreateProducer
* @see #setMessageIdEnabled
* @see #setMessageTimestampEnabled
*/
protected MessageProducer createProducer(Session session, Destination destination) throws JMSException {
MessageProducer producer = doCreateProducer(session, destination);
if (!isMessageIdEnabled()) {
producer.setDisableMessageID(true);
}
if (!isMessageTimestampEnabled()) {
producer.setDisableMessageTimestamp(true);
}
return producer;
}
/**
* Create a raw JMS MessageProducer for the given Session and Destination.
* <p>This implementation uses JMS 1.1 API.
* @param session the JMS Session to create a MessageProducer for
* @param destination the JMS Destination to create a MessageProducer for
* @return the new JMS MessageProducer
* @throws JMSException if thrown by JMS API methods
*/
protected MessageProducer doCreateProducer(Session session, Destination destination) throws JMSException {
return session.createProducer(destination);
}
/**
* Create a JMS MessageConsumer for the given Session and Destination.
* <p>This implementation uses JMS 1.1 API.
* @param session the JMS Session to create a MessageConsumer for
* @param destination the JMS Destination to create a MessageConsumer for
* @param messageSelector the message selector for this consumer (can be <code>null</code>)
* @return the new JMS MessageConsumer
* @throws JMSException if thrown by JMS API methods
*/
protected MessageConsumer createConsumer(Session session, Destination destination, String messageSelector)
throws JMSException {
// Only pass in the NoLocal flag in case of a Topic:
// Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
// in case of the NoLocal flag being specified for a Queue.
if (isPubSubDomain()) {
return session.createConsumer(destination, messageSelector, isPubSubNoLocal());
}
else {
return session.createConsumer(destination, messageSelector);
}
}
/**
* ResourceFactory implementation that delegates to this template's protected callback methods.
*/
private class JmsTemplateResourceFactory implements ConnectionFactoryUtils.ResourceFactory {
public Connection getConnection(JmsResourceHolder holder) {
return JmsTemplate.this.getConnection(holder);
}
public Session getSession(JmsResourceHolder holder) {
return JmsTemplate.this.getSession(holder);
}
public Connection createConnection() throws JMSException {
return JmsTemplate.this.createConnection();
}
public Session createSession(Connection con) throws JMSException {
return JmsTemplate.this.createSession(con);
}
public boolean isSynchedLocalTransactionAllowed() {
return JmsTemplate.this.isSessionTransacted();
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -