📄 requestreplyexecutor.java
字号:
/*------------------------------------------------------------------------------Name: RequestReplyExecutor.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment: Send/receive messages over outStream and inStream.------------------------------------------------------------------------------*/package org.xmlBlaster.util.protocol;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.util.def.MethodName;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.protocol.I_XmlBlaster;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.I_ResponseListener;import org.xmlBlaster.util.MsgUnitRaw;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.qos.address.AddressBase;import org.xmlBlaster.util.xbformat.I_ProgressListener;import org.xmlBlaster.util.xbformat.MsgInfo;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.client.protocol.I_CallbackExtended;import org.xmlBlaster.engine.qos.AddressServer;import EDU.oswego.cs.dl.util.concurrent.Latch;import java.io.IOException;import java.sql.Timestamp;import java.util.Date;import java.util.Set;import java.util.HashSet;import java.util.Map;import java.util.HashMap;import java.util.Collections;/** * Request/reply simulates a local method invocation. * <p /> * A common base class for socket or email based messaging. * Allows to block during a request and deliver the return message * to the waiting thread. * * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/protocol.socket.html" target="others">xmlBlaster SOCKET access protocol</a> * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/protocol.email.html" target="others">xmlBlaster EMAIL access protocol</a> * @author <a href="mailto:xmlBlaster@marcelruff.info">Marcel Ruff</a>. 
*/public abstract class RequestReplyExecutor implements RequestReplyExecutorMBean{
   /** Name of this instance; starts as the class name, initialize() appends ":" and the raw address */
   private String ME = RequestReplyExecutor.class.getName();
   /** The Global passed to initialize(), or Global.instance() if null was passed there */
   protected Global glob;
   /** The java.util.logging logger, named after this class */
   private static Logger log = Logger.getLogger(RequestReplyExecutor.class.getName());
   /** The prefix to create a unique requestId namespace (is set to the loginName) */
   protected String prefix = null;
   /** How long to block on remote call waiting on response (milliseconds) */
   protected long responseTimeout;
   /** How long to block on remote call waiting on ping responses (milliseconds) */
   protected long pingResponseTimeout;
   /** How long to block on remote call waiting on update responses (milliseconds) */
   protected long updateResponseTimeout;
   /** This is the client side */
   protected I_CallbackExtended cbClient;
   /** The singleton handle for this xmlBlaster server (the server side) */
   protected I_XmlBlaster xmlBlasterImpl;
   /** A set containing LatchHolder instances */
   private final Set latchSet = new HashSet();
   /** A clone of the address configuration passed to initialize(); null before initialize() was called */
   protected AddressBase addressConfig;
   /** A listener may register to receive send/receive progress information */
   protected I_ProgressListener progressListener;
   /**
    * Minimum size for compression in bytes.
    * NOTE(review): initialize() passes addressConfig.getMinSize() to setMinSizeForCompression(),
    * presumably that setter stores the value here -- confirm against the setter.
    */
   protected int minSizeForCompression;
   /** Set to true by initialize() if the compress type is Constants.COMPRESS_ZLIB (each message is compressed individually) */
   protected boolean compressZlib;
   /** Set to true by initialize() if the compress type is Constants.COMPRESS_ZLIB_STREAM (full stream compression) */
   protected boolean compressZlibStream;
   /**
    * NOTE(review): presumably set via setUseEmailExpiryTimestamp(), which initialize() calls with the
    * address property "useEmailExpiryTimestamp" (default true) -- confirm against the setter.
    */
   protected boolean useEmailExpiryTimestamp;
   /**
    * For listeners who want to be informed about return messages or exceptions,
    * the invocation is blocking during this period.
* <p /> * The key is the String requestId, the value the listener thread I_ResponseListener */ protected final Map responseListenerMap = Collections.synchronizedMap(new HashMap()); private boolean responseListenerMapWasCleared; /** Used for execute() */ public static final boolean ONEWAY = false; /** Used for execute() */ public static final boolean WAIT_ON_RESPONSE = true; /** My JMX registration, can be done optionally by implementing classes */ protected Object mbeanHandle; protected ContextNode contextNode; public RequestReplyExecutor() { } /** * Used by SocketCallbackImpl on client side, uses I_CallbackExtended to invoke client classes * <p /> * Used by HandleClient on server side, uses I_XmlBlaster to invoke xmlBlaster core * <p /> * This executor has mixed client and server specific code for two reasons:<br /> * - Possibly we can use the same socket between two xmlBlaster server (load balance)<br /> * - Everything is together<br /> * @param addressConfig The configuration to use */ protected void initialize(Global glob, AddressBase addressConfig) { this.glob = (glob == null) ? Global.instance() : glob; this.addressConfig = addressConfig.getClone(); this.ME = RequestReplyExecutor.class.getName() + ":" + addressConfig.getRawAddress(); setMinSizeForCompression((int)this.addressConfig.getMinSize()); if (Constants.COMPRESS_ZLIB_STREAM.equals(this.addressConfig.getCompressType())) { // Statically configured for server side protocol plugin this.compressZlibStream = true; if (log.isLoggable(Level.FINE)) log.fine("Full stream compression enabled with '" + Constants.COMPRESS_ZLIB_STREAM + "'"); } else if (Constants.COMPRESS_ZLIB.equals(this.addressConfig.getCompressType())) { // Compress each message indiviually this.compressZlib = true; log.info("Message compression enabled with '" + Constants.COMPRESS_ZLIB + "', minimum size for compression is " + getMinSizeForCompression() + " bytes"); } else { this.compressZlibStream = false; this.compressZlib = false; } // 1. 
Response should never expire // 2. Otherwise the Pop3Driver must be changed to nevertheless return it to wake up the blocking latch setUseEmailExpiryTimestamp(addressConfig.getEnv("useEmailExpiryTimestamp", true).getValue()); setResponseTimeout(addressConfig.getEnv("responseTimeout", getDefaultResponseTimeout()).getValue()); if (log.isLoggable(Level.FINE)) log.fine(this.addressConfig.getEnvLookupKey("responseTimeout") + "=" + this.responseTimeout); // the responseTimeout is used later to wait on a return value // additionally we protect against blocking on socket level during invocation // JacORB CORBA allows similar setting with "jacorb.connection.client_idle_timeout" // and with "jacorb.client.pending_reply_timeout" setPingResponseTimeout(addressConfig.getEnv("pingResponseTimeout", getDefaultPingResponseTimeout()).getValue()); if (log.isLoggable(Level.FINE)) log.fine(this.addressConfig.getEnvLookupKey("pingResponseTimeout") + "=" + this.pingResponseTimeout); setUpdateResponseTimeout(addressConfig.getEnv("updateResponseTimeout", getDefaultUpdateResponseTimeout()).getValue()); if (log.isLoggable(Level.FINE)) log.fine(this.addressConfig.getEnvLookupKey("updateResponseTimeout") + "=" + this.updateResponseTimeout); } /** * The protocol type, used for logging * @return "SOCKET" or "EMAIL", never null */ abstract public String getType(); public I_ProgressListener registerProgressListener(I_ProgressListener listener) { I_ProgressListener oldOne = this.progressListener; this.progressListener = listener; return oldOne; } public final I_ProgressListener getProgressListener() { return this.progressListener; } /** * How long to block on remote call waiting on response. 
* The default is to block forever (Integer.MAX_VALUE) * Changed after xmlBlaster release 1.0.7 (before it was one minute: Constants.MINUTE_IN_MILLIS) * Can be overwritten by implementations like EMAIL */ public long getDefaultResponseTimeout() { return Integer.MAX_VALUE; } /** * How long to block on remote call waiting on a ping response. * The default is to block for one minute * This method can be overwritten by implementations like EMAIL */ public long getDefaultPingResponseTimeout() { return Constants.MINUTE_IN_MILLIS; } /** * How long to block on remote call waiting on a update() response. * The default is to block forever * This method can be overwritten by implementations like EMAIL */ public long getDefaultUpdateResponseTimeout() { return Integer.MAX_VALUE; } /** * Set the given millis to protect against blocking client. * @param millis If <= 0 it is set to the default (forever). * An argument less than or equal to zero means not to wait at all * and is not supported */ public final void setResponseTimeout(long millis) { if (millis <= 0L) { log.warning(this.addressConfig.getEnvLookupKey("responseTimeout") + "=" + millis + " is invalid, setting it to " + getDefaultResponseTimeout() + " millis"); this.responseTimeout = getDefaultResponseTimeout(); } else this.responseTimeout = millis; } /** * Set the given millis to protect against blocking client for ping invocations. * @param millis If <= 0 it is set to the default (one minute). * An argument less than or equal to zero means not to wait at all * and is not supported */ public final void setPingResponseTimeout(long millis) { if (millis <= 0L) { log.warning(this.addressConfig.getEnvLookupKey("pingResponseTimeout") + "=" + millis + " is invalid, setting it to " + getDefaultPingResponseTimeout() + " millis"); this.pingResponseTimeout = getDefaultPingResponseTimeout(); } else this.pingResponseTimeout = millis; } /** * Set the given millis to protect against blocking client for update() invocations. 
* @param millis If <= 0 it is set to the default of getDefaultUpdateResponseTimeout()
    *        (Integer.MAX_VALUE milliseconds, "forever", unless a subclass overwrites it).
    *        An argument less than or equal to zero means not to wait at all
    *        and is not supported
    */
   public final void setUpdateResponseTimeout(long millis) {
      if (millis > 0L) {
         this.updateResponseTimeout = millis;
         return;
      }
      final long fallback = getDefaultUpdateResponseTimeout();
      // addressConfig is only assigned by initialize(): building the log text must not
      // fail with a NullPointerException and thereby prevent the fallback
      final String key = (this.addressConfig == null)
            ? "updateResponseTimeout" : this.addressConfig.getEnvLookupKey("updateResponseTimeout");
      log.warning(key + "=" + millis + " is invalid, setting it to " + fallback + " millis");
      this.updateResponseTimeout = fallback;
   }

   /**
    * Access the timeout which applies to the given method.
    * @param methodName MethodName.PING selects the ping timeout, MethodName.UPDATE the
    *        update timeout, everything else (including null) the general response timeout
    * @return Returns the responseTimeout in milli seconds
    */
   public long getResponseTimeout(MethodName methodName) {
      if (MethodName.PING.equals(methodName)) {
         return this.pingResponseTimeout;
      }
      if (MethodName.UPDATE.equals(methodName)) {
         return this.updateResponseTimeout;
      }
      return this.responseTimeout;
   }

   /**
    * Access the timeout for method invocation.
    * <p />
    * The name is converted with MethodName.toMethodName() and delegated to
    * getResponseTimeout(MethodName).
    * @param methodName e.g. "PING", "UPDATE", "SUBSCRIBE", "PUBLISH", ...
    * @return Returns the responseTimeout for JMX in milli seconds
    */
   public long getResponseTimeout(String methodName) {
      return getResponseTimeout(MethodName.toMethodName(methodName));
   }

   /**
    * Return the time in future when the email can be deleted.
    * <p />
    * The timestamp is the current time plus the response timeout of the given method.
    * @param methodName Selects the timeout, see getResponseTimeout(MethodName)
    * @return Returns the expiry timestamp, is null if message never expires:
    *         if isUseEmailExpiryTimestamp() is false, if the timeout is <= 0 or if the
    *         timeout is so large that the expiry time is not representable as a long
    */
   public Timestamp getExpiryTimestamp(MethodName methodName) {
      if (!isUseEmailExpiryTimestamp()) {
         return null;
      }
      final long diff = getResponseTimeout(methodName);
      if (diff <= 0) {
         return null;
      }
      final long current = System.currentTimeMillis();
      final long expiry = current + diff;
      // diff is positive, so a sum smaller than 'current' means the addition wrapped around.
      // Such a huge timeout means "never expires", a wrapped value would lie in the past.
      if (expiry < current) {
         return null;
      }
      return new Timestamp(expiry);
   }

   /**
    * For logging.
    * @param methodName The method to look up
    * @return The name of the timeout property which applies to the given method:
    *         "pingResponseTimeout" for MethodName.PING, "updateResponseTimeout" for
    *         MethodName.UPDATE, otherwise "responseTimeout"
    */
   public String getResponseTimeoutPropertyName(MethodName methodName) {
      if (MethodName.PING.equals(methodName)) {
         return "pingResponseTimeout";
      }
      if (MethodName.UPDATE.equals(methodName)) {
         return "updateResponseTimeout";
      }
      return "responseTimeout";
   }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -