⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 trnonblockingserver.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
	        		public boolean 
					selectSuccess( 
						VirtualChannelSelector 	selector, 
						SocketChannel 			sc, 
						Object 					attachment ) 
	        		{
	        			try{
		        			int read_result = processor.processRead();
		            
		        			if ( read_result == 0 ) {  //read processing is complete
		        				
		        				if ( selector_registered ){
		        					
		        					read_selector.pauseSelects( sc );
		        				}
		        					        						
		        			}else if ( read_result < 0 ) {  //a read error occured

		        				removeAndCloseConnection( processor );
		        				
		        			}else{
		        			
			        			if ( !selector_registered ){
			        				
			        				selector_registered	= true;
			        				
			        		        read_selector.register( sc, this, null );
			        			}
		        			}
			        			
		        			return( read_result != 2 );
		        			
	              		}catch( Throwable e ){
	            			
	            			Debug.printStackTrace(e);
	            			
	            			removeAndCloseConnection( processor );
	            			
	            			return( false );
	            		}
	        		}
	
	        		public void 
					selectFailure( 
						VirtualChannelSelector 	selector, 
						SocketChannel 			sc, 
						Object 					attachment, 
						Throwable 				msg ) 
	        		{
	        			removeAndCloseConnection( processor );
	        		}	
				};
				
			read_listener.selectSuccess( read_selector, channel, null );
        }
    }

    protected void
	readyToWrite(
		final TRNonBlockingServerProcessor	processor )
    {
        final VirtualChannelSelector.VirtualSelectorListener write_listener = 
        	new VirtualChannelSelector.VirtualSelectorListener() 
			{
        		private boolean	selector_registered;

            	public boolean 
				selectSuccess( 
					VirtualChannelSelector 	selector, 
					SocketChannel 			sc, 
					Object 					attachment ) 
            	{
            		try{
	            		int write_result = processor.processWrite();
	              
	            		if( write_result > 0 ) { //more writing is needed
	            			
	            			if ( selector_registered ){
	            				
		            			write_selector.resumeSelects( sc );  //resume for more writing

	            			}else{
	            				
	            				selector_registered	= true;
	            				
	            				write_selector.register( sc, this, null ); 
	            			}
	            			
	            		}else if( write_result == 0 ) {  //write processing is complete

	            			removeAndCloseConnection( processor );
	
	            		}else if( write_result < 0 ) {  //a write error occured

	            			processor.failed();
	            			
	            			removeAndCloseConnection( processor );
	            		}
	            		
	            		return( write_result != 2 );
	            		
            		}catch( Throwable e ){
            			
            			Debug.printStackTrace(e);
            			
            			removeAndCloseConnection( processor );
            			
            			return( false );
            		}
            	}

            	public void 
				selectFailure( 
					VirtualChannelSelector 	selector, 
					SocketChannel 			sc, 
					Object 					attachment, 
					Throwable 				msg ) 
            	{
            		removeAndCloseConnection( processor );
            	}
			};
  

		write_listener.selectSuccess( write_selector, processor.getSocketChannel(), null );
    }
    
    protected void
    removeAndCloseConnection(
    	TRNonBlockingServerProcessor	processor )
    {
    	processor.completed();
    	
        try{
        	this_mon.enter();
        	
        	if ( processors.remove( processor )){
            
        		read_selector.cancel( processor.getSocketChannel() );
        		write_selector.cancel( processor.getSocketChannel() );
        	
        		connections_to_close.add( processor );
        	}
        	
        }finally{
        	
        	this_mon.exit();
        }
    }
    
	public void
	checkTimeouts(
		long	now )
	{
   		// we don't particularly care about timeouts if nothing's going on, hence we only
		// trigger the check on the arrival of a new connection
	
		/*
		String	con_rate 	= "";
		String	tim_rate	= "";
		
		if ( last_stats_time > 0 ){
						
			long	time_diff = (now - last_stats_time)/1000;
			
			long	conn_diff 	= total_connections - last_connections;
			long	tim_diff	= total_timeouts - last_timeouts;
			
			con_rate = "" + (conn_diff/time_diff);
			tim_rate = "" + (tim_diff/time_diff);
		}

		System.out.println( "Tracker: con/sec = " + con_rate + ", timeout/sec = " + tim_rate + ", tot_con = " + total_connections+ ", total timeouts = " + total_timeouts + 
							", current connections = " + processors.size() + ", closing = " + connections_to_close.size());

		last_stats_time		= now;
		last_connections	= total_connections;
		last_timeouts		= total_timeouts;

		*/
		
		try{
        	this_mon.enter();
        
        	List	new_processors = new ArrayList(processors.size());
        	
        	for (int i=0;i<processors.size();i++){
        		
        		TRNonBlockingServerProcessor	processor = (TRNonBlockingServerProcessor)processors.get(i);
        		
        		if ( now - processor.getStartTime() > PROCESSING_GET_LIMIT ){
              
        			read_selector.cancel( processor.getSocketChannel() );
        			write_selector.cancel( processor.getSocketChannel() );
              
        			connections_to_close.add( processor );
                	            		
        			total_timeouts++;
               		
        		}else{
        			
        			new_processors.add( processor );
        		}
        	}
        	
        	processors	= new_processors;
        	
		}finally{
			
			this_mon.exit();
		}
	}	
	
	public void
	closeLoop()
	{
			// socket channel close ops can block, hence we move it off the main processing loops
			// to ensure that a rogue connection doesn't stall us
		
		List	pending_list	= new ArrayList();
		
		long	default_delay = CLOSE_DELAY*2/3;
		
		long	delay = default_delay;
		
		while( true ){

				// wait a small amount of time to allow the client to close the connection rather
				// than us. This prevents a buildup of TIME_WAIT state sockets
			
			if ( delay > 0 ){
				
				try{
					Thread.sleep( delay );
					
				}catch( Throwable e ){
					
					Debug.printStackTrace(e);
				}
			}
			
			// System.out.println( "close delay = " + delay + ", pending =" + pending_list.size());
			
			long	start = SystemTime.getCurrentTime();
			
	        for (int i=0;i<pending_list.size();i++){
	        	
	        	try{
	        		TRNonBlockingServerProcessor processor = (TRNonBlockingServerProcessor)pending_list.get(i);
	        		
	        		processor.closed();
	        		
	        		processor.getSocketChannel().close();
	        		
	        	}catch( Throwable e ){
	        		
	        	}
	        }
				        	
		    try{
		    	this_mon.enter();
	        
		    	pending_list	= connections_to_close;
		    	
		    	connections_to_close	= new ArrayList();
		    	
	        }finally{
	        	
	        	this_mon.exit();
	        }
	        
	        	// reduce the sleep time if we're not keeping up
	        
	        long	duration = SystemTime.getCurrentTime() - start;
	        
	        if ( duration < 0 ){
	        	
	        	duration	= 0;
	        }
	        
	        delay = default_delay - duration;
		}	
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -