📄 outgoingmessagequeueimpl.java
字号:
/*
* Created on May 8, 2004
* Created by Alon Rohter
* Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 46,603.30 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package com.aelitis.azureus.core.networkmanager.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import org.gudy.azureus2.core3.util.*;
import com.aelitis.azureus.core.networkmanager.NetworkManager;
import com.aelitis.azureus.core.networkmanager.OutgoingMessageQueue;
import com.aelitis.azureus.core.networkmanager.RawMessage;
import com.aelitis.azureus.core.networkmanager.Transport;
import com.aelitis.azureus.core.peermanager.messaging.*;
/**
* Priority-based outbound peer message queue.
*/
public class
OutgoingMessageQueueImpl
implements OutgoingMessageQueue
{
private final LinkedList queue = new LinkedList();
private final AEMonitor queue_mon = new AEMonitor( "OutgoingMessageQueue:queue" );
private final ArrayList delayed_notifications = new ArrayList();
private final AEMonitor delayed_notifications_mon = new AEMonitor( "OutgoingMessageQueue:DN" );
private volatile ArrayList listeners = new ArrayList(); //copied-on-write
private final AEMonitor listeners_mon = new AEMonitor( "OutgoingMessageQueue:L");
private int total_size = 0;
private RawMessage urgent_message = null;
private boolean destroyed = false;
private MessageStreamEncoder stream_encoder;
private Transport transport;
private int percent_complete = -1;
private static final boolean TRACE_HISTORY = true; //TODO
private static final int MAX_HISTORY_TRACES = 30;
private final LinkedList prev_sent = new LinkedList();
/**
* Create a new outgoing message queue.
* @param stream_encoder default message encoder
*/
public OutgoingMessageQueueImpl( MessageStreamEncoder stream_encoder ) {
this.stream_encoder = stream_encoder;
}
public void
setTransport(
Transport _transport )
{
transport = _transport;
}
public int
getMssSize()
{
return( transport==null?NetworkManager.getMinMssSize():transport.getMssSize());
}
/**
* Set the message stream encoder that will be used to encode outgoing messages.
* @param stream_encoder to use
*/
public void setEncoder( MessageStreamEncoder stream_encoder ) {
this.stream_encoder = stream_encoder;
}
public MessageStreamEncoder
getEncoder()
{
return( stream_encoder );
}
/**
* Get the percentage of the current message that has already been sent out.
* @return percentage complete (0-99), or -1 if no message is currently being sent
*/
public int getPercentDoneOfCurrentMessage() {
return percent_complete;
}
/**
* Destroy this queue; i.e. perform cleanup actions.
*/
public void destroy() {
destroyed = true;
try{
queue_mon.enter();
while( !queue.isEmpty() ) {
((RawMessage)queue.remove( 0 )).destroy();
}
}finally{
queue_mon.exit();
}
total_size = 0;
}
/**
* Get the total number of bytes ready to be transported.
* @return total bytes remaining
*/
public int getTotalSize() { return total_size; }
/**
* Whether or not an urgent message (one that needs an immediate send, i.e. a no-delay message) is queued.
* @return true if there's a message tagged for immediate write
*/
public boolean hasUrgentMessage() { return urgent_message == null ? false : true; }
/**
* Add a message to the message queue.
* NOTE: Allows for manual listener notification at some later time,
* using doListenerNotifications(), instead of notifying immediately
* from within this method. This is useful if you want to invoke
* listeners outside of some greater synchronised block to avoid
* deadlock.
* @param message message to add
* @param manual_listener_notify true for manual notification, false for automatic
*/
public void addMessage( Message message, boolean manual_listener_notify ) {
//do message add notifications
boolean allowed = true;
ArrayList list_ref = listeners;
for( int i=0; i < list_ref.size(); i++ ) {
MessageQueueListener listener = (MessageQueueListener)list_ref.get( i );
allowed = allowed && listener.messageAdded( message );
}
if( !allowed ) { //message addition not allowed
//LGLogger.log( "Message [" +message.getDescription()+ "] not allowed for queueing, message addition skipped." );
//message.destroy(); //TODO destroy????
return;
}
RawMessage rmesg = stream_encoder.encodeMessage( message );
if( destroyed ) { //queue is shutdown, drop any added messages
rmesg.destroy();
return;
}
removeMessagesOfType( rmesg.messagesToRemove(), manual_listener_notify );
try{
queue_mon.enter();
int pos = 0;
for( Iterator i = queue.iterator(); i.hasNext(); ) {
RawMessage msg = (RawMessage)i.next();
if( rmesg.getPriority() > msg.getPriority()
&& msg.getRawData()[0].position(DirectByteBuffer.SS_NET) == 0 ) { //but don't insert in front of a half-sent message
break;
}
pos++;
}
if( rmesg.isNoDelay() ) {
urgent_message = rmesg;
}
queue.add( pos, rmesg );
DirectByteBuffer[] payload = rmesg.getRawData();
for( int i=0; i < payload.length; i++ ) {
total_size += payload[i].remaining(DirectByteBuffer.SS_NET);
}
}finally{
queue_mon.exit();
}
if( manual_listener_notify ) { //register listener event for later, manual notification
NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_ADDED );
item.message = rmesg;
try {
delayed_notifications_mon.enter();
delayed_notifications.add( item );
}
finally {
delayed_notifications_mon.exit();
}
}
else { //do listener notification now
ArrayList listeners_ref = listeners;
for( int i=0; i < listeners_ref.size(); i++ ) {
MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
listener.messageQueued( rmesg.getBaseMessage() );
}
}
}
/**
* Remove all messages of the given types from the queue.
* NOTE: Allows for manual listener notification at some later time,
* using doListenerNotifications(), instead of notifying immediately
* from within this method. This is useful if you want to invoke
* listeners outside of some greater synchronised block to avoid
* deadlock.
* @param message_types type to remove
* @param manual_listener_notify true for manual notification, false for automatic
*/
public void removeMessagesOfType( Message[] message_types, boolean manual_listener_notify ) {
if( message_types == null ) return;
ArrayList messages_removed = null;
try{
queue_mon.enter();
for( Iterator i = queue.iterator(); i.hasNext(); ) {
RawMessage msg = (RawMessage)i.next();
for( int t=0; t < message_types.length; t++ ) {
boolean same_type = message_types[t].getID().equals( msg.getID() );
if( same_type && msg.getRawData()[0].position(DirectByteBuffer.SS_NET) == 0 ) { //dont remove a half-sent message
if( msg == urgent_message ) urgent_message = null;
DirectByteBuffer[] payload = msg.getRawData();
for( int x=0; x < payload.length; x++ ) {
total_size -= payload[x].remaining(DirectByteBuffer.SS_NET);
}
if( manual_listener_notify ) {
NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_REMOVED );
item.message = msg;
try {
delayed_notifications_mon.enter();
delayed_notifications.add( item );
}
finally {
delayed_notifications_mon.exit();
}
}
else {
if ( messages_removed == null ){
messages_removed = new ArrayList();
}
messages_removed.add( msg );
}
i.remove();
break;
}
}
}
}finally{
queue_mon.exit();
}
if( !manual_listener_notify && messages_removed != null ) {
//do listener notifications now
ArrayList listeners_ref = listeners;
for( int x=0; x < messages_removed.size(); x++ ) {
RawMessage msg = (RawMessage)messages_removed.get( x );
for( int i=0; i < listeners_ref.size(); i++ ) {
MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
listener.messageRemoved( msg.getBaseMessage() );
}
msg.destroy();
}
}
}
/**
* Remove a particular message from the queue.
* NOTE: Only the original message found in the queue will be destroyed upon removal,
* which may not necessarily be the one passed as the method parameter,
* as some messages override equals() (i.e. BTRequest messages) instead of using reference
* equality, and could be a completely different object, and would need to be destroyed
* manually. If the message does not override equals, then any such method will likely
* *not* be found and removed, as internal queued object was a new allocation on insertion.
* NOTE: Allows for manual listener notification at some later time,
* using doListenerNotifications(), instead of notifying immediately
* from within this method. This is useful if you want to invoke
* listeners outside of some greater synchronised block to avoid
* deadlock.
* @param message to remove
* @param manual_listener_notify true for manual notification, false for automatic
* @return true if the message was removed, false otherwise
*/
public boolean removeMessage( Message message, boolean manual_listener_notify ) {
RawMessage msg_removed = null;
try{
queue_mon.enter();
for( Iterator it = queue.iterator(); it.hasNext(); ) {
RawMessage raw = (RawMessage)it.next();
if( message.equals( raw.getBaseMessage() ) ) {
if( raw.getRawData()[0].position(DirectByteBuffer.SS_NET) == 0 ) { //dont remove a half-sent message
if( raw == urgent_message ) urgent_message = null;
DirectByteBuffer[] payload = raw.getRawData();
for( int x=0; x < payload.length; x++ ) {
total_size -= payload[x].remaining(DirectByteBuffer.SS_NET);
}
queue.remove( raw );
msg_removed = raw;
}
break;
}
}
}finally{
queue_mon.exit();
}
if( msg_removed != null ) {
if( manual_listener_notify ) { //delayed manual notification
NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_REMOVED );
item.message = msg_removed;
try {
delayed_notifications_mon.enter();
delayed_notifications.add( item );
}
finally {
delayed_notifications_mon.exit();
}
}
else { //do listener notification now
ArrayList listeners_ref = listeners;
for( int i=0; i < listeners_ref.size(); i++ ) {
MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
listener.messageRemoved( msg_removed.getBaseMessage() );
}
msg_removed.destroy();
}
return true;
}
return false;
}
/**
* Deliver (write) message(s) data to the underlying transport.
*
* NOTE: Allows for manual listener notification at some later time,
* using doListenerNotifications(), instead of notifying immediately
* from within this method. This is useful if you want to invoke
* listeners outside of some greater synchronised block to avoid
* deadlock.
* @param max_bytes maximum number of bytes to deliver
* @param manual_listener_notify true for manual notification, false for automatic
* @return number of bytes delivered
* @throws IOException on delivery error
*/
public int deliverToTransport( int max_bytes, boolean manual_listener_notify ) throws IOException {
if( max_bytes < 1 ) {
Debug.out( "max_bytes < 1: " +max_bytes );
return 0;
}
if ( transport == null ){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -