📄 cbdispatchconnection.java
字号:
for (int i=0; i<msgArr_.length; i++) { MsgQueueUpdateEntry entry = (MsgQueueUpdateEntry)msgArr_[i]; MsgUnitWrapper msgUnitWrapper = entry.getMsgUnitWrapper(); if (msgUnitWrapper == null) { if (log.isLoggable(Level.FINE)) log.fine(ME+": doSend("+entry.getLogId()+") ignoring callback message as no meat is available (assume expired)"); entry.setReturnObj(new UpdateReturnQosServer(this.glob, Constants.RET_EXPIRED)); //"<qos><state id='EXPIRED'/></qos>"; continue; } if (msgUnitWrapper.getMsgQosData().isPtp() && session!=null && !session.getConnectQos().isPtpAllowed()) { if (log.isLoggable(Level.FINE)) log.fine(ME+": doSend("+entry.getLogId()+") ignoring callback message as PtP is not wanted"); entry.setReturnObj(new UpdateReturnQosServer(this.glob, Constants.RET_ERASED)); continue; } MsgUnit mu = msgUnitWrapper.getMsgUnit(); //MsgUnit mu = entry.getMsgUnit(); throws unwanted exception if meat==null (forceDestroy) MsgQosData msgQosData = (MsgQosData)mu.getQosData().clone(); msgQosData.setTopicProperty(null); msgQosData.setState(entry.getState()); msgQosData.setSubscriptionId(entry.getSubscriptionId()); msgQosData.setQueueIndex(i); msgQosData.setQueueSize(connectionsHandler.getDispatchManager().getQueue().getNumOfEntries()); if (msgQosData.getNumRouteNodes() == 1) { msgQosData.clearRoutes(); } // Convert oid to original again for erased events fired by TopicHandler.java notifySubscribersAboutErase() if (mu.getKeyOid().equals(Constants.EVENT_OID_ERASEDTOPIC)) { mu = new MsgUnit(mu, (MsgKeyData)mu.getKeyData().clone(), null, msgQosData); String oid = mu.getQosData().getClientProperty("__oid", (String)null); if (oid != null) { mu.getKeyData().setOid(oid); try { ((org.xmlBlaster.util.qos.MsgQosData)mu.getQosData()).setSubscriptionId(mu.getQosData().getClientProperty("__subscriptionId", (String)null)); } catch (Throwable e) { log.severe(ME+": Failed to set subscriptionId: " + e.toString()); } String domain = mu.getQosData().getClientProperty("__domain", (String)null); if 
(domain != null) { mu.getKeyData().setDomain(domain); mu.getQosData().getClientProperties().remove("__domain"); } mu.getQosData().getClientProperties().remove("__oid"); mu.getQosData().getClientProperties().remove("__subscriptionId"); } } else { mu = new MsgUnit(mu, null, null, msgQosData); } MsgUnitRaw raw = new MsgUnitRaw(mu, mu.getKeyData().toXml(), mu.getContent(), mu.getQosData().toXml()); if (address.oneway() || entry.updateOneway()) { if (oneways == null) oneways = new ArrayList(); oneways.add(new Holder(entry, raw, entry.getSubscriptionId())); } else { if (responders == null) responders = new ArrayList(); responders.add(new Holder(entry, raw, entry.getSubscriptionId())); } } } exportCrypt(responders, MethodName.UPDATE); exportCrypt(oneways, MethodName.UPDATE_ONEWAY); if (oneways != null) { MsgUnitRaw[] raws = new MsgUnitRaw[oneways.size()]; for (int i=0; i<oneways.size(); i++) { raws[i] = ((Holder)oneways.get(i)).msgUnitRaw; } cbDriver.sendUpdateOneway(raws); connectionsHandler.getDispatchStatistic().incrNumUpdate(oneways.size()); if (log.isLoggable(Level.FINE)) log.fine(ME+": Success, sent " + oneways.size() + " oneway messages."); I_Checkpoint cp = glob.getCheckpointPlugin(); if (cp != null) { for (int i=0; i<oneways.size(); i++) { Holder h = (Holder)oneways.get(i); cp.passingBy(I_Checkpoint.CP_UPDATE_ACK, (MsgUnit)h.msgUnitRaw.getMsgUnit(), sessionName, null); } } } if (responders != null) { if (log.isLoggable(Level.FINE)) log.fine(ME+": Before update " + responders.size() + " acknowledged messages ..."); MsgUnitRaw[] raws = new MsgUnitRaw[responders.size()]; for (int i=0; i<responders.size(); i++) { raws[i] = ((Holder)responders.get(i)).msgUnitRaw; } String[] rawReturnVal = cbDriver.sendUpdate(raws); connectionsHandler.getDispatchStatistic().incrNumUpdate(raws.length); if (log.isLoggable(Level.FINE)) log.fine(ME+": Success, sent " + raws.length + " acknowledged messages, return value #1 is '" + rawReturnVal[0] + "'"); I_Checkpoint cp = 
glob.getCheckpointPlugin(); if (cp != null) { for (int i=0; i<raws.length; i++) { cp.passingBy(I_Checkpoint.CP_UPDATE_ACK, (MsgUnit)raws[i].getMsgUnit(), sessionName, null); } } // this is done since the client could send one single bulk acknowledge if (rawReturnVal != null && rawReturnVal.length == 1 && raws.length > 1) { String bulkReturnValue = rawReturnVal[0]; log.fine("Reconstructing return values of a bulk acknowledge '" + bulkReturnValue + "'"); rawReturnVal = new String[raws.length]; for (int i=0; i < rawReturnVal.length; i++) rawReturnVal[i] = bulkReturnValue; } if (rawReturnVal != null && rawReturnVal.length == raws.length) { I_MsgSecurityInterceptor securityInterceptor = connectionsHandler.getDispatchManager().getMsgSecurityInterceptor(); for (int i=0; i<rawReturnVal.length; i++) { MsgQueueUpdateEntry entry = ((Holder)responders.get(i)).msgQueueUpdateEntry; if (!entry.wantReturnObj()) continue; if (securityInterceptor != null) { // decrypt ... CryptDataHolder dataHolder = new CryptDataHolder(MethodName.UPDATE, new MsgUnitRaw(null, (byte[])null, rawReturnVal[i])); dataHolder.setReturnValue(true); rawReturnVal[i] = securityInterceptor.importMessage(dataHolder).getQos(); } // create object try { entry.setReturnObj(new UpdateReturnQosServer(glob, rawReturnVal[i])); } catch (Throwable e) { log.warning(ME+": Can't parse returned value '" + rawReturnVal[i] + "', setting to default: " + e.toString()); //e.printStackTrace(); UpdateReturnQosServer updateRetQos = new UpdateReturnQosServer(glob, "<qos/>"); updateRetQos.setException(e); entry.setReturnObj(updateRetQos); } } if (log.isLoggable(Level.FINE)) log.fine(ME+": Imported/decrypted " + rawReturnVal.length + " message return values."); } else log.severe(ME+": Unexpected UpdateReturnQos '" + (rawReturnVal==null?"null":""+rawReturnVal.length)+ "', expected " + raws.length); } } /** * @see org.xmlBlaster.util.dispatch.DispatchConnection#doPing(String) */ public final String doPing(String data) throws 
XmlBlasterException {
      // Forward the ping to the callback driver; a null reply is reported as "".
      final String reply = this.cbDriver.ping(data);
      if (reply == null) {
         return "";
      }
      return reply;
   }

   /**
    * Intentionally empty: this connection has nothing to reset.
    */
   public final void resetConnection() {
   }

   /**
    * Invoked while polling for a reconnect: re-initializes the callback
    * driver with the global handle and this connection's address.
    *
    * @throws XmlBlasterException declared by the signature; presumably
    *         propagated from the driver's init() (verify against the driver)
    */
   protected final void reconnect() throws XmlBlasterException {
      // this.connectionsHandler.createDispatchConnection(address);
      final CallbackAddress callbackAddress = (CallbackAddress)address;
      this.cbDriver.init(glob, callbackAddress);
   }

   /**
    * Shuts this connection down: runs the superclass shutdown, removes the
    * native callback driver stored under cbKey from the global handle and,
    * when a callback driver is present, shuts that driver down as well.
    *
    * @throws XmlBlasterException declared by the signature; presumably
    *         propagated from the superclass or driver shutdown (verify)
    */
   public final void shutdown() throws XmlBlasterException {
      super.shutdown();
      glob.removeNativeCallbackDriver(cbKey);
      if (this.cbDriver == null) {
         return;
      }
      this.cbDriver.shutdown();
   }

   /**
    * Renders the state of this object as an XML ASCII string.
    * <br>
    * @param extraOffset indenting of tags for nice output, null is treated as ""
    * @return internal state as an XML ASCII string
    */
   public final String toXml(String extraOffset) {
      final String indent = "\n " + ((extraOffset == null) ? "" : extraOffset);

      final StringBuffer xml = new StringBuffer(256);
      xml.append(indent + "<CbDispatchConnection>");

      // NOTE(review): the XML returned by address.toXml() is not appended to
      // the dump; looks unintended, but the call is kept unchanged -- confirm.
      address.toXml(" " + indent);

      final String driverElement;
      if (this.cbDriver != null) {
         driverElement = " <callback type='" + getDriverName() + "' state='" + getState() + "'/>";
      } else {
         driverElement = " <noCallbackDriver />";
      }
      xml.append(indent).append(driverElement);

      xml.append(indent).append("</CbDispatchConnection>");
      return xml.toString();
   }

   /**
    * @return always false
    */
   protected boolean forcePingFailure() {
      return false;
   }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -