⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 referenceentry.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
字号:
/*------------------------------------------------------------------------------Name:      ReferenceEntry.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.engine.queuemsg;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.engine.MsgUnitWrapper;import org.xmlBlaster.util.key.MsgKeyData;import org.xmlBlaster.util.qos.MsgQosData;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import java.lang.ref.WeakReference;import java.util.logging.Level;import java.util.logging.Logger;/** * Wraps an publish() message into an entry for a sorted queue. * @author michele@laghi.eu * @author xmlBlaster@marcelruff.info */public class ReferenceEntry extends MsgQueueEntry{   private static Logger log = Logger.getLogger(ReferenceEntry.class.getName());   private static final long serialVersionUID = 1L;   private final String ME; // for logging   protected final transient ServerScope glob; // engine.Global   /** Weak reference on the MsgUnit with key/content/qos (raw struct) */   private transient WeakReference weakMsgUnitWrapper;   // The keyOid and the rcvTimestamp build the unique id in the msgUnitStore   protected String keyOid;   protected long msgUnitWrapperUniqueId;   protected SessionName receiver;   /*      If true we don't store the meat in every UpdateEntry but just the reference      and take care that the msgUnitStore referenceCounter on HD is always current   */   public static final boolean STRICT_REFERENCE_COUNTING = true;   /*      If true we are able to read the meat from database entries of older      xmlBlaster versions      Nice during transition to new behavior   */   public static final boolean STRICT_REFERENCE_COUNTING_COMPATIBLE = false;   /**    * A new message object is fed by method update().     * @param msgUnit The raw data, we keep a weak reference only on this data so it can be garbage collected    */   public ReferenceEntry(String ME, ServerScope glob_, String entryType, MsgUnitWrapper msgUnitWrapper,                         Timestamp timestamp, StorageId storageId, SessionName receiver) throws XmlBlasterException {      super(glob_, entryType,             (msgUnitWrapper==null) ? PriorityEnum.NORM_PRIORITY : msgUnitWrapper.getMsgQosData().getPriority(),            timestamp,            storageId,            (msgUnitWrapper==null) ? true : msgUnitWrapper.getMsgQosData().isPersistent()); // We may not use msgUnitWrapper.isPersistent() as is forced to transient in TopicHandler during initialization      this.glob = glob_;      this.ME = ME;      setMsgUnitWrapper(msgUnitWrapper);      //setSender(msgUnitWrapper.getMsgQosData().getSender());      setReceiver(receiver);      if (msgUnitWrapper != null)          super.wantReturnObj = msgUnitWrapper.getWantReturnObj();      else super.wantReturnObj = false;    }   /**    * A new message object is fed by method publish().     * @param msgUnit The raw data, we keep a weak reference only on this data so it can be garbage collected    */   public ReferenceEntry(String ME, ServerScope glob, String entryType, MsgUnitWrapper msgUnitWrapper, StorageId storageId) throws XmlBlasterException {      this(ME, glob, entryType, msgUnitWrapper, (Timestamp)null, storageId, (SessionName)null);   }   /**    * For persistence recovery    * @param msgUnitWrapperUniqueId The unique timestamp of the MsgUnitWrapper instance (need to lookup MsgUnitWrapper)    */   public ReferenceEntry(String ME, ServerScope glob, String entryType, PriorityEnum priority, StorageId storageId, Timestamp entryTimestamp,                        String keyOid, long msgUnitWrapperUniqueId, boolean persistent, SessionName receiver,                        String qos, String key, byte[] content) throws XmlBlasterException {      super(glob, entryType, priority, entryTimestamp, storageId, persistent);      this.glob = glob;      this.ME = ME;      this.keyOid = keyOid;      this.msgUnitWrapperUniqueId = msgUnitWrapperUniqueId;      setReceiver(receiver);      super.wantReturnObj = false;      /*      MsgUnitWrapper msgUnitWrapper = getMsgUnitWrapper();      if ( STRICT_REFERENCE_COUNTING_COMPATIBLE ) {         // We need to check it the original msgUnitWrapper still         // exists, if not we create one (used by callback queue, not by history queue).         if (msgUnitWrapper == null) {            if (qos != null || key != null) {               log.warn(ME, "Lookup of MsgUnitWrapper '" + msgUnitWrapperUniqueId + "' failed, we create a new instance");               if (this.glob.getRequestBroker() != null) {                  TopicHandler topicHandler = this.glob.getRequestBroker().getMessageHandlerFromOid(this.keyOid);                  if (topicHandler != null) {                     PublishQosServer publishQosServer = new PublishQosServer(glob, qos, true); // true marks from persistent store (prevents new timestamp)                     MsgKeyData msgKeyData = glob.getMsgKeyFactory().readObject(key);                     MsgUnit msgUnit = new MsgUnit(msgKeyData, content, publishQosServer.getData());                     msgUnitWrapper = new MsgUnitWrapper(glob, msgUnit,                                                topicHandler.getMsgUnitCache(),                                                1, 0, -1);                     // The topicHandler holds the real reference:                     msgUnitWrapper = topicHandler.addMsgUnitWrapper(msgUnitWrapper, storageId);                     // NOTE: The returned msgUnitWrapper is not always identical to the passed one                     // if two threads do this simultaneously, the topic handler sync this situation                     super.wantReturnObj = msgUnitWrapper.getWantReturnObj();                     this.weakMsgUnitWrapper = new WeakReference(msgUnitWrapper);                  }               }            }            else {               log.error(ME, "Can't recreate MsgUnitWrapper, got no information from persistency: " + toXml());            }         }         else {            // added() is not triggered when coming from persistency            //msgUnitWrapper.incrementReferenceCounter(1, storageId);         }      }      else {         if (msgUnitWrapper == null) {            log.warn(ME+"-"+getLogId(), "DEBUG ONLY: No 'meat' found in msgStore, we ignore this entry," +                     " this is possible after a server crash for messages which were not acknowldeged to the publisher during the crash." +                     " Usually the publisher will send it again");                        //log.error(ME, "No 'meat' found for MsgQueueEntry '" + getLogId() +            //              "' in msgStore: " + Global.getStackTraceAsString());            //            //throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME,             //          "No 'meat' found for MsgQueueEntry '" + getLogId() + "' in msgStore");         }      }      */   }   protected boolean isForceDestroy() {      return false;   }   public final ServerScope getGlobal() {      return this.glob;   }   /**    * The caller needs to synchronize over msgCache    *  @return the MsgUnitWrapper or null if not found    */   public MsgUnitWrapper getMsgUnitWrapper() {      Object referent = null;      if (this.weakMsgUnitWrapper != null) {         referent = this.weakMsgUnitWrapper.get();      }       if (referent == null || ((MsgUnitWrapper)referent).isSwapped()) {  // message was swapped away         this.weakMsgUnitWrapper = null;         referent = lookup();         if (referent == null) return null;         this.weakMsgUnitWrapper = new WeakReference(referent);      }      MsgUnitWrapper msgUnitWrapper = (MsgUnitWrapper)referent;      if (msgUnitWrapper == null) {          if (!isForceDestroy()) {            log.severe("No meat found but forceDestroy=false");         }      }      return msgUnitWrapper;   }   private void incrementReferenceCounter(int incr, StorageId storageId) {      try {         MsgUnitWrapper msgUnitWrapper = getMsgUnitWrapper();         if (msgUnitWrapper != null) {            msgUnitWrapper.incrementReferenceCounter(incr, storageId);         }         else { // Log situation:            boolean isHistory = (storageId == null) ? false : Constants.RELATING_HISTORY.equals(storageId.getPrefix());            String id = (storageId == null) ? "" : storageId.getId();            if (isForceDestroy()) {               if (log.isLoggable(Level.FINE)) log.fine("No meat found, incr=" + incr + " storageId=" + id + " isExpired=" + isExpired());            }            else if (isHistory) { // Is OK when expired history entries are cleared!               if (log.isLoggable(Level.FINE)) log.fine("No meat found, incr=" + incr + " storageId=" + id + " isExpired=" + isExpired());            }            else if (isExpired()) {               if (log.isLoggable(Level.FINE)) log.fine("No meat found, incr=" + incr + " storageId=" + id + " isExpired=" + isExpired());            }            else { // Analyse it               log.severe("No meat found, incr=" + incr + " storageId=" + id + " isExpired=" + isExpired());               Thread.dumpStack();            }         }      }      catch (Throwable ex) {         log.severe("incr="+incr+" to '" + storageId + "' raised an exception: " + ex.toString());      }   }   /**    * Notification if this entry is added to queue.     * It can be added to several queues simultaneously, the reference counter will be incremented each time    * @see org.xmlBlaster.util.queue.I_Entry#added(StorageId)    */   public void added(StorageId storageId) {      incrementReferenceCounter(1, storageId);   }   /**    * Notification if this entry is removed from queue    * @see org.xmlBlaster.util.queue.I_Entry#removed(StorageId)    */   public void removed(StorageId storageId) {      incrementReferenceCounter(-1, storageId);   }   /**    * @return true for EXPIRED messages    */   public boolean isExpired() {      MsgUnitWrapper msgUnitWrapper = getMsgUnitWrapper();      if (msgUnitWrapper == null)         return true;      return msgUnitWrapper.isExpired(); //getMsgQosData().isExpired();   }   /**    * @return true if no MsgUnitWrapper is found (the 'meat' is not available anymore)<br />    *         or for msgUnitWrapper in state=DESTROYED    */   public boolean isDestroyed() {      MsgUnitWrapper msgUnitWrapper = getMsgUnitWrapper();      if (msgUnitWrapper == null)         return true;      return msgUnitWrapper.isDestroyed();   }   public MsgQosData getMsgQosData() throws XmlBlasterException {      return (MsgQosData)getMsgUnit().getQosData();   }   /**    * Gets the message unit but is for read only (dirty read)     * @return null if expired->destroyed    * @throws XmlBlasterException    */   public MsgUnit getMsgUnitOrNull() {      MsgUnitWrapper msgUnitWrapper = getMsgUnitWrapper();      if (msgUnitWrapper == null) {         return null;      }      return msgUnitWrapper.getMsgUnit();   }   /**    * Gets the message unit but is for read only (dirty read)     * @return    * @throws XmlBlasterException if not found (e.g. forceDestroy and expired)    */   public MsgUnit getMsgUnit() throws XmlBlasterException {      MsgUnitWrapper msgUnitWrapper = getMsgUnitWrapper();      if (msgUnitWrapper == null) {         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "Message " + getUniqueId() + " not found");      }      return msgUnitWrapper.getMsgUnit();   }   public void setMsgUnitWrapper(MsgUnitWrapper msgUnitWrapper) throws XmlBlasterException {      if (msgUnitWrapper == null)         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "Given msgUnitWrapper is null");      this.weakMsgUnitWrapper = new WeakReference(msgUnitWrapper);      this.keyOid = msgUnitWrapper.getMsgUnit().getKeyData().getOid();      this.msgUnitWrapperUniqueId = msgUnitWrapper.getUniqueId();   }   public long getMsgUnitWrapperUniqueId() {      return this.msgUnitWrapperUniqueId;   }   public String getKeyOid() {      return this.keyOid;   }   public long getRcvTimestamp() {      return 0L;   }   public final MsgKeyData getMsgKeyData() throws XmlBlasterException {      return (MsgKeyData)getMsgUnit().getKeyData();   }   /**    * @return If it is an internal message (oid starting with "_").     */   public boolean isInternal() {      try {         return (getMsgKeyData().isInternal() || getMsgKeyData().isPluginInternal());      }      catch (XmlBlasterException e) {         return false;      }   }   /**    * TODO? make sender persistent?    * @return can be null    */   public final SessionName getSender() {      MsgUnit msgUnit = getMsgUnitOrNull();      if (msgUnit == null) return null;      return msgUnit.getQosData().getSender();   }   /**    * @return null    */   public final SessionName getReceiver() {      return this.receiver;   }   public final void setReceiver(SessionName receiver) {      this.receiver = receiver;   }   /** @return the MsgUnitWrapper or null if not found */   private MsgUnitWrapper lookup() {      try {         return this.glob.getTopicAccessor().lookupDirtyRead(this.keyOid, this.msgUnitWrapperUniqueId);      }      catch (XmlBlasterException e) {         log.warning("lookup failed: " + e.getMessage());         return null;      }   }   /**    * The embedded object for this implementing class is an Object[2]    */   public Object getEmbeddedObject() {      Object[] obj = { this.keyOid, new Long(this.msgUnitWrapperUniqueId) };      return obj;   }   public final void embeddedObjectToXml(java.io.OutputStream out, java.util.Properties props) throws java.io.IOException {      MsgUnit msgUnit = getMsgUnitOrNull();      if (msgUnit != null) // TODO: Messages with no meat can't be loaded again by ServerEntryFactory          msgUnit.toXml(out, props);   }      /**    * Returns a shallow clone    */   public Object clone() {      ReferenceEntry entry = null;      entry = (ReferenceEntry)super.clone();      return entry;   }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -