⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 transferprocessor.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
字号:
/*
 * Created on Oct 7, 2004
 * Created by Alon Rohter
 * Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 * 
 * AELITIS, SAS au capital de 46,603.30 euros
 * 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
 *
 */

package com.aelitis.azureus.core.networkmanager.impl;

import java.util.*;

import org.gudy.azureus2.core3.util.AEMonitor;

import com.aelitis.azureus.core.networkmanager.LimitedRateGroup;
import com.aelitis.azureus.core.networkmanager.NetworkConnectionBase;




/**
 *
 */
public class TransferProcessor {
  public static final int TYPE_UPLOAD   = 0;
  public static final int TYPE_DOWNLOAD = 1;
  
  private final LimitedRateGroup max_rate;
  
  private final ByteBucket main_bucket;
  private final EntityHandler main_controller;
  
  private final HashMap group_buckets = new HashMap();
  private final HashMap connections = new HashMap();
  private final AEMonitor connections_mon;

  
  /**
   * Create new transfer processor for the given read/write type, limited to the given max rate.
   * @param processor_type read or write processor
   * @param max_rate_limit to use
   */
  public TransferProcessor( int processor_type, LimitedRateGroup max_rate_limit ) {
    this.max_rate = max_rate_limit;
    
    connections_mon = new AEMonitor( "TransferProcessor:" +processor_type );

    main_bucket = new ByteBucket( max_rate.getRateLimitBytesPerSecond() ); 

    main_controller = new EntityHandler( processor_type, new RateHandler() {
      public int getCurrentNumBytesAllowed() {
        if( main_bucket.getRate() != max_rate.getRateLimitBytesPerSecond() ) { //sync rate
          main_bucket.setRate( max_rate.getRateLimitBytesPerSecond() );
        }
        return main_bucket.getAvailableByteCount();
      }
      
      public void bytesProcessed( int num_bytes_written ) {
        main_bucket.setBytesUsed( num_bytes_written );
      }
    });
  }
  

    
  
  /**
   * Register peer connection for upload handling.
   * NOTE: The given max rate limit is ignored until the connection is upgraded.
   * @param connection to register
   * @param group rate limit group
   */
  public void registerPeerConnection( NetworkConnectionBase connection, LimitedRateGroup group ) {
    final ConnectionData conn_data = new ConnectionData();

    try {  connections_mon.enter();
      //do group registration
      GroupData group_data = (GroupData)group_buckets.get( group );
      if( group_data == null ) {
        int limit = NetworkManagerUtilities.getGroupRateLimit( group );
        group_data = new GroupData( new ByteBucket( limit ) );
        group_buckets.put( group, group_data );
      }
      group_data.group_size++;
      conn_data.group = group;
      conn_data.group_data = group_data;
      conn_data.state = ConnectionData.STATE_NORMAL;

      connections.put( connection, conn_data );
    }
    finally {  connections_mon.exit();  }
    
    main_controller.registerPeerConnection( connection );
  }
  
  public boolean isRegistered( NetworkConnectionBase connection ){
    try{ connections_mon.enter();
      return( connections.containsKey( connection ));
    }
    finally{ connections_mon.exit(); }
  }
  
  /**
   * Cancel upload handling for the given peer connection.
   * @param connection to cancel
   */
  public void deregisterPeerConnection( NetworkConnectionBase connection ) {
    try{ connections_mon.enter();
      ConnectionData conn_data = (ConnectionData)connections.remove( connection );
      
      if( conn_data != null ) {
        //do group de-registration
        if( conn_data.group_data.group_size == 1 ) {  //last of the group
          group_buckets.remove( conn_data.group ); //so remove
        }
        else {
          conn_data.group_data.group_size--;
        }
      }
    }
    finally{ connections_mon.exit(); }
    

    main_controller.cancelPeerConnection( connection );
  }
  
  
  

  /**
   * Upgrade the given connection to a high-speed transfer handler.
   * @param connection to upgrade
   */
  public void upgradePeerConnection( final NetworkConnectionBase connection ) {
    ConnectionData connection_data = null;
    
    try{ connections_mon.enter();
      connection_data = (ConnectionData)connections.get( connection );
    }
    finally{ connections_mon.exit(); }
    
    if( connection_data != null && connection_data.state == ConnectionData.STATE_NORMAL ) {
      final ConnectionData conn_data = connection_data;
      
      main_controller.upgradePeerConnection( connection, new RateHandler() {
        public int getCurrentNumBytesAllowed() {          
          // sync global rate
          if( main_bucket.getRate() != max_rate.getRateLimitBytesPerSecond() ) {
            main_bucket.setRate( max_rate.getRateLimitBytesPerSecond() );
          }
          // sync group rate
          int group_rate = NetworkManagerUtilities.getGroupRateLimit( conn_data.group );
          if( conn_data.group_data.bucket.getRate() != group_rate ) {
            conn_data.group_data.bucket.setRate( group_rate );
          }

          int group_allowed = conn_data.group_data.bucket.getAvailableByteCount();
          int global_allowed = main_bucket.getAvailableByteCount();

          // reserve bandwidth for the general pool
          global_allowed -= connection.getMssSize();
          if( global_allowed < 0 ) global_allowed = 0;
          
          int allowed = group_allowed > global_allowed ? global_allowed : group_allowed;
          return allowed;
        }

        public void bytesProcessed( int num_bytes_written ) {
          conn_data.group_data.bucket.setBytesUsed( num_bytes_written );
          main_bucket.setBytesUsed( num_bytes_written );
        }
      });
      
      conn_data.state = ConnectionData.STATE_UPGRADED;
    }
  }
  
  
  /**
   * Downgrade the given connection back to a normal-speed transfer handler.
   * @param connection to downgrade
   */
  public void downgradePeerConnection( NetworkConnectionBase connection ) {
    ConnectionData conn_data = null;
    
    try{ connections_mon.enter();
      conn_data = (ConnectionData)connections.get( connection );
    }
    finally{ connections_mon.exit(); }
    
    if( conn_data != null && conn_data.state == ConnectionData.STATE_UPGRADED ) {
      main_controller.downgradePeerConnection( connection );
      conn_data.state = ConnectionData.STATE_NORMAL;
    }
  }
  
  

  
  private static class ConnectionData {
    private static final int STATE_NORMAL   = 0;
    private static final int STATE_UPGRADED = 1;
    
    private int state;
    private LimitedRateGroup group;
    private GroupData group_data;
  }

    
  private static class GroupData {
    private final ByteBucket bucket;
    private int group_size = 0;
    
    private GroupData( ByteBucket bucket ) {
      this.bucket = bucket;
    }
  }
  
  
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -