📄 client.java
字号:
} } catch (EOFException eof) { // This is what happens when the remote side goes down } catch (Exception e) { LOG.info(getName() + " caught: " + e, e); } finally { //If there was no exception thrown in this method, then the only //way we reached here is by breaking out of the while loop (after //waitForWork). And if we took that route to reach here, we have //already removed the connection object in the ConnectionCuller thread. //We don't want to remove this again as some other thread might have //actually put a new Connection object in the table in the meantime. synchronized (connections) { if (connections.get(address) == this) { connections.remove(address); } } close(); } } /** Initiates a call by sending the parameter to the remote server. * Note: this is not called from the Connection thread, but by other * threads. */ public void sendParam(Call call) throws IOException { boolean error = true; try { calls.put(new Integer(call.id), call); synchronized (out) { if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); try { writingCall = call; DataOutputBuffer d = new DataOutputBuffer(); //for serializing the //data to be written d.writeInt(call.id); call.param.write(d); byte[] data = d.getData(); int dataLength = d.getLength(); out.writeInt(dataLength); //first put the data length out.write(data, 0, dataLength);//write the data out.flush(); } finally { writingCall = null; } } error = false; } finally { if (error) { synchronized (connections) { if (connections.get(address) == this) connections.remove(address); } close(); // close on error } } } /** Close the connection. */ public void close() { //socket may be null if the connection could not be established to the //server in question, and the culler asked us to close the connection if (socket == null) return; try { socket.close(); // close socket } catch (IOException e) {} if (LOG.isDebugEnabled()) LOG.debug(getName() + ": closing"); } } /** Call implementation used for parallel calls. */ private class ParallelCall extends Call { private ParallelResults results; private int index; public ParallelCall(Writable param, ParallelResults results, int index) { super(param); this.results = results; this.index = index; } /** Deliver result to result collector. */ public void callComplete() { results.callComplete(this); } } /** Result collector for parallel calls. */ private static class ParallelResults { private Writable[] values; private int size; private int count; public ParallelResults(int size) { this.values = new Writable[size]; this.size = size; } /** Collect a result. */ public synchronized void callComplete(ParallelCall call) { values[call.index] = call.value; // store the value count++; // count it if (count == size) // if all values are in notify(); // then notify waiting caller } } private class ConnectionCuller extends Thread { public static final int MIN_SLEEP_TIME = 1000; public void run() { LOG.info(getName() + ": starting"); while (running) { try { Thread.sleep(MIN_SLEEP_TIME); } catch (InterruptedException ie) {} synchronized (connections) { Iterator i = connections.values().iterator(); while (i.hasNext()) { Connection c = (Connection)i.next(); if (c.isIdle()) { //We don't actually close the socket here (i.e., don't invoke //the close() method). We leave that work to the response receiver //thread. The reason for that is since we have taken a lock on the //connections table object, we don't want to slow down the entire //system if we happen to talk to a slow server. i.remove(); synchronized (c) { c.setCloseConnection(); c.notify(); } } } } } } } /** Construct an IPC client whose values are of the given {@link Writable} * class. */ public Client(Class valueClass, Configuration conf) { this.valueClass = valueClass; this.timeout = conf.getInt("ipc.client.timeout",10000); this.maxIdleTime = conf.getInt("ipc.client.connection.maxidletime",1000); this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10); this.conf = conf; Thread t = new ConnectionCuller(); t.setDaemon(true); t.setName(valueClass.getName() +" ConnectionCuller maxidletime="+maxIdleTime+"ms"); t.start(); } /** Stop all threads related to this client. No further calls may be made * using this client. */ public void stop() { LOG.info("Stopping client"); running = false; } /** Sets the timeout used for network i/o. */ public void setTimeout(int timeout) { this.timeout = timeout; } /** Make a call, passing <code>param</code>, to the IPC server running at * <code>address</code>, returning the value. Throws exceptions if there are * network problems or if the remote code threw an exception. */ public Writable call(Writable param, InetSocketAddress address) throws IOException { Connection connection = getConnection(address); Call call = new Call(param); synchronized (call) { connection.sendParam(call); // send the parameter long wait = timeout; do { try { call.wait(wait); // wait for the result } catch (InterruptedException e) {} wait = timeout - (System.currentTimeMillis() - call.lastActivity); } while (!call.done && wait > 0); if (call.error != null) { throw call.error; } else if (!call.done) { throw new SocketTimeoutException("timed out waiting for rpc response"); } else { return call.value; } } } /** Makes a set of calls in parallel. Each parameter is sent to the * corresponding address. When all values are available, or have timed out * or errored, the collected results are returned in an array. The array * contains nulls for calls that timed out or errored. */ public Writable[] call(Writable[] params, InetSocketAddress[] addresses) throws IOException { if (addresses.length == 0) return new Writable[0]; ParallelResults results = new ParallelResults(params.length); synchronized (results) { for (int i = 0; i < params.length; i++) { ParallelCall call = new ParallelCall(params[i], results, i); try { Connection connection = getConnection(addresses[i]); connection.sendParam(call); // send each parameter } catch (IOException e) { LOG.info("Calling "+addresses[i]+" caught: " + e); // log errors results.size--; // wait for one fewer result } } try { results.wait(timeout); // wait for all results } catch (InterruptedException e) {} if (results.count == 0) { throw new IOException("no responses"); } else { return results.values; } } } /** Get a connection from the pool, or create a new one and add it to the * pool. Connections to a given host/port are reused. */ private Connection getConnection(InetSocketAddress address) throws IOException { Connection connection; synchronized (connections) { connection = (Connection)connections.get(address); if (connection == null) { connection = new Connection(address); connections.put(address, connection); connection.start(); } connection.incrementRef(); } //we don't invoke the method below inside "synchronized (connections)" //block above. The reason for that is if the server happens to be slow, //it will take longer to establish a connection and that will slow the //entire system down. connection.setupIOstreams(); return connection; } private Writable makeValue() { Writable value; // construct value try { value = (Writable)valueClass.newInstance(); } catch (InstantiationException e) { throw new RuntimeException(e.toString()); } catch (IllegalAccessException e) { throw new RuntimeException(e.toString()); } return value; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -