📄 cachequeueinterceptorplugin.java
字号:
} } } }
      this.storageSizeListenerHelper.invokeStorageSizeListener();
      long ret = 0L;
      for (int i=0; i < tmp.length; i++) if (tmp[i]) ret++;
      return ret;
   }

   /**
    * Not supported by this implementation.
    *
    * @see I_Queue#removeTransient()
    * @throws XmlBlasterException always, with ErrorCode.INTERNAL_NOTIMPLEMENTED
    */
   public int removeTransient() throws XmlBlasterException {
      throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "removeTransient not implemented");
   }

   /**
    * It returns the size of the queue. Note that this call will return the size
    * stored in cache, i.e. it will NOT make a call to the underlying DB.
    * <p>
    * With persistence available the result is the persistent queue's count plus
    * the entries of the transient queue which are not persistent. If the
    * persistent queue reports a negative count the transient queue's count is
    * returned instead.
    *
    * @see I_Queue#getNumOfEntries()
    */
   synchronized public long getNumOfEntries() {
      if (isPersistenceAvailable()) {
         long ret = this.persistentQueue.getNumOfEntries();
         if (ret < 0L) return this.transientQueue.getNumOfEntries();
         ret += this.transientQueue.getNumOfEntries() - this.transientQueue.getNumOfPersistentEntries();
         return ret;
      }
      return this.transientQueue.getNumOfEntries();
   }

   /**
    * It returns the size of persistent entries in the queue. Note that this call will return the size
    * stored in cache, i.e. it will NOT make a call to the underlying DB.
    * <p>
    * If the persistent queue reports a negative count, the number of persistent
    * entries held by the transient queue is returned instead (same fallback as
    * in {@link #getNumOfPersistentBytes()}).
    *
    * @see I_Queue#getNumOfPersistentEntries()
    */
   synchronized public long getNumOfPersistentEntries() {
      if (isPersistenceAvailable()) {
         long ret = this.persistentQueue.getNumOfPersistentEntries();
         // A negative value is treated as "count not available": fall back to
         // the persistent entries known to the transient queue, not to its total.
         if (ret < 0L) return this.transientQueue.getNumOfPersistentEntries();
         // Return the value just checked instead of querying a second time.
         return ret;
      }
      return this.transientQueue.getNumOfPersistentEntries();
   }

   /**
    * Returns the limit of the persistent queue if persistence is available,
    * otherwise the limit of the transient queue.
    *
    * @see I_Queue#getMaxNumOfEntries()
    */
   public long getMaxNumOfEntries() {
      if (isPersistenceAvailable()) return this.persistentQueue.getMaxNumOfEntries();
      return this.transientQueue.getMaxNumOfEntries();
   }

   /**
    * Returns the number of bytes in the queue, computed analogous to
    * {@link #getNumOfEntries()}: persistent bytes plus the transient queue's
    * non persistent bytes.
    *
    * @see I_Queue#getNumOfBytes()
    */
   synchronized public long getNumOfBytes() {
      if (isPersistenceAvailable()) {
         long ret = this.persistentQueue.getNumOfBytes();
         if (ret < 0L) return this.transientQueue.getNumOfBytes();
         ret += this.transientQueue.getNumOfBytes() - this.transientQueue.getNumOfPersistentBytes();
         return ret;
      }
      return this.transientQueue.getNumOfBytes();
   }

   /**
    * @see I_Queue#getNumOfPersistentBytes()
    */
   synchronized public long getNumOfPersistentBytes() {
      if (isPersistenceAvailable()) {
         long ret = this.persistentQueue.getNumOfPersistentBytes();
         // if a persistent queue return -1L it means it was not able to get the correct size
         if (ret < 0L) return this.transientQueue.getNumOfPersistentBytes();
         return ret;
      }
      return this.transientQueue.getNumOfPersistentBytes();
   }

   /**
    * Returns the limit of the persistent queue if persistence is available,
    * otherwise the limit of the transient queue.
    *
    * @see I_Queue#getMaxNumOfBytes()
    */
   synchronized public long getMaxNumOfBytes() {
      if (isPersistenceAvailable()) return this.persistentQueue.getMaxNumOfBytes();
      return this.transientQueue.getMaxNumOfBytes();
   }

   /**
    * Updates the given message queue entry with a new value. Note that this
    * can be used if an entry with the unique id already exists.
    * ?? Does this really make sense here since we need to store history ??
    * ?? Should we define a switch which can deactivate storage of history ??
    *
    * @throws XmlBlasterException always, with ErrorCode.INTERNAL_NOTIMPLEMENTED
    */
   public int update(I_QueueEntry queueEntry) throws XmlBlasterException {
      throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "update not implemented");
   }

   /**
    * Clears everything and removes the queue (i.e. frees the associated table)
    * This method is not synchronized because of the callback in remove which must
    * be outside the synchronized.
    *
    * @return the number of entries reported by getNumOfEntries() before clearing
    */
   public long clear() {
      if (this.notifiedAboutAddOrRemove) {
         long numOfEntries = getNumOfEntries();
         while (true) {
            long curr = getNumOfEntries();
            if (curr <= 0) break;
            //if (curr > 10000) curr = 10000; is protected by maxEntriesCached
            try {
               long count = remove(curr, -1);  // with callback to Entry for reference counting
               if (count == 0) break;
            }
            catch (Throwable e) {
               // NOTE(review): the loop continues after an exception; if remove()
               // keeps failing while entries remain this never terminates - confirm
               // whether that is acceptable.
               log.severe(ME+"Ignoring exception in clear(): " + e.toString());
            }
         }
         return numOfEntries;
      }
      else {
         // Entries don't have reference counting: The quick way:
         long numOfEntries = getNumOfEntries();
         try {
            if (this.persistentQueue != null) this.persistentQueue.clear();
         }
         catch (Throwable e) {
            log.severe(ME+"Ignoring exception in clear(): " + e.toString());
         }
         try {
            this.transientQueue.clear();
         }
         catch (Throwable e) {
            log.severe(ME+"Ignoring exception in clear(): " + e.toString());
         }
         return numOfEntries;
      }
   }

   /**
    * Not supported by this implementation.
    *
    * @see I_Queue#removeHead(I_QueueEntry)
    * @throws XmlBlasterException always, with ErrorCode.INTERNAL_NOTIMPLEMENTED
    */
   public long removeHead(I_QueueEntry toEntry) throws XmlBlasterException {
      throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "removeHead not implemented");
   }

   /**
    * Shutdown the implementation, sync with data store
    * <p>
    * A second call is a no-op: the method returns as soon as it finds the
    * isDown flag set. Failures while shutting down the transient or the
    * persistent queue are logged and do not prevent the remaining steps.
    */
   public void shutdown() {
      synchronized (this) {
         if (log.isLoggable(Level.FINER)) log.finer(ME+"shutdown(isDown="+this.isDown+")");
         if (this.isDown) {
            return;
         }
         this.isDown = true;
         this.glob.unregisterMBean(this.mbeanHandle);
         long numTransients = getNumOfEntries() - getNumOfPersistentEntries();
         if (numTransients > 0) {
            log.warning(ME+"Shutting down cache queue which contains " + numTransients + " transient messages");
         }
         try {
            this.transientQueue.shutdown();
         }
         catch (Throwable ex) {
            log.severe(ME+"shutdown: exception when processing transient queue. Reason: " + ex.toString());
            ex.printStackTrace();
         }
         try {
            if (this.persistentQueue != null) this.persistentQueue.shutdown();
         }
         catch (Throwable ex) {
            // The message names the persistent queue so the log identifies the failing store.
            log.severe(ME+"shutdown: exception when processing persistent queue. Reason: " + ex.toString());
            ex.printStackTrace();
         }
         try {
            // this.glob.getJdbcQueueManager(this.queueId).unregisterListener(this);
            if (this.persistentQueue != null) this.persistentQueue.unRegisterStorageProblemListener(this);
         }
         catch (Exception ex) {
            log.severe(ME+"could not unregister listener. Cause: " + ex.getMessage());
            ex.printStackTrace();
         }
      }
      // The listener notification and cleanup run outside the synchronized block.
      this.storageSizeListenerHelper.invokeStorageSizeListener();
      removeStorageSizeListener(null);
      glob.getQueuePluginManager().cleanup(this);
   }

   /**
    * @return true once shutdown() has been entered
    */
   public boolean isShutdown() {
      return this.isDown;
   }

   /**
    * JMX help
    * @return a human readable usage help string
    */
   public java.lang.String usage() {
      return "Manipulating the queue directly will most certainly destroy your data."
             +Global.getJmxUsageLinkInfo(this.getClass().getName(), null);
   }

   /**
    * @return A link for JMX usage
    */
   public java.lang.String getUsageUrl() {
      return Global.getJavadocUrl(this.getClass().getName(), null);
   }

   /* dummy to have a copy/paste functionality in jconsole */
   public void setUsageUrl(java.lang.String url) {}

   // JMX
   public final String toXml() {
      return toXml("");
   }

   /**
    * @param extraOffset additional indentation, null is treated as ""
    * @return Internal state as an XML ASCII string
    */
   public final String toXml(String extraOffset) {
      StringBuffer sb = new StringBuffer(1000);
      if (extraOffset == null) extraOffset = "";
      String offset = Constants.OFFSET + extraOffset;

      sb.append(offset).append("<CacheQueueInterceptorPlugin id='").append(getStorageId().getId());
      sb.append("' type='").append(getType());
      sb.append("' version='").append(getVersion());
      sb.append("' maxEntriesCache='").append(this.transientQueue.getMaxNumOfEntries());
      sb.append("' maxBytesCache='").append(this.transientQueue.getMaxNumOfBytes());
      sb.append("' maxEntries='").append(getMaxNumOfEntries());
      sb.append("' maxBytes='").append(getMaxNumOfBytes());
      sb.append("' numOfEntries='").append(getNumOfEntries());
      sb.append("' numOfBytes='").append(getNumOfBytes());
      sb.append("'>");
      sb.append(this.transientQueue.toXml(extraOffset+Constants.INDENT));
      if (this.persistentQueue != null)
         sb.append(this.persistentQueue.toXml(extraOffset+Constants.INDENT));
      sb.append(offset).append("</CacheQueueInterceptorPlugin>");
      return sb.toString();
   }

   /**
    * Enforced by I_Plugin
    * @see org.xmlBlaster.util.plugin.I_Plugin#init(org.xmlBlaster.util.Global, PluginInfo)
    */
   public void init(org.xmlBlaster.util.Global glob, PluginInfo pluginInfo) {
      this.glob = glob;
      this.pluginInfo = pluginInfo;
      // this.pluginProperties = pluginInfo.getParameters();
   }

   /**
    * Enforced by I_Plugin
    * @return "CACHE"
    */
   public String getType() { return "CACHE"; }

   /**
    * Enforced by I_Plugin
    * @return "1.0"
    */
   public String getVersion() { return "1.0"; }

   /**
    * Enforced by I_StoragePlugin
    * @return the pluginInfo object.
    */
   public PluginInfo getInfo() { return this.pluginInfo; }

   /**
    * Delegates to the persistent queue.
    *
    * @return false if there is no persistent queue, otherwise the result of the delegate
    * @see org.xmlBlaster.util.queue.I_StorageProblemNotifier#registerStorageProblemListener(I_StorageProblemListener)
    */
   synchronized public boolean registerStorageProblemListener(I_StorageProblemListener listener) {
      if (this.persistentQueue == null) return false;
      return this.persistentQueue.registerStorageProblemListener(listener);
   }

   /**
    * Delegates to the persistent queue.
    *
    * @return false if there is no persistent queue, otherwise the result of the delegate
    * @see org.xmlBlaster.util.queue.I_StorageProblemNotifier#unRegisterStorageProblemListener(I_StorageProblemListener)
    */
   synchronized public boolean unRegisterStorageProblemListener(I_StorageProblemListener listener) {
      if (this.persistentQueue == null) return false;
      return this.persistentQueue.unRegisterStorageProblemListener(listener);
   }

   /**
    * @see I_Queue#addStorageSizeListener(I_StorageSizeListener)
    */
   public void addStorageSizeListener(I_StorageSizeListener listener) {
      this.storageSizeListenerHelper.addStorageSizeListener(listener);
   }

   /**
    * @see I_Queue#removeStorageSizeListener(I_StorageSizeListener)
    */
   public void removeStorageSizeListener(I_StorageSizeListener listener) {
      this.storageSizeListenerHelper.removeStorageSizeListener(listener);
   }

   /**
    * @see I_Queue#hasStorageSizeListener(I_StorageSizeListener)
    */
   public boolean hasStorageSizeListener(I_StorageSizeListener listener) {
      return this.storageSizeListenerHelper.hasStorageSizeListener(listener);
   }

   /**
    * @see I_Storage#getStorageSizeListeners()
    */
   public I_StorageSizeListener[] getStorageSizeListeners() {
      return storageSizeListenerHelper.getStorageSizeListeners();
   }

   /**
    * Dumps the entries of the persistent queue to the given stream.
    * If there is no persistent queue nothing is written, a warning is logged
    * and 0 is returned.
    *
    * @param out the stream to write to
    * @param props formatting properties, forwarded to the persistent queue
    * @return the value returned by the persistent queue, or 0 without one
    * @see I_Queue#embeddedObjectsToXml(OutputStream, Properties)
    */
   public long embeddedObjectsToXml(OutputStream out, Properties props) throws Exception {
      I_Queue queue = this.persistentQueue;
      if (queue != null) {
         // Forward the caller's properties; passing null here dropped the
         // settings prepared by dumpEmbeddedObjectsToFile().
         return queue.embeddedObjectsToXml(out, props);
      }
      log.warning(ME+"Sorry, dumping transient entries to '" + out + "' is not implemented");
      return 0;
   }

   /**
    * Dumps the queue entries as XML to a file; the entries are wrapped in an
    * element named after the queue id prefix.
    * Missing parent directories are created. The stream is closed even if
    * writing fails.
    *
    * @param fileName the target file; if null or "String" (ignoring case) the
    *        name defaults to the stripped queue id plus ".xml".
    *        NOTE(review): "String" is presumably the placeholder a JMX console
    *        sends for an unfilled argument - confirm.
    * @return a human readable status text
    * @see I_AdminQueue#dumpEmbeddedObjectsToFile(String)
    */
   public String dumpEmbeddedObjectsToFile(String fileName) throws Exception {
      if (fileName == null || fileName.equalsIgnoreCase("String")) {
         fileName = this.queueId.getStrippedId() + ".xml";
      }
      File to_file = new File(fileName);
      if (to_file.getParent() != null) {
         to_file.getParentFile().mkdirs();
      }
      long count;
      FileOutputStream out = new FileOutputStream(to_file);
      try {
         // Encode explicitly as UTF-8 to match the encoding announced in the XML declaration.
         out.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>".getBytes("UTF-8"));
         out.write(("\n<"+this.queueId.getPrefix()+">").getBytes("UTF-8"));
         Properties props = new Properties();
         props.put(Constants.TOXML_FORCEREADABLE, ""+true); // to be human readable (minimize base64)
         props.put(Constants.TOXML_ENCLOSINGTAG, "publish"); // to look similar to XmlScript
         count = embeddedObjectsToXml(out, props);
         out.write(("\n</"+this.queueId.getPrefix()+">").getBytes("UTF-8"));
      }
      finally {
         out.close(); // release the file handle on failure as well
      }
      return "Dumped " + count + " entries to '" + to_file.toString() + "'";
   }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -