⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messagehelper.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
字号:
/*------------------------------------------------------------------------------Name:      MessageHelper.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.jms;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.io.ObjectOutputStream;import java.util.Enumeration;import java.util.logging.Logger;import javax.jms.BytesMessage;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MapMessage;import javax.jms.Message;import javax.jms.ObjectMessage;import javax.jms.StreamMessage;import javax.jms.TextMessage;import org.xmlBlaster.client.key.PublishKey;import org.xmlBlaster.client.qos.PublishQos;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.StringPairTokenizer;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.key.MsgKeyData;import org.xmlBlaster.util.key.MsgKeySaxFactory;import org.xmlBlaster.util.qos.MsgQosData;/** * MessageHelper. *  * Implementation details about how  * * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> *  */public class MessageHelper {    private static Logger log = Logger.getLogger(MessageHelper.class.getName());      public MessageHelper() {   }      /**    * Currently only used in streaming messages to build the chunks.    * @param sourceMsg    * @param destMsg    * @throws JMSException    */   static void copyProperties(Message sourceMsg, Message destMsg) throws JMSException {      if (sourceMsg == null)         throw new XBException("internal", "Convert: The passed 'sourceMsg' attribute is null.");      if (destMsg == null)         throw new XBException("internal", "Convert: The passed 'destMsg' attribute is null.");      /*      if (sourceMsg instanceof XBMessage)         ((XBMessage)sourceMsg).giveFullAccess();      if (destMsg instanceof XBMessage)         ((XBMessage)destMsg).giveFullAccess();      */         try {         // first strip all qos properties which are specific         Destination dest = sourceMsg.getJMSDestination();         if (dest != null)            destMsg.setJMSDestination(dest);         String corrId = sourceMsg.getJMSCorrelationID();         if (corrId != null)            destMsg.setJMSCorrelationID(corrId);         int deliveryMode = sourceMsg.getJMSDeliveryMode();         destMsg.setJMSDeliveryMode(deliveryMode);         long expiration = sourceMsg.getJMSExpiration();         if (expiration > -1)            destMsg.setJMSExpiration(expiration);         int prio = sourceMsg.getJMSPriority();         if (prio > -1)            destMsg.setJMSPriority(prio);         String mimeType = sourceMsg.getJMSType(); // is this correct ?         if (mimeType != null)            destMsg.setJMSType(mimeType);         Enumeration eNum = sourceMsg.getPropertyNames();         while (eNum.hasMoreElements()) {            String propKey = (String)eNum.nextElement();            Object obj = sourceMsg.getObjectProperty(propKey);            destMsg.setObjectProperty(propKey, obj);         }      }      finally {         /*         if (sourceMsg instanceof XBMessage)            ((XBMessage)sourceMsg).resetAccess();         if (destMsg instanceof XBMessage)            ((XBMessage)destMsg).resetAccess();         */      }   }      /**    * This method converts a message (which could be from a foreign provider but also an own    * XBMessage) to a MsgUnit (internal XmlBlaster messages). This method is called internally    * by us as a provider. More specifically this is only used on the Message Producer send     * method. This method is made public only for testing purposes.     *     * @param global    * @param msg    * @return the created message unit.    * @throws JMSException    * @throws XmlBlasterException    * @throws IOException    */   public static MsgUnit convertToMessageUnit(Global global, Message msg) throws JMSException, XmlBlasterException, IOException {      if (global == null)         throw new XBException("internal", "Convert: The passed 'global' attribute is null.");      if (msg == null)         throw new XBException("internal", "Convert: The passed 'msg' attribute is null.");      // first strip all qos properties which are specific      Destination dest = msg.getJMSDestination();      if (!(dest instanceof XBDestination))         throw new XBException("client.configuration", "destination is not an xmlblaster destination. Do not know how to handle it");      PublishQos qos = null;      PublishKey key = null;      if (dest != null) {         XBDestination xbDest = (XBDestination)dest;         String ptpCompleteName = xbDest.getQueueName();         if (ptpCompleteName != null) {            String[] ptpNames = StringPairTokenizer.parseLine(ptpCompleteName, ',');            org.xmlBlaster.util.qos.address.Destination ptpDest = new org.xmlBlaster.util.qos.address.Destination(global, new SessionName(global, ptpNames[0]));            if (xbDest.getForceQueuing())               ptpDest.forceQueuing(true);            qos = new PublishQos(global, ptpDest);            for (int i=1; i < ptpNames.length; i++) {               org.xmlBlaster.util.qos.address.Destination additionalDest = new org.xmlBlaster.util.qos.address.Destination(global, new SessionName(global, ptpNames[i]));               qos.addDestination(additionalDest);            }         }         else {            qos = new PublishQos(global);         }         String tmp = xbDest.getTopicName();         if (tmp == null && ptpCompleteName == null)            throw new XBException("client.configuration", "A Topic must be specified in the message to be sent");         // determine if it is a complete key          if (tmp != null && tmp.indexOf('<') > -1) { // complete key            MsgKeySaxFactory keyFactory = new MsgKeySaxFactory(global);            key = new PublishKey(global, keyFactory.readObject(tmp));         }         else { // then it is a simple oid            if (tmp != null)               key = new PublishKey(global, tmp);            else               key = new PublishKey(global);         }      }      else         throw new XBException("client.configuration", "A destination must be specified in the message to be sent");      String corrId = msg.getJMSCorrelationID();      if (corrId != null)         qos.addClientProperty(XBMessage.addToKeyAndCheck(XBPropertyNames.JMS_CORRELATION_ID), corrId);      int deliveryMode = msg.getJMSDeliveryMode();      if (deliveryMode == DeliveryMode.PERSISTENT)         qos.setPersistent(true);      long expiration = msg.getJMSExpiration();      if (expiration > -1)         qos.setLifeTime(expiration);      int prio = msg.getJMSPriority();      if (prio > -1)         qos.setPriority(PriorityEnum.toPriorityEnum(prio));         String mimeType = msg.getJMSType(); // is this correct ?      if (mimeType != null)         key.setContentMime(mimeType);      Enumeration eNum = msg.getPropertyNames();      while (eNum.hasMoreElements()) {         String propKey = (String)eNum.nextElement();         Object obj = msg.getObjectProperty(propKey);         qos.addClientProperty(XBMessage.addToKeyAndCheck(propKey), obj);      }      byte[] content = null;      if (msg instanceof TextMessage) {         qos.addClientProperty(XBMessage.addToKeyAndCheck(XBPropertyNames.JMS_MESSAGE_TYPE), XBMessage.TEXT);         content = ((TextMessage)msg).getText().getBytes();      }      else if (msg instanceof StreamMessage) {         qos.addClientProperty(XBMessage.addToKeyAndCheck(XBPropertyNames.JMS_MESSAGE_TYPE), XBMessage.STREAM);         StreamMessage streamMsg = (StreamMessage)msg;         if (streamMsg instanceof XBStreamMessage) {            long length = ((XBStreamMessage)streamMsg).getBodyLength();            if (length >= Integer.MAX_VALUE)               throw new XBException("feature.missing", "Handling of big message not implemented");            content = new byte[(int)length];            streamMsg.readBytes(content);         }         else            throw new XBException("feature.missing", "Handling of non XBStreamMessage types not implemented");      }      else if (msg instanceof BytesMessage) {         qos.addClientProperty(XBMessage.addToKeyAndCheck(XBPropertyNames.JMS_MESSAGE_TYPE), XBMessage.BYTES);         BytesMessage bytesMsg = (BytesMessage)msg;         long length = bytesMsg.getBodyLength();         if (length >= Integer.MAX_VALUE)            throw new XBException("feature.missing", "Handling of big message not implemented");         content = new byte[(int)length];         bytesMsg.readBytes(content);      }      else if (msg instanceof ObjectMessage) {         qos.addClientProperty(XBMessage.addToKeyAndCheck(XBPropertyNames.JMS_MESSAGE_TYPE), XBMessage.OBJECT);         ObjectMessage objMsg = (ObjectMessage)msg;         ByteArrayOutputStream baos = new ByteArrayOutputStream();         ObjectOutputStream oos = new ObjectOutputStream(baos);         Object object = objMsg.getObject();         oos.writeObject(object);         content = baos.toByteArray();         oos.close();      }      else if(msg instanceof MapMessage) { // TODO implement this         throw new XBException("feature.missing", "MapMessage is not implemented");      }      else {         throw new XBException("feature.missing", "unknown message type '" + msg.getClass().getName() + "'");      }      return new MsgUnit(key, content, qos);   }   /**    * Puts the extra header properties into the qos    */   /*   private void exportExtraHeader() {      if (this.qos != null) {          Enumeration enumer = this.extraHeader.keys();         while (enumer.hasMoreElements()) {            String key = (String)enumer.nextElement();            Object value = this.extraHeader.get(key);            this.qos.addClientProperty(XBPropertyNames.JMS_HEADER_PREFIX + key, ClientProperty.getPropertyType(value), (String)value);         }      }   }   */   /**    * Used in the message consumer receive method.    */   public static XBMessage convertFromMsgUnit(XBSession session, String sender, MsgUnit msgUnit) throws JMSException, XmlBlasterException, IOException {      MsgQosData qosData = (MsgQosData)msgUnit.getQosData();      MsgKeyData keyData = (MsgKeyData)msgUnit.getKeyData();      byte[] content = msgUnit.getContent();      return convertFromMsgUnit(session, sender, keyData, content, qosData);   }       /**    * Used in the message consumer update method.     * @param session    * @param sender    * @param keyData    * @param content    * @param qosData    * @return    * @throws JMSException    * @throws XmlBlasterException    * @throws IOException    */   public static XBMessage convertFromMsgUnit(XBSession session, String sender, MsgKeyData keyData, byte[] content, MsgQosData qosData) throws JMSException, XmlBlasterException, IOException {      XBMessage msg = null;      int type = qosData.getClientProperty(XBMessage.addToKeyAndCheck(XBPropertyNames.JMS_MESSAGE_TYPE), XBMessage.DEFAULT_TYPE);      switch (type) {         case XBMessage.TEXT : msg = new XBTextMessage(session, content); break;         case XBMessage.BYTES : msg = new XBBytesMessage(session, content); break;         case XBMessage.OBJECT : msg = new XBObjectMessage(session, content); break;         case XBMessage.MAP : msg = new XBMapMessage(session, content); break;         case XBMessage.STREAM : msg = new XBStreamMessage(session, content); break;         default : throw new XBException("feature.missing", "message type '" + type + "' is unknown to the XmlBlaster JMS Implementation");      }      String corrId = qosData.getClientProperty(XBMessage.addToKeyAndCheck(XBPropertyNames.JMS_CORRELATION_ID), (String)null);      if (corrId != null)         msg.setJMSCorrelationID(corrId);      if (qosData.isPersistent())         msg.setJMSDeliveryMode(DeliveryMode.PERSISTENT);      else         msg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);      org.xmlBlaster.util.qos.address.Destination[] destArr = qosData.getDestinationArr();      Destination dest = null;      if (destArr != null && destArr.length > 0) {         if (destArr.length > 1)            log.warning("there are more than one destinations defined. The current JMS Implementation only supports single PtP Destinations");         dest = new XBDestination(keyData.toXml(), destArr[0].getDestination().getAbsoluteName(), destArr[0].forceQueuing());      }      else         dest = new XBDestination(keyData.toXml(), null, false);      msg.setJMSDestination(dest);      long life = qosData.getLifeTime();      msg.setJMSExpiration(life);      String msgId = "ID:" + qosData.getRcvTimestamp().getTimestamp();      msg.setJMSMessageID(msgId);      msg.setJMSPriority(qosData.getPriority().getInt());      boolean redelivered = qosData.getClientProperty(XBMessage.addToKeyAndCheck(XBPropertyNames.JMS_REDELIVERED), false);      if (redelivered)         msg.setJMSRedelivered(true);      if (sender != null) {         // no force queuing (since I don't know better)         Destination senderDest = new XBDestination(null, sender, false);         msg.setJMSReplyTo(senderDest);      }      long timestamp = qosData.getClientProperty(XBMessage.addToKeyAndCheck(XBPropertyNames.JMS_TIMESTAMP), 0L);      if (timestamp != 0L)         msg.setJMSTimestamp(timestamp);      msg.setReadOnly(true);      msg.setPropertyReadOnly(true);      return msg;         }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -