📄 priorizeddispatchplugin.java
字号:
throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, text); } if (managerEntry.getCurrConnectionState() == ConnectionStateEnum.ALIVE && managerEntry.getCurrConnectionStateConfiguration() == null && this.hasDefaultActionOnly) { if (log.isLoggable(Level.FINE)) log.fine("We have default action only, returning all " + entryList.size() + " messages"); return entryList; } if (log.isLoggable(Level.FINE)) log.fine("Working with " + entryList.size() + " messages ..."); // ... do plugin specific work ... ArrayList resultList = new ArrayList(); for (int i=0; i<entryList.size(); i++) { MsgQueueEntry entry = (MsgQueueEntry)entryList.get(i); DispatchAction action = getDispatchAction(managerEntry, entry); if (log.isLoggable(Level.FINE)) log.fine("Working on '" + entry.getLogId() + "', action=" + action.getAction() + " from sender " + entry.getSender()); if (managerEntry.getCurrConnectionState() == ConnectionStateEnum.ALIVE) { if (entry.isInternal()) { log.info("Sending out of bound internal message '" + entry.getLogId() + "'"); resultList.add(entry); continue; // Send internal message out of bound } if (this.xmlBlasterClient.getLoginName().equals(entry.getSender().getLoginName())) { log.info("Sending out of bound PtP message '" + entry.getLogId() + "'"); resultList.add(entry); continue; // Send PtP notifications out of bound to avoid looping } } if (managerEntry.getCurrConnectionState() != ConnectionStateEnum.ALIVE && action.doSend()) { log.severe("We are in state " + managerEntry.getCurrConnectionState() + " and the configuration tells us to send nevertheless, we queue instead: " + entry.getLogId()); action = this.QUEUE_ACTION; } if (action.doSend()) { resultList.add(entry); } else if (action.doQueue()) { // ignore in async - put to queue in sync mode !! if (log.isLoggable(Level.FINE)) log.fine("Queueing holdback message " + entry.getLogId()); try { putToHoldbackQueue(managerEntry, entry); } catch (XmlBlasterException e) { dispatchManager.getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entry, dispatchManager, e)); } if (log.isLoggable(Level.FINE)) log.fine("Removing from callback queue " + entry.getLogId() + " (is now a holdback message)"); try { dispatchManager.getQueue().removeRandom(entry); if (log.isLoggable(Level.FINE)) log.fine("Callback queue size is now " + dispatchManager.getQueue().getNumOfEntries()); } catch (XmlBlasterException e) { log.severe("PANIC: Can't remove " + entry.toXml("") + " from queue '" + dispatchManager.getQueue().getStorageId() + "': " + e.getMessage()); e.printStackTrace(); } } else if (action.doDestroy()) { try { dispatchManager.getQueue().removeRandom(entry); } catch (XmlBlasterException e) { log.severe("PANIC: Can't remove " + entry.toXml("") + " from queue '" + dispatchManager.getQueue().getStorageId() + "': " + e.getMessage()); e.printStackTrace(); } } if (action.doNotifySender()) { this.xmlBlasterClient.sendPtPMessage(entry, this.specificConfigPropertyKey, action.getAction(), this.currMsgStatus); } } entryList.clear(); return resultList; } private DispatchManagerEntry getDispatchManagerEntry(DispatchManager dispatchManager) { synchronized (this) { return (DispatchManagerEntry)this.dispatchManagerEntryMap.get(dispatchManager); } } private void putToHoldbackQueue(DispatchManagerEntry managerEntry, MsgQueueEntry entry) throws XmlBlasterException { I_Queue queue = managerEntry.getHoldbackQueue(); if (queue == null) { synchronized (this) { if (queue == null) { // Create a queue for this plugin, inherit the settings from the original queue of DispatchManager QueuePropertyBase queueProperties = (QueuePropertyBase)managerEntry.getDispatchManager().getQueue().getProperties(); String type = queueProperties.getType(); String version = queueProperties.getVersion(); String typeVersion = glob.getProperty().get("PriorizedDispatchPlugin.queue.plugin", type+","+version); StorageId storageId = new StorageId("PriorizedDispatchPlugin", managerEntry.getDispatchManager().getQueue().getStorageId().getPostfix()); queue = glob.getQueuePluginManager().getPlugin(typeVersion, storageId, queueProperties); queue.setNotifiedAboutAddOrRemove(true); // Entries are notified to support reference counting (otherwise we have memory leaks) managerEntry.setHoldbackQueue(queue); log.info("Created holdback queue '" + queue.getStorageId() + "' with " + queue.getNumOfEntries() + " entries"); } } } queue.put(entry, true); if (log.isLoggable(Level.FINE)) log.fine("Filled to holdback queue '" + queue.getStorageId() + "' one entry '" + entry.getLogId() + "', it has now " + queue.getNumOfEntries() + " entries"); } /** * All entries from our holdback queue are flushed to the official queues * of the DispatchManager */ private void flushHoldbackQueue(DispatchManagerEntry managerEntry) { synchronized (this) { DispatchManager dispatchManager = managerEntry.getDispatchManager(); I_Queue holdbackQueue = managerEntry.getHoldbackQueue(); if (holdbackQueue != null && holdbackQueue.getNumOfEntries() > 0) { log.info("Flushing " + holdbackQueue.getNumOfEntries() + " entries from holdback queue " + holdbackQueue.getStorageId()); ArrayList list = null; int lastSize = -99; while (holdbackQueue.getNumOfEntries() > 0) { try { list = holdbackQueue.peek(-1, -1); if (holdbackQueue.getNumOfEntries() == lastSize) { log.severe("PANIC: " + holdbackQueue.getNumOfEntries() + " entries from holdback queue " + holdbackQueue.getStorageId() + " can't be flushed, giving up!"); break; } lastSize = (int)holdbackQueue.getNumOfEntries(); } catch (XmlBlasterException e) { log.severe("PANIC: Can't flush holdbackQueue '" + holdbackQueue.getStorageId() + "' with " + holdbackQueue.getNumOfEntries() + " entries: " + e.getMessage()); e.printStackTrace(); continue; } MsgQueueEntry[] queueEntries = (MsgQueueEntry[])list.toArray(new MsgQueueEntry[list.size()]); // On error we send them as dead letters, as we don't know what to do with them in our holdback queue try { dispatchManager.getQueue().put(queueEntries, false); } catch (XmlBlasterException e) { log.warning("flushHoldbackQueue() failed: " + e.getMessage()); // errorCode == "ONOVERFLOW" dispatchManager.getMsgErrorHandler().handleError(new MsgErrorInfo(glob, queueEntries, dispatchManager, e)); } try { long num = holdbackQueue.remove(list.size(), -1); if (num != list.size()) { log.severe("PANIC: Expected to remove from holdbackQueue '" + holdbackQueue.getStorageId() + "' with " + holdbackQueue.getNumOfEntries() + " entries " + list.size() + " entries, but only " + num + " where removed"); } } catch (XmlBlasterException e) { log.severe("PANIC: Expected to remove from holdbackQueue '" + holdbackQueue.getStorageId() + "' with " + holdbackQueue.getNumOfEntries() + " entries " + list.size() + " entries: " + e.getMessage()); } } holdbackQueue.clear(); dispatchManager.notifyAboutNewEntry(); } else { if (log.isLoggable(Level.FINE)) log.fine("No holdback queue for " + dispatchManager.getId() + ", nothing to flush"); } } } /** * Call by DispatchConnectionsHandler on state transition. * <p /> * Enforced by interface I_ConnectionStatusListener */ public final void toAlive(DispatchManager dispatchManager, ConnectionStateEnum oldState) { changeManagerState(dispatchManager, ConnectionStateEnum.ALIVE, true); } /** * Call by DispatchConnectionsHandler on state transition * <p /> * Enforced by interface I_ConnectionStatusListener */ public final void toPolling(DispatchManager dispatchManager, ConnectionStateEnum oldState) { changeManagerState(dispatchManager, ConnectionStateEnum.POLLING, true); } /** * Call by DispatchConnectionsHandler on state transition * <p /> * Enforced by interface I_ConnectionStatusListener */ public final void toDead(DispatchManager dispatchManager, ConnectionStateEnum oldState, String errorText) { changeManagerState(dispatchManager, ConnectionStateEnum.DEAD, true); } private DispatchManagerEntry changeManagerState(DispatchManager dispatchManager, ConnectionStateEnum newState, boolean flush) { DispatchManagerEntry managerEntry = getDispatchManagerEntry(dispatchManager); if (managerEntry == null) { throw new IllegalArgumentException("Internal error in " + newState + ": dispatchManager=" + dispatchManager.toXml("") + " is unknown, dispatchManagerEntryMap.size()=" + dispatchManagerEntryMap.size()); } managerEntry.setCurrConnectionState(newState); StatusConfiguration tmp = parser.getStatusConfiguration(newState); managerEntry.setCurrConnectionStateConfiguration(tmp); if (tmp != null) log.info("Changing to " + newState + ", found configuration is '" + tmp.toXml(null) + "'"); else log.info("Changing to connection state " + newState); if (flush) flushHoldbackQueue(managerEntry); return managerEntry; } /** * @return A current snapshot (thread save etc) */ private DispatchManagerEntry[] getDispatchManagerEntryArr() { synchronized (this) { return (DispatchManagerEntry[])this.dispatchManagerEntryMap.values().toArray(new DispatchManagerEntry[this.dispatchManagerEntryMap.size()]); } } /** * Deregister a dispatch manager. * @see I_MsgDispatchInterceptor#shutdown(DispatchManager) */ public void shutdown(DispatchManager dispatchManager) throws XmlBlasterException { DispatchManagerEntry de = null; synchronized (this) { de = (DispatchManagerEntry)this.dispatchManagerEntryMap.remove(dispatchManager); } if (de != null) { if (de.getHoldbackQueue() != null) { // org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback throws an exception if // we activate the following line -> we need to investigate this issue //try { de.getHoldbackQueue().destroy(); } catch (XmlBlasterException e) { log.error(ME, "Problems on shutdown of holdback queue: " + e.getMessage()); } de.getHoldbackQueue().shutdown(); } } synchronized (this) { if (this.dispatchManagerEntryMap.size() == 0) shutdown(); // Remove the whole plugin on last DispatchManager } } /** */ public void shutdown() throws XmlBlasterException { if (log.isLoggable(Level.FINE)) log.fine("shutdown()"); synchronized (this) { if (isShutdown) return; glob.getProperty().removePropertyChangeListener(CONFIG_PROPERTY_KEY, this); DispatchManagerEntry[] arr = getDispatchManagerEntryArr(); for(int i=0; i<arr.length; i++) { shutdown(arr[i].getDispatchManager()); } if (this.dispatchManagerEntryMap.size() > 0) { log.severe("Internal cleanup error in dispatchManagerEntryMap"); } this.dispatchManagerEntryMap.clear(); this.xmlBlasterClient.shutdown(this); isShutdown = true; } } /** * @return true if shutdown */ public boolean isShutdown() { return isShutdown; } /** * @return a human readable usage help string */ public String usage() { return ""; } /** * @see I_MsgDispatchInterceptor#toXml(String) */ public String toXml(String extraOffset) { return ""; } /** * Not doing anything in this method since no cleanup needed. */ public void postHandleNextMessages(DispatchManager dispatchManager, MsgUnit[] processedEntries) throws XmlBlasterException { } /** * Not doing anything in this method since no Exception handling is done. */ public void onDispatchWorkerException(DispatchManager dispatchManager, Throwable ex) { } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -