📄 topichandler.java
字号:
finally { destinationClient.getLock().release(); } } // Handle PtP to session in a thread safe manner SessionInfo receiverSessionInfo = null; try { receiverSessionInfo = authenticate.getSessionInfo(destination.getDestination()); if (receiverSessionInfo != null) { receiverSessionInfo.getLock().lock(); //receiverSessionInfo.waitUntilAlive(); if (receiverSessionInfo.isAlive()) { if (!receiverSessionInfo.getConnectQos().isPtpAllowed() && !Constants.EVENT_OID_ERASEDTOPIC.equals(cacheEntry.getKeyOid())) { // no spam, case 2 if (log.isLoggable(Level.FINE)) log.fine(ME+": Rejecting PtP message '" + cacheEntry.getLogId() + "' for destination [" + destination.getDestination() + "], isPtpAllowed=false"); throw new XmlBlasterException(serverScope, ErrorCode.USER_PTP_DENIED, ME, receiverSessionInfo.getId() + " does not accept PtP messages '" + cacheEntry.getLogId() + "' is rejected"); } } else { receiverSessionInfo.getLock().release(); receiverSessionInfo = null; } } if (receiverSessionInfo == null && !forceQueing) { String tmp = ME+": Sending PtP message '" + cacheEntry.getLogId() + "' to '" + destination.getDestination() + "' failed, the destination is unkown, the message rejected."; log.warning(tmp); throw new XmlBlasterException(serverScope, ErrorCode.USER_PTP_UNKNOWNDESTINATION, ME, tmp + " Client is not logged in and <destination forceQueuing='true'> is not set"); } // Row 1 in table if (receiverSessionInfo == null) { // We create a faked session without password check if (log.isLoggable(Level.FINE)) log.fine(ME+": Working on PtP message '" + cacheEntry.getLogId() + "' for destination [" + destination.getDestination() + "] which does not exist, forceQueuing=true, we create a dummy session"); ConnectQos connectQos = new ConnectQos(serverScope); connectQos.setSessionName(destinationSessionName); connectQos.setUserId(destinationSessionName.getLoginName()); ConnectQosServer connectQosServer = new ConnectQosServer(serverScope, connectQos.getData()); connectQosServer.bypassCredentialCheck(true); long sessionTimeout = serverScope.getProperty().get("session.ptp.defaultTimeout", -1L); connectQosServer.getSessionQos().setSessionTimeout(sessionTimeout); // Or use message timeout? for (int i=0; ; i++) { if (i>=20) { String tmp = "Sending PtP message '" + cacheEntry.getLogId() + "' to '" + destination.getDestination() + "' failed, the message is rejected."; String status = "destinationIsSession='" + destinationIsSession + "'" + " forceQueing='" + forceQueing + "' wantsPtP='" + wantsPtP +"'"; throw new XmlBlasterException(serverScope, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, tmp + "the combination '" + status + "' is not handled"); } if (i>0) { try { Thread.sleep(1L); } catch( InterruptedException ie) {}} /*ConnectReturnQosServer q = */authenticate.connect(connectQosServer); receiverSessionInfo = authenticate.getSessionInfo(destination.getDestination()); if (receiverSessionInfo == null) continue; receiverSessionInfo.getLock().lock(); if (!receiverSessionInfo.isAlive()) { receiverSessionInfo.getLock().release(); receiverSessionInfo = null; continue; } break; } } if (log.isLoggable(Level.FINE)) log.fine(ME+": Queuing PtP message '" + cacheEntry.getLogId() + "' for destination [" + destination.getDestination() + "]"); MsgQueueUpdateEntry msgEntry = new MsgQueueUpdateEntry(serverScope, cacheEntry, receiverSessionInfo.getSessionQueue().getStorageId(), destination.getDestination(), Constants.SUBSCRIPTIONID_PtP, false); receiverSessionInfo.queueMessage(msgEntry); continue; } finally { if (receiverSessionInfo != null) receiverSessionInfo.getLock().release(); } } // for destinationArr.length } /** * Stores the message 'meat'. * <p /> * If accessed from outside take care about deadlock. * @return The storage containing the 'meat' of a message */ I_Map getMsgUnitCache() { return this.msgUnitCache; } //I_MapEntry changeMsgUnitEntry(I_MapEntry entry, I_ChangeCallback callback) throws XmlBlasterException { // return this.msgUnitCache.change(entry, callback); //} void change(MsgUnitWrapper msgUnitWrapper) throws XmlBlasterException { if (isInMsgStore(msgUnitWrapper)) { I_Map msgUnitCache = this.msgUnitCache; if (msgUnitCache == null) { // on startup return; } msgUnitCache.change(msgUnitWrapper, null); } } public MsgUnitWrapper getMsgUnitWrapper(long uniqueId) throws XmlBlasterException { synchronized(this.msgUnitWrapperUnderConstructionMutex) { if (this.msgUnitWrapperUnderConstruction != null && this.msgUnitWrapperUnderConstruction.getUniqueId() == uniqueId) return this.msgUnitWrapperUnderConstruction; } I_Map msgUnitCache = this.msgUnitCache; if (msgUnitCache == null) { // on startup return null; } return (MsgUnitWrapper)msgUnitCache.get(uniqueId); } /** * Event triggered by MsgUnitWrapper itself when it reaches destroy state. * This is an important entry point, and may not be called from a concurrent thread */ public void entryDestroyed(MsgUnitWrapper msgUnitWrapper) { if (log.isLoggable(Level.FINER)) log.finer(ME+": Entering entryDestroyed(" + msgUnitWrapper.getLogId() + ")"); boolean underConstruction = !isInMsgStore(msgUnitWrapper); /* if (this.historyQueue != null) { try { !! where to get msgUnitWrapperHistoryEntry from? avoid removeRandom() as it complicates persistent queue implementation this.historyQueue.removeRandom(msgUnitWrapperHistoryEntry); // Note: This reduces the msgUnitStore referencecounter } catch (XmlBlasterException e) { log.error(ME, "Internal problem in entryDestroyed removeRandom of history queue (this can lead to a memory leak of '" + msgUnitWrapper.getLogId() + "'): " + e.getMessage() + ": " + toXml()); } } */ if (!underConstruction) { try { if (getMsgUnitCache() == null) { Thread.dumpStack(); log.severe(ME+": MsgUnitCache is unexpected null, topic: " + toXml() + "\n msgUnitWrapper is: " + msgUnitWrapper.toXml()); } else { getMsgUnitCache().remove(msgUnitWrapper); } } catch (XmlBlasterException e) { log.warning(ME+": Internal problem in entryDestroyed removeRandom of msg store (this can lead to a memory leak of '" + msgUnitWrapper.getLogId() + "'): " + e.getMessage()); // + ": " + toXml()); } } // if it was a volatile message we need to check unreferenced state ArrayList notifyList = null; if (!hasCacheEntries() && !hasExactSubscribers() && !this.isHistoryHandling) { try { if (isSoftErased()) { notifyList = toDead(this.creatorSessionName, null, null); } else { notifyList = toUnreferenced(false, false); } } catch (XmlBlasterException e) { log.severe(ME+": Internal problem with entryDestroyed: " + e.getMessage() + ": " + toXml()); } } msgUnitWrapper = null; if (notifyList != null) notifySubscribersAboutErase(notifyList); // must be outside the synchronize } /* * The root node of the xmlBlaster DOM tree */ public final org.w3c.dom.Node getRootNode() throws XmlBlasterException { return getXmlKey().getRootNode(); // don't cache it, as it may change after merge } /** * A client subscribed to this message, multiple subscriptions from * the same client are OK. * @param calleeIsXPathMatchCheck true The calling thread is internally to check if a Query matches a new published topic * false The callee is a subscribe() thread from a client */ public void addSubscriber(SubscriptionInfo sub, boolean calleeIsXPathMatchCheck) throws XmlBlasterException { if (sub.getSubscribeCounter() > 1) return; //Object oldOne; synchronized(this.subscriberMap) { /*oldOne = */this.subscriberMap.put(sub.getSubscriptionId(), sub); } sub.addTopicHandler(this); if (this.subscriptionListener != null) this.subscriptionListener.subscriptionAdd(new SubscriptionEvent(sub)); if (log.isLoggable(Level.FINE)) log.fine(ME+": Client '" + sub.getSessionInfo().getId() + "' has successfully subscribed"); if (isUnconfigured()) { return; } if (isUnreferenced()) { toAlive(); } // will be triggered by ConnectionStatusListener.toAlive() .. if (this.subscriptionListener != null) return; SubscribeQosServer subscribeQosServer = sub.getSubscribeQosServer(); if (subscribeQosServer == null) { return; } if (log.isLoggable(Level.FINE)) log.fine(ME+": addSubscriber("+sub.getId()+")"); if (subscribeQosServer.getWantInitialUpdate() == true || calleeIsXPathMatchCheck) { // wantInitial==false is only checked if this is a subcribe() thread of a client MsgUnitWrapper[] wrappers = null; if (hasHistoryEntries()) wrappers = getMsgUnitWrapperArr(subscribeQosServer.getData().getHistoryQos().getNumEntries(), subscribeQosServer.getData().getHistoryQos().getNewestFirst()); if (wrappers != null && wrappers.length > 0) { int count = 0, currentCount = 0; for (int i=0; i < wrappers.length; i++) { if (this.distributor == null || wrappers[i].isInternal()) { currentCount = invokeCallback(null, sub, wrappers[i], true); } if (currentCount == -1) break; count += currentCount; } count++; if (count < 1) { Set removeSet = new HashSet(); removeSet.add(sub); handleCallbackFailed(removeSet); } } } return; } /** * If a callback fails, we remove it from the subscription. * <p /> * Generating dead letter and auto-logout to release all resources is done by DispatchWorker. */ public void handleCallbackFailed(Set removeSet) throws XmlBlasterException { // DON'T do a synchronized(this)! (the possibly triggered notifySubscribersAboutErase() could dead lock) if (removeSet != null) { Iterator iterator = removeSet.iterator(); while (iterator.hasNext()) { SubscriptionInfo sub = (SubscriptionInfo)iterator.next(); if (log.isLoggable(Level.FINE)) log.fine(ME+": Removed subscriber '" + sub.getSessionInfo().getId() + "' as callback failed."); sub.removeSubscribe(); } removeSet.clear(); removeSet = null; } } /** * A client wants to unSubscribe from this topic. * @return the removed SubscriptionInfo object or null if not found */ SubscriptionInfo removeSubscriber(String subscriptionInfoUniqueKey) { // DON'T call from inside a synchronized(this)! (the notifySubscribersAboutErase() could dead lock) if (log.isLoggable(Level.FINE)) log.fine(ME+": Before size of subscriberMap = " + getNumSubscribers());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -