📄 persistencecacheplugin.java
字号:
} /** * We set the cache props to the real props for RAM queue running under a cacheQueue */ private QueuePropertyBase createRamCopy(QueuePropertyBase queuePropertyBase) { QueuePropertyBase ramCopy = (QueuePropertyBase)queuePropertyBase.clone(); ramCopy.setMaxEntries(queuePropertyBase.getMaxEntriesCache()); ramCopy.setMaxBytes(queuePropertyBase.getMaxBytesCache()); return ramCopy; } /** * @see I_Map#setProperties(Object) */ public synchronized void setProperties(Object userData) throws XmlBlasterException { if (userData == null) return; if (log.isLoggable(Level.FINER)) log.finer("Entering setProperties()"); QueuePropertyBase newProp; try { newProp = (QueuePropertyBase)userData; } catch(Throwable e) { throw new XmlBlasterException(glob, ErrorCode.RESOURCE_CONFIGURATION, ME, "Can't configure queue, your properties are invalid", e); } /* Do we need to protect against shrinking? if (this.property != null && this.property.getMaxEntries() > newProp.getMaxEntries()) { log.warn(ME, "Reconfigure of a RamQueuePlugin - getMaxNumOfEntries from " + this.property.getMaxEntries() + " to " + newProp.getMaxEntries() + " is not supported, we ignore the new setting."); return; } */ this.property = newProp; this.transientStore.setProperties(createRamCopy((QueuePropertyBase)userData)); if (this.persistentStore != null) this.persistentStore.setProperties(userData); } /** * Access the current queue configuration */ public Object getProperties() { return this.property; } // JMX public String getPropertyStr() { return (this.property == null) ? "" : this.property.toXml(); } /** * All entries are stored into the transient queue. All persistent entries are * stored also in the persistent queue. The exceeding size in the transient * queue is calculated. If it is positive it means we need to swap. The * overflowing entries are taken from the ram queue. The volatile between * them are stored in the persistent storage (since the persistent ones have * been previously stored). * @see I_Map#put(I_MapEntry) */ public int put(I_MapEntry mapEntry) throws XmlBlasterException { if (mapEntry == null) throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "put(I_MapEntry="+mapEntry+")"); if (log.isLoggable(Level.FINER)) log.finer("put(" + mapEntry.getLogId() + ")"); int numPersistentPut = 0; int numTransientPut = 0; synchronized(this) { XmlBlasterException exceptionReturned = spaceLeftException(mapEntry, this); if (exceptionReturned != null) { if (log.isLoggable(Level.FINE)) log.fine(exceptionReturned.getMessage()); exceptionReturned.setLocation(ME+"-put("+mapEntry.getLogId()+")"); throw exceptionReturned; } // separate persistent from transient entries and store the persistents in persistence if (this.persistentStore != null && this.isConnected) { if (mapEntry.isPersistent()) { XmlBlasterException exceptionReturned2 = spaceLeftException(mapEntry, this.persistentStore); if (exceptionReturned2 != null) { if (log.isLoggable(Level.FINE)) log.fine(exceptionReturned2.getMessage()); exceptionReturned2.setLocation(ME+"-put("+mapEntry.getLogId()+")"); throw exceptionReturned2; } try { numPersistentPut = this.persistentStore.put(mapEntry); } catch (XmlBlasterException ex) { log.severe("put: an error occurred when writing to the persistent queue, the persistent entry " + mapEntry.getLogId() + " will temporarily be handled as transient. Is the DB up and running ? " + ex.getMessage() + "state " + this.toXml("")); } } } assureTransientSpace(mapEntry); numTransientPut = this.transientStore.put(mapEntry); } // sync(this) this.storageSizeListenerHelper.invokeStorageSizeListener(); if (numPersistentPut>0 || numTransientPut>0) { return 1; } // NOTE: It is possible that a persistent entry is not put to persistent storage // e.g. because of 'duplicate key' (entry existed already) and same with RAM queue // In this case the caller does get a 0 return 0; } /** * Swap an entry away to hard disk. * Call this method from synchronized code only. * @param mapEntry The new entry which needs space for itself. */ private void assureTransientSpace(I_MapEntry mapEntry) throws XmlBlasterException { while (!spaceLeft(mapEntry, this.transientStore)) { /* Protect against infinite looping */ if (this.transientStore == null || this.property == null || this.transientStore.getNumOfEntries() < 1) break; I_MapEntry oldest = this.transientStore.removeOldest(); if (oldest == null) { if (log.isLoggable(Level.FINE)) log.fine("The RAM queue is full, new entry '" + mapEntry.getUniqueId() + "' seems to be the first and only one, so we accept it"); break; } if (log.isLoggable(Level.FINER)) log.finer("Swapping '" + oldest.getLogId() + "' to HD ..."); try { if (!oldest.isPersistent()) { // if entry is marked as persistent it is already in persistentStore (see code above) // swap away the oldest cache entry to harddisk ... if (log.isLoggable(Level.FINE)) log.fine("Swapping '" + oldest.getLogId() + " size=" + oldest.getSizeInBytes() + "'. Exceeding size state after removing from transient before entering persistent: " + toXml("")); if (this.persistentStore == null) throw new XmlBlasterException(glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, ME, "assureTransientSpace: no persistent queue configured, needed for swapping, entry " + mapEntry.getLogId() + " is not handled"); if (!this.isConnected) throw new XmlBlasterException(glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, ME, "assureTransientSpace: The DB is currently disconnected, entry " + mapEntry.getLogId() + " is not handled"); if (spaceLeft(oldest, this.persistentStore)) { try { this.persistentStore.put(oldest); } catch (XmlBlasterException ex) { log.severe("assureTransientSpace: an error occured when writing to the persistent queue, transient entry " + oldest.getLogId() + " is not swapped, new entry '" + mapEntry.getLogId() + "' is rejected. Is the DB up and running ? " + ex.getMessage() + " state: " + toXml("")); throw ex; } } else throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_BYTES, ME, "assureTransientSpace: maximum size in bytes for the persistent queue exceeded when swapping, entry " + mapEntry.getLogId() + " not handled . State: " + toXml("")); } oldest.isSwapped(true); } catch(XmlBlasterException ex2) { this.transientStore.put(oldest); // undo on error throw ex2; // swapping failed, we won't accept the new entry } } } /** * Check is storage is big enough for entry * @param mapEntry may not be null * @return null There is space (otherwise the error text is returned) */ private XmlBlasterException spaceLeftException(I_MapEntry mapEntry, I_Map map) { if (map == null || this.property == null) { return new XmlBlasterException(glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "Storage framework is down, current settings are" + toXml("")); } if ((1 + map.getNumOfEntries()) > map.getMaxNumOfEntries()) return new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME, "Queue overflow (number of entries), " + getNumOfEntries() + " entries are in queue, try increasing property '" + this.property.getPropName("maxEntries") + "' and '" + this.property.getPropName("maxEntriesCache") + "', current settings are" + toXml("")); if ((mapEntry.getSizeInBytes() + map.getNumOfBytes()) > map.getMaxNumOfBytes()) return new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_BYTES, ME, "Queue overflow, " + getMaxNumOfBytes() + " bytes are in queue, try increasing property '" + this.property.getPropName("maxBytes") + "' and '" + this.property.getPropName("maxBytesCache") + "', current settings are" + toXml("")); return null; } /** * Check is storage is big enough for entry * @param mapEntry may not be null * @return true Space enough */ private boolean spaceLeft(I_MapEntry mapEntry, I_Map map) { if (map == null || this.property == null) return false; if ((1 + map.getNumOfEntries()) > map.getMaxNumOfEntries()) return false; if ((mapEntry.getSizeInBytes() + map.getNumOfBytes()) > map.getMaxNumOfBytes()) return false; return true; } /** * Returns the unique ID of this queue */ public StorageId getStorageId() { return this.storageId; } /** * @see I_Map#get(long) */ public I_MapEntry get(final long uniqueId) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("Entering get(" + uniqueId + ")"); I_MapEntry mapEntry = null; synchronized(this) { mapEntry = this.transientStore.get(uniqueId); if (mapEntry != null) { mapEntry.isSwapped(false); return mapEntry; } if (this.persistentStore == null) return null; mapEntry = this.persistentStore.get(uniqueId); if (mapEntry == null) { return null; } // Ok, we need to swap transient entry back from persistence store if (!mapEntry.isPersistent()) { this.persistentStore.remove(mapEntry); } assureTransientSpace(mapEntry); this.transientStore.put(mapEntry); mapEntry.isSwapped(false); } // synchronized(this) return mapEntry; } /** * Access all entries. * <p /> * TODO !!!: This method should be changed to an iterator approach * as if we have swapped messages they won't fit to memory. * @see I_Map#getAll() */ public I_MapEntry[] getAll(final I_EntryFilter entryFilter) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("Entering getAll()"); synchronized (this) { //log.error(ME, "getAll() DEBUG ONLY: numSwapped=" + numSwapped() + " transient=" + this.transientStore.getNumOfEntries() + " persistentStore=" + this.persistentStore.getNumOfEntries()); java.util.Map map = new java.util.TreeMap(); // To suppress same entry twice and to be sorted (sorted is not yet specified to be necessary) I_MapEntry[] ramEntries = null; if (entryFilter != null) { ramEntries = this.transientStore.getAll(new I_EntryFilter() { public I_Entry intercept(I_Entry entry, I_Storage storage) { if (entry.isPersistent()) return null; // take the one from persistent store return entryFilter.intercept(entry, storage); } }); } else { ramEntries = this.transientStore.getAll(null); } for(int i=0; i<ramEntries.length; i++) { map.put(new Long(ramEntries[i].getUniqueId()), ramEntries[i]); } //log.error(ME, "getAll() DEBUG ONLY: map.size=" + map.size() + " numSwapped=" + numSwapped() + " transient=" + this.transientStore.getNumOfEntries()); if (this.persistentStore != null) { I_MapEntry[] persistEntries = this.persistentStore.getAll(entryFilter); if (persistEntries != null) { for(int i=0; i<persistEntries.length; i++) { if (persistEntries[i] == null) continue; map.put(new Long(persistEntries[i].getUniqueId()), persistEntries[i]); } } //log.error(ME, "getAll() DEBUG ONLY: map.size=" + map.size() + " numSwapped=" + numSwapped() + " persistentStore=" + this.persistentStore.getNumOfEntries()); } return (I_MapEntry[])map.values().toArray(new I_MapEntry[map.size()]);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -