📄 router.java
字号:
return queryRouteTable.get(message.getGUID());
}
// TODO history for routing
/**
* Routes PING messages -
* RULES: Route PING messages to all connections
* except originator
*
*/
void routePingMessage(RouteMessage m)
{
if ( m.getConnection().getStatus() != NodeConnection.STATUS_OK )
{
// Originating connection does not exist, so drop the message
// If we received Pong responses, there would be no connection to
// route them to
return;
}
// Make a local connection list to avoid concurrency issues
LinkedList listcopy = connectionList.getList();
ListIterator iterator = listcopy.listIterator(0);
prepareMessage(m.getMessage());
while ( iterator.hasNext() )
{
NodeConnection connection = (NodeConnection)iterator.next();
if ( !connection.equals(m.getConnection()) &&
connection.getStatus() == NodeConnection.STATUS_OK )
{
routerSend(connection, m.getMessage());
}
}
// History of Pings
pingRouteTable.put(m.getMessage().getGUID(),
m.getConnection());
}
/**
* Routes PONG messages -
* RULES: Route PONG messages only to the connection
* the PING arrived on
*
*/
void routePongMessage(RouteMessage m)
{
// Harvest this servant from the pong message
PongMessage pongMessage = (PongMessage)m.getMessage();
hostCache.addHost(new Host(pongMessage));
NodeConnection originator = pingRouteTable.get(m.getMessage().getGUID());
if ( null != originator &&
originator.getStatus() == NodeConnection.STATUS_OK )
{
prepareMessage(m.getMessage());
routerSend(originator, m.getMessage());
}
else
{
Log.getLog().logInformation("No connection for routing pong");
}
}
/**
* Routes QUERY messages -
* RULES: Route QUERY messages to all connections
* except the originator
*
*/
void routeQueryMessage(RouteMessage m)
{
if ( m.getConnection().getStatus() != NodeConnection.STATUS_OK )
{
// Originating connection does not exist, so drop the message
// If we received a response to the query, there would be no
// connection to route it to
return;
}
// Make a local connection list to avoid concurrency L
LinkedList listcopy = connectionList.getList();
ListIterator iterator = listcopy.listIterator(0);
prepareMessage(m.getMessage());
while ( iterator.hasNext() )
{
NodeConnection connection = (NodeConnection)iterator.next();
if ( !connection.equals(m.getConnection()) &&
connection.getStatus() == NodeConnection.STATUS_OK )
{
routerSend(connection, m.getMessage());
}
}
// History of Queries
queryRouteTable.put(m.getMessage().getGUID(),
m.getConnection());
// inform any search listener
if ( 0 != searchReceivers.size() )
{
fireSearchMessage((SearchMessage)m.getMessage());
}
}
/**
* Routes QUERY REPLY messages -
* Rules: Route Query Replys to the connection
* which had the Query
*
*/
void routeQueryReplyMessage(RouteMessage m)
{
NodeConnection originator = queryRouteTable.get(m.getMessage().getGUID());
if ( null != originator &&
originator.getStatus() == NodeConnection.STATUS_OK )
{
prepareMessage(m.getMessage());
routerSend(originator, m.getMessage());
// Record query hit route, for push forwarding
// Push messages are routed by Servant ID
queryHitRouteTable.put
(((SearchReplyMessage)m.getMessage()).getClientIdentifier(),
m.getConnection());
}
else
{
Log.getLog().logInformation("No connection for routing query reply");
}
}
/**
* Routes PUSH messages -
* Rules: Route PUSH on the connection that had the QUERYHIT
*
*
*/
void routePushMessage(RouteMessage m)
{
PushMessage pushMessage = (PushMessage)m.getMessage();
if ( 0 != pushReceivers.size() &&
Utilities.getClientGUID().equals(pushMessage.getClientIdentifier()))
{
// this is a push request for the JTella servant
firePushMessage(pushMessage);
return;
}
NodeConnection originator =
queryHitRouteTable.get(pushMessage.getClientIdentifier());
if ( null != originator )
{
prepareMessage(pushMessage);
routerSend(originator, pushMessage);
}
else
{
Log.getLog().logInformation("No connection for routing push");
}
}
/**
* Updates a message for sending
*
* @param message message to update
*/
void prepareMessage(Message message)
{
message.setTTL((byte) (message.getTTL() - 1));
message.setHops((byte) (message.getHops() + 1));
}
/**
* Utility method for common send
*
* @param connection connection
* @param message message
*/
boolean routerSend(Connection connection, Message message)
{
try
{
connection.send(message);
}
catch (IOException io)
{
Log.getLog().log(io);
}
return true; // todo fix
}
/**
* Performs some validation against network trafic
*
* @return true if the message is acceptable, false otherwise
*/
boolean validateMessage(Message m)
{
//---------------------------------------------------------------
// The idea is to limit trafic, making sure hops doesn't exceed
// a reasonable amount - 7
//---------------------------------------------------------------
if ( m.getHops() > MAX_HOPS )
{
Log.getLog().logInformation("Router dropped message exceeding max hops");
return false;
}
if ( m.getTTL() > MAX_TTL)
{
Log.getLog().logInformation("Router dropped message exceeding max ttl");
return false;
}
if ( m.getTTL() > MAX_HOPS &&
m.getTTL() < MAX_TTL )
{
Log.getLog().logInformation("Router adjusted message ttl to 7");
m.setTTL(MAX_HOPS);
}
if ( (m.getTTL() + m.getHops()) > MAX_HOPS )
{
Log.getLog().logInformation("Router adjusted message ttl to 7");
m.setTTL((byte)(MAX_HOPS - m.getHops()));
}
return true;
}
/**
* Sends a search message to listeners
*
* @param searchMessage search message to send
*/
void fireSearchMessage(SearchMessage searchMessage)
{
Vector v = (Vector)searchReceivers.clone();
Enumeration e = v.elements();
while ( e.hasMoreElements() )
{
MessageReceiver receiver = (MessageReceiver)e.nextElement();
receiver.receiveSearch((SearchMessage)searchMessage);
}
}
/**
* Sends a push message to listener(s)
*
* @param pushMessage push message to send
*/
void firePushMessage(PushMessage pushMessage)
{
Vector v = (Vector)pushReceivers.clone();
Enumeration e = v.elements();
while ( e.hasMoreElements() )
{
MessageReceiver receiver = (MessageReceiver)e.nextElement();
receiver.receivePush((PushMessage)pushMessage);
}
}
/**
* Records a message to route
*
*/
class RouteMessage
{
Message m;
NodeConnection connection;
RouteMessage(Message m, NodeConnection connection)
{
this.m = m;
this.connection = connection;
}
/**
*
*
*/
Message getMessage()
{
return m;
}
/**
*
*
*/
NodeConnection getConnection()
{
return connection;
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -