📄 topichandler.java
字号:
SubscriptionInfo subs = null; synchronized(this.subscriberMap) { subs = (SubscriptionInfo)this.subscriberMap.remove(subscriptionInfoUniqueKey); } if (subs == null && !isDead() && !isSoftErased()) { //Thread.currentThread().dumpStack(); log.warning(ME+": can't unsubscribe, you where not subscribed to subscription ID=" + subscriptionInfoUniqueKey); } if (log.isLoggable(Level.FINE)) log.fine(ME+": After size of subscriberMap = " + getNumSubscribers()); if (isDead()) { if (this.subscriptionListener != null && subs != null) { try { this.subscriptionListener.subscriptionRemove(new SubscriptionEvent(subs)); } catch (XmlBlasterException ex) { log.severe(ME+": removeSubscriber: an exception occured: " + ex.getMessage()); } } return subs; // during cleanup process } ArrayList notifyList = null; if (!hasCacheEntries() && !hasExactSubscribers()) { if (isUnconfigured()) notifyList = toDead(this.creatorSessionName, null, null); else { try { notifyList = toUnreferenced(false, false); } catch (XmlBlasterException e) { log.severe(ME+": Internal problem with removeSubscriber: " + e.getMessage() + ": " + toXml()); } } } if (this.subscriptionListener != null && subs != null) { try { this.subscriptionListener.subscriptionRemove(new SubscriptionEvent(subs)); } catch (XmlBlasterException ex) { log.severe(ME+": removeSubscriber: an exception occured: " + ex.getMessage()); } } if (notifyList != null) notifySubscribersAboutErase(notifyList); // must be outside the synchronize return subs; } /** * This is the unique key of the topic and MsgUnit * <p /> * @return the <key oid='...'> */ public String getUniqueKey() { return uniqueKey; } /** * @return The key data of this topic (not DOM parsed) or null of not yet known */ public MsgKeyData getMsgKeyData() { return this.msgKeyData; } /** * What is the MIME type of this message content? * <p /> * @return the MIME type of the MsgUnit.content or null if not known */ public String getContentMime() { return (this.msgKeyData != null) ? this.msgKeyData.getContentMime() : null; } public String getContentMimeExtended() { return (this.msgKeyData != null) ? this.msgKeyData.getContentMimeExtended() : null; } /** * Access the raw CORBA msgUnit * @return MsgUnit object public MsgUnit getMsgUnit() throws XmlBlasterException { return getMsgUnitWrapper().getMsgUnit(); } */ /** * Send updates to all subscribed clients. * <p /> * @param publisherSessionInfo The sessionInfo of the publisher or null if not known or not online */ private final void invokeCallbackAndHandleFailure(SessionInfo publisherSessionInfo, MsgUnitWrapper msgUnitWrapper) throws XmlBlasterException { if (msgUnitWrapper == null) { Thread.dumpStack(); throw new XmlBlasterException(serverScope, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "MsgUnitWrapper is null"); } if (log.isLoggable(Level.FINE)) log.fine(ME+": Going to update dependent clients for " + msgUnitWrapper.getKeyOid() + ", subscriberMap.size() = " + getNumSubscribers()); if (this.distributor != null && !msgUnitWrapper.isInternal()) { // if there is a plugin this.distributor.distribute(msgUnitWrapper); return; } // Take a copy of the map entries (a current snapshot) // If we would iterate over the map directly we can risk a java.util.ConcurrentModificationException // when one of the callback fails and the entry is removed by the callback worker thread SubscriptionInfo[] subInfoArr = getSubscriptionInfoArr(); Set removeSet = null; for (int ii=0; ii<subInfoArr.length; ii++) { SubscriptionInfo sub = subInfoArr[ii]; if (!subscriberMayReceiveIt(sub, msgUnitWrapper)) continue; if (invokeCallback(publisherSessionInfo, sub, msgUnitWrapper, true) < 1) { if (removeSet == null) removeSet = new HashSet(); removeSet.add(sub); // We can't delete directly since we are in the iterator } } if (removeSet != null) handleCallbackFailed(removeSet); } /** * Checks if it is allowed to send the entry to the callback queue. * @param publisherSessionInfo * @param sub * @return true if it is configured, there is a callback, and the topic is referenced */ private boolean checkIfAllowedToSend(SessionInfo publisherSessionInfo, SubscriptionInfo sub) { if (!sub.getSessionInfo().hasCallback()) { log.warning(ME+": A client which subscribes " + sub.toXml() + " should have a callback server: " + sub.getSessionInfo().toXml("", (Properties)null)); Thread.dumpStack(); return false; } if (isUnconfigured()) { log.warning(ME+": invokeCallback() not supported, this MsgUnit was created by a subscribe() and not a publish()"); return false; } if (isUnreferenced()) { log.severe(ME+": PANIC: invoke callback is strange in state 'UNREFERENCED'"); Thread.dumpStack(); return false; } return true; } /** * Checks if the filters allow this message to be sent to the specified session * * @param publisherSessionInfo * @param sub * @param msgUnitWrapper * @return true if the message is approved to be sent, false otherwise * @throws XmlBlasterException in case an exception happened when checking the filters. * This method handles internally the publishing of dead letters in case of a throwable * and after that it throws this XmlBlasterException to notify the invoked about the * abnormal flow. */ public final boolean checkFilter(SessionInfo publisherSessionInfo, SubscriptionInfo sub, MsgUnitWrapper msgUnitWrapper, boolean handleException) throws XmlBlasterException { AccessFilterQos[] filterQos = sub.getAccessFilterArr(); if (filterQos != null) { //SubjectInfo publisher = (publisherSessionInfo == null) ? null : publisherSessionInfo.getSubjectInfo(); //SubjectInfo destination = (sub.getSessionInfo() == null) ? null : sub.getSessionInfo().getSubjectInfo(); for (int ii=0; ii<filterQos.length; ii++) { try { I_AccessFilter filter = requestBroker.getAccessPluginManager().getAccessFilter( filterQos[ii].getType(), filterQos[ii].getVersion(), getContentMime(), getContentMimeExtended()); if (filter != null && filter.match(sub.getSessionInfo(), msgUnitWrapper.getMsgUnit(), filterQos[ii].getQuery()) == false) { return false; } } catch (Throwable e) { // sender = publisherSessionInfo.getLoginName() // receiver = sub.getSessionInfo().getLoginName() // 1. We just log the situation: SessionName publisherName = (publisherSessionInfo != null) ? publisherSessionInfo.getSessionName() : msgUnitWrapper.getMsgQosData().getSender(); String reason = "Mime access filter '" + filterQos[ii].getType() + "' for message '" + msgUnitWrapper.getLogId() + "' from sender '" + publisherName + "' to subscriber '" + sub.getSessionInfo().getSessionName() + "' threw an exception, we don't deliver " + "the message to the subscriber: " + e.toString(); if (log.isLoggable(Level.FINE)) log.fine(ME+": "+reason); if (handleException) { MsgQueueEntry[] entries = { new MsgQueueUpdateEntry(serverScope, msgUnitWrapper, sub.getMsgQueue().getStorageId(), sub.getSessionInfo().getSessionName(), sub.getSubSourceSubscriptionId(), sub.getSubscribeQosServer().getWantUpdateOneway()) }; requestBroker.deadMessage(entries, null, reason); } // 2. This error handling is wrong as the plugin should not invalidate the subscribe: //sub.getSessionInfo().getDispatchManager().internalError(e); // calls MsgErrorHandler // 3. This error handling is wrong as we handle a subscribe and not a publish: /* MsgQueueEntry entry = new MsgQueueUpdateEntry(serverScope, msgUnitWrapper, sub.getMsgQueue().getStorageId(), sub.getSessionInfo().getSessionName(), sub.getSubSourceSubscriptionId(), sub.getSubscribeQosServer().getWantUpdateOneway()); publisherSessionInfo.getMsgErrorHandler().handleError(new MsgErrorInfo(serverScope, entry, null, e)); */ //retCount++; throw new XmlBlasterException(this.serverScope, ErrorCode.INTERNAL_UNKNOWN, ME , "checkFilter: " + reason); } } } // if filterQos return true; } /** * Checks if the subscriber is a cluster and the message has the 'dirtyRead' flag set. * @param sub * @param msgQosData * @return true if dirtyRead is set, false otherwise. */ public static boolean isDirtyRead(SubscriptionInfo sub, MsgUnitWrapper msgUnitWrapper) throws XmlBlasterException { MsgQosData msgQosData = msgUnitWrapper.getMsgQosData(); if (sub.getSessionInfo().getSubjectInfo().isCluster()) { if (log.isLoggable(Level.FINEST)) log.finest("TopicHandler: Slave node '" + sub.getSessionInfo() + "' has dirty read message '" + msgUnitWrapper.toXml()); if (msgQosData.dirtyRead(sub.getSessionInfo().getSubjectInfo().getNodeId())) { if (log.isLoggable(Level.FINE)) log.fine("TopicHandler: Slave node '" + sub.getSessionInfo() + "' has dirty read message '" + sub.getSubscriptionId() + "', '" + sub.getKeyData().getOid() + "' we don't need to send it back"); return true; } } return false; } public static final MsgQueueUpdateEntry createEntryFromWrapper(MsgUnitWrapper msgUnitWrapper, SubscriptionInfo sub) throws XmlBlasterException { return new MsgQueueUpdateEntry(msgUnitWrapper.getServerScope(), msgUnitWrapper, sub.getMsgQueue().getStorageId(), sub.getSessionInfo().getSessionName(), sub.getSubSourceSubscriptionId(), sub.getSubscribeQosServer().getWantUpdateOneway()); } /** * Put the message into the callback queue of the subscribed client (Pub/Sub mode only). * @param publisherSessionInfo The sessionInfo of the publisher or null if not known or not online * @param sub The subscription handle of the client * @return -1 in case it was not able to complete the invocation due to an incorrect status (for example * if it is unconfigured, unreferenced or if the session has no callback). Returns 0 if it was not able * to complete the request even if the status was OK, 1 if successful. * Never throws an exception. * Returning -1 tells the invoker not to continue with these invocations (performance) */ private final int invokeCallback(SessionInfo publisherSessionInfo, SubscriptionInfo sub, MsgUnitWrapper msgUnitWrapper, boolean doErrorHandling) { if (!checkIfAllowedToSend(publisherSessionInfo, sub)) return -1; if (msgUnitWrapper == null) { log.severe(ME+": invokeCallback() MsgUnitWrapper is null: " + ((publisherSessionInfo != null) ? publisherSessionInfo.toXml() + "\n" : "") + ((sub != null) ? sub.toXml() + "\n" : "") + ((this.historyQueue != null) ? this.historyQueue.toXml("") : "")); Thread.dumpStack(); return 0; //throw new XmlBlasterException(serverScope, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "MsgUnitWrapper is null"); } try { if (isDirtyRead(sub, msgUnitWrapper)) return 1; try { if (!checkFilter(publisherSessionInfo, sub, msgUnitWrapper, true)) return 1; } catch (XmlBlasterException ex) { if (log.isLoggable(Level.FINEST)) log.finest(ex.getMessage()); return 0; } if (log.isLoggable(Level.FINER)) log.finer(ME+": pushing update() mess
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -