⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 queuemessagesender.java

📁 JMS发送器的源码 JMS发送器的源码
💻 JAVA
字号:
/***************************************
 * @作者 罗金子
 * @创建时间 2007-11-8
 * @功能描述: 
 * @修改记录:
 * 		修改时间:YYYY-MM-DD    修改人:
 *		修改原因及内容:
 ***************************************/
package com.regaltec.rsas.listener;

import java.io.Serializable;
import java.util.Hashtable;
import java.util.Map;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import com.regaltec.rsas.common.config.ListenerConf;
import com.regaltec.rsas.common.log.RLogger;
import com.regaltec.rsas.listener.exception.UnableInitialException;

/***************************************
 * @描述 实现MessageDispatcher接口,采用JMS规范中
 * Queue(队列)模式的消息发送者。Queue模式即一
 * 条消息只有一个消费者。一个QueueMessageSender
 * 实例发送一个请求体Requisition。
 * @公司 广州瑞达通信技术有限公司
 * @作者 罗金子
 * @创建时间 2007-11-8
 ***************************************/
public class QueueMessageSender implements MessageDispatcher
{
   private static ListenerConf config = null;//监听器参数
   private static Map commandMap = null;//command和对应的服务名称的映射集合 
   
   private QueueConnection connection = null;//与jms管理器的连接
   private QueueSession session = null;//连接产生的会话
   private QueueSender sender = null;//消息发送器
   private Message message = null;//准备发送的消息
   private Serializable replyContent = null;//回复消息的内容

   /***************************************
    * 构造方法
    * @param command 请求体Requisition的属性command
    * @throws UnableInitialException
    ***************************************/
   public QueueMessageSender(String command) throws UnableInitialException
   {
      /**
       * 检查配置参数
       */
      if(QueueMessageSender.config == null || QueueMessageSender.commandMap == null)
      {
         RLogger.logError("运行参数未配置完毕!com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/");
         throw new UnableInitialException();
      }
      
      /**
       * 生成初始化上下文
       */
      Context ctx;
      try
      {
         Hashtable env = new Hashtable();
         env.put(Context.INITIAL_CONTEXT_FACTORY, QueueMessageSender.config.getContextFactory());
         env.put(Context.PROVIDER_URL, QueueMessageSender.config.getProviderURL());
//         env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
         ctx = new InitialContext(env);
      }
      catch (NamingException e)
      {
         RLogger.logError("InitialContext实例化失败!com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/"
               + e.toString());
         throw new UnableInitialException();
      }

      /**
       * 生成jms通信对象
       */
      try
      {
         QueueConnectionFactory connectionFactory = (QueueConnectionFactory) ctx.lookup(QueueMessageSender.config.getConnectionFactory());
         this.connection = connectionFactory.createQueueConnection();
         this.session = this.connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
         this.sender = this.session.createSender((Queue) ctx.lookup("queue/" + this.getQueueName(command)));
         this.message = this.session.createObjectMessage();
      }
      catch (NamingException e)
      {
         RLogger.logError("找不到JMS连接工厂或者消息队列!请确认名称是否准确。com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/"
               + e.toString());
         try
         {
            this.close();
         }
         catch (JMSException je)
         {
            RLogger.logError("关闭连接失败!com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/" + je.toString());
         }
         throw new UnableInitialException();
      }
      catch (JMSException e)
      {
         RLogger.logError("不能创建JMS基础对象,请检查连接工厂、消息队列名称。com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/"
               + e.toString());
         try
         {
            this.close();
         }
         catch (JMSException je)
         {
            RLogger.logError("关闭连接失败!com.regaltec.rsas.listener.QueueMessageSender.QueueMessageSender/" + je.toString());
         }
         throw new UnableInitialException();
      }
   }

   /***************************************
    * 配置QueueMessageSender
    * @param listenerConf 监听器参数
    * @param commandMap command和对应的服务名称的映射集合 
    ***************************************/
   public static void configure(ListenerConf listenerConf, Map commandMap)
   {
      QueueMessageSender.config = listenerConf;
      QueueMessageSender.commandMap = commandMap;
   }
   
   /***************************************
    * 获取目标队列名称
    * @param command 用户请求体Requisition的command属性
    * @return 目标队列的名称
    ***************************************/
   private String getQueueName(String command)
   {
      if(QueueMessageSender.commandMap == null)
      {
         return "";
      }

      if (QueueMessageSender.commandMap.containsKey(command))
      {
         return (String) QueueMessageSender.commandMap.get(command);
      }
      else
      {
         return "";
      }
   }

   /* *************************************
    * 
    * @see com.regaltec.rsas.listener.MessageDispatcher#setMessageHeader()
    ***************************************/
   public void setMessageHeader() throws JMSException
   {
      if (this.sender != null && QueueMessageSender.config != null)
      {
         this.sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
         this.sender.setDisableMessageID(true);
         this.sender.setDisableMessageTimestamp(true);
         this.sender.setTimeToLive(QueueMessageSender.config.getMessageLiveTime());
      }
   }

   /* *************************************
    * 
    * @see com.regaltec.rsas.listener.MessageDispatcher#setMessageBody(java.io.Serializable)
    * @param messageContent
    ***************************************/
   public void setMessageBody(Serializable messageContent) throws JMSException
   {
      if (this.message != null)
      {
         ((ObjectMessage) this.message).setObject(messageContent);
      }
   }

   /* *************************************
    *  
    * @see com.regaltec.rsas.listener.MessageDispatcher#sendMessage(boolean)
    * @param isResponseNeed
    ***************************************/
   public void sendMessage(boolean isResponseNeed) throws JMSException
   {
      if (isResponseNeed)
      {
         if (this.connection != null && this.session != null && this.sender != null && this.message != null)
         {
            Queue replyQueue = this.session.createTemporaryQueue();//回复队列
            this.message.setJMSReplyTo(replyQueue);
            this.sender.send(this.message);
            /**
             * 监听回复消息
             */
            QueueReceiver receiver = this.session.createReceiver(replyQueue);
            receiver.setMessageListener(this);
            this.connection.start();
         }
      }
      else
      {
         if (this.sender != null && this.message != null)
         {
            this.sender.send(this.message);
         }         
      }
   }

   /* *************************************
    *  
    * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
    * @param replyMessage
    ***************************************/
   public void onMessage(Message replyMessage)
   {
      try
      {
         if (replyMessage instanceof ObjectMessage)
         {
            this.replyContent = ((ObjectMessage) replyMessage).getObject();
         }
      }
      catch (JMSException e)
      {
         RLogger.logError("不能获得回复消息。com.regaltec.rsas.listener.QueueMessageSender.onMessage/" + e.toString());
      }
   }

   /* *************************************
    *  
    * @see com.regaltec.rsas.listener.MessageDispatcher#getReplyMessageContent()
    * @return
    ***************************************/
   public Serializable getReplyMessageContent()
   {
      return this.replyContent;
   }

   /* *************************************
    * 
    * @see com.regaltec.rsas.listener.MessageDispatcher#close()
    ***************************************/
   public void close() throws JMSException
   {
      if (this.sender != null)
      {
         this.sender.close();
      }
      if (this.session != null)
      {
         this.session.close();
      }
      if (this.connection != null)
      {
         this.connection.close();
      }
   }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -