⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 requestreplyexecutor.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/*------------------------------------------------------------------------------Name:      RequestReplyExecutor.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment:   Send/receive messages over outStream and inStream.------------------------------------------------------------------------------*/package org.xmlBlaster.util.protocol;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.util.def.MethodName;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.protocol.I_XmlBlaster;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.I_ResponseListener;import org.xmlBlaster.util.MsgUnitRaw;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.qos.address.AddressBase;import org.xmlBlaster.util.xbformat.I_ProgressListener;import org.xmlBlaster.util.xbformat.MsgInfo;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.client.protocol.I_CallbackExtended;import org.xmlBlaster.engine.qos.AddressServer;import EDU.oswego.cs.dl.util.concurrent.Latch;import java.io.IOException;import java.sql.Timestamp;import java.util.Date;import java.util.Set;import java.util.HashSet;import java.util.Map;import java.util.HashMap;import java.util.Collections;/** * Request/reply simulates a local method invocation. * <p /> * A common base class for socket or email based messaging. * Allows to block during a request and deliver the return message * to the waiting thread. * * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/protocol.socket.html" target="others">xmlBlaster SOCKET access protocol</a> * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/protocol.email.html" target="others">xmlBlaster EMAIL access protocol</a> * @author <a href="mailto:xmlBlaster@marcelruff.info">Marcel Ruff</a>. 
*/
public abstract class RequestReplyExecutor implements RequestReplyExecutorMBean
{
   /** Identifier of this instance: the class name, extended by initialize() with ":" and the raw address */
   private String ME = RequestReplyExecutor.class.getName();
   /** The Global handle; initialize() falls back to Global.instance() if it is given null */
   protected Global glob;
   /** Logger shared by all instances of this class */
   private static Logger log = Logger.getLogger(RequestReplyExecutor.class.getName());
   /** The prefix to create a unique requestId namespace (is set to the loginName) */
   protected String prefix = null;
   /** How long to block on remote call waiting on response */
   protected long responseTimeout;
   /** How long to block on remote call waiting on ping responses */
   protected long pingResponseTimeout;
   /** How long to block on remote call waiting on update responses */
   protected long updateResponseTimeout;
   /** This is the client side */
   protected I_CallbackExtended cbClient;
   /** The singleton handle for this xmlBlaster server (the server side) */
   protected I_XmlBlaster xmlBlasterImpl;
   /** A set containing LatchHolder instances */
   private final Set latchSet = new HashSet();
   /** The connection configuration; initialize() stores a clone of the instance it is given */
   protected AddressBase addressConfig;
   /** A listener may register to receive send/receive progress information */
   protected I_ProgressListener progressListener;
   /** NOTE(review): presumably the message size in bytes from which compression is applied - confirm */
   protected int minSizeForCompression;
   /** Set by initialize() if the configured compress type is Constants.COMPRESS_ZLIB (each message is compressed individually) */
   protected boolean compressZlib;
   /** Set by initialize() if the configured compress type is Constants.COMPRESS_ZLIB_STREAM (full stream compression) */
   protected boolean compressZlibStream;
   /** NOTE(review): presumably controls if getExpiryTimestamp() delivers a timestamp at all - confirm against the accessor */
   protected boolean useEmailExpiryTimestamp;

   /**
    * For listeners who want to be informed about return messages or exceptions,
    * the invocation is blocking during this period.
* <p />    * The key is the String requestId, the value the listener thread I_ResponseListener    */   protected final Map responseListenerMap = Collections.synchronizedMap(new HashMap());   private boolean responseListenerMapWasCleared;   /** Used for execute() */   public static final boolean ONEWAY = false;   /** Used for execute() */   public static final boolean WAIT_ON_RESPONSE = true;   /** My JMX registration, can be done optionally by implementing classes */   protected Object mbeanHandle;   protected ContextNode contextNode;   public RequestReplyExecutor() {   }   /**    * Used by SocketCallbackImpl on client side, uses I_CallbackExtended to invoke client classes    * <p />    * Used by HandleClient on server side, uses I_XmlBlaster to invoke xmlBlaster core    * <p />    * This executor has mixed client and server specific code for two reasons:<br />    * - Possibly we can use the same socket between two xmlBlaster server (load balance)<br />    * - Everything is together<br />    * @param addressConfig The configuration to use    */   protected void initialize(Global glob, AddressBase addressConfig) {      this.glob = (glob == null) ? 
Global.instance() : glob;      this.addressConfig = addressConfig.getClone();      this.ME = RequestReplyExecutor.class.getName() + ":" + addressConfig.getRawAddress();      setMinSizeForCompression((int)this.addressConfig.getMinSize());      if (Constants.COMPRESS_ZLIB_STREAM.equals(this.addressConfig.getCompressType())) { // Statically configured for server side protocol plugin         this.compressZlibStream = true;         if (log.isLoggable(Level.FINE)) log.fine("Full stream compression enabled with '" + Constants.COMPRESS_ZLIB_STREAM + "'");      }      else if (Constants.COMPRESS_ZLIB.equals(this.addressConfig.getCompressType())) { // Compress each message indiviually         this.compressZlib = true;         log.info("Message compression enabled with  '" + Constants.COMPRESS_ZLIB + "', minimum size for compression is " + getMinSizeForCompression() + " bytes");      }      else {         this.compressZlibStream = false;         this.compressZlib = false;      }      // 1. Response should never expire      // 2. 
Otherwise the Pop3Driver must be changed to nevertheless return it to wake up the blocking latch      setUseEmailExpiryTimestamp(addressConfig.getEnv("useEmailExpiryTimestamp", true).getValue());      setResponseTimeout(addressConfig.getEnv("responseTimeout", getDefaultResponseTimeout()).getValue());      if (log.isLoggable(Level.FINE)) log.fine(this.addressConfig.getEnvLookupKey("responseTimeout") + "=" + this.responseTimeout);      // the responseTimeout is used later to wait on a return value      // additionally we protect against blocking on socket level during invocation      // JacORB CORBA allows similar setting with "jacorb.connection.client_idle_timeout"      //        and with "jacorb.client.pending_reply_timeout"      setPingResponseTimeout(addressConfig.getEnv("pingResponseTimeout", getDefaultPingResponseTimeout()).getValue());      if (log.isLoggable(Level.FINE)) log.fine(this.addressConfig.getEnvLookupKey("pingResponseTimeout") + "=" + this.pingResponseTimeout);      setUpdateResponseTimeout(addressConfig.getEnv("updateResponseTimeout", getDefaultUpdateResponseTimeout()).getValue());      if (log.isLoggable(Level.FINE)) log.fine(this.addressConfig.getEnvLookupKey("updateResponseTimeout") + "=" + this.updateResponseTimeout);   }   /**    * The protocol type, used for logging    * @return "SOCKET" or "EMAIL", never null    */   abstract public String getType();   public I_ProgressListener registerProgressListener(I_ProgressListener listener) {      I_ProgressListener oldOne = this.progressListener;      this.progressListener = listener;      return oldOne;   }   public final I_ProgressListener getProgressListener() {      return this.progressListener;   }   /**    * How long to block on remote call waiting on response.    
* The default is to block forever (Integer.MAX_VALUE)    * Changed after xmlBlaster release 1.0.7 (before it was one minute: Constants.MINUTE_IN_MILLIS)    * Can be overwritten by implementations like EMAIL    */   public long getDefaultResponseTimeout() {      return Integer.MAX_VALUE;   }   /**    * How long to block on remote call waiting on a ping response.    * The default is to block for one minute    * This method can be overwritten by implementations like EMAIL    */   public long getDefaultPingResponseTimeout() {      return Constants.MINUTE_IN_MILLIS;   }   /**    * How long to block on remote call waiting on a update() response.    * The default is to block forever    * This method can be overwritten by implementations like EMAIL    */   public long getDefaultUpdateResponseTimeout() {      return Integer.MAX_VALUE;   }   /**    * Set the given millis to protect against blocking client.    * @param millis If <= 0 it is set to the default (forever).    * An argument less than or equal to zero means not to wait at all    * and is not supported    */   public final void setResponseTimeout(long millis) {      if (millis <= 0L) {         log.warning(this.addressConfig.getEnvLookupKey("responseTimeout") + "=" + millis +                      " is invalid, setting it to " + getDefaultResponseTimeout() + " millis");         this.responseTimeout = getDefaultResponseTimeout();      }      else         this.responseTimeout = millis;   }   /**    * Set the given millis to protect against blocking client for ping invocations.    * @param millis If <= 0 it is set to the default (one minute).    
* An argument less than or equal to zero means not to wait at all    * and is not supported    */   public final void setPingResponseTimeout(long millis) {      if (millis <= 0L) {         log.warning(this.addressConfig.getEnvLookupKey("pingResponseTimeout") + "=" + millis +                      " is invalid, setting it to " + getDefaultPingResponseTimeout() + " millis");         this.pingResponseTimeout = getDefaultPingResponseTimeout();      }      else         this.pingResponseTimeout = millis;   }   /**    * Set the given millis to protect against blocking client for update() invocations.    * @param millis If <= 0 it is set to the default (one minute).    * An argument less than or equal to zero means not to wait at all    * and is not supported    */   public final void setUpdateResponseTimeout(long millis) {      if (millis <= 0L) {         log.warning(this.addressConfig.getEnvLookupKey("updateResponseTimeout") + "=" + millis +                      " is invalid, setting it to " + getDefaultUpdateResponseTimeout() + " millis");         this.updateResponseTimeout = getDefaultUpdateResponseTimeout();      }      else         this.updateResponseTimeout = millis;   }   /**    * @return Returns the responseTimeout.    */   public long getResponseTimeout(MethodName methodName) {      if (MethodName.PING.equals(methodName)) {         return this.pingResponseTimeout;      }      else if (MethodName.UPDATE.equals(methodName)) {         return this.updateResponseTimeout;      }      return this.responseTimeout;   }   /**    * Access the timeout for method invocation.    * @param methodName e.g. "PING", "UPDATE", "SUBSCRIBE", "PUBLISH", ...    * @return Returns the responseTimeout for JMX in milli seconds    */   public long getResponseTimeout(String methodName) {      return getResponseTimeout(MethodName.toMethodName(methodName));   }   /**    * Return the time in future when the email can be deleted.    
* @return Returns the expiry timestamp, is null if message never expires
    *         (expiry timestamps are switched off, the timeout is not positive, or
    *         the timeout is so large that the expiry time exceeds the range of a long)
    */
   public Timestamp getExpiryTimestamp(MethodName methodName) {
      if (!isUseEmailExpiryTimestamp()) return null;
      long diff = getResponseTimeout(methodName);
      if (diff <= 0) return null;
      long current = System.currentTimeMillis();
      // current + diff would overflow and wrap around to a time in the past,
      // such a huge timeout means the message never expires
      if (diff > Long.MAX_VALUE - current) return null;
      return new Timestamp(current + diff);
   }

   /**
    * For logging: the name of the property which configures the timeout of the given method.
    * @param methodName The method to be invoked
    * @return "pingResponseTimeout" for MethodName.PING, "updateResponseTimeout" for
    *         MethodName.UPDATE and "responseTimeout" for everything else
    */
   public String getResponseTimeoutPropertyName(MethodName methodName) {
      if (MethodName.PING.equals(methodName)) {
         return "pingResponseTimeout";
      }
      else if (MethodName.UPDATE.equals(methodName)) {
         return "updateResponseTimeout";
      }
      return "responseTimeout";
   }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -