⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 requestreplyexecutor.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
      String requestId = msgInfo.createRequestId(prefix);
      if (log.isLoggable(Level.FINE)) log.fine("Invoking  msgInfo type='" + msgInfo.getTypeStr() + "' message " + msgInfo.getMethodName() + "(requestId=" + requestId + ") oneway=" + !expectingResponse + " udp=" + udp);

      final Object[] response = new Object[1];  // As only final variables are accessible from the inner class, we put the response in this array
      response[0] = null;
      final LatchHolder startSignal;

      // Register the return value / Exception listener ...
      if (expectingResponse) {
         //startSignal = new Latch(); // defaults to false
         startSignal = addLatch(new Latch()); //synchronized (this.latchSet) { this.latchSet.add(startSignal); } // remember all blocking threads for release on shutdown
         // NOTE(review): the latch registered on the line above is not removed on this
         // early return (it happens before the try/finally below) -- confirm that
         // freePendingThreads() is relied on to clean it up.
         if (!hasConnection()) return null;
         addResponseListener(requestId, new I_ResponseListener() {
            public void incomingMessage(String reqId, Object responseObj) {
               if (log.isLoggable(Level.FINE)) log.fine("RequestId=" + reqId + ": return value arrived ...");
               response[0] = responseObj;
               startSignal.latch.release(); // wake up
            }
         });
      }
      else
         startSignal = null;

      // Send the message / method invocation ...
      if (log.isLoggable(Level.FINEST)) log.finest("Sending now : >" + msgInfo.toLiteral() + "<");
      try {
         sendMessage(msgInfo, msgInfo.getRequestId(), msgInfo.getMethodName(), udp);
         // if (log.isLoggable(Level.FINE)) log.trace(ME, "Successfully sent " + msgInfo.getNumMessages() + " messages");
      }
      catch (Throwable e) {
         // Sending failed: nobody will wait on the latch, so unregister it again
         if (startSignal != null) {
            removeLatch(startSignal); // synchronized (this.latchSet) { this.latchSet.remove(startSignal); }
         }
         String tmp = (msgInfo==null) ? "" : msgInfo.getMethodNameStr();
         String str = "Request blocked and timed out, giving up now waiting on " +
                      tmp + "(" + requestId + ") response. Please check settings of " +
                      "responseTimeout="+this.responseTimeout+
                      " pingResponseTimeout="+this.pingResponseTimeout+
                      " updateResponseTimeout="+this.updateResponseTimeout;
         if (e instanceof XmlBlasterException) {
            if (log.isLoggable(Level.FINE)) log.fine(str + ": " + e.toString());
            throw (XmlBlasterException)e;
         }
         if (e instanceof NullPointerException)
            e.printStackTrace();
         throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_TIMEOUT, ME, str, e);
      }

      if (log.isLoggable(Level.FINEST)) log.finest("Successful sent message: >" + msgInfo.toLiteral() + "<");
      if (!expectingResponse) {
         return null;
      }

      // Waiting for the response to arrive ...
      try {
         boolean awakened = false;
         while (true) {
            try {
               //  An argument less than or equal to zero means not to wait at all
               awakened = startSignal.latch.attempt(getResponseTimeout(msgInfo.getMethodName())); // block max. milliseconds
               if (startSignal.latchIsInterrupted) {
                  awakened = false; // Simulates a responseTimeout
                  startSignal.latchIsInterrupted = false;
               }
               break;
            }
            catch (InterruptedException e) {
               log.warning("Waking up (waited on " + msgInfo.getMethodName() + "(" + requestId + ") response): " + e.toString());
               // try again
            }
         }
         if (awakened) {
            if (log.isLoggable(Level.FINE)) log.fine("Waking up, got response for " + msgInfo.getMethodName() + "(requestId=" + requestId + ")");
            if (response[0]==null) // Caused by freePendingThreads()
               throw new IOException(ME + ": Lost " + getType() + " connection for " + msgInfo.getMethodName() + "(requestId=" + requestId + ")");
            if (log.isLoggable(Level.FINEST)) log.finest("Response for " + msgInfo.getMethodName() + "(" + requestId + ") is: " + response[0].toString());
            if (response[0] instanceof XmlBlasterException)
               throw (XmlBlasterException)response[0];
            return response[0];
         }
         else {
            String str = "Timeout of " + getResponseTimeout(msgInfo.getMethodName())
                       + " milliseconds occured when waiting on " + msgInfo.getMethodName() + "(" + requestId
                       + ") response. You can change it with -plugin/"
                       + getType().toLowerCase()+"/"+getResponseTimeoutPropertyName(msgInfo.getMethodName())+" <millis>";
            removeResponseListener(requestId);
            throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_RESPONSETIMEOUT, ME, str);
         }
      }
      finally {
         removeLatch(startSignal); //synchronized (this.latchSet) { this.latchSet.remove(startSignal); }
      }
   }

   /**
    * Pairs a latch on which a requesting thread blocks with a flag telling
    * that thread whether it was woken by a forced release instead of by a
    * real response.
    * <p>
    * Static because it never touches the enclosing instance.
    */
   private static class LatchHolder {
      public LatchHolder(Latch latch) {
         this.latch = latch;
      }
      /** The latch the requesting thread waits on. */
      final Latch latch;
      /**
       * Set to true right before a forced <code>latch.release()</code>;
       * the woken thread then treats the wake-up like a response timeout
       * and resets the flag.
       */
      boolean latchIsInterrupted;
   }

   /**
    * Interrupts a blocking request with a not returning reply.
    * The pending message is handled as not delivered and will be queued
    * <p>
    * Every currently registered latch is flagged as interrupted and released;
    * the latches stay registered, the woken threads unregister them themselves.
    * @return Number of interrupted invocations, typically 0 or 1
    */
   public int interruptInvocation() {
      LatchHolder[] latches = getLatches();
      for (int i=0; i<latches.length; i++) {
         latches[i].latchIsInterrupted = true; // must be set before release() so the woken thread sees it
         latches[i].latch.release(); // wake up
         log.severe("DEBUG ONLY: Forced release of latch");
      }
      return latches.length;
   }

   /**
    * Wraps the given latch into a holder and registers it in the set of
    * pending latches.
    * @param latch The latch the calling thread is going to block on
    * @return The registered holder
    * @throws IllegalArgumentException if the set reports the holder as already contained
    */
   private LatchHolder addLatch(Latch latch) {
      LatchHolder latchHolder = new LatchHolder(latch);
      synchronized (this.latchSet) {
         boolean added = this.latchSet.add(latchHolder);
         if (!added)
            throw new IllegalArgumentException("Didn't expect the latch already");
      }
      return latchHolder;
   }

   /**
    * Unregisters the given holder from the set of pending latches.
    * @param latchHolder The holder to remove
    */
   private void removeLatch(LatchHolder latchHolder) {
      synchronized (this.latchSet) {
         this.latchSet.remove(latchHolder);
      }
   }

   /**
    * @return A snapshot array of all currently registered latch holders,
    *         taken while holding the lock on the latch set
    */
   private LatchHolder[] getLatches() {
      synchronized (this.latchSet) {
         return (LatchHolder[])this.latchSet.toArray(new LatchHolder[this.latchSet.size()]);
      }
   }

   /**
    * If we detect somewhere that the socket is down
    * use this method to free blocking threads which wait on responses
    * <p>
    * All registered latches are flagged as interrupted and released, and
    * the set of pending latches is emptied.
    */
   public final void freePendingThreads() {
      final LatchHolder[] latches;
      // Snapshot and clear under ONE lock: a latch registered concurrently is then
      // either part of the snapshot (and released below) or stays registered,
      // it can not be cleared from the set without having been released.
      synchronized (this.latchSet) {
         latches = getLatches();
         this.latchSet.clear();
      }
      // Log from the snapshot instead of reading latchSet.size() outside the lock
      if (log != null && log.isLoggable(Level.FINE) && latches.length>0) log.fine("Freeing " + latches.length + " pending threads (waiting on responses) from their ugly blocking situation");
      for (int i=0; i<latches.length; i++) {
         latches[i].latchIsInterrupted = true;
         latches[i].latch.release(); // wake up
      }
   }

   /**
    * Send a one way response message back to the other side
    * @param receiver The received message, a returner of type RESPONSE_BYTE is created from it
    * @param response The payload: a String, String[], MsgUnitRaw or MsgUnitRaw[]
    * @param udp Passed through unchanged to sendMessage()
    * @throws XmlBlasterException with INTERNAL_ILLEGALARGUMENT if response is null
    *         or of any other type, or whatever sendMessage() throws
    * @throws IOException if sendMessage() fails
    */
   protected final void executeResponse(MsgInfo receiver, Object response, boolean udp) throws XmlBlasterException, IOException {
      // Take a clone:
      MsgInfo returner = receiver.createReturner(MsgInfo.RESPONSE_BYTE);
      if (response instanceof String)
         returner.addMessage((String)response);
      else if (response instanceof String[])
         returner.addMessage((String[])response);
      else if (response instanceof MsgUnitRaw[])
         returner.addMessage((MsgUnitRaw[])response);
      else if (response instanceof MsgUnitRaw)
         returner.addMessage((MsgUnitRaw)response);
      else // null ends up here as well: concatenate instead of calling response.toString() to avoid a NullPointerException
         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "Invalid response data type " + response);
      sendMessage(returner, receiver.getRequestId(), receiver.getMethodName(), udp);
      if (log.isLoggable(Level.FINE)) log.fine("Successfully sent response for " + receiver.getMethodName() + "(" + receiver.getRequestId() + ")");
      if (log.isLoggable(Level.FINEST)) log.finest("Successful sent response for " + receiver.getMethodName() + "() >" + returner.toLiteral() + "<");
   }

   /**
    * Send a one way exception back to the other side
    * <p>
    * The returner of type EXCEPTION_BYTE is sent with checksum and
    * compression switched off.
    * @param receiver The received message the exception belongs to
    * @param e The exception to transfer, its server side flag is set from glob
    * @param udp Passed through unchanged to sendMessage()
    * @throws XmlBlasterException if sendMessage() fails
    * @throws IOException if sendMessage() fails
    */
   protected final void executeException(MsgInfo receiver, XmlBlasterException e, boolean udp) throws XmlBlasterException, IOException {
      e.isServerSide(glob.isServerSide());
      MsgInfo returner = receiver.createReturner(MsgInfo.EXCEPTION_BYTE);
      returner.setChecksum(false);
      returner.setCompressed(false);
      returner.addException(e);
      sendMessage(returner, receiver.getRequestId(), receiver.getMethodName(), udp);
      if (log.isLoggable(Level.FINE)) log.fine("Successfully sent exception for " + receiver.getMethodName() + "(" + receiver.getRequestId() + ")");
      if (log.isLoggable(Level.FINEST)) log.finest("Successful sent exception for " + receiver.getMethodName() + "() >" + returner.toLiteral() + "<");
   }

   /**
    * Flush the data to the protocol layer (socket, email, ...).
    * Overwrite this in your derived class to send using your specific protocol
    */
   abstract protected void sendMessage(MsgInfo msgInfo, String requestId, MethodName methodName, boolean udp) throws XmlBlasterException, IOException;

   /**
    * @return Returns the compressZlib flag.
    */
   public boolean isCompressZlib() {
      return this.compressZlib;
   }

   /**
    * @param compress The compressZlib flag to set.
    */
   public void setCompressZlib(boolean compress) {
      this.compressZlib = compress;
   }

   /**
    * Compressing too small messages won't reduce the size
    * @return The number of bytes, only compress if bigger
    */
   public int getMinSizeForCompression() {
      return this.minSizeForCompression;
   }

   /**
    * @return Returns the compressZlibStream flag.
    */
   public boolean isCompressZlibStream() {
      return this.compressZlibStream;
   }

   /**
    * @param compress The compressZlibStream flag to set.
    */
   public void setCompressZlibStream(boolean compress) {
      this.compressZlibStream = compress;
   }

   /**
    * @return Returns the updateResponseTimeout.
    */
   public final long getUpdateResponseTimeout() {
      return this.updateResponseTimeout;
   }

   /**
    * @return Returns the useEmailExpiryTimestamp.
    */
   public boolean isUseEmailExpiryTimestamp() {
      return this.useEmailExpiryTimestamp;
   }

   /**
    * @param useEmailExpiryTimestamp The useEmailExpiryTimestamp to set.
    */
   public void setUseEmailExpiryTimestamp(boolean useEmailExpiryTimestamp) {
      this.useEmailExpiryTimestamp = useEmailExpiryTimestamp;
   }

   /**
    * @return a human readable usage help string
    */
   public java.lang.String usage() {
      return
       "interruptInvocation(): Interrupts a blocking request" +
       "\n  The pending message is handled as not delivered and will be kept in queue";
   }

   /**
    * NOTE(review): unconditionally reports true -- confirm that this is
    * intended and that derived classes are expected to override it.
    * @return always true in this implementation
    */
   public boolean isShutdown() {
      return true;
   }

   /**
    * The invocation timeout for "ping" method calls.
    * @return Returns the pingResponseTimeout.
    */
   public final long getPingResponseTimeout() {
      return this.pingResponseTimeout;
   }

   /**
    * The invocation timeout for all remaining method calls like "publish", "connect", "subscribe"
    * but NOT for "ping" and "update"
    * @return Returns the responseTimeout.
    */
   public final long getResponseTimeout() {
      return this.responseTimeout;
   }

   /**
    * @param minSizeForCompression The minSizeForCompression to set.
    */
   public void setMinSizeForCompression(int minSizeForCompression) {
      this.minSizeForCompression = minSizeForCompression;
   }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -