⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 client.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        }      } catch (EOFException eof) {          // This is what happens when the remote side goes down      } catch (Exception e) {        LOG.info(getName() + " caught: " + e, e);      } finally {        //If there was no exception thrown in this method, then the only        //way we reached here is by breaking out of the while loop (after        //waitForWork). And if we took that route to reach here, we have         //already removed the connection object in the ConnectionCuller thread.        //We don't want to remove this again as some other thread might have        //actually put a new Connection object in the table in the meantime.        synchronized (connections) {          if (connections.get(address) == this) {            connections.remove(address);          }        }        close();      }    }    /** Initiates a call by sending the parameter to the remote server.     * Note: this is not called from the Connection thread, but by other     * threads.     */    public void sendParam(Call call) throws IOException {      boolean error = true;      try {        calls.put(new Integer(call.id), call);        synchronized (out) {          if (LOG.isDebugEnabled())            LOG.debug(getName() + " sending #" + call.id);          try {            writingCall = call;            DataOutputBuffer d = new DataOutputBuffer(); //for serializing the                                                         //data to be written            d.writeInt(call.id);            call.param.write(d);            byte[] data = d.getData();            int dataLength = d.getLength();            out.writeInt(dataLength);      //first put the data length            out.write(data, 0, dataLength);//write the data            out.flush();          } finally {            writingCall = null;          }        }        error = false;      } finally {        if (error) {          synchronized (connections) {            if (connections.get(address) == this)              connections.remove(address);          }          close();                                // close on error        }      }    }      /** Close the connection. */    public void close() {      //socket may be null if the connection could not be established to the      //server in question, and the culler asked us to close the connection      if (socket == null) return;      try {        socket.close();                           // close socket      } catch (IOException e) {}      if (LOG.isDebugEnabled())        LOG.debug(getName() + ": closing");    }  }  /** Call implementation used for parallel calls. */  private class ParallelCall extends Call {    private ParallelResults results;    private int index;        public ParallelCall(Writable param, ParallelResults results, int index) {      super(param);      this.results = results;      this.index = index;    }    /** Deliver result to result collector. */    public void callComplete() {      results.callComplete(this);    }  }  /** Result collector for parallel calls. */  private static class ParallelResults {    private Writable[] values;    private int size;    private int count;    public ParallelResults(int size) {      this.values = new Writable[size];      this.size = size;    }    /** Collect a result. */    public synchronized void callComplete(ParallelCall call) {      values[call.index] = call.value;            // store the value      count++;                                    // count it      if (count == size)                          // if all values are in        notify();                                 // then notify waiting caller    }  }  private class ConnectionCuller extends Thread {    public static final int MIN_SLEEP_TIME = 1000;    public void run() {      LOG.info(getName() + ": starting");      while (running) {        try {          Thread.sleep(MIN_SLEEP_TIME);        } catch (InterruptedException ie) {}        synchronized (connections) {          Iterator i = connections.values().iterator();          while (i.hasNext()) {            Connection c = (Connection)i.next();            if (c.isIdle()) {             //We don't actually close the socket here (i.e., don't invoke            //the close() method). We leave that work to the response receiver            //thread. The reason for that is since we have taken a lock on the            //connections table object, we don't want to slow down the entire            //system if we happen to talk to a slow server.              i.remove();              synchronized (c) {                c.setCloseConnection();                c.notify();              }            }          }        }      }    }  }  /** Construct an IPC client whose values are of the given {@link Writable}   * class. */  public Client(Class valueClass, Configuration conf) {    this.valueClass = valueClass;    this.timeout = conf.getInt("ipc.client.timeout",10000);    this.maxIdleTime = conf.getInt("ipc.client.connection.maxidletime",1000);    this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);    this.conf = conf;    Thread t = new ConnectionCuller();    t.setDaemon(true);    t.setName(valueClass.getName()              +" ConnectionCuller maxidletime="+maxIdleTime+"ms");    t.start();  }   /** Stop all threads related to this client.  No further calls may be made   * using this client. */  public void stop() {    LOG.info("Stopping client");    running = false;  }  /** Sets the timeout used for network i/o. */  public void setTimeout(int timeout) { this.timeout = timeout; }  /** Make a call, passing <code>param</code>, to the IPC server running at   * <code>address</code>, returning the value.  Throws exceptions if there are   * network problems or if the remote code threw an exception. */  public Writable call(Writable param, InetSocketAddress address)    throws IOException {    Connection connection = getConnection(address);    Call call = new Call(param);    synchronized (call) {      connection.sendParam(call);                 // send the parameter      long wait = timeout;      do {        try {          call.wait(wait);                       // wait for the result        } catch (InterruptedException e) {}        wait = timeout - (System.currentTimeMillis() - call.lastActivity);      } while (!call.done && wait > 0);      if (call.error != null) {        throw call.error;      } else if (!call.done) {        throw new SocketTimeoutException("timed out waiting for rpc response");      } else {        return call.value;      }    }  }  /** Makes a set of calls in parallel.  Each parameter is sent to the   * corresponding address.  When all values are available, or have timed out   * or errored, the collected results are returned in an array.  The array   * contains nulls for calls that timed out or errored.  */  public Writable[] call(Writable[] params, InetSocketAddress[] addresses)    throws IOException {    if (addresses.length == 0) return new Writable[0];    ParallelResults results = new ParallelResults(params.length);    synchronized (results) {      for (int i = 0; i < params.length; i++) {        ParallelCall call = new ParallelCall(params[i], results, i);        try {          Connection connection = getConnection(addresses[i]);          connection.sendParam(call);             // send each parameter        } catch (IOException e) {          LOG.info("Calling "+addresses[i]+" caught: " + e); // log errors          results.size--;                         //  wait for one fewer result        }      }      try {        results.wait(timeout);                    // wait for all results      } catch (InterruptedException e) {}      if (results.count == 0) {        throw new IOException("no responses");      } else {        return results.values;      }    }  }  /** Get a connection from the pool, or create a new one and add it to the   * pool.  Connections to a given host/port are reused. */  private Connection getConnection(InetSocketAddress address)    throws IOException {    Connection connection;    synchronized (connections) {      connection = (Connection)connections.get(address);      if (connection == null) {        connection = new Connection(address);        connections.put(address, connection);        connection.start();      }      connection.incrementRef();    }    //we don't invoke the method below inside "synchronized (connections)"    //block above. The reason for that is if the server happens to be slow,    //it will take longer to establish a connection and that will slow the    //entire system down.    connection.setupIOstreams();    return connection;  }  private Writable makeValue() {    Writable value;                             // construct value    try {      value = (Writable)valueClass.newInstance();    } catch (InstantiationException e) {      throw new RuntimeException(e.toString());    } catch (IllegalAccessException e) {      throw new RuntimeException(e.toString());    }    return value;  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -