📄 nodeimpl.java
字号:
package planet.generic.commonapi;
import java.util.Collection;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Vector;
import planet.commonapi.Application;
import planet.commonapi.EndPoint;
import planet.commonapi.Id;
import planet.commonapi.Message;
import planet.commonapi.Node;
import planet.commonapi.NodeHandle;
import planet.commonapi.RouteMessage;
import planet.commonapi.exception.InitializationException;
import planet.commonapi.results.ResultsConstraint;
import planet.commonapi.results.ResultsEdge;
import planet.generic.commonapi.factory.GenericFactory;
import planet.simulate.Logger;
import planet.simulate.MessageListener;
import planet.simulate.Results;
import planet.util.Properties;
import planet.util.Queue;
import planet.util.QueueFull;
import planet.util.timer.TimerTask;
/**
* Superclass which represents a node in a peer-to-peer system, regardless of
* the underlying protocol. All nodes implement the methods of this class.
*
* @author Pedro García
* @author Carles Pairot
* @author Ruben Mondejar
*/
public abstract class NodeImpl
implements
planet.commonapi.Node,
java.io.Serializable {
protected Id id;
protected transient Hashtable listeners;
private transient Queue incoming;
private transient Queue outgoing;
private int processed = 0;
/**
* NodeHandle for the actual Node.
*/
protected NodeHandle nodeHandle = null;
/**
* To contain int[] with [firstTime][period] values for each task.
* If period is zero (0), corresponds with a task to execute only once.
*/
private Vector timer = null;
/**
* To contain the jobs (TimerTask) to execute at each time.
*/
private Vector tasks = null;
private long timerCount = 0; //counter
private long timerNext = Long.MAX_VALUE; //next time to activate
/**
* Local EndPoints.
*/
protected Hashtable endpoints;
protected boolean role = true;
/**
 * Creates the node's internal data structures: the two timer bookkeeping
 * vectors, the table of local endpoints and, through {@code init()}, the
 * listener table and both message queues.
 */
public NodeImpl() {
    this.timer = new Vector(2);
    this.tasks = new Vector(2);
    this.endpoints = new Hashtable();
    init();
}
/**
 * Initializes the listener table and the incoming and outgoing message
 * queues. Both queues are created with
 * {@code Properties.simulatorQueueSize} as their constructor argument.
 * <p>
 * NOTE(review): the three fields set here are declared {@code transient},
 * so this is presumably also meant to be re-run after deserialization --
 * confirm against the rest of the class.
 */
private void init() {
    this.listeners = new Hashtable();
    this.incoming = new Queue(Properties.simulatorQueueSize);
    this.outgoing = new Queue(Properties.simulatorQueueSize);
}
/**
 * Makes this node join the network. Each concrete protocol node supplies
 * its own implementation.
 *
 * @param bootstrap
 *            handle of the node used as the entry point -- presumably a
 *            node that already belongs to the network; confirm against
 *            the concrete implementations
 */
public abstract void join(NodeHandle bootstrap);
/**
 * Makes this node leave the network. Each concrete protocol node supplies
 * its own implementation.
 */
public abstract void leave();
/**
 * Gives the node one simulation step in which to do its work. This base
 * implementation resets the counter of messages processed in the current
 * step and then calls {@code processTasks()}.
 *
 * @param actualStep
 *            current step in simulation time; not used by this base
 *            implementation
 * @return always {@code true}
 */
public boolean process(int actualStep) {
    // A new step starts: the per-step budget checked by hasMoreMessages()
    // is counted from zero again.
    this.processed = 0;
    processTasks();
    return true;
}
/**
 * Calls <b>byStep()</b> on every EndPoint registered in
 * {@code endpoints}, which is how the registered applications are told
 * that a new step has started. Intended to be invoked at the end of each
 * <b>process(int)</b> node implementation.
 */
protected void invokeByStepToAllApplications() {
    for (Iterator it = endpoints.values().iterator(); it.hasNext();) {
        EndPoint endPoint = (EndPoint) it.next();
        endPoint.byStep();
    }
}
/**
 * Logs the reception of a message and puts it in the incoming queue of
 * this node.
 *
 * @param msg
 *            received message
 * @throws QueueFull
 *             declared by this method; presumably raised by the queue
 *             when it cannot accept the message -- confirm against Queue
 */
public void receive(RouteMessage msg) throws QueueFull {
    // The reception is logged before the enqueue, so it is recorded even
    // if the add below fails.
    Logger.logReceive(this.id, msg, Logger.MSG_LOG);
    this.incoming.add(msg);
}
/**
 * Builds (but does not send) a RouteMessage with the specified data,
 * obtained through {@code GenericFactory.getMessage}. If the factory
 * throws an InitializationException, the error is logged and
 * {@code null} is returned.
 *
 * @param key Key of the communication.
 * @param from Communication source.
 * @param to Communication destination.
 * @param nextHop Next hop in the communication.
 * @param type RouteMessage type.
 * @param mode RouteMessage mode.
 * @param appId Name of the related application.
 * @param msg Data to be carried by the RouteMessage.
 * @return A RouteMessage with the specified data, or {@code null} if the
 *         factory could not provide one.
 */
public RouteMessage buildMessage(String key, NodeHandle from, NodeHandle to, NodeHandle nextHop, int type, int mode, String appId, Message msg)
{
    try {
        return GenericFactory.getMessage(key, from, to, nextHop, msg, type, mode, appId);
    } catch (InitializationException e) {
        Logger.log("ERROR: Cannot get a RouteMessage of MessagePool\n"
                + e.getMessage(), Logger.ERROR_LOG);
        return null;
    }
}
/**
 * Builds a new RouteMessage carrying the same key, source, destination,
 * next hop, payload, type, mode and application id as <b>toCopy</b>.
 * If the factory throws an InitializationException, the error is logged
 * and {@code null} is returned.
 *
 * @param toCopy Message whose values are copied; must not be null.
 * @return A new RouteMessage, or {@code null} if the factory could not
 *         provide one.
 */
public RouteMessage buildMessage(RouteMessage toCopy)
{
    try {
        //don't update statistics
        return GenericFactory.getMessage(toCopy.getKey(), toCopy.getSource(),
                toCopy.getDestination(), toCopy.getNextHopHandle(),
                toCopy.getMessage(), toCopy.getType(), toCopy.getMode(),
                toCopy.getApplicationId());
    } catch (InitializationException e) {
        // Nothing to hand back to the factory here: when getMessage throws,
        // no message was ever obtained, so there is no instance to free.
        Logger.log("ERROR: Cannot get a RouteMessage of MessagePool\n" + e.getMessage(), Logger.ERROR_LOG);
        return null;
    }
}
/**
 * Puts a message in the outgoing queue of this node and then logs the
 * send.
 *
 * @param msg
 *            message to send
 * @throws QueueFull
 *             declared by this method; presumably raised by the queue
 *             when it cannot accept the message -- confirm against Queue
 */
public void send(RouteMessage msg) throws QueueFull {
    // Enqueue first: if the add fails, the send is never logged.
    this.outgoing.add(msg);
    Logger.logSend(this.id, msg, Logger.MSG_LOG);
}
/**
 * Puts the RouteMessage <b>msg</b> in the outgoing queue of this node via
 * {@code send(RouteMessage)}. If the queue is full, the failure is logged
 * and the message is handed back to {@code GenericFactory.freeMessage}.
 *
 * @param msg RouteMessage to be sent.
 * @return {@code true} if the message was queued, {@code false} if the
 *         outgoing queue was full.
 */
public boolean sendMessage(RouteMessage msg)
{
    boolean queued;
    try {
        send(msg);
        queued = true;
    } catch (QueueFull e) {
        Logger.log("Outgoing Queue of Node " + this.id + " is Full",
                Logger.ERROR_LOG);
        // The message will not travel any further, so release it.
        GenericFactory.freeMessage(msg);
        queued = false;
    }
    return queued;
}
/**
 * Obtains a RouteMessage from {@code GenericFactory} for the given key,
 * endpoints, type and mode, attaches <b>msg</b> to it and sends it through
 * {@code sendMessage(RouteMessage)}. If the factory throws an
 * InitializationException, the error is logged.
 *
 * @param key Key of the communication.
 * @param from Communication source.
 * @param to Communication destination.
 * @param type RouteMessage type.
 * @param mode RouteMessage mode.
 * @param msg Data to be sent with the RouteMessage.
 * @return {@code true} if the message was queued for sending,
 *         {@code false} if it could not be built or queued.
 */
public boolean sendMessage(String key, NodeHandle from, NodeHandle to, int type, int mode, Message msg)
{
    try {
        RouteMessage routeMsg = GenericFactory.getMessage(key, from, to, type, mode);
        routeMsg.setMessage(msg);
        return sendMessage(routeMsg);
    } catch (InitializationException e) {
        Logger.log("ERROR: Cannot get a RouteMessage of MessagePool\n"
                + e.getMessage(), Logger.ERROR_LOG);
        return false;
    }
}
/**
 * Obtains a RouteMessage from {@code GenericFactory} with the specified
 * data and next hop (and a {@code null} application id) and sends it
 * through {@code sendMessage(RouteMessage)}. If the factory throws an
 * InitializationException, the error is logged.
 *
 * @param key Key of the communication.
 * @param from Communication source.
 * @param to Communication destination.
 * @param nextHop Next hop in the communication.
 * @param type RouteMessage type.
 * @param mode RouteMessage mode.
 * @param msg Data to be sent with the RouteMessage.
 * @return {@code true} if the message was queued for sending,
 *         {@code false} if it could not be built or queued.
 */
public boolean sendMessage(String key, NodeHandle from, NodeHandle to, NodeHandle nextHop, int type, int mode, Message msg)
{
    try {
        RouteMessage routeMsg = GenericFactory.getMessage(key, from, to, nextHop, msg, type, mode, null);
        return sendMessage(routeMsg);
    } catch (InitializationException e) {
        Logger.log("ERROR: Cannot get a RouteMessage of MessagePool\n"
                + e.getMessage(), Logger.ERROR_LOG);
        return false;
    }
}
/**
 * Reuses an existing RouteMessage: overwrites its values with the given
 * data, keeping the application id it already carries, and sends it
 * through {@code sendMessage(RouteMessage)}.
 *
 * @param rMsg RouteMessage to be reused.
 * @param key Communication key.
 * @param from Communication source.
 * @param to Communication destination.
 * @param nextHop Next hop in the communication.
 * @param type RouteMessage type.
 * @param mode RouteMessage mode.
 * @param msg Data to be sent with the RouteMessage.
 */
public void sendMessage(RouteMessage rMsg, String key, NodeHandle from, NodeHandle to, NodeHandle nextHop, int type, int mode, Message msg)
{
    rMsg.setValues(key, from, to, nextHop, type, mode, msg, rMsg.getApplicationId());
    // The boolean outcome is not propagated: this overload returns void.
    this.sendMessage(rMsg);
}
/**
 * Gives access to the outgoing queue of this node. The queue object
 * itself is returned, not a copy.
 *
 * @return the outgoing queue of messages
 */
public Queue outMessages() {
    return this.outgoing;
}
/**
 * Gives access to the incoming queue of this node. The queue object
 * itself is returned, not a copy.
 *
 * @return the incoming queue of messages
 */
public Queue inMessages()
{
    return this.incoming;
}
/**
 * Tells whether another incoming message may be processed in the current
 * step: the per-step counter must still be below
 * {@code Properties.simulatorProcessedMessages} and the incoming queue
 * must not be empty.
 *
 * @return {@code true} if there is an incoming message that may still be
 *         processed in this step
 */
protected boolean hasMoreMessages() {
    // Per-step budget already used up: the queue is not even consulted.
    if (processed >= Properties.simulatorProcessedMessages) {
        return false;
    }
    return incoming.size() > 0;
}
/**
 * Removes the next message from the incoming queue and returns it,
 * counting it against the per-step processed-message counter.
 *
 * @return the message removed from the incoming queue
 */
protected RouteMessage nextMessage() {
    // The counter is bumped before the removal, exactly once per call.
    processed++;
    Object head = incoming.remove();
    return (RouteMessage) head;
}
/**
 * Returns the identifier of this node.
 *
 * @return the node's Id
 */
public Id getId() {
    return this.id;
}
/**
 * Registers a listener under the given key in the node's listener table,
 * so that it can be run when the response to the message arrives. A
 * listener already stored under the same key is replaced.
 *
 * @param key
 *            String representation of the id of the routed message
 * @param listener
 *            MessageListener linked to that message
 */
public void addMessageListener(String key, MessageListener listener) {
    this.listeners.put(key, listener);
}
/**
* Remove the message listener of the node
*
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -