⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 connection.java

📁 P2P协议GNUTELLA的java源代码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:


  // TODO return a boolean indicating if the message was enqueued

  /**
   *  Queues a message as a priority message. Priority messages skip the
   *  per-type backlog limits that can cause ordinary messages to be dropped.
   *  Nothing is queued once the shutdown flag has been set.
   *
   *  @param m message to queue
   *  @throws IOException declared in the signature; the visible body does not
   *          raise it itself
   */
  void prioritySend(Message m) throws IOException
  {
    if ( !shutdownFlag )
    {
      enqueueMessage(m, true);
    }
  }

  /**
   *  Queues a message on this connection as an ordinary (non-priority)
   *  message, so it may be dropped when the backlog is too long. Nothing is
   *  queued once the shutdown flag has been set.
   *
   *  @param m message to queue
   *  @throws IOException declared in the signature; the visible body does not
   *          raise it itself
   */
  void send(Message m) throws IOException
  {
    if ( !shutdownFlag )
    {
      enqueueMessage(m, false);
    }
  }

  /**
   *  Sends a message down the connection and registers a receiver for it.
   *  The receiver is handed to the router via <code>routeBack</code> before
   *  the message is queued as a priority message; presumably the router then
   *  delivers any response to <code>MessageReceiver</code>. Does nothing once
   *  the shutdown flag has been set.
   *
   *  @param m message to send
   *  @param receiver receiver registered with the router for this message
   *  @throws IOException propagated from <code>prioritySend</code>
   */
  void sendAndReceive(Message m,
                      MessageReceiver receiver) throws IOException
  {
    if ( shutdownFlag )
    {
      return;
    }

    // the log text previously read "to to"; the duplicated word is removed
    Log.getLog().logDebug("Connection sendAndReceive: " +
                           m.getType() +
                           " to " +
                           socket.getInetAddress().getHostAddress());

    // register the receiver before sending so it is in place when the
    // message goes out
    router.routeBack(m, receiver);

    prioritySend(m);
  }

  /**
   *  Adds a message to the send queue, subject to dropping due to a message
   *  backlog.
   *
   *  <p>If the number of dropped messages exceeds half the number of sent
   *  messages the connection is shut down and nothing is queued. Otherwise a
   *  non-priority message is dropped when the backlog is longer than the
   *  limit for its message type; an over-limit PUSH additionally shuts the
   *  connection down. Priority messages skip the per-type limits and are
   *  inserted at the head of the queue; non-priority messages are appended
   *  at the tail. Finally the sender thread is notified.
   *
   *  @param message message to queue
   *  @param priority true to bypass the per-type backlog limits
   */
  void enqueueMessage(Message message, boolean priority)
  {

    if ( droppedCount > ( .5 * outputCount ) )
    {
      // drop this connection, it is hung or performing at less than half our
      // send rate
      shutdown();
      return;
    }

    int type = message.getType();
    int backlogSize = messageBacklog.size();

    if ( !priority )
    {
      // decide whether the backlog is over the limit for this message type;
      // types without a limit are never dropped here
      boolean overLimit;

      switch (type)
      {
        case Message.PING:
          overLimit = backlogSize > BACKLOG_PING_LEVEL;
          break;

        case Message.PONG:
          overLimit = backlogSize > BACKLOG_PONG_LEVEL;
          break;

        case Message.QUERY:
          overLimit = backlogSize > BACKLOG_QUERY_LEVEL;
          break;

        case Message.QUERYREPLY:
          overLimit = backlogSize > BACKLOG_QUERYREPLY_LEVEL;
          break;

        case Message.PUSH:
          overLimit = backlogSize > BACKLOG_PUSH_LEVEL;
          break;

        default:
          overLimit = false;
          break;
      }

      if ( overLimit )
      {
        droppedCount++;

        if ( Message.PUSH == type )
        {
          // give up on this connection
          shutdown();
        }

        return;
      }
    }

    MessageData messageData = new MessageData(message, priority);

    if ( priority )
    {
      // priority messages jump to the head of the queue
      messageBacklog.add(0, messageData);
    }
    else
    {
      // add non-priority messages at the end of the list. The previous code
      // inserted at index (size - 1), which places the new element BEFORE
      // the current last element and so reordered the queue.
      synchronized ( messageBacklog )
      {
        messageBacklog.add(messageData);
      }
    }

    // wake the sender thread, which waits on its own monitor
    synchronized ( asyncSender )
    {
      asyncSender.notify();
    }

  }



  /**
   *  Handles a serious error on the connection: logs that the connection is
   *  going down, marks the status as failed, logs the exception when one is
   *  supplied, and shuts the connection down.
   *
   *  @param e the error that triggered the shutdown, may be null
   */
  void handleConnectionError(Exception e)
  {
    String notice = "Shutting down connection: " + host;
    Log.getLog().logInformation(notice);

    status = STATUS_FAILED;

    if ( e != null )
    {
      Log.getLog().log(e);
    }

    shutdown();
  }

  /**
   *  Returns the host this connection is associated with.
   *
   *  @return the value of the host field
   */
  public String getConnectedServant()
  {
    return this.host;
  }

  /**
   *  Returns the current status code of the connection.
   *
   *  @return status
   */
  public int getStatus()
  {
    return this.status;
  }

  /**
   *  Returns the connection type, incoming or outgoing.
   *
   *  @return connection type
   */
  public int getType()
  {
    return this.type;
  }

	/**
	 *  Returns the output message counter of this connection.
	 *
	 *  @return output count
	 */
	public int getMessageOutput()
	{
		return this.outputCount;
	}

	/**
	 *  Returns the input message counter of this connection.
	 *
	 *  @return input count
	 */
	public int getMessageInput()
	{
		return this.inputCount;
	}

  /**
   *  Returns how many messages have been dropped on this connection.
   *
   *  @return dropped message count
   */
  public int getMessageDropCount()
  {
    return this.droppedCount;
  }

	/**
	 *  Returns the length of time the connection has lived, measured from
	 *  the recorded creation time to now.
	 *
	 *  @return elapsed time in whole seconds
	 */
	public int getUpTime()
	{
		final long now = System.currentTimeMillis();

		// milliseconds to whole seconds, truncated
		return (int)((now - createTime) / 1000);
	}

  /**
   *  Returns the timestamp recorded for the most recent send.
   *
   *  @return timestamp in milliseconds, as taken from
   *          <code>System.currentTimeMillis()</code> by the sender thread
   */
  public long getSendTime()
  {
    return this.sendTime;
  }

  /**
   *  Entry of the message backlog: a message paired with its priority flag.
   *
   */
  class MessageData
  {
    private boolean priority;
    private Message message;

    /**
     *  Creates a backlog entry.
     *
     *  @param message GNUTella message to hold
     *  @param priority true when the message is exempt from dropping
     */
    MessageData(Message message, boolean priority)
    {
      this.priority = priority;
      this.message = message;
    }

    /**
     *  Tells whether the held message is a priority message. Generally,
     *  messages originated by the JTella servant are considered priority
     *  messages.
     *
     *  @return priority flag
     */
    boolean isPriority()
    {
      return this.priority;
    }

    /**
     *  Returns the held message.
     *
     *  @return message
     */
    Message getMessage()
    {
      return this.message;
    }
  }


  /**
   *  Sender thread that takes messages from the backlog and writes them to
   *  the connection's output stream. Writing on a dedicated thread handles
   *  the problem of blocking on write, due to an unresponsive servant on the
   *  connection.
   *
   */
  class AsyncSender extends Thread
  {
    // written by shutdown() from other threads and read by the sender
    // thread, so it must be volatile to be reliably visible
    private volatile boolean shutdown = false;

    AsyncSender()
    {
      super("AsyncSender");
    }

    /**
     *  Takes the message at the head of the backlog, waiting on this
     *  thread's monitor while the backlog is empty.
     *
     *  @return the next message, or null when shutdown has been requested
     */
    public Message getMessage()
    {
      // The emptiness check and wait() happen under the same monitor that
      // producers hold when calling notify(). Checking outside the monitor
      // allowed a notify() to slip in between the check and the wait(),
      // leaving a queued message unsent until the next notify().
      synchronized ( this )
      {
        while ( 0 == messageBacklog.size() && !shutdown )
        {
          try
          {
            wait();
          }
          catch (InterruptedException ignored)
          {
            // shutdown() interrupts this thread to wake it; the loop
            // condition re-checks the shutdown flag
          }
        }
      }

      if ( shutdown )
      {
        return null;
      }

      return ((MessageData)messageBacklog.remove(0)).getMessage();
    }

    /**
     *  Requests the sender thread to stop and interrupts it so that a
     *  pending wait is abandoned.
     */
    public void shutdown()
    {
      shutdown = true;
      interrupt();
    }

    /**
     *  Send loop: takes messages one at a time, records the send time,
     *  counts the message and writes its bytes to the output stream. Any
     *  exception stops this sender and is logged.
     */
    public void run()
    {
      while (!shutdown)
      {
        Message message = getMessage();

        if ( null == message )
        {
          // getMessage() returns null once shutdown was requested; skip the
          // write instead of raising (and logging) a NullPointerException
          continue;
        }

        try
        {
          Connection.this.sendTime = System.currentTimeMillis();
          byte[] messageBytes = message.getByteArray();
          outputCount++;

          Connection.this.outputStream.write(messageBytes,
                                             0,
                                             messageBytes.length);
          Connection.this.outputStream.flush();
        }
        catch (Exception e)
        {
          // NOTE(review): this resolves to AsyncSender.shutdown(), which
          // stops only the sender thread - confirm whether the enclosing
          // connection should be shut down here as well
          shutdown();
          Log.getLog().log(e);
        }

      }
    }
  }
}


⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -