⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 broker.java

📁 发布/订阅系统路由重配算法,可应用于ad hoc环境
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
				// NOTE(review): everything down to the first method-closing brace below is the
				// tail of a method whose beginning lies outside this excerpt (it appears to be
				// the broadcast handler -- confirm against the full file). Code is unchanged.
				if(environment.evaluate(path)){
					// the path was evaluated positively: send a Request along the reversed
					// path to this broker itself, with one (zero-initialised) cost slot per hop
					sendMessage(this,new Request(reversePath(path),new double[path.length]));
				}
			}catch(NullPointerException e){
				// NOTE(review): an NPE raised inside the try block is used as a signal to
				// drop the message; an empty destination set is returned.
				return new DestinationSet();
			}
		}
		// message forwarding: while the path is not longer than environmentSize, pass a
		// Broadcast (path + bloomFilter) to every neighbor that is not already on the path
		destinations = new DestinationSet();
		if(path.length <= environmentSize){
			destinations.setMessage(new Broadcast(path, bloomFilter));
			destinations.addAll(neighbors);
			for(int i=0; i<path.length; i++) {
				destinations.remove(path[i]);
			}
			// destinations.remove(from);
		}
		return destinations;
	}

	/**
	 * Extends the path of a broadcast message by adding the current broker.
	 * The input array is not modified; a new array that is one element longer is
	 * allocated and this broker is stored in its last slot.
	 * @param path the original path.
	 * @return the extended path.
	 */
	private Broker[] extendPath(Broker[] path){
		Broker[] extendedPath; // the extended path
		extendedPath = new Broker[path.length+1];
		System.arraycopy(path,0,extendedPath,0,path.length);
		extendedPath[path.length] = this;
		return extendedPath;
	}

	/**
	 * Reverses the path of brokers.
	 * The input array is not modified; a new array of the same length is returned
	 * whose element j is the input's element (length-1-j).
	 * @param path the original path.
	 * @return the reversed path.
	 */
	private Broker[] reversePath(Broker[] path){
		Broker[] reversedPath;
		reversedPath = new Broker[path.length];
		// i walks the input forwards while j walks the output backwards
		for(int i=0, j=path.length-1; j>=0; i++, j--){
			reversedPath[j]=path[i];
		}
		return reversedPath;
	}

	/**
	 * Handles a request message.
	 * <p>
	 * The costs are first updated through <code>environment.addCosts</code>. If this
	 * broker is the last element of the request's path, the index of the smallest
	 * cost is determined and, unless that index is the last one, a Lock message is
	 * dispatched to this broker via <code>handleMessage</code>. Otherwise the request
	 * is forwarded to the broker that follows this one on the path.
	 * @param from the sender of the request (not used by this method).
	 * @param request the request carrying the path and the per-hop costs.
	 * @return the destinations the request has to be forwarded to; an empty set
	 *         without message if <code>addCosts</code> raised a NullPointerException.
	 */
	private DestinationSet handleRequest(Destination from, Request request) {
		DestinationSet destinations;
		Broker[] path;
		double[] costs;
		int min; // index of the smallest entry in costs

		// add own costs if you are not the issuer
		path = request.getPath();
		costs = request.getCosts();
		try{
			environment.addCosts(path,costs);
		}catch(NullPointerException e){
			// NOTE(review): NPE is used as a "cannot handle" signal; the request is dropped
			return new DestinationSet();
		}

		destinations = new DestinationSet(request);
		if (path[path.length-1] == this) {
			// end of the path reached: scan the costs from the back; because the
			// comparison is strict, equal costs keep the higher index
			min = costs.length-1;
			for(int i=costs.length-1; i>=0; i--){
				if(costs[min] > costs[i]){
					min = i;
				}
			}

			// only if the minimum is not at the last index a lock is issued; the Lock is
			// built from the reversed path and the brokers at min+1 and min
			// (presumably the two ends of the old edge -- confirm against Lock's constructor)
			if(min!=path.length-1){
				handleMessage(this,new Lock(reversePath(path),path[min+1],path[min]));
			}
		}else{
			// intermediate hop: start with all neighbors, then restrict the set to the
			// successor of this broker on the path
			destinations.addAll(neighbors);
			for(int i=0; i<path.length; i++){
				if(path[i]==this){
					destinations.restrictTo(path[i+1]);
					break;
				}
			}
		}

		return destinations;
	}

	/**
	 * Handles a lock message (reconfiguration).
	 * <p>
	 * If this broker is not locked yet, it derives its side and its left/right
	 * neighbors from the path. When the right neighbor is a known neighbor, the
	 * broker initialises bNew, bOld, bRelay, bGray and bQueued, locks itself and
	 * forwards the lock to the right neighbor; otherwise the path is treated as
	 * stale and the lock is returned to the sender.
	 * <p>
	 * If this broker is already locked, the sender decides: a lock coming from
	 * bLeft is commented in the code as a succeeded reservation (Begin is sent to
	 * bY and bX), one from bRight as a failed reservation (state is reset), and
	 * any other sender as a concurrent locking attempt (the lock is returned).
	 * @param bSender the sender of the lock message.
	 * @param lock the lock message carrying the path and the old edge (bX, bY).
	 * @return the destinations the lock message has to be forwarded to.
	 */
	private DestinationSet handleLock(Destination bSender, Lock lock){
		DestinationSet destinations; // the returned addressed messages
		Broker[] path;               // the path to lock
		Broker bX;                   // the first node of the old edge on the path
		Broker bY;                   // the second node of the old edge on the path
		int n;                       // index of the last path's node

		// get all information from the lock message
		path = lock.getPath();
		bX = lock.getBX();
		bY = lock.getBY();
		n = path.length-1;

		// initialize a destination set for the lock message (no destinations yet)
		destinations = new DestinationSet(lock);

		// if broker is not already locked
		if(!locked){

			// determine side, left and right neighbor: side becomes RIGHT as soon as bY
			// has been seen at or before this broker's position; the neighbors wrap
			// around at both ends of the path
			side = LEFT;
			for(int i=0; i<path.length; i++){
				if(path[i]==bY){
					side = RIGHT;
				}
				if(path[i]==this){
					bLeft = i==0 ? path[n] : path[i-1];
					bRight = i==n ? path[0] : path[i+1];
					break;
				}
			}

			// add the initiator of the lock message as neighbor of the new edge
			if(bRight==path[0]){
				neighbors.add(bRight);
				lock.colorGray();
			}

			// the path is still valid if the right neighbor is a known neighbor
			if(neighbors.contains(bRight)){
				// determine bNew: the opposite end of the path, if this broker is one of its ends
				if(this == path[n]){
					bNew = path[0];
				}else if(this == path[0]) {
					bNew = path[n];
				}else{
					bNew = null;
				}

				// determine bOld: the other node of the old edge, if this broker is on it
				if(this == bX){
					bOld = bY;
				}else if(this == bY){
					bOld = bX;
				}else{
					bOld = null;
				}

				// determine bRelay: path ends relay to the old-edge nodes and vice versa
				if(this == path[0]){
					bRelay = bX;
				}else if(this == path[n]){
					bRelay = bY;
				}else if(this == bX){
					bRelay = path[0];
				}else if(this == bY){
					bRelay = path[n];
				}else{
					bRelay = null;
				}
				// bGray is empty.
				bGray.clear();

				// fill bQueued with brokers for which we have to wait: all path brokers
				// under CAUSAL order, only those on the other side under FIFO order
				bQueued.clear();
				for(int i=0, currentSide=LEFT; i<path.length; i++){
					if(path[i] == bY){
						currentSide = RIGHT;
					}
					if(order == CAUSAL){
						bQueued.add(path[i]);
					}else if(order == FIFO && side != currentSide){
						bQueued.add(path[i]);
					}
				}

				// set flags and color
				relaying = false;
				locked = true;
				color = BLACK;

				// forward the lock message to right neighbor on the path
				destinations.add(bRight);
			}

			// the path is stale!
			else{
				// reset everything
				bLeft = null;
				bRight = null;
				// and return the lock message to the sender
				if(bSender != this) {
					destinations.add(bSender);
				}
			}

		// the broker is already locked.
		} else {
			if (bSender == bLeft) {
				// reservation succeeded: the left neighbor becomes a neighbor and both
				// nodes of the old edge are told to begin
				neighbors.add(bLeft);
				sendMessage(bY, new Begin());
				sendMessage(bX, new Begin());
			}
			else if(bSender == bRight){
				// reservation failed: pass the lock on to the left neighbor (only when
				// bNew is null) and drop all reconfiguration state
				if(bNew == null){
					destinations.add(bLeft);
				}
				bQueued.clear();
				bLeft = bRight = bNew = bOld = bRelay = null;
				locked = false;
			}
			else {
				// concurrent locking attempt: return the lock to its sender
				if(bSender != this){
					destinations.add(bSender);
				}
			}
		}

		return destinations;
	}

	/**
	 * Handles a begin message: starts relaying.
	 * <p>
	 * For every filter the routing table holds for bOld, a gray, relayed
	 * Subscription (tagged Message.RECONFIGURATION) is sent to bRelay, followed by
	 * a relayed Separator. If bOld is already contained in bGray, an End message
	 * is sent right away and relaying stops again.
	 * @param bSender the sender of the begin message (not used by this method).
	 * @param begin the begin message (not used by this method).
	 * @return always a new destination set without message and destinations.
	 */
	private DestinationSet handleBegin(Destination bSender, Begin begin){
		Collection filters;        // filters associated to the old edge
		Subscription subscription; // a subscription for each filter
		Separator separator;       // marks the end of the relayed subscriptions

		// start relaying
		relaying = true; 

		// get associated filters and send a subscription for each of them
		filters = routingTable.getFilters(bOld);
		for(Iterator it = filters.iterator(); it.hasNext(); ) {
			subscription = new Subscription((Filter)it.next(),Message.RECONFIGURATION);
			subscription.colorGray();
			subscription.setRelayed();
			sendMessage(bRelay, subscription);
		}

		// mark their end by a separator
		separator = new Separator();
		separator.setRelayed();
		sendMessage(bRelay, separator);

		// reconfiguration already finished on the other side
		if(bGray.contains(bOld)){
			sendMessage(bRelay, new End());
			bRelay = null;
			relaying = false;
		}

		return new DestinationSet();
	}

	/**
	 * Handles a separator message.
	 * <p>
	 * The separator is forwarded along the path away from its sender. A broker
	 * that receives it from the direction matching its own side turns gray and
	 * adds itself to the separator. The brokers listed in the separator are added
	 * to bGray and removed from bQueued. A broker at the new edge (bNew set)
	 * drains qNot once bQueued is empty. A broker at the old edge (bOld set)
	 * either ends relaying or issues unsubscriptions for bOld's filters, and
	 * starts the unlock process once both itself and bOld are gray.
	 * @param bSender the sender of the separator.
	 * @param separator the separator message.
	 * @return the destinations the separator has to be forwarded to; empty if the
	 *         unlock process was started here, because the pending entries are
	 *         then sent directly.
	 */
	private DestinationSet handleSeparator(Destination bSender, Separator separator){
		DestinationSet destinations;
		DestinationSet.Entry entry;
		Collection filters;
		Message message;

		// message forwarding: pass the separator on to the neighbor opposite to the sender
		destinations = new DestinationSet(separator);
		if(bSender == bRight && bLeft != null){
			destinations.add(bLeft);
		}else if(bSender == bLeft && bRight != null){
			destinations.add(bRight);
		}

		// changing color: the separator arrived from this broker's own side
		if((bSender == bLeft && side == LEFT) || (bSender == bRight && side == RIGHT)){
			color = GRAY;
			separator.add(this);
			// a broker at the old edge additionally returns the separator to its sender
			if(bOld != null){
				destinations.add(bSender);
			}
		}

		// store gray brokers
		bGray.addAll(separator.getBrokers());
		bQueued.removeAll(separator.getBrokers());

		// brokers at the new edge: nothing left to wait for, so process the queued
		// messages in qNot
		if(bNew != null && bQueued.isEmpty()){
			while(!qNot.isEmpty()){
				message = qNot.dequeue();
				sim.collectStatisticsDequeue(this,message);
				handleMessage(qNot.getSender(),message);
			}
		}

		// brokers at the old edge
		if(bOld != null){
			if(bSender == bOld){
				// separator came over the old edge: finish relaying if still active
				if (relaying) {
					sendMessage(bRelay,new End());
					bRelay = null;
					relaying = false;
				}
			}else{
				// separator came from elsewhere: handle an Unsubscription (tagged
				// Message.RECONFIGURATION) for every filter held for bOld, as if sent by bOld
				filters = routingTable.getFilters(bOld);
				for(Iterator it = filters.iterator(); it.hasNext(); ) {
					handleMessage(bOld,new Unsubscription((Filter)it.next(),Message.RECONFIGURATION));
				}
			}

			// start unlock process
			if(bGray.contains(this) && bGray.contains(bOld)){
				// send the pending separator copies now, i.e. before the unlock below is handled
				for(Iterator it = destinations.iterator(); it.hasNext(); ){
					entry = (DestinationSet.Entry)it.next();
					// NOTE(review): "getDestintion" is the accessor name as called here; its
					// declaration is outside this excerpt
					sendMessage(entry.getDestintion(), entry.getMessage());
				}

				handleMessage(bOld, new Unlock());
				// TODO (original author): "change back" -- a disabled debug print used to
				// wrap this remove call
				neighbors.remove(bOld);
				bOld = null;
				// everything was sent above already, so nothing is returned for forwarding
				destinations.clear();
			}
		}

		return destinations;
	}

	/**
	 * Handles an end message: stops relaying and processes the messages queued
	 * in qSub. If the state of bLeft, bRight and bNew matches the condition
	 * below (commented in the code as "unlock message already received"), an
	 * Unlock is dispatched to this broker itself.
	 * @param bSender the sender of the end message (not used by this method).
	 * @param end the end message (not used by this method).
	 * @return always a new destination set without message and destinations.
	 */
	private DestinationSet handleEnd(Destination bSender, End end){

		Message message; // a queued (un)subscription

		// stop relaying
		bRelay = null;
		// process queued (un)subscriptions
		while(!qSub.isEmpty()){
			message = qSub.dequeue();
			handleMessage(qSub.getSender(),message);
		}
		// unlock message (initiated by bRelay) already received
		if( (bLeft == null && bRight == null) ||
			(bLeft == bNew && bRight == null) ||
			(bRight == bNew && bLeft == null)   ){

			handleMessage(this, new Unlock());
		}
		return new DestinationSet();
	}

	/**
	 * Handles an unlock message.
	 * <p>
	 * First the forwarding destinations are chosen from side, sender, bLeft,
	 * bRight and bNew; bOld (if it is the sender) and bNew (while bRelay is still
	 * set) are then taken out again. Afterwards the reconfiguration state is
	 * released step by step: the reference to the sending path neighbor is
	 * cleared, bNew is cleared once the unlock is forwarded to it, the color is
	 * reset to BLACK when bNew is null, and the broker is unlocked when bLeft,
	 * bRight and bNew are all null.
	 * @param bSender the sender of the unlock message.
	 * @param unlock the unlock message.
	 * @return the destinations the unlock message has to be forwarded to.
	 */
	private DestinationSet handleUnlock(Destination bSender, Unlock unlock){

		DestinationSet destinations;

		// message forwarding
		destinations = new DestinationSet(unlock);

		if( (side == LEFT  && bSender == bRight && bLeft  != null) ||
			(side == RIGHT && bSender == bLeft  && bRight != null) ) {

			destinations.add(bLeft);
			destinations.add(bRight);
		} 
		else if ( (side == LEFT  && bSender == bRight && bNew != null) ||
				  (side == RIGHT && bSender == bLeft  && bNew != null) ) {

			destinations.add(bSender);
			destinations.add(bNew);
		}
		else if (bSender == this) {
			// unlock issued by this broker itself
			destinations.add(bNew);
		}

		// earlier forwarding rule, kept disabled by the original author:
//		if(bSender != bNew && bLeft != null && bRight != null) {
//			destinations.add(bLeft);
//			destinations.add(bRight);
//		}
//		else if(bNew != null && (bLeft == null || bRight == null) ){
//			destinations.add(bNew);
//		}

		// never send the unlock back over the old edge it came from
		if(bSender == bOld){
			destinations.remove(bOld);
		}
		// while bRelay is still set, the unlock is not forwarded to bNew
		if(bRelay != null){
			destinations.remove(bNew);
		}

		// finalize
		if(bSender == bLeft){
			bLeft = null;
		}
		if(bSender == bRight){
			bRight = null;
		}
		if(bNew != null && destinations.contains(bNew)){
			bNew = null;
		}
		if(bNew == null){
			color = BLACK;
		}
		if(bLeft == null && bRight == null && bNew == null){
			// all references released: reconfiguration is finished at this broker
			bGray.clear();
			locked = false;
		}
		return destinations;
	}

	/**
	 * Passes a subscription to this broker by delegating to <code>receive</code>.
	 * @param from the sender of the subscription.
	 * @param subscription the subscription.
	 */
	public void subscribe(Destination from, Subscription subscription) {
		receive(from, subscription);
	}

	/**
	 * Passes an unsubscription to this broker by delegating to <code>receive</code>.
	 * @param from the sender of the unsubscription.
	 * @param unsubscription the unsubscription.
	 */
	public void unsubscribe(Destination from, Unsubscription unsubscription) {
		receive(from, unsubscription);
	}

	/**
	 * Passes a notification to this broker by delegating to <code>receive</code>.
	 * @param from the sender of the notification.
	 * @param notification the notification.
	 */
	public void publish(Destination from, Notification notification) {
		receive(from, notification);
	}

	/**
	 * Returns the interest value the environment reports for the given broker.
	 * @param b the broker to look up.
	 * @return the result of <code>environment.getInterest(b)</code>.
	 */
	public double getInterest(Broker b) {
		return environment.getInterest(b);
	}

	/**
	 * Returns the current size of the queue qNot.
	 * @return the number of entries in qNot; other queues are not counted.
	 */
	public int queueSize(){
		return qNot.size();
		//+queueSub.size()+queueNot.size();
	}

	/**
	 * Returns the number the network assigns to this broker, as a string.
	 * @return the string form of <code>net.getNumber(this)</code>.
	 */
	public String toString() {
		return ""+net.getNumber(this);
	}

	/**
	 * Tells whether the given notification is contained in the cache.
	 * @param n the notification to look up.
	 * @return the result of <code>cache.contains(n)</code>.
	 */
	public boolean isInCache(Notification n){
		return cache.contains(n);
	}

	/**
	 * Removes all entries from the cache.
	 */
	public void clearCache(){
		cache.clear();
	}

	/**
	 * Returns the filters the routing table holds for the given broker.
	 * @param b the broker to look up.
	 * @return the result of <code>routingTable.getFilters(b)</code>.
	 */
	public Collection getFilters(Broker b){
		return routingTable.getFilters(b);
	}

	/**
	 * Returns the filters of the routing table.
	 * @return the result of <code>routingTable.getFilters()</code>.
	 */
	public Collection getFilters(){
		return routingTable.getFilters();
	}

	/**
	 * Removes the given broker from the neighbors of this broker.
	 * @param b the broker to remove.
	 */
	public void removeNeighbor(Broker b){
		neighbors.remove(b);
	}

	/**
	 * Adds the given broker to the neighbors of this broker.
	 * @param b the broker to add.
	 */
	public void addNeighbor(Broker b){
		neighbors.add(b);
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -