uploadmanager.java

来自「Azureus is a powerful, full-featured, cr」· Java 代码 · 共 236 行

JAVA
236
字号
/*
 * Created on Oct 7, 2004
 * Created by Alon Rohter
 * Copyright (C) 2004 Aelitis, All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 * 
 * AELITIS, SARL au capital de 30,000 euros
 * 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
 *
 */

package com.aelitis.azureus.core.peermanager;

import java.util.*;

import org.gudy.azureus2.core3.config.*;
import org.gudy.azureus2.core3.util.AEMonitor;

import com.aelitis.azureus.core.networkmanager.*;
import com.aelitis.azureus.core.peermanager.messages.ProtocolMessage;
import com.aelitis.azureus.core.peermanager.messages.bittorrent.BTProtocolMessage;


/**
 *
 */
public class UploadManager {
  private static final int UNLIMITED_WRITE_RATE = 1024 * 1024 * 100; //100 mbyte/s
  
  private static final UploadManager instance = new UploadManager();
  
  private int standard_max_rate_bps;
  private final ByteBucket standard_bucket;
  private final HashMap standard_peer_connections = new HashMap();
  private final AEMonitor standard_peer_connections_mon = new AEMonitor( "UploadManager:SPC" );
  private final UploadEntityController standard_entity_controller;
  
  private final HashMap group_buckets = new HashMap();
  private final AEMonitor group_buckets_mon = new AEMonitor( "UploadManager:GB" );
  
  
  private UploadManager() {
    int max_rateKBs = COConfigurationManager.getIntParameter( "Max Upload Speed KBs" );
    standard_max_rate_bps = max_rateKBs == 0 ? UNLIMITED_WRITE_RATE : max_rateKBs * 1024;
    COConfigurationManager.addParameterListener( "Max Upload Speed KBs", new ParameterListener() {
      public void parameterChanged( String parameterName ) {
        int rateKBs = COConfigurationManager.getIntParameter( "Max Upload Speed KBs" );
        standard_max_rate_bps = rateKBs == 0 ? UNLIMITED_WRITE_RATE : rateKBs * 1024;
      }
    });
    
    standard_bucket = new ByteBucket( standard_max_rate_bps ); 
    
    standard_entity_controller = new UploadEntityController( new RateHandler() {
      public int getCurrentNumBytesAllowed() {
        if( standard_bucket.getRate() != standard_max_rate_bps ) { //sync rate
          standard_bucket.setRate( standard_max_rate_bps );
        }
        return standard_bucket.getAvailableByteCount();
      }
      
      public void bytesWritten( int num_bytes_written ) {
        standard_bucket.setBytesUsed( num_bytes_written );
      }
    });
  }
  
  
  /**
   * Get the singleton instance of the upload manager.
   * @return the upload manager
   */
  public static UploadManager getSingleton() {  return instance;  }
  


  
  public void registerStandardPeerConnection( final Connection connection, final LimitedRateGroup group ) {
    final ConnectionData conn_data = new ConnectionData();
    
    OutgoingMessageQueue.MessageQueueListener listener = new OutgoingMessageQueue.MessageQueueListener() {
      public void messageAdded( ProtocolMessage message ) {
        if( message.getType() == BTProtocolMessage.BT_PIECE ) {  //is sending piece data
          if( conn_data.state == ConnectionData.STATE_NORMAL ) {  //so upgrade it
            
            standard_entity_controller.upgradePeerConnection( connection, new RateHandler() {
              public int getCurrentNumBytesAllowed() {                
                //sync global rate
                if( standard_bucket.getRate() != standard_max_rate_bps ) {
                  standard_bucket.setRate( standard_max_rate_bps );
                }
                //sync group rate
                int group_rate = getTranslatedLimit( group );
                if( conn_data.group_bucket.getRate() != group_rate ) {
                  conn_data.group_bucket.setRate( group_rate );
                }
                
                int group_allowed = conn_data.group_bucket.getAvailableByteCount();
                int global_allowed = standard_bucket.getAvailableByteCount();
                
                //reserve bandwidth for the general pool if needed
                if( standard_entity_controller.isGeneralPoolWriteNeeded() ) {
                  global_allowed -= NetworkManager.getSingleton().getTcpMssSize();
                  if( global_allowed < 0 )  global_allowed = 0;
                }
                
                int allowed = group_allowed > global_allowed ? global_allowed : group_allowed;
                return allowed;
              }

              public void bytesWritten( int num_bytes_written ) {
                conn_data.group_bucket.setBytesUsed( num_bytes_written );
                standard_bucket.setBytesUsed( num_bytes_written );
              }
            });
            conn_data.state = ConnectionData.STATE_UPGRADED;
          }
        }
      }

      public void messageSent( ProtocolMessage message ) {
        if( message.getType() == BTProtocolMessage.BT_CHOKE ) {  //is done sending piece data
          if( conn_data.state == ConnectionData.STATE_UPGRADED ) {  //so downgrade it
            standard_entity_controller.downgradePeerConnection( connection );
            conn_data.state = ConnectionData.STATE_NORMAL;
          }
        }
      }

      public void messageRemoved( ProtocolMessage message ) {/*nothing*/}
      public void protocolBytesSent( int byte_count ) {/*ignore*/}
      public void dataBytesSent( int byte_count ) {/*ignore*/}
    };
    
    
    //do group registration
    GroupData group_data;
    try {  group_buckets_mon.enter(); 
      group_data = (GroupData)group_buckets.get( group );
      if( group_data == null ) {
        int limit = getTranslatedLimit( group );
        group_data = new GroupData( new ByteBucket( limit ) );
        group_buckets.put( group, group_data );
      }
      group_data.group_size++;
    }
    finally {  group_buckets_mon.exit();  } 
    
    conn_data.group_bucket = group_data.bucket;
    conn_data.queue_listener = listener;
    conn_data.state = ConnectionData.STATE_NORMAL;
    conn_data.group = group;
    
    try{ standard_peer_connections_mon.enter();
      standard_peer_connections.put( connection, conn_data );
    }
    finally{ standard_peer_connections_mon.exit(); }
    
    connection.getOutgoingMessageQueue().registerQueueListener( listener );
    standard_entity_controller.registerPeerConnection( connection );
  }
  
  
  public void cancelStandardPeerConnection( Connection connection ) {
    ConnectionData conn_data = null;
    
    try{ standard_peer_connections_mon.enter();
      conn_data = (ConnectionData)standard_peer_connections.remove( connection );
    }
    finally{ standard_peer_connections_mon.exit(); }
    
    if( conn_data != null ) {
      connection.getOutgoingMessageQueue().cancelQueueListener( conn_data.queue_listener );
      
      //do group de-registration
      try {  group_buckets_mon.enter(); 
        GroupData group_data = (GroupData)group_buckets.get( conn_data.group );
        if( group_data.group_size == 1 ) { //last of the group
          group_buckets.remove( conn_data.group ); //so remove
        }
        else {
          group_data.group_size--;
        }
      }
      finally {  group_buckets_mon.exit();  }
    }
    
    standard_entity_controller.cancelPeerConnection( connection );
  }
  
  
  private int getTranslatedLimit( LimitedRateGroup group ) {
    int limit = group.getRateLimitBytesPerSecond();
    if( limit == 0 ) {  //unlimited
      limit = UNLIMITED_WRITE_RATE;
    }
    else if( limit < 0 ) {  //disabled
      limit = 0;
    }
    return limit;
  }
  
  
  
  private static class ConnectionData {
    private static final int STATE_NORMAL   = 0;
    private static final int STATE_UPGRADED = 1;
    
    private OutgoingMessageQueue.MessageQueueListener queue_listener;
    private int state;
    private LimitedRateGroup group;
    private ByteBucket group_bucket;
  }

    
  private static class GroupData {
    private final ByteBucket bucket;
    private int group_size = 0;
    private GroupData( ByteBucket bucket ) {
      this.bucket = bucket;
    }
  }
  
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?