📄 msgunitwrapper.java
字号:
/*------------------------------------------------------------------------------Name: MsgUnitWrapper.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.engine;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.qos.MsgQosData;import org.xmlBlaster.util.key.MsgKeyData;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.Timeout;import org.xmlBlaster.util.I_Timeout;import org.xmlBlaster.engine.queuemsg.ServerEntryFactory;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.engine.msgstore.I_Map;import org.xmlBlaster.engine.msgstore.I_MapEntry;import org.xmlBlaster.engine.msgstore.I_ChangeCallback;import org.xmlBlaster.engine.queuemsg.ReferenceEntry;/** * Wraps a publish() message into an entry for a persistence cache. * <p> * There are two options to make this object persistent (measure on a 2GHz Intel Linux laptop with Postgres): * </p> * <p> * 1. QoS and Key are stored as XML ASCII strings, the content as byte[]<br /> * This variant takes about 50 microsec to serialize (toXml()) and 380 microsec to create the object again (SAX parse). * The size for an empty content is approx. 80 bytes for a medium sized key and QoS. * </p> * <p> * 2. The whole object is java.io.Serialized<br /> * This variant takes about 160 microsec to serialize and 750 microsec to deserialize. 
* </p> * <p> * So we have chosen the XML variant as it is faster, has no versioning problems and has smaller size * </p> * @author xmlBlaster@marcelruff.info * @see org.xmlBlaster.engine.queuemsg.ServerEntryFactory#main(String[]) */public final class MsgUnitWrapper implements I_MapEntry, I_Timeout, I_ChangeCallback{ private static final long serialVersionUID = -3883804885824516337L; private transient final ServerScope glob; private static Logger log = Logger.getLogger(MsgUnitWrapper.class.getName()); private transient int historyReferenceCounter; // if is in historyQueue, is swapped to persistence as well private transient int referenceCounter; // total number of references, is swapped to persistence as well private transient final long uniqueId; private transient final String uniqueIdStr; // cache uniqueId as String private transient I_Map ownerCache; private transient final String embeddedType; /** used to tell to the MsgQueueEntry if a return value is desidered */ private transient boolean wantReturnObj; private transient Object returnObj; /** * This topic is destroyed after given timeout * The timer is activated on state change to UNREFERENCED * and removed on change to ALIVE */ private transient Timeout destroyTimer; private transient Timestamp timerKey = null; private final static int ALIVE = 0; private final static int PRE_EXPIRED = 4; private final static int EXPIRED = 1; private final static int DESTROYED = 2; private transient int state = ALIVE; private MsgUnit msgUnit; private final long immutableSizeInBytes; private boolean stored = false; private transient boolean swapped = false; private transient Timestamp sortTimestamp; /** * Testsuite */ public MsgUnitWrapper(ServerScope glob, MsgUnit msgUnit, StorageId storageId) throws XmlBlasterException { this(glob, msgUnit, (I_Map)null, storageId, 0, 0, (String)null, -1); } /** * Used when message is created from TopicHandler.publish */ public MsgUnitWrapper(ServerScope glob, MsgUnit msgUnit, I_Map ownerCache, 
int referenceCounter, int historyReferenceCounter, long sizeInBytes) throws XmlBlasterException { this(glob, msgUnit, ownerCache, ownerCache.getStorageId(), referenceCounter, historyReferenceCounter, (String)null, sizeInBytes); } /** * Used when message comes from persistence, the owning I_Map is unknown */ public MsgUnitWrapper(ServerScope glob, MsgUnit msgUnit, StorageId storageId, int referenceCounter, int historyReferenceCounter, long sizeInBytes) throws XmlBlasterException { this(glob, msgUnit, (I_Map)null, storageId, referenceCounter, historyReferenceCounter, (String)null, sizeInBytes); } /** * Used when message comes from persistence, the owning I_Map is unknown * @param embeddedType Allows you to control how to make this object persistent:<br /> * ServerEntryFactory.ENTRY_TYPE_MSG_XML Dump strings as XML ASCII (which is smaller, faster, portable -> and therefor default)<br /> * ServerEntryFactory.ENTRY_TYPE_MSG_SERIAL Dump object with java.io.Serializable * @param sizeInBytes The estimated size of this entry in RAM, if -1 we estimate it for you */ public MsgUnitWrapper(ServerScope glob, MsgUnit msgUnit, I_Map ownerCache, StorageId storageId, int referenceCounter, int historyReferenceCounter, String embeddedType, long sizeInBytes) throws XmlBlasterException { this.glob = glob; if (msgUnit == null) { throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, "MsgUnitWrapper", "Invalid constructor parameter msgUnit==null"); } this.msgUnit = msgUnit; this.ownerCache = ownerCache; this.referenceCounter = referenceCounter; this.historyReferenceCounter = historyReferenceCounter; this.embeddedType = (embeddedType == null) ? 
ServerEntryFactory.ENTRY_TYPE_MSG_XML : embeddedType; //this.uniqueId = getKeyOid()+getMsgQosData().getRcvTimestamp(); if (getMsgQosData().getRcvTimestamp() == null) { throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, "MsgUnitWrapper", "Missing timestamp, try to create publish QoS with PublishQosServer.java"); } this.uniqueId = getMsgQosData().getRcvTimestamp().getTimestamp(); this.uniqueIdStr = ""+this.uniqueId; // this.ME = "MsgUnitWrapper-" + getLogId(); this.destroyTimer = this.glob.getMessageTimer(); // holds weak references only /* Estimation in database (here postgres(oracle): 1. the columns JdbcDriver.mapping[Oracle]=string=VARCHAR(128),longint=NUMBER(19),int=NUMBER(10),blob=BLOB,boolean=CHAR(1) Postg Oracle ----------------------------- dataid int 8 19 nodeid text variable 128 queuename text variable 128 prio int 4 10 flag text 1 1 durable text 1 1 bytesize int 8 19 ----------------------------- SUM 306 + blob blob variable variable 2) blob = MsgUnit + Integer + Integer = 38 + this.qosData.size() + this.keyData.size() + this.content.length + 38 + 38 Postgres example: 1077011447218000001 xmlBlaster_192_168_1_4_3412 msgUnitStore_xmlBlaster_192_168_1_4_3412myMessage 5 MSG_XML T 3833 ��\\000\\005ur\\000\\023[Ljava.lang.Object;\\220�X\\237\\020s)l\\002\\000\\000xp\\000\\000\\000\\005t\\002\\026\\012 <qos>\\012 <subscribable>false</subscribable>\\012 <destination forceQueuing='true'>/node/xmlBlaster_192_168_1_4_3412/client/Subscriber</destination>\\012 <sender>/node/xmlBlaster_192_168_1_4_3412/client/Publisher/1</sender>\\012 <priority>MAX</priority>\\012 <expiration lifeTime='360000' remainingLife='271805' forceDestroy='false'/>\\012 <rcvTimestamp nanos='1077011447218000001'/>\\012 <persistent/>\\012 <route>\\012 <node id='xmlBlaster_192_168_1_4_3412' stratum='0' timestamp='1077011447218000001' dirtyRead='false'/>\\012 </route>\\012 <isPublish/>\\012 </qos>t\\000.\\012 <key oid='myMessage' 
contentMime='txt/xml'/>ur\\000\\002[B��\\027�\\006\\010T�\\002\\000\\000xp\\000\\000\\0005I'm message B-376 of type myMessage sent in a PtP waysr\\000\\021java.lang.Integer\\022⠤�\\201\\2078\\002\\000\\001I\\000\\005valuexr\\000\\020java.lang.Number\\206�\\225\\035\\013\\224�\\213\\002\\000\\000xp\\000\\000\\000\\001sq\\000~\\000\\006\\000\\000\\000\\000 => 382 + msgUnit.size() In RAM: // Estimated calculation of used memory by one MsgUnitWrapper instance // = Object memory + payload // Where following objects need to be created: // 5 PropBoolean // 1 PropLong // 1 RcvTimestamp // 1 MsgQosData // 1 MsgKeyData // 1 MsgUnit // 1 MsgUnitWrapper */ this.immutableSizeInBytes = (sizeInBytes >= 0) ? sizeInBytes : (3200 + this.msgUnit.size()); if (log.isLoggable(Level.FINE)) log.fine("Created new MsgUnitWrapper instance '" + this + "' " + ((this.ownerCache==null) ? " from persistence store" : "")); //this.glob.getLog("core").info(ME, "Created message" + toXml()); if (this.historyReferenceCounter > this.referenceCounter) { // assert log.severe("PANIC: historyReferenceCounter=" + this.historyReferenceCounter + " is bigger than referenceCounter=" + this.referenceCounter + toXml()); } } public final ServerScope getServerScope() { return this.glob; } /** * Cleanup timer, it is a weak reference on us therefor it is a 'nice to have'. */ public void finalize() { if (this.destroyTimer != null && this.timerKey != null) { if (log.isLoggable(Level.FINE)) log.fine("finalize timerKey=" + this.timerKey); this.destroyTimer.removeTimeoutListener(this.timerKey); } } /** * The cache sets it to true when the entry is swapped * away. * You should not write on a swapped away entry as those * changes are lost. * Enforced by I_Map * @see I_Map#isSwapped() */ public boolean isSwapped() { return this.swapped; } /** * Used by the cache implementation to mark entries which will * be swapped to the persistent store. 
* Enforced by I_Map */ public void isSwapped(boolean swapped) { this.swapped = swapped; } /** * Invoked by ReferenceEntry.java and TopicHandler.java to support reference counting * @param count The number of ref-counts to add/subtract * @param storageId * @return false if the entry is not pre destroyed, true if it is * pre destroyed. NOTE1: the caller must ensure to invoke toDestroyed() in cases * 'true' is returned. NOTE2: The invocation toDestroyed() must be done * outside from any sync on the cache. */ public void incrementReferenceCounter(int count, StorageId storageId) throws XmlBlasterException { if (isSwapped()) { if (log.isLoggable(Level.FINE)) log.fine("incrementReferenceCounter: unexpected swapped message"); return; } boolean isHistoryReference = (storageId != null && storageId.getPrefix().equals("history")); synchronized (uniqueIdStr) { // use an arbitrary local attribute as monitor if (isHistoryReference) { this.historyReferenceCounter += count; } this.referenceCounter += count; } if (log.isLoggable(Level.FINE) && !isInternal()) { log.fine("Reference count changed from " + (this.referenceCounter-count) + " to " + this.referenceCounter + ", new historyEntries=" + this.historyReferenceCounter + " this='" + this + "' storageId='" + storageId + "'"); } if (this.referenceCounter > 0L) { if (ReferenceEntry.STRICT_REFERENCE_COUNTING) { if (count != 0) this.glob.getTopicAccessor().changeDirtyRead(this); } } else { if (!isDestroyed()) { toDestroyed(); } } } /** * Internal use for TopicHandler */ void setReferenceCounter(int count) { synchronized (uniqueIdStr) { // use an arbitrary local attribute as monitor this.referenceCounter += count; } if (log.isLoggable(Level.FINE) && !isInternal()) { log.fine("Reference count changed from " + (this.referenceCounter-count) + " to " + this.referenceCounter + ", this='" + this + "'"); } if (this.referenceCounter <= 0L) toDestroyed(); } /** * Callback invoked by I_Map.change inside the synchronization point. 
* Enforced by I_ChangeCallback
    * @param entry the entry to modify.
    * @return I_MapEntry the modified entry.
    * @throws XmlBlasterException if something has gone wrong and the change must be rolled back.
    */
   public I_MapEntry changeEntry(I_MapEntry entry) throws XmlBlasterException {
      // The passed entry is not inspected here: the current counters are logged
      // and this wrapper itself is handed back as the changed entry.
      if (log.isLoggable(Level.FINE)) log.fine("Entring changeEntry(), referecenceCounter=" + this.referenceCounter + ", historyReferenceCounter=" + this.historyReferenceCounter );
      return this;
   }

   /**
    * @return The number of references on myself (history, callback queue and plugin queues)
    */
   public int getReferenceCounter() {
      return this.referenceCounter;
   }

   /**
    * @return 1: Is referenced one time from history queue, else 0
    */
   public int getHistoryReferenceCounter() {
      return this.historyReferenceCounter;
   }

   /**
    * Returns a dummy only as sorting is not important in this context.
    * @return Always the int value of PriorityEnum.NORM_PRIORITY
    */
   public int getPriority() {
      return PriorityEnum.NORM_PRIORITY.getInt();
   }

   /**
    * @return The QoS data of the wrapped message unit, cast to MsgQosData
    */
   public MsgQosData getMsgQosData() {
      return (MsgQosData)this.msgUnit.getQosData();
   }

   /**
    * @return The persistent flag as reported by the QoS of the wrapped message
    */
   public boolean isPersistent() {
      return getMsgQosData().isPersistent();
   }

   /**
    * @return The key data of the wrapped message unit, cast to MsgKeyData
    */
   public MsgKeyData getMsgKeyData() {
      return (MsgKeyData)this.msgUnit.getKeyData();
   }

   /**
    * @return The wrapped message unit
    */
   public MsgUnit getMsgUnit() {
      return this.msgUnit;
   }

   /**
    * @return The oid taken from the message key
    */
   public final String getKeyOid() {
      return getMsgKeyData().getOid();
   }

   /**
    * @return The content MIME type taken from the message key
    */
   public String getContentMime() {
      return getMsgKeyData().getContentMime();
   }

   /**
    * @return The extended content MIME information taken from the message key
    */
   public String getContentMimeExtended() {
      return getMsgKeyData().getContentMimeExtended();
   }

   /**
    * @return The domain taken from the message key
    */
   public String getDomain() {
      return getMsgKeyData().getDomain();
   }

   /**
    * Replaces the wrapped message unit.
    * <p>
    * The argument is not checked for null (the constructor rejects null), and the
    * final fields uniqueId and immutableSizeInBytes keep the values computed
    * at construction time.
    * </p>
    * @param msg The new message unit
    */
   public void setMsgUnit(MsgUnit msg) {
      this.msgUnit = msg;
   }

   /**
    * @return The size fixed in the constructor (passed in or estimated there)
    */
   public long getSizeInBytes() {
      return this.immutableSizeInBytes;
   }

   /**
    * The unique ID for this entry = getMsgQosData().getRcvTimestamp().getTimestamp()
    */
   public long getUniqueId() {
      return this.uniqueId;
   }

   // NOTE(review): the body of this method lies beyond the visible part of the file
   public String getUniqueIdStr() {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -