⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 client.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.ipc;import java.net.Socket;import java.net.InetSocketAddress;import java.net.SocketTimeoutException;import java.net.UnknownHostException;import java.io.IOException;import java.io.EOFException;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.BufferedInputStream;import java.io.BufferedOutputStream;import java.io.FilterInputStream;import java.io.FilterOutputStream;import java.util.Hashtable;import java.util.Iterator;import java.util.Collection;import java.util.Random;import org.apache.commons.logging.*;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configurable;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableUtils;import org.apache.hadoop.io.UTF8;import org.apache.hadoop.io.DataOutputBuffer;/** A client for an IPC service.  IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value.  A service runs on * a port and is defined by a parameter class and a value class. *  * @author Doug Cutting * @see Server */public class Client {  public static final Log LOG =    LogFactory.getLog("org.apache.hadoop.ipc.Client");  private Hashtable connections = new Hashtable();  private Class valueClass;                       // class of call values  private int timeout ;// timeout for calls  private int counter;                            // counter for call ids  private boolean running = true;                 // true while client runs  private Configuration conf;  private int maxIdleTime; //connections will be culled if it was idle for                            //maxIdleTime msecs  private int maxRetries; //the max. no. of retries for socket connections  /** A call waiting for a value. */  private class Call {    int id;                                       // call id    Writable param;                               // parameter    Writable value;                               // value, null if error    RemoteException error;                        // error, null if value    long lastActivity;                            // time of last i/o    boolean done;                                 // true when call is done    protected Call(Writable param) {      this.param = param;      synchronized (Client.this) {        this.id = counter++;      }      touch();    }    /** Called by the connection thread when the call is complete and the     * value or error string are available.  Notifies by default.  */    public synchronized void callComplete() {        notify();                                 // notify caller    }    /** Update lastActivity with the current time. */    public synchronized void touch() {      lastActivity = System.currentTimeMillis();    }    /** Update lastActivity with the current time. */    public synchronized void setResult(Writable value, RemoteException error) {      this.value = value;      this.error = error;      this.done = true;    }      }  /** Thread that reads responses and notifies callers.  Each connection owns a   * socket connected to a remote address.  Calls are multiplexed through this   * socket: responses may be delivered out of order. */  private class Connection extends Thread {    private InetSocketAddress address;            // address of server    private Socket socket = null;                 // connected socket    private DataInputStream in;                       private DataOutputStream out;    private Hashtable calls = new Hashtable();    // currently active calls    private Call readingCall;    private Call writingCall;    private int inUse = 0;    private long lastActivity = 0;    private boolean shouldCloseConnection = false;    public Connection(InetSocketAddress address) throws IOException {      if (address.isUnresolved()) {         throw new UnknownHostException("unknown host: " + address.getHostName());      }      this.address = address;      this.setName("Client connection to " + address.toString());      this.setDaemon(true);    }    public synchronized void setupIOstreams() throws IOException {      if (socket != null) {        notify();        return;      }      short failures = 0;      while (true) {        try {          this.socket = new Socket(address.getAddress(), address.getPort());          break;        } catch (IOException ie) { //SocketTimeoutException is also caught           if (failures == maxRetries) {            //reset inUse so that the culler gets a chance to throw this            //connection object out of the table. We don't want to increment            //inUse to infinity (everytime getConnection is called inUse is            //incremented)!            inUse = 0;            throw ie;          }          failures++;          LOG.info("Retrying connect to server: " + address +                    ". Already tried " + failures + " time(s).");          try {             Thread.sleep(1000);          } catch (InterruptedException iex){          }        }      }      socket.setSoTimeout(timeout);      this.in = new DataInputStream        (new BufferedInputStream         (new FilterInputStream(socket.getInputStream()) {             public int read(byte[] buf, int off, int len) throws IOException {               int value = super.read(buf, off, len);               if (readingCall != null) {                 readingCall.touch();               }               return value;             }          }));      this.out = new DataOutputStream        (new BufferedOutputStream         (new FilterOutputStream(socket.getOutputStream()) {             public void write(byte[] buf, int o, int len) throws IOException {               out.write(buf, o, len);               if (writingCall != null) {                 writingCall.touch();               }             }           }));      notify();    }    private synchronized boolean waitForWork() {      //wait till someone signals us to start reading RPC response or      //close the connection. If we are idle long enough (blocked in wait),      //the ConnectionCuller thread will wake us up and ask us to close the      //connection.       //We need to wait when inUse is 0 or socket is null (it may be null if      //the Connection object has been created but the socket connection      //has not been setup yet). We stop waiting if we have been asked to close      //connection      while ((inUse == 0 || socket == null) && !shouldCloseConnection) {        try {          wait();        } catch (InterruptedException e) {}      }      return !shouldCloseConnection;    }    private synchronized void incrementRef() {      inUse++;    }    private synchronized void decrementRef() {      lastActivity = System.currentTimeMillis();      inUse--;    }    public synchronized boolean isIdle() {      //check whether the connection is in use or just created      if (inUse != 0) return false;      long currTime = System.currentTimeMillis();      if (currTime - lastActivity > maxIdleTime)        return true;      return false;    }    public InetSocketAddress getRemoteAddress() {      return address;    }    public void setCloseConnection() {      shouldCloseConnection = true;    }    public void run() {      if (LOG.isDebugEnabled())        LOG.debug(getName() + ": starting");      try {        while (running) {          int id;          //wait here for work - read connection or close connection          if (waitForWork() == false)            break;          try {            id = in.readInt();                    // try to read an id          } catch (SocketTimeoutException e) {            continue;          }          if (LOG.isDebugEnabled())            LOG.debug(getName() + " got value #" + id);          Call call = (Call)calls.remove(new Integer(id));          boolean isError = in.readBoolean();     // read if error          if (isError) {            RemoteException ex =               new RemoteException(WritableUtils.readString(in),                                  WritableUtils.readString(in));            call.setResult(null, ex);          } else {            Writable value = makeValue();            try {              readingCall = call;              if(value instanceof Configurable) {                ((Configurable) value).setConf(conf);              }              value.readFields(in);                 // read value            } finally {              readingCall = null;            }            call.setResult(value, null);          }          call.callComplete();                   // deliver result to caller          //received the response. So decrement the ref count          decrementRef();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -