⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 serverentryfactory.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*------------------------------------------------------------------------------Name:      ServerEntryFactory.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment:   Implementation for the I_EntryFactory------------------------------------------------------------------------------*/package org.xmlBlaster.engine.queuemsg;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_EntryFactory;import org.xmlBlaster.util.queue.I_Entry;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.queuemsg.DummyEntry;import org.xmlBlaster.util.key.MsgKeyData;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.engine.MsgUnitWrapper;import org.xmlBlaster.engine.qos.PublishQosServer; // for main onlyimport org.xmlBlaster.client.key.PublishKey;       // for main onlyimport java.io.ByteArrayInputStream;import java.io.InputStream;import java.io.ObjectInputStream;import java.io.ByteArrayOutputStream;import java.io.ObjectOutputStream;import java.io.IOException;/** * The implementation of the interface which can be used to convert an object * which implements the interface I_Entry to an Object and back. This is * useful for example if you want to store such entries in persistent storage * like a database or a file system. It might however be used even for other * purposes. * @author michele@laghi.eu * @author xmlBlaster@marcelruff.info */public class ServerEntryFactory implements I_EntryFactory{   private final static String ME = "ServerEntryFactory";   private ServerScope glob = null;   private static Logger log = Logger.getLogger(ServerEntryFactory.class.getName());   public static final String ENTRY_TYPE_MSG_SERIAL = "MSG_SER"; // msgUnit was serialized with java.io.Serializable   public static final String ENTRY_TYPE_MSG_XML = "MSG_XML"; // msgUnit is dumped as XML ASCII string   public static final String ENTRY_TYPE_MSG_RAW = "MSG_RAW"; // msgUnit is dumped as specified in the protocol.socket requirement (see C persistent queue)   public static final String ENTRY_TYPE_UPDATE_REF = "UPDATE_REF";   public static final String ENTRY_TYPE_HISTORY_REF = "HISTORY_REF";   public static final String ENTRY_TYPE_TOPIC_SERIAL = "TOPIC_SER";   public static final String ENTRY_TYPE_TOPIC_XML = "TOPIC_XML";   public static final String ENTRY_TYPE_SESSION = "SESSION";   public static final String ENTRY_TYPE_SUBSCRIBE = "SUBSCRIBE";   public static final String ENTRY_TYPE_DUMMY = DummyEntry.ENTRY_TYPE;   /**    * Parses the specified entry to a byte array (serializing).    */   public byte[] toBlob(I_Entry entry) throws XmlBlasterException {      // this way we don't need to make instanceof checks, so every      //implementation of I_Entry is responsible of returning an object      // it wants to store in the db//      return entry.getEmbeddedObject();      try {         Object obj = entry.getEmbeddedObject();         ByteArrayOutputStream baos = new ByteArrayOutputStream();         ObjectOutputStream objStream = new ObjectOutputStream(baos);         objStream.writeObject(obj);         return baos.toByteArray();      }      catch (IOException ex) {         log.severe("toBlob: " + ex.getMessage());         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "toBlob()", ex);      }   }   /**    * Parses back the raw data to a I_Entry (deserializing)    * @param type see ENTRY_TYPE_MSG etc.    */   public I_Entry createEntry(int priority, long timestamp, String type,                              boolean persistent, long sizeInBytes, InputStream is, StorageId storageId)      throws XmlBlasterException {      if (is == null) {         String txt = "Entry with data prio='" + priority + "' timestamp='" + timestamp + "' type='" + type + "' persitent='" + persistent + "' size='" + sizeInBytes + "' storageId='" + storageId + "' has a null stream";         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "createEntry-MsgQueueUpdateEntry: " + txt);      }      if (ENTRY_TYPE_UPDATE_REF.equalsIgnoreCase(type)) { // still used         try {            ObjectInputStream objStream = new ObjectInputStream(is);            Object[] obj = (Object[])objStream.readObject();            if (obj.length < 6) {               throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME,                  "Expected 6 entries in serialized object '" + type + "' but got " + obj.length + " for priority=" + priority + " timestamp=" + timestamp + ". Could be a version incompatibility.");            }            String keyOid = (String)obj[0];            Long msgUnitWrapperUniqueId = (Long)obj[1];            String receiverStr = (String)obj[2];            String subscriptionId = (String)obj[3];            String flag = (String)obj[4]; // was state in older release            Integer redeliverCount = (Integer)obj[5];            // We read the message content as well but don't parse it yet:            String qos = null;            String key = null;            byte[] content = null;            if ( ReferenceEntry.STRICT_REFERENCE_COUNTING_COMPATIBLE ) {               if (obj.length >= 9) {                  // deprecated, remove this code in future                  // see ENTRY_TYPE_MSG_XML !                  qos = (String)obj[6];                  key = (String)obj[7];                  content = (byte[])obj[8];                  //Integer referenceCounter = (Integer)obj[9];                  //Integer historyReferenceCounter = (Integer)obj[10];               }            }            if (log.isLoggable(Level.FINE)) log.fine("storageId=" + storageId + ": Read timestamp=" + timestamp + " topic keyOid=" + keyOid +                         " msgUnitWrapperUniqueId=" + msgUnitWrapperUniqueId + " receiverStr=" + receiverStr +                         " subscriptionId=" + subscriptionId + " flag=" + flag + " redeliverCount=" + redeliverCount);            SessionName receiver = new SessionName(glob, receiverStr);            Timestamp updateEntryTimestamp = new Timestamp(timestamp);            return new MsgQueueUpdateEntry(this.glob,                                           PriorityEnum.toPriorityEnum(priority), storageId, updateEntryTimestamp,                                           keyOid, msgUnitWrapperUniqueId.longValue(), persistent, sizeInBytes,                                           receiver, subscriptionId, flag, redeliverCount.intValue(),                                           qos, key, content);         }         catch (Exception ex) {            throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "createEntry-MsgQueueUpdateEntry", ex);         }      }      else if (ENTRY_TYPE_HISTORY_REF.equalsIgnoreCase(type)) { // still used         try {            ObjectInputStream objStream = new ObjectInputStream(is);            Object[] obj = (Object[])objStream.readObject();            if (obj.length < 2) {               throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME,                  "Expected 2 entries in serialized object '" + type + "' but got " + obj.length + " for priority=" + priority + " timestamp=" + timestamp + ". Could be a version incompatibility.");            }            String keyOid = (String)obj[0];            Long msgUnitWrapperUniqueId = (Long)obj[1];            Timestamp updateEntryTimestamp = new Timestamp(timestamp);            return new MsgQueueHistoryEntry(this.glob,                                           PriorityEnum.toPriorityEnum(priority), storageId, updateEntryTimestamp,                                           keyOid, msgUnitWrapperUniqueId.longValue(), persistent, sizeInBytes);         }         catch (Exception ex) {            throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "createEntry-MsgQueueHistoryEntry", ex);         }      }      else if (ENTRY_TYPE_MSG_XML.equalsIgnoreCase(type)) { // still used         try {            ObjectInputStream objStream = new ObjectInputStream(is);            Object[] obj = (Object[])objStream.readObject();            if (obj.length < 5) {               throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME,                         "Expected 5 entries in serialized object stream but got " + obj.length + " for priority=" + priority + " timestamp=" + timestamp);            }            String qos = (String)obj[0];            String key = (String)obj[1];            byte[] content = (byte[])obj[2];            Integer referenceCounter = (Integer)obj[3];            Integer historyReferenceCounter = (Integer)obj[4];            PublishQosServer publishQosServer = new PublishQosServer(glob, qos, true); // true marks from persistent store (prevents new timestamp)            MsgKeyData msgKeyData = glob.getMsgKeyFactory().readObject(key);            MsgUnit msgUnit = new MsgUnit(msgKeyData, content, publishQosServer.getData());            MsgUnitWrapper msgUnitWrapper = new MsgUnitWrapper(glob, msgUnit, storageId,                                      referenceCounter.intValue(), historyReferenceCounter.intValue(), sizeInBytes);            msgUnitWrapper.startExpiryTimer();            return msgUnitWrapper;         }         catch (Exception ex) {            throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "createEntry-MsgUnitWrapper", ex);         }      }      else if (ENTRY_TYPE_MSG_SERIAL.equalsIgnoreCase(type)) {  // probably unused (not found in my tests)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -