📄 mapplugin.java
字号:
/*------------------------------------------------------------------------------Name: MapPlugin.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.engine.msgstore.ram;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;// import org.xmlBlaster.util.plugin.I_Plugin;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_StoragePlugin;import org.xmlBlaster.util.queue.I_EntryFilter;import org.xmlBlaster.util.queue.I_StorageSizeListener;import org.xmlBlaster.util.queue.StorageSizeListenerHelper;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.engine.msgstore.I_Map;import org.xmlBlaster.engine.msgstore.I_MapEntry;import org.xmlBlaster.engine.msgstore.I_ChangeCallback;import org.xmlBlaster.util.queue.I_StorageProblemListener;import org.xmlBlaster.util.Timestamp;import java.io.OutputStream;import java.util.Properties;import java.util.TreeSet;import java.util.Set;import java.util.TreeMap;import java.util.Map;import java.util.Iterator; import java.util.Comparator;import java.util.ArrayList;/** * Mapping messages in RAM only. * Please refer to I_Map for Javadoc comments. * @see org.xmlBlaster.test.classtest.msgstore.I_MapTest * @author xmlBlaster@marcelruff.info */public final class MapPlugin implements I_Map, I_StoragePlugin{ private String ME = "MapPlugin"; private StorageId mapId; private Map storage; private Set lruSet; // We could use a LinkedList for LRU but random access is slow private QueuePropertyBase property; // org.xmlBlaster.util.qos.storage.MsgUnitStoreProperty; private Global glob; private static Logger log = Logger.getLogger(MapPlugin.class.getName()); private boolean isShutdown = false; private long sizeInBytes; private long persistentSizeInBytes; private long numOfPersistentEntries; private PluginInfo pluginInfo; private StorageSizeListenerHelper storageSizeListenerHelper; public MapPlugin() { this.storageSizeListenerHelper = new StorageSizeListenerHelper(this); } /** * Is called after the instance is created. * @see org.xmlBlaster.engine.msgstore.I_Map#initialize(StorageId, Object) */ public void initialize(StorageId uniqueMapId, Object userData) throws XmlBlasterException { setProperties(userData); // sets this.property this.mapId = uniqueMapId; if (mapId == null || glob == null) { Thread.dumpStack(); throw new IllegalArgumentException("Illegal arguments in MapPlugin constructor: mapId=" + mapId); } this.ME = "MapPlugin-" + mapId; if (this.property != null && this.glob.isServerSide() != this.property.getGlobal().isServerSide()) { log.severe("Incompatible globals this.property.getGlobal().isServerSide()=" + this.property.getGlobal().isServerSide() + ": " + Global.getStackTraceAsString(null)); } this.glob = this.property.getGlobal(); if (property.getMaxEntries() > Integer.MAX_VALUE) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED, ME + ".initialize: The maximum number of messages is too big"); this.storage = new TreeMap(); this.lruSet = new TreeSet(new LruComparator()); this.isShutdown = false; } public void setProperties(Object userData) throws XmlBlasterException { if (userData == null) return; QueuePropertyBase newProp; try { newProp = (QueuePropertyBase)userData; } catch(Throwable e) { // this.log is still null throw XmlBlasterException.convert(this.glob, ME, "Can't configure RAM map, your properties are invalid", e); // glob is allowed to be null } this.property = newProp; } public Object getProperties() { return this.property; } public final StorageId getStorageId() { return mapId; } public boolean isTransient() { return true; } public void finalize() { if (log.isLoggable(Level.FINE)) log.fine("finalize - garbage collected"); } /** * @see I_Map#get(long) */ public I_MapEntry get(final long uniqueId) throws XmlBlasterException { final String key = ""+uniqueId; if (log.isLoggable(Level.FINER)) log.finer("get(" + key + ")"); synchronized (this.storage) { I_MapEntry entry = (I_MapEntry)this.storage.get(key); touch(entry); return entry; } } private void touch(I_MapEntry entry) { if (entry == null) return; if (entry.getSortTimestamp() != null) // assert: All entries in the set must have a sortTimestamp else the Comparator fails this.lruSet.remove(entry); entry.setSortTimestamp(new Timestamp()); this.lruSet.add(entry); } /** * @see I_Map#getAll() */ public I_MapEntry[] getAll(I_EntryFilter entryFilter) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("getAll()"); I_MapEntry[] entries = null; synchronized (this.storage) { // sortTimestamp remains as all entries are touched entries = (I_MapEntry[])this.storage.values().toArray(new I_MapEntry[this.storage.size()]); } if (entryFilter == null) return entries; ArrayList list = new ArrayList(); for (int i=0; i<entries.length; i++) { I_MapEntry entry = (I_MapEntry)entryFilter.intercept(entries[i], this); if (entry != null) list.add(entry); } return (I_MapEntry[])list.toArray(new I_MapEntry[list.size()]); } /** * @see I_Map#put(I_MapEntry) */ public int put(I_MapEntry entry) throws XmlBlasterException { if (entry == null) { throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "put(I_MapEntry="+entry+")"); } if (log.isLoggable(Level.FINER)) log.finer("put(" + entry.getLogId() + ")"); if (getNumOfEntries() > property.getMaxEntries()) { // Allow superload one time only String reason = "Message store overflow, number of entries=" + property.getMaxEntries() + ", try increasing '" + this.property.getPropName("maxEntries") + "' on client login."; if (log.isLoggable(Level.FINE)) log.fine(reason); throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME, reason); } if (this.getNumOfBytes() > property.getMaxBytes()) { // Allow superload one time only String reason = "Message store overflow with " + this.getNumOfBytes() + " bytes, try increasing '" + this.property.getPropName("maxBytes") + "' on client login."; if (log.isLoggable(Level.FINE)) log.fine(reason); throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME, reason); } String key = entry.getUniqueIdStr(); int ret = 0; synchronized (this.storage) { Object old = this.storage.put(key, entry); if (old != null) { // I_Map#put(I_MapEntry) spec says that the old entry is not updated! this.storage.put(key, old); touch((I_MapEntry)old); ret = 0; /* this.sizeInBytes -= old.getSizeInBytes(); if (old.isPersistent()) { this.numOfPersistentEntries--; this.persistentSizeInBytes -= old.getSizeInBytes(); } */ } else { entry.setSortTimestamp(new Timestamp()); this.lruSet.add(entry); entry.setStored(true); this.sizeInBytes += entry.getSizeInBytes(); if (entry.isPersistent()) { this.numOfPersistentEntries++; this.persistentSizeInBytes += entry.getSizeInBytes(); } ret = (old != null) ? 0 : 1; } } this.storageSizeListenerHelper.invokeStorageSizeListener(); return ret; } /** * @see I_Map#remove(I_MapEntry) */ public int remove(final I_MapEntry mapEntry) throws XmlBlasterException { if (mapEntry == null) return 0; if (log.isLoggable(Level.FINER)) log.finer("remove(" + mapEntry.getLogId() + ")"); try { synchronized (this.storage) { if (mapEntry.getSortTimestamp() != null) this.lruSet.remove(mapEntry); I_MapEntry entry = (I_MapEntry)this.storage.remove(mapEntry.getUniqueIdStr()); if (entry == null) return 0; if (entry.isPersistent()) { this.numOfPersistentEntries--; this.persistentSizeInBytes -= entry.getSizeInBytes(); } entry.setStored(false); this.sizeInBytes -= entry.getSizeInBytes(); return 1; } } finally { // since it should be outside the sync this.storageSizeListenerHelper.invokeStorageSizeListener(); } } /** * @see I_Map#remove(long) */ public int remove(final long uniqueId) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("remove(" + uniqueId + ")"); int ret = 0; synchronized (this.storage) { I_MapEntry mapEntry = get(uniqueId); if (mapEntry != null) ret = remove(mapEntry); } this.storageSizeListenerHelper.invokeStorageSizeListener(); return ret; } /** * @see I_Map#removeTransient() */ public int removeTransient() throws XmlBlasterException { throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "removeTransient() is not implemented"); } /** * @see I_Map#removeOldest() */ public I_MapEntry removeOldest() throws XmlBlasterException { try { synchronized (this.storage) { I_MapEntry oldest = null; Iterator it = this.lruSet.iterator(); if (it.hasNext()) { oldest = (I_MapEntry)it.next(); } if (oldest != null) { remove(oldest); return oldest; } if (this.storage.size() > 0) { log.severe("LRU set has no entries, we remove an arbitrary entry from RAM map");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -