⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 incomingsocketchannelmanager.java

📁 基于JXTA开发平台的下载软件开发源代码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
	    					
		    				return( TransportCryptoManager.HandshakeListener.MATCH_CRYPTO_AUTO_FALLBACK );
		    					
	    				}else{
	    					
		    				return( TransportCryptoManager.HandshakeListener.MATCH_CRYPTO_NO_AUTO_FALLBACK );
		    				
	    				}
	    			}
	    		}
	        	});
	        	
	        }
	      });
	      
	      server_selector.start();
	    }
  	}finally{
  		
  		this_mon.exit();
  	}
  }
  
  
  /**
   * Takes over a newly accepted incoming TCP connection.
   * <p>
   * In order: runs the timeout sweep when it is due; closes the connection
   * straight away if no match registrations exist; applies the configured
   * SO_SNDBUF / IPTOS socket options (failures are printed and otherwise
   * ignored); records the connection and registers it with the read selector;
   * and finally forces a single read cycle.
   *
   * @param filter transport filter wrapping the accepted socket channel
   */
  protected void process( TCPTransportHelperFilter filter ) {
    // The sweep is due when the clock reads earlier than the previous check
    // or when more than 5 seconds have passed since it.
    final long current_time = SystemTime.getCurrentTime();
    final boolean clock_behind_last_check = current_time < last_timeout_check_time;

    if( clock_behind_last_check || current_time - last_timeout_check_time > 5*1000 ) {
      doTimeoutChecks();
      last_timeout_check_time = current_time;
    }

    // No match registrations at all: nothing could ever claim this connection, so close it.
    if( match_buffers_cow.isEmpty() ) {
      if( Logger.isEnabled() ) {
        String dropped_text =
            "Incoming TCP connection from ["
            + filter.getSocketChannel().socket().getInetAddress().getHostAddress()
            + ":"
            + filter.getSocketChannel().socket().getPort()
            + "] dropped because zero routing handlers registered";

        Logger.log( new LogEvent( LOGID, dropped_text ) );
      }

      NetworkManager.getSingleton().closeSocketChannel( filter.getSocketChannel() );
      return;
    }

    // Advanced socket options: any failure here is only reported, processing carries on.
    try {
      int send_buffer_size = COConfigurationManager.getIntParameter( "network.tcp.socket.SO_SNDBUF" );

      if( send_buffer_size > 0 ) {
        filter.getSocketChannel().socket().setSendBufferSize( send_buffer_size );
      }

      String tos_setting = COConfigurationManager.getStringParameter( "network.tcp.socket.IPTOS" );

      if( tos_setting.length() > 0 ) {
        filter.getSocketChannel().socket().setTrafficClass( Integer.decode( tos_setting ).intValue() );
      }
    }
    catch( Throwable option_failure ) {
      option_failure.printStackTrace();
    }

    // Note: the filter may still hold data queued internally after the crypto
    // handshake decode (in particular the BT header). There should be more data
    // right behind it that triggers a read-select, giving prompt access to it.
    final IncomingConnection pending = new IncomingConnection( filter, max_match_buffer_size );

    VirtualChannelSelector read_selector = NetworkManager.getSingleton().getReadSelector();

    try {
      connections_mon.enter();

      connections.add( pending );

      read_selector.register( pending.filter.getSocketChannel(), this, pending );
    }
    finally {
      connections_mon.exit();
    }

    // There might be data queued up in the filter already - force one process
    // cycle (NAT check in particular).
    selectSuccess( read_selector, pending.filter.getSocketChannel(), pending );
  }
  
  
/**
 * Read-select callback for a pending incoming connection.
 * <p>
 * Reads whatever is available into the connection's match buffer and checks
 * it against the registered byte matchers. On a match the connection is
 * removed from this manager (without closing) and handed to the listener.
 * If the buffer has filled up without any match, or any error occurs
 * (including end of stream), the connection is removed and closed.
 *
 * @param selector   selector that fired (not used here)
 * @param sc         channel that became readable; used for log messages
 * @param attachment the {@code IncomingConnection} registered with the channel
 * @return {@code false} when the read produced zero bytes, {@code true} otherwise
 */
public boolean selectSuccess( VirtualChannelSelector selector, SocketChannel sc, Object attachment ) {
  final IncomingConnection conn = (IncomingConnection)attachment;

  try {
    long read_count = conn.filter.read( new ByteBuffer[]{ conn.buffer }, 0, 1 );

    if( read_count < 0 ) {
      // handled by the catch below, which closes the connection
      throw new IOException( "end of stream on socket read" );
    }

    if( read_count == 0 ) {
      return false;
    }

    conn.last_read_time = SystemTime.getCurrentTime();

    MatchListener matched = checkForMatch( conn.buffer, false );

    if( matched != null ) {
      // Match found: hand the flipped buffer and the filter over to the listener.
      conn.buffer.flip();

      if( Logger.isEnabled() ) {
        String recognized_text =
            "Incoming TCP stream from ["
            + sc.socket().getInetAddress().getHostAddress()
            + ":"
            + sc.socket().getPort()
            + "] recognized as known byte pattern: "
            + ByteFormatter.nicePrint( conn.buffer.array() );

        Logger.log( new LogEvent( LOGID, recognized_text ) );
      }

      removeConnection( conn, false );
      matched.connectionMatched( conn.filter, conn.buffer );
    }
    else if( conn.buffer.position() >= max_match_buffer_size ) {
      // Enough bytes were read to compare against every potential match
      // buffer and none matched: give up on this connection.
      conn.buffer.flip();

      if( Logger.isEnabled() ) {
        String unknown_text =
            "Incoming TCP stream from ["
            + sc.socket().getInetAddress().getHostAddress()
            + ":"
            + sc.socket().getPort()
            + "] does not match any known byte pattern: "
            + ByteFormatter.nicePrint( conn.buffer.array() );

        Logger.log( new LogEvent( LOGID, LogEvent.LT_WARNING, unknown_text ) );
      }

      removeConnection( conn, true );
    }
  }
  catch( Throwable read_failure ) {
    try {
      if( Logger.isEnabled() ) {
        String failure_text =
            "Incoming TCP connection ["
            + sc.socket().getInetAddress().getHostAddress()
            + ":"
            + sc.socket().getPort()
            + "] socket read exception: "
            + read_failure.getMessage();

        Logger.log( new LogEvent( LOGID, LogEvent.LT_ERROR, failure_text ) );
      }
    }
    catch( Throwable log_failure ) {
      // Building the log message itself failed; report both problems.
      Debug.out( "Caught exception on incoming exception log:" );
      log_failure.printStackTrace();
      System.out.println( "CAUSED BY:" );
      read_failure.printStackTrace();
    }

    removeConnection( conn, true );
  }

  return true;
}
  
  /**
   * Select-failure callback: logs the failure (when logging is enabled) and
   * then removes and closes the affected incoming connection.
   *
   * @param selector   selector that reported the failure (not used here)
   * @param sc         channel the failure relates to; used for the log message
   * @param attachment the {@code IncomingConnection} registered with the channel
   * @param msg        the failure cause
   */
  public void selectFailure( VirtualChannelSelector selector, SocketChannel sc, Object attachment, Throwable msg ) {
    final IncomingConnection failed = (IncomingConnection)attachment;

    if( Logger.isEnabled() ) {
      String failure_text =
          "Incoming TCP connection [" + sc
          + "] socket select op failure: "
          + msg.getMessage();

      Logger.log( new LogEvent( LOGID, LogEvent.LT_ERROR, failure_text ) );
    }

    removeConnection( failed, true );
  }
  
  
  /**
   * Stops and discards the current server selector (if there is one) while
   * holding {@code this_mon}, waits one second, and then calls
   * {@code start()}.
   */
  private void restart() {
    try {
      this_mon.enter();

      if( server_selector != null ) {
        server_selector.stop();
        server_selector = null;
      }
    }
    finally {
      this_mon.exit();
    }

    // Pause before starting again; a failed or interrupted sleep is only reported.
    try {
      Thread.sleep( 1000 );
    }
    catch( Throwable sleep_failure ) {
      sleep_failure.printStackTrace();
    }

    start();
  }
  
  
  /**
   * Stops tracking an incoming connection.
   * <p>
   * While holding {@code connections_mon}, cancels the connection's read
   * operation on the read selector and removes it from the connection list.
   * Afterwards the socket channel is optionally closed.
   *
   * @param connection    connection to stop tracking
   * @param close_as_well {@code true} to also close the underlying socket channel
   */
  protected void removeConnection( IncomingConnection connection, boolean close_as_well ) {
    try {
      connections_mon.enter();

      // cancel the pending read op, then drop the connection from the list
      VirtualChannelSelector read_selector = NetworkManager.getSingleton().getReadSelector();
      read_selector.cancel( connection.filter.getSocketChannel() );

      connections.remove( connection );
    }
    finally {
      connections_mon.exit();
    }

    if( !close_as_well ) {
      return;
    }

    // async close
    NetworkManager.getSingleton().closeSocketChannel( connection.filter.getSocketChannel() );
  }
  

  /**
   * Looks for a registered byte matcher that matches the bytes accumulated in
   * the given buffer and returns the listener registered for it.
   * <p>
   * The buffer is expected to be in "write" state, i.e. its position marks the
   * number of bytes received so far. Matchers that need more bytes than are
   * available are skipped. The buffer's position and limit are always restored
   * to their original values before returning, even if a matcher throws.
   *
   * @param to_check  buffer holding the bytes received so far
   * @param min_match {@code true} to compare using each matcher's
   *                  {@code minSize()} / {@code minMatches()}, {@code false} to
   *                  use {@code size()} / {@code matches()}
   * @return the listener of the first matcher that matches, or {@code null}
   *         when none does
   */
  protected MatchListener 
  checkForMatch( 
		 ByteBuffer to_check, boolean min_match ) 
  { 
      //remember original values for later restore
      final int orig_position = to_check.position();
      final int orig_limit = to_check.limit();

      MatchListener listener = null;

      try {
        for( Iterator i = match_buffers_cow.entrySet().iterator(); i.hasNext(); ) {
          Map.Entry entry = (Map.Entry)i.next();
          NetworkManager.ByteMatcher bm = (NetworkManager.ByteMatcher)entry.getKey();

          int bytes_needed = min_match ? bm.minSize() : bm.size();

          if( orig_position < bytes_needed ) {  //not enough bytes yet to compare
            continue;
          }

          // Give every matcher the same rewound view, so that cursor changes
          // made by one matcher cannot leak into the next one. The limit is
          // set first so that position( 0 ) is always legal.
          to_check.limit( orig_limit );
          to_check.position( 0 );

          boolean matched = min_match ? bm.minMatches( to_check ) : bm.matches( to_check );

          if( matched ) {  //match found!
            listener = (MatchListener)entry.getValue();
            break;
          }
        }
      }
      finally {
        // Restore the original values in case the checks changed them. The
        // limit must be restored BEFORE the position: if a matcher lowered the
        // limit below orig_position, position( orig_position ) would otherwise
        // throw IllegalArgumentException.
        to_check.limit( orig_limit );
        to_check.position( orig_position );
      }

      return listener;
  }
  
  /**
   * Sweeps all tracked incoming connections and removes (and closes) the ones
   * that have been idle for too long.
   * <p>
   * A connection that has completed at least one read is timed out once more
   * than {@code READ_TIMEOUT} has passed since its last read; a connection
   * that has never been read from is timed out once more than
   * {@code CONNECT_TIMEOUT} has passed since it was accepted. If the clock is
   * found to have gone backwards, the stored timestamp is reset to the
   * current time instead. The whole sweep runs while holding
   * {@code connections_mon}.
   */
  protected void doTimeoutChecks() {
    try{  connections_mon.enter();

      ArrayList to_close = null;  // allocated lazily, most sweeps close nothing
      long now = SystemTime.getCurrentTime();
      
      for( int i=0; i < connections.size(); i++ ) {
        IncomingConnection ic = (IncomingConnection)connections.get( i );
        
        if( ic.last_read_time > 0 ) {  //at least one read op has occurred
          if( now < ic.last_read_time ) {  //time went backwards!
            ic.last_read_time = now;
          }
          else if( now - ic.last_read_time > READ_TIMEOUT ) {  //read timeout (10s per the original note)
            if (Logger.isEnabled()) {
              // Only the first position() bytes of the backing array hold
              // received data; the rest is unread padding and must not be
              // dumped into the log message.
              int bytes_read = ic.buffer.position();
              Logger.log(new LogEvent(LOGID, "Incoming TCP connection ["
                  + ic.filter.getSocketChannel().socket().getInetAddress().getHostAddress() + ":"
                  + ic.filter.getSocketChannel().socket().getPort()
                  + "] forcibly timed out due to socket read inactivity ["
                  + bytes_read + " bytes read: "
                  + new String( ic.buffer.array(), 0, bytes_read ) + "]"));
            }
            if( to_close == null )  to_close = new ArrayList();
            to_close.add( ic );
          }
        }
        else { //no bytes have been read yet
          if( now < ic.initial_connect_time ) {  //time went backwards!
            ic.initial_connect_time = now;
          }
          else if( now - ic.initial_connect_time > CONNECT_TIMEOUT ) {  //connect timeout (60s per the original note)
            if (Logger.isEnabled()) {
              Logger.log(new LogEvent(LOGID, "Incoming TCP connection ["
                  + ic.filter.getSocketChannel() + "] forcibly timed out after "
                  + "60sec due to socket inactivity"));
            }
            if( to_close == null )  to_close = new ArrayList();
            to_close.add( ic );
          }
        }
      }
      
      // Remove after the scan so the connection list is not modified while
      // it is being indexed.
      if( to_close != null ) {
        for( int i=0; i < to_close.size(); i++ ) {
          IncomingConnection ic = (IncomingConnection)to_close.get( i );
          removeConnection( ic, true );
        }
      }
      
    } finally {  connections_mon.exit();  }
  }
  
  
  
    
 
  
  /**
   * Bookkeeping for one accepted connection that has not been matched to a
   * listener yet.
   */
  protected static class IncomingConnection {
    /** Transport filter wrapping the accepted connection. */
    protected final TCPTransportHelperFilter filter;

    /** Collects the bytes read so far, for comparison against the byte matchers. */
    protected final ByteBuffer buffer;

    /** Time at which this record was created. */
    protected long initial_connect_time;

    /** Time of the most recent read, or -1 while nothing has been read. */
    protected long last_read_time;

    /**
     * @param filter    transport filter of the accepted connection
     * @param buff_size capacity of the match buffer to allocate
     */
    protected IncomingConnection( TCPTransportHelperFilter filter, int buff_size ) {
      this.last_read_time = -1;
      this.filter = filter;
      this.buffer = ByteBuffer.allocate( buff_size );
      this.initial_connect_time = SystemTime.getCurrentTime();
    }
  }
  
  
  
    
  
  
  /**
   * Listener for byte matches: notified when the initial bytes of an incoming
   * connection match the byte pattern the listener was registered for.
   */
  public interface MatchListener {
    
	  /**
	   * Currently, if message crypto is on and default fallback for incoming
	   * connections is not enabled, incoming messages from non-crypto
	   * transports (for example, the NAT check) would be bounced.
	   * This method allows auto-fallback for such transports.
	   * @return presumably {@code true} if auto-fallback should be allowed for
	   *         connections handled by this listener - the code consuming this
	   *         value is not visible here, TODO confirm
	   */
	public boolean
	autoCryptoFallback();
	
    /**
     * The given connection has been accepted as matching the byte filter.
     * @param filter transport filter of the matching accepted connection
     * @param read_so_far bytes already read from the connection; when invoked
     *        from {@code selectSuccess} the buffer has already been flipped
     */
    public void connectionMatched( TCPTransportHelperFilter filter, ByteBuffer read_so_far );
  }
  
  
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -