⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 outgoingmessagequeue.java

📁 Azureus is a powerful, full-featured, cross-platform Java BitTorrent client
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
      return 0;
    }
    
    int data_written = 0;
    int protocol_written = 0;
    
    ArrayList messages_sent = null;
    
    try{
      queue_mon.enter();

    	if( !queue.isEmpty() ) {
        ByteBuffer[] buffers = new ByteBuffer[ queue.size() ];
        int[] starting_pos = new int[ queue.size() ];
        int pos = 0;
    		int total_sofar = 0;
        while( total_sofar < max_bytes && pos < buffers.length ) {
          buffers[ pos ] = ((ProtocolMessage)queue.get( pos )).getPayload().getBuffer(DirectByteBuffer.SS_NET);
          total_sofar += buffers[ pos ].remaining();
          starting_pos[ pos ] = buffers[ pos ].position();
          pos++;
    		}
        pos--; //remove last while loop auto-increment
    		int orig_limit = buffers[ pos ].limit();
    		if( total_sofar > max_bytes ) {
    			buffers[ pos ].limit( orig_limit - (total_sofar - max_bytes) );
    		}
             
        transport.write( buffers, 0, pos + 1 );

        buffers[ pos ].limit( orig_limit );
        pos = 0;
        while( !queue.isEmpty() ) {
          ProtocolMessage msg = (ProtocolMessage)queue.get( 0 );
          ByteBuffer bb = msg.getPayload().getBuffer(DirectByteBuffer.SS_NET);
          if( !bb.hasRemaining() ) {
            if( msg == urgent_message ) urgent_message = null;
            
            int bytes_written = bb.limit() - starting_pos[ pos ];
            total_size -= bytes_written;
            
            if( msg.isDataMessage() ) {
              data_written += bytes_written;
            }
            else {
              protocol_written += bytes_written;
            }
            
            queue.remove( 0 );
            if( manual_listener_notify ) {
              NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_SENT );
              item.message = msg;
              item.transport = transport;
              try {
                delayed_notifications_mon.enter();
                
                delayed_notifications.add( item );
              }
              finally {
                delayed_notifications_mon.exit();
              }
            }
            else {
              if ( messages_sent == null ){
              	
              	messages_sent = new ArrayList();
              }
              
              messages_sent.add( msg );
            }
          }
          else {           
            int bytes_written = (bb.limit() - bb.remaining()) - starting_pos[ pos ];
            total_size -= bytes_written;
            
            if( msg.isDataMessage() ) {
              data_written += bytes_written;
            }
            else {
              protocol_written += bytes_written;
            }
            
            break;
          }
          pos++;
        }
    	}
    }finally{
      queue_mon.exit();
    }
    
    if( data_written + protocol_written > 0 ) {
      if( manual_listener_notify ) {
        
        if( data_written > 0 ) {  //data bytes notify
          NotificationItem item = new NotificationItem( NotificationItem.DATA_BYTES_SENT );
          item.byte_count = data_written;
          try {
            delayed_notifications_mon.enter();
            
            delayed_notifications.add( item );
          }
          finally {
            delayed_notifications_mon.exit();
          }
        }

        if( protocol_written > 0 ) {  //protocol bytes notify
          NotificationItem item = new NotificationItem( NotificationItem.PROTOCOL_BYTES_SENT );
          item.byte_count = protocol_written;
          try {
            delayed_notifications_mon.enter();
            
            delayed_notifications.add( item );
          }
          finally {
            delayed_notifications_mon.exit();
          }
        }
      }
      else {  //do listener notification now
        ArrayList listeners_ref = listeners;
        
        int num_listeners = listeners_ref.size();
        for( int i=0; i < num_listeners; i++ ) {
          MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );

          if( data_written > 0 )  listener.dataBytesSent( data_written );
          if( protocol_written > 0 )  listener.protocolBytesSent( protocol_written );
          
          if ( messages_sent != null ){
          	
	          for( int x=0; x < messages_sent.size(); x++ ) {
	            ProtocolMessage msg = (ProtocolMessage)messages_sent.get( x );
	
	            listener.messageSent( msg );
	            
	            if( i == num_listeners - 1 ) {  //the last listener notification, so destroy
	              LGLogger.log( LGLogger.CORE_NETWORK, "Sent " +msg.getDescription()+ " message to " + transport.getDescription() );
	              msg.destroy();
	            }
	          }
          }
        }
      }
    }
    
    return data_written + protocol_written;
  }
  
  
  /**
   * Deliver every listener notification that was queued for later delivery.
   * <p>
   * The backlog is copied and cleared while delayed_notifications_mon is held;
   * listeners are only invoked after the monitor has been released.
   * Notifications are handled in list order, and each one is passed to every
   * listener in the listener list read at the start of the dispatch. For
   * MESSAGE_REMOVED and MESSAGE_SENT notifications the message is destroyed
   * once all listeners have been called.
   */
  public void doListenerNotifications() {
    ArrayList pending;
    try {
      delayed_notifications_mon.enter();

      if( delayed_notifications.isEmpty() ) {
        return;  //nothing queued, the finally clause still releases the monitor
      }

      pending = new ArrayList( delayed_notifications );
      delayed_notifications.clear();
    }
    finally {
      delayed_notifications_mon.exit();
    }

    ArrayList targets = listeners;  //single read of the current listener list
    int num_targets = targets.size();
    int num_pending = pending.size();

    for( int n=0; n < num_pending; n++ ) {
      NotificationItem note = (NotificationItem)pending.get( n );
      int kind = note.type;

      if( kind == NotificationItem.MESSAGE_ADDED ) {
        for( int t=0; t < num_targets; t++ ) {
          MessageQueueListener target = (MessageQueueListener)targets.get( t );
          target.messageAdded( note.message );
        }
      }
      else if( kind == NotificationItem.MESSAGE_REMOVED ) {
        for( int t=0; t < num_targets; t++ ) {
          MessageQueueListener target = (MessageQueueListener)targets.get( t );
          target.messageRemoved( note.message );
        }
        //every listener has been told, so the message can be released
        note.message.destroy();
      }
      else if( kind == NotificationItem.MESSAGE_SENT ) {
        for( int t=0; t < num_targets; t++ ) {
          MessageQueueListener target = (MessageQueueListener)targets.get( t );
          target.messageSent( note.message );
        }
        LGLogger.log( LGLogger.CORE_NETWORK, "Sent " +note.message.getDescription()+ " message to " + note.transport.getDescription() );
        //every listener has been told, so the message can be released
        note.message.destroy();
      }
      else if( kind == NotificationItem.PROTOCOL_BYTES_SENT ) {
        for( int t=0; t < num_targets; t++ ) {
          MessageQueueListener target = (MessageQueueListener)targets.get( t );
          target.protocolBytesSent( note.byte_count );
        }
      }
      else if( kind == NotificationItem.DATA_BYTES_SENT ) {
        for( int t=0; t < num_targets; t++ ) {
          MessageQueueListener target = (MessageQueueListener)targets.get( t );
          target.dataBytesSent( note.byte_count );
        }
      }
      else {
        Debug.out( "NotificationItem.type unknown :" + kind );
      }
    }
  }
  

  /////////////////////////////////////////////////////////////////
  
  /**
   * Callback interface for observers of this queue: message added, removed
   * and sent events, plus counts of protocol and data bytes written.
   */
  public interface MessageQueueListener {

    /**
     * Called when a message has just been queued for sending out the transport.
     * @param message the newly queued message
     */
    void messageAdded( ProtocolMessage message );

    /**
     * Called when a message has been forcibly taken off the queue,
     * i.e. it was *not* sent out the transport.
     * @param message the message that was removed
     */
    void messageRemoved( ProtocolMessage message );

    /**
     * Called when a message has been completely sent out through the transport.
     * @param message the message that was sent
     */
    void messageSent( ProtocolMessage message );

    /**
     * Called with the number of protocol (overhead) bytes written to the transport.
     * @param byte_count number of protocol bytes
     */
    void protocolBytesSent( int byte_count );

    /**
     * Called with the number of (piece) data bytes written to the transport.
     * @param byte_count number of data bytes
     */
    void dataBytesSent( int byte_count );
  }
  

  
  /**
   * Subscribe the given listener to queue event notifications.
   * The current listener list is not modified; a new list containing the
   * existing listeners plus the new one replaces it.
   * @param listener listener to add
   */
  public void registerQueueListener( MessageQueueListener listener ) {
    try {
      listeners_mon.enter();

      //copy-on-write: build the replacement list, then swap it in
      ArrayList replacement = new ArrayList( listeners );
      replacement.add( listener );
      listeners = replacement;
    }
    finally {
      listeners_mon.exit();
    }
  }
  
  
  /**
   * Unsubscribe the given listener from queue event notifications.
   * The current listener list is not modified; a copy with the first matching
   * entry removed replaces it (the list is replaced even if nothing matched).
   * @param listener listener to remove
   */
  public void cancelQueueListener( MessageQueueListener listener ) {
    try {
      listeners_mon.enter();

      //copy-on-write: edit a private copy, then swap it in
      ArrayList replacement = new ArrayList( listeners );
      int where = replacement.indexOf( listener );
      if( where >= 0 ) {
        replacement.remove( where );
      }
      listeners = replacement;
    }
    finally {
      listeners_mon.exit();
    }
  }

  
  /**
   * A queued listener notification: an event type code plus the fields
   * that carry the details for that event.
   */
  private static class NotificationItem {
    //event type codes
    private static final int MESSAGE_ADDED        = 0;
    private static final int MESSAGE_REMOVED      = 1;
    private static final int MESSAGE_SENT         = 2;
    private static final int DATA_BYTES_SENT      = 3;
    private static final int PROTOCOL_BYTES_SENT  = 4;

    private final int type;           //one of the event type codes above
    private int byte_count;           //byte total for the *_BYTES_SENT events
    private ProtocolMessage message;  //message the MESSAGE_* events refer to
    private Transport transport;      //transport recorded for MESSAGE_SENT events

    /**
     * Create a notification of the given type; detail fields are filled in by the creator.
     * @param notification_type event type code
     */
    private NotificationItem( int notification_type ) {
      this.type = notification_type;
    }
  }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -