⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 server.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        try {          if (LOG.isDebugEnabled())            LOG.debug(getName() + ": disconnecting client " +                   c.getHostAddress() + ". Number of active connections: "+                  numConnections);          c.close();        } catch (Exception e) {}        c = null;      }      else {        c.setLastContact(System.currentTimeMillis());      }    }       synchronized void doStop() {      if (selector != null) {        selector.wakeup();        Thread.yield();      }    }  }  /** Reads calls from a connection and queues them for handling. */  private class Connection {    private SocketChannel channel;    private SelectionKey key;    private ByteBuffer data;    private ByteBuffer dataLengthBuffer;    private DataOutputStream out;    private SocketChannelOutputStream channelOut;    private long lastContact;    private int dataLength;    private Socket socket;    // Cache the remote host & port info so that even if the socket is     // disconnected, we can say where it used to connect to.    private String hostAddress;    private int remotePort;    public Connection(SelectionKey key, SocketChannel channel,     long lastContact) {      this.key = key;      this.channel = channel;      this.lastContact = lastContact;      this.data = null;      this.dataLengthBuffer = null;      this.socket = channel.socket();      this.out = new DataOutputStream        (new BufferedOutputStream(         this.channelOut = new SocketChannelOutputStream(channel, 4096)));      InetAddress addr = socket.getInetAddress();      if (addr == null) {        this.hostAddress = "*Unknown*";      } else {        this.hostAddress = addr.getHostAddress();      }      this.remotePort = socket.getPort();    }       public String toString() {      return getHostAddress() + ":" + remotePort;     }        public String getHostAddress() {      return hostAddress;    }    public void setLastContact(long lastContact) {      this.lastContact = lastContact;    }    public long getLastContact() {      return lastContact;    }    private boolean timedOut() {      if(System.currentTimeMillis() -  lastContact > maxIdleTime)        return true;      return false;    }    private boolean timedOut(long currentTime) {        if(currentTime -  lastContact > maxIdleTime)          return true;        return false;    }    public int readAndProcess() throws IOException, InterruptedException {      int count = -1;      if (dataLengthBuffer == null)        dataLengthBuffer = ByteBuffer.allocateDirect(4);      if (dataLengthBuffer.remaining() > 0) {        count = channel.read(dataLengthBuffer);        if (count < 0) return count;        if (dataLengthBuffer.remaining() == 0) {          dataLengthBuffer.flip();           dataLength = dataLengthBuffer.getInt();          data = ByteBuffer.allocateDirect(dataLength);        }        //return count;      }      count = channel.read(data);      if (data.remaining() == 0) {        data.flip();        processData();        data = dataLengthBuffer = null;       }      return count;    }    private void processData() throws  IOException, InterruptedException {      byte[] bytes = new byte[dataLength];      data.get(bytes);      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));      int id = dis.readInt();                    // try to read an id              if (LOG.isDebugEnabled())        LOG.debug(" got #" + id);                  Writable param = makeParam();           // read param      param.readFields(dis);                      Call call = new Call(id, param, this);      synchronized (callQueue) {        if (callQueue.size() >= maxQueueSize) {          Call oldCall = (Call) callQueue.removeFirst();          LOG.warn("Call queue overflow discarding oldest call " + oldCall);        }        callQueue.addLast(call);              // queue the call        callQueue.notify();                   // wake up a waiting handler      }            }    private void close() throws IOException {      data = null;      dataLengthBuffer = null;      if (!channel.isOpen())        return;      try {socket.shutdownOutput();} catch(Exception e) {}      try {out.close();} catch(Exception e) {}      try {channelOut.destroy();} catch(Exception e) {}      if (channel.isOpen()) {        try {channel.close();} catch(Exception e) {}      }      try {socket.close();} catch(Exception e) {}      try {key.cancel();} catch(Exception e) {}      key = null;    }  }  /** Handles queued calls . */  private class Handler extends Thread {    public Handler(int instanceNumber) {      this.setDaemon(true);      this.setName("Server handler "+ instanceNumber + " on " + port);    }    public void run() {      LOG.info(getName() + ": starting");      SERVER.set(Server.this);      while (running) {        try {          Call call;          synchronized (callQueue) {            while (running && callQueue.size()==0) { // wait for a call              callQueue.wait(timeout);            }            if (!running) break;            call = (Call)callQueue.removeFirst(); // pop the queue          }          synchronized (callDequeued) {           // tell others we've dequeued            callDequeued.notify();          }          // throw the message away if it is too old          if (System.currentTimeMillis() - call.receivedTime >               maxCallStartAge) {            LOG.warn("Call " + call.toString() +                      " discarded for being too old (" +                     (System.currentTimeMillis() - call.receivedTime) + ")");            continue;          }                    if (LOG.isDebugEnabled())            LOG.debug(getName() + ": has #" + call.id + " from " +                     call.connection);                    String errorClass = null;          String error = null;          Writable value = null;          try {            value = call(call.param);             // make the call          } catch (Throwable e) {            LOG.info(getName() + " call error: " + e, e);            errorClass = e.getClass().getName();            error = getStackTrace(e);          }                      DataOutputStream out = call.connection.out;          synchronized (out) {            try {              out.writeInt(call.id);                // write call id              out.writeBoolean(error!=null);        // write error flag              if (error == null) {                value.write(out);              } else {                WritableUtils.writeString(out, errorClass);                WritableUtils.writeString(out, error);              }              out.flush();            } catch (Exception e) {              LOG.warn("handler output error", e);              synchronized (connectionList) {                if (connectionList.remove(call.connection))                  numConnections--;              }              call.connection.close();            }          }        } catch (Exception e) {          LOG.info(getName() + " caught: " + e, e);        }      }      LOG.info(getName() + ": exiting");    }    private String getStackTrace(Throwable throwable) {      StringWriter stringWriter = new StringWriter();      PrintWriter printWriter = new PrintWriter(stringWriter);      throwable.printStackTrace(printWriter);      printWriter.flush();      return stringWriter.toString();    }  }  /** Constructs a server listening on the named port and address.  Parameters passed must   * be of the named class.  The <code>handlerCount</handlerCount> determines   * the number of handler threads that will be used to process calls.   *    */  protected Server(String bindAddress, int port, Class paramClass, int handlerCount, Configuration conf) {    this.bindAddress = bindAddress;    this.conf = conf;    this.port = port;    this.paramClass = paramClass;    this.handlerCount = handlerCount;    this.timeout = conf.getInt("ipc.client.timeout",10000);    maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);    maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;    this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);    this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);    this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);  }      /** Constructs a server listening on the named port.  Parameters passed must   * be of the named class.  The <code>handlerCount</handlerCount> determines   * the number of handler threads that will be used to process calls.   *    * @deprecated the bind address should always be specified   */  protected Server(int port, Class paramClass, int handlerCount, Configuration conf) {    this("0.0.0.0",port,paramClass,handlerCount,conf);  }  /** Sets the timeout used for network i/o. */  public void setTimeout(int timeout) { this.timeout = timeout; }  /** Starts the service.  Must be called before any calls will be handled. */  public synchronized void start() throws IOException {    listener = new Listener();    listener.start();        for (int i = 0; i < handlerCount; i++) {      Handler handler = new Handler(i);      handler.start();    }  }  /** Stops the service.  No new calls will be handled after this is called. */  public synchronized void stop() {    LOG.info("Stopping server on " + port);    running = false;    listener.doStop();    notifyAll();  }  /** Wait for the server to be stopped.   * Does not wait for all subthreads to finish.   *  See {@link #stop()}.   */  public synchronized void join() throws InterruptedException {    while (running) {      wait();    }  }  /** Called for each call. */  public abstract Writable call(Writable param) throws IOException;    private Writable makeParam() {    Writable param;                               // construct param    try {      param = (Writable)paramClass.newInstance();      if (param instanceof Configurable) {        ((Configurable)param).setConf(conf);      }    } catch (InstantiationException e) {      throw new RuntimeException(e.toString());    } catch (IllegalAccessException e) {      throw new RuntimeException(e.toString());    }    return param;  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -