📄 nonblockingclienthandler.java
字号:
/**
 * Reads everything currently available on the socket channel into pooled
 * byte buffers and then dispatches the data that was read.
 * <p>
 * Every buffer that received data is flipped and appended to
 * <code>readByteBuffer</code>. A buffer that received nothing, or that was
 * still held by this method when an exception occurred, is returned to the
 * byte buffer pool.
 *
 * @return <code>true</code> if the calling thread should return from run():
 *         the read did not report end of stream and <code>connection</code>
 *         is still set, so the handler was re-registered for read and the
 *         thread was released; <code>false</code> otherwise
 * @throws Exception any failure raised while reading or while borrowing or
 *         returning a buffer; a failure inside the read loop also sets
 *         <code>lost</code> and wakes all waiters on the input stream
 *         before being rethrown
 */
private boolean processRead() throws Exception, AppException {
    int count = 0;
    int fullCount = 0;
    ByteBuffer buffer = (ByteBuffer)
        getServer().getByteBufferPool().borrowObject();
    try {
        while(true) {
            count = getSocketChannel().read(buffer);
            if(count<=0) {
                //logger.finest("SocketChannel read was "+count+"!");
                // Clear the local first so the finally block can never
                // return the same buffer a second time.
                ByteBuffer unused = buffer;
                buffer = null;
                getServer().getByteBufferPool().returnObject(unused);
                break;
            }
            fullCount += count;
            buffer.flip(); // Make readable
            readByteBuffer.add(buffer);
            // The filled buffer now belongs to readByteBuffer; drop our
            // reference before borrowing so a failing borrow cannot make
            // the finally block hand a queued buffer back to the pool.
            buffer = null;
            buffer = (ByteBuffer)
                getServer().getByteBufferPool().borrowObject();
        }
    } catch(Exception error) {
        logger.finest("Error in data read: "+error);
        lost = true;
        synchronized(getInputStream()) {
            getInputStream().notifyAll();
        }
        throw error;
    } finally {
        // Non-null here only when an exception interrupted the loop while
        // this method still owned a buffer; previously such a buffer leaked
        // whenever an earlier read had already returned data.
        if(buffer!=null) {
            getServer().getByteBufferPool().returnObject(buffer);
            buffer = null;
        }
    }

    if(count<0) {
        // A negative count from read() is treated as a lost connection.
        logger.finest("SocketChannel read was "+count+"!");
        lost = true;
        synchronized(getInputStream()) {
            getInputStream().notifyAll();
        }
    } else {
        logger.finest(fullCount+" bytes read");
        if(fullCount!=0) {
            updateLastCommunicationTime();
            synchronized(getInputStream()) {
                getInputStream().notify(); //if any are waiting
            }
            if(hasEvent(ClientEvent.ACCEPT) == false) {
                processGotDataInBuffers();
            }
        }

        //check if any data was read but not yet processed
        while(getInputStream().available()>0) {
            logger.finest("Sending again for processing...");
            if(hasEvent(ClientEvent.ACCEPT) == false) {
                processGotDataInBuffers();
                break;
            } else {
                // An ACCEPT event is still pending: wake any waiters on
                // the stream and poll again shortly.
                synchronized(getInputStream()) {
                    getInputStream().notifyAll();
                }
                Thread.sleep(100);
            }
        }

        if(connection) {
            registerForRead();
            //getSelectionKey().selector().wakeup();
            returnThread(); //return to pool
            return true;
        }
    }//end of else

    logger.finest("We don't have connection, lets return all resources.");
    return false;
}
/**
 * Process write: pushes the pending output buffers to the channel.
 * <p>
 * When not everything could be written the handler registers for another
 * write; when everything was written and a write handler is set, that
 * handler is invoked.
 *
 * @return value indicates if the thread should return from run()
 */
private boolean processWrite() throws IOException {
    updateLastCommunicationTime();

    boolean allWritten = byteBufferOutputStream.writeAllByteBuffer();
    if(!allWritten) {
        registerWrite();
    } else if(clientWriteHandler!=null) {
        clientWriteHandler.handleWrite(this);
    }

    if(!connection) {
        logger.finest("We don't have connection, lets return all resources.");
        return false;
    }
    returnThread(); //return to pool
    return true;
}
/**
 * Records that one thread has finished working inside this handler:
 * decrements <code>threadAccessCount</code>, asserts that it did not drop
 * below zero, and removes the event held in <code>threadEvent</code>.
 */
protected void returnThread() {
    --threadAccessCount;
    Assertion.affirm(threadAccessCount>=0,
        "ThreadAccessCount went less the 0! Value: "+threadAccessCount);
    //return is done at ClientThread end
    ClientEvent finishedEvent = (ClientEvent) threadEvent.get();
    removeEvent(finishedEvent);
}
/**
 * Waits for other threads to leave this handler and then delegates to the
 * superclass implementation.
 * <p>
 * The wait polls <code>threadAccessCount</code> every 60 ms. After 100
 * polls the counter is forced to zero and a warning is logged; with
 * assertions enabled <code>assertionSystemExit()</code> is called as well.
 * <p>
 * If the waiting thread is interrupted the wait is abandoned, the
 * superclass cleanup still runs, and the interrupt status is restored
 * afterwards so callers further up the stack can observe it.
 */
protected void returnClientHandler() {
    logger.finest(getName());
    boolean interrupted = false;
    try {
        for(int i=0;threadAccessCount!=0;i++) {
            if(i==100) {
                logger.warning("ClientHandler must have got into a loop waiting for thread to free up! ThreadAccessCount="+threadAccessCount);
                threadAccessCount = 0;
                if(Assertion.isEnabled()) {
                    assertionSystemExit();
                } else {
                    break;
                }
            }
            if(threadAccessCount<=0) break;
            logger.finest("Waiting for other thread of "+getName()+" to finish");
            Thread.sleep(60);
        }
    } catch(InterruptedException ie) {
        appLogger.warning("InterruptedException: "+ie);
        // Remember the interrupt instead of discarding it.
        interrupted = true;
    }

    try {
        super.returnClientHandler();
    } finally {
        // Restored only after the cleanup so that cleanup itself does not
        // run with the interrupt flag set.
        if(interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Switches the data mode of one direction of this handler.
 * Does nothing when the requested mode is already the current mode for
 * the given type; otherwise the request is checked by the superclass and
 * then applied through the non-blocking implementation.
 *
 * @param dataMode the mode to switch to
 * @param dataType the direction whose mode is being changed
 * @throws IOException if applying the new mode fails
 */
public void setDataMode(DataMode dataMode, DataType dataType)
        throws IOException {
    boolean modeChanges = getDataMode(dataType)!=dataMode;
    if(modeChanges) {
        appLogger.fine("Setting Type:"+dataType+", Mode:"+dataMode);
        super.checkDataModeSet(dataMode, dataType);
        setDataModeNonBlocking(dataMode, dataType);
    }
}
/**
 * Applies a data mode change for one direction and swaps the stream
 * wrappers to match.
 * <p>
 * For the OUT direction, leaving OBJECT mode flushes and drops the
 * <code>ObjectOutputStream</code> and installs a
 * <code>BufferedOutputStream</code>; entering OBJECT mode does the
 * reverse. For the IN direction the reader/stream wrappers are cleared so
 * the plain input stream is used; OBJECT mode is rejected for IN.
 *
 * @param dataMode the mode to switch to
 * @param dataType the direction being changed
 * @throws IOException if flushing or creating a stream fails
 * @throws IllegalArgumentException if the mode or the type is not
 *         recognised, or if OBJECT mode is requested for the IN direction
 */
private void setDataModeNonBlocking(DataMode dataMode, DataType dataType)
        throws IOException {
    logger.finest("ENTER");
    if(dataMode == DataMode.STRING) {
        if(dataType == DataType.OUT) {
            if(dataModeOUT == DataMode.BYTE || dataModeOUT == DataMode.BINARY) {
                dataModeOUT = dataMode;
            } else if(dataModeOUT == DataMode.OBJECT) {
                dataModeOUT = dataMode;
                if(o_out!=null) {
                    o_out.flush();
                }
                o_out = null;
                b_out = new BufferedOutputStream(out);
            } else {
                Assertion.affirm(false, "Unknown DataType.OUT DataMode - "+dataModeOUT);
            }
            Assertion.affirm(b_out!=null, "BufferedOutputStream is still null!");
            Assertion.affirm(o_out==null, "ObjectOutputStream is still not null!");
        } else if(dataType == DataType.IN) {
            dataModeIN = dataMode;
            if(o_in!=null) {
                if(o_in.available()!=0)
                    logger.warning("Data looks to be present in ObjectInputStream");
                o_in = null;
            }
            b_in = null;
            bufferedReader = null;
            //input stream will work
            Assertion.affirm(in!=null, "InputStream is still null!");
            Assertion.affirm(b_in==null, "BufferedInputStream is still not null!");
            Assertion.affirm(bufferedReader==null, "BufferedReader is still not null!");
        } else {
            // Same rejection the BYTE/BINARY branch applies.
            throw new IllegalArgumentException("Unknown DataType : "+dataType);
        }
    } else if(dataMode == DataMode.OBJECT) {
        if(dataType == DataType.IN) {
            //we will disable this for now
            throw new IllegalArgumentException("Can't set DataType.IN mode to OBJECT when blocking mode is set as false!");
        } else if(dataType == DataType.OUT) {
            dataModeOUT = dataMode;
            b_out = null;
            o_out = new ObjectOutputStream(out);
            Assertion.affirm(o_out!=null, "ObjectOutputStream is still null!");
        } else {
            throw new IllegalArgumentException("Unknown DataType : "+dataType);
        }
    } else if(dataMode == DataMode.BYTE || dataMode == DataMode.BINARY) {
        if(dataType == DataType.OUT) {
            if(dataModeOUT == DataMode.STRING ||
                    dataModeOUT == DataMode.BYTE ||
                    dataModeOUT == DataMode.BINARY) {
                dataModeOUT = dataMode;
            } else if(dataModeOUT == DataMode.OBJECT) {
                dataModeOUT = dataMode;
                // Flush before discarding, as the STRING branch does, so
                // object data still held by the stream is not dropped.
                if(o_out!=null) {
                    o_out.flush();
                }
                o_out = null;
                b_out = new BufferedOutputStream(out);
            } else {
                Assertion.affirm(false, "Unknown DataType.OUT - DataMode: "+dataModeOUT);
            }
            Assertion.affirm(b_out!=null, "BufferedOutputStream is still null!");
        } else if(dataType == DataType.IN) {
            dataModeIN = dataMode;
            o_in = null;
            bufferedReader = null;
            b_in = null;
            //input stream will work
            Assertion.affirm(in!=null, "InputStream is still null!");
        } else {
            throw new IllegalArgumentException("Unknown DataType : "+dataType);
        }
    } else {
        throw new IllegalArgumentException("Unknown DataMode : "+dataMode);
    }
}
/**
 * Reads bytes from this handler's input stream by delegating to the
 * one-argument <code>readInputStream</code> overload with the stream
 * returned by <code>getInputStream()</code>.
 *
 * @return whatever the delegated call returns
 * @throws IOException if the delegated read fails
 */
protected byte[] readInputStream() throws IOException {
return readInputStream(getInputStream());
}
/**
 * Rebuilds the byte-buffer backed streams of this handler: a new output
 * stream over <code>writeByteBuffer</code> and a new input stream over
 * <code>readByteBuffer</code> (using the handler's charset), both of which
 * are then installed through the stream setters.
 *
 * @throws IOException if creating or installing a stream fails
 */
public void updateInputOutputStreams() throws IOException {
    // Output side first; the field is published before the input stream
    // is constructed, exactly as callers have always observed.
    byteBufferOutputStream = new ByteBufferOutputStream(writeByteBuffer, this);

    ByteBufferInputStream freshInput =
        new ByteBufferInputStream(readByteBuffer, this, getCharset());
    setInputStream(freshInput);

    setOutputStream(byteBufferOutputStream);
}
/**
 * Stores the socket channel used by this handler.
 *
 * @param socketChannel the channel to store; it is returned unchanged by
 *        getSocketChannel()
 */
public void setSocketChannel(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
/**
 * Returns the socket channel currently stored in this handler.
 *
 * @return the stored channel, exactly as last assigned
 */
public SocketChannel getSocketChannel() {
return socketChannel;
}
/**
 * Stores the selection key for this handler.
 *
 * @param selectionKey the key to store; when it is <code>null</code>,
 *        getSelectionKey() looks the key up from the socket channel on
 *        its next call
 */
public void setSelectionKey(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
}
/**
 * Returns the selection key of this handler, looking it up on demand.
 * When no key is stored yet, the key the socket channel holds for the
 * server's selector is fetched and stored.
 *
 * @return the stored key, or <code>null</code> if the lookup found none
 */
public SelectionKey getSelectionKey() {
    if(selectionKey!=null) {
        return selectionKey;
    }
    // Not stored yet: ask the channel for its key with the server selector.
    selectionKey = getSocketChannel().keyFor(getServer().getSelector());
    return selectionKey;
}
/**
 * Consumes the data waiting in the input stream and hands it to the
 * authentication handler (while the client is not yet authorised) or to
 * the handler matching the current incoming data mode.
 * <p>
 * The outer loop repeats while the input stream reports available data,
 * because a handler may change <code>dataModeIN</code> part-way through.
 * The method returns early when a read yields <code>null</code> (in which
 * case <code>lost</code> is set), when the handler reports closed, or when
 * STRING mode finds no line to process and nothing left in the byte
 * buffers.
 *
 * @throws IllegalStateException if <code>dataModeIN</code> is none of
 *         STRING, OBJECT, BYTE or BINARY
 */
private void processGotDataInBuffers() throws AppException,
ConnectionLostException, ClassNotFoundException, IOException {
// Nothing buffered: nothing to do.
if(getInputStream().available()==0) return;
logger.finest("Trying to process got data.. DataMode.IN="+dataModeIN);
// Shared by all mode branches below; keeps its value across iterations.
AuthStatus authStatus = null;
//--For debug
//((ByteBufferInputStream) getInputStream()).dumpContent();
// NOTE(review): temp and recObject are not used by any active code below.
String temp = null;
String rec = null;
Object recObject = null;
byte[] recByte = null;
// True while the STRING branch has not yet completed a line iteration.
boolean timeToCheckForNewLineMiss = false;
do {
//updateLastCommunicationTime();
if(dataModeIN == DataMode.STRING) {
ByteBufferInputStream bbin = (ByteBufferInputStream)
getInputStream();
timeToCheckForNewLineMiss = true;
// Process every complete line that is ready.
while(bbin.isLineReady()) {
rec = bbin.readLine();
if(rec==null) {
lost = true;
return;
}
if(getCommunicationLogging() && authorised == true) {
appLogger.fine("Got STRING ["+getHostAddress()+"] : "+
rec);
}
if(authorised == false)
authStatus = clientAuthenticationHandler.handleAuthentication(this, rec);
else
clientCommandHandler.handleCommand(this, rec);
if(isClosed()==true) return;
// Re-run authorisation for as long as it reports FAILURE.
while(authStatus==AuthStatus.FAILURE)
authStatus = processAuthorisation();
if(authStatus==AuthStatus.SUCCESS)
authorised = true;
// A handler switched the incoming mode: leave the line loop so the
// branches below can pick up the remaining data.
if(dataModeIN != DataMode.STRING) {
break;
}
timeToCheckForNewLineMiss = false;
}//end of while
// Flag still set means the loop above never finished an iteration; if the
// byte buffers hold nothing more either, stop here instead of looping.
if(timeToCheckForNewLineMiss && bbin.availableOnlyInByteBuffer()==0) {
return;
} else {
timeToCheckForNewLineMiss = false;
}
}
// OBJECT mode handling is disabled; the old implementation is kept below.
// NOTE(review): with dataModeIN == OBJECT no active branch in this loop
// consumes input - confirm that mode cannot be reached for IN.
//} else if(dataModeIN == DataMode.OBJECT) {
/*
while(dataModeIN == DataMode.OBJECT && o_in!=null) {
recObject = o_in.readObject();
if(recObject==null) {
lost = true;
return;
}
if(getCommunicationLogging() && authorised == true) {
appLogger.fine("Got OBJECT ["+getHostAddress()+"] : "+
recObject.toString());
}
if(authorised == false)
authStatus = clientAuthenticationHandler.handleAuthentication(this, recObject);
else
clientObjectHandler.handleObject(this, recObject);
if(isClosed()==true) return;
while(authStatus==AuthStatus.FAILURE)
authStatus = processAuthorisation();
if(authStatus==AuthStatus.SUCCESS)
authorised = true;
}
*/
//} else if(dataModeIN == DataMode.BYTE) {
// BYTE mode: read with readBytes() while data remains and the mode holds.
while(dataModeIN == DataMode.BYTE && getInputStream().available()!=0) {
rec = readBytes();
if(rec==null) {
lost = true;
return;
}
if(getCommunicationLogging() && authorised == true) {
appLogger.fine("Got BYTE ["+getHostAddress()+"] : "+rec);
}
if(authorised == false)
authStatus = clientAuthenticationHandler.handleAuthentication(this, rec);
else
clientCommandHandler.handleCommand(this, rec);
if(isClosed()==true) return;
while(authStatus==AuthStatus.FAILURE)
authStatus = processAuthorisation();
if(authStatus==AuthStatus.SUCCESS)
authorised = true;
}
//} else if(dataModeIN == DataMode.BINARY) {
// BINARY mode: read with readBinary() while data remains and the mode holds.
while(dataModeIN == DataMode.BINARY && getInputStream().available()!=0) {
recByte = readBinary();
if(recByte==null) {
lost = true;
return;
}
if(getCommunicationLogging() && authorised == true) {
appLogger.fine("Got BINARY ["+getHostAddress()+"] : "+
MyString.getMemInfo(recByte.length));
}
if(authorised == false)
authStatus = clientAuthenticationHandler.handleAuthentication(this, recByte);
else
clientBinaryHandler.handleBinary(this, recByte);
if(isClosed()==true) return;
while(authStatus==AuthStatus.FAILURE)
authStatus = processAuthorisation();
if(authStatus==AuthStatus.SUCCESS)
authorised = true;
}
//} else {
// Any mode outside the four known ones is a programming error.
if(dataModeIN != DataMode.STRING && dataModeIN != DataMode.OBJECT
&& dataModeIN != DataMode.BYTE && dataModeIN != DataMode.BINARY) {
throw new IllegalStateException("Incoming DataMode is not supported : "+dataModeIN);
}
} while(getInputStream().available()!=0);
}
/**
 * Makes sure this handler's channel is registered for read readiness.
 * <p>
 * Without a selection key the channel is registered with the server for
 * <code>OP_READ</code>. With a valid key, <code>OP_READ</code> is added to
 * the key's interest set if missing: the READ event is removed first and,
 * when <code>wakeupSelectorAfterRegisterRead</code> is set, the selector
 * is woken up afterwards.
 *
 * @throws IOException if the selection key is invalid, or if it has been
 *         cancelled (the <code>CancelledKeyException</code> is attached
 *         as the cause)
 */
public void registerForRead()
        throws IOException, ClosedChannelException {
    try {
        // Look the key up once so every check below sees the same key.
        final SelectionKey key = getSelectionKey();
        if(key==null) {
            boolean flag = getServer().registerChannel(getSocketChannel(),
                SelectionKey.OP_READ, this);
            if(flag) {
                logger.finest("Adding OP_READ as interest Ops for "+getName());
            } else if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
                logger.finest("OP_READ is already present in interest Ops for "+getName());
            }
        } else if(key.isValid()) {
            if((key.interestOps() & SelectionKey.OP_READ) == 0 ) {
                logger.finest("Adding OP_READ to interest Ops for "+getName());
                removeEvent(ClientEvent.READ);
                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                if(wakeupSelectorAfterRegisterRead) {
                    getServer().getSelector().wakeup();
                }
            } else {
                if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
                    logger.finest("OP_READ is already present in interest Ops for "+getName());
                }
            }
        } else {
            throw new IOException("SelectionKey is invalid!");
        }
    } catch(CancelledKeyException e) {
        // Keep the original exception reachable for diagnosis.
        IOException cancelled = new IOException("SelectionKey is cancelled!");
        cancelled.initCause(e);
        throw cancelled;
    }
}
/**
 * Public entry point for requesting write readiness on this handler.
 * Validates the handler's state and then delegates to registerWrite().
 *
 * @throws IllegalStateException if the handler has a RUN_BLOCKING or
 *         MAX_CON_BLOCKING event, or if no write handler has been set
 * @throws IOException if the registration itself fails
 */
public void registerForWrite()
        throws IOException, ClosedChannelException {
    boolean blockingMode = hasEvent(ClientEvent.RUN_BLOCKING)
        || hasEvent(ClientEvent.MAX_CON_BLOCKING);
    if(blockingMode) {
        throw new IllegalStateException("This method is only allowed under Non-Blocking mode.");
    }
    if(null==clientWriteHandler) {
        throw new IllegalStateException("ClientWriteHandler has not been set!");
    }
    registerWrite();
}
/**
 * Makes sure this handler's channel is registered for write readiness.
 * <p>
 * Without a selection key the channel is registered with the server for
 * <code>OP_WRITE</code>. With a valid key, <code>OP_WRITE</code> is added
 * to the key's interest set if missing: the WRITE event is removed first
 * and, when <code>wakeupSelectorAfterRegisterWrite</code> is set, the
 * selector is woken up afterwards.
 *
 * @throws IOException if the selection key is invalid, or if it has been
 *         cancelled (the <code>CancelledKeyException</code> is attached
 *         as the cause)
 */
public void registerWrite() throws IOException {
    try {
        // Look the key up once so every check below sees the same key.
        final SelectionKey key = getSelectionKey();
        if(key==null) {
            boolean flag = getServer().registerChannel(getSocketChannel(),
                SelectionKey.OP_WRITE, this);
            if(flag) {
                logger.finest("Adding OP_WRITE as interest Ops for "+getName());
            } else if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
                logger.finest("OP_WRITE is already present in interest Ops for "+getName());
            }
        } else if(key.isValid()) {
            if((key.interestOps() & SelectionKey.OP_WRITE) == 0 ) {
                logger.finest("Adding OP_WRITE to interest Ops for "+getName());
                removeEvent(ClientEvent.WRITE);
                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                if(wakeupSelectorAfterRegisterWrite) {
                    getServer().getSelector().wakeup();
                }
            } else {
                if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
                    logger.finest("OP_WRITE is already present in interest Ops for "+getName());
                }
            }
        } else {
            throw new IOException("SelectionKey is invalid!");
        }
    } catch(CancelledKeyException e) {
        // Keep the original exception reachable for diagnosis.
        IOException cancelled = new IOException("SelectionKey is cancelled!");
        cancelled.initCause(e);
        throw cancelled;
    }
}
/**
 * Stores the write handler for this client handler.
 * processWrite() invokes the stored handler once all pending output has
 * been written, and registerForWrite() refuses to run while it is
 * <code>null</code>.
 *
 * @param handler the write handler to store
 */
protected void setClientWriteHandler(ClientWriteHandler handler) {
clientWriteHandler=handler;
}
/**
 * Returns the number of threads currently in this object.
 * @return the current value of the thread access counter
 * @since 1.4.6
 */
public int getThreadAccessCount() {
return threadAccessCount;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -