⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 clientsubscriptions.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
         while (iterator.hasNext()) {            Map subMap = (Map)iterator.next();            synchronized(subMap) {               Iterator iterator2 = subMap.values().iterator();               while (iterator2.hasNext()) {                  SubscriptionInfo sub = (SubscriptionInfo)iterator2.next();                  if (exactOnly && sub.getKeyData().isQuery()) {                     if (log.isLoggable(Level.FINE)) log.fine("Ignoring subscription " + sub.getSubscriptionId() + " for message '" + keyOid + "'");                     continue;                  }                  if (keyOid.equals(sub.getKeyOid())) {                     if (log.isLoggable(Level.FINE)) log.fine("Found subscription " + sub.getSubscriptionId() + " for message '" + keyOid + "'");                     if (vec == null) vec = new Vector();                     vec.addElement(sub);                  }               }            }         }      }      return vec;   }   /**    * Invoked on successful client login (interface I_ClientListener)    */   public void sessionAdded(ClientEvent e) throws XmlBlasterException   {      if (log.isLoggable(Level.FINER)) log.finer("Login event for client " + e.getSessionInfo().toString() + ", nothing to do");   }   public void sessionPreRemoved(ClientEvent e) throws XmlBlasterException {   }   /**    * Invoked when client does a logout (interface I_ClientListener)    */   public void sessionRemoved(ClientEvent e) throws XmlBlasterException   {      if (log.isLoggable(Level.FINER)) log.finer("START-logout()");      //if (log.isLoggable(Level.FINEST)) log.dump(ME, requestBroker.toXml());      SessionInfo sessionInfo = e.getSessionInfo();      if (log.isLoggable(Level.FINE)) log.fine("Logout event for client " + sessionInfo.toString() + ", removing entries");      try {         removeFromClientSubscriptionMap(sessionInfo, null);      } catch (XmlBlasterException e1) {      }      try {         removeFromQuerySubscribeRequestsSet(sessionInfo, null);      } catch 
(XmlBlasterException e2) {      }      //if (log.isLoggable(Level.FINEST)) log.dump(ME, "END-logout()\n" + requestBroker.toXml());   }   /**    * Event invoked on new created SubjectInfo.     */   public void subjectAdded(ClientEvent e) throws XmlBlasterException   {      log.warning("Ignoring SubjectInfo added event for client " + e.getSubjectInfo().toString());   }   /**    * Event invoked on deleted SubjectInfo.     */   public void subjectRemoved(ClientEvent e) throws XmlBlasterException   {      log.warning("Ignoring SubjectInfo removed event for client " + e.getSubjectInfo().toString());   }   /**    * Event invoked on message erase() invocation.     */   public void topicRemove(TopicHandler topicHandler) throws XmlBlasterException   {      String uniqueKey = topicHandler.getUniqueKey();      if (topicHandler.hasExactSubscribers()) {         if (log.isLoggable(Level.FINE)) log.fine("Erase event for oid=" + uniqueKey + "', we do nothing here as subscription reservation remains even on deleted messages");         return;      }      Vector vec = getSubscriptionByOid(uniqueKey, false);      if (vec == null) {         if (log.isLoggable(Level.FINE)) log.fine("Erase event for oid=" + uniqueKey + "', we do nothing as no subscribes are found");         return;      }      for (int ii=0; ii<vec.size(); ii++) {         SubscriptionInfo sub = (SubscriptionInfo)vec.elementAt(ii);         if (log.isLoggable(Level.FINE)) log.fine("Erase event for oid=" + uniqueKey + "', removing subId=" + sub.getSubscriptionId() + " ...");         subscriptionRemove(new SubscriptionEvent(sub));      }   }   /**    * Event invoked on new subscription (interface SubscriptionListener).    
*/   public void subscriptionAdd(SubscriptionEvent e) throws XmlBlasterException {            SubscriptionInfo subscriptionInfo = e.getSubscriptionInfo();      if (subscriptionInfo.getSubscribeCounter() > 1) {         if (log.isLoggable(Level.FINE)) log.fine("Ignoring multisubscribe instance " + subscriptionInfo.getSubscribeCounter());         return;      }      SessionInfo sessionInfo = subscriptionInfo.getSessionInfo();      if (log.isLoggable(Level.FINE)) log.fine("Subscription add event " + e);      KeyData keyData = subscriptionInfo.getKeyData();      String uniqueKey = sessionInfo.getSessionName().getRelativeName();      // Insert into first map:      Object obj;      Map subMap;      synchronized(clientSubscriptionMap) {         obj = clientSubscriptionMap.get(uniqueKey);         if (obj == null) {            subMap = Collections.synchronizedMap(new HashMap());            clientSubscriptionMap.put(uniqueKey, subMap);         }         else {            subMap = (Map)obj;         }         subMap.put(subscriptionInfo.getSubscriptionId(), subscriptionInfo);         if (log.isLoggable(Level.FINE)) log.fine("Adding subscriptionId=" + subscriptionInfo.getSubscriptionId() + " to subMap of client " + sessionInfo.getId());      }      // Insert into second map:      if (keyData.isQuery()) {         obj=null;         synchronized(querySubscribeRequestsSet) {            querySubscribeRequestsSet.add(subscriptionInfo);         }      }   }   /**    * Invoked when a subscription is canceled (interface SubscriptionListener).    
* <p />    * Note that the subscriptionInfo object carried in SubscriptionEvent    * is not the real known subscription, but rather misused as a container to    * carry the sessionInfo and subscriptionInfoUniqueKey    */   public void subscriptionRemove(SubscriptionEvent e) throws XmlBlasterException {      SubscriptionInfo subscriptionInfo = e.getSubscriptionInfo();      if (subscriptionInfo.getSubscribeCounter() > 1) {         if (log.isLoggable(Level.FINE)) log.fine("Ignoring multisubscribe instance " + subscriptionInfo.getSubscribeCounter());         return;      }      String subscriptionInfoUniqueKey = subscriptionInfo.getSubscriptionId();      SessionInfo sessionInfo = subscriptionInfo.getSessionInfo();      if (log.isLoggable(Level.FINE)) log.fine("Subscription remove event " + e.toString());      try {         removeFromClientSubscriptionMap(sessionInfo, subscriptionInfoUniqueKey);      } catch (XmlBlasterException e1) {         log.severe("removeFromClientSubscriptionMap: " + e1.toString());      }      try {         removeFromQuerySubscribeRequestsSet(sessionInfo, subscriptionInfoUniqueKey);      } catch (XmlBlasterException e2) {         log.severe("removeFromQuerySubscribeRequestsSet: " + e2.toString());      }      subscriptionInfo.shutdown();   }   /**    * @param subscriptionInfoUniqueKey ==null: Remove client with all its subscriptions<br>    *                                  !=null: Remove only the given subscription    */   private void removeFromClientSubscriptionMap(SessionInfo sessionInfo, String subscriptionInfoUniqueKey) throws XmlBlasterException   {      String uniqueKey = sessionInfo.getSessionName().getRelativeName();      Object obj;      synchronized(clientSubscriptionMap) {         if (subscriptionInfoUniqueKey == null) {            obj = clientSubscriptionMap.remove(uniqueKey); // client logout            if (log.isLoggable(Level.FINE)) log.fine("Removing client " + sessionInfo.toString() + " from clientSubscriptionMap ...");        
 }         else {            obj = clientSubscriptionMap.get(uniqueKey);    // client unsubscribes            if (log.isLoggable(Level.FINE)) log.fine("Removing subscription " + subscriptionInfoUniqueKey + " from client " + sessionInfo.toString() + " from clientSubscriptionMap, subscriptionInfoUniqueKey=" + subscriptionInfoUniqueKey + " ...");         }      }      if (obj == null) {         if (log.isLoggable(Level.FINE)) log.fine("Sorry, can't remove client subscription for " + sessionInfo.toString() + ", client never subscribed something");         return;      }      // Now we have a map of all subscriptions of this client      Map subMap = (Map)obj;      if (log.isLoggable(Level.FINE)) log.fine("Subscription=" + subscriptionInfoUniqueKey + " client=" + sessionInfo.toString() + " subMap.size=" + subMap.size());      if (subscriptionInfoUniqueKey == null) {  // client does logout(), remove everything:         synchronized (subMap) {            Iterator iterator = subMap.values().iterator();            while (iterator.hasNext()) {               SubscriptionInfo sub = (SubscriptionInfo)iterator.next();               if (sub.getKeyData().isQuery()) {                  if (log.isLoggable(Level.FINE)) log.fine("Ignoring subscription " + sub.getSubscriptionId() + " from TopicHandler '" + sub.getKeyOid() + "'");                  continue;               }               if (log.isLoggable(Level.FINE)) log.fine("Removing subscription " + sub.getSubscriptionId() + " from TopicHandler '" + sub.getKeyOid() + "'");               sub.removeSubscribe(); // removes me from TopicHandler::subscriberMap            }         }         subMap.clear();         subMap = null;      }      else {                                    // client does a single unSubscribe():         SubscriptionInfo sub = null;         synchronized (subMap) {            sub = (SubscriptionInfo)subMap.remove(subscriptionInfoUniqueKey);         }         if (sub == null) {            log.severe("Sorry, can't 
remove client subscriptionId=" + subscriptionInfoUniqueKey + " for " + sessionInfo.toString() + ", not found, subMap size=" + subMap.size());            log.warning(toXml());            Thread.dumpStack();            return;         }         sub.removeSubscribe(); // removes me from TopicHandler::subscriberMap      }   }   /**    * @param subscriptionInfoUniqueKey ==null: Remove client with all its subscriptions<br>    *                                  !=null: Remove only the given subscription    */   private void removeFromQuerySubscribeRequestsSet(SessionInfo sessionInfo, String subscriptionInfoUniqueKey) throws XmlBlasterException   {      if (log.isLoggable(Level.FINE)) log.fine("removing client " + sessionInfo.toString() + " subscriptionInfoUniqueKey=" + subscriptionInfoUniqueKey + " from querySubscribeRequestsSet with size=" + querySubscribeRequestsSet.size() + " ...");      String uniqueKey = sessionInfo.getSessionName().getRelativeName();      Vector vec = new Vector(querySubscribeRequestsSet.size());      // Slow linear search!!!!      synchronized(querySubscribeRequestsSet) {         Iterator iterator = querySubscribeRequestsSet.iterator();         while (iterator.hasNext()) {            SubscriptionInfo sub = (SubscriptionInfo)iterator.next();            if (sub.getSessionInfo().getSessionName().getRelativeName().equals(uniqueKey) && subscriptionInfoUniqueKey == null ||                subscriptionInfoUniqueKey == sub.getSubscriptionId()) {               vec.addElement(sub);               sub.shutdown();            }         }         for (int ii=0; ii<vec.size(); ii++) {            if (log.isLoggable(Level.FINE)) log.fine("Removing subscription " + ((SubscriptionInfo)vec.elementAt(ii)).getSubscriptionId() + " from querySubscribeRequestsSet");            querySubscribeRequestsSet.remove(vec.elementAt(ii));         }      }      vec = null;   }   /**    * Dump state of this object into XML.    
* <br>
    * @return XML state of ClientSubscriptions
    */
   public final String toXml() throws XmlBlasterException
   {
      String noExtraOffset = null;
      return toXml(noExtraOffset);
   }

   /**
    * Dump state of this object into XML.
    * <br>
    * The exact subscriptions are collected from clientSubscriptionMap (small
    * XML form), followed by all entries of querySubscribeRequestsSet.
    *
    * @param extraOffset indenting of tags, null is treated as empty string
    * @return XML state of ClientSubscriptions
    */
   public final String toXml(String extraOffset) throws XmlBlasterException
   {
      if (extraOffset == null) {
         extraOffset = "";
      }
      String offset = Constants.OFFSET + extraOffset;
      // Indentation handed to the nested subscription dumps, same for all entries
      String nestedOffset = extraOffset + Constants.INDENT + Constants.INDENT;

      StringBuffer xml = new StringBuffer(10000);
      xml.append(offset).append("<ClientSubscriptions>");

      xml.append(offset).append(" <ExactSubscriptions>");
      synchronized (this.clientSubscriptionMap) {
         for (Iterator clients = clientSubscriptionMap.values().iterator(); clients.hasNext(); ) {
            Map perClient = (Map)clients.next();
            synchronized (perClient) {
               for (Iterator subs = perClient.values().iterator(); subs.hasNext(); ) {
                  SubscriptionInfo current = (SubscriptionInfo)subs.next();
                  if (current.getKeyData().isExact()) {
                     xml.append(current.toXmlSmall(nestedOffset));
                  }
               }
            }
         }
      }
      xml.append(offset).append(" </ExactSubscriptions>");

      xml.append(offset).append(" <XPathSubscriptions>");
      synchronized (querySubscribeRequestsSet) {
         for (Iterator queries = querySubscribeRequestsSet.iterator(); queries.hasNext(); ) {
            SubscriptionInfo current = (SubscriptionInfo)queries.next();
            xml.append(offset).append(current.toXml(nestedOffset));
         }
      }
      xml.append(offset).append(" </XPathSubscriptions>");

      xml.append(offset).append("</ClientSubscriptions>");
      return xml.toString();
   }

   /**
    * Returns the constant PRIO_01.
    *
    * @see org.xmlBlaster.engine.I_SubscriptionListener#getPriority()
    */
   public Integer getPriority() {
      return PRIO_01;
   }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -