📄 tcptransportimpl.java
字号:
/*
* Created on May 8, 2004
* Created by Alon Rohter
* Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 46,603.30 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package com.aelitis.azureus.core.networkmanager.impl;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import org.gudy.azureus2.core3.config.COConfigurationManager;
import org.gudy.azureus2.core3.logging.*;
import org.gudy.azureus2.core3.util.*;
import com.aelitis.azureus.core.networkmanager.*;
/**
* Represents a peer TCP transport connection (eg. a network socket).
*/
public class TCPTransportImpl implements TCPTransport {
private static final LogIDs LOGID = LogIDs.NET;
protected TCPTransportHelperFilter filter;
protected volatile boolean is_ready_for_write = false;
protected volatile boolean is_ready_for_read = false;
protected Throwable write_select_failure = null;
protected Throwable read_select_failure = null;
private ConnectDisconnectManager.ConnectListener connect_request_key = null;
private String description = "<disconnected>";
private ByteBuffer data_already_read = null;
private final boolean is_inbound_connection;
private int transport_mode = TRANSPORT_MODE_NORMAL;
public volatile boolean has_been_closed = false;
private static final TransportStats stats = AEDiagnostics.TRACE_TCP_TRANSPORT_STATS ? new TransportStats() : null;
private boolean connect_with_crypto;
private byte[] shared_secret;
private int fallback_count;
private final boolean fallback_allowed;
/**
* Constructor for disconnected (outbound) transport.
*/
public TCPTransportImpl( boolean use_crypto, boolean allow_fallback, byte[] _shared_secret ) {
filter = null;
is_inbound_connection = false;
connect_with_crypto = use_crypto;
shared_secret = _shared_secret;
fallback_allowed = allow_fallback;
}
/**
* Constructor for connected (inbound) transport.
* @param channel connection
* @param already_read bytes from the channel
*/
public TCPTransportImpl( TCPTransportHelperFilter filter, ByteBuffer already_read ) {
this.filter = filter;
this.data_already_read = already_read;
is_inbound_connection = true;
connect_with_crypto = false; //inbound connections will automatically be using crypto if necessary
fallback_allowed = false;
description = ( is_inbound_connection ? "R" : "L" ) + ": " + filter.getSocketChannel().socket().getInetAddress().getHostAddress() + ": " + filter.getSocketChannel().socket().getPort();
registerSelectHandling();
}
/**
* Inject the given already-read data back into the read stream.
* @param bytes_already_read data
*/
public void setAlreadyRead( ByteBuffer bytes_already_read ) {
if( bytes_already_read != null && bytes_already_read.hasRemaining() ) {
data_already_read = bytes_already_read;
}
}
/**
* Get the socket channel used by the transport.
* @return the socket channel
*/
public SocketChannel getSocketChannel() { return filter.getSocketChannel(); }
/**
* Get a textual description for this transport.
* @return description
*/
public String getDescription() { return description; }
/**
* Is the transport ready to write,
* i.e. will a write request result in >0 bytes written.
* @return true if the transport is write ready, false if not yet ready
*/
public boolean isReadyForWrite() { return is_ready_for_write; }
/**
* Is the transport ready to read,
* i.e. will a read request result in >0 bytes read.
* @return true if the transport is read ready, false if not yet ready
*/
public boolean isReadyForRead() { return is_ready_for_read; }
/**
* Write data to the transport from the given buffers.
* NOTE: Works like GatheringByteChannel.
* @param buffers from which bytes are to be retrieved
* @param array_offset offset within the buffer array of the first buffer from which bytes are to be retrieved
* @param length maximum number of buffers to be accessed
* @return number of bytes written
* @throws IOException on write error
*/
public long write( ByteBuffer[] buffers, int array_offset, int length ) throws IOException {
if( write_select_failure != null ) throw new IOException( "write_select_failure: " + write_select_failure.getMessage() );
long written = filter.write( buffers, array_offset, length );
if( stats != null ) stats.bytesWritten( (int)written ); //TODO
if( written < 1 ) requestWriteSelect();
return written;
}
private void requestWriteSelect() {
is_ready_for_write = false;
if( filter != null ){
NetworkManager.getSingleton().getWriteSelector().resumeSelects( filter.getSocketChannel() );
}
}
private void requestReadSelect() {
is_ready_for_read = false;
if( filter != null ){
NetworkManager.getSingleton().getReadSelector().resumeSelects( filter.getSocketChannel() );
}
}
private void registerSelectHandling() {
if( filter == null ) {
Debug.out( "ERROR: registerSelectHandling():: socket_channel == null" );
return;
}
//read selection
NetworkManager.getSingleton().getReadSelector().register( filter.getSocketChannel(), new VirtualChannelSelector.VirtualSelectorListener() {
public boolean selectSuccess( VirtualChannelSelector selector, SocketChannel sc,Object attachment ) {
is_ready_for_read = true;
return true;
}
public void selectFailure( VirtualChannelSelector selector, SocketChannel sc,Object attachment, Throwable msg ) {
read_select_failure = msg;
is_ready_for_read = true; //set to true so that the next read attempt will throw an exception
}
}, null );
//write selection
NetworkManager.getSingleton().getWriteSelector().register( filter.getSocketChannel(), new VirtualChannelSelector.VirtualSelectorListener() {
public boolean selectSuccess( VirtualChannelSelector selector, SocketChannel sc,Object attachment ) {
is_ready_for_write = true;
return true;
}
public void selectFailure( VirtualChannelSelector selector, SocketChannel sc,Object attachment, Throwable msg ) {
write_select_failure = msg;
is_ready_for_write = true; //set to true so that the next write attempt will throw an exception
}
}, null );
}
/**
* Read data from the transport into the given buffers.
* NOTE: Works like ScatteringByteChannel.
* @param buffers into which bytes are to be placed
* @param array_offset offset within the buffer array of the first buffer into which bytes are to be placed
* @param length maximum number of buffers to be accessed
* @return number of bytes read
* @throws IOException on read error
*/
public long read( ByteBuffer[] buffers, int array_offset, int length ) throws IOException {
if( read_select_failure != null ) {
is_ready_for_read = false;
throw new IOException( "read_select_failure: " + read_select_failure.getMessage() );
}
//insert already-read data into the front of the stream
if( data_already_read != null ) {
int inserted = 0;
for( int i = array_offset; i < (array_offset + length); i++ ) {
ByteBuffer bb = buffers[ i ];
int orig_limit = data_already_read.limit();
if( data_already_read.remaining() > bb.remaining() ) {
data_already_read.limit( data_already_read.position() + bb.remaining() );
}
inserted += data_already_read.remaining();
bb.put( data_already_read );
data_already_read.limit( orig_limit );
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -