📄 clientsubscriptions.java
字号:
while (iterator.hasNext()) { Map subMap = (Map)iterator.next(); synchronized(subMap) { Iterator iterator2 = subMap.values().iterator(); while (iterator2.hasNext()) { SubscriptionInfo sub = (SubscriptionInfo)iterator2.next(); if (exactOnly && sub.getKeyData().isQuery()) { if (log.isLoggable(Level.FINE)) log.fine("Ignoring subscription " + sub.getSubscriptionId() + " for message '" + keyOid + "'"); continue; } if (keyOid.equals(sub.getKeyOid())) { if (log.isLoggable(Level.FINE)) log.fine("Found subscription " + sub.getSubscriptionId() + " for message '" + keyOid + "'"); if (vec == null) vec = new Vector(); vec.addElement(sub); } } } } } return vec; } /** * Invoked on successful client login (interface I_ClientListener) */ public void sessionAdded(ClientEvent e) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("Login event for client " + e.getSessionInfo().toString() + ", nothing to do"); } public void sessionPreRemoved(ClientEvent e) throws XmlBlasterException { } /** * Invoked when client does a logout (interface I_ClientListener) */ public void sessionRemoved(ClientEvent e) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("START-logout()"); //if (log.isLoggable(Level.FINEST)) log.dump(ME, requestBroker.toXml()); SessionInfo sessionInfo = e.getSessionInfo(); if (log.isLoggable(Level.FINE)) log.fine("Logout event for client " + sessionInfo.toString() + ", removing entries"); try { removeFromClientSubscriptionMap(sessionInfo, null); } catch (XmlBlasterException e1) { } try { removeFromQuerySubscribeRequestsSet(sessionInfo, null); } catch (XmlBlasterException e2) { } //if (log.isLoggable(Level.FINEST)) log.dump(ME, "END-logout()\n" + requestBroker.toXml()); } /** * Event invoked on new created SubjectInfo. */ public void subjectAdded(ClientEvent e) throws XmlBlasterException { log.warning("Ignoring SubjectInfo added event for client " + e.getSubjectInfo().toString()); } /** * Event invoked on deleted SubjectInfo. */ public void subjectRemoved(ClientEvent e) throws XmlBlasterException { log.warning("Ignoring SubjectInfo removed event for client " + e.getSubjectInfo().toString()); } /** * Event invoked on message erase() invocation. */ public void topicRemove(TopicHandler topicHandler) throws XmlBlasterException { String uniqueKey = topicHandler.getUniqueKey(); if (topicHandler.hasExactSubscribers()) { if (log.isLoggable(Level.FINE)) log.fine("Erase event for oid=" + uniqueKey + "', we do nothing here as subscription reservation remains even on deleted messages"); return; } Vector vec = getSubscriptionByOid(uniqueKey, false); if (vec == null) { if (log.isLoggable(Level.FINE)) log.fine("Erase event for oid=" + uniqueKey + "', we do nothing as no subscribes are found"); return; } for (int ii=0; ii<vec.size(); ii++) { SubscriptionInfo sub = (SubscriptionInfo)vec.elementAt(ii); if (log.isLoggable(Level.FINE)) log.fine("Erase event for oid=" + uniqueKey + "', removing subId=" + sub.getSubscriptionId() + " ..."); subscriptionRemove(new SubscriptionEvent(sub)); } } /** * Event invoked on new subscription (interface SubscriptionListener). */ public void subscriptionAdd(SubscriptionEvent e) throws XmlBlasterException { SubscriptionInfo subscriptionInfo = e.getSubscriptionInfo(); if (subscriptionInfo.getSubscribeCounter() > 1) { if (log.isLoggable(Level.FINE)) log.fine("Ignoring multisubscribe instance " + subscriptionInfo.getSubscribeCounter()); return; } SessionInfo sessionInfo = subscriptionInfo.getSessionInfo(); if (log.isLoggable(Level.FINE)) log.fine("Subscription add event " + e); KeyData keyData = subscriptionInfo.getKeyData(); String uniqueKey = sessionInfo.getSessionName().getRelativeName(); // Insert into first map: Object obj; Map subMap; synchronized(clientSubscriptionMap) { obj = clientSubscriptionMap.get(uniqueKey); if (obj == null) { subMap = Collections.synchronizedMap(new HashMap()); clientSubscriptionMap.put(uniqueKey, subMap); } else { subMap = (Map)obj; } subMap.put(subscriptionInfo.getSubscriptionId(), subscriptionInfo); if (log.isLoggable(Level.FINE)) log.fine("Adding subscriptionId=" + subscriptionInfo.getSubscriptionId() + " to subMap of client " + sessionInfo.getId()); } // Insert into second map: if (keyData.isQuery()) { obj=null; synchronized(querySubscribeRequestsSet) { querySubscribeRequestsSet.add(subscriptionInfo); } } } /** * Invoked when a subscription is canceled (interface SubscriptionListener). * <p /> * Note that the subscriptionInfo object carried in SubscriptionEvent * is not the real known subscription, but rather misused as a container to * carry the sessionInfo and subscriptionInfoUniqueKey */ public void subscriptionRemove(SubscriptionEvent e) throws XmlBlasterException { SubscriptionInfo subscriptionInfo = e.getSubscriptionInfo(); if (subscriptionInfo.getSubscribeCounter() > 1) { if (log.isLoggable(Level.FINE)) log.fine("Ignoring multisubscribe instance " + subscriptionInfo.getSubscribeCounter()); return; } String subscriptionInfoUniqueKey = subscriptionInfo.getSubscriptionId(); SessionInfo sessionInfo = subscriptionInfo.getSessionInfo(); if (log.isLoggable(Level.FINE)) log.fine("Subscription remove event " + e.toString()); try { removeFromClientSubscriptionMap(sessionInfo, subscriptionInfoUniqueKey); } catch (XmlBlasterException e1) { log.severe("removeFromClientSubscriptionMap: " + e1.toString()); } try { removeFromQuerySubscribeRequestsSet(sessionInfo, subscriptionInfoUniqueKey); } catch (XmlBlasterException e2) { log.severe("removeFromQuerySubscribeRequestsSet: " + e2.toString()); } subscriptionInfo.shutdown(); } /** * @param subscriptionInfoUniqueKey ==null: Remove client with all its subscriptions<br> * !=null: Remove only the given subscription */ private void removeFromClientSubscriptionMap(SessionInfo sessionInfo, String subscriptionInfoUniqueKey) throws XmlBlasterException { String uniqueKey = sessionInfo.getSessionName().getRelativeName(); Object obj; synchronized(clientSubscriptionMap) { if (subscriptionInfoUniqueKey == null) { obj = clientSubscriptionMap.remove(uniqueKey); // client logout if (log.isLoggable(Level.FINE)) log.fine("Removing client " + sessionInfo.toString() + " from clientSubscriptionMap ..."); } else { obj = clientSubscriptionMap.get(uniqueKey); // client unsubscribes if (log.isLoggable(Level.FINE)) log.fine("Removing subscription " + subscriptionInfoUniqueKey + " from client " + sessionInfo.toString() + " from clientSubscriptionMap, subscriptionInfoUniqueKey=" + subscriptionInfoUniqueKey + " ..."); } } if (obj == null) { if (log.isLoggable(Level.FINE)) log.fine("Sorry, can't remove client subscription for " + sessionInfo.toString() + ", client never subscribed something"); return; } // Now we have a map of all subscriptions of this client Map subMap = (Map)obj; if (log.isLoggable(Level.FINE)) log.fine("Subscription=" + subscriptionInfoUniqueKey + " client=" + sessionInfo.toString() + " subMap.size=" + subMap.size()); if (subscriptionInfoUniqueKey == null) { // client does logout(), remove everything: synchronized (subMap) { Iterator iterator = subMap.values().iterator(); while (iterator.hasNext()) { SubscriptionInfo sub = (SubscriptionInfo)iterator.next(); if (sub.getKeyData().isQuery()) { if (log.isLoggable(Level.FINE)) log.fine("Ignoring subscription " + sub.getSubscriptionId() + " from TopicHandler '" + sub.getKeyOid() + "'"); continue; } if (log.isLoggable(Level.FINE)) log.fine("Removing subscription " + sub.getSubscriptionId() + " from TopicHandler '" + sub.getKeyOid() + "'"); sub.removeSubscribe(); // removes me from TopicHandler::subscriberMap } } subMap.clear(); subMap = null; } else { // client does a single unSubscribe(): SubscriptionInfo sub = null; synchronized (subMap) { sub = (SubscriptionInfo)subMap.remove(subscriptionInfoUniqueKey); } if (sub == null) { log.severe("Sorry, can't remove client subscriptionId=" + subscriptionInfoUniqueKey + " for " + sessionInfo.toString() + ", not found, subMap size=" + subMap.size()); log.warning(toXml()); Thread.dumpStack(); return; } sub.removeSubscribe(); // removes me from TopicHandler::subscriberMap } } /** * @param subscriptionInfoUniqueKey ==null: Remove client with all its subscriptions<br> * !=null: Remove only the given subscription */ private void removeFromQuerySubscribeRequestsSet(SessionInfo sessionInfo, String subscriptionInfoUniqueKey) throws XmlBlasterException { if (log.isLoggable(Level.FINE)) log.fine("removing client " + sessionInfo.toString() + " subscriptionInfoUniqueKey=" + subscriptionInfoUniqueKey + " from querySubscribeRequestsSet with size=" + querySubscribeRequestsSet.size() + " ..."); String uniqueKey = sessionInfo.getSessionName().getRelativeName(); Vector vec = new Vector(querySubscribeRequestsSet.size()); // Slow linear search!!!! synchronized(querySubscribeRequestsSet) { Iterator iterator = querySubscribeRequestsSet.iterator(); while (iterator.hasNext()) { SubscriptionInfo sub = (SubscriptionInfo)iterator.next(); if (sub.getSessionInfo().getSessionName().getRelativeName().equals(uniqueKey) && subscriptionInfoUniqueKey == null || subscriptionInfoUniqueKey == sub.getSubscriptionId()) { vec.addElement(sub); sub.shutdown(); } } for (int ii=0; ii<vec.size(); ii++) { if (log.isLoggable(Level.FINE)) log.fine("Removing subscription " + ((SubscriptionInfo)vec.elementAt(ii)).getSubscriptionId() + " from querySubscribeRequestsSet"); querySubscribeRequestsSet.remove(vec.elementAt(ii)); } } vec = null; } /** * Dump state of this object into XML. * <br> * @return XML state of ClientSubscriptions */ public final String toXml() throws XmlBlasterException { return toXml((String)null); } /** * Dump state of this object into XML. * <br> * @param extraOffset indenting of tags * @return XML state of ClientSubscriptions */ public final String toXml(String extraOffset) throws XmlBlasterException { StringBuffer sb = new StringBuffer(10000); if (extraOffset == null) extraOffset = ""; String offset = Constants.OFFSET + extraOffset; sb.append(offset).append("<ClientSubscriptions>"); sb.append(offset).append(" <ExactSubscriptions>"); synchronized(this.clientSubscriptionMap) { Iterator iterator = clientSubscriptionMap.values().iterator(); while (iterator.hasNext()) { Map subMap = (Map)iterator.next(); synchronized(subMap) { Iterator iterator2 = subMap.values().iterator(); while (iterator2.hasNext()) { SubscriptionInfo sub = (SubscriptionInfo)iterator2.next(); if (sub.getKeyData().isExact()) { sb.append(sub.toXmlSmall(extraOffset + Constants.INDENT + Constants.INDENT)); } } } } } sb.append(offset).append(" </ExactSubscriptions>"); sb.append(offset).append(" <XPathSubscriptions>"); synchronized(querySubscribeRequestsSet) { Iterator iterator = querySubscribeRequestsSet.iterator(); while (iterator.hasNext()) { SubscriptionInfo sub = (SubscriptionInfo)iterator.next(); sb.append(offset).append(sub.toXml(extraOffset + Constants.INDENT + Constants.INDENT)); } } sb.append(offset + " </XPathSubscriptions>"); sb.append(offset + "</ClientSubscriptions>"); return sb.toString(); } /** * @see org.xmlBlaster.engine.I_SubscriptionListener#getPriority() */ public Integer getPriority() { return PRIO_01; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -