📄 requestreplyexecutor.java
字号:
String requestId = msgInfo.createRequestId(prefix); if (log.isLoggable(Level.FINE)) log.fine("Invoking msgInfo type='" + msgInfo.getTypeStr() + "' message " + msgInfo.getMethodName() + "(requestId=" + requestId + ") oneway=" + !expectingResponse + " udp=" + udp); final Object[] response = new Object[1]; // As only final variables are accessable from the inner class, we put the response in this array response[0] = null; final LatchHolder startSignal; // Register the return value / Exception listener ... if (expectingResponse) { //startSignal = new Latch(); // defaults to false startSignal = addLatch(new Latch()); //synchronized (this.latchSet) { this.latchSet.add(startSignal); } // remember all blocking threads for release on shutdown if (!hasConnection()) return null; addResponseListener(requestId, new I_ResponseListener() { public void incomingMessage(String reqId, Object responseObj) { if (log.isLoggable(Level.FINE)) log.fine("RequestId=" + reqId + ": return value arrived ..."); response[0] = responseObj; startSignal.latch.release(); // wake up } }); } else startSignal = null; // Send the message / method invocation ... if (log.isLoggable(Level.FINEST)) log.finest("Sending now : >" + msgInfo.toLiteral() + "<"); try { sendMessage(msgInfo, msgInfo.getRequestId(), msgInfo.getMethodName(), udp); // if (log.isLoggable(Level.FINE)) log.trace(ME, "Successfully sent " + msgInfo.getNumMessages() + " messages"); } catch (Throwable e) { if (startSignal != null) { removeLatch(startSignal); // synchronized (this.latchSet) { this.latchSet.remove(startSignal); } } String tmp = (msgInfo==null) ? "" : msgInfo.getMethodNameStr(); String str = "Request blocked and timed out, giving up now waiting on " + tmp + "(" + requestId + ") response. 
Please check settings of " + "responseTimeout="+this.responseTimeout+ " pingResponseTimeout="+this.pingResponseTimeout+ " updateResponseTimeout="+this.updateResponseTimeout; if (e instanceof XmlBlasterException) { if (log.isLoggable(Level.FINE)) log.fine(str + ": " + e.toString()); throw (XmlBlasterException)e; } if (e instanceof NullPointerException) e.printStackTrace(); throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_TIMEOUT, ME, str, e); } if (log.isLoggable(Level.FINEST)) log.finest("Successful sent message: >" + msgInfo.toLiteral() + "<"); if (!expectingResponse) { return null; } // Waiting for the response to arrive ... try { boolean awakened = false; while (true) { try { // An argument less than or equal to zero means not to wait at all awakened = startSignal.latch.attempt(getResponseTimeout(msgInfo.getMethodName())); // block max. milliseconds if (startSignal.latchIsInterrupted) { awakened = false; // Simulates a responseTimeout startSignal.latchIsInterrupted = false; } break; } catch (InterruptedException e) { log.warning("Waking up (waited on " + msgInfo.getMethodName() + "(" + requestId + ") response): " + e.toString()); // try again } } if (awakened) { if (log.isLoggable(Level.FINE)) log.fine("Waking up, got response for " + msgInfo.getMethodName() + "(requestId=" + requestId + ")"); if (response[0]==null) // Caused by freePendingThreads() throw new IOException(ME + ": Lost " + getType() + " connection for " + msgInfo.getMethodName() + "(requestId=" + requestId + ")"); if (log.isLoggable(Level.FINEST)) log.finest("Response for " + msgInfo.getMethodName() + "(" + requestId + ") is: " + response[0].toString()); if (response[0] instanceof XmlBlasterException) throw (XmlBlasterException)response[0]; return response[0]; } else { String str = "Timeout of " + getResponseTimeout(msgInfo.getMethodName()) + " milliseconds occured when waiting on " + msgInfo.getMethodName() + "(" + requestId + ") response. 
You can change it with -plugin/" + getType().toLowerCase()+"/"+getResponseTimeoutPropertyName(msgInfo.getMethodName())+" <millis>"; removeResponseListener(requestId); throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_RESPONSETIMEOUT, ME, str); } } finally { removeLatch(startSignal); //synchronized (this.latchSet) { this.latchSet.remove(startSignal); } } } private class LatchHolder { public LatchHolder(Latch latch) { this.latch = latch; } Latch latch; boolean latchIsInterrupted; } /** * Interrupts a blocking request with a not returning reply. * The pending message is handled as not delivered and will be queued * @return Number of interrupted invocations, typically 0 or 1 */ public int interruptInvocation() { LatchHolder[] latches = getLatches(); for (int i=0; i<latches.length; i++) { latches[i].latchIsInterrupted = true; latches[i].latch.release(); // wake up log.severe("DEBUG ONLY: Forced release of latch"); } return latches.length; } private LatchHolder addLatch(Latch latch) { LatchHolder latchHolder = new LatchHolder(latch); synchronized (this.latchSet) { boolean added = this.latchSet.add(latchHolder); if (!added) throw new IllegalArgumentException("Didn't expect the latch already"); } return latchHolder; } private void removeLatch(LatchHolder latchHolder) { synchronized (this.latchSet) { this.latchSet.remove(latchHolder); } } private LatchHolder[] getLatches() { synchronized (this.latchSet) { return (LatchHolder[])this.latchSet.toArray(new LatchHolder[this.latchSet.size()]); } } /** * If we detect somewhere that the socket is down * use this method to free blocking threads which wait on responses */ public final void freePendingThreads() { if (log != null && log.isLoggable(Level.FINE) && this.latchSet.size()>0) log.fine("Freeing " + this.latchSet.size() + " pending threads (waiting on responses) from their ugly blocking situation"); LatchHolder[] latches = getLatches(); for (int i=0; i<latches.length; i++) { latches[i].latchIsInterrupted = true; 
latches[i].latch.release(); // wake up } synchronized (this.latchSet) { latchSet.clear(); } } /** * Send a one way response message back to the other side */ protected final void executeResponse(MsgInfo receiver, Object response, boolean udp) throws XmlBlasterException, IOException { // Take a clone: MsgInfo returner = receiver.createReturner(MsgInfo.RESPONSE_BYTE); if (response instanceof String) returner.addMessage((String)response); else if (response instanceof String[]) returner.addMessage((String[])response); else if (response instanceof MsgUnitRaw[]) returner.addMessage((MsgUnitRaw[])response); else if (response instanceof MsgUnitRaw) returner.addMessage((MsgUnitRaw)response); else throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "Invalid response data type " + response.toString()); sendMessage(returner, receiver.getRequestId(), receiver.getMethodName(), udp); if (log.isLoggable(Level.FINE)) log.fine("Successfully sent response for " + receiver.getMethodName() + "(" + receiver.getRequestId() + ")"); if (log.isLoggable(Level.FINEST)) log.finest("Successful sent response for " + receiver.getMethodName() + "() >" + returner.toLiteral() + "<"); } /** * Send a one way exception back to the other side */ protected final void executeException(MsgInfo receiver, XmlBlasterException e, boolean udp) throws XmlBlasterException, IOException { e.isServerSide(glob.isServerSide()); MsgInfo returner = receiver.createReturner(MsgInfo.EXCEPTION_BYTE); returner.setChecksum(false); returner.setCompressed(false); returner.addException(e); sendMessage(returner, receiver.getRequestId(), receiver.getMethodName(), udp); if (log.isLoggable(Level.FINE)) log.fine("Successfully sent exception for " + receiver.getMethodName() + "(" + receiver.getRequestId() + ")"); if (log.isLoggable(Level.FINEST)) log.finest("Successful sent exception for " + receiver.getMethodName() + "() >" + returner.toLiteral() + "<"); } /** * Flush the data to the protocol layer (socket, 
email, ...). * Overwrite this in your derived class to send using your specific protocol */ abstract protected void sendMessage(MsgInfo msgInfo, String requestId, MethodName methodName, boolean udp) throws XmlBlasterException, IOException; public boolean isCompressZlib() { return this.compressZlib; } public void setCompressZlib(boolean compress) { this.compressZlib = compress; } /** * Compressing too small messages won't reduce the size * @return The number of bytes, only compress if bigger */ public int getMinSizeForCompression() { return this.minSizeForCompression; } public boolean isCompressZlibStream() { return this.compressZlibStream; } public void setCompressZlibStream(boolean compress) { this.compressZlibStream = compress; } /** * @return Returns the updateResponseTimeout. */ public final long getUpdateResponseTimeout() { return this.updateResponseTimeout; } /** * @return Returns the useEmailExpiryTimestamp. */ public boolean isUseEmailExpiryTimestamp() { return this.useEmailExpiryTimestamp; } /** * @param useEmailExpiryTimestamp The useEmailExpiryTimestamp to set. */ public void setUseEmailExpiryTimestamp(boolean useEmailExpiryTimestamp) { this.useEmailExpiryTimestamp = useEmailExpiryTimestamp; } /** * @return a human readable usage help string */ public java.lang.String usage() { return "interruptInvocation(): Interrupts a blocking request" + "\n The pending message is handled as not delivered and will be kept in queue"; } public boolean isShutdown() { return true; } /** * The invocation timeout for "ping" method calls. * @return Returns the pingResponseTimeout. */ public final long getPingResponseTimeout() { return this.pingResponseTimeout; } /** * The invocation timeout for all remaining method calls like "publish", "connect", "subscribe" * but NOT for "ping" and "update" * @return Returns the responseTimeout. */ public final long getResponseTimeout() { return this.responseTimeout; } /** * @param minSizeForCompression The minSizeForCompression to set. 
*/
   public void setMinSizeForCompression(int minSizeForCompression) {
      // Stores the value that getMinSizeForCompression() hands back
      this.minSizeForCompression = minSizeForCompression;
   }
} // closes the enclosing class, whose header lies outside this excerpt
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -