⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 msgunitwrapper.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*------------------------------------------------------------------------------Name:      MsgUnitWrapper.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.engine;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.qos.MsgQosData;import org.xmlBlaster.util.key.MsgKeyData;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.Timeout;import org.xmlBlaster.util.I_Timeout;import org.xmlBlaster.engine.queuemsg.ServerEntryFactory;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.engine.msgstore.I_Map;import org.xmlBlaster.engine.msgstore.I_MapEntry;import org.xmlBlaster.engine.msgstore.I_ChangeCallback;import org.xmlBlaster.engine.queuemsg.ReferenceEntry;/** * Wraps a publish() message into an entry for a persistence cache.  * <p> * There are two options to make this object persistent (measure on a 2GHz Intel Linux laptop with Postgres): * </p> * <p> * 1. QoS and Key are stored as XML ASCII strings, the content as byte[]<br /> * This variant takes about 50 microsec to serialize (toXml()) and 380 microsec to create the object again (SAX parse). * The size for an empty content is approx. 80 bytes for a medium sized key and QoS. * </p> * <p> * 2. The whole object is java.io.Serialized<br /> * This variant takes about 160 microsec to serialize and 750 microsec to deserialize. 
* </p> * <p> * So we have chosen the XML variant as it is faster, has no versioning problems and has smaller size * </p> * @author xmlBlaster@marcelruff.info * @see org.xmlBlaster.engine.queuemsg.ServerEntryFactory#main(String[]) */public final class MsgUnitWrapper implements I_MapEntry, I_Timeout, I_ChangeCallback{   private static final long serialVersionUID = -3883804885824516337L;   private transient final ServerScope glob;   private static Logger log = Logger.getLogger(MsgUnitWrapper.class.getName());   private transient int historyReferenceCounter; // if is in historyQueue, is swapped to persistence as well   private transient int referenceCounter;        // total number of references, is swapped to persistence as well   private transient final long uniqueId;   private transient final String uniqueIdStr;    // cache uniqueId as String   private transient I_Map ownerCache;   private transient final String embeddedType;   /** used to tell to the MsgQueueEntry if a return value is desidered */   private transient boolean wantReturnObj;   private transient Object returnObj;   /**    * This topic is destroyed after given timeout    * The timer is activated on state change to UNREFERENCED    * and removed on change to ALIVE    */   private transient Timeout destroyTimer;   private transient Timestamp timerKey = null;   private final static int ALIVE = 0;   private final static int PRE_EXPIRED = 4;   private final static int EXPIRED = 1;   private final static int DESTROYED = 2;   private transient int state = ALIVE;   private MsgUnit msgUnit;   private final long immutableSizeInBytes;   private boolean stored = false;   private transient boolean swapped = false;   private transient Timestamp sortTimestamp;   /**    * Testsuite    */   public MsgUnitWrapper(ServerScope glob, MsgUnit msgUnit, StorageId storageId) throws XmlBlasterException {      this(glob, msgUnit, (I_Map)null, storageId, 0, 0, (String)null, -1);   }   /**    * Used when message is created from 
TopicHandler.publish    */   public MsgUnitWrapper(ServerScope glob, MsgUnit msgUnit, I_Map ownerCache, int referenceCounter,                        int historyReferenceCounter, long sizeInBytes) throws XmlBlasterException {      this(glob, msgUnit, ownerCache, ownerCache.getStorageId(), referenceCounter, historyReferenceCounter, (String)null, sizeInBytes);   }   /**    * Used when message comes from persistence, the owning I_Map is unknown    */   public MsgUnitWrapper(ServerScope glob, MsgUnit msgUnit, StorageId storageId, int referenceCounter,                        int historyReferenceCounter, long sizeInBytes) throws XmlBlasterException {      this(glob, msgUnit, (I_Map)null, storageId, referenceCounter, historyReferenceCounter, (String)null, sizeInBytes);   }   /**    * Used when message comes from persistence, the owning I_Map is unknown    * @param embeddedType Allows you to control how to make this object persistent:<br />    *         ServerEntryFactory.ENTRY_TYPE_MSG_XML Dump strings as XML ASCII (which is smaller, faster, portable -> and therefor default)<br />    *         ServerEntryFactory.ENTRY_TYPE_MSG_SERIAL Dump object with java.io.Serializable    * @param sizeInBytes The estimated size of this entry in RAM, if -1 we estimate it for you    */   public MsgUnitWrapper(ServerScope glob, MsgUnit msgUnit, I_Map ownerCache, StorageId storageId, int referenceCounter,                         int historyReferenceCounter, String embeddedType, long sizeInBytes) throws XmlBlasterException {      this.glob = glob;      if (msgUnit == null) {         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, "MsgUnitWrapper", "Invalid constructor parameter msgUnit==null");      }      this.msgUnit = msgUnit;      this.ownerCache = ownerCache;      this.referenceCounter = referenceCounter;      this.historyReferenceCounter = historyReferenceCounter;      this.embeddedType = (embeddedType == null) ? 
ServerEntryFactory.ENTRY_TYPE_MSG_XML : embeddedType;      //this.uniqueId = getKeyOid()+getMsgQosData().getRcvTimestamp();      if (getMsgQosData().getRcvTimestamp() == null) {         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, "MsgUnitWrapper", "Missing timestamp, try to create publish QoS with PublishQosServer.java");      }      this.uniqueId = getMsgQosData().getRcvTimestamp().getTimestamp();      this.uniqueIdStr = ""+this.uniqueId;      // this.ME = "MsgUnitWrapper-" + getLogId();      this.destroyTimer = this.glob.getMessageTimer();  // holds weak references only         /*            Estimation in database (here postgres(oracle):            1. the columns               JdbcDriver.mapping[Oracle]=string=VARCHAR(128),longint=NUMBER(19),int=NUMBER(10),blob=BLOB,boolean=CHAR(1)                             Postg      Oracle               -----------------------------               dataid    int   8          19               nodeid    text  variable  128               queuename text  variable  128               prio      int   4          10               flag      text  1           1               durable   text  1           1               bytesize  int   8          19               -----------------------------               SUM                       306               +               blob      blob  variable   variable             2) blob = MsgUnit + Integer + Integer                     =  38 + this.qosData.size() + this.keyData.size() + this.content.length + 38 + 38                Postgres example:                  1077011447218000001     xmlBlaster_192_168_1_4_3412     msgUnitStore_xmlBlaster_192_168_1_4_3412myMessage       5       MSG_XML T       3833    ��\\000\\005ur\\000\\023[Ljava.lang.Object;\\220�X\\237\\020s)l\\002\\000\\000xp\\000\\000\\000\\005t\\002\\026\\012 <qos>\\012  <subscribable>false</subscribable>\\012  <destination forceQueuing='true'>/node/xmlBlaster_192_168_1_4_3412/client/Subscriber</destination>\\012  
<sender>/node/xmlBlaster_192_168_1_4_3412/client/Publisher/1</sender>\\012  <priority>MAX</priority>\\012  <expiration lifeTime='360000' remainingLife='271805' forceDestroy='false'/>\\012  <rcvTimestamp nanos='1077011447218000001'/>\\012  <persistent/>\\012  <route>\\012   <node id='xmlBlaster_192_168_1_4_3412' stratum='0' timestamp='1077011447218000001' dirtyRead='false'/>\\012  </route>\\012  <isPublish/>\\012 </qos>t\\000.\\012 <key oid='myMessage' contentMime='txt/xml'/>ur\\000\\002[B��\\027�\\006\\010T�\\002\\000\\000xp\\000\\000\\0005I'm message B-376 of type myMessage sent in a PtP waysr\\000\\021java.lang.Integer\\022⠤�\\201\\2078\\002\\000\\001I\\000\\005valuexr\\000\\020java.lang.Number\\206�\\225\\035\\013\\224�\\213\\002\\000\\000xp\\000\\000\\000\\001sq\\000~\\000\\006\\000\\000\\000\\000                          => 382 + msgUnit.size()         In RAM:         // Estimated calculation of used memory by one MsgUnitWrapper instance         // = Object memory + payload         // Where following objects need to be created:         // 5 PropBoolean         // 1 PropLong         // 1 RcvTimestamp         // 1 MsgQosData         // 1 MsgKeyData         // 1 MsgUnit         // 1 MsgUnitWrapper      */      this.immutableSizeInBytes = (sizeInBytes >= 0) ? sizeInBytes : (3200 + this.msgUnit.size());      if (log.isLoggable(Level.FINE)) log.fine("Created new MsgUnitWrapper instance '" + this + "' " + ((this.ownerCache==null) ? " from persistence store" : ""));      //this.glob.getLog("core").info(ME, "Created message" + toXml());      if (this.historyReferenceCounter > this.referenceCounter) { // assert         log.severe("PANIC: historyReferenceCounter=" + this.historyReferenceCounter + " is bigger than referenceCounter=" + this.referenceCounter + toXml());      }   }      public final ServerScope getServerScope() {      return this.glob;   }   /**    * Cleanup timer, it is a weak reference on us therefor it is a 'nice to have'.     
*/
   public void finalize() {
      // Nothing to clean up unless a timer and a registered key exist
      if (this.destroyTimer == null || this.timerKey == null) {
         return;
      }
      if (log.isLoggable(Level.FINE)) {
         log.fine("finalize timerKey=" + this.timerKey);
      }
      this.destroyTimer.removeTimeoutListener(this.timerKey);
   }

   /**
    * The cache sets it to true when the entry is swapped away.
    * <p>
    * You should not write on a swapped away entry as those
    * changes are lost.
    * Enforced by I_Map
    * </p>
    * @return The current swapped flag
    * @see I_Map#isSwapped()
    */
   public boolean isSwapped() {
      return this.swapped;
   }

   /**
    * Used by the cache implementation to mark entries which will
    * be swapped to the persistent store.
    * Enforced by I_Map
    * @param swapped The new value of the swapped flag
    */
   public void isSwapped(boolean swapped) {
      this.swapped = swapped;
   }

   /**
    * Invoked by ReferenceEntry.java and TopicHandler.java to support reference counting.
    * <p>
    * Does nothing for a swapped entry. If the total reference counter is not
    * positive after the change and the entry is not yet destroyed, toDestroyed()
    * is invoked by this method itself; nothing is returned to the caller.
    * NOTE: The invocation of toDestroyed() must be done outside from any sync
    * on the cache.
    * </p>
    * @param count The number of ref-counts to add/subtract
    * @param storageId The referencing storage, may be null; if its prefix is "history"
    *         the history reference counter is changed by count as well
*/   public void incrementReferenceCounter(int count, StorageId storageId) throws XmlBlasterException {      if (isSwapped()) {         if (log.isLoggable(Level.FINE))             log.fine("incrementReferenceCounter: unexpected swapped message");         return;      }      boolean isHistoryReference = (storageId != null && storageId.getPrefix().equals("history"));      synchronized (uniqueIdStr) { // use an arbitrary local attribute as monitor         if (isHistoryReference) {            this.historyReferenceCounter += count;         }         this.referenceCounter += count;      }      if (log.isLoggable(Level.FINE) && !isInternal()) {         log.fine("Reference count changed from " +             (this.referenceCounter-count) + " to " + this.referenceCounter +              ", new historyEntries=" + this.historyReferenceCounter + " this='" + this + "' storageId='" + storageId + "'");      }      if (this.referenceCounter > 0L) {         if (ReferenceEntry.STRICT_REFERENCE_COUNTING) {            if (count != 0) this.glob.getTopicAccessor().changeDirtyRead(this);         }      }      else {         if (!isDestroyed()) {            toDestroyed();         }      }   }   /**    * Internal use for TopicHandler    */   void setReferenceCounter(int count) {      synchronized (uniqueIdStr) { // use an arbitrary local attribute as monitor         this.referenceCounter += count;      }      if (log.isLoggable(Level.FINE) && !isInternal()) {         log.fine("Reference count changed from " +             (this.referenceCounter-count) + " to " + this.referenceCounter + ", this='" + this + "'");      }      if (this.referenceCounter <= 0L)         toDestroyed();   }   /**    * Callback invoked by I_Map.change inside the synchronization point.     * Enforced by I_ChangeCallback    * @param entry the entry to modify.    * @return I_MapEntry the modified entry.    * @throws XmlBlasterException if something has gone wrong and the change must be rolled back.    
*/
   public I_MapEntry changeEntry(I_MapEntry entry) throws XmlBlasterException {
      // The passed entry is not touched, this instance itself is handed back
      if (log.isLoggable(Level.FINE)) {
         log.fine("Entring changeEntry(), referecenceCounter=" + this.referenceCounter +
                  ", historyReferenceCounter=" + this.historyReferenceCounter );
      }
      return this;
   }

   /**
    * @return The number or references on myself (history, callback queue and plugin queues)
    */
   public int getReferenceCounter() {
      return this.referenceCounter;
   }

   /**
    * @return The history reference counter (1: Is referenced one time from history queue, else 0)
    */
   public int getHistoryReferenceCounter() {
      return this.historyReferenceCounter;
   }

   /**
    * Returns a dummy only as sorting is not important in this context.
    * @return Always the int value of PriorityEnum.NORM_PRIORITY
    */
   public int getPriority() {
      return PriorityEnum.NORM_PRIORITY.getInt();
   }

   /**
    * @return The QoS data of the wrapped message unit, cast to MsgQosData
    */
   public MsgQosData getMsgQosData() {
      return (MsgQosData)this.msgUnit.getQosData();
   }

   /**
    * @return The persistent flag of the message QoS
    */
   public boolean isPersistent() {
      final MsgQosData qos = getMsgQosData();
      return qos.isPersistent();
   }

   /**
    * @return The key data of the wrapped message unit, cast to MsgKeyData
    */
   public MsgKeyData getMsgKeyData() {
      return (MsgKeyData)this.msgUnit.getKeyData();
   }

   /**
    * @return The wrapped message unit
    */
   public MsgUnit getMsgUnit() {
      return this.msgUnit;
   }

   /**
    * @return The oid of the message key
    */
   public final String getKeyOid() {
      final MsgKeyData key = getMsgKeyData();
      return key.getOid();
   }

   /**
    * @return The content mime of the message key
    */
   public String getContentMime() {
      final MsgKeyData key = getMsgKeyData();
      return key.getContentMime();
   }

   /**
    * @return The extended content mime of the message key
    */
   public String getContentMimeExtended() {
      final MsgKeyData key = getMsgKeyData();
      return key.getContentMimeExtended();
   }

   /**
    * @return The domain of the message key
    */
   public String getDomain() {
      final MsgKeyData key = getMsgKeyData();
      return key.getDomain();
   }

   /**
    * Replaces the wrapped message unit.
    * @param msg The new message unit, stored without any check
    */
   public void setMsgUnit(MsgUnit msg) {
      this.msgUnit = msg;
   }

   /**
    * @return The size in bytes fixed in the constructor
    */
   public long getSizeInBytes() {
      return this.immutableSizeInBytes;
   }

   /**
    * The unique ID for this entry = getMsgQosData().getRcvTimestamp().getTimestamp()
    */
   public long getUniqueId() {
      return this.uniqueId;
   }

   public String getUniqueIdStr() {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -