📄 dispatchmanager.java
字号:
getMsgErrorHandler().handleError(new MsgErrorInfo(glob, (MsgQueueEntry)null, this, ex)); shutdown(); } public void postSendNotification(MsgQueueEntry entry) { MsgQueueEntry[] entries = new MsgQueueEntry[] { entry }; postSendNotification(entries); } public void postSendNotification(MsgQueueEntry[] entries) { I_PostSendListener postSendListener = this.dispatchConnectionsHandler.getPostSendListener(); if (postSendListener != null) { try { postSendListener.postSend(entries); } catch (Throwable e) { e.printStackTrace(); log.warning("postSendListener.postSend() exception: " + e.toString()); } } } /** * Notify I_PostSendListener about problem. * <p> * Typically XmlBlasterAccess is notified when message came asynchronously from queue * * @param entryList * @param ex * @return true if processed * @see I_PostSendListener#postSend(MsgQueueEntry) for explanation */ public boolean sendingFailedNotification(MsgQueueEntry[] entries, XmlBlasterException ex) { I_PostSendListener postSendListener = this.dispatchConnectionsHandler.getPostSendListener(); if (postSendListener == null) return false; try { return postSendListener.sendingFailed(entries, ex); } catch (Throwable e) { e.printStackTrace(); log.warning("postSendListener.sendingFailed() exception: " + e.toString()); return false; } } /** * Called by DispatchWorker if an Exception occured in sync mode * Only on client side */ void handleSyncWorkerException(ArrayList entryList, Throwable throwable) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer(ME+": Sync delivery failed connection state is " + this.dispatchConnectionsHandler.getState().toString() + ": " + throwable.toString()); XmlBlasterException xmlBlasterException = XmlBlasterException.convert(glob,ME,null,throwable); if (xmlBlasterException.isUser()) { // Exception from remote client from update(), pass it to error handler and carry on ...? 
// A PublishPlugin could throw it MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]); getMsgErrorHandler().handleErrorSync(new MsgErrorInfo(glob, entries, this, xmlBlasterException)); return; } else if (xmlBlasterException.isCommunication()) { if (this.msgInterceptor != null && isPolling()) { // If we have a plugin it shall handle it try { entryList = this.msgInterceptor.handleNextMessages(this, entryList); if (entryList != null && entryList.size() > 0) { MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]); getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, xmlBlasterException)); } } catch (XmlBlasterException xmlBlasterException2) { internalError(xmlBlasterException2); } if (entryList != null && entryList.size() > 0) { MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]); getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, xmlBlasterException)); } return; } // Exception from connection to remote client (e.g. from Corba layer) // DispatchManager handles this // Error handling in sync mode // 1. throwExceptionBackToPusher // 2. Switch to async mode and collect message (wait on better times) // 3. If we have serious problems (programming exceptions or isDead()) throw exception back // 4. Pass exception to an error handler plugin switchToASyncMode(); // Simulate return values, and manipulate missing informations into entries ... 
I_QueueEntry[] entries = (I_QueueEntry[])entryList.toArray(new I_QueueEntry[entryList.size()]); getDispatchConnectionsHandler().createFakedReturnObjects(entries, Constants.STATE_OK, Constants.INFO_QUEUED); msgQueue.put(entries, I_Queue.IGNORE_PUT_INTERCEPTOR); if (log.isLoggable(Level.FINE)) log.fine(ME+": Delivery failed, pushed " + entries.length + " entries into tail back queue"); } else { if (log.isLoggable(Level.FINE)) log.fine(ME+": Invocation failed: " + xmlBlasterException.getMessage()); throw xmlBlasterException; } } /** * Messages are successfully sent, remove them now from queue (sort of a commit()): * We remove filtered/destroyed messages as well (which doen't show up in entryListChecked) * @param postSendNotify TODO */ public void removeFromQueue(MsgQueueEntry[] entries, boolean postSendNotify) throws XmlBlasterException { I_MsgDispatchInterceptor msgInterceptor = getMsgDispatchInterceptor(); MsgUnit[] msgUnits = null; if (msgInterceptor != null) { // we need to do this before removal since the msgUnits are weak references and would be deleted by gc msgUnits = new MsgUnit[entries.length]; for (int i=0; i < msgUnits.length; i++) { msgUnits[i] = entries[i].getMsgUnit(); } } this.msgQueue.removeRandom(entries); /*(currently only done in sync invocation) ArrayList defaultEntries = sendAsyncResponseEvent(entryList); if (defaultEntries.size() > 0) { MsgQueueEntry[] entries = (MsgQueueEntry[])defaultEntries.toArray(new MsgQueueEntry[defaultEntries.size()]); this.msgQueue.removeRandom(entries); } */ if (postSendNotify) postSendNotification(entries); if (msgInterceptor != null) { msgInterceptor.postHandleNextMessages(this, msgUnits); } if (log.isLoggable(Level.FINE)) log.fine("Commit of successful sending of " + entries.length + " messages done, current queue size is " + this.msgQueue.getNumOfEntries() + " '" + entries[0].getLogId() + "'"); } /** * Called by DispatchWorker if an Exception occurred in async mode. 
* @throws XmlBlasterException should never happen but is possible during removing entries from queue */ void handleWorkerException(ArrayList entryList, Throwable throwable) throws XmlBlasterException { // Note: The DispatchManager is notified about connection problems directly by its DispatchConnectionsHandler // we don't need to take care of ErrorCode.COMMUNICATION* if (log.isLoggable(Level.FINER)) log.finer(ME+": Async delivery failed connection state is " + this.dispatchConnectionsHandler.getState().toString() + ": " + throwable.toString()); //Thread.currentThread().dumpStack(); if (entryList == null) { if (!this.isShutdown) log.warning(ME+": Didn't expect null entryList in handleWorkerException() for throwable " + throwable.getMessage() + toXml("")); return; } getDispatchStatistic().setLastDeliveryException(throwable.toString()); getDispatchStatistic().incrNumDeliveryExceptions(1); if (throwable instanceof XmlBlasterException) { XmlBlasterException ex = (XmlBlasterException)throwable; if (log.isLoggable(Level.FINE)) log.fine(ME+": Invocation or callback failed: " + ex.getMessage()); if (ex.isUser()) { // Exception from remote client from update(), pass it to error handler and carry on ... 
MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]); boolean isHandled = sendingFailedNotification(entries, ex); if (isHandled) removeFromQueue(entries, false); else getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, ex)); } else if (ex.isCommunication()) { if (this.msgInterceptor != null && isPolling()) { // If we have a plugin it shall handle it try { entryList = this.msgInterceptor.handleNextMessages(this, entryList); if (entryList != null && entryList.size() > 0) { MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]); getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, ex)); } } catch (XmlBlasterException ex2) { internalError(ex2); } if (entryList != null && entryList.size() > 0) { MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]); getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, ex)); } } // Exception from connection to remote client (e.g. from Corba layer) // DispatchManager handles this } else { //log.severe(ME+": Callback failed: " + ex.toString()); //ex.printStackTrace(); MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]); boolean isHandled = sendingFailedNotification(entries, ex); if (isHandled) removeFromQueue(entries, false); else internalError(ex); } } else { //log.severe(ME+": Callback failed: " + throwable.toString()); //throwable.printStackTrace(); XmlBlasterException ex = new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, "", throwable); // sendingFailedNotification() not called as the msgs remain in queue until problem is resolved by admin internalError(ex); } } public I_MsgErrorHandler getMsgErrorHandler() { return this.failureListener; } /** * We register a QueuePutListener and all put() into the queue are * intercepted - our put() is called instead. 
* We then deliver this QueueEntry directly to the remote * connection and return synchronously the returned value or the * Exception if one is thrown. */ public void switchToSyncMode() { if (this.isSyncMode) return; synchronized (this) { if (this.isSyncMode) return; if (this.syncDispatchWorker == null) this.syncDispatchWorker = new DispatchWorker(glob, this); this.isSyncMode = true; log.info(ME+": Switched to synchronous message delivery"); if (this.timerKey != null) log.severe(ME+": Burst mode timer was activated and we switched to synchronous delivery" + " - handling of this situation is not coded yet"); removeBurstModeTimer(); } } /** * Switch back to asynchronous mode. * Our thread pool will take the messages out of the queue * and deliver them in asynchronous mode. */ public void switchToASyncMode() { if (!this.isSyncMode) return; synchronized (this) { if (!this.isSyncMode) return; //this.msgQueue.removePutListener(this); this.isSyncMode = false; activateDispatchWorker(); // just in case there are some messages pending in the queue log.info(ME+": Switched to asynchronous message delivery"); } } /** * @see I_QueuePutListener#putPre(I_QueueEntry) */ public boolean putPre(I_QueueEntry queueEntry) throws XmlBlasterException { //I_QueueEntry[] queueEntries = new I_QueueEntry[1]; //queueEntries[0] = queueEntry; return putPre(new I_QueueEntry[] { queueEntry }); } /** * @see #putPre(I_QueueEntry) * @see I_QueuePutListener#putPre(I_QueueEntry[]) */ public boolean putPre(I_QueueEntry[] queueEntries) throws XmlBlasterException { if (!this.isSyncMode) { if (this.inAliveTransition) { // Do not allow other threads to put messages to queue during transition to alive synchronized (ALIVE_TRANSITION_MONITOR) { // don't allow } } return true; // Add entry to queue } if (log.isLoggable(Level.FINE)) log.fine(ME+": putPre() - Got " + queueEntries.length + " QueueEntries to deliver synchronously ..."); ArrayList entryList = new ArrayList(queueEntries.length); for (int ii=0; 
ii<queueEntries.length; ii++) { if (this.trySyncMode && !this.isSyncMode && queueEntries[ii] instanceof MsgQueueGetEntry) { // this.trySyncMode === isClientSide throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "You can't call get() in asynchronous mode (gets can't be queued because we don't know its return value)"); } entryList.add(queueEntries[ii]); } this.syncDispatchWorker.run(entryList); return false; } /** * @see I_QueuePutListener#putPost(I_QueueEntry) */ public void putPost(I_QueueEntry queueEntry) throws XmlBlasterException { if (!this.isSyncMode) { if (this.dispatcherActive) notifyAboutNewEntry(); if (((MsgQueueEntry)queueEntry).wantReturnObj()) { // Simulate return values, and manipulate missing informations into entries ... I_QueueEntry[] entries = new I_QueueEntry[] { queueEntry }; getDispatchConnectionsHandler().createFakedReturnObjects(entries, Constants.STATE_OK, Constants.INFO_QUEUED); } } } /** * @see #putPost(I_QueueEntry) * @see I_QueuePutListener#putPost(I_QueueEntry[]) */ public void putPost(I_QueueEntry[] queueEntries) throws XmlBlasterException { if (!this.isSyncMode) { if (this.dispatcherActive) notifyAboutNewEntry(); if (queueEntries.length > 0 && ((MsgQueueEntry)queueEntries[0]).wantReturnObj()) { // Simulate return values, and manipulate missing informations into entries ... getDispatchConnectionsHandler().createFakedReturnObjects(queueEntries, Constants.STATE_OK, Constants.INFO_QUEUED); } } } /** * Here we prepare messages which are coming directly from the queue. * <ol> * <li>We eliminate destroyed messages</li> * <li>We make a shallow copy of the message. * We need to do this, out messages are references directly into the queue. 
* The delivery framework is later changing the QoS
    *       and plugins may change the content - and this should not modify the queue entries</li>
    * </ol>
    */
   public ArrayList prepareMsgsFromQueue(ArrayList entryList) {
      boolean hasEntries = entryList != null && entryList.size() >= 1;
      if (hasEntries) {
         return prepareMsgsFromQueue(ME, log, this.msgQueue, entryList);
      }

      if (log.isLoggable(Level.FINE)) {
         log.fine(ME+": Got zero messages from queue, expected at least one, can happen if client disconnected in the mean time: " + toXml(""));
      }
      return null;
   }

   /**
    * Builds a new list holding a clone of every entry which is not destroyed.
    * Destroyed entries are removed from the given queue and left out of the result;
    * a failure of that removal is logged and the loop carries on.
    *
    * @param logId prefix used for the log messages
    * @param log the logger to write to
    * @param queue the queue from which destroyed entries are removed
    * @param entryList the entries taken from the queue, must not be null
    * @return the list of cloned entries, can be empty
    */
   public static ArrayList prepareMsgsFromQueue(String logId, Logger log, I_Queue queue, ArrayList entryList) {
      // Remove all expired messages and do a shallow copy
      final int total = entryList.size();
      ArrayList copies = new ArrayList(total);
      for (int pos = 0; pos < total; pos++) {
         MsgQueueEntry current = (MsgQueueEntry)entryList.get(pos);

         if (!current.isDestroyed()) {
            copies.add(current.clone()); // expired messages are sent as well
            continue;
         }

         // Take care to remove the filtered away messages from the queue as well
         log.info(logId+": Message " + current.getLogId() + " is destroyed, ignoring it");
         if (log.isLoggable(Level.FINE)) {
            log.fine("Message " + current.getLogId() + " is destroyed, ignoring it: " + current.toXml());
         }
         try {
            queue.removeRandom(current); // Probably change to use [] for better performance
         }
         catch (Throwable e) {
            log.severe(logId+": Internal error when removing expired message " + current.getLogId() + " from queue, no recovery implemented, we continue: " + e.toString());
         }
      }
      return copies;
   }

   /**
    * When somebody puts a new entry into the queue, we want to be
    * notified about this after the entry is fed.
    * <p>
    * Called by I_Queue.putPost()
    */
   public void notifyAboutNewEntry() {
      if (log.isLoggable(Level.FINER)) log.finer(ME+": Entering notifyAboutNewEntry("+this.notifyCounter+")");
      this.notifyCounter++;
      //activateDispatchWorker();
      if (checkSending(true) == false) return;
      if (useBurstModeTimer() == true)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -