📄 connectdisconnectmanager.java
字号:
try{ new_canceled_mon.enter();
canceled = canceled_requests.contains( request.listener );
}
finally{ new_canceled_mon.exit(); }
if( canceled ) {
closeConnection( request.channel );
}
else {
connect_selector.cancel( request.channel );
request.listener.connectSuccess( request.channel );
}
}
else { //should never happen
Debug.out( "finishConnect() failed" );
request.listener.connectFailure( new Throwable( "finishConnect() failed" ) );
closeConnection( request.channel );
}
}
catch( Throwable t ) {
if( SHOW_CONNECT_STATS ) {
long queue_wait_time = request.connect_start_time - request.request_start_time;
long connect_time = SystemTime.getCurrentTime() - request.connect_start_time;
int num_queued = new_requests.size();
int num_connecting = pending_attempts.size();
System.out.println("F: queue_wait_time="+queue_wait_time+
", connect_time="+connect_time+
", num_queued="+num_queued+
", num_connecting="+num_connecting);
}
request.listener.connectFailure( t );
closeConnection( request.channel );
}
}
/**
 * Performs one round of connect processing, in this order:
 * applies queued cancellations to in-progress attempts, runs the connect
 * selector, fails in-progress attempts that have exceeded their timeout, and
 * finally starts one more queued request if every in-progress attempt is stalled.
 */
private void runSelect() {
  //apply the cancellations that were recorded for later processing
  try{
    new_canceled_mon.enter();

    Iterator cancel_it = canceled_requests.iterator();
    while( cancel_it.hasNext() ) {
      ConnectListener canceled_listener = (ConnectListener)cancel_it.next();

      //only the first in-progress attempt found for this listener is dropped
      Iterator attempt_it = pending_attempts.keySet().iterator();
      while( attempt_it.hasNext() ) {
        ConnectionRequest attempt = (ConnectionRequest)attempt_it.next();
        if( attempt.listener != canceled_listener ) {
          continue;
        }
        connect_selector.cancel( attempt.channel );
        closeConnection( attempt.channel );
        attempt_it.remove();
        break;
      }
    }

    canceled_requests.clear();
  }
  finally{
    new_canceled_mon.exit();
  }

  //run select; a failure here is logged and the rest of the round still runs
  try{
    connect_selector.select( 100 );
  }
  catch( Throwable t ) {
    Debug.out("connnectSelectLoop() EXCEPTION: ", t);
  }

  //fail the attempts that timed out and count the ones that look stalled
  final long now = SystemTime.getCurrentTime();
  int stalled_count = 0;

  Iterator timeout_it = pending_attempts.keySet().iterator();
  while( timeout_it.hasNext() ) {
    final ConnectionRequest attempt = (ConnectionRequest)timeout_it.next();
    final long elapsed = now - attempt.connect_start_time;

    if( elapsed > attempt.connect_timeout ) {
      //drop the attempt and release its channel before notifying the listener
      timeout_it.remove();

      SocketChannel timed_out_channel = attempt.channel;
      connect_selector.cancel( timed_out_channel );
      closeConnection( timed_out_channel );

      //prefer "ip:port"; fall back to the socket address text when there is no InetAddress
      InetSocketAddress remote = attempt.address;
      InetAddress resolved = remote.getAddress();
      String target = ( resolved != null )
        ? resolved.getHostAddress() + ":" + remote.getPort()
        : remote.toString();

      attempt.listener.connectFailure( new Throwable( "Connection attempt to " + target + " aborted: timed out after " + attempt.connect_timeout/1000+ "sec" ) );
      continue;
    }

    if( elapsed >= CONNECT_ATTEMPT_STALL_TIME ) {
      stalled_count++;
    }
    else if( elapsed < 0 ) {  //time went backwards, so restart the clock for this attempt
      attempt.connect_start_time = now;
    }
  }

  //the queue only expands when every in-progress attempt is stalled (which also holds
  //when there are none) and the simultaneous-attempt limit has not been reached
  final int in_progress = pending_attempts.size();
  if( stalled_count != in_progress || in_progress >= MAX_SIMULTANIOUS_CONNECT_ATTEMPTS ) {
    return;
  }

  ConnectionRequest next_request = null;
  try{
    new_canceled_mon.enter();
    if( !new_requests.isEmpty() ) {
      next_request = (ConnectionRequest)new_requests.removeFirst();
    }
  }
  finally{
    new_canceled_mon.exit();
  }

  //start the attempt outside the monitor
  if( next_request != null ) {
    addNewRequest( next_request );
  }
}
/**
 * Closes every channel that is currently due for closing.
 * Delayed closes that have become due are first moved onto the pending-close
 * list; the pending-close list is then drained, cancelling each channel on the
 * connect selector and closing it.
 */
private void doClosings() {
  try{
    pending_closes_mon.enter();

    final long now = SystemTime.getCurrentTime();

    //promote the delayed closes that are due
    if( !delayed_closes.isEmpty() ) {
      for( Iterator delayed_it = delayed_closes.entrySet().iterator(); delayed_it.hasNext(); ) {
        Map.Entry delayed = (Map.Entry)delayed_it.next();
        long close_at = ((Long)delayed.getValue()).longValue();
        long remaining = close_at - now;

        //a remaining wait above 60*1000 also counts as due
        //NOTE(review): presumably a guard against the clock jumping backwards - confirm
        boolean due = remaining < 0 || remaining > 60*1000;
        if( due ) {
          pending_closes.addLast( delayed.getKey() );
          delayed_it.remove();
        }
      }
    }

    //drain the pending-close list
    while( !pending_closes.isEmpty() ) {
      SocketChannel to_close = (SocketChannel)pending_closes.removeFirst();
      if( to_close == null ) {
        continue;
      }

      connect_selector.cancel( to_close );

      try{
        to_close.close();
      }
      catch( Throwable ignored ) {
        //close failures are intentionally not reported
      }
    }
  }
  finally{
    pending_closes_mon.exit();
  }
}
/**
 * Queue a new outbound connection to the given address, using
 * CONNECT_ATTEMPT_TIMEOUT as the connect timeout.
 * @param address remote ip+port to connect to
 * @param listener notified of the connect attempt's success or failure
 */
public void
requestNewConnection(
  InetSocketAddress address,
  ConnectListener listener )
{
  requestNewConnection( address, listener, CONNECT_ATTEMPT_TIMEOUT );
}
/**
 * Queue a new outbound connection to the given address with an explicit timeout.
 * The request is inserted at a random position in the queue of new requests.
 * @param address remote ip+port to connect to
 * @param listener notified of the connect attempt's success or failure
 * @param connect_timeout how long the attempt may take before it is failed;
 *        the timeout message reports this value divided by 1000 as seconds
 */
public void requestNewConnection( InetSocketAddress address, ConnectListener listener, long connect_timeout ) {
  final ConnectionRequest request = new ConnectionRequest( address, listener, connect_timeout );

  try{
    new_canceled_mon.enter();

    //new connections usually arrive in 50-peer chunks (e.g. from a tracker announce
    //reply); a random slot spreads the connect attempts evenly when multiple
    //torrents are running
    final int queue_size = new_requests.size();
    new_requests.add( random.nextInt( queue_size + 1 ), request );
  }
  finally{
    new_canceled_mon.exit();
  }
}
/**
 * Schedule the given connection for closing, with no delay.
 * @param channel to close
 */
public void closeConnection( SocketChannel channel ) {
  closeConnection( channel, 0 );
}
/**
 * Schedule the given connection for closing, optionally after a delay.
 * With a zero delay the channel is appended to the pending-close list, unless it
 * is already on that list or already has a delayed close registered. With any
 * other delay a delayed close is recorded for the channel (replacing an earlier
 * one), due at the current time plus the delay.
 * @param channel to close
 * @param delay 0 to close without delay, otherwise the amount added to the
 *        current time to obtain the close deadline
 */
public void closeConnection( SocketChannel channel, int delay ) {
  try{
    pending_closes_mon.enter();

    if( delay != 0 ) {
      delayed_closes.put( channel, new Long( SystemTime.getCurrentTime() + delay ));
    }
    else {
      //a registered delayed close takes precedence, and the channel is never queued twice
      boolean already_scheduled =
        delayed_closes.containsKey( channel ) || pending_closes.contains( channel );

      if( !already_scheduled ) {
        pending_closes.addLast( channel );
      }
    }
  }
  finally{
    pending_closes_mon.exit();
  }
}
/**
 * Cancel a pending new connection request.
 * A request that is still waiting in the queue of new requests is removed
 * immediately (only the first match). Otherwise the listener is recorded so that
 * the matching in-progress attempt can be dropped later, during select processing.
 * @param listener_key the listener used in the initial connect request;
 *        matched by reference identity
 */
public void cancelRequest( ConnectListener listener_key ) {
  try{
    new_canceled_mon.enter();

    //first try to cancel right away, straight from the queue
    boolean removed_from_queue = false;
    Iterator queued_it = new_requests.iterator();
    while( !removed_from_queue && queued_it.hasNext() ) {
      ConnectionRequest queued = (ConnectionRequest)queued_it.next();
      if( queued.listener == listener_key ) {
        queued_it.remove();
        removed_from_queue = true;
      }
    }

    //not queued anymore: remember it for removal during select
    if( !removed_from_queue ) {
      canceled_requests.add( listener_key );
    }
  }
  finally{
    new_canceled_mon.exit();
  }
}
/**
 * Bookkeeping for a single outbound connection request.
 */
private static class ConnectionRequest {
  //remote ip+port to connect to
  private final InetSocketAddress address;
  //receives the outcome of the attempt; also serves as the key when cancelling
  private final ConnectListener listener;
  //how long the attempt may run before it is failed as timed out
  private final long connect_timeout;
  //time at which this request object was created
  private final long request_start_time;
  //start time of the actual connect attempt; not assigned by the constructor
  private long connect_start_time;
  //channel used for the attempt; not assigned by the constructor
  private SocketChannel channel;

  private ConnectionRequest( InetSocketAddress address, ConnectListener listener, long connect_timeout ) {
    this.address = address;
    this.listener = listener;
    this.connect_timeout = connect_timeout;
    this.request_start_time = SystemTime.getCurrentTime();
  }
}
///////////////////////////////////////////////////////////
/**
* Listener for notification of connection establishment.
*/
public interface ConnectListener {
/**
* The connection establishment process has started,
* i.e. the connection is actively being attempted.
*/
public void connectAttemptStarted();
/**
* The connection attempt succeeded.
* @param channel connected socket channel
*/
public void connectSuccess( SocketChannel channel ) ;
/**
* The connection attempt failed.
* @param failure_msg failure reason
*/
public void connectFailure( Throwable failure_msg );
}
/////////////////////////////////////////////////////////////
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -