⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rpc.java

📁 hadoop:Nutch集群平台
💻 JAVA
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.ipc;import java.lang.reflect.Proxy;import java.lang.reflect.Method;import java.lang.reflect.Array;import java.lang.reflect.InvocationHandler;import java.lang.reflect.InvocationTargetException;import java.net.ConnectException;import java.net.InetSocketAddress;import java.net.SocketTimeoutException;import java.io.*;import org.apache.commons.logging.*;import org.apache.hadoop.io.*;import org.apache.hadoop.util.StringUtils;import org.apache.hadoop.conf.*;/** A simple RPC mechanism. * * A <i>protocol</i> is a Java interface.  All parameters and return types must * be one of: * * <ul> <li>a primitive type, <code>boolean</code>, <code>byte</code>, * <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>, * <code>float</code>, <code>double</code>, or <code>void</code>; or</li> * * <li>a {@link String}; or</li> * * <li>a {@link Writable}; or</li> * * <li>an array of the above types</li> </ul> * * All methods in the protocol should throw only IOException.  No field data of * the protocol instance is transmitted. */public class RPC {  private static final Log LOG =    LogFactory.getLog("org.apache.hadoop.ipc.RPC");  private RPC() {}                                  // no public ctor  /** A method invocation, including the method name and its parameters.*/  private static class Invocation implements Writable, Configurable {    private String methodName;    private Class[] parameterClasses;    private Object[] parameters;    private Configuration conf;    public Invocation() {}    public Invocation(Method method, Object[] parameters) {      this.methodName = method.getName();      this.parameterClasses = method.getParameterTypes();      this.parameters = parameters;    }    /** The name of the method invoked. */    public String getMethodName() { return methodName; }    /** The parameter classes. */    public Class[] getParameterClasses() { return parameterClasses; }    /** The parameter instances. */    public Object[] getParameters() { return parameters; }    public void readFields(DataInput in) throws IOException {      methodName = UTF8.readString(in);      parameters = new Object[in.readInt()];      parameterClasses = new Class[parameters.length];      ObjectWritable objectWritable = new ObjectWritable();      for (int i = 0; i < parameters.length; i++) {        parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);        parameterClasses[i] = objectWritable.getDeclaredClass();      }    }    public void write(DataOutput out) throws IOException {      UTF8.writeString(out, methodName);      out.writeInt(parameterClasses.length);      for (int i = 0; i < parameterClasses.length; i++) {        ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],                                   conf);      }    }    public String toString() {      StringBuffer buffer = new StringBuffer();      buffer.append(methodName);      buffer.append("(");      for (int i = 0; i < parameters.length; i++) {        if (i != 0)          buffer.append(", ");        buffer.append(parameters[i]);      }      buffer.append(")");      return buffer.toString();    }    public void setConf(Configuration conf) {      this.conf = conf;    }    public Configuration getConf() {      return this.conf;    }  }  private static Client CLIENT;  private static synchronized Client getClient(Configuration conf) {    // Construct & cache client.  The configuration is only used for timeout,    // and Clients have connection pools.  So we can either (a) lose some    // connection pooling and leak sockets, or (b) use the same timeout for all    // configurations.  Since the IPC is usually intended globally, not    // per-job, we choose (a).    if (CLIENT == null) {      CLIENT = new Client(ObjectWritable.class, conf);    }    return CLIENT;  }  /**   * Stop all RPC client connections   */  public static synchronized void stopClient(){    if(CLIENT != null)      CLIENT.stop();  }  private static class Invoker implements InvocationHandler {    private InetSocketAddress address;    private Client client;    public Invoker(InetSocketAddress address, Configuration conf) {      this.address = address;      this.client = getClient(conf);    }    public Object invoke(Object proxy, Method method, Object[] args)      throws Throwable {      long startTime = System.currentTimeMillis();      ObjectWritable value = (ObjectWritable)        client.call(new Invocation(method, args), address);      long callTime = System.currentTimeMillis() - startTime;      LOG.debug("Call: " + method.getName() + " " + callTime);      return value.get();    }  }  /**   * A version mismatch for the RPC protocol.   * @author Owen O'Malley   */  public static class VersionMismatch extends IOException {    private String interfaceName;    private long clientVersion;    private long serverVersion;        /**     * Create a version mismatch exception     * @param interfaceName the name of the protocol mismatch     * @param clientVersion the client's version of the protocol     * @param serverVersion the server's version of the protocol     */    public VersionMismatch(String interfaceName, long clientVersion,                           long serverVersion) {      super("Protocol " + interfaceName + " version mismatch. (client = " +            clientVersion + ", server = " + serverVersion + ")");      this.interfaceName = interfaceName;      this.clientVersion = clientVersion;      this.serverVersion = serverVersion;    }        /**     * Get the interface name     * @return the java class name      *          (eg. org.apache.hadoop.mapred.InterTrackerProtocol)     */    public String getInterfaceName() {      return interfaceName;    }        /**     * Get the client's prefered version     */    public long getClientVersion() {      return clientVersion;    }        /**     * Get the server's agreed to version.     */    public long getServerVersion() {      return serverVersion;    }  }    public static VersionedProtocol waitForProxy(Class protocol,                                               long clientVersion,                                               InetSocketAddress addr,                                               Configuration conf                                               ) throws IOException {    while (true) {      try {        return getProxy(protocol, clientVersion, addr, conf);      } catch( ConnectException se ) {  // namenode has not been started        LOG.info("Server at " + addr + " not available yet, Zzzzz...");      } catch( SocketTimeoutException te ) {  // namenode is busy        LOG.info("Problem connecting to server: " + addr);      }      try {        Thread.sleep(1000);      } catch (InterruptedException ie) {        // IGNORE      }    }  }  /** Construct a client-side proxy object that implements the named protocol,   * talking to a server at the named address. */  public static VersionedProtocol getProxy(Class protocol, long clientVersion,                                           InetSocketAddress addr, Configuration conf) throws IOException {    VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(                                  protocol.getClassLoader(),                                  new Class[] { protocol },                                  new Invoker(addr, conf));    long serverVersion = proxy.getProtocolVersion(protocol.getName(),                                                   clientVersion);    if (serverVersion == clientVersion) {      return proxy;    } else {      throw new VersionMismatch(protocol.getName(), clientVersion,                                 serverVersion);    }  }  /** Expert: Make multiple, parallel calls to a set of servers. */  public static Object[] call(Method method, Object[][] params,                              InetSocketAddress[] addrs, Configuration conf)    throws IOException {    Invocation[] invocations = new Invocation[params.length];    for (int i = 0; i < params.length; i++)      invocations[i] = new Invocation(method, params[i]);    Writable[] wrappedValues = getClient(conf).call(invocations, addrs);        if (method.getReturnType() == Void.TYPE) {      return null;    }    Object[] values =      (Object[])Array.newInstance(method.getReturnType(),wrappedValues.length);    for (int i = 0; i < values.length; i++)      if (wrappedValues[i] != null)        values[i] = ((ObjectWritable)wrappedValues[i]).get();        return values;  }  /** Construct a server for a protocol implementation instance listening on a   * port and address. */  public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) {    return getServer(instance, bindAddress, port, 1, false, conf);  }  /** Construct a server for a protocol implementation instance listening on a   * port and address. */  public static Server getServer(final Object instance, final String bindAddress, final int port,                                 final int numHandlers,                                 final boolean verbose, Configuration conf) {    return new Server(instance, conf, bindAddress,port, numHandlers, verbose);  }    /** Construct a server for a protocol implementation instance listening on a   * port.   *    * @deprecated the bind address should always be specified   */  public static Server getServer(final Object instance, final int port, Configuration conf) {    return getServer(instance, port, 1, false, conf);  }  /** Construct a server for a protocol implementation instance listening on a   * port.    *   * @deprecated the bind address should always be specified   */  public static Server getServer(final Object instance,final int port,                                 final int numHandlers,                                 final boolean verbose, Configuration conf) {    return new Server(instance, conf, port, numHandlers, verbose);  }        /** An RPC Server. */  public static class Server extends org.apache.hadoop.ipc.Server {    private Object instance;    private Class implementation;    private boolean verbose;    /** Construct an RPC server.     * @param instance the instance whose methods will be called     * @param conf the configuration to use     * @param port the port to listen for connections on     *      * @deprecated the bind address should always be specified     */    public Server(Object instance, Configuration conf, int port) {      this(instance, conf,  "0.0.0.0", port, 1, false);    }    /** Construct an RPC server.     * @param instance the instance whose methods will be called     * @param conf the configuration to use     * @param bindAddress the address to bind on to listen for connection     * @param port the port to listen for connections on     */    public Server(Object instance, Configuration conf, String bindAddress, int port) {      this(instance, conf,  bindAddress, port, 1, false);    }    /** Construct an RPC server.     * @param instance the instance whose methods will be called     * @param conf the configuration to use     * @param bindAddress the address to bind on to listen for connection     * @param port the port to listen for connections on     * @param numHandlers the number of method handler threads to run     * @param verbose whether each call should be logged     */    public Server(Object instance, Configuration conf, String bindAddress,  int port,                  int numHandlers, boolean verbose) {      super(bindAddress, port, Invocation.class, numHandlers, conf);      this.instance = instance;      this.implementation = instance.getClass();      this.verbose = verbose;    }    /** Construct an RPC server.     * @param instance the instance whose methods will be called     * @param conf the configuration to use     * @param port the port to listen for connections on     * @param numHandlers the number of method handler threads to run     * @param verbose whether each call should be logged     *      * @deprecated the bind address should always be specified     */    public Server(Object instance, Configuration conf,  int port,                  int numHandlers, boolean verbose) {      super("0.0.0.0", port, Invocation.class, numHandlers, conf);      this.instance = instance;      this.implementation = instance.getClass();      this.verbose = verbose;    }    public Writable call(Writable param) throws IOException {      try {        Invocation call = (Invocation)param;        if (verbose) log("Call: " + call);                Method method =          implementation.getMethod(call.getMethodName(),                                   call.getParameterClasses());        long startTime = System.currentTimeMillis();        Object value = method.invoke(instance, call.getParameters());        long callTime = System.currentTimeMillis() - startTime;        LOG.debug("Served: " + call.getMethodName() + " " + callTime);        if (verbose) log("Return: "+value);        return new ObjectWritable(method.getReturnType(), value);      } catch (InvocationTargetException e) {        Throwable target = e.getTargetException();        if (target instanceof IOException) {          throw (IOException)target;        } else {          IOException ioe = new IOException(target.toString());          ioe.setStackTrace(target.getStackTrace());          throw ioe;        }      } catch (Throwable e) {        IOException ioe = new IOException(e.toString());        ioe.setStackTrace(e.getStackTrace());        throw ioe;      }    }  }  private static void log(String value) {    if (value!= null && value.length() > 55)      value = value.substring(0, 55)+"...";    LOG.info(value);  }  }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -