jmstemplate.java

来自「一个关于Spring框架的示例应用程序,简单使用,可以参考.」· Java 代码 · 共 966 行 · 第 1/3 页

JAVA
966
字号
			return session.createConsumer(destination, messageSelector, true);
		}
		else {
			return session.createConsumer(destination, messageSelector);
		}
	}

	private void commitIfNecessary(Session session) throws JMSException {
		try {
			session.commit();
		}
		catch (TransactionInProgressException ex) {
			// Ignore -> can only happen in case of a JTA transaction.
		}
		catch (javax.jms.IllegalStateException ex) {
			// Ignore -> can only happen in case of a JTA transaction.
		}
	}


	/**
	 * Execute the action specified by the given action object within a
	 * JMS Session. Generalized version of execute(SessionCallback),
	 * allowing to start the JMS Connection on the fly.
	 * <p>Use execute(SessionCallback) for the general case. Starting
	 * the JMS Connection is just necessary for receiving messages,
	 * which is preferably achieve through the <code>receive</code> methods.
	 * @param action callback object that exposes the session
	 * @return the result object from working with the session
	 * @throws JmsException if there is any problem
	 * @see #execute(SessionCallback)
	 * @see #receive
	 */
	public Object execute(SessionCallback action, boolean startConnection) throws JmsException {
		Connection con = null;
		Session session = null;
		try {
			Connection conToUse = null;
			Session sessionToUse = null;
			ConnectionHolder conHolder =
					(ConnectionHolder) TransactionSynchronizationManager.getResource(getConnectionFactory());
			if (conHolder != null) {
				conToUse = conHolder.getConnection();
				if (startConnection) {
					conToUse.start();
				}
				sessionToUse = conHolder.getSession();
			}
			else {
				con = createConnection();
				if (startConnection) {
					con.start();
				}
				session = createSession(con);
				conToUse = con;
				sessionToUse = session;
			}
			if (logger.isDebugEnabled()) {
				logger.debug("Executing callback on JMS session [" + sessionToUse +
						"] from connection [" + conToUse + "]");
			}
			return action.doInJms(sessionToUse);
		}
		catch (JMSException ex) {
			throw convertJmsAccessException(ex);
		}
		finally {
			JmsUtils.closeSession(session);
			JmsUtils.closeConnection(con);
		}
	}

	public Object execute(SessionCallback action) throws JmsException {
		return execute(action, false);
	}

	public Object execute(final ProducerCallback action) throws JmsException {
		return execute(new SessionCallback() {
			public Object doInJms(Session session) throws JMSException {
				MessageProducer producer = createProducer(session, null);
				return action.doInJms(session, producer);
			}
		});
	}


	public void send(MessageCreator messageCreator) throws JmsException {
		checkDefaultDestination();
		send(getDefaultDestination(), messageCreator);
	}

	public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException {
		execute(new SessionCallback() {
			public Object doInJms(Session session) throws JMSException {
				doSend(session, destination, messageCreator);
				return null;
			}
		});
	}

	public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {
		execute(new SessionCallback() {
			public Object doInJms(Session session) throws JMSException {
				Destination destination = resolveDestinationName(session, destinationName);
				doSend(session, destination, messageCreator);
				return null;
			}
		});
	}

	protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
			throws JMSException {
		MessageProducer producer = createProducer(session, destination);
		Message message = messageCreator.createMessage(session);
		if (logger.isDebugEnabled()) {
			logger.debug("Sending created message [" + message + "]");
		}
		doSend(producer, message);
		// Check commit - avoid commit call within a JTA transaction.
		if (session.getTransacted() && isSessionTransacted() &&
				!TransactionSynchronizationManager.hasResource(getConnectionFactory())) {
			// Transacted session created by this template -> commit.
			commitIfNecessary(session);
		}
	}

	protected void doSend(MessageProducer producer, Message message) throws JMSException {
		if (isExplicitQosEnabled()) {
			producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
		}
		else {
			producer.send(message);
		}
	}


	public void convertAndSend(Object message) throws JmsException {
		checkDefaultDestination();
		convertAndSend(getDefaultDestination(), message);
	}

	public void convertAndSend(Destination destination, final Object message) throws JmsException {
		checkMessageConverter();
		send(destination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return getMessageConverter().toMessage(message, session);
			}
		});
	}

	public void convertAndSend(String destinationName, final Object message) throws JmsException {
		checkMessageConverter();
		send(destinationName, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return getMessageConverter().toMessage(message, session);
			}
		});
	}

	public void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException {
		checkDefaultDestination();
		convertAndSend(getDefaultDestination(), message, postProcessor);
	}

	public void convertAndSend(
			Destination destination, final Object message, final MessagePostProcessor postProcessor)
			throws JmsException {
		checkMessageConverter();
		send(destination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				Message msg = getMessageConverter().toMessage(message, session);
				return postProcessor.postProcessMessage(msg);
			}
		});
	}

	public void convertAndSend(
			String destinationName, final Object message, final MessagePostProcessor postProcessor)
	    throws JmsException {
		checkMessageConverter();
		send(destinationName, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				Message msg = getMessageConverter().toMessage(message, session);
				return postProcessor.postProcessMessage(msg);
			}
		});
	}


	public Message receive() throws JmsException {
		checkDefaultDestination();
		return receive(getDefaultDestination());
	}

	public Message receive(final Destination destination) throws JmsException {
		return (Message) execute(new SessionCallback() {
			public Object doInJms(Session session) throws JMSException {
				return doReceive(session, destination);
			}
		}, true);
	}

	public Message receive(final String destinationName) throws JmsException {
		return (Message) execute(new SessionCallback() {
			public Object doInJms(Session session) throws JMSException {
				Destination destination = resolveDestinationName(session, destinationName);
				return doReceive(session, destination);
			}
		}, true);
	}

	public Message receiveSelected(String messageSelector) throws JmsException {
		checkDefaultDestination();
		return receiveSelected(getDefaultDestination(), messageSelector);
	}

	public Message receiveSelected(final Destination destination, final String messageSelector) throws JmsException {
		return (Message) execute(new SessionCallback() {
			public Object doInJms(Session session) throws JMSException {
				return doReceiveSelected(session, destination, messageSelector);
			}
		}, true);
	}

	public Message receiveSelected(final String destinationName, final String messageSelector) throws JmsException {
		return (Message) execute(new SessionCallback() {
			public Object doInJms(Session session) throws JMSException {
				Destination destination = resolveDestinationName(session, destinationName);
				return doReceiveSelected(session, destination, messageSelector);
			}
		}, true);
	}

	protected Message doReceive(Session session, Destination destination) throws JMSException {
		return doReceive(session, createConsumer(session, destination));
	}

	protected Message doReceiveSelected(Session session, Destination destination, String messageSelector)
			throws JMSException {
		return doReceive(session, createConsumer(session, destination, messageSelector));
	}

	protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException {
		try {
			// Use transaction timeout (if available).
			long timeout = getReceiveTimeout();
			ConnectionHolder conHolder =
					(ConnectionHolder) TransactionSynchronizationManager.getResource(getConnectionFactory());
			if (conHolder != null && conHolder.hasTimeout()) {
				timeout = conHolder.getTimeToLiveInMillis();
			}
			Message message = (timeout >= 0) ?
					consumer.receive(timeout) : consumer.receive();
			if (session.getTransacted()) {
				// Commit necessary - but avoid commit call within a JTA transaction.
				if (isSessionTransacted() && conHolder == null) {
					// Transacted session created by this template -> commit.
					commitIfNecessary(session);
				}
			}
			else if (isClientAcknowledge(session)) {
				// Manually acknowledge message, if any.
				if (message != null) {
					message.acknowledge();
				}
			}
			return message;
		}
		finally {
			JmsUtils.closeMessageConsumer(consumer);
		}
	}

	protected boolean isClientAcknowledge(Session session) throws JMSException {
		return (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE);
	}


	public Object receiveAndConvert() throws JmsException {
		checkMessageConverter();
		return doConvertFromMessage(receive());
	}

	public Object receiveAndConvert(Destination destination) throws JmsException {
		checkMessageConverter();
		return doConvertFromMessage(receive(destination));
	}

	public Object receiveAndConvert(String destinationName) throws JmsException {
		checkMessageConverter();
		return doConvertFromMessage(receive(destinationName));
	}

	public Object receiveSelectedAndConvert(String messageSelector) throws JmsException {
		checkMessageConverter();
		return doConvertFromMessage(receiveSelected(messageSelector));
	}

	public Object receiveSelectedAndConvert(Destination destination, String messageSelector) throws JmsException {
		checkMessageConverter();
		return doConvertFromMessage(receiveSelected(destination, messageSelector));
	}

	public Object receiveSelectedAndConvert(String destinationName, String messageSelector) throws JmsException {
		checkMessageConverter();
		return doConvertFromMessage(receiveSelected(destinationName, messageSelector));
	}

	protected Object doConvertFromMessage(Message message) {
		if (message != null) {
			try {
				return getMessageConverter().fromMessage(message);
			}
			catch (JMSException ex) {
				throw convertJmsAccessException(ex);
			}
		}
		return null;
	}

}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?