📄 trnonblockingserver.java
字号:
/**
 * Runs one read pass on the processor and adjusts the read-selector
 * registration according to the code returned by processor.processRead():
 *
 *   negative - a read error occurred; the processor is handed to
 *              removeAndCloseConnection
 *   zero     - read processing is complete; selects on sc are paused if
 *              this listener had previously registered itself
 *   positive - this listener registers itself for sc with the read
 *              selector, unless it has already done so
 *
 * Any Throwable raised along the way is reported through Debug and the
 * processor is handed to removeAndCloseConnection.
 *
 * @param selector   not inspected by this implementation
 * @param sc         channel that is paused on / registered with read_selector
 * @param attachment not inspected by this implementation
 * @return false when the result code is 2 or when processing threw,
 *         true otherwise
 */
public boolean selectSuccess(
    VirtualChannelSelector selector,
    SocketChannel sc,
    Object attachment )
{
    try{
        final int outcome = processor.processRead();

        if ( outcome < 0 ){

            // read error
            removeAndCloseConnection( processor );

        }else if ( outcome == 0 ){

            // reading has finished, stop being told about readability
            if ( selector_registered ){
                read_selector.pauseSelects( sc );
            }

        }else if ( !selector_registered ){

            // first time round only: hook this listener up to the selector
            selector_registered = true;
            read_selector.register( sc, this, null );
        }

        return( outcome != 2 );

    }catch( Throwable e ){

        Debug.printStackTrace( e );
        removeAndCloseConnection( processor );

        return( false );
    }
}
/**
 * Select-failure callback for the read listener. The connection is given
 * up on by handing the processor to removeAndCloseConnection; none of the
 * callback arguments are inspected.
 *
 * @param selector   not inspected
 * @param sc         not inspected
 * @param attachment not inspected
 * @param msg        the reported failure; not inspected or logged here
 */
public void selectFailure(
    VirtualChannelSelector selector,
    SocketChannel sc,
    Object attachment,
    Throwable msg )
{
    removeAndCloseConnection( processor );
}
};
read_listener.selectSuccess( read_selector, channel, null );
}
}
/**
 * Starts writing for the given processor. A selector listener is built and
 * its selectSuccess callback is invoked once directly; if that first write
 * pass reports that more writing is needed, the listener registers itself
 * with write_selector so that later passes are driven by the selector.
 *
 * @param processor processor whose processWrite() is driven and whose
 *                  socket channel is used for the write selector
 */
protected void readyToWrite( final TRNonBlockingServerProcessor processor )
{
    final VirtualChannelSelector.VirtualSelectorListener listener =
        new VirtualChannelSelector.VirtualSelectorListener()
        {
            // set once this listener has registered itself with write_selector
            private boolean selector_registered;

            /**
             * Runs one write pass and reacts to the code returned by
             * processor.processWrite(): zero means writing is complete,
             * negative means a write error, positive means more writing
             * is needed.
             *
             * @return false when the result code is 2 or when processing
             *         threw, true otherwise
             */
            public boolean selectSuccess(
                VirtualChannelSelector selector,
                SocketChannel sc,
                Object attachment )
            {
                try{
                    final int outcome = processor.processWrite();

                    if ( outcome == 0 ){

                        // everything written, finished with the connection
                        removeAndCloseConnection( processor );

                    }else if ( outcome < 0 ){

                        // write error: flag the failure before tearing down
                        processor.failed();
                        removeAndCloseConnection( processor );

                    }else if ( !selector_registered ){

                        // more to write and not yet known to the selector
                        selector_registered = true;
                        write_selector.register( sc, this, null );

                    }else{

                        // more to write, already registered: resume selects
                        write_selector.resumeSelects( sc );
                    }

                    return( outcome != 2 );

                }catch( Throwable e ){

                    Debug.printStackTrace( e );
                    removeAndCloseConnection( processor );

                    return( false );
                }
            }

            /**
             * Select-failure callback: hands the processor to
             * removeAndCloseConnection without inspecting the arguments.
             */
            public void selectFailure(
                VirtualChannelSelector selector,
                SocketChannel sc,
                Object attachment,
                Throwable msg )
            {
                removeAndCloseConnection( processor );
            }
        };

    // first attempt is made directly rather than waiting for the selector
    listener.selectSuccess( write_selector, processor.getSocketChannel(), null );
}
/**
 * Marks the processor as completed and, if it is still present in the
 * processors list, removes it, cancels its channel on both the read and the
 * write selector and appends it to connections_to_close.
 *
 * processor.completed() is always invoked, before this_mon is entered. The
 * list and selector work happens between this_mon.enter() and
 * this_mon.exit(), and is skipped entirely when the processor was not in
 * the list.
 *
 * @param processor the processor to retire
 */
protected void removeAndCloseConnection( TRNonBlockingServerProcessor processor )
{
    processor.completed();

    try{
        this_mon.enter();

        boolean was_tracked = processors.remove( processor );

        if ( !was_tracked ){
            // nothing to cancel or queue; the finally clause still exits the monitor
            return;
        }

        read_selector.cancel( processor.getSocketChannel() );
        write_selector.cancel( processor.getSocketChannel() );

        connections_to_close.add( processor );

    }finally{

        this_mon.exit();
    }
}
/**
 * Retires every processor whose age exceeds PROCESSING_GET_LIMIT.
 *
 * While holding this_mon, the processors list is walked once. A processor
 * for which (now - getStartTime()) is greater than PROCESSING_GET_LIMIT has
 * its channel cancelled on both selectors, is appended to
 * connections_to_close and bumps total_timeouts. All others are carried
 * over into a fresh list that then replaces the processors field.
 *
 * @param now the time value that processor start times are compared with
 */
public void checkTimeouts( long now )
{
    // we don't particularly care about timeouts if nothing's going on, hence we only
    // trigger the check on the arrival of a new connection

    // disabled statistics output, retained for debugging:
    /*
    String con_rate = "";
    String tim_rate = "";
    if ( last_stats_time > 0 ){
        long time_diff = (now - last_stats_time)/1000;
        long conn_diff = total_connections - last_connections;
        long tim_diff = total_timeouts - last_timeouts;
        con_rate = "" + (conn_diff/time_diff);
        tim_rate = "" + (tim_diff/time_diff);
    }
    System.out.println( "Tracker: con/sec = " + con_rate + ", timeout/sec = " + tim_rate + ", tot_con = " + total_connections+ ", total timeouts = " + total_timeouts +
        ", current connections = " + processors.size() + ", closing = " + connections_to_close.size());
    last_stats_time = now;
    last_connections = total_connections;
    last_timeouts = total_timeouts;
    */

    try{
        this_mon.enter();

        List survivors = new ArrayList( processors.size() );

        for ( int idx = 0; idx < processors.size(); idx++ ){

            TRNonBlockingServerProcessor candidate =
                (TRNonBlockingServerProcessor)processors.get( idx );

            boolean still_in_time = !( now - candidate.getStartTime() > PROCESSING_GET_LIMIT );

            if ( still_in_time ){
                survivors.add( candidate );
                continue;
            }

            // over the limit: detach from both selectors and queue for closing
            read_selector.cancel( candidate.getSocketChannel() );
            write_selector.cancel( candidate.getSocketChannel() );

            connections_to_close.add( candidate );

            total_timeouts++;
        }

        processors = survivors;

    }finally{

        this_mon.exit();
    }
}
/**
 * Loops indefinitely, closing the socket channels of processors that were
 * queued in connections_to_close.
 *
 * Each iteration:
 *   1. sleeps for the current delay (when positive),
 *   2. notifies and closes every processor in the batch picked up by the
 *      previous iteration,
 *   3. under this_mon, takes the current connections_to_close list as the
 *      next batch and replaces the field with a fresh empty list,
 *   4. sets the next delay to the default delay (CLOSE_DELAY*2/3) minus the
 *      time spent in steps 2 and 3.
 *
 * A batch is therefore closed one iteration after it was picked up.
 *
 * The channel is closed even when processor.closed() throws, so a failing
 * notification cannot leak the socket. Failures while closing an individual
 * connection are ignored so that one bad connection cannot stop the loop.
 */
public void
closeLoop()
{
    // socket channel close ops can block, hence we move it off the main processing loops
    // to ensure that a rogue connection doesn't stall us

    List pending_list = new ArrayList();

    long default_delay = CLOSE_DELAY*2/3;

    long delay = default_delay;

    while( true ){

        // wait a small amount of time to allow the client to close the connection rather
        // than us. This prevents a buildup of TIME_WAIT state sockets

        if ( delay > 0 ){

            try{
                Thread.sleep( delay );

            }catch( Throwable e ){

                Debug.printStackTrace(e);
            }
        }

        // System.out.println( "close delay = " + delay + ", pending =" + pending_list.size());

        long start = SystemTime.getCurrentTime();

        for (int i=0;i<pending_list.size();i++){

            try{
                TRNonBlockingServerProcessor processor = (TRNonBlockingServerProcessor)pending_list.get(i);

                try{
                    processor.closed();

                }finally{

                    // always release the channel, even if the closed() notification
                    // failed - otherwise the socket would never be closed

                    processor.getSocketChannel().close();
                }
            }catch( Throwable e ){

                // deliberately ignored: closing is best effort and a failure on one
                // connection must not prevent the remaining ones from being closed
            }
        }

        // swap the queue under the monitor so the (potentially blocking) close
        // calls above never run while the monitor is held

        try{
            this_mon.enter();

            pending_list = connections_to_close;

            connections_to_close = new ArrayList();

        }finally{

            this_mon.exit();
        }

        // reduce the sleep time if we're not keeping up

        long duration = SystemTime.getCurrentTime() - start;

        if ( duration < 0 ){

            // clamp a negative measurement so it cannot lengthen the delay
            duration = 0;
        }

        delay = default_delay - duration;
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -