📄 consumablequeueplugin.java
字号:
SubscriptionInfo[] subInfoArr = this.serverScope.getTopicAccessor().getSubscriptionInfoArrDirtyRead(this.topicId); ArrayList subInfoList = new ArrayList(); for (int i=0; i < subInfoArr.length; i++) subInfoList.add(subInfoArr[i]); for (int i=0; i < lst.size(); i++) { if (!this.isReady) return; MsgQueueHistoryEntry entry = (MsgQueueHistoryEntry)lst.get(i); MsgUnitWrapper msgUnitWrapper = (entry).getMsgUnitWrapper(); if (msgUnitWrapper != null) { if (!this.distributeOneEntry(msgUnitWrapper, entry, subInfoList)) { this.isReady = false; this.isRunning = false; return; } } } } this.isReady = true; } catch (Throwable ex) { this.isReady = false; this.isRunning = false; ex.printStackTrace(); log.severe("processQueue: " + ex.getMessage()); } } /** * Distributes one single entry taken from the history queue. This method * is strict, it does not throw any exceptions. If one exception occurs * inside this method, the distribution is interrupted, a dead letter is * generated and the entry is removed from the history queue. * * @param subInfoList contains the SubscriptionInfo objects to scan. Once the * message is processed by one of the dispatchers, the associated * SessionInfo is put at the end of the list to allow some simple * load balancing mechanism. * * @return true if the entry has been removed from the history queue. This happens * if the entry could be sent successfully, or if distribution was given up due to * an exception. It returns false if none of the subscribers were able to receive * the message (to tell the invoker not to continue with distribution until * the next event. */ private boolean distributeOneEntry(MsgUnitWrapper msgUnitWrapper, MsgQueueHistoryEntry entry, List subInfoList) { try { if (msgUnitWrapper == null) { log.severe("distributeOneEntry() MsgUnitWrapper is null"); Thread.dumpStack(); givingUpDistribution(null, msgUnitWrapper, entry, null); return true; // let the loop continue: other entries could be OK } if (log.isLoggable(Level.FINER)) log.finer("distributeOneEntry '" + msgUnitWrapper.getUniqueId() + "' '" + msgUnitWrapper.getKeyOid() + "'"); // Take a copy of the map entries (a current snapshot) // If we would iterate over the map directly we can risk a java.util.ConcurrentModificationException // when one of the callback fails and the entry is removed by the callback worker thread SubscriptionInfo[] subInfoArr = (SubscriptionInfo[])subInfoList.toArray(new SubscriptionInfo[subInfoList.size()]); for (int ii=0; ii<subInfoArr.length; ii++) { SubscriptionInfo sub = subInfoArr[ii]; if (TopicHandler.isDirtyRead(sub, msgUnitWrapper)) { log.severe("ConsumableQueuePlugin used together with 'dirtyRead' is not supported"); TopicHandler topicHandler = this.serverScope.getTopicAccessor().access(this.topicId); if (topicHandler == null) return true; try { I_Queue srcQueue = topicHandler.getHistoryQueue(); if (srcQueue != null) srcQueue.removeRandom(entry); } finally { this.serverScope.getTopicAccessor().release(topicHandler); } return true; // even if it has not been sent } } for (int ii=0; ii<subInfoArr.length; ii++) { SubscriptionInfo sub = subInfoArr[ii]; if (!TopicHandler.subscriberMayReceiveIt(sub, msgUnitWrapper)) continue; //Has no effect: //if (!this.topicHandler.checkIfAllowedToSend(null, sub)) continue; // this is specific for this plugin if (sub.getSessionInfo().getDispatchManager() == null) continue; if (!sub.getSessionInfo().getDispatchManager().getDispatchConnectionsHandler().isAlive()) continue; try { try { TopicHandler topicHandler = this.serverScope.getTopicAccessor().access(this.topicId); if (topicHandler == null) return true; try { // the 'false' here is to tell the filter not to send a dead letter in case of an ex if (!topicHandler.checkFilter(null, sub, msgUnitWrapper, false)) continue; } finally { this.serverScope.getTopicAccessor().release(topicHandler); } } catch (XmlBlasterException ex) { // continue; givingUpDistribution(sub, msgUnitWrapper, entry, ex); return true; // because the entry has been removed from the history queue } // put the current dispatcher at the end of the list for next invocation (round robin) subInfoList.remove(sub); subInfoList.add(sub); MsgQueueUpdateEntry updateEntry = TopicHandler.createEntryFromWrapper(msgUnitWrapper,sub); UpdateReturnQosServer retQos = doDistribute(sub, updateEntry); if (log.isLoggable(Level.FINE)) { if (retQos == null) log.fine("distributeOneEntry: the return object was null: callback has not sent the message (dirty reads ?)"); } if (retQos == null || retQos.getException() == null) { TopicHandler topicHandler = this.serverScope.getTopicAccessor().access(this.topicId); if (topicHandler == null) return true; try { I_Queue srcQueue = topicHandler.getHistoryQueue(); if (srcQueue != null) srcQueue.removeRandom(entry); // success } finally { this.serverScope.getTopicAccessor().release(topicHandler); } if (log.isLoggable(Level.FINE)) log.fine("distributeOneEntry: successfully removed entry from queue"); return true; } else { log.severe("distributeOneEntry an exception occured: " + retQos.getException().getMessage()); Throwable ex = retQos.getException(); // continue if it is a communication exception stop otherwise if (ex instanceof XmlBlasterException && ((XmlBlasterException)ex).isCommunication()) continue; // we pass null for the exception since we don't want to shut down the dispatcher givingUpDistribution(sub, msgUnitWrapper, entry, null); return true; //since removed } } catch (Throwable e) { e.printStackTrace(); givingUpDistribution(sub, msgUnitWrapper, entry, e); return true; } } } catch (Throwable ex) { ex.printStackTrace(); log.severe("distributeOneEntry " + ex.getMessage()); givingUpDistribution(null, msgUnitWrapper, entry, ex); // TODO or should we return true here to allow to continue ? // I think it is a serious ex: probably does not make sense to cont. } return false; } private void givingUpDistribution(SubscriptionInfo sub, MsgUnitWrapper msgUnitWrapper, MsgQueueEntry entry, Throwable e) { try { String id = ""; if (sub != null) id = sub.getSessionInfo().getId(); String exTxt = ""; if (e != null) exTxt = e.toString(); SessionName publisherName = msgUnitWrapper.getMsgQosData().getSender(); if (log.isLoggable(Level.FINE)) log.fine("Sending of message from " + publisherName + " to " + id + " failed: " + exTxt); if (sub != null && e != null) sub.getSessionInfo().getDispatchManager().internalError(e); // calls MsgErrorHandler else { this.serverScope.getRequestBroker().deadMessage(new MsgQueueEntry[] { entry }, null, ME + ".givingUpDistribution: " + exTxt); } // remove the entry from the history queue now that a dead letter has been sent. TopicHandler topicHandler = this.serverScope.getTopicAccessor().access(this.topicId); try { I_Queue historyQueue = topicHandler.getHistoryQueue(); if (historyQueue != null) historyQueue.removeRandom(entry); } finally { this.serverScope.getTopicAccessor().release(topicHandler); } } catch (XmlBlasterException ex) { log.severe("givingUpDistribution: " + ex.getMessage()); ex.printStackTrace(); } } /** * Enforced by the I_DistributionInterceptor interface. It sends sychronously to * the DispatchWorker this entry. */ private UpdateReturnQosServer doDistribute(SubscriptionInfo sub, MsgQueueUpdateEntry entry) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("doDistribute"); // this is a sync call (all in the same thread) entry.setWantReturnObject(true); DispatchWorker worker = new DispatchWorker(this.global, sub.getSessionInfo().getDispatchManager()); ArrayList list = new ArrayList(); list.add(entry); worker.run(list); return (UpdateReturnQosServer)entry.getReturnObj(); } /** * @see org.xmlBlaster.engine.I_SubscriptionListener#getPriority() */ public Integer getPriority() { return PRIO_05; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -