⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 client.java

📁 爬虫数据的改进,并修正了一些bug
💻 JAVA
字号:
/* Copyright (c) 2003 The Nutch Organization.  All rights reserved.   */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */

package net.nutch.ipc;

import java.net.Socket;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;

import java.io.IOException;
import java.io.EOFException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;

import java.util.Hashtable;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import net.nutch.util.LogFormatter;
import net.nutch.io.Writable;
import net.nutch.io.NullWritable;
import net.nutch.io.UTF8;

/** A client for an IPC service.  IPC calls take a single {@link Writable} as a
 * parameter, and return a {@link Writable} as their value.  A service runs on
 * a port and is defined by a parameter class and a value class.
 *
 * <p>One {@link Connection} (a daemon reader thread plus a socket) is kept per
 * remote address and shared by all calls to that address.  Callers block on
 * the call object (single calls) or on a result collector (parallel calls)
 * until the reader thread delivers the response or the timeout elapses.
 *
 * @author Doug Cutting
 * @see Server
 */
public class Client {

	public static final Logger LOG = Logger.getLogger("search");

	/** Pool of open connections, keyed by remote address. */
	private Hashtable<InetSocketAddress,Connection> connections =
		new Hashtable<InetSocketAddress,Connection>();

	private Class<?> valueClass;                    // class of call values
	private volatile int timeout = 5000;            // timeout for calls, in ms
	private int counter;                            // counter for call ids
	// Written by stop() and polled by every connection thread, hence volatile.
	private volatile boolean running = true;        // true while client runs

	/** A call waiting for a value. */
	private class Call {
		int id;                                       // call id
		Writable param;                               // parameter
		Writable value;                               // value, null until a response arrives
		String error;                                 // error, null if value
		boolean done;                                 // set under this call's monitor once complete

		protected Call(Writable param) {
			this.param = param;
			synchronized (Client.this) {                // ids are unique per client
				this.id = counter++;
			}
		}

		/** Called by the connection thread when the call is complete and the
		 * value or error string are available.  Marks the call done and
		 * notifies the waiting caller by default.  */
		public synchronized void callComplete() {
			done = true;                              // lets the waiter tell a real wakeup from a spurious one
			notify();                                 // notify caller
		}
	}

	/** Thread that reads responses and notifies callers.  Each connection owns a
	 * socket connected to a remote address.  Calls are multiplexed through this
	 * socket: responses may be delivered out of order. */
	private class Connection extends Thread {
		private InetSocketAddress address;            // address of server
		private Socket socket;                        // connected socket
		private DataInputStream in;
		private DataOutputStream out;
		private Hashtable<Integer,Call> calls = new Hashtable<Integer,Call>();    // currently active calls

		/** Connects to <code>address</code> and prepares buffered streams.  The
		 * thread is not started here.
		 *
		 * @throws IOException if the address is unresolved, or if connecting or
		 * opening the streams fails
		 */
		public Connection(InetSocketAddress address) throws IOException {
			// getAddress() is null for an unresolved address; Socket would then
			// silently connect to the loopback interface instead of failing.
			if (address.isUnresolved()) {
				throw new IOException("unresolved address: " + address);
			}
			this.address = address;
			this.socket = new Socket(address.getAddress(), address.getPort());
			boolean ready = false;
			try {
				socket.setSoTimeout(timeout);
				this.in = new DataInputStream
				(new BufferedInputStream(socket.getInputStream()));
				this.out = new DataOutputStream
				(new BufferedOutputStream(socket.getOutputStream()));
				ready = true;
			} finally {
				if (!ready) {                             // don't leak the socket on setup failure
					try {
						socket.close();
					} catch (IOException ignored) {
						// best effort: the setup failure is what gets reported
					}
				}
			}
			this.setDaemon(true);
			this.setName("Client connection to "
					+ address.getAddress().getHostAddress()
					+ ":" + address.getPort());
		}

		/** Reads responses until the client is stopped or the stream fails.  Each
		 * response is an int call id, a boolean error flag, then either a
		 * {@link UTF8} error string or a value of the client's value class. */
		@Override
		public void run() {
			LOG.info(getName() + ": starting");
			try {
				while (running) {
					int id;
					try {
						id = in.readInt();                    // try to read an id
					} catch (SocketTimeoutException e) {
						continue;                             // idle: re-check the running flag
					}

					// Null when the caller already gave up on this call (or the id is
					// unknown).  The response body must still be consumed so that the
					// stream stays aligned for the other multiplexed calls.
					Call call = calls.remove(Integer.valueOf(id));
					boolean isError = in.readBoolean();     // read if error
					if (isError) {
						UTF8 utf8 = new UTF8();
						utf8.readFields(in);                  // read error string
						if (call != null) {
							call.error = utf8.toString();
							call.value = NullWritable.get();
						}
					} else {
						Writable value = makeValue();
						value.readFields(in);                 // read value
						if (call != null) {
							call.value = value;
							call.error = null;
						}
					}
					if (call != null) {
						call.callComplete();                  // deliver result to caller
					}
				}
			} catch (EOFException eof) {
				// This is what happens when the remote side goes down
			} catch (Exception e) {
				LOG.log(Level.INFO, getName() + " caught: " + e, e);
			} finally {
				close();
			}
		}

		/** Initiates a call by sending the parameter to the remote server.
		 * Note: this is not called from the Connection thread, but by other
		 * threads.  On failure the call is unregistered and the connection is
		 * closed before the exception propagates.
		 */
		public void sendParam(Call call) throws IOException {
			boolean error = true;
			try {
				// Register before writing so that a fast response finds the call.
				calls.put(Integer.valueOf(call.id), call);
				synchronized (out) {                      // one request on the wire at a time
					out.writeInt(call.id);
					call.param.write(out);
					out.flush();
				}
				error = false;
			} finally {
				if (error) {
					calls.remove(Integer.valueOf(call.id)); // nothing will ever answer it
					close();                                // close on error
				}
			}
		}

		/** Forgets a call whose caller has stopped waiting, so that it does not
		 * accumulate in the table of active calls.  A no-op if the call has
		 * already completed. */
		void discard(Call call) {
			calls.remove(Integer.valueOf(call.id));
		}

		/** Close the connection and remove it from the pool. */
		public void close() {
			LOG.info(getName() + ": closing");
			// close() can run more than once (send failure, then the reader's
			// finally); only evict the pool entry if it is still this connection,
			// never a replacement opened in the meantime.
			synchronized (connections) {
				if (connections.get(address) == this) {
					connections.remove(address);            // remove connection
				}
			}
			try {
				socket.close();                           // close socket
			} catch (IOException ignored) {
				// best effort: the connection is being discarded anyway
			}
		}

	}

	/** Call implementation used for parallel calls. */
	private class ParallelCall extends Call {
		private ParallelResults results;
		private int index;

		public ParallelCall(Writable param, ParallelResults results, int index) {
			super(param);
			this.results = results;
			this.index = index;
		}

		/** Deliver result to result collector. */
		@Override
		public void callComplete() {
			results.callComplete(this);
		}
	}

	/** Result collector for parallel calls. */
	private static class ParallelResults {
		private Writable[] values;
		private int size;                             // number of responses still expected
		private int count;                            // number of responses collected
		private boolean finished;                     // true once values has been handed to the caller

		public ParallelResults(int size) {
			this.values = new Writable[size];
			this.size = size;
		}

		/** Collect a result.  Results that arrive after the caller has stopped
		 * waiting are dropped, so the returned array never changes afterwards. */
		public synchronized void callComplete(ParallelCall call) {
			if (finished) {
				return;
			}
			values[call.index] = call.value;            // store the value
			count++;                                    // count it
			if (count == size)                          // if all values are in
				notify();                                 // then notify waiting caller
		}
	}

	/** Construct an IPC client whose values are of the given {@link Writable}
	 * class. */
	public Client(Class valueClass) {
		this.valueClass = valueClass;
	}

	/** Stop all threads related to this client.  No further calls may be made
	 * using this client.  Sleeps for the current timeout first so that calls in
	 * progress can complete; connection threads notice the stop the next time
	 * their read loop checks the flag. */
	public void stop() {
		LOG.info("Stopping client");
		try {
			Thread.sleep(timeout);                        // let all calls complete
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();           // keep the interrupt visible to the caller
		}
		running = false;
	}

	/** Sets the timeout used for network i/o, in milliseconds.  It is used as
	 * the socket read timeout of connections opened afterwards and as the
	 * maximum time a call waits for its response. */
	public void setTimeout(int timeout) {
		this.timeout = timeout;
	}

	/** Make a call, passing <code>param</code>, to the IPC server running at
	 * <code>address</code>, returning the value.  Throws exceptions if there are
	 * network problems or if the remote code threw an exception.
	 *
	 * @throws IOException if the connection cannot be used, if the server
	 * reported an error (the message is the server's error string), or if no
	 * response arrived within the timeout
	 */
	public Writable call(Writable param, InetSocketAddress address)
	throws IOException {
		Connection connection = getConnection(address);
		Call call = new Call(param);
		int millis = timeout;                         // one consistent value for this call
		synchronized (call) {
			// Sent while holding the call's monitor: callComplete() needs the same
			// monitor, so the response cannot be signalled before we are waiting.
			connection.sendParam(call);                 // send the parameter
			long deadline = System.currentTimeMillis() + millis;
			try {
				while (!call.done) {                      // loop guards against spurious wakeups
					long remaining = 0;                     // 0 = wait until notified, as before
					if (millis != 0) {
						remaining = deadline - System.currentTimeMillis();
						if (remaining <= 0) {
							break;
						}
					}
					call.wait(remaining);                   // wait for the result
				}
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();       // reported below as a timeout
			}

			if (!call.done) {
				connection.discard(call);                 // don't leave it in the active table
				throw new IOException("timed out waiting for response");
			}
			if (call.error != null) {
				throw new IOException(call.error);
			}
			return call.value;
		}
	}

	/** Makes a set of calls in parallel.  Each parameter is sent to the
	 * corresponding address; entries whose address is null are skipped.  When
	 * all values are available, or the timeout has elapsed, the collected
	 * results are returned in an array indexed like <code>params</code>.  The
	 * array contains null for calls that were skipped, could not be sent or
	 * timed out, and {@link NullWritable} for calls the server answered with an
	 * error.
	 *
	 * @throws IOException if at least one call was sent but none was answered
	 */
	public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
	throws IOException {
		if (params.length == 0) return new Writable[0];

		ParallelResults results = new ParallelResults(params.length);
		// Remember where each call went so unanswered ones can be unregistered.
		Connection[] used = new Connection[params.length];
		ParallelCall[] sent = new ParallelCall[params.length];
		synchronized (results) {
			for (int i = 0; i < params.length; i++) {
				// No server for this slot: skip it and expect one fewer result
				// (added by Xie Shuqiang, 2006-11-28).
				if (addresses[i] == null){
					results.size--;
					continue;
				}
				ParallelCall call = new ParallelCall(params[i], results, i);
				try {
					Connection connection = getConnection(addresses[i]);
					connection.sendParam(call);             // send each parameter
					used[i] = connection;
					sent[i] = call;
				} catch (IOException e) {
					LOG.warn("Calling "+addresses[i]+" caught: " + e); // log errors
					results.size--;                         //  wait for one fewer result
				}
			}

			int millis = timeout;
			long deadline = System.currentTimeMillis() + millis;
			try {
				while (results.count < results.size) {    // loop guards against spurious wakeups
					long remaining = 0;                     // 0 = wait until notified, as before
					if (millis != 0) {
						remaining = deadline - System.currentTimeMillis();
						if (remaining <= 0) {
							break;
						}
					}
					results.wait(remaining);                // wait for all results
				}
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();       // return whatever has been collected
			}

			// From here on late responses are ignored, so the array handed to the
			// caller is never modified behind its back.
			results.finished = true;
			if (results.count < results.size) {
				for (int i = 0; i < sent.length; i++) {
					if (sent[i] != null) {
						used[i].discard(sent[i]);             // no-op for calls already answered
					}
				}
			}

			if (results.count == 0 && results.size > 0) {
				throw new IOException("no responses");
			} else {
				return results.values;
			}
		}
	}

	/** Get a connection from the pool, or create a new one and add it to the
	 * pool.  Connections to a given host/port are reused. */
	private Connection getConnection(InetSocketAddress address)
	throws IOException {
		Connection connection;
		synchronized (connections) {                  // at most one connection per address
			connection = connections.get(address);
			if (connection == null) {
				connection = new Connection(address);
				connections.put(address, connection);
				connection.start();
			}
		}
		return connection;
	}

	/** Creates an empty instance of the value class, to be filled in from the
	 * response stream.
	 *
	 * @throws RuntimeException if the value class cannot be instantiated; the
	 * original exception is kept as the cause
	 */
	private Writable makeValue() {
		try {
			return (Writable)valueClass.newInstance();
		} catch (InstantiationException e) {
			throw new RuntimeException(e.toString(), e);
		} catch (IllegalAccessException e) {
			throw new RuntimeException(e.toString(), e);
		}
	}

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -