⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cbdispatchconnection.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
         for (int i=0; i<msgArr_.length; i++) {            MsgQueueUpdateEntry entry = (MsgQueueUpdateEntry)msgArr_[i];            MsgUnitWrapper msgUnitWrapper = entry.getMsgUnitWrapper();            if (msgUnitWrapper == null) {               if (log.isLoggable(Level.FINE)) log.fine(ME+": doSend("+entry.getLogId()+") ignoring callback message as no meat is available (assume expired)");               entry.setReturnObj(new UpdateReturnQosServer(this.glob, Constants.RET_EXPIRED)); //"<qos><state id='EXPIRED'/></qos>";               continue;            }            if (msgUnitWrapper.getMsgQosData().isPtp() && session!=null && !session.getConnectQos().isPtpAllowed()) {               if (log.isLoggable(Level.FINE)) log.fine(ME+": doSend("+entry.getLogId()+") ignoring callback message as PtP is not wanted");               entry.setReturnObj(new UpdateReturnQosServer(this.glob, Constants.RET_ERASED));               continue;            }            MsgUnit mu = msgUnitWrapper.getMsgUnit();            //MsgUnit mu = entry.getMsgUnit(); throws unwanted exception if meat==null (forceDestroy)            MsgQosData msgQosData = (MsgQosData)mu.getQosData().clone();            msgQosData.setTopicProperty(null);            msgQosData.setState(entry.getState());            msgQosData.setSubscriptionId(entry.getSubscriptionId());            msgQosData.setQueueIndex(i);            msgQosData.setQueueSize(connectionsHandler.getDispatchManager().getQueue().getNumOfEntries());            if (msgQosData.getNumRouteNodes() == 1) {               msgQosData.clearRoutes();            }            // Convert oid to original again for erased events fired by TopicHandler.java notifySubscribersAboutErase()            if (mu.getKeyOid().equals(Constants.EVENT_OID_ERASEDTOPIC)) {               mu = new MsgUnit(mu, (MsgKeyData)mu.getKeyData().clone(), null, msgQosData);               String oid = mu.getQosData().getClientProperty("__oid", (String)null);               if (oid != null) {         
         mu.getKeyData().setOid(oid);                  try {                     ((org.xmlBlaster.util.qos.MsgQosData)mu.getQosData()).setSubscriptionId(mu.getQosData().getClientProperty("__subscriptionId", (String)null));                  }                  catch (Throwable e) {                     log.severe(ME+": Failed to set subscriptionId: " + e.toString());                  }                  String domain = mu.getQosData().getClientProperty("__domain", (String)null);                  if (domain != null) {                     mu.getKeyData().setDomain(domain);                     mu.getQosData().getClientProperties().remove("__domain");                  }                  mu.getQosData().getClientProperties().remove("__oid");                  mu.getQosData().getClientProperties().remove("__subscriptionId");               }            }            else {               mu = new MsgUnit(mu, null, null, msgQosData);            }            MsgUnitRaw raw = new MsgUnitRaw(mu, mu.getKeyData().toXml(), mu.getContent(), mu.getQosData().toXml());            if (address.oneway() || entry.updateOneway()) {               if (oneways == null) oneways = new ArrayList();               oneways.add(new Holder(entry, raw, entry.getSubscriptionId()));            }            else {               if (responders == null) responders = new ArrayList();               responders.add(new Holder(entry, raw, entry.getSubscriptionId()));            }         }      }      exportCrypt(responders, MethodName.UPDATE);      exportCrypt(oneways, MethodName.UPDATE_ONEWAY);      if (oneways != null) {         MsgUnitRaw[] raws = new MsgUnitRaw[oneways.size()];         for (int i=0; i<oneways.size(); i++) {            raws[i] = ((Holder)oneways.get(i)).msgUnitRaw;         }         cbDriver.sendUpdateOneway(raws);         connectionsHandler.getDispatchStatistic().incrNumUpdate(oneways.size());         if (log.isLoggable(Level.FINE)) log.fine(ME+": Success, sent " + oneways.size() + " oneway 
messages.");         I_Checkpoint cp = glob.getCheckpointPlugin();         if (cp != null) {            for (int i=0; i<oneways.size(); i++) {               Holder h = (Holder)oneways.get(i);               cp.passingBy(I_Checkpoint.CP_UPDATE_ACK, (MsgUnit)h.msgUnitRaw.getMsgUnit(),                     sessionName, null);            }         }      }      if (responders != null) {         if (log.isLoggable(Level.FINE)) log.fine(ME+": Before update " + responders.size() + " acknowledged messages ...");         MsgUnitRaw[] raws = new MsgUnitRaw[responders.size()];         for (int i=0; i<responders.size(); i++) {            raws[i] = ((Holder)responders.get(i)).msgUnitRaw;         }         String[] rawReturnVal = cbDriver.sendUpdate(raws);         connectionsHandler.getDispatchStatistic().incrNumUpdate(raws.length);         if (log.isLoggable(Level.FINE)) log.fine(ME+": Success, sent " + raws.length + " acknowledged messages, return value #1 is '" + rawReturnVal[0] + "'");         I_Checkpoint cp = glob.getCheckpointPlugin();         if (cp != null) {            for (int i=0; i<raws.length; i++) {               cp.passingBy(I_Checkpoint.CP_UPDATE_ACK, (MsgUnit)raws[i].getMsgUnit(),                     sessionName, null);            }         }         // this is done since the client could send one single bulk acknowledge         if (rawReturnVal != null && rawReturnVal.length == 1 && raws.length > 1) {            String bulkReturnValue = rawReturnVal[0];            log.fine("Reconstructing return values of a bulk acknowledge '" + bulkReturnValue + "'");            rawReturnVal = new String[raws.length];            for (int i=0; i < rawReturnVal.length; i++)               rawReturnVal[i] = bulkReturnValue;         }         if (rawReturnVal != null && rawReturnVal.length == raws.length) {            I_MsgSecurityInterceptor securityInterceptor = connectionsHandler.getDispatchManager().getMsgSecurityInterceptor();            for (int i=0; i<rawReturnVal.length; 
i++) {               MsgQueueUpdateEntry entry = ((Holder)responders.get(i)).msgQueueUpdateEntry;               if (!entry.wantReturnObj())                  continue;               if (securityInterceptor != null) {                  // decrypt ...                  CryptDataHolder dataHolder = new CryptDataHolder(MethodName.UPDATE,                        new MsgUnitRaw(null, (byte[])null, rawReturnVal[i]));                  dataHolder.setReturnValue(true);                  rawReturnVal[i] = securityInterceptor.importMessage(dataHolder).getQos();               }               // create object               try {                  entry.setReturnObj(new UpdateReturnQosServer(glob, rawReturnVal[i]));               }               catch (Throwable e) {                  log.warning(ME+": Can't parse returned value '" + rawReturnVal[i] + "', setting to default: " + e.toString());                  //e.printStackTrace();                  UpdateReturnQosServer updateRetQos = new UpdateReturnQosServer(glob, "<qos/>");                  updateRetQos.setException(e);                  entry.setReturnObj(updateRetQos);               }            }            if (log.isLoggable(Level.FINE)) log.fine(ME+": Imported/decrypted " + rawReturnVal.length + " message return values.");         }         else            log.severe(ME+": Unexpected UpdateReturnQos '" + (rawReturnVal==null?"null":""+rawReturnVal.length)+ "', expected " + raws.length);      }   }   /**    * @see org.xmlBlaster.util.dispatch.DispatchConnection#doPing(String)    */   public final String doPing(String data) throws XmlBlasterException {      String ret = this.cbDriver.ping(data);      return (ret==null) ? "" : ret;   }   /**    * Nothing to do here    */   public final void resetConnection() {   }   /**    * On reconnect polling try to establish the connection.    
*/
   protected final void reconnect() throws XmlBlasterException {
      // Re-initialize the already existing callback driver in place instead of
      // creating a fresh dispatch connection through
      // connectionsHandler.createDispatchConnection(address).
      this.cbDriver.init(glob, (CallbackAddress)address);
   }

   /**
    * Stop all callback drivers of this client.
    * <p>
    * Delegates to the super class first, then removes the native callback
    * driver registered under <code>cbKey</code> from the global scope and
    * finally shuts down the callback driver, if one is set.
    *
    * @throws XmlBlasterException if the super class shutdown or the driver
    *         shutdown reports a failure
    */
   public final void shutdown() throws XmlBlasterException {
      super.shutdown();
      glob.removeNativeCallbackDriver(cbKey);
      if (this.cbDriver != null) {
         this.cbDriver.shutdown();
      }
   }

   /**
    * Dump state of this object into a XML ASCII string.
    * <br>
    * The dump consists of a <code>&lt;CbDispatchConnection&gt;</code> element
    * containing the XML of the callback address followed by either a
    * <code>&lt;callback .../&gt;</code> element (driver name and state) or a
    * <code>&lt;noCallbackDriver /&gt;</code> marker when no driver is set.
    *
    * @param extraOffset indenting of tags for nice output, <code>null</code>
    *        is treated as the empty string
    * @return internal state as an XML ASCII string
    */
   public final String toXml(String extraOffset) {
      if (extraOffset == null) extraOffset = "";
      String offset = "\n   " + extraOffset;

      StringBuffer sb = new StringBuffer(256);
      sb.append(offset).append("<CbDispatchConnection>");

      // The address dump used to be computed and then silently discarded;
      // append it so the callback address really shows up in the output.
      // NOTE(review): the indent argument is kept exactly as it was passed
      // before; it already contains the leading newline of 'offset' -- confirm
      // against the address toXml() implementation that this nests nicely.
      sb.append(address.toXml("   " + offset));

      if (this.cbDriver == null) {
         sb.append(offset).append("   <noCallbackDriver />");
      }
      else {
         sb.append(offset).append("   <callback type='" + getDriverName() + "' state='" + getState() + "'/>");
      }

      sb.append(offset).append("</CbDispatchConnection>");
      return sb.toString();
   }

   /**
    * This implementation never forces a ping failure.
    * <br>
    * NOTE(review): presumably a hook which lets the base class or a test
    * variant simulate a lost connection -- confirm against the super class.
    *
    * @return always <code>false</code>
    */
   protected boolean forcePingFailure() {
      return false;
   }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -