📄 pooltcpendpoint.java
字号:
public void startEndpoint() throws IOException, InstantiationException { if (!initialized) { initEndpoint(); } if(isPool) { tp.start(); } running = true; if(isPool) { listener = new TcpWorkerThread(this); tp.runIt(listener); } else { log.error("XXX Error - need pool !"); } } public void stopEndpoint() { if (running) { tp.shutdown(); running = false; if (serverSocket != null) { closeServerSocket(); } } } protected void closeServerSocket() { try { // Need to create a connection to unlock the accept(); Socket s; if (inet == null) { s=new Socket("127.0.0.1", port ); }else{ s=new Socket(inet, port ); // setting soLinger to a small value will help shutdown the // connection quicker s.setSoLinger(true, 0); } s.close(); } catch(Exception e) { log.error("Caught exception trying to unlock accept.", e); } try { serverSocket.close(); } catch(Exception e) { log.error("Caught exception trying to close socket.", e); } serverSocket = null; } // -------------------- Private methods Socket acceptSocket() { if( !running || serverSocket==null ) return null; Socket accepted = null; try { if(factory==null) { accepted = serverSocket.accept(); } else { accepted = factory.acceptSocket(serverSocket); } if(!running && (null != accepted)) { accepted.close(); // rude, but unlikely! accepted = null; } if( factory != null && accepted != null) factory.initSocket( accepted ); } catch(InterruptedIOException iioe) { // normal part -- should happen regularly so // that the endpoint can release if the server // is shutdown. } catch (IOException e) { String msg = null; if (running) { msg = sm.getString("endpoint.err.nonfatal", serverSocket, e); log.error(msg, e); } if (accepted != null) { try { accepted.close(); accepted = null; } catch(Exception ex) { msg = sm.getString("endpoint.err.nonfatal", accepted, ex); log.warn(msg, ex); } } reinitializing = true; // Restart endpoint when getting an IOException during accept synchronized (threadSync) { if (reinitializing) { reinitializing = false; // 1) Attempt to close server socket closeServerSocket(); initialized = false; // 2) Reinit endpoint (recreate server socket) try { msg = sm.getString("endpoint.warn.reinit"); log.warn(msg); initEndpoint(); } catch (Throwable t) { msg = sm.getString("endpoint.err.nonfatal", serverSocket, t); log.error(msg, t); } // 3) If failed, attempt to restart endpoint if (!initialized) { msg = sm.getString("endpoint.warn.restart"); log.warn(msg); try { stopEndpoint(); initEndpoint(); startEndpoint(); } catch (Throwable t) { msg = sm.getString("endpoint.err.fatal", serverSocket, t); log.error(msg, t); } finally { // Current thread is now invalid: kill it throw new IllegalStateException ("Terminating thread"); } } } } } return accepted; } /** @deprecated */ public void log(String msg) { log.info(msg); } /** @deprecated */ public void log(String msg, Throwable t) { log.error( msg, t ); } /** @deprecated */ public void log(String msg, int level) { log.info( msg ); } /** @deprecated */ public void log(String msg, Throwable t, int level) { log.error( msg, t ); } void setSocketOptions(Socket socket) { try { if(linger >= 0 ) socket.setSoLinger( true, linger); if( tcpNoDelay ) socket.setTcpNoDelay(tcpNoDelay); if( socketTimeout > 0 ) socket.setSoTimeout( socketTimeout ); } catch( SocketException se ) { se.printStackTrace(); } }}// -------------------- Threads --------------------/* * I switched the threading model here. * * We used to have a "listener" thread and a "connection" * thread, this results in code simplicity but also a needless * thread switch. * * Instead I am now using a pool of threads, all the threads are * simmetric in their execution and no thread switch is needed. */class TcpWorkerThread implements ThreadPoolRunnable { /* This is not a normal Runnable - it gets attached to an existing thread, runs and when run() ends - the thread keeps running. It's better to keep the name ThreadPoolRunnable - avoid confusion. We also want to use per/thread data and avoid sync wherever possible. */ PoolTcpEndpoint endpoint; SimplePool connectionCache; static final boolean usePool=false; public TcpWorkerThread(PoolTcpEndpoint endpoint) { this.endpoint = endpoint; if( usePool ) { connectionCache = new SimplePool(endpoint.getMaxThreads()); for(int i = 0 ; i < endpoint.getMaxThreads()/2 ; i++) { connectionCache.put(new TcpConnection()); } } } public Object[] getInitData() { if( usePool ) { return endpoint.getConnectionHandler().init(); } else { // no synchronization overhead, but 2 array access Object obj[]=new Object[2]; obj[1]= endpoint.getConnectionHandler().init(); obj[0]=new TcpConnection(); return obj; } } public void runIt(Object perThrData[]) { // Create per-thread cache while(endpoint.isRunning()) { Socket s = null; try { s = endpoint.acceptSocket(); } catch (Throwable t) { endpoint.log.error("Exception in acceptSocket", t); throw new IllegalStateException("Terminating thread"); } if(null != s) { // Continue accepting on another thread... endpoint.tp.runIt(this); try { if(endpoint.getServerSocketFactory()!=null) { endpoint.getServerSocketFactory().handshake(s); } } catch (Throwable t) { endpoint.log.debug("Handshake failed", t); // Try to close the socket try { s.close(); } catch (IOException e) {} break; } TcpConnection con = null; try { if( usePool ) { con=(TcpConnection)connectionCache.get(); if( con == null ) con = new TcpConnection(); } else { con = (TcpConnection) perThrData[0]; perThrData = (Object []) perThrData[1]; } con.setEndpoint(endpoint); con.setSocket(s); endpoint.setSocketOptions( s ); endpoint.getConnectionHandler().processConnection(con, perThrData); } catch (Throwable t) { endpoint.log.error("Unexpected error", t); // Try to close the socket try { s.close(); } catch (IOException e) {} } finally { if (con != null) { con.recycle(); if (usePool) { connectionCache.put(con); } } } break; } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -