📄 outgoingmessagequeue.java
字号:
return 0;
}
int data_written = 0;
int protocol_written = 0;
ArrayList messages_sent = null;
try{
queue_mon.enter();
if( !queue.isEmpty() ) {
ByteBuffer[] buffers = new ByteBuffer[ queue.size() ];
int[] starting_pos = new int[ queue.size() ];
int pos = 0;
int total_sofar = 0;
while( total_sofar < max_bytes && pos < buffers.length ) {
buffers[ pos ] = ((ProtocolMessage)queue.get( pos )).getPayload().getBuffer(DirectByteBuffer.SS_NET);
total_sofar += buffers[ pos ].remaining();
starting_pos[ pos ] = buffers[ pos ].position();
pos++;
}
pos--; //remove last while loop auto-increment
int orig_limit = buffers[ pos ].limit();
if( total_sofar > max_bytes ) {
buffers[ pos ].limit( orig_limit - (total_sofar - max_bytes) );
}
transport.write( buffers, 0, pos + 1 );
buffers[ pos ].limit( orig_limit );
pos = 0;
while( !queue.isEmpty() ) {
ProtocolMessage msg = (ProtocolMessage)queue.get( 0 );
ByteBuffer bb = msg.getPayload().getBuffer(DirectByteBuffer.SS_NET);
if( !bb.hasRemaining() ) {
if( msg == urgent_message ) urgent_message = null;
int bytes_written = bb.limit() - starting_pos[ pos ];
total_size -= bytes_written;
if( msg.isDataMessage() ) {
data_written += bytes_written;
}
else {
protocol_written += bytes_written;
}
queue.remove( 0 );
if( manual_listener_notify ) {
NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_SENT );
item.message = msg;
item.transport = transport;
try {
delayed_notifications_mon.enter();
delayed_notifications.add( item );
}
finally {
delayed_notifications_mon.exit();
}
}
else {
if ( messages_sent == null ){
messages_sent = new ArrayList();
}
messages_sent.add( msg );
}
}
else {
int bytes_written = (bb.limit() - bb.remaining()) - starting_pos[ pos ];
total_size -= bytes_written;
if( msg.isDataMessage() ) {
data_written += bytes_written;
}
else {
protocol_written += bytes_written;
}
break;
}
pos++;
}
}
}finally{
queue_mon.exit();
}
if( data_written + protocol_written > 0 ) {
if( manual_listener_notify ) {
if( data_written > 0 ) { //data bytes notify
NotificationItem item = new NotificationItem( NotificationItem.DATA_BYTES_SENT );
item.byte_count = data_written;
try {
delayed_notifications_mon.enter();
delayed_notifications.add( item );
}
finally {
delayed_notifications_mon.exit();
}
}
if( protocol_written > 0 ) { //protocol bytes notify
NotificationItem item = new NotificationItem( NotificationItem.PROTOCOL_BYTES_SENT );
item.byte_count = protocol_written;
try {
delayed_notifications_mon.enter();
delayed_notifications.add( item );
}
finally {
delayed_notifications_mon.exit();
}
}
}
else { //do listener notification now
ArrayList listeners_ref = listeners;
int num_listeners = listeners_ref.size();
for( int i=0; i < num_listeners; i++ ) {
MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
if( data_written > 0 ) listener.dataBytesSent( data_written );
if( protocol_written > 0 ) listener.protocolBytesSent( protocol_written );
if ( messages_sent != null ){
for( int x=0; x < messages_sent.size(); x++ ) {
ProtocolMessage msg = (ProtocolMessage)messages_sent.get( x );
listener.messageSent( msg );
if( i == num_listeners - 1 ) { //the last listener notification, so destroy
LGLogger.log( LGLogger.CORE_NETWORK, "Sent " +msg.getDescription()+ " message to " + transport.getDescription() );
msg.destroy();
}
}
}
}
}
}
return data_written + protocol_written;
}
/**
 * Delivers every listener notification that was queued for manual dispatch.
 * <p>
 * The queued notifications are copied and the queue is cleared while holding
 * {@code delayed_notifications_mon}; the listeners themselves are only called
 * after that monitor has been released. Messages attached to
 * {@code MESSAGE_REMOVED} and {@code MESSAGE_SENT} notifications are destroyed
 * once all listeners have been told about them.
 */
public void doListenerNotifications() {
  ArrayList pending;

  try {
    delayed_notifications_mon.enter();

    if( delayed_notifications.isEmpty() ) {
      return;  //nothing queued; the finally clause still releases the monitor
    }

    //take a private snapshot so listeners can be invoked outside the monitor
    pending = new ArrayList( delayed_notifications );
    delayed_notifications.clear();
  }
  finally {
    delayed_notifications_mon.exit();
  }

  //read the listener list reference once and use it for the whole dispatch
  final ArrayList targets = listeners;

  for( int n = 0; n < pending.size(); n++ ) {
    final NotificationItem notice = (NotificationItem)pending.get( n );

    switch( notice.type ) {
      case NotificationItem.MESSAGE_ADDED: {
        for( int t = 0; t < targets.size(); t++ ) {
          ((MessageQueueListener)targets.get( t )).messageAdded( notice.message );
        }
        break;
      }

      case NotificationItem.MESSAGE_REMOVED: {
        for( int t = 0; t < targets.size(); t++ ) {
          ((MessageQueueListener)targets.get( t )).messageRemoved( notice.message );
        }
        //destroy only after every listener has seen the message
        notice.message.destroy();
        break;
      }

      case NotificationItem.MESSAGE_SENT: {
        for( int t = 0; t < targets.size(); t++ ) {
          ((MessageQueueListener)targets.get( t )).messageSent( notice.message );
        }
        LGLogger.log( LGLogger.CORE_NETWORK, "Sent " + notice.message.getDescription() + " message to " + notice.transport.getDescription() );
        //destroy only after every listener has seen the message
        notice.message.destroy();
        break;
      }

      case NotificationItem.PROTOCOL_BYTES_SENT: {
        for( int t = 0; t < targets.size(); t++ ) {
          ((MessageQueueListener)targets.get( t )).protocolBytesSent( notice.byte_count );
        }
        break;
      }

      case NotificationItem.DATA_BYTES_SENT: {
        for( int t = 0; t < targets.size(); t++ ) {
          ((MessageQueueListener)targets.get( t )).dataBytesSent( notice.byte_count );
        }
        break;
      }

      default: {
        Debug.out( "NotificationItem.type unknown :" + notice.type );
      }
    }
  }
}
/////////////////////////////////////////////////////////////////
/**
 * Callback interface for queue events: messages being added to, removed from
 * or sent from the queue, and byte counts written to the transport.
 */
public interface MessageQueueListener {

  /**
   * Called when the given message has just been queued for sending out the transport.
   * @param message the message that was queued
   */
  void messageAdded( ProtocolMessage message );

  /**
   * Called when the given message has just been forcibly removed from the queue,
   * i.e. it was *not* sent out the transport.
   * @param message the message that was removed
   */
  void messageRemoved( ProtocolMessage message );

  /**
   * Called when the given message has been completely sent out through the transport.
   * @param message the message that was sent
   */
  void messageSent( ProtocolMessage message );

  /**
   * Called when the given number of protocol (overhead) bytes has been written to the transport.
   * @param byte_count number of protocol bytes written
   */
  void protocolBytesSent( int byte_count );

  /**
   * Called when the given number of (piece) data bytes has been written to the transport.
   * @param byte_count number of data bytes written
   */
  void dataBytesSent( int byte_count );
}
/**
 * Registers a listener to be notified of queue events.
 * <p>
 * The existing listener list is not modified in place: while holding
 * {@code listeners_mon}, a new list containing the current listeners plus
 * the given one is built and then assigned to {@code listeners}.
 * @param listener the listener to add
 */
public void registerQueueListener( MessageQueueListener listener ) {
  try {
    listeners_mon.enter();

    //copy-on-write: build the replacement list, then swap the reference
    ArrayList current = listeners;
    ArrayList grown = new ArrayList( current.size() + 1 );
    grown.addAll( current );
    grown.add( listener );
    listeners = grown;
  }
  finally {
    listeners_mon.exit();
  }
}
/**
 * Unregisters a queue event notification listener.
 * <p>
 * The existing listener list is not modified in place: while holding
 * {@code listeners_mon}, a copy of the current list is made, the given
 * listener is removed from the copy, and the copy is assigned to
 * {@code listeners}.
 * @param listener the listener to remove
 */
public void cancelQueueListener( MessageQueueListener listener ) {
  try {
    listeners_mon.enter();

    //copy-on-write: edit a copy, then swap the reference
    ArrayList shrunk = new ArrayList( listeners );
    shrunk.remove( listener );
    listeners = shrunk;
  }
  finally {
    listeners_mon.exit();
  }
}
/**
 * A listener notification that has been queued for later delivery.
 * The {@code type} field selects which listener callback is invoked.
 */
private static class NotificationItem {
  //notification kinds, stored in 'type'
  private static final int MESSAGE_ADDED       = 0;
  private static final int MESSAGE_REMOVED     = 1;
  private static final int MESSAGE_SENT        = 2;
  private static final int DATA_BYTES_SENT     = 3;
  private static final int PROTOCOL_BYTES_SENT = 4;

  //one of the constants above; fixed at construction
  private final int type;

  //read when delivering MESSAGE_ADDED, MESSAGE_REMOVED and MESSAGE_SENT notifications
  private ProtocolMessage message;

  //read when delivering MESSAGE_SENT notifications (used in the log line)
  private Transport transport;

  //read when delivering DATA_BYTES_SENT and PROTOCOL_BYTES_SENT notifications; starts at 0
  private int byte_count;

  /**
   * Creates a notification of the given kind; the remaining fields are
   * filled in by the code that creates the item.
   * @param type one of the notification kind constants
   */
  private NotificationItem( int type ) {
    this.type = type;
  }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -