📄 client.java
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.ipc;import java.net.Socket;import java.net.InetSocketAddress;import java.net.SocketTimeoutException;import java.net.UnknownHostException;import java.io.IOException;import java.io.EOFException;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.BufferedInputStream;import java.io.BufferedOutputStream;import java.io.FilterInputStream;import java.io.FilterOutputStream;import java.util.Hashtable;import java.util.Iterator;import java.util.Collection;import java.util.Random;import org.apache.commons.logging.*;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configurable;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableUtils;import org.apache.hadoop.io.UTF8;import org.apache.hadoop.io.DataOutputBuffer;/** A client for an IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on * a port and is defined by a parameter class and a value class. * * @author Doug Cutting * @see Server */public class Client { public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.Client"); private Hashtable connections = new Hashtable(); private Class valueClass; // class of call values private int timeout ;// timeout for calls private int counter; // counter for call ids private boolean running = true; // true while client runs private Configuration conf; private int maxIdleTime; //connections will be culled if it was idle for //maxIdleTime msecs private int maxRetries; //the max. no. of retries for socket connections /** A call waiting for a value. */ private class Call { int id; // call id Writable param; // parameter Writable value; // value, null if error RemoteException error; // error, null if value long lastActivity; // time of last i/o boolean done; // true when call is done protected Call(Writable param) { this.param = param; synchronized (Client.this) { this.id = counter++; } touch(); } /** Called by the connection thread when the call is complete and the * value or error string are available. Notifies by default. */ public synchronized void callComplete() { notify(); // notify caller } /** Update lastActivity with the current time. */ public synchronized void touch() { lastActivity = System.currentTimeMillis(); } /** Update lastActivity with the current time. */ public synchronized void setResult(Writable value, RemoteException error) { this.value = value; this.error = error; this.done = true; } } /** Thread that reads responses and notifies callers. Each connection owns a * socket connected to a remote address. Calls are multiplexed through this * socket: responses may be delivered out of order. */ private class Connection extends Thread { private InetSocketAddress address; // address of server private Socket socket = null; // connected socket private DataInputStream in; private DataOutputStream out; private Hashtable calls = new Hashtable(); // currently active calls private Call readingCall; private Call writingCall; private int inUse = 0; private long lastActivity = 0; private boolean shouldCloseConnection = false; public Connection(InetSocketAddress address) throws IOException { if (address.isUnresolved()) { throw new UnknownHostException("unknown host: " + address.getHostName()); } this.address = address; this.setName("Client connection to " + address.toString()); this.setDaemon(true); } public synchronized void setupIOstreams() throws IOException { if (socket != null) { notify(); return; } short failures = 0; while (true) { try { this.socket = new Socket(address.getAddress(), address.getPort()); break; } catch (IOException ie) { //SocketTimeoutException is also caught if (failures == maxRetries) { //reset inUse so that the culler gets a chance to throw this //connection object out of the table. We don't want to increment //inUse to infinity (everytime getConnection is called inUse is //incremented)! inUse = 0; throw ie; } failures++; LOG.info("Retrying connect to server: " + address + ". Already tried " + failures + " time(s)."); try { Thread.sleep(1000); } catch (InterruptedException iex){ } } } socket.setSoTimeout(timeout); this.in = new DataInputStream (new BufferedInputStream (new FilterInputStream(socket.getInputStream()) { public int read(byte[] buf, int off, int len) throws IOException { int value = super.read(buf, off, len); if (readingCall != null) { readingCall.touch(); } return value; } })); this.out = new DataOutputStream (new BufferedOutputStream (new FilterOutputStream(socket.getOutputStream()) { public void write(byte[] buf, int o, int len) throws IOException { out.write(buf, o, len); if (writingCall != null) { writingCall.touch(); } } })); notify(); } private synchronized boolean waitForWork() { //wait till someone signals us to start reading RPC response or //close the connection. If we are idle long enough (blocked in wait), //the ConnectionCuller thread will wake us up and ask us to close the //connection. //We need to wait when inUse is 0 or socket is null (it may be null if //the Connection object has been created but the socket connection //has not been setup yet). We stop waiting if we have been asked to close //connection while ((inUse == 0 || socket == null) && !shouldCloseConnection) { try { wait(); } catch (InterruptedException e) {} } return !shouldCloseConnection; } private synchronized void incrementRef() { inUse++; } private synchronized void decrementRef() { lastActivity = System.currentTimeMillis(); inUse--; } public synchronized boolean isIdle() { //check whether the connection is in use or just created if (inUse != 0) return false; long currTime = System.currentTimeMillis(); if (currTime - lastActivity > maxIdleTime) return true; return false; } public InetSocketAddress getRemoteAddress() { return address; } public void setCloseConnection() { shouldCloseConnection = true; } public void run() { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": starting"); try { while (running) { int id; //wait here for work - read connection or close connection if (waitForWork() == false) break; try { id = in.readInt(); // try to read an id } catch (SocketTimeoutException e) { continue; } if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + id); Call call = (Call)calls.remove(new Integer(id)); boolean isError = in.readBoolean(); // read if error if (isError) { RemoteException ex = new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in)); call.setResult(null, ex); } else { Writable value = makeValue(); try { readingCall = call; if(value instanceof Configurable) { ((Configurable) value).setConf(conf); } value.readFields(in); // read value } finally { readingCall = null; } call.setResult(value, null); } call.callComplete(); // deliver result to caller //received the response. So decrement the ref count decrementRef();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -