client.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 373 行

JAVA
373
字号
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.ipc;import java.net.Socket;import java.net.InetSocketAddress;import java.net.SocketTimeoutException;import java.io.IOException;import java.io.EOFException;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.BufferedInputStream;import java.io.BufferedOutputStream;import java.io.FilterInputStream;import java.io.FilterOutputStream;import java.rmi.RemoteException;import java.util.Hashtable;import java.util.logging.Logger;import java.util.logging.Level;import org.apache.hadoop.util.LogFormatter;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configurable;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.UTF8;/** A client for an IPC service.  IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value.  A service runs on * a port and is defined by a parameter class and a value class. *  * @author Doug Cutting * @see Server */public class Client {  public static final Logger LOG =    LogFormatter.getLogger("org.apache.hadoop.ipc.Client");  private Hashtable connections = new Hashtable();  private Class valueClass;                       // class of call values  private int timeout ;// timeout for calls  private int counter;                            // counter for call ids  private boolean running = true;                 // true while client runs  private Configuration conf;  /** A call waiting for a value. */  private class Call {    int id;                                       // call id    Writable param;                               // parameter    Writable value;                               // value, null if error    String error;                                 // error, null if value    long lastActivity;                            // time of last i/o    boolean done;                                 // true when call is done    protected Call(Writable param) {      this.param = param;      synchronized (Client.this) {        this.id = counter++;      }      touch();    }    /** Called by the connection thread when the call is complete and the     * value or error string are available.  Notifies by default.  */    public synchronized void callComplete() {        notify();                                 // notify caller    }    /** Update lastActivity with the current time. */    public synchronized void touch() {      lastActivity = System.currentTimeMillis();    }    /** Update lastActivity with the current time. */    public synchronized void setResult(Writable value, String error) {      this.value = value;      this.error = error;      this.done = true;    }      }  /** Thread that reads responses and notifies callers.  Each connection owns a   * socket connected to a remote address.  Calls are multiplexed through this   * socket: responses may be delivered out of order. */  private class Connection extends Thread {    private InetSocketAddress address;            // address of server    private Socket socket;                        // connected socket    private DataInputStream in;                       private DataOutputStream out;    private Hashtable calls = new Hashtable();    // currently active calls    private Call readingCall;    private Call writingCall;    public Connection(InetSocketAddress address) throws IOException {      this.address = address;      this.socket = new Socket(address.getAddress(), address.getPort());      socket.setSoTimeout(timeout);      this.in = new DataInputStream        (new BufferedInputStream         (new FilterInputStream(socket.getInputStream()) {             public int read(byte[] buf, int off, int len) throws IOException {               int value = super.read(buf, off, len);               if (readingCall != null) {                 readingCall.touch();               }               return value;             }          }));      this.out = new DataOutputStream        (new BufferedOutputStream         (new FilterOutputStream(socket.getOutputStream()) {             public void write(byte[] buf, int o, int len) throws IOException {               out.write(buf, o, len);               if (writingCall != null) {                 writingCall.touch();               }             }           }));      this.setDaemon(true);      this.setName("Client connection to "                   + address.getAddress().getHostAddress()                   + ":" + address.getPort());    }    public void run() {      LOG.info(getName() + ": starting");      try {        while (running) {          int id;          try {            id = in.readInt();                    // try to read an id          } catch (SocketTimeoutException e) {            continue;          }          if (LOG.isLoggable(Level.FINE))            LOG.fine(getName() + " got value #" + id);          Call call = (Call)calls.remove(new Integer(id));          boolean isError = in.readBoolean();     // read if error          if (isError) {            UTF8 utf8 = new UTF8();            utf8.readFields(in);                  // read error string            call.setResult(null, utf8.toString());          } else {            Writable value = makeValue();            try {              readingCall = call;              if(value instanceof Configurable) {                ((Configurable) value).setConf(conf);              }              value.readFields(in);                 // read value            } finally {              readingCall = null;            }            call.setResult(value, null);          }          call.callComplete();                   // deliver result to caller        }      } catch (EOFException eof) {          // This is what happens when the remote side goes down      } catch (Exception e) {        LOG.log(Level.INFO, getName() + " caught: " + e, e);      } finally {        close();      }    }    /** Initiates a call by sending the parameter to the remote server.     * Note: this is not called from the Connection thread, but by other     * threads.     */    public void sendParam(Call call) throws IOException {      boolean error = true;      try {        calls.put(new Integer(call.id), call);        synchronized (out) {          if (LOG.isLoggable(Level.FINE))            LOG.fine(getName() + " sending #" + call.id);          try {            writingCall = call;            out.writeInt(call.id);            call.param.write(out);            out.flush();          } finally {            writingCall = null;          }        }        error = false;      } finally {        if (error)          close();                                // close on error      }    }    /** Close the connection and remove it from the pool. */    public void close() {      LOG.info(getName() + ": closing");      synchronized (connections) {        connections.remove(address);              // remove connection      }      try {        socket.close();                           // close socket      } catch (IOException e) {}    }  }  /** Call implementation used for parallel calls. */  private class ParallelCall extends Call {    private ParallelResults results;    private int index;        public ParallelCall(Writable param, ParallelResults results, int index) {      super(param);      this.results = results;      this.index = index;    }    /** Deliver result to result collector. */    public void callComplete() {      results.callComplete(this);    }  }  /** Result collector for parallel calls. */  private static class ParallelResults {    private Writable[] values;    private int size;    private int count;    public ParallelResults(int size) {      this.values = new Writable[size];      this.size = size;    }    /** Collect a result. */    public synchronized void callComplete(ParallelCall call) {      values[call.index] = call.value;            // store the value      count++;                                    // count it      if (count == size)                          // if all values are in        notify();                                 // then notify waiting caller    }  }  /** Construct an IPC client whose values are of the given {@link Writable}   * class. */  public Client(Class valueClass, Configuration conf) {    this.valueClass = valueClass;    this.timeout = conf.getInt("ipc.client.timeout",10000);    this.conf = conf;  }  /** Stop all threads related to this client.  No further calls may be made   * using this client. */  public void stop() {    LOG.info("Stopping client");    try {      Thread.sleep(timeout);                        // let all calls complete    } catch (InterruptedException e) {}    running = false;  }  /** Sets the timeout used for network i/o. */  public void setTimeout(int timeout) { this.timeout = timeout; }  /** Make a call, passing <code>param</code>, to the IPC server running at   * <code>address</code>, returning the value.  Throws exceptions if there are   * network problems or if the remote code threw an exception. */  public Writable call(Writable param, InetSocketAddress address)    throws IOException {    Connection connection = getConnection(address);    Call call = new Call(param);    synchronized (call) {      connection.sendParam(call);                 // send the parameter      long wait = timeout;      do {        try {          call.wait(wait);                       // wait for the result        } catch (InterruptedException e) {}        wait = timeout - (System.currentTimeMillis() - call.lastActivity);      } while (!call.done && wait > 0);      if (call.error != null) {        throw new RemoteException(call.error);      } else if (!call.done) {        throw new IOException("timed out waiting for response");      } else {        return call.value;      }    }  }  /** Makes a set of calls in parallel.  Each parameter is sent to the   * corresponding address.  When all values are available, or have timed out   * or errored, the collected results are returned in an array.  The array   * contains nulls for calls that timed out or errored.  */  public Writable[] call(Writable[] params, InetSocketAddress[] addresses)    throws IOException {    if (addresses.length == 0) return new Writable[0];    ParallelResults results = new ParallelResults(params.length);    synchronized (results) {      for (int i = 0; i < params.length; i++) {        ParallelCall call = new ParallelCall(params[i], results, i);        try {          Connection connection = getConnection(addresses[i]);          connection.sendParam(call);             // send each parameter        } catch (IOException e) {          LOG.info("Calling "+addresses[i]+" caught: " + e); // log errors          results.size--;                         //  wait for one fewer result        }      }      try {        results.wait(timeout);                    // wait for all results      } catch (InterruptedException e) {}      if (results.count == 0) {        throw new IOException("no responses");      } else {        return results.values;      }    }  }  /** Get a connection from the pool, or create a new one and add it to the   * pool.  Connections to a given host/port are reused. */  private Connection getConnection(InetSocketAddress address)    throws IOException {    Connection connection;    synchronized (connections) {      connection = (Connection)connections.get(address);      if (connection == null) {        connection = new Connection(address);        connections.put(address, connection);        connection.start();      }    }    return connection;  }  private Writable makeValue() {    Writable value;                             // construct value    try {      value = (Writable)valueClass.newInstance();    } catch (InstantiationException e) {      throw new RuntimeException(e.toString());    } catch (IllegalAccessException e) {      throw new RuntimeException(e.toString());    }    return value;  }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?