⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nonblockingclienthandler.java

📁 一个用java编写的服务器,对于学习网络编程的人来说是个很好的例子
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*
 * This file is part of the QuickServer library 
 * Copyright (C) QuickServer.org
 *
 * Use, modification, copying and distribution of this software is subject to
 * the terms and conditions of the GNU Lesser General Public License. 
 * You should have received a copy of the GNU LGP License along with this 
 * library; if not, you can download a copy from <http://www.quickserver.org/>.
 *
 * For questions, suggestions, bug-reports, enhancement-requests etc.
 * visit http://www.quickserver.org
 *
 */

package org.quickserver.net.server.impl;

import org.quickserver.net.server.*;
import org.quickserver.net.*;
import org.quickserver.util.*;
import org.quickserver.util.io.*;

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.logging.*;

import java.nio.*;
import java.nio.channels.*;

public class NonBlockingClientHandler extends BasicClientHandler {
	private static final Logger logger = Logger.getLogger(NonBlockingClientHandler.class.getName());

	protected ClientWriteHandler clientWriteHandler; //v1.4.5	
	private SocketChannel socketChannel;

	protected ArrayList readByteBuffer = new ArrayList();
	protected ArrayList writeByteBuffer = new ArrayList();

	protected SelectionKey selectionKey;

	protected volatile int threadAccessCount = 0;
	protected volatile boolean willReturn;
	protected volatile boolean waitingForFinalWrite;

	private static int maxThreadAccessCount = 3; //one for each event ACCEPT, WRITE, READ
	private static boolean wakeupSelectorAfterRegisterWrite = true;
	private static boolean wakeupSelectorAfterRegisterRead = true;

	/**
	 * Sets the flag to wakeup Selector After RegisterForWrite is called.
	 * @since 1.4.7
	 */
	public static void setWakeupSelectorAfterRegisterWrite(boolean flag) {
		wakeupSelectorAfterRegisterWrite = flag;
	}
	/**
	 * Returns wakeupSelectorAfterRegisterWrite the flag that controls if wakeup is called on Selector
	 * after RegisterForWrite is called. 
	 * @since 1.4.7
	 */
	public static boolean getWakeupSelectorAfterRegisterWrite() {
		return wakeupSelectorAfterRegisterWrite;
	}
	
	/**
	 * Sets the flag to wakeup Selector After RegisterForRead is called.
	 * @since 1.4.7
	 */
	public static void setWakeupSelectorAfterRegisterRead(boolean flag) {
		wakeupSelectorAfterRegisterRead = flag;
	}
	/**
	 * Returns wakeupSelectorAfterRegisterRead the flag that controls if wakeup is called on Selector
	 * after RegisterForRead is called. 
	 * @since 1.4.7
	 */
	public static boolean getWakeupSelectorAfterRegisterRead() {
		return wakeupSelectorAfterRegisterRead;
	}

	/**
	 * Sets the maximum count of thread allowed to run objects of this class at a time.
	 * @since 1.4.7
	 */
	public static void setMaxThreadAccessCount(int count) {
		if(count<3 && count!=-1) throw new IllegalArgumentException("Value should be >=3 or -1");
		maxThreadAccessCount = count;
	}
	/**
	 * Returns the maximum count of thread allowed to run objects of this class at a time.
	 * @since 1.4.7
	 */
	public static int getMaxThreadAccessCount() {
		return maxThreadAccessCount;
	}

	//v1.4.7
	private ByteBufferOutputStream byteBufferOutputStream;

	public NonBlockingClientHandler(int instanceCount) {
		super(instanceCount);
	}

	public NonBlockingClientHandler() {
		super();
	}

	public void clean() {
		logger.finest("Starting clean - "+getName());
		if(threadAccessCount!=0) {
			logger.warning("Thread Access Count was not 0!: "+threadAccessCount);
			if(Assertion.isEnabled()) {
				assertionSystemExit();
			}
			threadAccessCount = 0;
		}
				
		while(readByteBuffer.isEmpty()==false) {
			try {
				getServer().getByteBufferPool().returnObject(
					readByteBuffer.remove(0));	
			} catch(Exception er) {
				appLogger.warning("Error in returning read ByteBuffer to pool: "+er);
				break;
			}
		}

		while(writeByteBuffer.isEmpty()==false) {
			try {
				getServer().getByteBufferPool().returnObject(
					writeByteBuffer.remove(0));	
			} catch(Exception er) {
				appLogger.warning("Error in returning write ByteBuffer to pool: "+er);
				break;
			}
		}

		if(selectionKey!=null) {
			selectionKey.cancel();
			selectionKey.selector().wakeup();
			selectionKey = null;
		}
		willReturn = false;	
		waitingForFinalWrite = false;
		socketChannel = null;
		if(byteBufferOutputStream!=null) {
			byteBufferOutputStream.close();
		}

		super.clean();

		clientWriteHandler = null;//1.4.5		
		byteBufferOutputStream = null;

		logger.finest("Finished clean - "+getName());
	}

	protected void finalize() throws Throwable {
		clean();
		super.finalize(); 
	}

	public void handleClient(TheClient theClient) {
		super.handleClient(theClient);
		setClientWriteHandler(theClient.getClientWriteHandler()); //v1.4.5
		setSocketChannel(theClient.getSocketChannel());//1.4.5
	}

	protected void setInputStream(InputStream in) throws IOException {
		this.in = in;
		if(getDataMode(DataType.IN) == DataMode.STRING) {
			b_in = null;
			o_in = null;
			bufferedReader = null;
		} else if(getDataMode(DataType.IN) == DataMode.OBJECT) {
			b_in = null;
			bufferedReader = null;
			o_in = new ObjectInputStream(in);
		} else if(getDataMode(DataType.IN) == DataMode.BYTE || 
				getDataMode(DataType.IN) == DataMode.BINARY) {
			o_in = null;
			bufferedReader = null;
			b_in = null;
		} 
	}

	public BufferedReader getBufferedReader() {
		throw new IllegalStateException("Access to BufferedReader in not allowed in Non-Blocking mode!");
	}

	public void closeConnection() {
		synchronized(this) {
			if(connection==false) return;
			if(waitingForFinalWrite) return;
			if(getSelectionKey()!=null && getSelectionKey().isValid() && lost == false) {
				waitingForFinalWrite = true;
			} else {
				connection = false;
			}
		}

		try	{			
			if(getSocketChannel()!=null && socket!=null) {
				if(waitingForFinalWrite) {
					try {					
						waitTillFullyWritten();	
					} catch(Exception error) {
						logger.warning("Error in waitingForFinalWrite : "+error);
						if(logger.isLoggable(Level.FINE)) {
							logger.fine("StackTrace:\n"+MyString.getStackTrace(error));
						}
					} finally {
						connection = false;
						byteBufferOutputStream.forceNotify();
						getSelectionKey().cancel();
					}
				}//end of waitingForFinalWrite

				
				synchronized(this) {
					if(hasEvent(ClientEvent.MAX_CON)==false) {
						notifyCloseOrLost();
					}
					if(getSocketChannel().isOpen()) {
						logger.finest("Closing SocketChannel");
						getSocketChannel().close();
					}
				}
			}				
			if(getServer()!=null) {
				getServer().getSelector().wakeup();
			}
		} catch(IOException e) {
			logger.warning("Error in closeConnection : "+e);
			if(logger.isLoggable(Level.FINE)) {
				logger.fine("StackTrace:\n"+MyString.getStackTrace(e));
			}
		} catch(NullPointerException npe) {
			logger.fine("NullPointerException: "+npe);
			if(logger.isLoggable(Level.FINE)) {
				logger.fine("StackTrace:\n"+MyString.getStackTrace(npe));
			}
		} 
	}
	
	/**
	 * waitTillFullyWritten
	 * @since 1.4.7
	 */
	public void waitTillFullyWritten() {
		Object waitLock = new Object();
		if(byteBufferOutputStream.isDataAvailableForWrite(waitLock)) {
			if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
				logger.finest("Waiting "+getName());
			}
			try {
				synchronized(waitLock) {
					waitLock.wait(1000*60*2);//2 min max
				}
			} catch(InterruptedException ie) {
				logger.warning("Error: "+ie);
			}
			if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
				logger.finest("Done. "+getName());
			}
		}
	}

	public void run() {
		if(unprocessedClientEvents.size()==0) {
			logger.finest("No unprocessed ClientEvents!");
			return;
		}

		synchronized(this) {
			if(willReturn) {
				return;
			} else {
				threadAccessCount++;
			}
		}

		ClientEvent currentEvent = (ClientEvent) unprocessedClientEvents.remove(0);

		if(logger.isLoggable(Level.FINEST)) {
			StringBuffer sb = new StringBuffer();
			sb.append("Running ").append(getName());
			sb.append(" using ");
			sb.append(Thread.currentThread().getName());
			sb.append(" for ");

			synchronized(clientEvents) {
				if(clientEvents.size()>1) {
					sb.append(currentEvent+", Current Events - "+clientEvents);
				} else {
					sb.append(currentEvent);
				}
			}
			logger.finest(sb.toString());
		}

		if(currentEvent==null) {
			threadEvent.set(null);
			return;
		} else {
			threadEvent.set(currentEvent);
		}		

		try {
			if(maxThreadAccessCount!=-1 && threadAccessCount>maxThreadAccessCount) {
				logger.warning("ThreadAccessCount can't go beyond "+maxThreadAccessCount+": "+threadAccessCount);
				if(Assertion.isEnabled()) {
					throw new AssertionError("ThreadAccessCount can't go beyond "+maxThreadAccessCount+": "+threadAccessCount);
				}
				return;
			}

			if(socket==null)
				throw new SocketException("Socket was null!");

			if(getThreadEvent()==ClientEvent.ACCEPT || 
					getThreadEvent()==ClientEvent.MAX_CON) {
				prepareForRun();
				Assertion.affirm(willReturn==false, "WillReturn has to be false!: "+willReturn);
			}

			if(getThreadEvent()==ClientEvent.MAX_CON) {
				processMaxConnection(currentEvent);
			}

			try {
				if(getThreadEvent()==ClientEvent.ACCEPT) {					
					registerForRead();
					clientEventHandler.gotConnected(this);

					if(authorised == false) {						
						if(clientAuthenticationHandler==null && authenticator == null) {
							authorised = true;
							logger.finest("No Authenticator "+getName()+" so return thread.");
						} else {
							if(clientAuthenticationHandler!=null) {
								AuthStatus authStatus = null;
								do {
									authStatus = processAuthorisation();
								} while(authStatus==AuthStatus.FAILURE);

								if(authStatus==AuthStatus.SUCCESS)
									authorised = true;
							} else {
								processAuthorisation();
							}
							if(authorised)
								logger.finest("Authentication done "+getName()+", so return thread.");
							else
								logger.finest("askAuthentication() done "+getName()+", so return thread.");
						}
					}//end authorised
					returnThread(); //return thread to pool
					return;
				}			
				
				if(connection && getThreadEvent()==ClientEvent.READ) {
					if(processRead()) return;
				}

				if(connection && getThreadEvent()==ClientEvent.WRITE) {
					if(processWrite()) return;
				}

			} catch(SocketException e) {
				appLogger.finest("SocketException - Client [" + 
					getHostAddress() +"]: "	+ e.getMessage());
				//e.printStackTrace();
				lost = true;
			} catch(AppException e) {
				//errors from Application
				appLogger.finest("AppException "+Thread.currentThread().getName()+": " 
					+ e.getMessage());		
			} catch(javax.net.ssl.SSLException e) {
				lost = true;
				if(Assertion.isEnabled()) {
					appLogger.info("SSLException - Client ["+getHostAddress()
						+"] "+Thread.currentThread().getName()+": " + e);
				} else {
					appLogger.warning("SSLException - Client ["+
						getHostAddress()+"]: "+e);
				}
			} catch(ConnectionLostException e) {
				lost = true;
				if(e.getMessage()!=null)
					appLogger.finest("Connection lost " +
						Thread.currentThread().getName()+": " + e.getMessage());
				else
					appLogger.finest("Connection lost "+Thread.currentThread().getName());
			} catch(ClosedChannelException e) {
				lost = true;
				appLogger.finest("Channel closed "+Thread.currentThread().getName()+": " + e);
			} catch(IOException e) {
				lost = true;
				appLogger.fine("IOError "+Thread.currentThread().getName()+": " + e);
			} catch(AssertionError er) {
				logger.warning("[AssertionError] "+getName()+" "+er);
				if(logger.isLoggable(Level.FINEST)) {
					logger.finest("StackTrace "+Thread.currentThread().getName()+": "+MyString.getStackTrace(er));
				}
				assertionSystemExit();
			} catch(Error er) {
				logger.warning("[Error] "+er);
				if(logger.isLoggable(Level.FINEST)) {
					logger.finest("StackTrace "+Thread.currentThread().getName()+": "+MyString.getStackTrace(er));
				}
				if(Assertion.isEnabled()) {
					assertionSystemExit();
				}
				lost = true;
			} catch(RuntimeException re) {
				logger.warning("[RuntimeException] "+MyString.getStackTrace(re));
				if(Assertion.isEnabled()) {
					assertionSystemExit();
				}
				lost = true;
			}
			
			if(getThreadEvent()!=ClientEvent.MAX_CON) {
				notifyCloseOrLost();
			}
			
			if(connection) {
				logger.finest(Thread.currentThread().getName()+" calling closeConnection()");
				closeConnection();
			} 
			
			if(connection==true && lost==true && waitingForFinalWrite) {
				byteBufferOutputStream.forceNotify();
			}
		} catch(javax.net.ssl.SSLException se) {
			logger.warning("SSLException "+Thread.currentThread().getName()+" - " + se);
		} catch(IOException ie) {
			logger.warning("IOError "+Thread.currentThread().getName()+" - Closing Client : " + ie);
		} catch(RuntimeException re) {
			logger.warning("[RuntimeException] "+getName()+" "+Thread.currentThread().getName()+" - "+MyString.getStackTrace(re));
			if(Assertion.isEnabled()) {
				assertionSystemExit();
			}
		} catch(Exception e) {
			logger.warning("Error "+Thread.currentThread().getName()+" - Event:"+getThreadEvent()+" - Socket:"+socket+" : "+e);
			logger.fine("StackTrace: "+getName()+"\n"+MyString.getStackTrace(e));
			if(Assertion.isEnabled()) {
				assertionSystemExit();
			}
		} catch(Error e) {
			logger.warning("Error "+Thread.currentThread().getName()+" - Event:"+getThreadEvent()+" - Socket:"+socket+" : "+e);
			logger.fine("StackTrace: "+getName()+"\n"+MyString.getStackTrace(e));
			if(Assertion.isEnabled()) {
				assertionSystemExit();
			}
		}

		synchronized(this) {
			try	{
				if(getSelectionKey()!=null && getSelectionKey().isValid()) {
					logger.finest("Canceling SelectionKey");
					getSelectionKey().cancel();
				}

				if(socket!=null && socket.isClosed()==false) {
					logger.finest("Closing Socket");
					socket.close();
				}

				if(getSocketChannel()!=null && getSocketChannel().isOpen()) {
					logger.finest("Closing SocketChannel");
					socketChannel.close();
				}				
			} catch(Exception re) {
				logger.warning("Error closing Socket/Channel: " +re);
			}
		}//end synchronized

		willClean = true;
		returnClientData();

		boolean returnClientHandler = false;
		synchronized(lockObj) {
			returnThread();
			returnClientHandler = checkReturnClientHandler();
		}

		if(returnClientHandler) {
			returnClientHandler(); //return to pool
		}
	}

	protected boolean checkReturnClientHandler() {
		if(willReturn==false) {
			willReturn = true;
			return true;
		}
		return false;
	}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -