📄 queuemessagesender.java
字号:
/***************************************
* @作者 罗金子
* @创建时间 2007-11-8
* @功能描述:
* @修改记录:
* 修改时间:YYYY-MM-DD 修改人:
* 修改原因及内容:
***************************************/
package com.regaltec.rsas.listener;
import java.io.Serializable;
import java.util.Hashtable;
import java.util.Map;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import com.regaltec.rsas.common.config.ListenerConf;
import com.regaltec.rsas.common.log.RLogger;
import com.regaltec.rsas.listener.exception.UnableInitialException;
/***************************************
* @描述 实现MessageDispatcher接口,采用JMS规范中
* Queue(队列)模式的消息发送者。Queue模式即一
* 条消息只有一个消费者。一个QueueMessageSender
* 实例发送一个请求体Requisition。
* @公司 广州瑞达通信技术有限公司
* @作者 罗金子
* @创建时间 2007-11-8
***************************************/
public class QueueMessageSender implements MessageDispatcher
{
private static ListenerConf config = null;//监听器参数
private static Map commandMap = null;//command和对应的服务名称的映射集合
private QueueConnection connection = null;//与jms管理器的连接
private QueueSession session = null;//连接产生的会话
private QueueSender sender = null;//消息发送器
private Message message = null;//准备发送的消息
private Serializable replyContent = null;//回复消息的内容
/***************************************
* 构造方法
* @param command 请求体Requisition的属性command
* @throws UnableInitialException
***************************************/
public QueueMessageSender(String command) throws UnableInitialException
{
/**
* 检查配置参数
*/
if(QueueMessageSender.config == null || QueueMessageSender.commandMap == null)
{
RLogger.logError("运行参数未配置完毕!com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/");
throw new UnableInitialException();
}
/**
* 生成初始化上下文
*/
Context ctx;
try
{
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, QueueMessageSender.config.getContextFactory());
env.put(Context.PROVIDER_URL, QueueMessageSender.config.getProviderURL());
// env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
ctx = new InitialContext(env);
}
catch (NamingException e)
{
RLogger.logError("InitialContext实例化失败!com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/"
+ e.toString());
throw new UnableInitialException();
}
/**
* 生成jms通信对象
*/
try
{
QueueConnectionFactory connectionFactory = (QueueConnectionFactory) ctx.lookup(QueueMessageSender.config.getConnectionFactory());
this.connection = connectionFactory.createQueueConnection();
this.session = this.connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
this.sender = this.session.createSender((Queue) ctx.lookup("queue/" + this.getQueueName(command)));
this.message = this.session.createObjectMessage();
}
catch (NamingException e)
{
RLogger.logError("找不到JMS连接工厂或者消息队列!请确认名称是否准确。com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/"
+ e.toString());
try
{
this.close();
}
catch (JMSException je)
{
RLogger.logError("关闭连接失败!com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/" + je.toString());
}
throw new UnableInitialException();
}
catch (JMSException e)
{
RLogger.logError("不能创建JMS基础对象,请检查连接工厂、消息队列名称。com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/"
+ e.toString());
try
{
this.close();
}
catch (JMSException je)
{
RLogger.logError("关闭连接失败!com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/" + je.toString());
}
throw new UnableInitialException();
}
}
/***************************************
* 配置QueueMessageSender
* @param listenerConf 监听器参数
* @param commandMap command和对应的服务名称的映射集合
***************************************/
public static void configure(ListenerConf listenerConf, Map commandMap)
{
QueueMessageSender.config = listenerConf;
QueueMessageSender.commandMap = commandMap;
}
/***************************************
* 获取目标队列名称
* @param command 用户请求体Requisition的command属性
* @return 目标队列的名称
***************************************/
private String getQueueName(String command)
{
if(QueueMessageSender.commandMap == null)
{
return "";
}
if (QueueMessageSender.commandMap.containsKey(command))
{
return (String) QueueMessageSender.commandMap.get(command);
}
else
{
return "";
}
}
/* *************************************
*
* @see com.regaltec.rsas.listener.MessageDispatcher#setMessageHeader()
***************************************/
public void setMessageHeader() throws JMSException
{
if (this.sender != null && QueueMessageSender.config != null)
{
this.sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
this.sender.setDisableMessageID(true);
this.sender.setDisableMessageTimestamp(true);
this.sender.setTimeToLive(QueueMessageSender.config.getMessageLiveTime());
}
}
/* *************************************
*
* @see com.regaltec.rsas.listener.MessageDispatcher#setMessageBody(java.io.Serializable)
* @param messageContent
***************************************/
public void setMessageBody(Serializable messageContent) throws JMSException
{
if (this.message != null)
{
((ObjectMessage) this.message).setObject(messageContent);
}
}
/* *************************************
*
* @see com.regaltec.rsas.listener.MessageDispatcher#sendMessage(boolean)
* @param isResponseNeed
***************************************/
public void sendMessage(boolean isResponseNeed) throws JMSException
{
if (isResponseNeed)
{
if (this.connection != null && this.session != null && this.sender != null && this.message != null)
{
Queue replyQueue = this.session.createTemporaryQueue();//回复队列
this.message.setJMSReplyTo(replyQueue);
this.sender.send(this.message);
/**
* 监听回复消息
*/
QueueReceiver receiver = this.session.createReceiver(replyQueue);
receiver.setMessageListener(this);
this.connection.start();
}
}
else
{
if (this.sender != null && this.message != null)
{
this.sender.send(this.message);
}
}
}
/* *************************************
*
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
* @param replyMessage
***************************************/
public void onMessage(Message replyMessage)
{
try
{
if (replyMessage instanceof ObjectMessage)
{
this.replyContent = ((ObjectMessage) replyMessage).getObject();
}
}
catch (JMSException e)
{
RLogger.logError("不能获得回复消息。com.regaltec.rsas.listener.QueueMessageSender.onMessage/" + e.toString());
}
}
/* *************************************
*
* @see com.regaltec.rsas.listener.MessageDispatcher#getReplyMessageContent()
* @return
***************************************/
public Serializable getReplyMessageContent()
{
return this.replyContent;
}
/* *************************************
*
* @see com.regaltec.rsas.listener.MessageDispatcher#close()
***************************************/
public void close() throws JMSException
{
if (this.sender != null)
{
this.sender.close();
}
if (this.session != null)
{
this.session.close();
}
if (this.connection != null)
{
this.connection.close();
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -