📄 requestbroker.java
字号:
} } catch (XmlBlasterException e) { if (e.getErrorCode() == ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED) { this.glob.setUseCluster(false); } else { e.printStackTrace(); throw e; } } if (log.isLoggable(Level.FINE)) log.fine("get(): Found " + msgUnitList.size() + " remote matches for " + xmlKey.toXml()); } NEXT_MSG: for (int ii=0; ii<keyDataArr.length; ii++) { KeyData xmlKeyExact = keyDataArr[ii]; if (xmlKeyExact == null && xmlKey.isExact()) // subscription on a yet unknown message ... xmlKeyExact = xmlKey; TopicHandler topicHandler = this.glob.getTopicAccessor().access(xmlKeyExact.getOid()); if( topicHandler == null ) { /* if (this.glob.useCluster()) { // cluster support - forward erase to master try { MsgUnit tmp[] = glob.getClusterManager().forwardGet(sessionInfo, xmlKey, getQos); if (tmp != null && tmp.length > 0) { log.info(ME, "get() access of " + tmp.length + " messages from cluster master"); for (int jj=0; jj<tmp.length; jj++) { msgUnitList.add(tmp[jj]); // We currently don' cache the message here in the slave !!! // We could do it with the xmlBlasterConnection.initCache(int size) } continue NEXT_MSG; } } catch (XmlBlasterException e) { if (e.getErrorCode() == ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED) { this.glob.setUseCluster(false); } else { e.printStackTrace(); throw e; } } } */ if (log.isLoggable(Level.FINE)) log.fine("get(): The key '"+xmlKeyExact.getOid()+"' is not available."); continue NEXT_MSG; } // topicHandler==null try { if (topicHandler.isAlive()) { int numEntries = getQos.getHistoryQos().getNumEntries(); MsgUnitWrapper[] msgUnitWrapperArr = topicHandler.getMsgUnitWrapperArr(numEntries, getQos.getHistoryQos().getNewestFirst()); NEXT_HISTORY: for(int kk=0; kk<msgUnitWrapperArr.length; kk++) { MsgUnitWrapper msgUnitWrapper = msgUnitWrapperArr[kk]; if (msgUnitWrapper == null) { continue NEXT_HISTORY; } if (this.glob.useCluster() && !msgUnitWrapper.getMsgQosData().isAtMaster()) { if (log.isLoggable(Level.FINE)) log.fine("get(): Ignore message as we are not the master: " + msgUnitWrapper.toXml()); continue NEXT_HISTORY; } //topicHandler.checkFilter(SessionInfo publisherSessionInfo, SubscriptionInfo sub, MsgUnitWrapper msgUnitWrapper, boolean handleException) AccessFilterQos[] filterQos = getQos.getAccessFilterArr(); if (filterQos != null) { if (log.isLoggable(Level.FINE)) log.fine("Checking " + filterQos.length + " filters"); for (int jj=0; jj<filterQos.length; jj++) { I_AccessFilter filter = getAccessPluginManager().getAccessFilter( filterQos[jj].getType(), filterQos[jj].getVersion(), msgUnitWrapper.getContentMime(), msgUnitWrapper.getContentMimeExtended()); if (log.isLoggable(Level.FINE)) log.fine("get("+xmlKeyExact.getOid()+") filter=" + filter + " qos=" + getQos.toXml()); if (filter != null && filter.match(sessionInfo, msgUnitWrapper.getMsgUnit(), filterQos[jj].getQuery()) == false) continue NEXT_HISTORY; // filtered message is not send to client } } if (msgUnitWrapper.isExpired()) { continue NEXT_HISTORY; } MsgUnit mm = msgUnitWrapper.getMsgUnit(); if (mm == null) { continue NEXT_HISTORY; // WeakReference to cache lost and lookup failed } GetReturnQosServer retQos = new GetReturnQosServer(glob, msgUnitWrapper.getMsgQosData(), Constants.STATE_OK); byte[] cont = (getQos.getWantContent()) ? mm.getContent() : new byte[0]; mm = new MsgUnit(mm, null, cont, retQos.getData()); msgUnitList.add(mm); } // for each history entry } // topicHandler.isAlive() } finally { this.glob.getTopicAccessor().release(topicHandler); } } MsgUnit[] msgUnitArr = (MsgUnit[])msgUnitList.toArray(new MsgUnit[msgUnitList.size()]); this.dispatchStatistic.incrNumGet(msgUnitArr.length); if (log.isLoggable(Level.FINE)) log.fine("Returning for get() " + msgUnitArr.length + " messages"); return msgUnitArr; } catch (XmlBlasterException e) { throw e; } catch (Throwable e) { e.printStackTrace(); throw XmlBlasterException.convert(glob, ME, ErrorCode.INTERNAL_GET.toString(), e); } } public void updateInternalUserList() throws XmlBlasterException { // "__sys__UserList"; if (this.publishUserList && this.state == ALIVE) { // Create QoS with new timestamp PublishQosServer publishQosUserListEvent = new PublishQosServer(glob, this.publishQosForEvents.getData().toXml(), false); //publishQosUserListEvent.clearRoutes(); MsgUnit msgUnit = new MsgUnit(this.xmlKeyUserListEvent, this.authenticate.getSubjectList().getBytes(), //content.getBytes(), publishQosUserListEvent.getData()); publish(this.unsecureSessionInfo, msgUnit); publishQosUserListEvent.getData().setTopicProperty(null); // only the first publish needs to configure the topic if (log.isLoggable(Level.FINE)) log.fine("Refreshed internal state for '" + this.xmlKeyUserListEvent.getOid() + "'"); } } /** * This method does the query (queryType = "XPATH" | "EXACT"). * * @param clientName is only needed for nicer logging output * @return Array of matching XmlKey objects (may contain null elements), the array is never null * * TODO: a query Handler, allowing drivers for REGEX, XPath, SQL, etc. queries * @return The array is never null, but it may contain a null element at index 0 if the oid is yet unknown */ private KeyData[] queryMatchingKeys(SessionInfo sessionInfo, QueryKeyData queryKeyData, QueryQosData qos) throws XmlBlasterException { String clientName = sessionInfo.toString(); if (queryKeyData.isQuery()) { // query: subscription without a given oid ArrayList oidList = bigXmlKeyDOM.parseKeyOid(sessionInfo, queryKeyData.getQueryString(), qos); ArrayList strippedList = new ArrayList(); for(int i=0; i<oidList.size(); i++) { TopicHandler topicHandler = this.glob.getTopicAccessor().access((String)oidList.get(i)); if (topicHandler != null) { try { KeyData keyData = topicHandler.getMsgKeyData(); if (keyData != null) { strippedList.add(keyData); } } finally { this.glob.getTopicAccessor().release(topicHandler); } } } return (KeyData[])strippedList.toArray(new KeyData[strippedList.size()]); } else if (queryKeyData.isExact()) { // subscription with a given oid if (log.isLoggable(Level.FINE)) log.fine("Access Client " + clientName + " with EXACT oid='" + queryKeyData.getOid() + "'"); TopicHandler topicHandler = this.glob.getTopicAccessor().access(queryKeyData.getOid()); if (topicHandler == null) { return new KeyData[] { null }; // add arr[0]=null as a place holder } try { if (topicHandler.getMsgKeyData() == null) { return new KeyData[] { null }; // add arr[0]=null as a place holder } // return new KeyData[] { topicHandler.getMsgKeyData() }; return new KeyData[] { queryKeyData }; } finally { this.glob.getTopicAccessor().release(topicHandler); } } else if (queryKeyData.isDomain()) { // a domain attribute is given String domain = queryKeyData.getDomain(); if (log.isLoggable(Level.FINE)) log.fine("Access Client " + clientName + " with DOMAIN domain='" + domain + "'"); if (domain == null) { log.warning("The DOMAIN query has a domain=null, no topics found"); return new KeyData[0]; } String[] oids = this.glob.getTopicAccessor().getTopics(); ArrayList strippedList = new ArrayList(); for(int i=0; i<oids.length; i++) { TopicHandler topicHandler = this.glob.getTopicAccessor().access(oids[i]); if (topicHandler != null) { try { if (topicHandler.getMsgKeyData() != null && domain.equals(topicHandler.getMsgKeyData().getDomain())) strippedList.add(topicHandler.getMsgKeyData()); } finally { this.glob.getTopicAccessor().release(topicHandler); } } } if (log.isLoggable(Level.FINE)) log.fine("Found " + strippedList.size() + " domain matches for '" + domain + "'"); return (KeyData[])strippedList.toArray(new KeyData[strippedList.size()]); } else { log.warning("Sorry, can't access, query syntax is unknown: " + queryKeyData.getQueryType()); throw new XmlBlasterException(glob, ErrorCode.USER_QUERY_TYPE_INVALID, ME, "Sorry, can't access, query syntax is unknown: " + queryKeyData.getQueryType()); } } /** * This method does the query (queryType = "XPATH" | "EXACT"). * * @param clientName is only needed for nicer logging output * @return Array of matching XmlKey objects (may contain null elements), the array is never null * * TODO: a query Handler, allowing drivers for REGEX, XPath, SQL, etc. queries * @return The array is never null, but it may contain a null element at index 0 if the oid is yet unknown */ private String[] queryMatchingTopics(SessionInfo sessionInfo, QueryKeyData queryKeyData, QueryQosData qos) throws XmlBlasterException { String clientName = sessionInfo.toString(); if (queryKeyData.isQuery()) { // query: subscription without a given oid ArrayList oidList = bigXmlKeyDOM.parseKeyOid(sessionInfo, queryKeyData.getQueryString(), qos); return (String[])oidList.toArray(new String[oidList.size()]); } else if (queryKeyData.isExact()) { // subscription with a given oid if (log.isLoggable(Level.FINE)) log.fine("Access Client " + clientName + " with EXACT oid='" + queryKeyData.getOid() + "'"); return new String[] { queryKeyData.getOid() }; } else if (queryKeyData.isDomain()) { // a domain attribute is given String domain = queryKeyData.getDomain(); if (log.isLoggable(Level.FINE)) log.fine("Access Client " + clientName + " with DOMAIN domain='" + domain + "'"); if (domain == null) { log.warning("The DOMAIN query has a domain=null, no topics found"); return new String[0]; } String[] oids = this.glob.getTopicAccessor().getTopics(); ArrayList strippedList = new ArrayList(); for(int i=0; i<oids.length; i++) { TopicHandler topicHandler = this.glob.getTopicAccessor().access(oids[i]); if (topicHandler != null) { try { if (domain.equals(topicHandler.getMsgKeyData().getDomain())) strippedList.add(topicHandler); } finally { this.glob.getTopicAccessor().release(topicHandler); } } } if (log.isLoggable(Level.FINE)) log.fine("Found " + strippedList.size() + " domain matches for '" + domain + "'"); return (String[])strippedList.toArray(new String[strippedList.size()]); } else { log.warning("Sorry, can't access, query syntax is unknown: " + queryKeyData.getQueryType()); throw new XmlBlasterException(glob, ErrorCode.USER_QUERY_TYPE_INVALID, ME, "Sorry, can't access, query syntax is unknown: " + queryKeyData.getQueryType()); } } /**
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -