⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 outgoingmessagequeueimpl.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    	throw( new IOException( "not ready to deliver data" ));
    }
    int data_written = 0;
    int protocol_written = 0;
    
    ArrayList messages_sent = null;
    
    try{
      queue_mon.enter();

    	if( !queue.isEmpty() ) {
        ArrayList raw_buffers = new ArrayList();
        ArrayList orig_positions = new ArrayList();
        int total_sofar = 0;
        
        for( Iterator i = queue.iterator(); i.hasNext(); ) {
          DirectByteBuffer[] payloads = ((RawMessage)i.next()).getRawData();
          boolean stop = false;
          
          for( int x=0; x < payloads.length; x++ ) {
            ByteBuffer buff = payloads[x].getBuffer( DirectByteBuffer.SS_NET );
            raw_buffers.add( buff );
            orig_positions.add( new Integer( buff.position() ) );
            total_sofar += buff.remaining();
            
            if( total_sofar >= max_bytes ) {
              stop = true;
              break;
            }
          }
          
          if( stop )  break;
        }
                
        int num_raw = raw_buffers.size();
        
        ByteBuffer last_buff = (ByteBuffer)raw_buffers.get( num_raw - 1 );
        int orig_last_limit = last_buff.limit();
    		if( total_sofar > max_bytes ) {
          last_buff.limit( orig_last_limit - (total_sofar - max_bytes) );
    		}
        
        ByteBuffer[] buffs = new ByteBuffer[ num_raw ];
        raw_buffers.toArray( buffs );
        
        transport.write( buffs, 0, num_raw );
        
        last_buff.limit( orig_last_limit );
        
        int pos = 0;
        boolean stop = false;
        
        while( !queue.isEmpty() && !stop ) {
          RawMessage msg = (RawMessage)queue.get( 0 );
          DirectByteBuffer[] payloads = msg.getRawData();
                    
          for( int x=0; x < payloads.length; x++ ) {
            ByteBuffer bb = payloads[x].getBuffer( DirectByteBuffer.SS_NET );
            
            int bytes_written = (bb.limit() - bb.remaining()) - ((Integer)orig_positions.get( pos )).intValue();
            total_size -= bytes_written;
            
            if( x > 0 && msg.getType() == Message.TYPE_DATA_PAYLOAD ) {  //assumes the first buffer is message header
              data_written += bytes_written;
            }
            else {
              protocol_written += bytes_written;
            }
            
            if( bb.hasRemaining() ) {  //still data left to send in this message
              stop = true;  //so don't bother checking later messages for completion
              
              //compute send percentage
              int message_size = 0;
              int written = 0;
              
              for( int i=0; i < payloads.length; i++ ) {
                ByteBuffer buff = payloads[i].getBuffer( DirectByteBuffer.SS_NET );
                
                message_size += buff.limit();
                
                if( i < x ) {  //if in front of non-empty buffer
                  written += buff.limit();
                }
                else if( i == x ) {  //is non-empty buffer
                  written += buff.position();
                }
              }
              
              percent_complete = (written * 100) / message_size;

              break;
            }
            else if( x == payloads.length - 1 ) {  //last payload buffer of message is empty
              if( msg == urgent_message ) urgent_message = null;
            
              queue.remove( 0 );
              
              
              if( TRACE_HISTORY ) {
              	prev_sent.addLast( msg );
              	if( prev_sent.size() > MAX_HISTORY_TRACES )  prev_sent.removeFirst();
              }
              
              
              percent_complete = -1;  //reset send percentage
                            
              if( manual_listener_notify ) {
                NotificationItem item = new NotificationItem( NotificationItem.MESSAGE_SENT );
                item.message = msg;
                try {  delayed_notifications_mon.enter();
                  delayed_notifications.add( item );
                } finally {  delayed_notifications_mon.exit();  }
              }
              else {
                if( messages_sent == null ) {
                  messages_sent = new ArrayList();
                }
                messages_sent.add( msg );
              }
            }
            
            pos++;
            if( pos >= num_raw ) {
              stop = true;
              break;
            }
          }
        }
    	}
    }finally{
      queue_mon.exit();
    }
    
    	// we can have messages that end up getting serialised as 0 bytes (for http
    	// connections for example) - we still need to notify them of being sent...
    
    if( data_written + protocol_written > 0 || messages_sent != null ) {
      if( manual_listener_notify ) {
        
        if( data_written > 0 ) {  //data bytes notify
          NotificationItem item = new NotificationItem( NotificationItem.DATA_BYTES_SENT );
          item.byte_count = data_written;
          try {
            delayed_notifications_mon.enter();
            
            delayed_notifications.add( item );
          }
          finally {
            delayed_notifications_mon.exit();
          }
        }

        if( protocol_written > 0 ) {  //protocol bytes notify
          NotificationItem item = new NotificationItem( NotificationItem.PROTOCOL_BYTES_SENT );
          item.byte_count = protocol_written;
          try {
            delayed_notifications_mon.enter();
            
            delayed_notifications.add( item );
          }
          finally {
            delayed_notifications_mon.exit();
          }
        }
      }
      else {  //do listener notification now
        ArrayList listeners_ref = listeners;
        
        int num_listeners = listeners_ref.size();
        for( int i=0; i < num_listeners; i++ ) {
          MessageQueueListener listener = (MessageQueueListener)listeners_ref.get( i );

          if( data_written > 0 )  listener.dataBytesSent( data_written );
          if( protocol_written > 0 )  listener.protocolBytesSent( protocol_written );
          
          if ( messages_sent != null ){
          	
	          for( int x=0; x < messages_sent.size(); x++ ) {
	            RawMessage msg = (RawMessage)messages_sent.get( x );
	
	            listener.messageSent( msg.getBaseMessage() );
	            
	            if( i == num_listeners - 1 ) {  //the last listener notification, so destroy
	              msg.destroy();
	            }
	          }
          }
        }
      }
    }
    
    return data_written + protocol_written;
  }
  
  
  /**
   * Manually send any unsent listener notifications.
   * <p>
   * The delayed notifications are copied and cleared while holding
   * delayed_notifications_mon; the listeners are invoked only after the
   * monitor has been released, walking the copied notifications in list order.
   * Messages attached to MESSAGE_REMOVED and MESSAGE_SENT items are destroyed
   * once every listener has been told about them.
   */
  public void doListenerNotifications() {
    ArrayList pending;
    try {
      delayed_notifications_mon.enter();

      if( delayed_notifications.size() == 0 ) {
        return;  //nothing waiting to be dispatched
      }

      pending = new ArrayList( delayed_notifications );
      delayed_notifications.clear();
    }
    finally {
      delayed_notifications_mon.exit();
    }

    //a single snapshot of the listener list is used for the whole dispatch
    ArrayList targets = listeners;

    for( int n=0; n < pending.size(); n++ ) {
      NotificationItem note = (NotificationItem)pending.get( n );
      int kind = note.type;

      if( kind == NotificationItem.MESSAGE_ADDED ) {
        for( int k=0; k < targets.size(); k++ ) {
          MessageQueueListener target = (MessageQueueListener)targets.get( k );
          target.messageQueued( note.message.getBaseMessage() );
        }
      }
      else if( kind == NotificationItem.MESSAGE_REMOVED ) {
        for( int k=0; k < targets.size(); k++ ) {
          MessageQueueListener target = (MessageQueueListener)targets.get( k );
          target.messageRemoved( note.message.getBaseMessage() );
        }
        note.message.destroy();  //destroyed only after all listeners were notified
      }
      else if( kind == NotificationItem.MESSAGE_SENT ) {
        for( int k=0; k < targets.size(); k++ ) {
          MessageQueueListener target = (MessageQueueListener)targets.get( k );
          target.messageSent( note.message.getBaseMessage() );
        }
        note.message.destroy();  //destroyed only after all listeners were notified
      }
      else if( kind == NotificationItem.PROTOCOL_BYTES_SENT ) {
        for( int k=0; k < targets.size(); k++ ) {
          MessageQueueListener target = (MessageQueueListener)targets.get( k );
          target.protocolBytesSent( note.byte_count );
        }
      }
      else if( kind == NotificationItem.DATA_BYTES_SENT ) {
        for( int k=0; k < targets.size(); k++ ) {
          MessageQueueListener target = (MessageQueueListener)targets.get( k );
          target.dataBytesSent( note.byte_count );
        }
      }
      else {
        //unrecognised notification type: report it and carry on with the next item
        Debug.out( "NotificationItem.type unknown :" + note.type );
      }
    }
  }
  
  
  
  
  /**
   * Build a textual trace of the outgoing queue.
   * <p>
   * While holding queue_mon, one line is written for every entry of prev_sent
   * (labelled #h0, #h1, ...) followed by one line for every message still in
   * the queue. Queued lines are labelled counting down from queue.size() - 1
   * and show the SS_NET position and limit of the message's first raw buffer.
   * @return the assembled trace text
   */
  public String getQueueTrace() {
    StringBuffer out = new StringBuffer();

    out.append( "**** OUTGOING QUEUE TRACE ****\n" );

    try{
      queue_mon.enter();

      //entries kept in the sent-history list
      int hist_index = 0;
      Iterator sent_it = prev_sent.iterator();

      while( sent_it.hasNext() ) {
        RawMessage sent = (RawMessage)sent_it.next();

        out.append( "[#h" ).append( hist_index ).append( "]: " );
        out.append( sent.getID() );
        out.append( " [" );
        out.append( sent.getDescription() );
        out.append( "]\n" );

        hist_index++;
      }

      //messages still waiting in the queue; label counts down as we iterate
      int label = queue.size() - 1;
      Iterator queued_it = queue.iterator();

      while( queued_it.hasNext() ) {
        RawMessage queued = (RawMessage)queued_it.next();

        //only the first raw buffer of each message is reported
        int head_pos = queued.getRawData()[0].position( DirectByteBuffer.SS_NET );
        int head_limit = queued.getRawData()[0].limit( DirectByteBuffer.SS_NET );

        out.append( "[#" ).append( label );
        out.append( " " ).append( head_pos );
        out.append( ":" ).append( head_limit );
        out.append( "]: " );
        out.append( queued.getID() );
        out.append( " [" );
        out.append( queued.getDescription() );
        out.append( "]\n" );

        label--;
      }
    }
    finally{
      queue_mon.exit();
    }

    return out.toString();
  }
  
  
  /**
   * Add a listener to be notified of queue events.
   * <p>
   * The current listener list is not modified in place: a new list holding
   * the existing listeners plus the new one is built and then assigned to
   * the listeners field, all while holding listeners_mon.
   * @param listener listener to append to the list
   */
  public void registerQueueListener( MessageQueueListener listener ) {
    try{
      listeners_mon.enter();

      //copy-on-write: duplicate the current list, extend the duplicate, swap it in
      ArrayList updated = new ArrayList( listeners );
      updated.add( listener );
      listeners = updated;
    }
    finally{
      listeners_mon.exit();
    }
  }
  
  
  /**
   * Cancel queue event notification listener.
   * <p>
   * A copy of the current listener list is made, the given listener is
   * removed from the copy, and the copy is assigned to the listeners field,
   * all while holding listeners_mon.
   * @param listener listener to remove from the list
   */
  public void cancelQueueListener( MessageQueueListener listener ) {
    try{
      listeners_mon.enter();

      //copy-on-write: the previous list instance is left untouched
      ArrayList updated = new ArrayList( listeners.size() );
      updated.addAll( listeners );
      updated.remove( listener );
      listeners = updated;
    }
    finally{
      listeners_mon.exit();
    }
  }

  
  
  /**
   * Notify the queue (and its listeners) of a message sent externally on the queue's behalf.
   * <p>
   * Every listener first receives messageSent, then a byte-count callback:
   * dataBytesSent when the message type is Message.TYPE_DATA_PAYLOAD,
   * protocolBytesSent otherwise. The byte count is the sum of the SS_NET
   * remaining bytes over all of the message's data buffers.
   * @param message sent externally
   */
  public void notifyOfExternallySentMessage( Message message ) {
    ArrayList targets = listeners;

    //add up what remains in each of the message's buffers
    DirectByteBuffer[] data = message.getData();
    int total_bytes = 0;
    int idx = 0;
    while( idx < data.length ) {
      total_bytes += data[idx].remaining( DirectByteBuffer.SS_NET );
      idx++;
    }

    for( int k=0; k < targets.size(); k++ ) {
      MessageQueueListener target = (MessageQueueListener)targets.get( k );

      target.messageSent( message );

      if( message.getType() != Message.TYPE_DATA_PAYLOAD ) {
        target.protocolBytesSent( total_bytes );
      }
      else {
        target.dataBytesSent( total_bytes );
      }
    }
  }
  
  
  
  
  
  /**
   * A single queued listener notification: a type tag plus the payload that
   * goes with it. The message-related types carry a message; the two
   * bytes-sent types carry a byte count.
   */
  private static class NotificationItem {
    //notification type tags
    private static final int MESSAGE_ADDED        = 0;
    private static final int MESSAGE_REMOVED      = 1;
    private static final int MESSAGE_SENT         = 2;
    private static final int DATA_BYTES_SENT      = 3;
    private static final int PROTOCOL_BYTES_SENT  = 4;

    //one of the tags above, fixed at construction
    private final int type;

    //message the notification refers to; assigned by the creator when relevant
    private RawMessage message;

    //number of bytes reported by the bytes-sent notifications; starts at zero
    private int byte_count;

    private NotificationItem( int kind ) {
      this.type = kind;
    }
  }
  
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -