⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 httprelayclientmessagereceiver.java

📁 jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,程序编译需要Ant支持
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        }
        
        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("starting blocking receiver");
        blockingReceiver  = new BlockingMessageReceiver();
        receiverThread = new Thread(blockingReceiver, "BlockingMessageReceiver" );
        receiverThread.setDaemon(true);
        receiverThread.start();
      }
      
    } else if(requestMode==MODE_POLL) {
      
      if (pollingTask == null) { // we weren't polling so far
        
        if (receiverThread != null) { // Stop the blocking mode
          if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("stopping blocking receiver");
          blockingReceiver.stop();
          receiverThread.interrupt();
          blockingReceiver = null;
          receiverThread = null;
        }
        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("starting polling task");
        if (pollTimer == null) pollTimer = new Timer(true);
        pollingTask = new PollingMessageReceiverTask();
        pollTimer.schedule(pollingTask, 0, POLL_INTERVAL);
      }
    }
  }
  
  /** stop the underlying server **/
  public void stop() throws InterruptedException {
    super.stop();
    timer.cancel();
    if (blockingReceiver!=null) {
      blockingReceiver.stop();
    }
  }
  
  public String toString() {
    return "HttpRelayClientMessageReceiver connected to " + url;
  }
  
  /**
   * Makes an HTTP POST to the relay, sending the queryString. Returns
   * the Response object.
   * @param queryString The command to POST
   * @param timeout How long to wait for a response. NOTE: currently unused,
   *   as URL/HttpURLConnection do not support the functionality.
   */
  private HttpURLConnection postCommand(String queryString,
  int timeout)
  throws IOException, InterruptedException {
    
    if (LOG.isEnabledFor(Priority.DEBUG)) {
      LOG.debug("posting command to relay " + url + ": " + queryString);
    }
    
    HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();
    urlConn.setDoOutput(true);
    urlConn.setRequestMethod("POST");
    urlConn.setAllowUserInteraction(false);
    urlConn.setRequestProperty("Content-Length",
    Integer.toString(queryString.length()));
    urlConn.connect();
    BufferedOutputStream out = new BufferedOutputStream(urlConn.getOutputStream());
    out.write(queryString.getBytes(), 0, queryString.length() );
    out.flush();
    out.close();
    return urlConn;
  }
  
  /** Pulls the value for 'key' out of the map as an Integer. If the value
   * does not exist, or the value is not a valid Integer, null is returned.
   **/
  Integer extractInteger(HashMap map, String key) {
    String str = (String) map.get(key);
    if (str==null)
      return null;
    else {
      try {
        Integer retval = new Integer(str);
        return retval;
      } catch(NumberFormatException e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "Relay misbheaving; reported non-integer for: "
        + key);
      }
      return null;
    }
  }
  /**
   * Takes a response object, buffers the contents, constructs a message,
   * and hands the message over to the endpoint.
   */
  private void processRelayedMessage(HttpURLConnection conn)
  throws IOException {
    
    if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processing relayed message");
    int contentLen = conn.getContentLength();
    if (contentLen<0) {
      throw new IOException("Content-length must be specified");
    }
    InputStream in = conn.getInputStream();
    try {
      Message msg = endpoint.newMessage();
      MessageWireFormatFactory.newMessageWireFormat(
      new MimeMediaType( "application/x-jxta-msg" )).readMessage(in, msg);
      endpoint.demux(msg);
      
      // Make sure we are not hogging the cpu with input
      // thus preventing it from being processed.
      // It is better to slow the sending side a bit.
      Thread.yield();
      
    } catch (IOException e) {
      if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Error reading message ");
      throw e;
    }
  }
  
  /** simple reader that fills buf from in up to maxLen bytes **/
  private int read(InputStream in, byte[] buf, int maxLen)
  throws IOException {
    
    int pos = 0;
    int n;
    
    while ((n = in.read(buf, pos, maxLen - pos)) > -1)
      pos += n;
    
    return pos;
  }
  
  
  /** Runnable that blocks for messages. Stays in infinite loop until
   * the receiver is shut down. **/
  private class BlockingMessageReceiver implements Runnable {
    
    boolean isRunning = true;
    
    void stop() {
      if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Trying to stop: " + url );
      isRunning = false;
    }
    
    public void run() {
      if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("BlockingMessageReceiver starting");
      
      while (isRunning) {
        
        try {
          // we have to reconstruct the query every time in case the
          // the lease was renewed
          
          QueryString query = getLeaseIdQueryString();
          query.add(HttpUtil.COMMAND_NAME, HttpUtil.COMMAND_VALUE_BLOCK);
          String command = query.toString();
          
          if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("POSTing block command to relay: " + url);
          HttpURLConnection conn = postCommand(command,
          BLOCK_TIMEOUT);
          
          if (isRunning) {
            int responseCode = conn.getResponseCode();
            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("response code == " + responseCode);
            if (responseCode == HttpUtil.HTTP_SC_OK) {
              if (conn.getContentLength() == 0) {
                if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Relayed sent back no message");
              } else {
                processRelayedMessage(conn);
              }
            } else {
              if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Non-OK response code from relay; "+
              "waiting for interval: " +
              BLOCK_RETRY_INTERVAL);
              Thread.sleep(BLOCK_RETRY_INTERVAL);
            }
          }
          conn.disconnect();
        } catch(FileNotFoundException e) {
          // This can also happen if the lease is broken because
          // of a "feature" of JDK's HttpUrlConnection which
          // pretends to be a filesystem and has getResponseCode
          // throw FileNotFoundExc with some combinations of urls
          // and response code !!!
          if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Exception during relay request: " + e);
          lostLease();
          
          // sleep before we start trying again
          try {
            Thread.sleep(BLOCK_RETRY_INTERVAL);
          } catch (InterruptedException e2) {
            // no code intended
          Thread.interrupted();
          }
        } catch(IOException ioe) {
          // sleep before we start trying again
          if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("IOException during relay request: " + ioe);
          try {
            Thread.sleep(BLOCK_RETRY_INTERVAL);
          } catch (InterruptedException e2) {
            Thread.interrupted();
            // no code intended
          }
        } catch(RelayLeaseException e) {
          if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Exception: Lease is invalid");
          // We're impatient, speed-up the normal schedule if needed.
          lostLease();
          
          // sleep before we start trying again
          try {
            Thread.sleep(BLOCK_RETRY_INTERVAL);
          } catch (InterruptedException e2) {
            Thread.interrupted();
            // no code intended
          }
        } catch( Throwable e ) {
          if (LOG.isEnabledFor(Priority.DEBUG))
            LOG.debug("Throwable caught during blocking request to "+
            "relay; waiting for an interval: " + BLOCK_RETRY_INTERVAL, e);
          try {
            Thread.sleep(BLOCK_RETRY_INTERVAL);
          } catch (InterruptedException e2) {
            Thread.interrupted();
            // no code intended
          }
        }
      }
    }
  }
  
  
  /** polls for messages. XXX untested. ***/
  private class PollingMessageReceiverTask extends TimerTask {
    
    public void run() {
      try {
        QueryString query = getLeaseIdQueryString();
        query.add(HttpUtil.COMMAND_NAME, HttpUtil.COMMAND_VALUE_POLL);
        String command = query.toString();
        
        HttpURLConnection conn = postCommand(command,
        POLL_INTERVAL);
        
        if (conn.getResponseCode() == HttpUtil.HTTP_SC_OK) {
          processRelayedMessage(conn);
        }
        conn.disconnect();
      } catch(RelayLeaseException e) {
        // We're impatient, speed-up the normal schedule.
        lostLease();
      } catch(FileNotFoundException e) {
        lostLease();
      } catch(IOException e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("IOException during poll request to relay: "+e);
      } catch(InterruptedException e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("InterruptedException while polling");
      }
    }
  }
  
  
  /** Returns a new QueryString object with the lease session id added **/
  private QueryString getLeaseIdQueryString() throws RelayLeaseException {
    QueryString query = new QueryString();
    String leaseId = getLeaseId();
    if (leaseId!=null) {
      query.add("leaseId", getLeaseId() );
      return query;
    } else {
      throw new RelayLeaseException("Lease expired");
    }
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -