📄 requestbroker.java
字号:
* Make the topicHandler persistent for crash recovery and shutdown/startup cycle.
 * @param topicEntry the entry handed to the topic store
 * @return Number of new entries added: 0 if entry existed (or no topic store is set), 1 if new entry added
 */
public final int addPersistentTopicHandler(TopicEntry topicEntry) throws XmlBlasterException {
   // Nothing to persist into when no topic store is set
   if (this.topicStore == null) {
      return 0;
   }
   if (log.isLoggable(Level.FINE)) {
      log.fine("Persisting topicEntry");
   }
   return this.topicStore.put(topicEntry);
}

/**
 * Remove the persistent TopicHandler entry.
 * @param topicEntry the entry to erase from the topic store
 * @return the number of elements erased, 0 if no topic store is set
 */
public final int removePersistentTopicHandler(TopicEntry topicEntry) throws XmlBlasterException {
   if (this.topicStore == null) {
      return 0;
   }
   if (log.isLoggable(Level.FINE)) {
      log.fine("Removing persisting topicEntry");
   }
   return this.topicStore.remove(topicEntry);
}

/**
 * Change the persistent TopicHandler entry.
 * <p>
 * The entry is passed to the topic store's change() call without a callback.
 * @param topicEntry the entry to update in the topic store
 * @return always 0, the result of the store's change() call is not reported
 */
public final int changePersistentTopicHandler(TopicEntry topicEntry) throws XmlBlasterException {
   if (this.topicStore == null) {
      return 0;
   }
   if (log.isLoggable(Level.FINE)) {
      log.fine("Changing persisting topicEntry");
   }
   this.topicStore.change(topicEntry, null);
   return 0;
}

/**
 * Low level subscribe, is called when the &lt;key oid='...' queryType='EXACT'&gt; to subscribe is exactly known.
 * <p>
 * If the message is yet unknown, an empty is created to hold the subscription.
* @param subs * @param calleeIsXPathMatchCheck true The calling thread is internally to check if a Query matches a new published topic * false The callee is a subscribe() thread from a client */ private void subscribeToOid(SubscriptionInfo subs, boolean calleeIsXPathMatchCheck) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("Entering subscribeToOid(subId="+subs.getSubscriptionId()+", oid="+subs.getKeyData().getOid()+", queryType="+subs.getKeyData().getQueryType()+") ..."); String uniqueKey = subs.getKeyData().getOid(); SessionInfo publisherSessionInfo = null; // subs.getSessionInfo() is the wrong one TopicHandler topicHandler = this.glob.getTopicAccessor().findOrCreate(publisherSessionInfo, uniqueKey); try { subs.incrSubscribeCounter(); fireSubscribeEvent(subs); // inform all listeners about this new subscription // Now the MsgUnit exists and all subcription handling is done, subscribe to it -> fires update to client topicHandler.addSubscriber(subs, calleeIsXPathMatchCheck); } finally { this.glob.getTopicAccessor().release(topicHandler); } } /** * This method returns the unprotected Authenticate object. * @param secretSessionId the secret Session Id of the invoker. * TODO in future an authorization operation shall be performed here * @return */ public Authenticate getAuthenticate(String secretSessionId) { return this.authenticate; } /** * Incoming unsubscribe request from a client. 
* <p /> * If you have subscribed before, you can cancel your * subscription with this method again * * @param sessionInfo * @param xmlKey Key with the oid to unSubscribe<br> * See XmlKey.dtd for a description<br> * If you subscribed with XPath, you need to pass the id you got from your subscription * @param qos Quality of Service, flags to control unsubscription<br> * See XmlQoS.dtd for a description * Example (note that the qos are not yet fully implemented):<p /> * <pre> * <qos> * <notify>false</notify> <!-- The subscribers shall not be notified when this message is destroyed --> * </qos> * </pre> * @return An array of canceled subscriptions e.g. * <pre> * <qos> * <subscribe id='__subId:2'/> * <isUnSubscribe/> * </qos> * </pre> */ public String[] unSubscribe(SessionInfo sessionInfo, QueryKeyData xmlKey, UnSubscribeQosServer unSubscribeQos) throws XmlBlasterException { try { if (log.isLoggable(Level.FINER)) log.finer("Entering unSubscribe(oid='" + xmlKey.getOid() + "', queryType='" + xmlKey.getQueryType() + "', query='" + xmlKey.getQueryString() + "', domain='" + xmlKey.getDomain() + "') ..."); if (this.glob.isClusterManagerReady()) { // cluster support - forward message to master try { UnSubscribeReturnQos[] ret = glob.getClusterManager().forwardUnSubscribe(sessionInfo, xmlKey, unSubscribeQos); if (ret != null) { log.info("unSubscribe of '" + xmlKey.getNiceString() + "' matched " + ret.length + " entries in remote cluster"); // Currently we only return the local matched subscriptions, // we need to discuss how they can differ from the remote cluster // unSubscribes ... 
} } catch (XmlBlasterException e) { if (e.getErrorCode() == ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED) { log.warning("unSubscribe of '" + xmlKey.getNiceString() + "' entries in remote cluster: " + e.getMessage()); this.glob.setUseCluster(false); } else { log.warning("unSubscribe of '" + xmlKey.getNiceString() + "' in remote cluster: " + e.getMessage()); e.printStackTrace(); throw e; } } } else { if (this.glob.useCluster()) log.warning("unSubscribe not forwarded to cluster as ClusterManager is not ready"); } Set subscriptionIdSet = new HashSet(); String id = xmlKey.getOid(); if (SubscriptionInfo.isSubscribeId(id)) { SubscriptionInfo subs = clientSubscriptions.getSubscription(sessionInfo, xmlKey.getOid()); if (subs != null) { SubscriptionInfo[] childs = subs.getChildrenSubscriptions(); if (childs != null) { if (log.isLoggable(Level.FINE)) log.fine("unSubscribe() Traversing " + childs.length + " childs"); for (int ii=0; ii<childs.length; ii++) { SubscriptionInfo so = childs[ii]; fireUnSubscribeEvent(so); subscriptionIdSet.add(so.getSubscriptionId()); so = null; } } fireUnSubscribeEvent(subs); subscriptionIdSet.add(subs.getSubscriptionId()); subs = null; } else { log.warning("UnSubscribe of " + xmlKey.getOid() + " by session " + sessionInfo.getId() + " failed"); if (log.isLoggable(Level.FINEST)) log.finest(toXml()); } } else { // Try to unSubscribe with topic oid instead of subscribe id: String suppliedXmlKey = xmlKey.getOid(); // remember supplied oid, another oid may be generated later String[] oids = queryMatchingTopics(sessionInfo, xmlKey, unSubscribeQos.getData()); //Set oidSet = new HashSet(topicHandlerArr.length); // for return values (TODO: change to TreeSet to maintain order) for (int ii=0; ii<oids.length; ii++) { TopicHandler topicHandler = this.glob.getTopicAccessor().access(oids[ii]); if (topicHandler == null) { // unSubscribe on a unknown message ... 
log.warning("UnSubscribe on unknown topic "+oids[ii]+" from passed [" + xmlKey.getOid() + "] is ignored"); continue; } SubscriptionInfo[] subs; try { subs = topicHandler.findSubscriber(sessionInfo); } finally { // extend lock to cover fireUnSubscribeEvent? this.glob.getTopicAccessor().release(topicHandler); } for (int jj=0; jj<subs.length; jj++) { SubscriptionInfo sub = subs[jj]; if (sub != null) { fireUnSubscribeEvent(sub); subscriptionIdSet.add(sub.getSubscriptionId()); } else log.warning("UnSubscribe of " + oids[ii] + " by session " + sessionInfo.getId() + " failed"); } } if (oids.length < 1) { log.warning("Can't access subscription, unSubscribe failed, your supplied key oid '" + suppliedXmlKey + "' is invalid"); throw new XmlBlasterException(glob, ErrorCode.USER_OID_UNKNOWN, ME, "Can't access subscription, unSubscribe failed, your supplied key oid '" + suppliedXmlKey + "' is invalid"); } } // Build the return values ... String[] oidArr = new String[subscriptionIdSet.size()]; StatusQosData qos = new StatusQosData(glob, MethodName.UNSUBSCRIBE); qos.setState(Constants.STATE_OK); Iterator it = subscriptionIdSet.iterator(); int ii = 0; while (it.hasNext()) { qos.setSubscriptionId((String)it.next()); oidArr[ii++] = qos.toXml(); } return oidArr; } catch (XmlBlasterException e) { throw e; } catch (Throwable e) { e.printStackTrace(); throw XmlBlasterException.convert(glob, ME, ErrorCode.INTERNAL_UNSUBSCRIBE.toString(), e); } } /** * Used for cluster internal updates. 
*/ public final String update(SessionInfo sessionInfo, UpdateKey updateKey, byte[] content, MsgQosData msgQosData) throws XmlBlasterException { if (msgQosData.isErased()) { String eraseKey = msgQosData.getClientProperty("__eraseKey", updateKey.toXml()); QueryKeyData key = glob.getQueryKeyFactory().readObject(eraseKey); String eraseQos = msgQosData.getClientProperty("__eraseQos", "<qos/>"); EraseQosServer qos = new EraseQosServer(glob, eraseQos); String[] ret = erase(sessionInfo, key, qos, true); if (ret != null && ret.length > 0) return ret[0]; else return "<qos/>"; } else { PublishQosServer qos = new PublishQosServer(glob, msgQosData); // Since xmlBlaster 1.6: We need to serialize and replace the original Global with ServerScope MsgUnit msgUnit = new MsgUnit(glob, updateKey.getData().toXml(), content, qos.getData().toXml()); //MsgUnit msgUnit = new MsgUnit(updateKey.getData(), content, qos.getData()); return publish(sessionInfo, msgUnit, true); } } /** * Internal publishing helper. */ public final String publish(SessionInfo sessionInfo, MsgUnit msgUnit) throws XmlBlasterException { return publish(sessionInfo, msgUnit, false); } private final String publish(SessionInfo sessionInfo, MsgUnit msgUnit, boolean isClusterUpdate) throws XmlBlasterException { if (!msgUnit.getGlobal().isServerSide()) { // Since xmlBlaster 1.6.1: We need to serialize and replace the original Global with ServerScope if (log.isLoggable(Level.FINE)) log.fine("publish call with client side Global, converting now to ServerScope: " + Global.getStackTraceAsString(null)); msgUnit = new MsgUnit(glob, msgUnit.getMsgUnitRaw(), msgUnit.getMethodName()); } PublishQosServer publishQosServer = new PublishQosServer(glob, msgUnit.getQosData()); publishQosServer.setClusterUpdate(isClusterUpdate); return publish(sessionInfo, msgUnit, publishQosServer); } /** * Write-Access method to publish a new message from a data source. 
* <p /> * There are two MoM styles supported: * <p /> * <ul> * <li>PubSub style:<br /> * If MsgUnit is created from subscribe or the MsgUnit is new, we need to add the * DOM here once; XmlKey takes care of that</li> * <li>PTP style:<br /> * Send message directly to all destinations, ignore if same message is known from Pub/Sub style</li> * </ul> * <p /> * This triggers the method update() if observed by somebody * <p /> * If the given key oid doesn't exist, it will be automatically added, <br> * so this covers the SQL'ish INSERT and UPDATE. * <p /> * If MsgUnit is created from subscribe or MsgUnit is new, the key meta * data are
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -