⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 priorizeddispatchplugin.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, text);      }      if (managerEntry.getCurrConnectionState() == ConnectionStateEnum.ALIVE &&          managerEntry.getCurrConnectionStateConfiguration() == null &&          this.hasDefaultActionOnly) {         if (log.isLoggable(Level.FINE)) log.fine("We have default action only, returning all " + entryList.size() + " messages");         return entryList;      }      if (log.isLoggable(Level.FINE)) log.fine("Working with " + entryList.size() + " messages ...");      // ... do plugin specific work ...      ArrayList resultList = new ArrayList();      for (int i=0; i<entryList.size(); i++) {         MsgQueueEntry entry = (MsgQueueEntry)entryList.get(i);         DispatchAction action = getDispatchAction(managerEntry, entry);         if (log.isLoggable(Level.FINE)) log.fine("Working on '" + entry.getLogId() + "', action=" + action.getAction() + " from sender " + entry.getSender());         if (managerEntry.getCurrConnectionState() == ConnectionStateEnum.ALIVE) {            if (entry.isInternal()) {               log.info("Sending out of bound internal message '" + entry.getLogId() + "'");               resultList.add(entry);               continue; // Send internal message out of bound            }            if (this.xmlBlasterClient.getLoginName().equals(entry.getSender().getLoginName())) {               log.info("Sending out of bound PtP message '" + entry.getLogId() + "'");               resultList.add(entry);               continue; // Send PtP notifications out of bound to avoid looping            }         }         if (managerEntry.getCurrConnectionState() != ConnectionStateEnum.ALIVE && action.doSend()) {            log.severe("We are in state " + managerEntry.getCurrConnectionState() + " and the configuration tells us to send nevertheless, we queue instead: " + entry.getLogId());            action = this.QUEUE_ACTION;         }         if (action.doSend()) {            resultList.add(entry);         }         else if (action.doQueue()) {            // ignore in async - put to queue in sync mode !!            if (log.isLoggable(Level.FINE)) log.fine("Queueing holdback message " + entry.getLogId());            try {               putToHoldbackQueue(managerEntry, entry);            }            catch (XmlBlasterException e) {               dispatchManager.getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entry, dispatchManager, e));            }            if (log.isLoggable(Level.FINE)) log.fine("Removing from callback queue " + entry.getLogId() + " (is now a holdback message)");            try {               dispatchManager.getQueue().removeRandom(entry);               if (log.isLoggable(Level.FINE)) log.fine("Callback queue size is now " + dispatchManager.getQueue().getNumOfEntries());            }            catch (XmlBlasterException e) {               log.severe("PANIC: Can't remove " + entry.toXml("") + " from queue '" + dispatchManager.getQueue().getStorageId() + "': " + e.getMessage());               e.printStackTrace();            }         }         else if (action.doDestroy()) {            try {               dispatchManager.getQueue().removeRandom(entry);            }            catch (XmlBlasterException e) {               log.severe("PANIC: Can't remove " + entry.toXml("") + " from queue '" + dispatchManager.getQueue().getStorageId() + "': " + e.getMessage());               e.printStackTrace();            }         }         if (action.doNotifySender()) {            this.xmlBlasterClient.sendPtPMessage(entry, this.specificConfigPropertyKey, action.getAction(), this.currMsgStatus);         }      }      entryList.clear();      return resultList;   }   private DispatchManagerEntry getDispatchManagerEntry(DispatchManager dispatchManager) {      synchronized (this) {         return (DispatchManagerEntry)this.dispatchManagerEntryMap.get(dispatchManager);      }   }   private void putToHoldbackQueue(DispatchManagerEntry managerEntry, MsgQueueEntry entry) throws XmlBlasterException {      I_Queue queue = managerEntry.getHoldbackQueue();      if (queue == null) {         synchronized (this) {            if (queue == null) {               // Create a queue for this plugin, inherit the settings from the original queue of DispatchManager               QueuePropertyBase queueProperties = (QueuePropertyBase)managerEntry.getDispatchManager().getQueue().getProperties();               String type = queueProperties.getType();               String version = queueProperties.getVersion();               String typeVersion = glob.getProperty().get("PriorizedDispatchPlugin.queue.plugin", type+","+version);               StorageId storageId = new StorageId("PriorizedDispatchPlugin", managerEntry.getDispatchManager().getQueue().getStorageId().getPostfix());               queue = glob.getQueuePluginManager().getPlugin(typeVersion, storageId, queueProperties);               queue.setNotifiedAboutAddOrRemove(true); // Entries are notified to support reference counting (otherwise we have memory leaks)               managerEntry.setHoldbackQueue(queue);               log.info("Created holdback queue '" + queue.getStorageId() + "' with " + queue.getNumOfEntries() + " entries");            }         }      }      queue.put(entry, true);      if (log.isLoggable(Level.FINE)) log.fine("Filled to holdback queue '" + queue.getStorageId() + "' one entry '" + entry.getLogId() +                               "', it has now " + queue.getNumOfEntries() + " entries");   }   /**    * All entries from our holdback queue are flushed to the official queues    * of the DispatchManager    */   private void flushHoldbackQueue(DispatchManagerEntry managerEntry) {      synchronized (this)  {         DispatchManager dispatchManager = managerEntry.getDispatchManager();         I_Queue holdbackQueue = managerEntry.getHoldbackQueue();         if (holdbackQueue != null && holdbackQueue.getNumOfEntries() > 0) {            log.info("Flushing " + holdbackQueue.getNumOfEntries() + " entries from holdback queue " + holdbackQueue.getStorageId());            ArrayList list = null;            int lastSize = -99;            while (holdbackQueue.getNumOfEntries() > 0) {               try {                  list = holdbackQueue.peek(-1, -1);                  if (holdbackQueue.getNumOfEntries() == lastSize) {                     log.severe("PANIC: " + holdbackQueue.getNumOfEntries() + " entries from holdback queue " + holdbackQueue.getStorageId() + " can't be flushed, giving up!");                     break;                  }                  lastSize = (int)holdbackQueue.getNumOfEntries();               }               catch (XmlBlasterException e) {                  log.severe("PANIC: Can't flush holdbackQueue '" + holdbackQueue.getStorageId() + "' with " + holdbackQueue.getNumOfEntries() + " entries: " + e.getMessage());                  e.printStackTrace();                  continue;               }               MsgQueueEntry[] queueEntries = (MsgQueueEntry[])list.toArray(new MsgQueueEntry[list.size()]);               // On error we send them as dead letters, as we don't know what to do with them in our holdback queue               try {                  dispatchManager.getQueue().put(queueEntries, false);               }               catch (XmlBlasterException e) {                  log.warning("flushHoldbackQueue() failed: " + e.getMessage());                  // errorCode == "ONOVERFLOW"                  dispatchManager.getMsgErrorHandler().handleError(new MsgErrorInfo(glob, queueEntries, dispatchManager, e));               }               try {                  long num = holdbackQueue.remove(list.size(), -1);                  if (num != list.size()) {                     log.severe("PANIC: Expected to remove from holdbackQueue '" + holdbackQueue.getStorageId() + "' with " + holdbackQueue.getNumOfEntries() + " entries " + list.size() + " entries, but only " + num + " where removed");                  }               }               catch (XmlBlasterException e) {                  log.severe("PANIC: Expected to remove from holdbackQueue '" + holdbackQueue.getStorageId() + "' with " + holdbackQueue.getNumOfEntries() + " entries " + list.size() + " entries: " + e.getMessage());               }            }            holdbackQueue.clear();            dispatchManager.notifyAboutNewEntry();         }         else {            if (log.isLoggable(Level.FINE)) log.fine("No holdback queue for " + dispatchManager.getId() + ", nothing to flush");         }      }   }   /**    * Call by DispatchConnectionsHandler on state transition.     * <p />    * Enforced by interface I_ConnectionStatusListener    */   public final void toAlive(DispatchManager dispatchManager, ConnectionStateEnum oldState) {      changeManagerState(dispatchManager, ConnectionStateEnum.ALIVE, true);   }   /**    * Call by DispatchConnectionsHandler on state transition    * <p />    * Enforced by interface I_ConnectionStatusListener    */   public final void toPolling(DispatchManager dispatchManager, ConnectionStateEnum oldState) {      changeManagerState(dispatchManager, ConnectionStateEnum.POLLING, true);   }   /**    * Call by DispatchConnectionsHandler on state transition    * <p />    * Enforced by interface I_ConnectionStatusListener    */   public final void toDead(DispatchManager dispatchManager, ConnectionStateEnum oldState, String errorText) {      changeManagerState(dispatchManager, ConnectionStateEnum.DEAD, true);   }   private DispatchManagerEntry changeManagerState(DispatchManager dispatchManager, ConnectionStateEnum newState, boolean flush) {      DispatchManagerEntry managerEntry = getDispatchManagerEntry(dispatchManager);      if (managerEntry == null) {         throw new IllegalArgumentException("Internal error in " + newState + ": dispatchManager=" + dispatchManager.toXml("") + " is unknown, dispatchManagerEntryMap.size()=" + dispatchManagerEntryMap.size());      }      managerEntry.setCurrConnectionState(newState);      StatusConfiguration tmp = parser.getStatusConfiguration(newState);      managerEntry.setCurrConnectionStateConfiguration(tmp);      if (tmp != null)         log.info("Changing to " + newState + ", found configuration is '" + tmp.toXml(null) + "'");      else          log.info("Changing to connection state " + newState);      if (flush)         flushHoldbackQueue(managerEntry);      return managerEntry;    }   /**    * @return A current snapshot (thread save etc)    */   private DispatchManagerEntry[] getDispatchManagerEntryArr() {     synchronized (this) {        return (DispatchManagerEntry[])this.dispatchManagerEntryMap.values().toArray(new DispatchManagerEntry[this.dispatchManagerEntryMap.size()]);      }   }   /**    * Deregister a dispatch manager.     * @see I_MsgDispatchInterceptor#shutdown(DispatchManager)    */    public void shutdown(DispatchManager dispatchManager) throws XmlBlasterException {      DispatchManagerEntry de = null;      synchronized (this)  {         de = (DispatchManagerEntry)this.dispatchManagerEntryMap.remove(dispatchManager);      }      if (de != null) {         if (de.getHoldbackQueue() != null) {            // org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback throws an exception if            // we activate the following line -> we need to investigate this issue            //try { de.getHoldbackQueue().destroy(); } catch (XmlBlasterException e) { log.error(ME, "Problems on shutdown of holdback queue: " + e.getMessage()); }            de.getHoldbackQueue().shutdown();         }      }      synchronized (this)  {         if (this.dispatchManagerEntryMap.size() == 0)            shutdown(); // Remove the whole plugin on last DispatchManager      }   }   /**   */    public void shutdown() throws XmlBlasterException {      if (log.isLoggable(Level.FINE)) log.fine("shutdown()");      synchronized (this) {         if (isShutdown) return;         glob.getProperty().removePropertyChangeListener(CONFIG_PROPERTY_KEY, this);         DispatchManagerEntry[] arr = getDispatchManagerEntryArr();         for(int i=0; i<arr.length; i++) {            shutdown(arr[i].getDispatchManager());         }         if (this.dispatchManagerEntryMap.size() > 0) {            log.severe("Internal cleanup error in dispatchManagerEntryMap");         }         this.dispatchManagerEntryMap.clear();                  this.xmlBlasterClient.shutdown(this);         isShutdown = true;      }   }   /**    * @return true if shutdown    */   public boolean isShutdown() {      return isShutdown;   }   /**    * @return a human readable usage help string    */   public String usage() {      return "";   }   /**    * @see I_MsgDispatchInterceptor#toXml(String)    */   public String toXml(String extraOffset) {      return "";   }   /**    * Not doing anything in this method since no cleanup needed.    */   public void postHandleNextMessages(DispatchManager dispatchManager, MsgUnit[] processedEntries) throws XmlBlasterException {   }   /**    * Not doing anything in this method since no Exception handling is done.    */   public void onDispatchWorkerException(DispatchManager dispatchManager, Throwable ex) {   }      }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -