📄 server.java
字号:
try { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() + ". Number of active connections: "+ numConnections); c.close(); } catch (Exception e) {} c = null; } else { c.setLastContact(System.currentTimeMillis()); } } synchronized void doStop() { if (selector != null) { selector.wakeup(); Thread.yield(); } } } /** Reads calls from a connection and queues them for handling. */ private class Connection { private SocketChannel channel; private SelectionKey key; private ByteBuffer data; private ByteBuffer dataLengthBuffer; private DataOutputStream out; private SocketChannelOutputStream channelOut; private long lastContact; private int dataLength; private Socket socket; // Cache the remote host & port info so that even if the socket is // disconnected, we can say where it used to connect to. private String hostAddress; private int remotePort; public Connection(SelectionKey key, SocketChannel channel, long lastContact) { this.key = key; this.channel = channel; this.lastContact = lastContact; this.data = null; this.dataLengthBuffer = null; this.socket = channel.socket(); this.out = new DataOutputStream (new BufferedOutputStream( this.channelOut = new SocketChannelOutputStream(channel, 4096))); InetAddress addr = socket.getInetAddress(); if (addr == null) { this.hostAddress = "*Unknown*"; } else { this.hostAddress = addr.getHostAddress(); } this.remotePort = socket.getPort(); } public String toString() { return getHostAddress() + ":" + remotePort; } public String getHostAddress() { return hostAddress; } public void setLastContact(long lastContact) { this.lastContact = lastContact; } public long getLastContact() { return lastContact; } private boolean timedOut() { if(System.currentTimeMillis() - lastContact > maxIdleTime) return true; return false; } private boolean timedOut(long currentTime) { if(currentTime - lastContact > maxIdleTime) return true; return false; } public int readAndProcess() throws IOException, InterruptedException { int count = -1; if (dataLengthBuffer == null) dataLengthBuffer = ByteBuffer.allocateDirect(4); if (dataLengthBuffer.remaining() > 0) { count = channel.read(dataLengthBuffer); if (count < 0) return count; if (dataLengthBuffer.remaining() == 0) { dataLengthBuffer.flip(); dataLength = dataLengthBuffer.getInt(); data = ByteBuffer.allocateDirect(dataLength); } //return count; } count = channel.read(data); if (data.remaining() == 0) { data.flip(); processData(); data = dataLengthBuffer = null; } return count; } private void processData() throws IOException, InterruptedException { byte[] bytes = new byte[dataLength]; data.get(bytes); DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes)); int id = dis.readInt(); // try to read an id if (LOG.isDebugEnabled()) LOG.debug(" got #" + id); Writable param = makeParam(); // read param param.readFields(dis); Call call = new Call(id, param, this); synchronized (callQueue) { if (callQueue.size() >= maxQueueSize) { Call oldCall = (Call) callQueue.removeFirst(); LOG.warn("Call queue overflow discarding oldest call " + oldCall); } callQueue.addLast(call); // queue the call callQueue.notify(); // wake up a waiting handler } } private void close() throws IOException { data = null; dataLengthBuffer = null; if (!channel.isOpen()) return; try {socket.shutdownOutput();} catch(Exception e) {} try {out.close();} catch(Exception e) {} try {channelOut.destroy();} catch(Exception e) {} if (channel.isOpen()) { try {channel.close();} catch(Exception e) {} } try {socket.close();} catch(Exception e) {} try {key.cancel();} catch(Exception e) {} key = null; } } /** Handles queued calls . */ private class Handler extends Thread { public Handler(int instanceNumber) { this.setDaemon(true); this.setName("Server handler "+ instanceNumber + " on " + port); } public void run() { LOG.info(getName() + ": starting"); SERVER.set(Server.this); while (running) { try { Call call; synchronized (callQueue) { while (running && callQueue.size()==0) { // wait for a call callQueue.wait(timeout); } if (!running) break; call = (Call)callQueue.removeFirst(); // pop the queue } synchronized (callDequeued) { // tell others we've dequeued callDequeued.notify(); } // throw the message away if it is too old if (System.currentTimeMillis() - call.receivedTime > maxCallStartAge) { LOG.warn("Call " + call.toString() + " discarded for being too old (" + (System.currentTimeMillis() - call.receivedTime) + ")"); continue; } if (LOG.isDebugEnabled()) LOG.debug(getName() + ": has #" + call.id + " from " + call.connection); String errorClass = null; String error = null; Writable value = null; try { value = call(call.param); // make the call } catch (Throwable e) { LOG.info(getName() + " call error: " + e, e); errorClass = e.getClass().getName(); error = getStackTrace(e); } DataOutputStream out = call.connection.out; synchronized (out) { try { out.writeInt(call.id); // write call id out.writeBoolean(error!=null); // write error flag if (error == null) { value.write(out); } else { WritableUtils.writeString(out, errorClass); WritableUtils.writeString(out, error); } out.flush(); } catch (Exception e) { LOG.warn("handler output error", e); synchronized (connectionList) { if (connectionList.remove(call.connection)) numConnections--; } call.connection.close(); } } } catch (Exception e) { LOG.info(getName() + " caught: " + e, e); } } LOG.info(getName() + ": exiting"); } private String getStackTrace(Throwable throwable) { StringWriter stringWriter = new StringWriter(); PrintWriter printWriter = new PrintWriter(stringWriter); throwable.printStackTrace(printWriter); printWriter.flush(); return stringWriter.toString(); } } /** Constructs a server listening on the named port and address. Parameters passed must * be of the named class. The <code>handlerCount</handlerCount> determines * the number of handler threads that will be used to process calls. * */ protected Server(String bindAddress, int port, Class paramClass, int handlerCount, Configuration conf) { this.bindAddress = bindAddress; this.conf = conf; this.port = port; this.paramClass = paramClass; this.handlerCount = handlerCount; this.timeout = conf.getInt("ipc.client.timeout",10000); maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME); maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER; this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000); this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10); this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000); } /** Constructs a server listening on the named port. Parameters passed must * be of the named class. The <code>handlerCount</handlerCount> determines * the number of handler threads that will be used to process calls. * * @deprecated the bind address should always be specified */ protected Server(int port, Class paramClass, int handlerCount, Configuration conf) { this("0.0.0.0",port,paramClass,handlerCount,conf); } /** Sets the timeout used for network i/o. */ public void setTimeout(int timeout) { this.timeout = timeout; } /** Starts the service. Must be called before any calls will be handled. */ public synchronized void start() throws IOException { listener = new Listener(); listener.start(); for (int i = 0; i < handlerCount; i++) { Handler handler = new Handler(i); handler.start(); } } /** Stops the service. No new calls will be handled after this is called. */ public synchronized void stop() { LOG.info("Stopping server on " + port); running = false; listener.doStop(); notifyAll(); } /** Wait for the server to be stopped. * Does not wait for all subthreads to finish. * See {@link #stop()}. */ public synchronized void join() throws InterruptedException { while (running) { wait(); } } /** Called for each call. */ public abstract Writable call(Writable param) throws IOException; private Writable makeParam() { Writable param; // construct param try { param = (Writable)paramClass.newInstance(); if (param instanceof Configurable) { ((Configurable)param).setConf(conf); } } catch (InstantiationException e) { throw new RuntimeException(e.toString()); } catch (IllegalAccessException e) { throw new RuntimeException(e.toString()); } return param; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -