📄 genericmessageconnectionindirect.java
字号:
protected
GenericMessageConnectionIndirect(
MessageManagerImpl _message_manager,
String _msg_id,
String _msg_desc,
GenericMessageEndpoint _endpoint,
long _connection_id )
{
// incoming
message_manager = _message_manager;
msg_id = _msg_id;
msg_desc = _msg_desc;
endpoint = _endpoint;
connection_id = _connection_id;
incoming = true;
last_message_received = SystemTime.getCurrentTime();
if ( TRACE ){
trace( "inbound connect from " + endpoint.getNotionalAddress());
}
log( "incoming connection from " + endpoint.getNotionalAddress());
}
public void
setOwner(
GenericMessageConnectionImpl _owner )
{
owner = _owner;
}
public int
getMaximumMessageSize()
{
return( MAX_MESSAGE_SIZE );
}
public long
getLastMessageReceivedTime()
{
long now = SystemTime.getCurrentTime();
if ( now < last_message_received ){
last_message_received = now;
}
return( last_message_received );
}
public GenericMessageEndpoint
getEndpoint()
{
return( endpoint );
}
public void
connect(
ByteBuffer initial_data,
ConnectionListener listener )
{
if ( TRACE ){
trace( "outbound connect to " + endpoint.getNotionalAddress());
}
try{
Map message = new HashMap();
byte[] initial_data_bytes = new byte[ initial_data.remaining()];
initial_data.get( initial_data_bytes );
List initial_messages = new ArrayList();
initial_messages.add( initial_data_bytes );
message.put( "type", new Long( MESSAGE_TYPE_CONNECT ));
message.put( "msg_id", msg_id );
message.put( "msg_desc", msg_desc );
message.put( "data", initial_messages );
Map reply = nat_traverser.sendMessage( message_manager, rendezvous, target, message );
last_message_sent = SystemTime.getCurrentTime();
if ( reply == null || !reply.containsKey( "type") ){
listener.connectFailure( new Throwable( "Indirect connect failed (response=" + reply + ")" ));
}else{
int reply_type = ((Long)reply.get( "type" )).intValue();
if ( reply_type == MESSAGE_TYPE_ERROR ){
listener.connectFailure( new Throwable( new String((byte[])reply.get( "error" ))));
}else if ( reply_type == MESSAGE_TYPE_DISCONNECT ){
listener.connectFailure( new Throwable( "Disconnected" ));
}else if ( reply_type == MESSAGE_TYPE_CONNECT ){
connection_id = ((Long)reply.get( "con_id" )).longValue();
synchronized( local_connections ){
local_connections.put( new Long( connection_id ), this );
}
listener.connectSuccess();
List replies = (List)reply.get( "data" );
for (int i=0;i<replies.size();i++){
owner.receive( new GenericMessage(msg_id, msg_desc, new DirectByteBuffer(ByteBuffer.wrap((byte[])replies.get(i))), false ));
}
}else{
Debug.out( "Unexpected reply type - " + reply_type );
listener.connectFailure( new Throwable( "Unexpected reply type - " + reply_type ));
}
}
}catch( Throwable e ){
listener.connectFailure( e );
}
}
public void
accepted()
{
}
public void
send(
PooledByteBuffer pbb )
throws MessageException
{
byte[] bytes = pbb.toByteArray();
if ( TRACE ){
trace( "send " + bytes.length );
}
if ( incoming ){
synchronized( send_queue ){
if ( send_queue.size() > 64 ){
throw( new MessageException( "Send queue limit exceeded" ));
}
send_queue.add( bytes );
}
send_queue_sem.release();
}else{
List messages = new ArrayList();
messages.add( bytes );
send( messages );
}
}
protected void
send(
List messages )
{
if ( TRACE ){
trace( " send " + messages );
}
try{
Map message = new HashMap();
message.put( "con_id", new Long( connection_id ));
message.put( "type", new Long( MESSAGE_TYPE_DATA ));
message.put( "data", messages );
Map reply = nat_traverser.sendMessage( message_manager, rendezvous, target, message );
last_message_sent = SystemTime.getCurrentTime();
if ( reply == null || !reply.containsKey( "type")){
owner.reportFailed( new Throwable( "Indirect message send failed (response=" + reply + ")" ));
}else{
int reply_type = ((Long)reply.get( "type" )).intValue();
if ( reply_type == MESSAGE_TYPE_ERROR ){
owner.reportFailed( new Throwable( new String((byte[])reply.get( "error" ))));
}else if ( reply_type == MESSAGE_TYPE_DATA ){
List replies = (List)reply.get( "data" );
for (int i=0;i<replies.size();i++){
owner.receive( new GenericMessage(msg_id, msg_desc, new DirectByteBuffer(ByteBuffer.wrap((byte[])replies.get(i))), false ));
}
// if there's more data queued force a keep alive to pick it up but delay
// a little to give the rendezvous a breather
if ( reply.get( "more_data" ) != null ){
if ( TRACE ){
trace( " received 'more to come'" );
}
new DelayedEvent(
"GenMsg:kap",
500,
new AERunnable()
{
public void
runSupport()
{
if ( prepareForKeepAlive( true )){
keep_alive_pool.run(
new AERunnable()
{
public void
runSupport()
{
GenericMessageConnectionIndirect.this.keepAlive();
}
});
}
}
});
}
}else if ( reply_type == MESSAGE_TYPE_DISCONNECT ){
owner.reportFailed( new Throwable( "Disconnected" ));
}
}
}catch( Throwable e ){
owner.reportFailed( e );
}
}
protected List
receive(
List messages )
{
if ( TRACE ){
trace( "receive: " + messages );
}
last_message_received = SystemTime.getCurrentTime();
for (int i=0;i<messages.size();i++){
owner.receive( new GenericMessage(msg_id, msg_desc, new DirectByteBuffer(ByteBuffer.wrap((byte[])messages.get(i))), false ));
}
List reply = new ArrayList();
// hang around a bit to see if we can piggyback a reply
if ( send_queue_sem.reserve( 2500 )){
// give a little more time in case async > 1 message is being queued
try{
Thread.sleep(250);
}catch( Throwable e ){
}
int max = getMaximumMessageSize();
int total = 0;
synchronized( send_queue ){
while( send_queue.size() > 0 ){
byte[] data = (byte[])send_queue.getFirst();
if ( total > 0 && total + data.length > max ){
break;
}
reply.add( send_queue.removeFirst());
total += data.length;
}
if ( TRACE ){
trace( " messages returned = " + reply.size() + " (" + total + "), more=" + (send_queue.size() > 0 ));
}
}
// grab sems for any entries other than the initial one grabbed above
for (int i=1;i<reply.size();i++){
send_queue_sem.reserve();
}
}
return( reply );
}
protected boolean
receiveIncomplete()
{
synchronized( send_queue ){
return( send_queue.size() > 0 );
}
}
public void
close()
throws MessageException
{
close( null );
}
protected void
close(
Throwable close_cause )
throws MessageException
{
if ( closed ){
return;
}
if ( TRACE ){
if ( close_cause == null ){
trace( "close[local]" );
}else{
trace( "close[" + close_cause.getMessage() + "]" );
}
}
log( "connection to " + endpoint.getNotionalAddress() + " closed" + (close_cause==null?"":(" (" + close_cause + ")")));
try{
closed = true;
if ( incoming ){
synchronized( remote_connections ){
remote_connections.remove( new Long( connection_id ));
}
}else{
synchronized( local_connections ){
local_connections.remove( new Long( connection_id ));
}
Map message = new HashMap();
message.put( "con_id", new Long( connection_id ));
message.put( "type", new Long( MESSAGE_TYPE_DISCONNECT ));
try{
nat_traverser.sendMessage( message_manager, rendezvous, target, message );
last_message_sent = SystemTime.getCurrentTime();
}catch( Throwable e ){
throw( new MessageException( "Close operation failed", e ));
}
}
}finally{
if ( close_cause != null ){
owner.reportFailed( close_cause );
}
}
}
protected boolean
isClosed()
{
return( closed );
}
protected boolean
prepareForKeepAlive(
boolean force )
{
if ( keep_alive_in_progress ){
return( false );
}
long now = SystemTime.getCurrentTime();
if ( force || now < last_message_sent || now - last_message_sent > KEEP_ALIVE_MIN ){
keep_alive_in_progress = true;
return( true );
}
return( false );
}
protected void
keepAlive()
{
if (TRACE ){
trace( "keepAlive" );
}
try{
send( new ArrayList());
}finally{
keep_alive_in_progress = false;
}
}
protected static void
log(
String str )
{
if ( Logger.isEnabled()){
Logger.log(new LogEvent(LOGID, "GenericMessaging (indirect):" + str ));
}
}
protected void
trace(
String str )
{
if ( TRACE ){
System.out.println( "GMCI[" +(incoming?"R":"L") + "/" + connection_id + "] " + str );
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -