⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 router.java

📁 P2P协议GUNTELLA的java源代码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    return queryRouteTable.get(message.getGUID());
  }

  // TODO history for routing

  /**
   *  Routes PING messages -
   *  RULES: Route PING messages to all connections
   *  except originator
   *
   */
  void routePingMessage(RouteMessage m)
  {
    if ( m.getConnection().getStatus() != NodeConnection.STATUS_OK )
    {
      // Originating connection does not exist, so drop the message
      // If we received Pong responses, there would be no connection to
      // route them to
      return;
    }
    
    // Make a local connection list to avoid concurrency issues
    LinkedList listcopy = connectionList.getList();
    ListIterator iterator = listcopy.listIterator(0);

    prepareMessage(m.getMessage());

    while ( iterator.hasNext() )
    {
      NodeConnection connection = (NodeConnection)iterator.next();

      if ( !connection.equals(m.getConnection()) &&
           connection.getStatus() == NodeConnection.STATUS_OK )
      {
        routerSend(connection, m.getMessage());
      }
    }

    // History of Pings
    pingRouteTable.put(m.getMessage().getGUID(),
                       m.getConnection());
  }

  /**
   *  Routes PONG messages -
   *  RULES: Route PONG messages only to the connection
   *  the PING arrived on
   *
   */
  void routePongMessage(RouteMessage m)
  {
    // Harvest this servant from the pong message
    PongMessage pongMessage = (PongMessage)m.getMessage();
    hostCache.addHost(new Host(pongMessage));

    NodeConnection originator = pingRouteTable.get(m.getMessage().getGUID());
    if ( null != originator &&
         originator.getStatus() == NodeConnection.STATUS_OK )
    {
      prepareMessage(m.getMessage());
      
      routerSend(originator, m.getMessage());
    }
    else
    {
      Log.getLog().logInformation("No connection for routing pong");
    }
  }

  /**
   *  Routes QUERY messages -
   *  RULES: Route QUERY messages to all connections
   *  except the originator
   *
   */
  void routeQueryMessage(RouteMessage m)
  {
    
    if ( m.getConnection().getStatus() != NodeConnection.STATUS_OK )
    {
      // Originating connection does not exist, so drop the message
      // If we received a response to the query, there would be no
      // connection to route it to
      return;
    }
    
    // Make a local connection list to avoid concurrency L
    LinkedList listcopy = connectionList.getList();
    ListIterator iterator = listcopy.listIterator(0);

    prepareMessage(m.getMessage());

    while ( iterator.hasNext() )
    {
      NodeConnection connection = (NodeConnection)iterator.next();

      if ( !connection.equals(m.getConnection()) &&
           connection.getStatus() == NodeConnection.STATUS_OK )
      {
        routerSend(connection, m.getMessage());
        
      }
    }

    // History of Queries
    queryRouteTable.put(m.getMessage().getGUID(),
                        m.getConnection());

    // inform any search listener
    if ( 0 != searchReceivers.size() )
    {
      fireSearchMessage((SearchMessage)m.getMessage());
    }
  }

  /**
   *  Routes QUERY REPLY messages -
   *  Rules: Route Query Replys to the connection
   *  which had the Query
   *
   */
  void routeQueryReplyMessage(RouteMessage m)
  {
    NodeConnection originator = queryRouteTable.get(m.getMessage().getGUID());

    if ( null != originator &&
         originator.getStatus() == NodeConnection.STATUS_OK )
    {
       prepareMessage(m.getMessage());
      
       routerSend(originator, m.getMessage());

        // Record query hit route, for push forwarding
        // Push messages are routed by Servant ID
       queryHitRouteTable.put
              (((SearchReplyMessage)m.getMessage()).getClientIdentifier(),
                 m.getConnection());
    }
    else
    {
      Log.getLog().logInformation("No connection for routing query reply");
    }
  }

  /**
   *  Routes PUSH messages -
   *  Rules: Route PUSH on the connection that had the QUERYHIT
   *
   *
   */
  void routePushMessage(RouteMessage m)
  {

    PushMessage pushMessage = (PushMessage)m.getMessage();
    if ( 0 != pushReceivers.size() &&
         Utilities.getClientGUID().equals(pushMessage.getClientIdentifier()))
    {
      // this is a push request for the JTella servant
      firePushMessage(pushMessage);
      return;
    }

    NodeConnection originator = 
        queryHitRouteTable.get(pushMessage.getClientIdentifier());

    if ( null != originator )
    {
      prepareMessage(pushMessage);
      routerSend(originator, pushMessage);
    }
    else
    {
      Log.getLog().logInformation("No connection for routing push");
    }
  }

  /**
   *  Updates a message for sending
   *
   *  @param message message to update
   */
  void prepareMessage(Message message)
  {
      message.setTTL((byte) (message.getTTL() - 1)); 
      message.setHops((byte) (message.getHops() + 1));
  }
  
  /**
   *  Utility method for common send
   *
   *  @param connection connection
   *  @param message message
   */
  boolean routerSend(Connection connection, Message message)
  {
    try
    {
      connection.send(message);
    }
    catch (IOException io)
    {
        Log.getLog().log(io);
    }
    
    return true; // todo fix
  }
  
  /**
   *  Performs some validation against network trafic
   *
   *  @return true if the message is acceptable, false otherwise
   */
  boolean validateMessage(Message m)
  {

    //---------------------------------------------------------------
    //  The idea is to limit trafic, making sure hops doesn't exceed
    //  a reasonable amount - 7
    //---------------------------------------------------------------
    if ( m.getHops() > MAX_HOPS )
    {
      Log.getLog().logInformation("Router dropped message exceeding max hops");
      return false;
    }

    if ( m.getTTL() > MAX_TTL)
    {
      Log.getLog().logInformation("Router dropped message exceeding max ttl");
      return false;
    }


    if ( m.getTTL() > MAX_HOPS &&
         m.getTTL() < MAX_TTL )
    {
      Log.getLog().logInformation("Router adjusted message ttl to 7");
      m.setTTL(MAX_HOPS);
    }

    if ( (m.getTTL() + m.getHops()) > MAX_HOPS )
    {
      Log.getLog().logInformation("Router adjusted message ttl to 7");
      m.setTTL((byte)(MAX_HOPS - m.getHops()));
    }


    return true;
  }

  /**
   *  Sends a search message to listeners
   *
   *  @param searchMessage search message to send
   */
  void fireSearchMessage(SearchMessage searchMessage)
  {
    Vector v = (Vector)searchReceivers.clone();
    Enumeration e = v.elements();

    while ( e.hasMoreElements() )
    {
      MessageReceiver receiver = (MessageReceiver)e.nextElement();
      receiver.receiveSearch((SearchMessage)searchMessage);
    }

  }

  /**
   *  Sends a push message to listener(s)
   *
   *  @param pushMessage push message to send
   */
  void firePushMessage(PushMessage pushMessage)
  {
    Vector v = (Vector)pushReceivers.clone();
    Enumeration e = v.elements();

    while ( e.hasMoreElements() )
    {
      MessageReceiver receiver = (MessageReceiver)e.nextElement();
      receiver.receivePush((PushMessage)pushMessage);
    }

  }

  /**
   *  Records a message to route
   *
   */
  class RouteMessage
  {
    Message m;
    NodeConnection connection;

    RouteMessage(Message m, NodeConnection connection)
    {
      this.m = m;
      this.connection = connection;
    }

    /**
     *
     *
     */
    Message getMessage()
    {
      return m;
    }

    /**
     *
     *
     */
    NodeConnection getConnection()
    {
      return connection;
    }
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -