⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 outgoingmessagequeueimpl.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*
 * Created on May 8, 2004
 * Created by Alon Rohter
 * Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 * 
 * AELITIS, SAS au capital de 46,603.30 euros
 * 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
 *
 */


package com.aelitis.azureus.core.networkmanager.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;

import org.gudy.azureus2.core3.util.*;

import com.aelitis.azureus.core.networkmanager.NetworkManager;
import com.aelitis.azureus.core.networkmanager.OutgoingMessageQueue;
import com.aelitis.azureus.core.networkmanager.RawMessage;
import com.aelitis.azureus.core.networkmanager.Transport;
import com.aelitis.azureus.core.peermanager.messaging.*;



/**
 * Priority-based outbound peer message queue.
 */
public class 
OutgoingMessageQueueImpl
	implements OutgoingMessageQueue
{
  private final LinkedList 		queue		= new LinkedList();
  private final AEMonitor	queue_mon	= new AEMonitor( "OutgoingMessageQueue:queue" );

  private final ArrayList delayed_notifications = new ArrayList();
  private final AEMonitor delayed_notifications_mon = new AEMonitor( "OutgoingMessageQueue:DN" );

  private volatile ArrayList listeners 		= new ArrayList();  //copied-on-write
  private final AEMonitor listeners_mon		= new AEMonitor( "OutgoingMessageQueue:L");
  
  private int total_size = 0;
  private RawMessage urgent_message = null;
  private boolean destroyed = false;
  
  private MessageStreamEncoder stream_encoder;
  private Transport transport;
  
  private int percent_complete = -1;
    
  private static final boolean TRACE_HISTORY = true;  //TODO
  private static final int MAX_HISTORY_TRACES = 30;
  private final LinkedList prev_sent = new LinkedList();
  
  
  
  /**
   * Create a new outgoing message queue.
   * @param stream_encoder default message encoder
   */
  public OutgoingMessageQueueImpl( MessageStreamEncoder stream_encoder ) {
    this.stream_encoder = stream_encoder;
  }
  
  public void
  setTransport(
	 Transport		_transport )
  {
	transport 	= _transport;  
  }
  
  public int
  getMssSize()
  {
	  return( transport==null?NetworkManager.getMinMssSize():transport.getMssSize());
  }
  
  /**
   * Set the message stream encoder that will be used to encode outgoing messages.
   * @param stream_encoder to use
   */
  public void setEncoder( MessageStreamEncoder stream_encoder ) {
    this.stream_encoder = stream_encoder;
  }
  
  public MessageStreamEncoder
  getEncoder()
  {
	  return( stream_encoder );
  }
  
  /**
   * Get the percentage of the current message that has already been sent out.
   * @return percentage complete (0-99), or -1 if no message is currently being sent
   */
  public int getPercentDoneOfCurrentMessage() {
    return percent_complete;
  }
  

  /**
   * Destroy this queue; i.e. perform cleanup actions.
   */
  public void destroy() {
    destroyed = true;
    try{
      queue_mon.enter();
    
      while( !queue.isEmpty() ) {
      	((RawMessage)queue.remove( 0 )).destroy();
      }
    }finally{
      queue_mon.exit();
    }
    total_size = 0;
  }
  
  
  /**
   * Get the total number of bytes ready to be transported.
   * @return total bytes remaining
   */
  public int getTotalSize() {  return total_size;  }
  
  
  /**
   * Whether or not an urgent message (one that needs an immediate send, i.e. a no-delay message) is queued.
   * @return true if there's a message tagged for immediate write
   */
  public boolean hasUrgentMessage() {  return urgent_message == null ? false : true;  }
  
  
  /**
   * Add a message to the message queue.
   * NOTE: Allows for manual listener notification at some later time,
   * using doListenerNotifications(), instead of notifying immediately
   * from within this method.  This is useful if you want to invoke
   * listeners outside of some greater synchronised block to avoid
   * deadlock.
   * @param message message to add
   * @param manual_listener_notify true for manual notification, false for automatic
   */
  public void addMessage( Message message, boolean manual_listener_notify ) {
    //do message add notifications
    boolean allowed = true;
    ArrayList list_ref = listeners;
    
    for( int i=0; i < list_ref.size(); i++ ) {
      MessageQueueListener listener = (MessageQueueListener)list_ref.get( i );
      allowed = allowed && listener.messageAdded( message );
    }
    
    if( !allowed ) {  //message addition not allowed
      //LGLogger.log( "Message [" +message.getDescription()+ "] not allowed for queueing, message addition skipped." );
      //message.destroy();  //TODO destroy????
      return;
    }
    
    
    RawMessage rmesg = stream_encoder.encodeMessage( message );
    
    if( destroyed ) {  //queue is shutdown, drop any added messages
      rmesg.destroy();
      return;
    }
    
    removeMessagesOfType( rmesg.messagesToRemove(), manual_listener_notify );
    
    try{
      queue_mon.enter();
    
      int pos = 0;
      for( Iterator i = queue.iterator(); i.hasNext(); ) {
        RawMessage msg = (RawMessage)i.next();
        if( rmesg.getPriority() > msg.getPriority() 
          && msg.getRawData()[0].position(DirectByteBuffer.SS_NET) == 0 ) {  //but don't insert in front of a half-sent message
          break;
        }
        pos++;
      }
      if( rmesg.isNoDelay() ) {
        urgent_message = rmesg;
      }
      queue.add( pos, rmesg );
      
      DirectByteBuffer[] payload = rmesg.getRawData();
      for( int i=0; i < payload.length; i++ ) {
        total_size += payload[i].remaining(DirectByteBuffer.SS_NET);
      }
    }finally{
      queue_mon.exit();
    }
    
    if( manual_listener_notify ) {  //register listener event for later, manual notification
      NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_ADDED );
      item.message = rmesg;
      try {
        delayed_notifications_mon.enter();
        
        delayed_notifications.add( item );
      }
      finally {
        delayed_notifications_mon.exit();
      }
    }
    else { //do listener notification now
      ArrayList listeners_ref = listeners;
    
      for( int i=0; i < listeners_ref.size(); i++ ) {
        MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
        listener.messageQueued( rmesg.getBaseMessage() );
      }
    }
  }
  

  
  /**
   * Remove all messages of the given types from the queue.
   * NOTE: Allows for manual listener notification at some later time,
   * using doListenerNotifications(), instead of notifying immediately
   * from within this method.  This is useful if you want to invoke
   * listeners outside of some greater synchronised block to avoid
   * deadlock.
   * @param message_types type to remove
   * @param manual_listener_notify true for manual notification, false for automatic
   */
  public void removeMessagesOfType( Message[] message_types, boolean manual_listener_notify ) {
    if( message_types == null ) return;
    
    ArrayList messages_removed = null;
    
    try{
      queue_mon.enter();
    
      for( Iterator i = queue.iterator(); i.hasNext(); ) {
        RawMessage msg = (RawMessage)i.next();
        
        for( int t=0; t < message_types.length; t++ ) {
          boolean same_type = message_types[t].getID().equals( msg.getID() );
          
          if( same_type && msg.getRawData()[0].position(DirectByteBuffer.SS_NET) == 0 ) {   //dont remove a half-sent message
            if( msg == urgent_message ) urgent_message = null;
            
            DirectByteBuffer[] payload = msg.getRawData();
            for( int x=0; x < payload.length; x++ ) {
              total_size -= payload[x].remaining(DirectByteBuffer.SS_NET);
            }
            
            if( manual_listener_notify ) {
              NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_REMOVED );
              item.message = msg;
              try {
                delayed_notifications_mon.enter();
                
                delayed_notifications.add( item );
              }
              finally {
                delayed_notifications_mon.exit();
              }
            }
            else {
              if ( messages_removed == null ){
              	messages_removed = new ArrayList();
              }
              messages_removed.add( msg );
            }
        		i.remove();
            break;
        	}
        }
      }
    }finally{
      queue_mon.exit();
    }

    if( !manual_listener_notify && messages_removed != null ) {
      //do listener notifications now
      ArrayList listeners_ref = listeners;
        
      for( int x=0; x < messages_removed.size(); x++ ) {
        RawMessage msg = (RawMessage)messages_removed.get( x );
        
        for( int i=0; i < listeners_ref.size(); i++ ) {
          MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
          listener.messageRemoved( msg.getBaseMessage() );
        }
        msg.destroy();
      }
    }
  }
  
  
  /**
   * Remove a particular message from the queue.
   * NOTE: Only the original message found in the queue will be destroyed upon removal,
   * which may not necessarily be the one passed as the method parameter,
   * as some messages override equals() (i.e. BTRequest messages) instead of using reference
   * equality, and could be a completely different object, and would need to be destroyed
   * manually.  If the message does not override equals, then any such method will likely
   * *not* be found and removed, as internal queued object was a new allocation on insertion.
   * NOTE: Allows for manual listener notification at some later time,
   * using doListenerNotifications(), instead of notifying immediately
   * from within this method.  This is useful if you want to invoke
   * listeners outside of some greater synchronised block to avoid
   * deadlock.
   * @param message to remove
   * @param manual_listener_notify true for manual notification, false for automatic
   * @return true if the message was removed, false otherwise
   */
  public boolean removeMessage( Message message, boolean manual_listener_notify ) {
    RawMessage msg_removed = null;
    
    try{
      queue_mon.enter();

      for( Iterator it = queue.iterator(); it.hasNext(); ) {
        RawMessage raw = (RawMessage)it.next();
        
        if( message.equals( raw.getBaseMessage() ) ) {
          if( raw.getRawData()[0].position(DirectByteBuffer.SS_NET) == 0 ) {  //dont remove a half-sent message
            if( raw == urgent_message ) urgent_message = null;  
            
            DirectByteBuffer[] payload = raw.getRawData();
            for( int x=0; x < payload.length; x++ ) {
              total_size -= payload[x].remaining(DirectByteBuffer.SS_NET);
            }

            queue.remove( raw );
            msg_removed = raw;
          }
          
          break;
        }
      }
    }finally{
      queue_mon.exit();
    }
    
    
    if( msg_removed != null ) {
      if( manual_listener_notify ) { //delayed manual notification
        NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_REMOVED );
        item.message = msg_removed;
        try {
          delayed_notifications_mon.enter();
          
          delayed_notifications.add( item );
        }
        finally {
          delayed_notifications_mon.exit();
        }
      }
      else {   //do listener notification now
        ArrayList listeners_ref = listeners;
      
        for( int i=0; i < listeners_ref.size(); i++ ) {
          MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );
          listener.messageRemoved( msg_removed.getBaseMessage() );
        }
        msg_removed.destroy();
      }
      return true;
    }
    
    return false;
  }
  
  
  /**
   * Deliver (write) message(s) data to the underlying transport.
   * 
   * NOTE: Allows for manual listener notification at some later time,
   * using doListenerNotifications(), instead of notifying immediately
   * from within this method.  This is useful if you want to invoke
   * listeners outside of some greater synchronised block to avoid
   * deadlock.
   * @param max_bytes maximum number of bytes to deliver
   * @param manual_listener_notify true for manual notification, false for automatic
   * @return number of bytes delivered
   * @throws IOException on delivery error
   */
  public int deliverToTransport( int max_bytes, boolean manual_listener_notify ) throws IOException {    
    if( max_bytes < 1 ) {
      Debug.out( "max_bytes < 1: " +max_bytes );
      return 0;
    }
    
    if ( transport == null ){

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -