📄 abstractlistener.java
字号:
if ( secure ) {

    complain(LOG_NOTICE, getLogChannel(), "Secure connection requested");

    // VT: FIXME: It may make sense to be more flexible in
    // creating the SSL context, to prevent
    // NullPointerExceptions from being thrown up

    // NOTE(review): a failure to create the SSL server socket is only
    // logged; the code below then silently falls back to a plain
    // (insecure) server socket. Confirm this downgrade is intended.

    try {

        ss = SSLContextFactory.createContext(password).getServerSocketFactory().createServerSocket(port, 256, addr);

    } catch ( SSLException sslex ) {

        complain(LOG_WARNING, getLogChannel(), "Can't establish a secure listener on " + addr + ":" + port, sslex );
        complain(LOG_WARNING, getLogChannel(), "Reverting to insecure connection");
    }
}

if ( ss == null ) {

    ss = new ServerSocket(port, 256, addr);
}

complain(LOG_NOTICE, getLogChannel(), "Listening on " + addr + ":" + port);

} catch ( Throwable t ) {

    throw new ServiceUnavailableException("Can't listen on " + addr + ":" + port, t);
}
} // closes the enclosing startup method

        /**
         * Keep accepting the TCP clients.
         *
         * For every accepted socket a {@link ConnectionHandler} is created
         * and started. A client that fails the handshake is dropped, and
         * the listener keeps accepting other clients. If
         * {@code isUnique()} is true, all previously registered handlers
         * are notified, stopped and removed before the new one is
         * registered in {@code clientSet}.
         *
         * @throws Throwable if accepting or setting up a connection fails.
         */
        protected void execute() throws Throwable {

            while ( isEnabled() ) {

                Socket s = ss.accept();

                complain(LOG_INFO, getLogChannel(), "Client arrived from " + s.getInetAddress() + ":" + s.getPort());

                // Redundant, but still: need to check if we're enabled -
                // it's been quite a while since we've checked for that, and
                // we may be shutting down right now.

                if ( !isEnabled() ) {

                    // They'll see this as a dropped connection, but it
                    // doesn't really make sense to be nice and verbose
                    // about it right now...

                    complain(LOG_WARNING, getLogChannel(), "Shutting down - dropped " + s.getInetAddress() + ":" + s.getPort());
                    s.close();
                    return;
                }

                BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));
                PrintWriter pw = new PrintWriter(s.getOutputStream());
                ConnectionHandler ch = createHandler(s, br, pw);

                ch.setLogger(getLogger());

                // VT: FIXME: It may make perfect sense to split this off
                // into a separate thread to prevent a denial of service
                // caused by slowly starting clients

                // Failure to start the client handler usually means that
                // the client haven't complied to some of our protocol
                // handshake requirements. In this case, we won't accept the
                // client.

                if ( !ch.start().waitFor() ) {

                    complain(LOG_NOTICE, getLogChannel(), "Client coming from " + s.getInetAddress() + ":" + s.getPort() + " failed to complete handshake - dropped");

                    // Make sure the rejected client's socket is released.
                    // Closing an already closed socket is harmless.

                    try {

                        s.close();

                    } catch ( Throwable t ) {

                        complain(LOG_WARNING, getLogChannel(), "Can't close the dropped client socket", t);
                    }

                    // One misbehaving client must not terminate the
                    // listener: keep accepting the others.

                    continue;

                } else {

                    complain(LOG_INFO, getLogChannel(), "" + s.getInetAddress() + ":" + s.getPort() + ": started");
                }

                if ( isUnique() ) {

                    // Have to shut down all others (actually, there's just
                    // zero or one)

                    synchronized ( clientSet ) {

                        for ( Iterator i = clientSet.iterator(); i.hasNext(); ) {

                            ConnectionHandler oldHandler = (ConnectionHandler)i.next();

                            try {

                                oldHandler.send("E Disconnected: another client came from " + s.getInetAddress() + ":" + s.getPort());

                            } catch ( IllegalStateException isex ) {

                                // send() refuses to work on a handler that
                                // is not enabled anymore; there's nobody
                                // to notify, but it still has to be
                                // stopped and removed.

                                complain(LOG_NOTICE, getLogChannel(), "Old client is already down, not notified");
                            }

                            oldHandler.stop();
                            i.remove();
                        }
                    }
                }

                synchronized ( clientSet ) {

                    clientSet.add(ch);
                }
            }
        }

        /**
         * Release the server socket, if there is one.
         *
         * NOTE(review): presumably invoked by the service framework when
         * the listener stops - confirm against the base class.
         *
         * @param cause Reason for the shutdown; not used here.
         */
        protected void shutdown(Throwable cause) throws Throwable {

            if ( ss != null ) {

                try {

                    ss.close();

                } catch ( Throwable t ) {

                    complain(LOG_WARNING, getLogChannel(), "Can't close the server socket", t);
                }

                // Startup only creates a plain server socket when ss is
                // null, so don't leave a closed one behind.

                ss = null;
            }
        }

        /**
         * Compare this listener to another one by the string
         * {@code addr + ":" + port}.
         *
         * The comparison is lexicographic on that string, so ports are
         * not ordered numerically.
         *
         * @param other Listener to compare to; must be of exactly the same
         * class as this object.
         *
         * @return Result of the string comparison.
         *
         * @throws ClassCastException if the other object's class differs
         * from this object's class.
         */
        public int compareTo(Object other) {

            if ( !other.getClass().equals(getClass()) ) {

                throw new ClassCastException("Expecting " + getClass().getName() + ", got " + other.getClass().getName());
            }

            Listener l = (Listener)other;

            return (addr + ":" + port).compareTo(l.addr + ":" + l.port);
        }
    }

    /**
     * Serves one connected client: owns the client socket and its
     * reader/writer, and runs a {@link CommandParser} in a separate thread
     * to process the incoming lines.
     */
    abstract protected class ConnectionHandler extends PassiveService {

        /** Client socket. */
        protected Socket socket;

        /** Reader the commands are read from. */
        protected BufferedReader br;

        /** Writer the responses are sent to. */
        protected PrintWriter pw;

        /** Thread running the command parser; null until startup. */
        protected Thread parser;

        /**
         * Create an instance.
         *
         * @param socket Client socket.
         * @param br Reader to read the client input from.
         * @param pw Writer to send the output to the client.
         */
        public ConnectionHandler(Socket socket, BufferedReader br, PrintWriter pw) {

            this.socket = socket;
            this.br = br;
            this.pw = pw;
        }

        /**
         * Tell the listener what devices we have.
         */
        abstract public void iHave();

        /**
         * Send a message to the client.
         *
         * The message is written as one line and flushed right away.
         *
         * @param message Message to send.
         *
         * @throws IllegalStateException if this handler is not enabled.
         */
        public synchronized void send(String message) {

            if ( !isEnabled() ) {

                throw new IllegalStateException("Not enabled now, stopped?");
            }

            pw.println(message);
            pw.flush();
        }

        /**
         * Count and log the new connection, greet the client, then start
         * the command parser thread.
         */
        protected void startup() throws Throwable {

            // NOTE(review): connectionCount is updated without any
            // visible synchronization - confirm it is safe where declared.

            complain(LOG_INFO, getLogChannel(), "Active connections: " + (++connectionCount));

            // Dump all the known data on the client

            sayHello();

            // Start the command parser thread

            parser = new Thread(createParser());
            parser.start();
        }

        /**
         * Hook called from {@link #startup()} before the parser thread is
         * started. Does nothing by default.
         */
        protected void sayHello() {
        }

        /**
         * Create the parser that will process the client commands.
         *
         * @return Parser to run in the parser thread.
         */
        abstract protected CommandParser createParser();

        /**
         * Shut down the connection: interrupt the parser thread, remove
         * this handler from the client set, close the socket and the
         * streams. When the last connection is gone, {@code cleanup()}
         * is called.
         *
         * Every cleanup step is attempted even if a previous one failed,
         * so that a single failure doesn't leak the rest.
         *
         * @param cause Reason for the shutdown; not used here.
         */
        protected void shutdown(Throwable cause) throws Throwable {

            complain(LOG_INFO, getLogChannel(), "Active connections: " + (--connectionCount));
            complain(LOG_NOTICE, getLogChannel(), "Shutting down...");

            // Shut down the command parser thread, unless it is down
            // already. It may have never been created if the startup
            // failed early.

            try {

                if ( parser != null ) {

                    parser.interrupt();
                }

            } catch ( Throwable t ) {

                complain(LOG_WARNING, getLogChannel(), "Unexpected exception:", t);
            }

            // Remove ourselves from the client list

            // VT: NOTE: Have to be careful, because this may happen at the
            // wrong time - when the clientSet is busy being enumerated.
            // Since this process is asynchronous, it's OK to get a lock on
            // it.

            try {

                synchronized ( clientSet ) {

                    clientSet.remove(this);
                }

            } catch ( Throwable t ) {

                complain(LOG_WARNING, getLogChannel(), "Unexpected exception:", t);
            }

            // The socket has to be closed before the reader, as in the
            // original order of operations.

            try {

                socket.close();

            } catch ( Throwable t ) {

                complain(LOG_WARNING, getLogChannel(), "Unexpected exception:", t);
            }

            // VT: FIXME: Do I need this (below)?

            try {

                br.close();

            } catch ( Throwable t ) {

                complain(LOG_WARNING, getLogChannel(), "Unexpected exception:", t);
            }

            try {

                pw.close();

            } catch ( Throwable t ) {

                complain(LOG_WARNING, getLogChannel(), "Unexpected exception:", t);
            }

            complain(LOG_NOTICE, getLogChannel(), "Shut down");

            if ( connectionCount == 0 ) {

                complain(LOG_NOTICE, getLogChannel(), "Last active connection is gone, cleaning up");
                cleanup();
            }
        }

        /**
         * Reads the client input line by line and dispatches each line to
         * {@link #parse(String)}.
         */
        abstract protected class CommandParser implements Runnable {

            /**
             * Keep reading and parsing commands until the client is gone,
             * the handler is stopped, or the input can't be read anymore.
             *
             * A command that fails to parse is reported to the client
             * with an "E ..." line, and the loop goes on.
             */
            public void run() {

                while ( true ) {

                    String line = null;

                    // Tells a failure to read the input from a failure to
                    // process a command in the catch-all clause below.

                    boolean lineRead = false;

                    try {

                        line = br.readLine();
                        lineRead = true;

                        if ( !isEnabled() ) {

                            complain(LOG_NOTICE, getLogChannel(), "Interrupted, input ignored: '" + line + "'");
                            return;
                        }

                        if ( line == null ) {

                            complain(LOG_NOTICE, getLogChannel(), "Lost the client");
                            stop();
                            return;
                        }

                        // Let's try to make sure that nobody tries to
                        // interfere with us.

                        // VT: FIXME: Just make sure we don't create a deadlock here

                        synchronized ( this ) {

                            parse(line);
                        }

                    } catch ( InterruptedException iex ) {

                        // We've probably been stopped

                        complain(LOG_NOTICE, getLogChannel(), "Interrupted");

                        if ( isEnabled() ) {

                            stop();
                        }

                        return;

                    } catch ( SocketException sex ) {

                        // Either a network error occured, or we've been stopped

                        complain(LOG_NOTICE, getLogChannel(), "Socket error: " + sex.getMessage());

                        if ( isEnabled() ) {

                            stop();
                        }

                        return;

                    } catch ( SSLException sslex ) {

                        // I don't want to deal with SSL errors...

                        complain(LOG_NOTICE, getLogChannel(), "SSL problem: " + sslex.getMessage());

                        if ( isEnabled() ) {

                            stop();
                        }

                        return;

                    } catch ( Throwable t ) {

                        if ( !lineRead ) {

                            // It was the read itself that failed, not a
                            // command. Trying again would most probably
                            // fail the same way and spin forever, so give
                            // up on this client.

                            complain(LOG_WARNING, getLogChannel(), "Can't read from the client", t);

                            if ( isEnabled() ) {

                                stop();
                            }

                            return;
                        }

                        complain(LOG_WARNING, getLogChannel(), "Huh? Command received: '" + line + "'", t);

                        // The exception message is usually empty

                        try {

                            send("E " + ((t.getMessage() == null) ? "Bad command" : t.getMessage()) + ": " + line);

                        } catch ( IllegalStateException isex ) {

                            // The handler has been stopped in the
                            // meantime, there's nobody to report to.

                            complain(LOG_NOTICE, getLogChannel(), "Stopped, can't report the error to the client");
                            return;
                        }
                    }
                }
            }

            /**
             * Process the commands common to all parsers, and pass the
             * rest to {@link #parse2(String)}.
             *
             * "q" (case insensitive) stops the handler, an empty line is
             * ignored, "heartbeat" is answered with "OK".
             *
             * @param command Command line received from the client.
             *
             * @throws Throwable if the command can't be processed.
             */
            public final void parse(String command) throws Throwable {

                // VT: NOTE: Make sure the control flow is right. It is
                // if-elseif-...else, in other words, if there's a match, we
                // process the command and return

                if ( "q".equalsIgnoreCase(command) ) {

                    complain(LOG_NOTICE, getLogChannel(), "Client disconnected");
                    stop();

                } else if ( "".equalsIgnoreCase(command) ) {

                    // Nothing, really, they're just checking if we're alive

                    // VT: FIXME: Make sure the heartbeat bug is fixed

                } else if ( "heartbeat".equals(command) ) {

                    send("OK");
                    return;

                } else {

                    parse2(command);
                }
            }

            /**
             * Process a command that {@link #parse(String)} didn't
             * recognize.
             *
             * @param command Command line received from the client.
             *
             * @throws Throwable if the command can't be processed.
             */
            abstract protected void parse2(String command) throws Throwable;
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -