⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 server.java

📁 爬虫数据的改进,并修正了一些bug
💻 JAVA
字号:
/* Copyright (c) 2003 The Nutch Organization.  All rights reserved.   */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */

package net.nutch.ipc;

import java.io.IOException;
import java.io.EOFException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;

import java.net.Socket;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;

import java.util.LinkedList;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import net.nutch.io.Writable;
import net.nutch.io.UTF8;

/** An abstract IPC service.  IPC calls take a single {@link Writable} as a
 * parameter, and return a {@link Writable} as their value.  A service runs on
 * a port and is defined by a parameter class; subclasses supply the behavior
 * by implementing {@link #call(Writable)}.
 *
 * <p>Threading, as implemented here: one listener thread accepts sockets, one
 * connection thread per accepted socket reads calls and queues them, and a
 * fixed number of handler threads dequeue calls, invoke
 * {@link #call(Writable)} and write the response back on the originating
 * connection.
 *
 * <p>Wire format, as implemented here: a request is an <code>int</code> call
 * id followed by the serialized parameter; a response is the call id, a
 * <code>boolean</code> error flag, and then either the serialized value or,
 * when the flag is set, a {@link UTF8} error string.
 * 
 * @author Doug Cutting
 * @see Client
 */
public abstract class Server {
  public static final Logger LOG =
    Logger.getLogger("server");

  private int port;                               // port we listen on
  private int handlerCount;                       // number of handler threads
  private int maxQueuedCalls;                     // max number of queued calls
  private Class paramClass;                       // class of call parameters

  private int timeout = 10000;                    // timeout for i/o
  // volatile: written by stop() and polled by every worker thread without a
  // common lock, so the write must be made visible explicitly
  private volatile boolean running = true;        // true while server runs
  private LinkedList callQueue = new LinkedList(); // queued calls
  private Object callDequeued = new Object();     // used by wait/notify

  /** A call queued for handling. */
  private static class Call {
    private int id;                               // the client's call id
    private Writable param;                       // the parameter passed
    private Connection connection;                // connection to client

    public Call(int id, Writable param, Connection connection) {
      this.id = id;
      this.param = param;
      this.connection = connection;
    }
  }

  /** Listens on the socket, starting new connection threads. */
  private class Listener extends Thread {
    private ServerSocket socket;

    /** Binds the server socket to the configured port.
     * @throws IOException if the socket cannot be opened or configured
     */
    public Listener() throws IOException {
      this.socket = new ServerSocket(port);
      socket.setSoTimeout(timeout);               // so accept() re-checks running
      this.setDaemon(true);
      this.setName("Server listener on port " + port);
    }

    /** Accepts connections until the server is stopped, then closes the
     * server socket. */
    public void run() {
      LOG.info(getName() + ": starting");
      while (running) {
        Socket client = null;
        try {
          client = socket.accept();
          new Connection(client).start();         // start a new connection

        } catch (SocketTimeoutException e) {      // ignore timeouts
        } catch (Exception e) {                   // log all other exceptions
          LOG.log(Level.INFO, getName() + " caught: " + e, e);
          if (client != null) {                   // no thread owns it: don't leak
            try {
              client.close();
            } catch (IOException ignored) {}      // best-effort cleanup
          }
        }
      }
      try {
        socket.close();
      } catch (IOException e) {}                  // already shutting down
      LOG.info(getName() + ": exiting");
    }
  }

  /** Reads calls from a connection and queues them for handling. */
  private class Connection extends Thread {
    private Socket socket;
    private DataInputStream in;
    private DataOutputStream out;

    /** Wraps the accepted socket in buffered data streams.
     * @throws IOException if the socket cannot be configured or its streams
     * cannot be obtained
     */
    public Connection(Socket socket) throws IOException {
      this.socket = socket;
      socket.setSoTimeout(timeout);
      this.in = new DataInputStream
        (new BufferedInputStream(socket.getInputStream()));
      this.out = new DataOutputStream
        (new BufferedOutputStream(socket.getOutputStream()));
      this.setDaemon(true);
      this.setName("Server connection on port " + port + " from "
                   + socket.getInetAddress().getHostAddress());
    }

    /** Reads calls until the server stops, the peer closes the stream, or an
     * error occurs; always closes the socket on exit. */
    public void run() {
      LOG.info(getName() + ": starting");
      try {
        while (running) {
          int id;
          try {
            id = in.readInt();                    // try to read an id
          } catch (SocketTimeoutException e) {
            continue;                             // idle: re-check running
          }
          
          LOG.info(">>> server: "+ id + " ["+ paramClass.getName()+"]");
        
          Writable param = makeParam();           // read param
          param.readFields(in);        
        
          Call call = new Call(id, param, this);
        
          synchronized (callQueue) {
            callQueue.addLast(call);              // queue the call
            callQueue.notify();                   // wake up a waiting handler
          }
        
          // Back-pressure: stop reading while the queue is full.  The size
          // check happens while holding callDequeued so a handler's notify
          // cannot slip in between the check and the wait.  Handlers never
          // hold callQueue while taking callDequeued, so the nested lock
          // order used here cannot deadlock with them.
          synchronized (callDequeued) {
            while (running && queuedCallCount() >= maxQueuedCalls) {
              callDequeued.wait(timeout);         // wait for a dequeue
            }
          }
        }
      } catch (EOFException eof) {
          // This is what happens when the other side shuts things down
      } catch (Exception e) {
        LOG.log(Level.INFO, getName() + " caught: " + e, e);
      } finally {
        try {
          socket.close();
        } catch (IOException e) {}                // nothing more we can do
        LOG.info(getName() + ": exiting");
      }
    }

  }

  /** Handles queued calls . */
  public class Handler extends Thread {
    public Handler() {
      this.setDaemon(true);
      this.setName("Server handler on " + port);
    }

    /** Dequeues calls, invokes {@link Server#call(Writable)} and writes the
     * response to the calling connection, until the server is stopped. */
    public void run() {
      LOG.info(getName() + ": starting");
      while (running) {
        try {
          Call call;
          synchronized (callQueue) {
            while (running && callQueue.size()==0) { // wait for a call
              callQueue.wait(timeout);
            }
            if (!running) break;
            call = (Call)callQueue.removeFirst(); // pop the queue
          }

          synchronized (callDequeued) {           // tell others we've dequeued
            callDequeued.notify();
          }

          String error = null;
          Writable value = null;
          try {
            value = call(call.param);             // make the call
          } catch (Exception e) {
            LOG.log(Level.INFO, getName() + " call error: " + e, e);
            error = e.toString();
          }

          // A null result cannot be serialized.  Report it as an error up
          // front instead of failing after the id and flag are already
          // written, which would leave a truncated response on the stream.
          if (error == null && value == null) {
            error = "call returned null";
          }
            
          DataOutputStream out = call.connection.out;
          synchronized (out) {                    // one response at a time
            out.writeInt(call.id);                // write call id
            out.writeBoolean(error!=null);        // write error flag
            if (error != null)
              value = new UTF8(error);
            value.write(out);                     // write value
            out.flush();
          }

        } catch (Exception e) {
          LOG.log(Level.INFO, getName() + " caught: " + e, e);
        }
      }
      LOG.info(getName() + ": exiting");
    }

  }
  
  /** Constructs a server listening on the named port.  Parameters passed must
   * be of the named class.  The <code>handlerCount</code> determines
   * the number of handler threads that will be used to process calls; it is
   * also used as the maximum number of queued calls.
   */
  protected Server(int port, Class paramClass, int handlerCount) {
    this.port = port;
    this.paramClass = paramClass;
    this.handlerCount = handlerCount;
    this.maxQueuedCalls = handlerCount;
  }

  /** Sets the timeout, in milliseconds, used for network i/o.  The same value
   * bounds the internal waits and the pause in {@link #stop()}. */
  public void setTimeout(int timeout) { this.timeout = timeout; }

  /** Starts the service.  Must be called before any calls will be handled.
   * @throws IOException if the listening socket cannot be opened
   */
  public synchronized void start() throws IOException {
    Listener listener = new Listener();
    listener.start();
    
    for (int i = 0; i < handlerCount; i++) {
      Handler handler = new Handler();
      handler.start();
    }
  }

  /** Stops the service.  No calls will be handled after this is called.  All
   * threads will exit.  Blocks for up to the configured timeout, then wakes
   * every thread blocked in {@link #join()}. */
  public synchronized void stop() {
    LOG.info("Stopping server on " + port);
    running = false;
    try {
      Thread.sleep(timeout);                        // let all threads exit
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();           // preserve interrupt status
    }
    notifyAll();                                    // release every joiner
  }

  /** Wait for the server to be stopped.  Returns immediately if
   * {@link #stop()} has already been called.
   * @throws InterruptedException if the waiting thread is interrupted
   */
  public synchronized void join() throws InterruptedException {
    while (running) {                               // guard vs. spurious wakeups
      wait();
    }
  }

  /** Called for each call.
   * @param param the parameter read from the client
   * @return the value to send back to the client
   * @throws IOException on failure; its string form is sent back as the error
   */
  public abstract Writable call(Writable param) throws IOException;

  /** Returns the number of queued calls, read under the queue's lock. */
  private int queuedCallCount() {
    synchronized (callQueue) {
      return callQueue.size();
    }
  }
  
  /** Creates an empty parameter instance of the configured class.
   * @throws RuntimeException wrapping the cause if the class cannot be
   * instantiated reflectively
   */
  private Writable makeParam() {
    Writable param;                               // construct param
    try {
      param = (Writable)paramClass.newInstance();
    } catch (InstantiationException e) {
      throw new RuntimeException(e.toString(), e);
    } catch (IllegalAccessException e) {
      throw new RuntimeException(e.toString(), e);
    }
    return param;
  }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -