⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 msgqueueentry.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
字号:
/*------------------------------------------------------------------------------Name:      MsgQueueEntry.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.util.queuemsg;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.def.MethodName;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_QueueEntry;/** * Base class to enter xmlBlaster method invocations (messages) into an ordered queue. * @author xmlBlaster@marcelruff.info * @author michele@laghi.eu */public abstract class MsgQueueEntry implements I_QueueEntry, Cloneable{   private final static String ME = "MsgQueueEntry";   protected transient Global glob;   private static Logger log = Logger.getLogger(MsgQueueEntry.class.getName());   // Three helpers to transport the return value back to the caller   protected transient boolean wantReturnObj = true;   protected transient Object returnObj;   protected transient MsgQueueEntry refToCloneOrigin;   private String logId;   /** The queue to which this entry belongs (set in the constructors) */   protected final StorageId storageId;   /** How often the entry was tried to send but failed */   protected int redeliverCounter = 0;   /** The unique creation timestamp (unique in a Global of a virtual machine) */   protected final Timestamp uniqueIdTimestamp;   /** The message priority, see Constants.java */   protected final PriorityEnum priority;   /** the flag telling if a message is persistent (opposite to transient) */   protected boolean persistent;   /** Which method we invoke, e.g. "update" or "publish" */   protected final String entryType;   private transient boolean stored = false;   //private transient boolean swapped = false;   /** The queue to which this entry belongs (set in the constructors) */   protected final String uniqueIdString;   /**    * @param methodName use methodName as entryType    */   public MsgQueueEntry(Global glob, MethodName methodName, PriorityEnum priority, StorageId storageId, boolean persistent) {      this(glob, methodName.toString(), priority, (Timestamp)null, storageId, persistent);   }   /**    * Creates a new queue entry object.     * @param priority The message priority    * @param storageId The queue i belong to    * @see org.xmlBlaster.util.Timestamp    */   public MsgQueueEntry(Global glob, String entryType, PriorityEnum priority, StorageId storageId, boolean persistent) {      this(glob, entryType, priority, (Timestamp)null, storageId, persistent);   }   /**    * This constructor is for internal creation from persistence only.     *    * @param timestamp The unique nano timestamp as from org.xmlBlaster.util.Timestamp or null to create one now    */   public MsgQueueEntry(Global glob, String entryType, PriorityEnum priority, Timestamp timestamp, StorageId storageId, boolean persistent) {      this.uniqueIdTimestamp = (timestamp == null) ? new Timestamp() : timestamp;      this.uniqueIdString = "" + this.uniqueIdTimestamp.getTimestamp();      if (entryType == null || priority == null || glob == null || storageId ==null) {         log.severe("Invalid constructor parameter");         Thread.dumpStack();         throw new IllegalArgumentException(ME + ": Invalid constructor parameter");      }      this.glob = glob;      this.entryType = entryType;      this.priority = priority;      this.storageId = storageId;      this.persistent = persistent;   }   public final void setGlobal(Global global) {      this.glob = global;   }   public void finalize() {      if (log.isLoggable(Level.FINE)) log.fine("finalize - garbage collect");   }   /**    * Enforced by I_QueueEntry    * @return The priority of this message as int (see PriorityEnum.java)    */   public int getPriority() {      return this.priority.getInt();   }   /**    * @return The priority of this message    */   public final PriorityEnum getPriorityEnum() {      return this.priority;   }   /**    * @return The persistent flag of this message    */   public boolean isPersistent() {      return this.persistent;   }   public void setPersistent(boolean persistent) {      this.persistent = persistent;   }   /**    * The unique creation timestamp (unique in a Global of a virtual machine)    * Enforced by I_QueueEntry    * @param nano seconds    */   public long getUniqueId() {      return this.uniqueIdTimestamp.getTimestamp();   }   /**    * The unique creation timestamp (unique in a Global of a virtual machine)    * @param nano seconds    */   public Long getUniqueIdLong() {      return this.uniqueIdTimestamp.getTimestampLong();   }   /**    * Flag which marks the entry as outdated    */   public abstract boolean isExpired();   /**    * Flag which marks the entry as destroyed, you should take it from queue and ignore/discard it    */   public abstract boolean isDestroyed();   /**    * @return If it is an internal message (oid starting with "_").     */   public abstract boolean isInternal();   /**    * @return The name of the sender (data source) or null    */   public abstract SessionName getSender();   /**    * @return The name of the receiver (data sink) or null    */   public abstract SessionName getReceiver();   /**    * @return The message key oid or null    */   public abstract String getKeyOid();   /**    * To which queue do i belong    */   public final StorageId getStorageId() {      return this.storageId;   }   /**    * Increment the counter if message delivery fails (exception during sending)    * We don't know if other side has processed it completely or not    */   public final void incrRedeliverCounter() {      this.redeliverCounter++;   }   /**    * How often we tried to redeliver the message.    * <p>    * Note: Depending on the derived class implementation this information    * is lost on server crash (the redeliver counter    * is not persistent). Only MsgQueueUpdateEntry persists it.    * </p>    * @return Number of failed tries    */   public final int getRedeliverCounter() {      return this.redeliverCounter;   }   /**    * Try to find out the approximate memory consumption of this message in RAM.    * <p />    * NOTE: The derived classes need to add their data amount to this value.    * @return the approximate size in bytes of this object which contributes to a QueueEntry memory consumption    */   public long getSizeInBytes() {      return 100; // a totally intuitive number for the object creation itself   }   /**    * Needed for sorting the queue.    * Comparing the longs directly is 20% faster than having a    * String compound key    * <br /><br />    * Sorts the messages    * <ol>    *   <li>Priority</li>    *   <li>Timestamp</li>    * </ol>    * <p>    * The sorting order is priority,timestamp:    * </p>    * <pre>    *   ->    5,100 - 5,98 - 5,50 - 9,3000 - 9,2500   ->    * </pre>    * <p>    * As 9 is highest priority it is the first to be taken out.<br />    * As we need to maintain the timely sequence and    * id is a timestamp in (more or less) nano seconds elapsed since 1970)    * the id 2500 (it is older) has precedence to the id 3000    * </p>    */   public final int compare(I_QueueEntry m2) {      //log.info(ME, "Entering compare A=" + getUniqueId() + " B=" + m2.getUniqueId());      int diff = m2.getPriority() - getPriority();      if (diff != 0) // The higher prio wins         return diff;      long dif = m2.getUniqueId() - getUniqueId();      if (dif < 0L)         return 1;  // The older message wins      else if (dif > 0L)         return -1;      return 0;      //System.out.println("Comparing " + getSortKey() + " : " + d2.getSortKey());      // return getSortKey().compareTo(d2.getSortKey());   }   /**    * Needed for sorting in queue    */   public final boolean equals(I_QueueEntry m2) {      return (getPriority() == m2.getPriority() && getUniqueId() == m2.getUniqueId());   }   /**    * Dump state of this object into XML.    * <br>    * @return XML state of MsgQueueEntry    */   public String toXml() {      return toXml((String)null);   }   /**    * Dump state of this object into XML.    * <br>    * @param extraOffset indenting of tags    * @return XML state of MsgQueueEntry    */   public String toXml(String extraOffset) {      StringBuffer sb = new StringBuffer(200);      String offset = "\n   ";      if (extraOffset != null) offset += extraOffset;      sb.append(offset).append("<MsgQueueEntry type='").append(entryType).append("'>");      sb.append(offset).append("   <priority>").append(getPriority()).append("</priority>");      sb.append(offset).append("   <persistent>").append(isPersistent()).append("</persistent>");      sb.append(offset).append("   <uniqueId>").append(getUniqueId()).append("</uniqueId>");      sb.append(offset).append("   <sender>").append(getSender()).append("</sender>");      sb.append(offset).append("   <receiver>").append(getReceiver()).append("</receiver>");      sb.append(offset).append("   <expired>").append(isExpired()).append("</expired>");      sb.append(offset).append("   <isInternal>").append(isInternal()).append("</isInternal>");      sb.append(offset).append("</MsgQueueEntry>\n");      return sb.toString();   }   /**    * Nice for logging    * @see #getLogId()    */   public String toString() {      return getLogId();      /*      StringBuffer sb = new StringBuffer(200);      sb.append("MsgQueueEntry sender=").append(getSender());      sb.append(" receiver=").append(getReceiver());      sb.append(" priority=").append(getPriority());      sb.append(" uniqueId=").append(getUniqueId());      sb.append(" expired=").append(isExpired());      return sb.toString();      */   }   /**    * @see I_QueueEntry.getEmbeddedObject()    */   //public Object getEmbeddedObject() {   //   return this.getMsgUnit();   //}   /**    * @return MethodName.xxx    */   public String getEmbeddedType() {      return entryType;   }   /**    * Notification if this entry is added to queue    * @see org.xmlBlaster.util.queue.I_Entry#added(StorageId)    */   public void added(StorageId storageId) {      log.info(getLogId() + " is added to queue: REFERENCE COUNTER IMPL MISSING");   }   /**    * Notification if this entry is removed from queue    * @see org.xmlBlaster.util.queue.I_Entry#removed(StorageId)    */   public void removed(StorageId storageId) {      log.info(getLogId() + " is removed from queue: REFERENCE COUNTER IMPL MISSING");   }   /**    * @return e.g. MethodName.PUBLISH    * @see #getEmbeddedType()    */   public MethodName getMethodName() {      return MethodName.toMethodName(this.entryType);   }   /**    * @return true if the dispatcher framework shall provide a return object    */   public boolean wantReturnObj() {      return this.wantReturnObj;   }   /**    * sets the 'wantReturnObj' flag to what you specify (overwrites the default for    * the implementing class).    * @param wantReturnObj    */   public void setWantReturnObject(boolean wantReturnObj) {      this.wantReturnObj = wantReturnObj;   }   /**    * If this instance is a clone we can remember who cloned us.     */   void setRefToCloneOrigin(MsgQueueEntry origEntry) {      this.refToCloneOrigin = origEntry;   }   /**    * @return returnObj The carried object used as return QoS in sync or async I_Queue.put() mode, can be null.    */   public Object getReturnObj() {      return this.returnObj;   }   /**    * Set the object to be carried as return value.     * NOTE: This can be used only once as the first call to this method    * destroys the reference to the clone original instance.    */   public void setReturnObj(Object returnObj) {      if (this.refToCloneOrigin != null) {         this.refToCloneOrigin.setReturnObj(returnObj);         this.refToCloneOrigin = null; // free reference so that GC can remove it if swapped      }      this.returnObj = returnObj;   }   /**    * Returns a shallow clone.     * Is done by DispatchManager.prepareMsgsFromQueue() so that it can later encrypt    * the message without touching the original    */   public Object clone() {      MsgQueueEntry entry = null;      try {         entry = (MsgQueueEntry)super.clone();         if (entry.wantReturnObj) {            entry.setRefToCloneOrigin(this);         }      }      catch(CloneNotSupportedException e) {         log.severe("Internal clone problem: " + e.toString());      }      return entry;   }   /**    * Return a human readable identifier for logging output.    * @return e.g. "callback:/node/heron/client/joe/2/17/HIGH/23455969/TheMessageOid"    */   public final String getLogId() {      if (this.logId == null) {         StringBuffer sb = new StringBuffer(80);         sb.append(getStorageId());         sb.append("/").append(priority);         //sb.append("/").append(priority.toString());         sb.append("/").append(getUniqueId());         sb.append("/").append(getKeyOid());         this.logId = sb.toString();      }      return this.logId;   }   /**    * @see org.xmlBlaster.util.queue.I_Entry#setStored(boolean)    */   final public void setStored(boolean stored) {      this.stored = stored;   }   /**    * @see org.xmlBlaster.util.queue.I_Entry#isStored()    */   final public boolean isStored() {      return this.stored;   }   /**    *     * @return null (always)    */   public MsgUnit getMsgUnit() throws XmlBlasterException {      return null;   }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -