⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 topichandler.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
            finally {               destinationClient.getLock().release();            }         }         // Handle PtP to session in a thread safe manner         SessionInfo receiverSessionInfo = null;         try {            receiverSessionInfo = authenticate.getSessionInfo(destination.getDestination());            if (receiverSessionInfo != null) {               receiverSessionInfo.getLock().lock();               //receiverSessionInfo.waitUntilAlive();               if (receiverSessionInfo.isAlive()) {                  if (!receiverSessionInfo.getConnectQos().isPtpAllowed() &&                      !Constants.EVENT_OID_ERASEDTOPIC.equals(cacheEntry.getKeyOid())) { // no spam, case 2                     if (log.isLoggable(Level.FINE)) log.fine(ME+": Rejecting PtP message '" + cacheEntry.getLogId() + "' for destination [" + destination.getDestination() + "], isPtpAllowed=false");                     throw new XmlBlasterException(serverScope, ErrorCode.USER_PTP_DENIED, ME,                           receiverSessionInfo.getId() + " does not accept PtP messages '" + cacheEntry.getLogId() +                           "' is rejected");                  }               }               else {                  receiverSessionInfo.getLock().release();                  receiverSessionInfo = null;               }            }            if (receiverSessionInfo == null && !forceQueing) {               String tmp = ME+": Sending PtP message '" + cacheEntry.getLogId() + "' to '" + destination.getDestination() + "' failed, the destination is unkown, the message rejected.";               log.warning(tmp);               throw new XmlBlasterException(serverScope, ErrorCode.USER_PTP_UNKNOWNDESTINATION, ME, tmp +                     " Client is not logged in and <destination forceQueuing='true'> is not set");            }            // Row 1 in table            if (receiverSessionInfo == null) { // We create a faked session without password check               if 
(log.isLoggable(Level.FINE)) log.fine(ME+": Working on PtP message '" + cacheEntry.getLogId() + "' for destination [" + destination.getDestination() + "] which does not exist, forceQueuing=true, we create a dummy session");               ConnectQos connectQos = new ConnectQos(serverScope);               connectQos.setSessionName(destinationSessionName);               connectQos.setUserId(destinationSessionName.getLoginName());               ConnectQosServer connectQosServer = new ConnectQosServer(serverScope, connectQos.getData());               connectQosServer.bypassCredentialCheck(true);               long sessionTimeout = serverScope.getProperty().get("session.ptp.defaultTimeout", -1L);               connectQosServer.getSessionQos().setSessionTimeout(sessionTimeout);  // Or use message timeout?               for (int i=0; ; i++) {                  if (i>=20) {                     String tmp = "Sending PtP message '" + cacheEntry.getLogId() + "' to '" + destination.getDestination() + "' failed, the message is rejected.";                     String status = "destinationIsSession='" + destinationIsSession + "'" +                                     " forceQueing='" + forceQueing + "' wantsPtP='" + wantsPtP +"'";                     throw new XmlBlasterException(serverScope, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, tmp +                        "the combination '" + status + "' is not handled");                  }                  if (i>0) { try { Thread.sleep(1L); } catch( InterruptedException ie) {}}                  /*ConnectReturnQosServer q = */authenticate.connect(connectQosServer);                  receiverSessionInfo = authenticate.getSessionInfo(destination.getDestination());                  if (receiverSessionInfo == null) continue;                  receiverSessionInfo.getLock().lock();                  if (!receiverSessionInfo.isAlive()) {                     receiverSessionInfo.getLock().release();                     receiverSessionInfo = null;           
          continue;
                  }
                  break;
               }
            }
            if (log.isLoggable(Level.FINE)) log.fine(ME+": Queuing PtP message '" + cacheEntry.getLogId() + "' for destination [" + destination.getDestination() + "]");
            MsgQueueUpdateEntry msgEntry = new MsgQueueUpdateEntry(serverScope,
                     cacheEntry,
                     receiverSessionInfo.getSessionQueue().getStorageId(),
                     destination.getDestination(),
                     Constants.SUBSCRIPTIONID_PtP, false);
            receiverSessionInfo.queueMessage(msgEntry);
            continue;
         }
         finally {
            // Release the session lock if a receiver session is still referenced here
            if (receiverSessionInfo != null)
               receiverSessionInfo.getLock().release();
         }
      } // for destinationArr.length
   }

   /**
    * Stores the message 'meat'.
    * <p />
    * If accessed from outside take care about deadlock.
    * <p />
    * Returns the {@code msgUnitCache} field as is; callers in this class
    * treat a {@code null} result as "on startup" and check for it.
    * @return The storage containing the 'meat' of a message
    */
   I_Map getMsgUnitCache() {
      return this.msgUnitCache;
   }

   //I_MapEntry changeMsgUnitEntry(I_MapEntry entry, I_ChangeCallback callback) throws XmlBlasterException {
   //   return this.msgUnitCache.change(entry, callback);
   //}

   /**
    * Forwards a changed message wrapper to the message cache.
    * <p />
    * Does nothing if {@code isInMsgStore(msgUnitWrapper)} is false or if the
    * cache reference is still {@code null}.
    * @param msgUnitWrapper the wrapper handed to {@code msgUnitCache.change()} (with a null callback)
    * @throws XmlBlasterException declared by this method; presumably raised by the cache's change() - confirm against I_Map
    */
   void change(MsgUnitWrapper msgUnitWrapper) throws XmlBlasterException {
      if (isInMsgStore(msgUnitWrapper)) {
         // Work on a local copy of the field reference, it is null-checked before use
         I_Map msgUnitCache = this.msgUnitCache;
         if (msgUnitCache == null) { // on startup
            return;
         }
         msgUnitCache.change(msgUnitWrapper, null);
      }
   }

   /**
    * Looks up a message wrapper by its unique id.
    * <p />
    * The wrapper currently under construction is checked first (guarded by
    * {@code msgUnitWrapperUnderConstructionMutex}); only if its id does not match
    * is the message cache consulted.
    * @param uniqueId the id compared against {@code getUniqueId()} and used as cache key
    * @return the matching wrapper, or {@code null} if the cache reference is still null;
    *         what the cache returns for an unknown id is not visible here
    * @throws XmlBlasterException declared by this method; presumably raised by the cache lookup - confirm against I_Map
    */
   public MsgUnitWrapper getMsgUnitWrapper(long uniqueId) throws XmlBlasterException {
      synchronized(this.msgUnitWrapperUnderConstructionMutex) {
         if (this.msgUnitWrapperUnderConstruction != null && this.msgUnitWrapperUnderConstruction.getUniqueId() == uniqueId)
            return this.msgUnitWrapperUnderConstruction;
      }
      // Work on a local copy of the field reference, it is null-checked before use
      I_Map msgUnitCache = this.msgUnitCache;
      if (msgUnitCache == null) { // on startup
         return null;
      }
      return
(MsgUnitWrapper)msgUnitCache.get(uniqueId);   }   /**    * Event triggered by MsgUnitWrapper itself when it reaches destroy state.    * This is an important entry point, and may not be called from a concurrent thread    */   public void entryDestroyed(MsgUnitWrapper msgUnitWrapper) {      if (log.isLoggable(Level.FINER)) log.finer(ME+": Entering entryDestroyed(" + msgUnitWrapper.getLogId() + ")");      boolean underConstruction = !isInMsgStore(msgUnitWrapper);      /*      if (this.historyQueue != null) {         try {            !! where to get msgUnitWrapperHistoryEntry from? avoid removeRandom() as it complicates persistent queue implementation            this.historyQueue.removeRandom(msgUnitWrapperHistoryEntry); // Note: This reduces the msgUnitStore referencecounter         }         catch (XmlBlasterException e) {            log.error(ME, "Internal problem in entryDestroyed removeRandom of history queue (this can lead to a memory leak of '" + msgUnitWrapper.getLogId() + "'): " +                          e.getMessage() + ": " + toXml());         }      }      */      if (!underConstruction) {         try {            if (getMsgUnitCache() == null) {               Thread.dumpStack();               log.severe(ME+": MsgUnitCache is unexpected null, topic: " + toXml() + "\n msgUnitWrapper is: " + msgUnitWrapper.toXml());            }            else {               getMsgUnitCache().remove(msgUnitWrapper);            }         }         catch (XmlBlasterException e) {            log.warning(ME+": Internal problem in entryDestroyed removeRandom of msg store (this can lead to a memory leak of '" + msgUnitWrapper.getLogId() + "'): " +                       e.getMessage()); // + ": " + toXml());         }      }      // if it was a volatile message we need to check unreferenced state      ArrayList notifyList = null;      if (!hasCacheEntries() && !hasExactSubscribers() && !this.isHistoryHandling) {         try {            if (isSoftErased()) {               
notifyList = toDead(this.creatorSessionName, null, null);            }            else {               notifyList = toUnreferenced(false, false);            }         }         catch (XmlBlasterException e) {            log.severe(ME+": Internal problem with entryDestroyed: " + e.getMessage() + ": " + toXml());         }      }      msgUnitWrapper = null;      if (notifyList != null) notifySubscribersAboutErase(notifyList); // must be outside the synchronize   }   /*    * The root node of the xmlBlaster DOM tree    */   public final org.w3c.dom.Node getRootNode() throws XmlBlasterException {      return getXmlKey().getRootNode(); // don't cache it, as it may change after merge   }   /**    * A client subscribed to this message, multiple subscriptions from    * the same client are OK.    * @param calleeIsXPathMatchCheck true The calling thread is internally to check if a Query matches a new published topic    *        false The callee is a subscribe() thread from a client    */   public void addSubscriber(SubscriptionInfo sub, boolean calleeIsXPathMatchCheck) throws XmlBlasterException {      if (sub.getSubscribeCounter() > 1)         return;      //Object oldOne;      synchronized(this.subscriberMap) {         /*oldOne = */this.subscriberMap.put(sub.getSubscriptionId(), sub);      }      sub.addTopicHandler(this);      if (this.subscriptionListener != null)         this.subscriptionListener.subscriptionAdd(new SubscriptionEvent(sub));      if (log.isLoggable(Level.FINE)) log.fine(ME+": Client '" + sub.getSessionInfo().getId() + "' has successfully subscribed");      if (isUnconfigured()) {         return;      }      if (isUnreferenced()) {         toAlive();      }      // will be triggered by ConnectionStatusListener.toAlive() ..      
if (this.subscriptionListener != null) return;      SubscribeQosServer subscribeQosServer = sub.getSubscribeQosServer();      if (subscribeQosServer == null) {         return;      }      if (log.isLoggable(Level.FINE)) log.fine(ME+": addSubscriber("+sub.getId()+")");      if (subscribeQosServer.getWantInitialUpdate() == true || calleeIsXPathMatchCheck) { // wantInitial==false is only checked if this is a subcribe() thread of a client         MsgUnitWrapper[] wrappers = null;         if (hasHistoryEntries())            wrappers = getMsgUnitWrapperArr(subscribeQosServer.getData().getHistoryQos().getNumEntries(),                                            subscribeQosServer.getData().getHistoryQos().getNewestFirst());         if (wrappers != null && wrappers.length > 0) {            int count = 0, currentCount = 0;            for (int i=0; i < wrappers.length; i++) {               if (this.distributor == null || wrappers[i].isInternal()) {                  currentCount = invokeCallback(null, sub, wrappers[i], true);               }               if (currentCount == -1) break;               count += currentCount;            }            count++;            if (count < 1) {               Set removeSet = new HashSet();               removeSet.add(sub);               handleCallbackFailed(removeSet);            }         }      }      return;   }   /**    * If a callback fails, we remove it from the subscription.    * <p />    * Generating dead letter and auto-logout to release all resources is done by DispatchWorker.    */   public void handleCallbackFailed(Set removeSet) throws XmlBlasterException {      // DON'T do a synchronized(this)! 
(the possibly triggered notifySubscribersAboutErase() could dead lock)      if (removeSet != null) {         Iterator iterator = removeSet.iterator();         while (iterator.hasNext()) {            SubscriptionInfo sub = (SubscriptionInfo)iterator.next();            if (log.isLoggable(Level.FINE)) log.fine(ME+": Removed subscriber '" + sub.getSessionInfo().getId() + "' as callback failed.");            sub.removeSubscribe();         }         removeSet.clear();         removeSet = null;      }   }   /**    * A client wants to unSubscribe from this topic.    * @return the removed SubscriptionInfo object or null if not found    */   SubscriptionInfo removeSubscriber(String subscriptionInfoUniqueKey) {      // DON'T call from inside a synchronized(this)! (the notifySubscribersAboutErase() could dead lock)      if (log.isLoggable(Level.FINE)) log.fine(ME+": Before size of subscriberMap = " + getNumSubscribers());

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -