⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 chordnode.java

📁 p2p仿真器。开发者可以工作在覆盖层中进行创造和测试逻辑算法或者创建和测试新的服务。PlanetSim还可以将仿真代码平稳转换为在Internet上的实验代码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
	/**
	 * Scans the finger table from its highest entry down to entry 0 and
	 * returns the first non-null finger whose id satisfies
	 * <code>betweenE(this.id, id)</code>. When no finger qualifies, the
	 * local node handle is returned.
	 * <p>
	 * Side effects: the result is also stored in <code>temp[2]</code>, and
	 * <code>cpfFound</code> records whether some finger matched.
	 * 
	 * @param id
	 *            key whose closest preceding finger is wanted
	 * @return the selected finger, or this node's own handle if none matched
	 */
	protected NodeHandle closestPrecedingFinger(Id id) {
		cpfFound = false;
		temp[2] = this.nodeHandle; // default answer: this node itself
		int index = bitsPerKey - 1;
		while (index > -1 && !cpfFound) {
			NodeHandle candidate = finger[index];
			if (candidate != null && candidate.getId().betweenE(this.id, id)) {
				temp[2] = candidate;
				cpfFound = true; // ends the scan on the next loop test
			}
			index--;
		}
		return temp[2];
	}
	/**
	 * Resolves the successor for one position of the start table. If a
	 * predecessor is known and <code>start[pos]</code> satisfies
	 * <code>betweenE(predecessor id, this id)</code>, this node answers with
	 * its own handle; every other case is delegated to
	 * <code>findPredecessor(pos)</code>.
	 * 
	 * @param pos
	 *            position of the key in the start table
	 * @return this node's handle, or the value returned by
	 *         <code>findPredecessor(pos)</code>
	 */
	protected NodeHandle findSuccessor(int pos) {
		boolean keyIsLocal = predecessor != null
				&& start[pos].betweenE(predecessor.getId(), this.id);
		return keyIsLocal ? this.nodeHandle : findPredecessor(pos);
	}
	/**
	 * Looks up the node responsible for <code>start[pos]</code>.
	 * <ul>
	 * <li>If <code>start[pos]</code> equals this node's id, the local handle
	 * is returned.</li>
	 * <li>If <code>finger[0]</code> is set and <code>start[pos]</code>
	 * satisfies <code>betweenE(this.id, finger[0] id)</code>,
	 * <code>finger[0]</code> is returned.</li>
	 * <li>Otherwise a FIND_PRE request is sent to the closest preceding
	 * finger, a <code>FindPredListener</code> is registered under the
	 * request key, and the current content of <code>finger[pos]</code> is
	 * returned as is.</li>
	 * </ul>
	 * Side effect: in the last case <code>temp[1]</code> holds the node the
	 * request was sent to.
	 * 
	 * @param pos
	 *            position of the key in the start table
	 * @return the handle chosen by the rules above
	 */
	protected NodeHandle findPredecessor(int pos) {
		Id wanted = start[pos];
		if (wanted.equals(this.id)) {
			return this.nodeHandle;
		}
		if (finger[0] != null && wanted.betweenE(this.id, finger[0].getId())) {
			return finger[0];
		}
		// Not resolvable locally: ask the closest preceding finger.
		temp[1] = closestPrecedingFinger(wanted);
		String requestKey = GenericFactory.generateKey();
		sendMessage(requestKey, nodeHandle, temp[1], FIND_PRE, REQUEST, new IdMessage(wanted));
		addMessageListener(requestKey, new FindPredListener(this, pos));
		return finger[pos];
	}
	/**
	 * Makes this node join the network.
	 * <p>
	 * When <code>bootstrap</code> equals this node's own handle, the node
	 * points every finger and its predecessor at itself. Otherwise it clears
	 * its predecessor, sends a FIND_SUCC request to <code>bootstrap</code>
	 * and registers a <code>FindSuccListener</code> under the request key.
	 * 
	 * @param bootstrap
	 *            handle of an arbitrary node already in the network, or this
	 *            node's own handle
	 */
	public void join(NodeHandle bootstrap) {
		if (bootstrap.equals(nodeHandle)) {
			// Bootstrapping with itself: every link points back to this node.
			for (int i = 0; i < bitsPerKey; i++) {
				finger[i] = this.nodeHandle;
			}
			predecessor = this.nodeHandle;
		} else {
			String key = GenericFactory.generateKey();
			predecessor = null;
			sendMessage(key, nodeHandle, bootstrap, FIND_SUCC, REQUEST, null);
			addMessageListener(key, new FindSuccListener(this));
		}
	}
	
	/**
	 * Makes this node leave the network: marks it as having left, sends a
	 * SET_SUCC refresh carrying <code>finger[0]</code> to the predecessor,
	 * sends a SET_PRE refresh carrying the predecessor to
	 * <code>finger[0]</code>, and logs the event.
	 */
	public void leave() {
		hasLeaved = true;
		// Hand the successor over to the predecessor ...
		NodeMessage successorInfo = new NodeMessage(finger[0]);
		sendMessage(null, nodeHandle, predecessor, SET_SUCC, REFRESH, successorInfo);
		// ... and the predecessor over to the successor.
		NodeMessage predecessorInfo = new NodeMessage(predecessor);
		sendMessage(null, nodeHandle, finger[0], SET_PRE, REFRESH, predecessorInfo);
		Logger.log("LEAVING NODE " + id, Logger.EVENT_LOG);
	}
	/**
	 * Runs one stabilization round.
	 * <p>
	 * If <code>hasReceivedSucc</code> is set, the flag is cleared, a GET_PRE
	 * request is sent to <code>finger[0]</code> and a
	 * <code>GetPreListener</code> is registered under the request key.
	 * <p>
	 * Independently, when <code>finger[0]</code> and the predecessor are both
	 * known and <code>finger[0]</code> is neither this node nor the
	 * predecessor, a SUCC_LIST request (with a null key) is sent to
	 * <code>finger[0]</code>.
	 */
	protected void stabilize() {
		if (hasReceivedSucc) {
			hasReceivedSucc = false;
			String key = GenericFactory.generateKey();
			sendMessage(key, nodeHandle, finger[0], GET_PRE, REQUEST, null);
			addMessageListener(key, new GetPreListener(this));
		}

		// Only when the links look consistent is the successor asked for
		// its successor list.
		if (finger[0] != null && !finger[0].equals(this.nodeHandle)
				&& predecessor != null && !finger[0].equals(predecessor)) {
			sendMessage(null, nodeHandle, finger[0], SUCC_LIST, REQUEST, null);
		}
	}
	/**
	 * Adopts <code>nh</code> as predecessor when no predecessor is known, or
	 * when the id of <code>nh</code> satisfies
	 * <code>between(predecessor id, this id)</code>. Otherwise nothing
	 * changes.
	 * 
	 * @param nh
	 *            node that claims to be this node's predecessor
	 */
	protected void notify(NodeHandle nh) {
		if (predecessor != null && !nh.getId().between(predecessor.getId(), this.id)) {
			return; // the current predecessor is kept
		}
		predecessor = nh;
	}
	/**
	 * Refreshes a single finger table position per call. The position is
	 * taken from <code>nullPointers</code>; position 0 is skipped (it is
	 * bumped to 1). Afterwards <code>nullPointers</code> advances by one,
	 * wrapping around at <code>bitsPerKey</code>.
	 */
	protected void fixFingers() {
		if (nullPointers == 0) {
			nullPointers = 1; // position 0 is never refreshed here
		}
		final int slot = nullPointers;
		setFinger(slot, findSuccessor(slot));
		nullPointers = (nullPointers + 1) % bitsPerKey;
	}
	
	/**
	 * Sends a message straight to the given node: sets <code>hint</code> as
	 * the message's next hop, sends it, and increments the traffic counter.
	 * 
	 * @param message
	 *            message to deliver from the application
	 * @param hint
	 *            node handle used as the next hop
	 * @param mode
	 *            message mode (REQUEST or REFRESH); not read by this method
	 */
	protected void sendData(RouteMessage message, NodeHandle hint, int mode) {
		message.setNextHopHandle(hint);
		this.sendMessage(message);
		Results.incTraffic();
	}
    
	/**
	 * Routes a message whose destination node is not known, using the id of
	 * <code>nextHop</code> as the routing target.
	 * <ul>
	 * <li>If a predecessor is known and the target id satisfies
	 * <code>betweenE(predecessor id, this id)</code>, the message is
	 * scheduled on the local endpoint registered for its application id.</li>
	 * <li>Else, if <code>finger[0]</code> is set and the target id satisfies
	 * <code>betweenE(this id, finger[0] id)</code>, the message is sent to
	 * <code>finger[0]</code> in REFRESH mode.</li>
	 * <li>Otherwise it is sent to the closest preceding finger in REQUEST
	 * mode and the hop statistics are updated.</li>
	 * </ul>
	 * 
	 * @param message
	 *            message to deliver from the application
	 * @param nextHop
	 *            handle whose id is used as the routing target
	 */
	protected void routingData(RouteMessage message, NodeHandle nextHop) {
		// Route the message using the target id (the idMessage).
		Id target = nextHop.getId();

		if (predecessor != null && target.betweenE(predecessor.getId(), this.id)) {
			// Deliver locally.
			EndPoint endpoint = (EndPoint) endpoints.get(message.getApplicationId());
			endpoint.scheduleMessage(message, 0);
			return;
		}

		if (finger[0] != null && target.betweenE(this.id, finger[0].getId())) {
			sendData(message, finger[0], REFRESH);
			return;
		}

		sendData(message, closestPrecedingFinger(target), REQUEST);
		Results.updateHopsMsg(message.getSource().getId(), message.getKey());
	}
	
	/**
	 * Sends application data into the ring. A data RouteMessage is built
	 * first; when <code>nextHop</code> is null the destination node is
	 * unknown and the message is routed towards <code>to</code>, otherwise
	 * the message is sent directly to <code>nextHop</code> in REFRESH mode.
	 * If the RouteMessage cannot be built, an error is logged and nothing
	 * is sent.
	 * 
	 * @param appId
	 *            identifier of the application the message belongs to
	 * @param to
	 *            destination NodeHandle
	 * @param nextHop
	 *            next hop NodeHandle if it is known, null otherwise
	 * @param msg
	 *            message coming from the upper layers
	 */
	public void routeData(String appId,NodeHandle to, NodeHandle nextHop, Message msg) {
		try {
			RouteMessage outgoing = getDataMessage(appId, nodeHandle, to, nextHop, msg);
			if (nextHop == null) {
				this.routingData(outgoing, to);
			} else {
				this.sendData(outgoing, nextHop, REFRESH);
			}
		} catch (InitializationException e) {
			Logger.log("[" + id + "] cannot build a RouteMessage for this data [" + msg + "]",Logger.ERROR_LOG);
		}
	}

	/**
	 * Starts a broadcast from this node. For every pair of consecutive
	 * fingers that differ, a broadcast message is sent to the first finger
	 * of the pair with the second one as its limit. The last finger always
	 * receives a message whose limit is this node. Each message sent
	 * increments the traffic counter; a message that cannot be built is
	 * logged and skipped.
	 * 
	 * @param appId
	 *            application which sends this broadcast
	 * @param to
	 *            destination of the broadcast; not read by this method
	 * @param nextHop
	 *            next hop; not read by this method
	 * @param msg
	 *            message to be sent into the broadcast
	 */
	public void broadcast(String appId,
			NodeHandle to, NodeHandle nextHop, Message msg) {
		final int lastIndex = bitsPerKey - 1;

		for (int i = 0; i < lastIndex; i++) {
			NodeHandle receiver = finger[i];
			NodeHandle bound = finger[i + 1];
			if (receiver.equals(bound)) {
				continue; // skip a redundant finger
			}
			try {
				// arguments: appId, from, to, nextHop, payload
				RouteMessage outgoing = getBroadcastMessage(appId,
						this.nodeHandle, receiver, receiver,
						new BroadcastMessage(msg, bound));
				sendMessage(outgoing);
				Results.incTraffic();
			} catch (InitializationException e) {
				Logger.log("Cannot build a new instance of RouteMessage for BroadcastMessage",
						Logger.ERROR_LOG);
			}
		}

		// The last finger is processed separately: this node is its limit.
		NodeHandle farthest = finger[bitsPerKey - 1];
		try {
			RouteMessage outgoing = getBroadcastMessage(appId, this.nodeHandle,
					farthest, farthest,
					new BroadcastMessage(msg, nodeHandle));
			sendMessage(outgoing);
			Results.incTraffic();
		} catch (InitializationException e) {
			Logger.log("ERROR: Cannot get a RouteMessage of MessagePool\n"
					+ e.getMessage(), Logger.ERROR_LOG);
		}
	}
	/**
	 * Handles one incoming message. Three cases are distinguished:
	 * <ol>
	 * <li>a REPLY that carries a key is handed to the listener registered
	 * under that key, and the listener is then removed;</li>
	 * <li>a message in <code>Globals.ERROR</code> mode is logged (when it
	 * has a key), its listener is removed, and the successor list is
	 * repaired;</li>
	 * <li>any other message is handled according to its type: data is
	 * dispatched, refresh messages update local links, broadcasts are
	 * forwarded and delivered, and requests are answered or forwarded.</li>
	 * </ol>
	 * 
	 * @param msg
	 *            message to treat
	 */
	public void dispatcher(RouteMessage msg) {
		// Reply with a key: hand it to the listener registered under that key.
		// Successor-list replies carry a null key and are handled in the switch.
		if (msg.getMode() == REPLY && msg.getKey() != null) {
			String key = msg.getKey();
			try {
				((MessageListener) listeners.get(key)).onMessage(msg);
			} catch (NullPointerException e) {
				// NOTE(review): this also catches a NullPointerException thrown
				// inside onMessage(), which is then reported as a missing listener.
				Logger.log("I'm [" + id + "]; not exist listener for key ["
						+ msg.getKey() + "]", Logger.ERROR_LOG);
			}
			removeMessageListener(key);
		} else if (msg.getMode() == Globals.ERROR) {
			// Error message: log it and drop the listener tied to its key, if any.
			if (msg.getKey() != null) {
				String key_fp = msg.getKey();
				Logger.log("Node " + this.id + " destroy message key " + key_fp
						+ " type " + Globals.typeToString(msg.getType()) + " content "
						+ msg.getMessage(), Logger.MSG_LOG);
				MessageListener lst = (MessageListener) listeners.get(key_fp);
				if (lst != null) {
					removeMessageListener(key_fp);
				}
			}
			// Successor lost: the error comes from the current successor.
			if (finger[0] != null && msg.getSource().equals(finger[0])
					&& succList.size() > 0) {
				// If finger[0] is not in the list, succList is unchanged.
				succList.remove(finger[0]);
				if (succList.size() > 0) {
					// Promote the first remaining entry to successor ...
					finger[0] = (NodeHandle) succList.firstElement();
					// ... and tell it that this node is its predecessor.
                    this.sendMessage(msg,null,nodeHandle,finger[0],finger[0],SET_PRE,REFRESH,new NodeMessage(nodeHandle));
				}
			} else
				// If the source is not in the list, succList is unchanged.
				succList.remove(msg.getSource());
		} else {
			switch (msg.getType()) {
				// DATA
				case DATA :
				    dispatchDataMessage(msg,REQUEST,REFRESH);
					break;
				// CONTROL
				// REFRESH messages: update local links and free the message.
				case SET_SUCC :
					NodeHandle succ = ((NodeMessage) msg.getMessage()).getNode();
					setSucc(succ);
					GenericFactory.freeMessage(msg);
					break;
				case SET_PRE :
					setPred(((NodeMessage) msg.getMessage()).getNode());
					GenericFactory.freeMessage(msg);
					break;
				case NOTIFY :
					notify(msg.getSource());
					GenericFactory.freeMessage(msg);
					break;
				case BROADCAST :
					// Forward the broadcast to the fingers that lie within the
					// received limit, then deliver the payload locally.
					NodeHandle r, new_limit;
					BroadcastMessage bm = (BroadcastMessage) msg.getMessage();
					NodeHandle limit = bm.getLimit();
					Id limitId = limit.getId();
					planet.commonapi.Message info = bm.getInfo();
					
					for (int i = 0; i < bitsPerKey - 1; i++) {
						// Skip a redundant finger
						if (!finger[i].equals(finger[i + 1])) {
							// Forward while within "Limit"
							if (finger[i].getId().between(this.id, limitId)) {
								r = finger[i];
								// New Limit must not exceed Limit
								if (finger[i + 1].getId().between(this.id, limitId)) {
									new_limit = finger[i + 1];
								} else {
									new_limit = limit;
								}
								// The incoming RouteMessage is not reused: one
								// message per target node requires a separate
								// message instance for each.
								planet.commonapi.RouteMessage aMsg = null;
								try {
									// arguments: appId, from, to, nextHop, payload
									aMsg = getBroadcastMessage(msg
											.getApplicationId(), this.nodeHandle, r, r,
											new BroadcastMessage(info,new_limit));
									sendMessage(aMsg);
									Results.incTraffic();
								} catch (InitializationException e) {
									Logger.log(
											"ERROR: Cannot get a RouteMessage of MessagePool\n"
											+ e.getMessage(), Logger.ERROR_LOG);
								}
							}
						}
					}
					Logger.log("Broadcast : Node " + this.id + " info : "
							+ info, Logger.EVENT_LOG);
					// Unwrap the payload and deliver it to the local endpoint.
					msg.setMessage(info);
					// NOTE(review): presumably balances the incTraffic() done by
					// the sender of this broadcast copy — confirm against Results.
					Results.decTraffic();
					((EndPoint) endpoints.get(msg.getApplicationId())).scheduleMessage(msg, 0);
					break;
				// REQUEST
				case FIND_SUCC :
					// The source sent this request from join().
					NodeHandle fSucc = findSuccessor(msg.getSource());
					if (fSucc != null) {
						// Successor known: reply straight to the source.
                        this.sendMessage(msg,msg.getKey(),nodeHandle,msg.getSource(),msg.getSource(),msg.getType(),REPLY,new NodeMessage(fSucc));
					} else {
						// Successor unknown: register a listener under a fresh key
						// and send a FIND_PRE request to the closest preceding finger.
						String key_fp = GenericFactory.generateKey(); 
                        NodeHandle aux = closestPrecedingFinger(msg.getSource().getId());
                        addMessageListener(key_fp, new FindPredListener(this, msg.getKey()));
                        this.sendMessage(msg,key_fp,nodeHandle,aux,aux,FIND_PRE,REQUEST,new IdMessage(msg.getSource().getId()));
					}
					break;
				case FIND_PRE :
				    Id idMesg = ((IdMessage) msg.getMessage()).getNode();
					if (finger[0] != null && idMesg.betweenE(this.id, finger[0].getId())) {
						// The searched id satisfies betweenE(this id, finger[0] id):
						// reply with the node returned by getSucc().
						// msgId is read with the same expression as idMesg above.
						Id msgId = ((IdMessage) msg.getMessage()).getNode();
						try {
                            sendMessage(msg,msg.getKey(),GenericFactory.buildNodeHandle(msgId,true),
                                    msg.getSource(),msg.getSource(),FIND_PRE,REPLY,new NodeMessage(getSucc()));
						} catch (InitializationException e1) {
							e1.printStackTrace();
						}
					 } else if (idMesg.equals(getId())) {
							// The searched id is this node's own id: reply with this node.
							try {
                                sendMessage(msg,msg.getKey(),GenericFactory.buildNodeHandle(idMesg, true),
                                        msg.getSource(),msg.getSource(),FIND_PRE,REPLY,new NodeMessage(nodeHandle));
							} catch(InitializationException e) {
								e.printStackTrace();
							}
					 } else {
						// Forward the unchanged request to the closest preceding finger.
                        NodeHandle aux = closestPrecedingFinger(idMesg);
                        sendMessage(msg,msg.getKey(),msg.getSource(),aux,aux,msg.getType(),msg.getMode(),msg.getMessage());
					}
					break;
				case GET_PRE :
					// The origin sent this request from stabilize(): reply with
					// the current predecessor.
                    sendMessage(msg,msg.getKey(),nodeHandle,msg.getSource(),msg.getSource(),GET_PRE,REPLY,new NodeMessage(predecessor));
					break;
				case SUCC_LIST :
					if (msg.getMode() == REQUEST) {
						// Answer with this node's successor list.
                        this.sendMessage(msg,msg.getKey(),nodeHandle,msg.getSource(),msg.getSource(),SUCC_LIST,REPLY,new SuccListMessage(succList));
					} else if (msg.getMode() == REPLY) {
						// Rebuild the local list: the replying node first, then
						// the entries it sent, followed by cleanSuccList().
						SuccListMessage succs = (SuccListMessage) msg.getMessage();
                        succList.clear();
                        succList.add(msg.getSource());
                        succList.addAll(succs.getSuccs());
                        cleanSuccList();
						GenericFactory.freeMessage(msg);
					}
					break;
			}
		}
	}
	/**
	 * Runs one simulation step for this node: resets the finger change
	 * counter, runs the inherited processing, dispatches every pending
	 * message, updates <code>stabRate</code> (reset to 0 if any finger
	 * changed during the step, incremented otherwise) and invokes the
	 * per-step hook of all applications.
	 * 
	 * @param actualStep
	 *            actual simulation step
	 * @return true while <code>stabRate</code> is still below
	 *         <code>realStabilizationRate</code>, i.e. the node needs to
	 *         continue the stabilization process; false otherwise
	 */
	public boolean process(int actualStep) {
		clearFingerChanges();
		super.process(actualStep);

		for (; hasMoreMessages();) {
			dispatcher(nextMessage());
		}

		boolean fingersChanged = getFingerChanges() > 0;
		if (fingersChanged) {
			stabRate = 0;
		} else {
			stabRate += 1;
		}

		invokeByStepToAllApplications();
		return stabRate < realStabilizationRate;
	}
	
	/**
	 * Tells whether this node is alive, i.e. it has neither failed nor left.
	 * 
	 * @see planet.commonapi.Node#isAlive()
	 * @return true if the node has not failed and has not left
	 */
	public boolean isAlive() {
		return !(hasFailed || hasLeaved);
	}

	/**
	 * Builds a textual description of this node showing its id, its first
	 * two fingers, its successor list and its predecessor.
	 * 
	 * @return the description of this node
	 */
	public String toString() {
		StringBuffer text = new StringBuffer("{ChordNode [");
		text.append(this.id);
		text.append("]: succ[").append(finger[0]);
		text.append("]: succ2f[").append(finger[1]);
		text.append("]: succList[").append(succList);
		text.append("]: pred[").append(predecessor);
		text.append("]}");
		return text.toString();
	}
	
	/**
	 * Gives access to the NodeHandle of this node.
	 * 
	 * @see planet.commonapi.Node#getLocalHandle()
	 * @return the NodeHandle held by this node
	 */
	public NodeHandle getLocalHandle() {
		return this.nodeHandle;
	}
	
	

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -