⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 clientdispatchconnection.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*------------------------------------------------------------------------------Name:      ClientDispatchConnection.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.client.dispatch;import java.util.logging.Level;import java.util.logging.Logger;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.def.MethodName;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.client.queuemsg.MsgQueueConnectEntry;import org.xmlBlaster.client.queuemsg.MsgQueueDisconnectEntry;import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;import org.xmlBlaster.client.queuemsg.MsgQueueSubscribeEntry;import org.xmlBlaster.client.queuemsg.MsgQueueUnSubscribeEntry;import org.xmlBlaster.client.queuemsg.MsgQueueEraseEntry;import org.xmlBlaster.client.queuemsg.MsgQueueGetEntry;import org.xmlBlaster.util.dispatch.DispatchConnection;import org.xmlBlaster.util.dispatch.I_PostSendListener;import org.xmlBlaster.util.qos.ConnectQosData;import org.xmlBlaster.client.qos.ConnectReturnQos;import org.xmlBlaster.client.qos.PublishReturnQos;import org.xmlBlaster.client.qos.SubscribeReturnQos;import org.xmlBlaster.client.qos.UnSubscribeReturnQos;import org.xmlBlaster.client.qos.EraseReturnQos;import org.xmlBlaster.client.protocol.I_XmlBlasterConnection;import org.xmlBlaster.client.protocol.ProtocolPluginManager;import org.xmlBlaster.util.qos.address.Address;import org.xmlBlaster.util.qos.address.AddressBase;import org.xmlBlaster.util.xbformat.I_ProgressListener;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.MsgUnitRaw;import org.xmlBlaster.authentication.plugins.CryptDataHolder;import org.xmlBlaster.authentication.plugins.I_MsgSecurityInterceptor;/** * Holding all necessary infos to establish callback * connections and invoke their update(). * @see DispatchConnection * @author xmlBlaster@marcelruff.info */public final class ClientDispatchConnection extends DispatchConnection{   private static Logger log = Logger.getLogger(ClientDispatchConnection.class.getName());   private final String ME;   private I_XmlBlasterConnection driver;   private final I_MsgSecurityInterceptor securityInterceptor;   private String encryptedConnectQos;   private ConnectReturnQos connectReturnQos;   private MsgQueueEntry connectEntry;   /**    * @param connectionsHandler The DevliveryConnectionsHandler witch i belong to    * @param aAddress The address i shall connect to    */   public ClientDispatchConnection(Global glob, ClientDispatchConnectionsHandler connectionsHandler, AddressBase address) throws XmlBlasterException {      super(glob, connectionsHandler, address);      this.ME = "ClientDispatchConnection-" + connectionsHandler.getDispatchManager().getQueue().getStorageId();      this.securityInterceptor = connectionsHandler.getDispatchManager().getMsgSecurityInterceptor();   }   public final String getDriverName() {      return (this.driver != null) ? this.driver.getProtocol() : "unknown";   }   /**    * @return A nice name for logging    */   public final String getName() {      return ME;   }   /**    * Load the appropriate protocol driver, e.g the CORBA protocol plugin.     * <p>    * This method is called by our base class during initialization.    * </p>    */   public final void loadPlugin() throws XmlBlasterException {      ProtocolPluginManager loader = glob.getProtocolPluginManager();      this.driver = loader.getPlugin(super.address.getType(), super.address.getVersion()); // e.g. CorbaConnection(glob);      if (this.driver == null)         throw new XmlBlasterException(glob, ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED, ME, "Sorry, protocol type='" + super.address.getType() + "' is not supported");   }   /**    * @see DispatchConnection#connectLowlevel()    */   public final void connectLowlevel() throws XmlBlasterException {      if (this.driver == null)         throw new XmlBlasterException(glob, ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED, ME, "Sorry, protocol type='" + super.address.getType() + "' is not supported");      this.driver.connectLowlevel((Address)super.address);      if (super.address.getPingInterval() > 0) {         //spanPingTimer(1, true); // Could deadlock as it uses complete dispatch framework with its synchronized?         this.driver.ping("<qos><state info='"+Constants.INFO_INITIAL+"'/></qos>");  // Try a low level ping      }      if (log.isLoggable(Level.FINE)) log.fine("Connected low level to " + super.address.toString());   }   /**    * Send the messages to xmlBlaster.     * @param msgArr The messages to send.    *  msgArr[i].getReturnVal() will contain the returned QoS object or null for oneway operations    */   public void doSend(MsgQueueEntry[] msgArr_) throws XmlBlasterException {      if (msgArr_.length < 1) {         return;      }      boolean onlyPublish = true;      boolean onlyPublishOneway = true;      for (int ii=0; ii<msgArr_.length; ii++) {         if (MethodName.PUBLISH_ONEWAY != msgArr_[ii].getMethodName())            onlyPublishOneway = false;         if (MethodName.PUBLISH != msgArr_[ii].getMethodName())            onlyPublish = false;      }      if (onlyPublishOneway || onlyPublish) {         publish(msgArr_);         return;      }            for (int ii=0; ii<msgArr_.length; ii++) {         try {            if (MethodName.PUBLISH_ONEWAY == msgArr_[ii].getMethodName()) {               MsgQueueEntry[] tmp = new MsgQueueEntry[] { msgArr_[ii] };               publish(tmp);            }            else if (MethodName.PUBLISH == msgArr_[ii].getMethodName()) {               MsgQueueEntry[] tmp = new MsgQueueEntry[] { msgArr_[ii] };               publish(tmp);            }            else if (MethodName.GET == msgArr_[ii].getMethodName()) {               get(msgArr_[ii]);            }            else if (MethodName.SUBSCRIBE == msgArr_[ii].getMethodName()) {               subscribe(msgArr_[ii]);            }            else if (MethodName.UNSUBSCRIBE == msgArr_[ii].getMethodName()) {               unSubscribe(msgArr_[ii]);            }            else if (MethodName.ERASE == msgArr_[ii].getMethodName()) {               erase(msgArr_[ii]);            }            else if (MethodName.CONNECT == msgArr_[ii].getMethodName()) {               connect(msgArr_[ii]);               this.connectEntry = msgArr_[ii]; // remember it            }            else if (MethodName.DISCONNECT == msgArr_[ii].getMethodName()) {               this.connectEntry = null;               disconnect(msgArr_[ii]);            }            else {               throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "Message type '" + msgArr_[ii].getEmbeddedType() + "' is not implemented");            }         }         catch (XmlBlasterException e) {            if (this.connectEntry != null && e.isErrorCode(ErrorCode.USER_SECURITY_AUTHENTICATION_ACCESSDENIED)) {               // Happens if the client was killed in the server by an admin task               // and has tried to reconnect with the old sessionId               log.warning("Server changed sessionId, trying reconnect now: " + e.toString());               //reconnect();   // loops?!               connect(this.connectEntry);               connectionsHandler.getDispatchManager().postSendNotification(this.connectEntry);               if (log.isLoggable(Level.FINE)) log.fine("Server changed sessionId to " + this.connectReturnQos.getServerInstanceId());               ii--;            }            else {               throw e;            }         }      }   }   private void publish(MsgQueueEntry[] msgArr_) throws XmlBlasterException {      // Convert to PublishEntry      MsgUnit[] msgArr = new MsgUnit[msgArr_.length];      for (int i=0; i<msgArr.length; i++) {         MsgQueuePublishEntry publishEntry = (MsgQueuePublishEntry)msgArr_[i];         msgArr[i] = publishEntry.getMsgUnit();      }      MsgUnitRaw[] msgUnitRawArr = new MsgUnitRaw[msgArr.length];      // We export/encrypt the message (call the interceptor)      if (securityInterceptor != null) {         for (int i=0; i<msgArr.length; i++) {            CryptDataHolder dataHolder = new CryptDataHolder(MethodName.PUBLISH, msgArr[i].getMsgUnitRaw());            msgUnitRawArr[i] = securityInterceptor.exportMessage(dataHolder);         }         if (log.isLoggable(Level.FINE)) log.fine("Exported/encrypted " + msgArr.length + " publish messages.");      }      else {         log.warning("No session security context, sending " + msgArr.length + " publish messages without encryption");         for (int i=0; i<msgArr.length; i++) {            msgUnitRawArr[i] = msgArr[i].getMsgUnitRaw();         }      }      if (MethodName.PUBLISH_ONEWAY == msgArr_[0].getMethodName()) {         this.driver.publishOneway(msgUnitRawArr);         connectionsHandler.getDispatchStatistic().incrNumPublish(msgUnitRawArr.length);         if (log.isLoggable(Level.FINE)) log.fine("Success, sent " + msgArr.length + " oneway publish messages.");         return;      }      if (log.isLoggable(Level.FINE)) log.fine("Before publish " + msgArr.length + " acknowledged messages ...");      String[] rawReturnVal = this.driver.publishArr(msgUnitRawArr);      connectionsHandler.getDispatchStatistic().incrNumPublish(rawReturnVal.length);      if (log.isLoggable(Level.FINE)) log.fine("Success, sent " + msgArr.length + " acknowledged publish messages, return value #1 is '" + rawReturnVal[0] + "'");      if (rawReturnVal != null) {         for (int i=0; i<rawReturnVal.length; i++) {            if (!msgArr_[i].wantReturnObj())               continue;            if (securityInterceptor != null) {               CryptDataHolder dataHolder = new CryptDataHolder(MethodName.PUBLISH, new MsgUnitRaw(null, (byte[])null, rawReturnVal[i]));               dataHolder.setReturnValue(true);               rawReturnVal[i] = securityInterceptor.importMessage(dataHolder).getQos();            }            // create return object            try {               msgArr_[i].setReturnObj(new PublishReturnQos(glob, rawReturnVal[i]));            }            catch (Throwable e) {               log.warning("Can't parse publish returned value '" + rawReturnVal[i] + "', setting to default: " + e.toString());               //e.printStackTrace();               msgArr_[i].setReturnObj(new PublishReturnQos(glob, "<qos/>"));            }         }         if (log.isLoggable(Level.FINE)) log.fine("Imported/decrypted " + rawReturnVal.length + " publish message return values.");      }   }   /**    * Encrypt and send a subscribe request, decrypt the returned data    */   private void subscribe(MsgQueueEntry entry) throws XmlBlasterException {      MsgQueueSubscribeEntry subscribeEntry = (MsgQueueSubscribeEntry)entry;      String key = subscribeEntry.getSubscribeKeyData().toXml();      String qos = subscribeEntry.getSubscribeQosData().toXml();      if (securityInterceptor != null) {  // We export/encrypt the message (call the interceptor)         CryptDataHolder dataHolder = new CryptDataHolder(MethodName.SUBSCRIBE, new MsgUnitRaw(key, (byte[])null, qos));         MsgUnitRaw msgUnitRaw = securityInterceptor.exportMessage(dataHolder);         key = msgUnitRaw.getKey();         qos = msgUnitRaw.getQos();         if (log.isLoggable(Level.FINE)) log.fine("Exported/encrypted subscribe request.");      }      else {         log.warning("No session security context, subscribe request is not encrypted");      }      String rawReturnVal = this.driver.subscribe(key, qos); // Invoke remote server      connectionsHandler.getDispatchStatistic().incrNumSubscribe(1);            if (subscribeEntry.wantReturnObj()) {         if (securityInterceptor != null) { // decrypt return value ...            CryptDataHolder dataHolder = new CryptDataHolder(MethodName.SUBSCRIBE, new MsgUnitRaw(null, (byte[])null, rawReturnVal));            dataHolder.setReturnValue(true);            rawReturnVal = securityInterceptor.importMessage(dataHolder).getQos();         }         try {            subscribeEntry.setReturnObj(new SubscribeReturnQos(glob, rawReturnVal));         }         catch (Throwable e) {            log.warning("Can't parse returned subscribe value '" + rawReturnVal + "', setting to default: " + e.toString());            subscribeEntry.setReturnObj(new SubscribeReturnQos(glob, "<qos/>"));         }      }   }   /**    * Encrypt and send a unSubscribe request, decrypt the returned data    */   private void unSubscribe(MsgQueueEntry entry) throws XmlBlasterException {      MsgQueueUnSubscribeEntry unSubscribeEntry = (MsgQueueUnSubscribeEntry)entry;      String key = unSubscribeEntry.getUnSubscribeKey().toXml();      String qos = unSubscribeEntry.getUnSubscribeQos().toXml();      if (securityInterceptor != null) {  // We export/encrypt the message (call the interceptor)         CryptDataHolder dataHolder = new CryptDataHolder(MethodName.UNSUBSCRIBE, new MsgUnitRaw(key, (byte[])null, qos));         MsgUnitRaw msgUnitRaw = securityInterceptor.exportMessage(dataHolder);         key = msgUnitRaw.getKey();         qos = msgUnitRaw.getQos();         if (log.isLoggable(Level.FINE)) log.fine("Exported/encrypted unSubscribe request.");      }      else {         log.warning("No session security context, unSubscribe request is not encrypted");      }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -