📄 outgoingmessagequeueimpl.java
字号:
throw( new IOException( "not ready to deliver data" ));
}
int data_written = 0;
int protocol_written = 0;
ArrayList messages_sent = null;
try{
queue_mon.enter();
if( !queue.isEmpty() ) {
ArrayList raw_buffers = new ArrayList();
ArrayList orig_positions = new ArrayList();
int total_sofar = 0;
for( Iterator i = queue.iterator(); i.hasNext(); ) {
DirectByteBuffer[] payloads = ((RawMessage)i.next()).getRawData();
boolean stop = false;
for( int x=0; x < payloads.length; x++ ) {
ByteBuffer buff = payloads[x].getBuffer( DirectByteBuffer.SS_NET );
raw_buffers.add( buff );
orig_positions.add( new Integer( buff.position() ) );
total_sofar += buff.remaining();
if( total_sofar >= max_bytes ) {
stop = true;
break;
}
}
if( stop ) break;
}
int num_raw = raw_buffers.size();
ByteBuffer last_buff = (ByteBuffer)raw_buffers.get( num_raw - 1 );
int orig_last_limit = last_buff.limit();
if( total_sofar > max_bytes ) {
last_buff.limit( orig_last_limit - (total_sofar - max_bytes) );
}
ByteBuffer[] buffs = new ByteBuffer[ num_raw ];
raw_buffers.toArray( buffs );
transport.write( buffs, 0, num_raw );
last_buff.limit( orig_last_limit );
int pos = 0;
boolean stop = false;
while( !queue.isEmpty() && !stop ) {
RawMessage msg = (RawMessage)queue.get( 0 );
DirectByteBuffer[] payloads = msg.getRawData();
for( int x=0; x < payloads.length; x++ ) {
ByteBuffer bb = payloads[x].getBuffer( DirectByteBuffer.SS_NET );
int bytes_written = (bb.limit() - bb.remaining()) - ((Integer)orig_positions.get( pos )).intValue();
total_size -= bytes_written;
if( x > 0 && msg.getType() == Message.TYPE_DATA_PAYLOAD ) { //assumes the first buffer is message header
data_written += bytes_written;
}
else {
protocol_written += bytes_written;
}
if( bb.hasRemaining() ) { //still data left to send in this message
stop = true; //so don't bother checking later messages for completion
//compute send percentage
int message_size = 0;
int written = 0;
for( int i=0; i < payloads.length; i++ ) {
ByteBuffer buff = payloads[i].getBuffer( DirectByteBuffer.SS_NET );
message_size += buff.limit();
if( i < x ) { //if in front of non-empty buffer
written += buff.limit();
}
else if( i == x ) { //is non-empty buffer
written += buff.position();
}
}
percent_complete = (written * 100) / message_size;
break;
}
else if( x == payloads.length - 1 ) { //last payload buffer of message is empty
if( msg == urgent_message ) urgent_message = null;
queue.remove( 0 );
if( TRACE_HISTORY ) {
prev_sent.addLast( msg );
if( prev_sent.size() > MAX_HISTORY_TRACES ) prev_sent.removeFirst();
}
percent_complete = -1; //reset send percentage
if( manual_listener_notify ) {
NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_SENT );
item.message = msg;
try { delayed_notifications_mon.enter();
delayed_notifications.add( item );
} finally { delayed_notifications_mon.exit(); }
}
else {
if( messages_sent == null ) {
messages_sent = new ArrayList();
}
messages_sent.add( msg );
}
}
pos++;
if( pos >= num_raw ) {
stop = true;
break;
}
}
}
}
}finally{
queue_mon.exit();
}
// we can have messages that end up getting serialised as 0 bytes (for http
// connections for example) - we still need to notify them of being sent...
if( data_written + protocol_written > 0 || messages_sent != null ) {
if( manual_listener_notify ) {
if( data_written > 0 ) { //data bytes notify
NotificationItem item = new NotificationItem( NotificationItem.DATA_BYTES_SENT );
item.byte_count = data_written;
try {
delayed_notifications_mon.enter();
delayed_notifications.add( item );
}
finally {
delayed_notifications_mon.exit();
}
}
if( protocol_written > 0 ) { //protocol bytes notify
NotificationItem item = new NotificationItem( NotificationItem.PROTOCOL_BYTES_SENT );
item.byte_count = protocol_written;
try {
delayed_notifications_mon.enter();
delayed_notifications.add( item );
}
finally {
delayed_notifications_mon.exit();
}
}
}
else { //do listener notification now
ArrayList listeners_ref = listeners;
int num_listeners = listeners_ref.size();
for( int i=0; i < num_listeners; i++ ) {
MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
if( data_written > 0 ) listener.dataBytesSent( data_written );
if( protocol_written > 0 ) listener.protocolBytesSent( protocol_written );
if ( messages_sent != null ){
for( int x=0; x < messages_sent.size(); x++ ) {
RawMessage msg = (RawMessage)messages_sent.get( x );
listener.messageSent( msg.getBaseMessage() );
if( i == num_listeners - 1 ) { //the last listener notification, so destroy
msg.destroy();
}
}
}
}
}
}
return data_written + protocol_written;
}
/**
 * Manually deliver any listener notifications that have been queued up.
 *
 * The pending items are copied into a private list and the shared list is
 * cleared while delayed_notifications_mon is held; the listeners are then
 * invoked after the monitor has been released. Items are processed in the
 * order they were queued, and each item is delivered to every listener in
 * the listener snapshot taken at the start of delivery.
 */
public void doListenerNotifications() {
  ArrayList pending;

  try {
    delayed_notifications_mon.enter();

    if( delayed_notifications.isEmpty() ) {
      return;  //nothing waiting to be delivered
    }

    pending = new ArrayList( delayed_notifications );
    delayed_notifications.clear();
  }
  finally {
    delayed_notifications_mon.exit();
  }

  //work against a single snapshot of the listener list for the whole delivery
  final ArrayList targets = listeners;

  for( Iterator it = pending.iterator(); it.hasNext(); ) {
    final NotificationItem entry = (NotificationItem)it.next();
    final int kind = entry.type;

    if( kind == NotificationItem.MESSAGE_ADDED ) {
      for( int n = 0; n < targets.size(); n++ ) {
        MessageQueueListener target = (MessageQueueListener)targets.get( n );
        target.messageQueued( entry.message.getBaseMessage() );
      }
    }
    else if( kind == NotificationItem.MESSAGE_REMOVED ) {
      for( int n = 0; n < targets.size(); n++ ) {
        MessageQueueListener target = (MessageQueueListener)targets.get( n );
        target.messageRemoved( entry.message.getBaseMessage() );
      }
      //every listener has been told, so the message can be destroyed
      entry.message.destroy();
    }
    else if( kind == NotificationItem.MESSAGE_SENT ) {
      for( int n = 0; n < targets.size(); n++ ) {
        MessageQueueListener target = (MessageQueueListener)targets.get( n );
        target.messageSent( entry.message.getBaseMessage() );
      }
      //every listener has been told, so the message can be destroyed
      entry.message.destroy();
    }
    else if( kind == NotificationItem.PROTOCOL_BYTES_SENT ) {
      for( int n = 0; n < targets.size(); n++ ) {
        MessageQueueListener target = (MessageQueueListener)targets.get( n );
        target.protocolBytesSent( entry.byte_count );
      }
    }
    else if( kind == NotificationItem.DATA_BYTES_SENT ) {
      for( int n = 0; n < targets.size(); n++ ) {
        MessageQueueListener target = (MessageQueueListener)targets.get( n );
        target.dataBytesSent( entry.byte_count );
      }
    }
    else {
      Debug.out( "NotificationItem.type unknown :" + kind );
    }
  }
}
/**
 * Build a textual dump of the outgoing queue for debugging.
 *
 * After a banner line, the dump lists every entry of prev_sent as
 * "[#hN]: id [description]" (N counting up from 0), followed by every entry
 * of the queue as "[#P pos:limit]: id [description]", where P counts down
 * from queue.size()-1 and pos/limit are read from the message's first raw
 * buffer. Both lists are read while queue_mon is held.
 *
 * @return the trace text, one line per message
 */
public String getQueueTrace() {
  StringBuffer out = new StringBuffer();
  out.append( "**** OUTGOING QUEUE TRACE ****\n" );

  try{
    queue_mon.enter();

    //previously sent messages
    int history_index = 0;
    Iterator history = prev_sent.iterator();
    while( history.hasNext() ) {
      RawMessage sent = (RawMessage)history.next();
      out.append( "[#h" +history_index+ "]: ");
      out.append( sent.getID() );
      out.append( " [" );
      out.append( sent.getDescription() );
      out.append( "]" );
      out.append( "\n" );
      history_index++;
    }

    //messages still waiting in the queue, labelled with a descending counter
    int slot = queue.size() - 1;
    Iterator waiting = queue.iterator();
    while( waiting.hasNext() ) {
      RawMessage queued = (RawMessage)waiting.next();
      int buff_pos = queued.getRawData()[0].position(DirectByteBuffer.SS_NET);
      int buff_limit = queued.getRawData()[0].limit( DirectByteBuffer.SS_NET );
      out.append( "[#" );
      out.append( slot );
      out.append( " " );
      out.append( buff_pos );
      out.append( ":" );
      out.append( buff_limit );
      out.append( "]: " );
      out.append( queued.getID() );
      out.append( " [" );
      out.append( queued.getDescription() );
      out.append( "]" );
      out.append( "\n" );
      slot--;
    }
  }
  finally{
    queue_mon.exit();
  }

  return out.toString();
}
/**
 * Register a listener to be notified of queue events.
 *
 * The listener list is never modified in place: a new list containing the
 * existing listeners plus the new one is built and then swapped in, all
 * while listeners_mon is held.
 *
 * @param listener the listener to append to the listener list
 */
public void registerQueueListener( MessageQueueListener listener ) {
  try{
    listeners_mon.enter();

    //copy-on-write: build the replacement list, then publish it
    ArrayList updated = new ArrayList( listeners );
    updated.add( listener );
    listeners = updated;
  }
  finally{
    listeners_mon.exit();
  }
}
/**
 * Stop notifying the given listener of queue events.
 *
 * The listener list is never modified in place: a copy of the current list
 * is made, the listener is removed from the copy, and the copy is swapped
 * in, all while listeners_mon is held.
 *
 * @param listener the listener to remove from the listener list
 */
public void cancelQueueListener( MessageQueueListener listener ) {
  try{
    listeners_mon.enter();

    //copy-on-write: edit a private copy, then publish it
    final ArrayList current = listeners;
    ArrayList updated = new ArrayList( current.size() );
    updated.addAll( current );
    updated.remove( listener );
    listeners = updated;
  }
  finally{
    listeners_mon.exit();
  }
}
/**
 * Notify the queue's listeners of a message sent externally on the queue's behalf.
 *
 * The byte count reported is the sum of the bytes remaining in the message's
 * data buffers at the time of the call. Each listener first receives
 * messageSent, and is then credited with that byte count as data bytes when
 * the message type is Message.TYPE_DATA_PAYLOAD, or as protocol bytes
 * otherwise.
 *
 * @param message sent externally
 */
public void notifyOfExternallySentMessage( Message message ) {
  final ArrayList targets = listeners;

  //total up what is left in the message's buffers
  DirectByteBuffer[] payload = message.getData();
  int total_bytes = 0;
  for( int b = 0; b < payload.length; b++ ) {
    total_bytes += payload[b].remaining( DirectByteBuffer.SS_NET );
  }

  int idx = 0;
  while( idx < targets.size() ) {
    MessageQueueListener target = (MessageQueueListener)targets.get( idx );
    target.messageSent( message );

    if( message.getType() != Message.TYPE_DATA_PAYLOAD ) {
      target.protocolBytesSent( total_bytes );
    }
    else {
      target.dataBytesSent( total_bytes );
    }

    idx++;
  }

  //System.out.println( "notifiedOfExternallySentMessage:: [" +message.getID()+ "] size=" +total_bytes );
}
/**
 * A single deferred listener notification: what kind of event it is, plus
 * the message or byte count that goes with it.
 */
private static class NotificationItem {

  //notification kinds
  private static final int MESSAGE_ADDED       = 0;
  private static final int MESSAGE_REMOVED     = 1;
  private static final int MESSAGE_SENT        = 2;
  private static final int DATA_BYTES_SENT     = 3;
  private static final int PROTOCOL_BYTES_SENT = 4;

  //one of the notification kinds above; fixed at construction
  private final int type;

  //message the event refers to; read for the MESSAGE_* kinds
  private RawMessage message;

  //number of bytes sent; read for the *_BYTES_SENT kinds, zero until assigned
  private int byte_count;

  /**
   * Create an item of the given kind; message / byte_count are filled in
   * afterwards by whoever creates the item.
   * @param notification_type one of the notification kind constants
   */
  private NotificationItem( int notification_type ) {
    this.type = notification_type;
  }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -