📄 cachequeueinterceptorplugin.java
字号:
this.property = newProp; this.transientQueue.setProperties(createRamCopy((QueuePropertyBase)userData)); if (this.persistentQueue != null) this.persistentQueue.setProperties(userData); } // JMX public String getPropertyStr() { return (this.property == null) ? "" : this.property.toXml(); } /** * Access the current queue configuration */ public Object getProperties() { return this.property; } public void setNotifiedAboutAddOrRemove(boolean notify) { this.notifiedAboutAddOrRemove = notify; } public boolean isNotifiedAboutAddOrRemove() { return this.notifiedAboutAddOrRemove; } /** * @see I_Queue#addPutListener(I_QueuePutListener) */ synchronized public void addPutListener(I_QueuePutListener l) { if (l == null) throw new IllegalArgumentException(ME + ": addPustListener(null) is not allowed"); if (this.putListener != null) throw new IllegalArgumentException(ME + ": addPustListener() failed, there is a listener registered already"); this.putListener = l; } /** * @see I_Queue#removePutListener(I_QueuePutListener) */ synchronized public void removePutListener(I_QueuePutListener l) { this.putListener = null; } /** * returns the persistent queue (null if no one defined) */ public I_Queue getPersistentQueue() { return this.persistentQueue; } /** * returns the transient queue (null if no one defined) */ public I_Queue getTransientQueue() { return this.transientQueue; } /** * Gets the references of the entries in the queue. Note that the data * which is referenced here may be changed by other threads. */ public long[] getEntryReferences() throws XmlBlasterException { // currently not implemented throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "getEntryReferences not implemented"); } /** * @see I_Queue#getEntries(I_EntryFilter) */ public ArrayList getEntries(I_EntryFilter entryFilter) throws XmlBlasterException { if (this.persistentQueue == null) return new ArrayList(); return this.persistentQueue.getEntries(entryFilter); } /** * @see I_Queue#put(I_QueueEntry, boolean) */ public void put(I_QueueEntry queueEntry, boolean ignorePutInterceptor) throws XmlBlasterException { I_QueueEntry[] entries = new I_QueueEntry[1]; entries[0] = queueEntry; put(entries, ignorePutInterceptor); } /** * All entries are stored into the transient queue. All persistent messages are * stored also in the persistent queue. The exceeding size in the transient * queue is calculated. If it is positive it means we need to swap. The * overflowing messages are taken from the ram queue. The volatile between * them are stored in the persistent storage (since the persistent ones have * been previously stored). * * @see I_Queue#put(I_QueueEntry[], boolean) */ public void put(I_QueueEntry[] queueEntries, boolean ignorePutInterceptor) throws XmlBlasterException { if (queueEntries == null || queueEntries.length < 1) return; if ((this.putListener != null) && (!ignorePutInterceptor)) { // Is an interceptor registered (and not bypassed) ? if (this.putListener.putPre(queueEntries) == false) return; // supress adding entries to queue (bypasses the queue) } synchronized (this) { checkEntriesAvailable(this, 0L, true, "first check in put"); // throws XmlBlasterException if no space left checkSpaceAvailable(this, 0L, true, "first check in put"); // throws XmlBlasterException if no space left long sizeOfEntries = 0L; // separate persistent from transient messages and store the persistents in persistence if (isPersistenceAvailable()) { ArrayList persistentsFromEntries = new ArrayList(); long sizeOfPersistents = 0L; long numOfPersistents = 0L; for (int i=0; i < queueEntries.length; i++) { if (queueEntries[i].isPersistent()) { persistentsFromEntries.add(queueEntries[i]); sizeOfPersistents += queueEntries[i].getSizeInBytes(); numOfPersistents++; } else sizeOfEntries += queueEntries[i].getSizeInBytes(); } sizeOfEntries += sizeOfPersistents; if (persistentsFromEntries.size() > 0) { try { this.persistentQueue.put((I_QueueEntry[])persistentsFromEntries.toArray(new I_QueueEntry[persistentsFromEntries.size()]), ignorePutInterceptor); } catch (XmlBlasterException ex) { log.severe(ME+"put: an error occured when writing to the persistent queue: " + persistentsFromEntries.size() + " persistent entries will temporarly be handled as transient. Is the DB up and running ? " + ex.getMessage() + "state " + this.toXml("")); // should an exception be rethrown here ? No because it should be possible to work even if no persistence available } catch (Throwable ex) { log.severe(ME+"put: an error occured when writing to the persistent queue: " + persistentsFromEntries.size() + " persistent entries will temporarly be handled as transient. Is the DB up and running ? " + ex.toString() + "state " + this.toXml("")); ex.printStackTrace(); } } } synchronized(this.peekSync) { // put all messages on transient queue this.transientQueue.put(queueEntries, ignorePutInterceptor); if (isPersistenceAvailable()) { // if no persistence available let RAM overflow one time // handle swapping (if any) // TODO swap only if bigger than max entries (not same as max entries) long exceedingSize = -checkSpaceAvailable(this.transientQueue, 0L, false, "second check in put"); long exceedingEntries = -checkEntriesAvailable(this.transientQueue, 0L, false, "second check in put"); if ( (exceedingSize >= 0L && this.persistentQueue.getMaxNumOfBytes() > this.transientQueue.getMaxNumOfBytes()) || (exceedingEntries >= 0L && this.persistentQueue.getMaxNumOfEntries() > this.transientQueue.getMaxNumOfEntries())) { if (log.isLoggable(Level.FINE)) log.fine(ME+"Swapping. Exceeding size (in bytes): " + exceedingSize + " exceeding entries: " + exceedingEntries + " state: " + toXml("")); ArrayList transients = null; try { ArrayList swaps = null; boolean needsLoading = false; if (this.transientQueue.getNumOfEntries() == 0) swaps = this.transientQueue.takeLowest((int)exceedingEntries, exceedingSize, null, true); else { swaps = this.transientQueue.takeLowest(queueEntries.length, sizeOfEntries, null, true); needsLoading = true; } if (log.isLoggable(Level.FINE)) { log.fine(ME+"Swapping: moving '" + swaps.size() + "' entries from transient queue to persistent queue: exceedingEntries='" + exceedingEntries + "' and exceedingSize='" + exceedingSize + "'"); } // get the transients transients = new ArrayList(); for (int i=0; i < swaps.size(); i++) { I_QueueEntry entry = (I_QueueEntry)swaps.get(i); if (!entry.isPersistent()) { transients.add(entry); } } if (transients.size() > 0) this.persistentQueue.put((I_QueueEntry[])transients.toArray(new I_QueueEntry[transients.size()]), ignorePutInterceptor); if (needsLoading) loadFromPersistence(); } catch (XmlBlasterException ex) { log.severe(ME+"put: an error occured when swapping: " + transients.size() + ". Is the DB up and running ? " + ex.getMessage() + " state: " + toXml("")); ex.printStackTrace(); } catch (Throwable ex) { log.severe(ME+"put: an error occured when swapping: " + transients.size() + ". Is the DB up and running ? " + ex.toString()); ex.printStackTrace(); } } } } // end of peekSync here ... } // end of synchronized here ... // these must be outside the synchronized ... if (this.notifiedAboutAddOrRemove) { for(int i=0; i<queueEntries.length; i++) try { queueEntries[i].added(this.queueId); } catch (Throwable ex) { log.severe(ME+"put: an error occured when notifying : " + ex.toString()); ex.printStackTrace(); } } this.storageSizeListenerHelper.invokeStorageSizeListener(); if ((this.putListener != null) && (!ignorePutInterceptor)) { this.putListener.putPost(queueEntries); } } // JMX public String getQueueName() { return getStorageId().getStrippedId(); } /** * Returns the unique ID of this queue */ public StorageId getStorageId() { return this.queueId; } /** * @see I_Queue#takeWithPriority(int,long,int,int) */ public ArrayList takeWithPriority(int numOfEntries, long numOfBytes, int minPriority, int maxPriority) throws XmlBlasterException { throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "takeWithPriority not implemented"); // if (this.notifiedAboutAddOrRemove) {} } private final boolean hasTransientsSwapped() { return this.persistentQueue.getNumOfPersistentEntries() != this.persistentQueue.getNumOfEntries(); } private final boolean isPersistenceAvailable() { return this.persistentQueue != null && this.isConnected; } private final boolean hasUncachedEntries() { return hasTransientsSwapped() || this.persistentQueue.getNumOfPersistentEntries() != this.transientQueue.getNumOfPersistentEntries(); } /** * Aware: peekLowest is not implemented!! * @see I_Queue#peekLowest(int, long, I_QueueEntry, boolean) */ public ArrayList peekLowest(int numOfEntries, long numOfBytes, I_QueueEntry limitEntry, boolean leaveOne) throws XmlBlasterException { throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "peekLowest is not implemented"); } /** * Aware: takeLowest for more than one entry is not implemented!! * @see I_Queue#takeLowest(int, long, I_QueueEntry, boolean) */ public ArrayList takeLowest(int numOfEntries, long numOfBytes, I_QueueEntry limitEntry, boolean leaveOne) throws XmlBlasterException { ArrayList list = null; boolean doNotify = false; try { synchronized(this) { boolean handlePersistents = isPersistenceAvailable() && hasUncachedEntries(); if ( handlePersistents ) { // swapping try { list = this.persistentQueue.takeLowest(numOfEntries, numOfBytes, limitEntry, leaveOne); doNotify = true; } catch (Throwable ex) { handlePersistents = false; log.severe(ME+"takeLowest: exception occured when taking the lowest entry from the persistent queue: " + ex.toString()); ex.printStackTrace(); } if (handlePersistents) { if (list.size() > 1) { throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "takeLowest for more than one entry is not implemented"); } long num = 0L; boolean[] hlp = this.transientQueue.removeRandom((I_Entry[])list.toArray(new I_Entry[list.size()])); for (int i=0; i < hlp.length; i++) if (hlp[i]) num++; if (num > 0L) { if (log.isLoggable(Level.FINE)) log.fine(ME+"Didn't expect message " + ((I_Entry)list.get(0)).getLogId() + " in transient store." + " If the database was temporary unavailable this is possible " + this.toXml("")); } } } // 'else' is no good here since it could have changed due to the exception ... if ( !handlePersistents) { list = this.transientQueue.takeLowest(numOfEntries, numOfBytes, limitEntry, leaveOne); doNotify = true; if (isPersistenceAvailable() && list.size() > 0 && this.persistentQueue.getNumOfEntries() > 0) { boolean durableFound = false; for(int i=0; i<list.size(); i++) { if (((I_Entry)list.get(i)).isPersistent()) { durableFound = true; break; } } if (durableFound) { this.persistentQueue.removeRandom((I_Entry[])list.toArray(new I_Entry[list.size()])); } } } } // end of syncrhonized } finally { if (doNotify) { if (this.notifiedAboutAddOrRemove) { for(int i=0; i<list.size(); i++) ((I_Entry)list.get(i)).removed(this.queueId); } } this.storageSizeListenerHelper.invokeStorageSizeListener(); } return list; } // JMX public String peekStr() throws Exception { try { I_QueueEntry entry = peek(); return (entry == null) ? "No entry found" : entry.getLogId() + " - " + entry.getSizeInBytes() + " bytes - " + ((entry.isPersistent()) ? "persistent" : "transient") + " prio=" + entry.getPriority() + " " + entry.getEmbeddedType(); // no toXml() available ?? } catch(XmlBlasterException e) { throw new Exception(e); } } // JMX public String[] peekEntries(int numOfEntries) throws Exception { if (numOfEntries == 0) return new String[] { "Please pass number of messages to peak" }; try { ArrayList list = peek(numOfEntries, -1L); if (list == null || list.size()<1) return new String[] { "No entry found" }; String[] ret = new String[list.size()]; for (int i=0; i<list.size(); i++) { I_QueueEntry entry = (I_QueueEntry)list.get(i); // no toXml() available ?? ret[i] = entry.getLogId() + " - " + entry.getSizeInBytes() + " bytes - " + ((entry.isPersistent()) ? "persistent" : "transient") + " prio=" + entry.getPriority() + " " + entry.getEmbeddedType(); // no toXml() available ?? } return ret; } catch (XmlBlasterException e) { throw new Exception(e); } } /** * @see I_Queue#peek() */ public I_QueueEntry peek() throws XmlBlasterException { synchronized(this.peekSync) { return this.transientQueue.peek(); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -