⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 requestbroker.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
    * Make the topicHandler persistent for crash recovery and shutdown/startup cycle.    * @return Number of new entries added: 0 if entry existed, 1 if new entry added    */   public final int addPersistentTopicHandler(TopicEntry topicEntry) throws XmlBlasterException {      if (this.topicStore != null) {         if (log.isLoggable(Level.FINE)) log.fine("Persisting topicEntry");         return this.topicStore.put(topicEntry);      }      return 0;   }   /**    * Remove the persistent TopicHandler entry.    * @return the number of elements erased.    */   public final int removePersistentTopicHandler(TopicEntry topicEntry) throws XmlBlasterException {      if (this.topicStore != null) {         if (log.isLoggable(Level.FINE)) log.fine("Removing persisting topicEntry");         return this.topicStore.remove(topicEntry);      }      return 0;   }   /**    * Remove the persistent TopicHandler entry.    * @return the number of elements erased.    */   public final int changePersistentTopicHandler(TopicEntry topicEntry) throws XmlBlasterException {      if (this.topicStore != null) {         if (log.isLoggable(Level.FINE)) log.fine("Changing persisting topicEntry");         this.topicStore.change(topicEntry, null);      }      return 0;   }   /**    * Low level subscribe, is called when the <key oid='...' queryType='EXACT'> to subscribe is exactly known.    * <p>    * If the message is yet unknown, an empty is created to hold the subscription.    * @param subs    * @param calleeIsXPathMatchCheck true The calling thread is internally to check if a Query matches a new published topic    *        false The callee is a subscribe() thread from a client    */   private void subscribeToOid(SubscriptionInfo subs, boolean calleeIsXPathMatchCheck) throws XmlBlasterException {      if (log.isLoggable(Level.FINER)) log.finer("Entering subscribeToOid(subId="+subs.getSubscriptionId()+", oid="+subs.getKeyData().getOid()+", queryType="+subs.getKeyData().getQueryType()+") ...");      String uniqueKey = subs.getKeyData().getOid();      SessionInfo publisherSessionInfo = null; // subs.getSessionInfo() is the wrong one      TopicHandler topicHandler = this.glob.getTopicAccessor().findOrCreate(publisherSessionInfo, uniqueKey);      try {         subs.incrSubscribeCounter();         fireSubscribeEvent(subs);  // inform all listeners about this new subscription         // Now the MsgUnit exists and all subcription handling is done, subscribe to it -> fires update to client         topicHandler.addSubscriber(subs, calleeIsXPathMatchCheck);      }      finally {         this.glob.getTopicAccessor().release(topicHandler);      }   }   /**    * This method returns the unprotected Authenticate object.    * @param secretSessionId the secret Session Id of the invoker.    * TODO in future an authorization operation shall be performed here    * @return    */   public Authenticate getAuthenticate(String secretSessionId) {      return this.authenticate;   }   /**    * Incoming unsubscribe request from a client.    * <p />    * If you have subscribed before, you can cancel your    * subscription with this method again    *    * @param sessionInfo    * @param xmlKey    Key with the oid to unSubscribe<br>    *                  See XmlKey.dtd for a description<br>    *                  If you subscribed with XPath, you need to pass the id you got from your subscription    * @param qos       Quality of Service, flags to control unsubscription<br>    *                  See XmlQoS.dtd for a description    *         Example (note that the qos are not yet fully implemented):<p />    * <pre>    *    &lt;qos>    *       &lt;notify>false</notify>     &lt;!-- The subscribers shall not be notified when this message is destroyed -->    *    &lt;/qos>    * </pre>    * @return An array of canceled subscriptions e.g.    * <pre>    *   &lt;qos>    *      &lt;subscribe id='__subId:2'/>    *      &lt;isUnSubscribe/>    *   &lt;/qos>    * </pre>    */   public String[] unSubscribe(SessionInfo sessionInfo, QueryKeyData xmlKey, UnSubscribeQosServer unSubscribeQos) throws XmlBlasterException   {      try {         if (log.isLoggable(Level.FINER)) log.finer("Entering unSubscribe(oid='" + xmlKey.getOid() + "', queryType='" + xmlKey.getQueryType() + "', query='" + xmlKey.getQueryString() + "', domain='" + xmlKey.getDomain() + "') ...");         if (this.glob.isClusterManagerReady()) { // cluster support - forward message to master            try {               UnSubscribeReturnQos[] ret = glob.getClusterManager().forwardUnSubscribe(sessionInfo, xmlKey, unSubscribeQos);               if (ret != null) {                  log.info("unSubscribe of '" + xmlKey.getNiceString() + "' matched " + ret.length + " entries in remote cluster");                  // Currently we only return the local matched subscriptions,                  // we need to discuss how they can differ from the remote cluster                  // unSubscribes ...               }            }            catch (XmlBlasterException e) {               if (e.getErrorCode() == ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED) {                  log.warning("unSubscribe of '" + xmlKey.getNiceString() + "' entries in remote cluster: " + e.getMessage());                  this.glob.setUseCluster(false);               }               else {                  log.warning("unSubscribe of '" + xmlKey.getNiceString() + "' in remote cluster: " + e.getMessage());                  e.printStackTrace();                  throw e;               }            }         }         else {            if (this.glob.useCluster())               log.warning("unSubscribe not forwarded to cluster as ClusterManager is not ready");         }         Set subscriptionIdSet = new HashSet();         String id = xmlKey.getOid();         if (SubscriptionInfo.isSubscribeId(id)) {            SubscriptionInfo subs = clientSubscriptions.getSubscription(sessionInfo, xmlKey.getOid());            if (subs != null) {               SubscriptionInfo[] childs = subs.getChildrenSubscriptions();               if (childs != null) {                  if (log.isLoggable(Level.FINE)) log.fine("unSubscribe() Traversing " + childs.length + " childs");                  for (int ii=0; ii<childs.length; ii++) {                     SubscriptionInfo so = childs[ii];                     fireUnSubscribeEvent(so);                     subscriptionIdSet.add(so.getSubscriptionId());                     so = null;                  }               }               fireUnSubscribeEvent(subs);               subscriptionIdSet.add(subs.getSubscriptionId());               subs = null;            }            else {               log.warning("UnSubscribe of " + xmlKey.getOid() + " by session " + sessionInfo.getId() + " failed");               if (log.isLoggable(Level.FINEST)) log.finest(toXml());            }         }         else { // Try to unSubscribe with topic oid instead of subscribe id:            String suppliedXmlKey = xmlKey.getOid(); // remember supplied oid, another oid may be generated later            String[] oids = queryMatchingTopics(sessionInfo, xmlKey, unSubscribeQos.getData());            //Set oidSet = new HashSet(topicHandlerArr.length);  // for return values (TODO: change to TreeSet to maintain order)            for (int ii=0; ii<oids.length; ii++) {               TopicHandler topicHandler = this.glob.getTopicAccessor().access(oids[ii]);               if (topicHandler == null) { // unSubscribe on a unknown message ...                  log.warning("UnSubscribe on unknown topic "+oids[ii]+" from passed [" + xmlKey.getOid() + "] is ignored");                  continue;               }               SubscriptionInfo[] subs;               try {                  subs = topicHandler.findSubscriber(sessionInfo);               }               finally { // extend lock to cover fireUnSubscribeEvent?                  this.glob.getTopicAccessor().release(topicHandler);               }               for (int jj=0; jj<subs.length; jj++) {                  SubscriptionInfo sub = subs[jj];                  if (sub != null) {                     fireUnSubscribeEvent(sub);                     subscriptionIdSet.add(sub.getSubscriptionId());                  }                  else                     log.warning("UnSubscribe of " + oids[ii] + " by session " + sessionInfo.getId() + " failed");               }            }            if (oids.length < 1) {               log.warning("Can't access subscription, unSubscribe failed, your supplied key oid '" + suppliedXmlKey + "' is invalid");               throw new XmlBlasterException(glob, ErrorCode.USER_OID_UNKNOWN, ME, "Can't access subscription, unSubscribe failed, your supplied key oid '" + suppliedXmlKey + "' is invalid");            }         }         // Build the return values ...         String[] oidArr = new String[subscriptionIdSet.size()];         StatusQosData qos = new StatusQosData(glob, MethodName.UNSUBSCRIBE);         qos.setState(Constants.STATE_OK);         Iterator it = subscriptionIdSet.iterator();         int ii = 0;         while (it.hasNext()) {            qos.setSubscriptionId((String)it.next());            oidArr[ii++] = qos.toXml();         }         return oidArr;      }      catch (XmlBlasterException e) {         throw e;      }      catch (Throwable e) {         e.printStackTrace();         throw XmlBlasterException.convert(glob, ME, ErrorCode.INTERNAL_UNSUBSCRIBE.toString(), e);      }   }   /**    * Used for cluster internal updates.    */   public final String update(SessionInfo sessionInfo, UpdateKey updateKey, byte[] content, MsgQosData msgQosData) throws XmlBlasterException   {      if (msgQosData.isErased()) {         String eraseKey = msgQosData.getClientProperty("__eraseKey", updateKey.toXml());         QueryKeyData key = glob.getQueryKeyFactory().readObject(eraseKey);         String eraseQos = msgQosData.getClientProperty("__eraseQos", "<qos/>");         EraseQosServer qos = new EraseQosServer(glob, eraseQos);         String[] ret = erase(sessionInfo, key, qos, true);         if (ret != null && ret.length > 0)            return ret[0];         else            return "<qos/>";      }      else {         PublishQosServer qos = new PublishQosServer(glob, msgQosData);         // Since xmlBlaster 1.6: We need to serialize and replace the original Global with ServerScope         MsgUnit msgUnit = new MsgUnit(glob, updateKey.getData().toXml(), content, qos.getData().toXml());         //MsgUnit msgUnit = new MsgUnit(updateKey.getData(), content, qos.getData());         return publish(sessionInfo, msgUnit, true);      }   }   /**    * Internal publishing helper.    */   public final String publish(SessionInfo sessionInfo, MsgUnit msgUnit) throws XmlBlasterException {      return publish(sessionInfo, msgUnit, false);   }   private final String publish(SessionInfo sessionInfo, MsgUnit msgUnit, boolean isClusterUpdate) throws XmlBlasterException {      if (!msgUnit.getGlobal().isServerSide()) {         // Since xmlBlaster 1.6.1: We need to serialize and replace the original Global with ServerScope         if (log.isLoggable(Level.FINE)) log.fine("publish call with client side Global, converting now to ServerScope: " + Global.getStackTraceAsString(null));         msgUnit = new MsgUnit(glob, msgUnit.getMsgUnitRaw(), msgUnit.getMethodName());      }      PublishQosServer publishQosServer = new PublishQosServer(glob, msgUnit.getQosData());      publishQosServer.setClusterUpdate(isClusterUpdate);      return publish(sessionInfo, msgUnit, publishQosServer);   }   /**    * Write-Access method to publish a new message from a data source.    * <p />    * There are two MoM styles supported:    * <p />    * <ul>    * <li>PubSub style:<br />    * If MsgUnit is created from subscribe or the MsgUnit is new, we need to add the    * DOM here once; XmlKey takes care of that</li>    * <li>PTP style:<br />    * Send message directly to all destinations, ignore if same message is known from Pub/Sub style</li>    * </ul>    * <p />    * This triggers the method update() if observed by somebody    * <p />    * If the given key oid doesn't exist, it will be automatically added, <br>    * so this covers the SQL'ish INSERT and UPDATE.    * <p />    * If MsgUnit is created from subscribe or MsgUnit is new, the key meta    * data are

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -