📄 dispatchmanager.java
字号:
return; startWorkerThread(false); } /** * Counts how often a new entry was added since the current worker thread was started. */ public int getNotifyCounter() { return this.notifyCounter; } /** * Give the callback worker thread a kick to deliver the messages. * Throws no exception. */ private void activateDispatchWorker() { if (checkSending(false) == false) return; if (useBurstModeTimer() == true) return; startWorkerThread(false); } /** * @return true if a burst mode timer was activated */ private boolean useBurstModeTimer() { if (collectTime <= 0L) return false; // Messages are sent delayed on timeout (burst mode) if (log.isLoggable(Level.FINE)) log.fine(ME+": Executing useBurstModeTimer() collectTime=" + collectTime + " dispatchWorkerIsActive=" + dispatchWorkerIsActive); synchronized (this) { if (this.isShutdown) return false; if (this.timerKey == null) { if (log.isLoggable(Level.FINE)) log.fine(ME+": Starting burstMode timer with " + collectTime + " msec"); this.timerKey = this.glob.getBurstModeTimer().addTimeoutListener(this, collectTime, null); } } return true; } /** * Remove the burst mode timer */ private void removeBurstModeTimer() { synchronized (this) { if (this.timerKey != null) { this.glob.getBurstModeTimer().removeTimeoutListener(timerKey); this.timerKey = null; } } } /** * @param fromTimeout for logging only */ private void startWorkerThread(boolean fromTimeout) { if (this.dispatchWorkerIsActive == false) { synchronized (this) { if (this.isShutdown) { if (log.isLoggable(Level.FINE)) log.fine(ME+": startWorkerThread() failed, we are shutdown: " + toXml("")); return; } if (this.dispatchWorkerIsActive == false) { // send message directly this.dispatchWorkerIsActive = true; this.notifyCounter = 0; try { this.glob.getDispatchWorkerPool().execute(new DispatchWorker(glob, this)); } catch (Throwable e) { this.dispatchWorkerIsActive = false; log.severe(ME+": Unexpected error occurred: " + e.toString()); e.printStackTrace(); } } } return; } if (fromTimeout) { 
if (log.isLoggable(Level.FINE)) log.fine(ME+": Burst mode timeout occurred, last callback worker thread is not finished - we do nothing (the worker thread will give us a kick)"); } else { if (log.isLoggable(Level.FINE)) log.fine(ME+": Last callback worker thread is not finished - we do nothing (the worker thread will give us a kick)"); } } public boolean isDead() { return this.dispatchConnectionsHandler.isDead(); } public boolean isPolling() { return this.dispatchConnectionsHandler.isPolling(); } /** * Can be called when client connection is lost (NOT the callback connection). * Currently only detected by the SOCKET protocol plugin. * Others can only detect lost clients with their callback protocol pings */ public void lostClientConnection() { log.warning(ME+": Lost client connection"); // If SOCKET: the cb connection is lost as well and we can go to polling mode pingCallbackServer(false); } public void pingCallbackServer(boolean sync) { DispatchConnection dispatchConnection = this.dispatchConnectionsHandler.getCurrentDispatchConnection(); if (dispatchConnection != null) { if (sync) { dispatchConnection.timeout(null); // force a ping } else { // force a ping via another thread this.glob.getPingTimer().addTimeoutListener(dispatchConnection, 0L, null); } } } /** * @param isPublisherThread We take care that the publisher thread, coming through putPost() * does never too much work to return fast enough and avoid possible dead locks. 
* @return true is status is OK and we can try to send a message */ private boolean checkSending(boolean isPublisherThread) { if (this.isShutdown) { if (log.isLoggable(Level.FINE)) log.fine(ME+": The dispatcher is shutdown, can't activate callback worker thread" + toXml("")); return false; // assert } if (this.isSyncMode) { return false; } if (!this.dispatcherActive) { return false; } if (msgQueue.isShutdown() && !isPublisherThread) { // assert if (log.isLoggable(Level.FINE)) log.fine(ME+": The queue is shutdown, can't activate callback worker thread."); // e.g. client has disconnected on the mean time. //Thread.currentThread().dumpStack(); shutdown(); return false; } if (this.dispatchConnectionsHandler.isUndef()) { if (log.isLoggable(Level.FINE)) log.fine(ME+": Not connected yet, state is UNDEF"); return false; } if (this.dispatchConnectionsHandler.isDead() && !isPublisherThread) { String text = "No recoverable remote connection available, giving up queue " + msgQueue.getStorageId() + "."; if (log.isLoggable(Level.FINE)) log.fine(ME+": "+text); givingUpDelivery(new XmlBlasterException(glob,ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, text)); return false; } if (msgQueue.getNumOfEntries() == 0L) { return false; } if (this.msgInterceptor != null) { if (this.msgInterceptor.doActivate(this) == false) { if (log.isLoggable(Level.FINE)) log.fine(ME+": this.msgInterceptor.doActivate==false"); return false; // A plugin told us to suppress sending the message } return true; } /* * The msgInterceptor plugin needs to have a chance to take care of this even in polling mode */ if (this.dispatchConnectionsHandler.isPolling()) { if (log.isLoggable(Level.FINE)) log.fine(ME+": Can't send message as connection is lost and we are polling"); return false; } return true; } /** * We are notified about the burst mode timeout through this method. 
* @param userData You get bounced back your userData which you passed * with Timeout.addTimeoutListener() */ public void timeout(Object userData) { this.timerKey = null; if (log.isLoggable(Level.FINE)) log.fine(ME+": Burst mode timeout occurred, queue entries=" + msgQueue.getNumOfEntries() + ", starting callback worker thread ..."); startWorkerThread(true); } /** * @return The interceptor plugin if available, otherwise null */ public I_MsgDispatchInterceptor getMsgDispatchInterceptor() { return this.msgInterceptor; } /** * Set new callback addresses, typically after a session login/logout */ public void setAddresses(AddressBase[] addr) throws XmlBlasterException { this.dispatchConnectionsHandler.initialize(addr); } /** * Switch on/off the sending of messages. */ private void initDispatcherActive(AddressBase[] addrArr) { if (addrArr != null) { for (int ii=0; ii<addrArr.length; ii++) { // TODO: How to handle setting of multiple addresses?? this.dispatcherActive = addrArr[ii].isDispatcherActive(); } } } /** * The worker notifies us that it is finished, if messages are available * it is triggered again. */ void setDispatchWorkerIsActive(boolean val) { this.dispatchWorkerIsActive = val; if (val == false) { if (this.isShutdown) { if (log.isLoggable(Level.FINE)) log.fine(ME+": setDispatchWorkerIsActive(" + val + ") failed, we are shutdown: " + toXml("")); return; } if (msgQueue.getNumOfEntries() > 0) { if (log.isLoggable(Level.FINE)) log.fine(ME+": Finished callback job. 
Giving a kick to send the remaining " + msgQueue.getNumOfEntries() + " messages."); try { activateDispatchWorker(); } catch(Throwable e) { log.severe(ME+": "+e.toString()); e.printStackTrace(); // Assure the queue is flushed with another worker } } else { if (this.trySyncMode && !this.isSyncMode) { switchToSyncMode(); } } } } /** * Called locally and from TopicHandler when internal error (Throwable) occurred to avoid infinite looping */ public void internalError(Throwable throwable) { givingUpDelivery((throwable instanceof XmlBlasterException) ? (XmlBlasterException)throwable : new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, "", throwable)); log.severe(ME+": PANIC: Internal error, doing shutdown: " + throwable.getMessage()); shutdown(); } /** * @return A container holding some statistical delivery information */ public DispatchStatistic getDispatchStatistic() { return this.dispatchConnectionsHandler.getDispatchStatistic(); } public boolean isShutdown() { return this.isShutdown; } /** * Stop all callback drivers of this client. * Possibly invoked twice (givingUpDelivery() calls it indirectly as well) * We don't shutdown the corresponding queue. 
*/ public void shutdown() { if (log.isLoggable(Level.FINER)) log.finer(ME+": Entering shutdown ..."); if (this.isShutdown) return; synchronized (this) { if (this.isShutdown) return; this.isShutdown = true; this.msgQueue.removePutListener(this); // remove all ConnectionStatusListeners this.connectionStatusListeners.clear(); removeBurstModeTimer(); // NOTE: We would need to remove the 'final' qualifier to be able to set to null if (this.msgInterceptor != null) { try { this.msgInterceptor.shutdown(this); } catch (XmlBlasterException e) { log.warning(ME+": Ignoring problems during shutdown of plugin: " + e.getMessage()); } //this.msgInterceptor = null; } if (this.dispatchConnectionsHandler != null) { this.dispatchConnectionsHandler.shutdown(); //this.dispatchConnectionsHandler = null; } removeBurstModeTimer(); //this.msgQueue = null; //this.failureListener = null; //this.securityInterceptor = null; //if (this.dispatchWorkerPool != null) { // this.dispatchWorkerPool.shutdown(); NO: not here, is the scope and duty of Global // this.dispatchWorkerPool = null; //} if (this.syncDispatchWorker != null) this.syncDispatchWorker.shutdown(); } } /** * For logging */ public String getId() { return this.msgQueue.getStorageId().getId(); } /** * Dump state of this object into a XML ASCII string. 
* <br>
    * @param extraOffset indenting of tags for nice output, null is treated as ""
    * @return internal state as a XML ASCII string
    */
   public String toXml(String extraOffset) {
      // Purely local buffer, so the unsynchronized StringBuilder is sufficient
      StringBuilder sb = new StringBuilder(2000);
      if (extraOffset == null) extraOffset = "";
      String offset = Constants.OFFSET + extraOffset;

      sb.append(offset).append("<DispatchManager id='");
      // getId() dereferences msgQueue, so it has to be covered by the same
      // null check as numEntries; without a queue the id attribute stays empty.
      if (this.msgQueue != null) {
         sb.append(getId());
         sb.append("' numEntries='").append(this.msgQueue.getNumOfEntries());
      }
      sb.append("' isShutdown='").append(this.isShutdown).append("'>");
      sb.append(this.dispatchConnectionsHandler.toXml(extraOffset+Constants.INDENT));
      sb.append(offset).append(" <dispatchWorkerIsActive>").append(dispatchWorkerIsActive).append("</dispatchWorkerIsActive>");
      sb.append(offset).append("</DispatchManager>");

      return sb.toString();
   }

   /**
    * Inhibits/activates the delivery of asynchronous dispatches of messages.
    * When switched to active, notifyAboutNewEntry() is called.
    * @param dispatcherActive the new state of the flag
    */
   public void setDispatcherActive(boolean dispatcherActive) {
      if (log.isLoggable(Level.FINE)) {
         log.fine(ME+": Changed dispatcherActive from " + this.dispatcherActive + " to " + dispatcherActive);
      }
      this.dispatcherActive = dispatcherActive;
      if (this.dispatcherActive) {
         notifyAboutNewEntry();
      }
   }

   /**
    * @return true if the dispatcher is currently activated, i.e. if it is
    *         able to asynchronously deliver messages from the callback queue.
    */
   public boolean isDispatcherActive() {
      return this.dispatcherActive;
   }

   /**
    * Delegates to the dispatch connections handler.
    * @param entries the entries to filter
    * @param ex the throwable passed along to the handler
    * @return whatever the handler's filterDistributorEntries() returns
    */
   public ArrayList filterDistributorEntries(ArrayList entries, Throwable ex) {
      return this.dispatchConnectionsHandler.filterDistributorEntries(entries, ex);
   }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -