⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nonblockingclienthandler.java

📁 一个用java编写的服务器,对于学习网络编程的人来说是个很好的例子
💻 JAVA
📖 第 1 页 / 共 2 页
字号:

	/**
	 * Process read
	 * @return value indicates if the thread should return form run()
	 */
	private boolean processRead() throws Exception, AppException {
		int count = 0;
		int fullCount = 0;
		ByteBuffer buffer = (ByteBuffer) 
			getServer().getByteBufferPool().borrowObject();

		while(true) {
			try {
				count = getSocketChannel().read(buffer);
				if(count<=0) {
					//logger.finest("SocketChannel read was "+count+"!");
					getServer().getByteBufferPool().returnObject(buffer);
					buffer = null;
					break;
				} else {
					fullCount += count;
				}

				buffer.flip(); // Make readable
				readByteBuffer.add(buffer);

				buffer = (ByteBuffer) 
					getServer().getByteBufferPool().borrowObject();
			} catch(Exception error) {
				logger.finest("Error in data read: "+error);
				lost = true;
				synchronized(getInputStream()) {
					getInputStream().notifyAll();
				}
				throw error;
			} finally {
				if(buffer!=null && count<=0) {
					getServer().getByteBufferPool().returnObject(buffer);
					buffer = null;
				}
			}
		}//end while

		if(count<0) {
			logger.finest("SocketChannel read was "+count+"!");
			lost = true;
			synchronized(getInputStream()) {
				getInputStream().notifyAll();
			}
		} else {
			logger.finest(fullCount+" bytes read");
			if(fullCount!=0) {
				updateLastCommunicationTime();
				synchronized(getInputStream()) {
					getInputStream().notify(); //if any are waiting
				}
				if(hasEvent(ClientEvent.ACCEPT) == false) {
					processGotDataInBuffers();
				}
			}

			//check if any data was read but not yet processed
			while(getInputStream().available()>0) {
				logger.finest("Sending again for processing...");
				if(hasEvent(ClientEvent.ACCEPT) == false) {
					processGotDataInBuffers();
					break;
				} else {
					synchronized(getInputStream()) {
						getInputStream().notifyAll();									
					}
					Thread.sleep(100);								
				}							
			}

			if(connection) {
				registerForRead();
				//getSelectionKey().selector().wakeup();
				returnThread(); //return to pool
				return true;
			}
		}//end of else
		logger.finest("We don't have connection, lets return all resources.");
		return false;
	}

	/**
	 * Process write
	 * @return value indicates if the thread should return form run()
	 */
	private boolean processWrite() throws IOException {
		updateLastCommunicationTime();
		
		boolean flag = byteBufferOutputStream.writeAllByteBuffer();
		
		if(flag==false) {
			registerWrite();
		} else if(/*flag==true && */clientWriteHandler!=null) {
			clientWriteHandler.handleWrite(this);
		}	
		
		if(connection) {
			returnThread(); //return to pool
			return true;
		} else {
			logger.finest("We don't have connection, lets return all resources.");
		}
		return false;
	}

	protected void returnThread() {
		threadAccessCount--;
		Assertion.affirm(threadAccessCount>=0, "ThreadAccessCount went less the 0! Value: "+threadAccessCount);
		//return is done at ClientThread end
		removeEvent((ClientEvent)threadEvent.get());
	}

	protected void returnClientHandler() {
		logger.finest(getName());
		try {
			for(int i=0;threadAccessCount!=0;i++) {
				if(i==100) { 
					logger.warning("ClientHandler must have got into a loop waiting for thread to free up! ThreadAccessCount="+threadAccessCount);
					threadAccessCount = 0;
					if(Assertion.isEnabled()) {
						assertionSystemExit();
					} else {
						break;
					}
				}
				if(threadAccessCount<=0) break;

				logger.finest("Waiting for other thread of "+getName()+" to finish");
				Thread.sleep(60);
			}
		} catch(InterruptedException ie) {
			appLogger.warning("InterruptedException: "+ie);
		}
		super.returnClientHandler();
	}

	public void setDataMode(DataMode dataMode, DataType dataType) 
			throws IOException {
		if(getDataMode(dataType)==dataMode) return;

		appLogger.fine("Setting Type:"+dataType+", Mode:"+dataMode);
		super.checkDataModeSet(dataMode, dataType);

		setDataModeNonBlocking(dataMode, dataType);
	}

	private void setDataModeNonBlocking(DataMode dataMode, DataType dataType) 
			throws IOException {
		logger.finest("ENTER");
		if(dataMode == DataMode.STRING) {
			if(dataType == DataType.OUT) {
				if(dataModeOUT == DataMode.BYTE || dataModeOUT == DataMode.BINARY) {
					dataModeOUT = dataMode;
				} else if(dataModeOUT == DataMode.OBJECT) {
					dataModeOUT = dataMode;
					o_out.flush(); o_out = null;
					b_out = new BufferedOutputStream(out);
				} else {
					Assertion.affirm(false, "Unknown DataType.OUT DataMode - "+dataModeOUT);
				}
				Assertion.affirm(b_out!=null, "BufferedOutputStream is still null!");
				Assertion.affirm(o_out==null, "ObjectOutputStream is still not null!");
			} else if(dataType == DataType.IN) {
				dataModeIN = dataMode;

				if(o_in!=null) {
					if(o_in.available()!=0)
						logger.warning("Data looks to be present in ObjectInputStream");
					o_in = null;
				}
				b_in = null;
				bufferedReader = null;
				//input stream will work
				Assertion.affirm(in!=null, "InputStream is still null!");
				Assertion.affirm(b_in==null, "BufferedInputStream is still not null!");
				Assertion.affirm(bufferedReader==null, "BufferedReader is still not null!");
			}
		} else if(dataMode == DataMode.OBJECT) {
			if(dataType == DataType.IN) {
				//we will disable this for now
				throw new IllegalArgumentException("Can't set DataType.IN mode to OBJECT when blocking mode is set as false!");
			}

			if(dataType == DataType.OUT) {
				dataModeOUT = dataMode;
				b_out = null;
				o_out = new ObjectOutputStream(out);
				Assertion.affirm(o_out!=null, "ObjectOutputStream is still null!");
			} /*else if(dataType == DataType.IN) {
				dataModeIN = dataMode;
				b_in = null;
				bufferedReader = null;
				//registerForRead();
				o_in = new ObjectInputStream(in); //will block	
				Assertion.affirm(o_in!=null, "ObjectInputStream is still null!");
			}*/
		} else if(dataMode == DataMode.BYTE || dataMode == DataMode.BINARY) {
			if(dataType == DataType.OUT) {
				if(dataModeOUT == DataMode.STRING || 
						dataModeOUT == DataMode.BYTE || 
						dataModeOUT == DataMode.BINARY) {
					dataModeOUT = dataMode;
				} else if(dataModeOUT == DataMode.OBJECT) {
					dataModeOUT = dataMode;
					
					o_out = null;
					b_out = new BufferedOutputStream(out);
				} else {
					Assertion.affirm(false, "Unknown DataType.OUT - DataMode: "+dataModeOUT);
				}
				Assertion.affirm(b_out!=null, "BufferedOutputStream is still null!");
			} else if(dataType == DataType.IN) {
				dataModeIN = dataMode;
				o_in = null;
				bufferedReader = null;
				b_in = null;
				//input stream will work
				Assertion.affirm(in!=null, "InputStream is still null!");
			} else {
				throw new IllegalArgumentException("Unknown DataType : "+dataType);
			}
		} else {
			throw new IllegalArgumentException("Unknown DataMode : "+dataMode);
		}
	}

	protected byte[] readInputStream() throws IOException {
		return readInputStream(getInputStream());
	}

	public void updateInputOutputStreams() throws IOException {
		byteBufferOutputStream = new ByteBufferOutputStream(writeByteBuffer, this);
		setInputStream( new ByteBufferInputStream(readByteBuffer, this, getCharset()) );
		setOutputStream(byteBufferOutputStream);
	}

	public void setSocketChannel(SocketChannel socketChannel) {
		this.socketChannel = socketChannel;
	}
	public SocketChannel getSocketChannel() {
		return socketChannel;
	}

	public void setSelectionKey(SelectionKey selectionKey) {
		this.selectionKey = selectionKey;
	}
	public SelectionKey getSelectionKey() {
		if(selectionKey==null)
			selectionKey = getSocketChannel().keyFor(getServer().getSelector());
		return selectionKey;
	}

	private void processGotDataInBuffers() throws AppException, 
			ConnectionLostException, ClassNotFoundException, IOException {
		if(getInputStream().available()==0) return;
		
		logger.finest("Trying to process got data.. DataMode.IN="+dataModeIN);
		AuthStatus authStatus = null;
		
		//--For debug
		//((ByteBufferInputStream) getInputStream()).dumpContent();

		String temp = null;
		String rec = null;
		Object recObject = null;
		byte[] recByte = null;

		boolean timeToCheckForNewLineMiss = false;
		
		do {
			//updateLastCommunicationTime();

			if(dataModeIN == DataMode.STRING) {
				ByteBufferInputStream bbin = (ByteBufferInputStream) 
					getInputStream();
				timeToCheckForNewLineMiss = true;

				while(bbin.isLineReady()) {

					rec = bbin.readLine();
					if(rec==null) {
						lost = true;
						return;
					}
					if(getCommunicationLogging() && authorised == true) {
						appLogger.fine("Got STRING ["+getHostAddress()+"] : "+
							rec);
					}
					
					if(authorised == false)
						authStatus = clientAuthenticationHandler.handleAuthentication(this, rec);
					else
						clientCommandHandler.handleCommand(this, rec);

					if(isClosed()==true) return;

					while(authStatus==AuthStatus.FAILURE)
						authStatus = processAuthorisation();

					if(authStatus==AuthStatus.SUCCESS)
						authorised = true;

					if(dataModeIN != DataMode.STRING) {
						break;
					}

					timeToCheckForNewLineMiss = false;
				}//end of while

				if(timeToCheckForNewLineMiss && bbin.availableOnlyInByteBuffer()==0) {
					return;
				} else {
					timeToCheckForNewLineMiss = false;
				}
			}

			//} else if(dataModeIN == DataMode.OBJECT) {
			/*
			while(dataModeIN == DataMode.OBJECT && o_in!=null) {
				recObject = o_in.readObject();
				if(recObject==null) {
					lost = true;
					return;
				}
				if(getCommunicationLogging() && authorised == true) {
					appLogger.fine("Got OBJECT ["+getHostAddress()+"] : "+
						recObject.toString());
				}


				if(authorised == false)
					authStatus = clientAuthenticationHandler.handleAuthentication(this, recObject);
				else
					clientObjectHandler.handleObject(this, recObject);
				
				if(isClosed()==true) return;

				while(authStatus==AuthStatus.FAILURE)
					authStatus = processAuthorisation();
				
				if(authStatus==AuthStatus.SUCCESS)
					authorised = true;
			}
			*/
			
			//} else if(dataModeIN == DataMode.BYTE) {
			while(dataModeIN == DataMode.BYTE && getInputStream().available()!=0) {
				rec = readBytes();
				if(rec==null) {
					lost = true;
					return;
				}
				if(getCommunicationLogging() && authorised == true) {
					appLogger.fine("Got BYTE ["+getHostAddress()+"] : "+rec);
				}

				if(authorised == false)
					authStatus = clientAuthenticationHandler.handleAuthentication(this, rec);
				else
					clientCommandHandler.handleCommand(this, rec);

				if(isClosed()==true) return;

				while(authStatus==AuthStatus.FAILURE)
					authStatus = processAuthorisation();

				if(authStatus==AuthStatus.SUCCESS)
					authorised = true;
			}

			//} else if(dataModeIN == DataMode.BINARY) {
			while(dataModeIN == DataMode.BINARY && getInputStream().available()!=0) {
				recByte = readBinary();
				if(recByte==null) {
					lost = true;
					return;
				}
				if(getCommunicationLogging() && authorised == true) {
					appLogger.fine("Got BINARY ["+getHostAddress()+"] : "+
						MyString.getMemInfo(recByte.length));
				}

				if(authorised == false)
					authStatus = clientAuthenticationHandler.handleAuthentication(this, recByte);
				else
					clientBinaryHandler.handleBinary(this, recByte);

				if(isClosed()==true) return;

				while(authStatus==AuthStatus.FAILURE)
					authStatus = processAuthorisation();

				if(authStatus==AuthStatus.SUCCESS)
					authorised = true;
			}

			//} else {
			if(dataModeIN != DataMode.STRING && dataModeIN != DataMode.OBJECT 
				&& dataModeIN != DataMode.BYTE && dataModeIN != DataMode.BINARY) {
				throw new IllegalStateException("Incoming DataMode is not supported : "+dataModeIN);
			}
		} while(getInputStream().available()!=0);
	}

	public void registerForRead() 
			throws IOException, ClosedChannelException {
		try {		
			if(getSelectionKey()==null) {
				boolean flag = getServer().registerChannel(getSocketChannel(), 
					SelectionKey.OP_READ, this);
				if(flag) {
					logger.finest("Adding OP_READ as interest Ops for "+getName());
				} else if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
					logger.finest("OP_READ is already present in interest Ops for "+getName());
				}
			} else if(getSelectionKey().isValid()) {
				if((getSelectionKey().interestOps() & SelectionKey.OP_READ) == 0 ) {
					logger.finest("Adding OP_READ to interest Ops for "+getName());
					removeEvent(ClientEvent.READ);
					getSelectionKey().interestOps(getSelectionKey().interestOps() 
						| SelectionKey.OP_READ);
					if(wakeupSelectorAfterRegisterRead) {
						getServer().getSelector().wakeup();
					}
				} else {
					if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
						logger.finest("OP_READ is already present in interest Ops for "+getName());
					}
				}
			} else {
				throw new IOException("SelectionKey is invalid!");
			}
		} catch(CancelledKeyException e) {
			throw new IOException("SelectionKey is cancelled!");
		}
	}

	public void registerForWrite() 
			throws IOException, ClosedChannelException {
		if(hasEvent(ClientEvent.RUN_BLOCKING) || hasEvent(ClientEvent.MAX_CON_BLOCKING)) {
			throw new IllegalStateException("This method is only allowed under Non-Blocking mode.");
		}

		if(clientWriteHandler==null) {
			throw new IllegalStateException("ClientWriteHandler has not been set!");
		}
		registerWrite();
	}
	
	public void registerWrite() throws IOException {
		try {
			if(getSelectionKey()==null) {				
				boolean flag = getServer().registerChannel(getSocketChannel(), 
						SelectionKey.OP_WRITE, this);
				if(flag) {
					logger.finest("Adding OP_WRITE as interest Ops for "+getName());
				} else if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
					logger.finest("OP_WRITE is already present in interest Ops for "+getName());
				}
			} else if(getSelectionKey().isValid()) {
				if((getSelectionKey().interestOps() & SelectionKey.OP_WRITE) == 0 ) {
					logger.finest("Adding OP_WRITE to interest Ops for "+getName());
					removeEvent(ClientEvent.WRITE);
					getSelectionKey().interestOps(getSelectionKey().interestOps() 
						| SelectionKey.OP_WRITE);
					if(wakeupSelectorAfterRegisterWrite) {
						getServer().getSelector().wakeup();
					}
				} else {
					if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
						logger.finest("OP_WRITE is already present in interest Ops for "+getName());
					}
				}
			} else {
				throw new IOException("SelectionKey is invalid!");
			}
		} catch(CancelledKeyException e) {
			throw new IOException("SelectionKey is cancelled!");
		}
	}

	protected void setClientWriteHandler(ClientWriteHandler handler) {
		clientWriteHandler=handler;
	}

	/**
	 * Returns number of thread currently in this object.
	 * @since 1.4.6
	 */
	public int getThreadAccessCount() {
		return threadAccessCount;
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -