📄 clientdispatchconnection.java
字号:
/*------------------------------------------------------------------------------Name: ClientDispatchConnection.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.client.dispatch;import java.util.logging.Level;import java.util.logging.Logger;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.def.MethodName;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.client.queuemsg.MsgQueueConnectEntry;import org.xmlBlaster.client.queuemsg.MsgQueueDisconnectEntry;import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;import org.xmlBlaster.client.queuemsg.MsgQueueSubscribeEntry;import org.xmlBlaster.client.queuemsg.MsgQueueUnSubscribeEntry;import org.xmlBlaster.client.queuemsg.MsgQueueEraseEntry;import org.xmlBlaster.client.queuemsg.MsgQueueGetEntry;import org.xmlBlaster.util.dispatch.DispatchConnection;import org.xmlBlaster.util.dispatch.I_PostSendListener;import org.xmlBlaster.util.qos.ConnectQosData;import org.xmlBlaster.client.qos.ConnectReturnQos;import org.xmlBlaster.client.qos.PublishReturnQos;import org.xmlBlaster.client.qos.SubscribeReturnQos;import org.xmlBlaster.client.qos.UnSubscribeReturnQos;import org.xmlBlaster.client.qos.EraseReturnQos;import org.xmlBlaster.client.protocol.I_XmlBlasterConnection;import org.xmlBlaster.client.protocol.ProtocolPluginManager;import org.xmlBlaster.util.qos.address.Address;import org.xmlBlaster.util.qos.address.AddressBase;import org.xmlBlaster.util.xbformat.I_ProgressListener;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.MsgUnitRaw;import org.xmlBlaster.authentication.plugins.CryptDataHolder;import org.xmlBlaster.authentication.plugins.I_MsgSecurityInterceptor;/** * Holding all necessary infos to establish callback * connections and invoke their update(). * @see DispatchConnection * @author xmlBlaster@marcelruff.info */public final class ClientDispatchConnection extends DispatchConnection{ private static Logger log = Logger.getLogger(ClientDispatchConnection.class.getName()); private final String ME; private I_XmlBlasterConnection driver; private final I_MsgSecurityInterceptor securityInterceptor; private String encryptedConnectQos; private ConnectReturnQos connectReturnQos; private MsgQueueEntry connectEntry; /** * @param connectionsHandler The DevliveryConnectionsHandler witch i belong to * @param aAddress The address i shall connect to */ public ClientDispatchConnection(Global glob, ClientDispatchConnectionsHandler connectionsHandler, AddressBase address) throws XmlBlasterException { super(glob, connectionsHandler, address); this.ME = "ClientDispatchConnection-" + connectionsHandler.getDispatchManager().getQueue().getStorageId(); this.securityInterceptor = connectionsHandler.getDispatchManager().getMsgSecurityInterceptor(); } public final String getDriverName() { return (this.driver != null) ? this.driver.getProtocol() : "unknown"; } /** * @return A nice name for logging */ public final String getName() { return ME; } /** * Load the appropriate protocol driver, e.g the CORBA protocol plugin. * <p> * This method is called by our base class during initialization. * </p> */ public final void loadPlugin() throws XmlBlasterException { ProtocolPluginManager loader = glob.getProtocolPluginManager(); this.driver = loader.getPlugin(super.address.getType(), super.address.getVersion()); // e.g. CorbaConnection(glob); if (this.driver == null) throw new XmlBlasterException(glob, ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED, ME, "Sorry, protocol type='" + super.address.getType() + "' is not supported"); } /** * @see DispatchConnection#connectLowlevel() */ public final void connectLowlevel() throws XmlBlasterException { if (this.driver == null) throw new XmlBlasterException(glob, ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED, ME, "Sorry, protocol type='" + super.address.getType() + "' is not supported"); this.driver.connectLowlevel((Address)super.address); if (super.address.getPingInterval() > 0) { //spanPingTimer(1, true); // Could deadlock as it uses complete dispatch framework with its synchronized? this.driver.ping("<qos><state info='"+Constants.INFO_INITIAL+"'/></qos>"); // Try a low level ping } if (log.isLoggable(Level.FINE)) log.fine("Connected low level to " + super.address.toString()); } /** * Send the messages to xmlBlaster. * @param msgArr The messages to send. * msgArr[i].getReturnVal() will contain the returned QoS object or null for oneway operations */ public void doSend(MsgQueueEntry[] msgArr_) throws XmlBlasterException { if (msgArr_.length < 1) { return; } boolean onlyPublish = true; boolean onlyPublishOneway = true; for (int ii=0; ii<msgArr_.length; ii++) { if (MethodName.PUBLISH_ONEWAY != msgArr_[ii].getMethodName()) onlyPublishOneway = false; if (MethodName.PUBLISH != msgArr_[ii].getMethodName()) onlyPublish = false; } if (onlyPublishOneway || onlyPublish) { publish(msgArr_); return; } for (int ii=0; ii<msgArr_.length; ii++) { try { if (MethodName.PUBLISH_ONEWAY == msgArr_[ii].getMethodName()) { MsgQueueEntry[] tmp = new MsgQueueEntry[] { msgArr_[ii] }; publish(tmp); } else if (MethodName.PUBLISH == msgArr_[ii].getMethodName()) { MsgQueueEntry[] tmp = new MsgQueueEntry[] { msgArr_[ii] }; publish(tmp); } else if (MethodName.GET == msgArr_[ii].getMethodName()) { get(msgArr_[ii]); } else if (MethodName.SUBSCRIBE == msgArr_[ii].getMethodName()) { subscribe(msgArr_[ii]); } else if (MethodName.UNSUBSCRIBE == msgArr_[ii].getMethodName()) { unSubscribe(msgArr_[ii]); } else if (MethodName.ERASE == msgArr_[ii].getMethodName()) { erase(msgArr_[ii]); } else if (MethodName.CONNECT == msgArr_[ii].getMethodName()) { connect(msgArr_[ii]); this.connectEntry = msgArr_[ii]; // remember it } else if (MethodName.DISCONNECT == msgArr_[ii].getMethodName()) { this.connectEntry = null; disconnect(msgArr_[ii]); } else { throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "Message type '" + msgArr_[ii].getEmbeddedType() + "' is not implemented"); } } catch (XmlBlasterException e) { if (this.connectEntry != null && e.isErrorCode(ErrorCode.USER_SECURITY_AUTHENTICATION_ACCESSDENIED)) { // Happens if the client was killed in the server by an admin task // and has tried to reconnect with the old sessionId log.warning("Server changed sessionId, trying reconnect now: " + e.toString()); //reconnect(); // loops?! connect(this.connectEntry); connectionsHandler.getDispatchManager().postSendNotification(this.connectEntry); if (log.isLoggable(Level.FINE)) log.fine("Server changed sessionId to " + this.connectReturnQos.getServerInstanceId()); ii--; } else { throw e; } } } } private void publish(MsgQueueEntry[] msgArr_) throws XmlBlasterException { // Convert to PublishEntry MsgUnit[] msgArr = new MsgUnit[msgArr_.length]; for (int i=0; i<msgArr.length; i++) { MsgQueuePublishEntry publishEntry = (MsgQueuePublishEntry)msgArr_[i]; msgArr[i] = publishEntry.getMsgUnit(); } MsgUnitRaw[] msgUnitRawArr = new MsgUnitRaw[msgArr.length]; // We export/encrypt the message (call the interceptor) if (securityInterceptor != null) { for (int i=0; i<msgArr.length; i++) { CryptDataHolder dataHolder = new CryptDataHolder(MethodName.PUBLISH, msgArr[i].getMsgUnitRaw()); msgUnitRawArr[i] = securityInterceptor.exportMessage(dataHolder); } if (log.isLoggable(Level.FINE)) log.fine("Exported/encrypted " + msgArr.length + " publish messages."); } else { log.warning("No session security context, sending " + msgArr.length + " publish messages without encryption"); for (int i=0; i<msgArr.length; i++) { msgUnitRawArr[i] = msgArr[i].getMsgUnitRaw(); } } if (MethodName.PUBLISH_ONEWAY == msgArr_[0].getMethodName()) { this.driver.publishOneway(msgUnitRawArr); connectionsHandler.getDispatchStatistic().incrNumPublish(msgUnitRawArr.length); if (log.isLoggable(Level.FINE)) log.fine("Success, sent " + msgArr.length + " oneway publish messages."); return; } if (log.isLoggable(Level.FINE)) log.fine("Before publish " + msgArr.length + " acknowledged messages ..."); String[] rawReturnVal = this.driver.publishArr(msgUnitRawArr); connectionsHandler.getDispatchStatistic().incrNumPublish(rawReturnVal.length); if (log.isLoggable(Level.FINE)) log.fine("Success, sent " + msgArr.length + " acknowledged publish messages, return value #1 is '" + rawReturnVal[0] + "'"); if (rawReturnVal != null) { for (int i=0; i<rawReturnVal.length; i++) { if (!msgArr_[i].wantReturnObj()) continue; if (securityInterceptor != null) { CryptDataHolder dataHolder = new CryptDataHolder(MethodName.PUBLISH, new MsgUnitRaw(null, (byte[])null, rawReturnVal[i])); dataHolder.setReturnValue(true); rawReturnVal[i] = securityInterceptor.importMessage(dataHolder).getQos(); } // create return object try { msgArr_[i].setReturnObj(new PublishReturnQos(glob, rawReturnVal[i])); } catch (Throwable e) { log.warning("Can't parse publish returned value '" + rawReturnVal[i] + "', setting to default: " + e.toString()); //e.printStackTrace(); msgArr_[i].setReturnObj(new PublishReturnQos(glob, "<qos/>")); } } if (log.isLoggable(Level.FINE)) log.fine("Imported/decrypted " + rawReturnVal.length + " publish message return values."); } } /** * Encrypt and send a subscribe request, decrypt the returned data */ private void subscribe(MsgQueueEntry entry) throws XmlBlasterException { MsgQueueSubscribeEntry subscribeEntry = (MsgQueueSubscribeEntry)entry; String key = subscribeEntry.getSubscribeKeyData().toXml(); String qos = subscribeEntry.getSubscribeQosData().toXml(); if (securityInterceptor != null) { // We export/encrypt the message (call the interceptor) CryptDataHolder dataHolder = new CryptDataHolder(MethodName.SUBSCRIBE, new MsgUnitRaw(key, (byte[])null, qos)); MsgUnitRaw msgUnitRaw = securityInterceptor.exportMessage(dataHolder); key = msgUnitRaw.getKey(); qos = msgUnitRaw.getQos(); if (log.isLoggable(Level.FINE)) log.fine("Exported/encrypted subscribe request."); } else { log.warning("No session security context, subscribe request is not encrypted"); } String rawReturnVal = this.driver.subscribe(key, qos); // Invoke remote server connectionsHandler.getDispatchStatistic().incrNumSubscribe(1); if (subscribeEntry.wantReturnObj()) { if (securityInterceptor != null) { // decrypt return value ... CryptDataHolder dataHolder = new CryptDataHolder(MethodName.SUBSCRIBE, new MsgUnitRaw(null, (byte[])null, rawReturnVal)); dataHolder.setReturnValue(true); rawReturnVal = securityInterceptor.importMessage(dataHolder).getQos(); } try { subscribeEntry.setReturnObj(new SubscribeReturnQos(glob, rawReturnVal)); } catch (Throwable e) { log.warning("Can't parse returned subscribe value '" + rawReturnVal + "', setting to default: " + e.toString()); subscribeEntry.setReturnObj(new SubscribeReturnQos(glob, "<qos/>")); } } } /** * Encrypt and send a unSubscribe request, decrypt the returned data */ private void unSubscribe(MsgQueueEntry entry) throws XmlBlasterException { MsgQueueUnSubscribeEntry unSubscribeEntry = (MsgQueueUnSubscribeEntry)entry; String key = unSubscribeEntry.getUnSubscribeKey().toXml(); String qos = unSubscribeEntry.getUnSubscribeQos().toXml(); if (securityInterceptor != null) { // We export/encrypt the message (call the interceptor) CryptDataHolder dataHolder = new CryptDataHolder(MethodName.UNSUBSCRIBE, new MsgUnitRaw(key, (byte[])null, qos)); MsgUnitRaw msgUnitRaw = securityInterceptor.exportMessage(dataHolder); key = msgUnitRaw.getKey(); qos = msgUnitRaw.getQos(); if (log.isLoggable(Level.FINE)) log.fine("Exported/encrypted unSubscribe request."); } else { log.warning("No session security context, unSubscribe request is not encrypted"); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -