⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 job.java

📁 发布/订阅系统路由重配算法,可应用于ad hoc环境
💻 JAVA
字号:
package sim;import broker.*;import graph.*;import java.util.*;import com.Message;import com.Notification;import com.Subscription;import com.Unsubscription;/** * This class represents a job (load fraction) in a publish/subscribe system. * A job consists of one publisher producing notifications and several * subscribers which are interested in them.  */public class Job implements Destination {	/**	 * The current simulation for scheduling events.	 */	private Simulation sim = null;	/**	 * The job's publisher.	 */	private Broker publisher = null;		/**	 * The job's subscriber.	 */	private Broker[] subscribers = null;		/**	 * The filter belonging to this job.	 */	private Filter filter = new Filter(this);		/**	 * The distribution of publication intervals.	 */	private Distribution publicationDistribution = null;		/**	 * The next (scheduled) publication event.	 */	private Event publicationEvent = null; 		// TODO weiss noch nicht//	private LinkedList nots = new LinkedList(); 	private int num = 0;		/**	 * Creates a new unassigned job.	 * @param sim the current simulation.	 * @param publicationDistribution the distribution of publication intervals.	 */	public Job(Simulation sim, Distribution publicationDistribution) {		this.sim = sim;		this.publicationDistribution = publicationDistribution;	}		/**	 * Returns false.	 */	public boolean isBroker() {		return false;	}	/**	 * Returns true.	 */	public boolean isClient() {		return true;	}	/**	 * Receives an notification the job is subscribed to.	 */	public void receive(Destination sender, Message message) {		// TODO remove//		Notification n;//		n = (Notification)message;		//		if(!((Broker)sender).isInCache(n)){//			System.out.println("Not in Cache ?????????????????????????");//		}		//		if(n.getJob()!=this) {//			System.out.println("falscher Job");//		}//		LinkedList l = (LinkedList)((Broker)sender).getProperty(missing);//		if(!nots.remove(new Integer(n.getNumber()))) {//			System.out.println("Notification duplicate at "+sender+"?????????????????");//		}//		if(l.size() > 100) {//			System.out.println("Notifications are missing !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");//		}    			System.out.println("")//		Integer num = (Integer)((Broker)sender).getProperty(number);//		if(num.intValue() != n.getNumber()-1) {//			System.out.println("No FIFO order !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");//			sim.getNetwork().checkSize();	//		}//		((Broker)sender).setProperty(number,new Integer(n.getNumber()));	}	/**	 * Sets the publishing broker.	 * @param publisher the publishing broker.	 */	public void setPublisher(Broker publisher) {		this.publisher = publisher;				// TODO remove		//System.out.println("Job: "+this+" Publisher: "+publisher);	}		public Broker getPublisher(){		return publisher;	}		/**	 * Sets the subscribing brokers. Existing subscriptions are cancelled	 * at the old brokers and new subscriptions are issued at the new brokers. 	 * @param subscribers the subscribing brokers.	 */	public void setSubscribers(Broker[] subscribers) {		// unsubscribe		if(this.subscribers != null) {			for(int i=0; i<this.subscribers.length; i++) {				this.subscribers[i].unsubscribe(this, new Unsubscription(filter));			}		}				// set brokers		this.subscribers = subscribers;				// subscribe		if(this.subscribers != null) {			for(int i=0; i<this.subscribers.length; i++) {				this.subscribers[i].subscribe(this, new Subscription(filter));			}		}				// TODO remove//		System.out.print("Job: "+this+" Subscriber: ");//		for(int i=0; i<this.subscribers.length; i++) {//			System.out.print(""+this.subscribers[i]+" ");//		}//		System.out.println();				// TODO weis noch nicht//		for(int i=0; i<this.subscribers.length; i++) {//			this.subscribers[i].setProperty(missing, new LinkedList());//			this.subscribers[i].setProperty(number, new Integer(0));//		}	}		/**	 * Starts the publishing of notifications.	 */	public void start() {				double interval; // the interval to wait		// publishing already scheduled?		if(publicationEvent != null) {			return;		}				// create and schedule a new publication event		publicationEvent = new Event() {			public void handle() {				Job.this.publish();			}		};		interval = publicationDistribution.getValue();		// TODO remove		interval = 0;		sim.scheduleEventIn(interval, publicationEvent);			}		/**	 * Publishes a new notification and schedules the next one.	 */	private void publish() {		double interval;		Notification n;		n = new Notification(this,num);		publisher.publish(this,n);				// TODO weis noch nicht//		for(int i=0; i<subscribers.length; i++) {//			nots.add(new Integer(num));//		}		num++;//		if(nots.size()>500) {//			System.out.println("missing notifications ??????????????");//		}//			LinkedList l = (LinkedList)subscribers[i].getProperty(missing);//			l.add(n);//		}				publicationEvent = new Event() {			public void handle() {				Job.this.publish();			}		};		interval = publicationDistribution.getValue();		sim.scheduleEventIn(interval, publicationEvent);	}		// TODO: weis noch nicht//	private Property missing = new Property();//	private Property number = new Property();	//private int num = 1;	// TODO: weis noch nicht	public Broker[] getSubscribers(){		return subscribers;	}		//	public void assign(){//		//		Broker[] brokers;//		double[] probs;//		double[] loads;//		double[] costs;//		////		int n;//		int pos;//		Iterator it;////		//		n = net.numberOfBrokers();//		brokers = new Broker[n];//		loads = new double[n];//		probs = new double[n];//		costs = new double[n];//		//		it = net.getBrokerIterator();//		for(int i=0; i<n; i++){ //			brokers[i] = (Broker)it.next();//			loads[i] = net.getLoadFraction(brokers[i]);//		}//		//		this.publisher = pickBroker(brokers, loads);//		//		pos = 0;//		for(int i = 0; i<n; i++){//			if(this.publisher==brokers[i]){//				pos = i;//			}//			loads[i] = net.getLoadFraction(brokers[i]);//			costs[i] = costs[i] = net.getLinkCost(publisher, brokers[i]);//		}////		localize(costs);//		probs = merge(locality,costs,loads);//		probs[pos]=0;//		for(int i=0; i<this.subscribers.length; i++){//			if(this.subscribers[i]!=null){//				unsubscribe(subscribers[i]);//			}//			subscribers[i] = pickBroker(brokers, probs);//			subscribe(subscribers[i]);//		}//		if(publishing==null){//			publishing = new Event(){//				public void handle(){//					Job.this.publish();//				}	//			};	//			sim.scheduleEventNow(publishing);//		}//	}//	//	private Broker pickBroker(Broker[] brokers, double[] probs){//		//		double sum;	 // the sum of all probability values//		double rand; // equal distributed random number between 0 and sum.//		//		// calculate sum//		sum = 0;//		for(int i=0; i< probs.length; i++){//			sum += probs[i];//		}//		//		// get next random number//		rand = sum*random.nextDouble();//		//		// find associated broker//		sum = 0;//		for(int i=0; i<probs.length; i++){//			sum += probs[i];//			if(rand <= sum){//				// delete its probability to get picked the next time//				probs[i] = 0;//				return brokers[i];//			}//		}//		// neutral value//		return brokers[brokers.length-1];//	}//	//	private void localize(double[] costs){////		double min; // minimal costs;//		double max; // maximal costs;//		//		// determine minimal and maximal costs//		min = Double.MAX_VALUE;//		max = 0;//		for(int i=0; i < costs.length; i++){//			if(costs[i] < min){//				min = costs[i];//			}//			if(costs[i] > max){//				max = costs[i];//			}//		}//		//		// mirror values at (min+max)/2//		for(int i=0; i < costs.length; i++){//			costs[i] = min + max - costs[i];//		}//	}//	//	//	private double[] merge(double factor, double[] probs1, double[] probs2){////		int n;        // arraysize//		double sum1;  // sum of all probabilities of array 1//		double sum2;  // sum of all probabilities of array 2//		double[] rst; // the merged probabilities//		//		// get the size of the smallest array//		n = Math.min(probs1.length,probs2.length);//		//		// determine sums//		sum1 = sum2 = 0;//		for(int i=0; i<n; i++){//			sum1 += probs1[i];//			sum2 += probs2[i];//		}//		//		// merge both probabilities and respect their fraction//		rst = new double[n];//		for(int i=0; i<n; i++){//			rst[i]= probs1[i]*factor/sum1 + probs2[i]*(1.0d-factor)/sum2; //		}//		//		return rst;//	}//	//	private void publish(){//		net.send(this,publisher,new Notification(this));//		if(publishEvent!=null){//			publishEvent = new Event(){//				public void handle(){//					Job.this.publish();//				}//			};//			sim.scheduleEvent(publishInterval.next(), publishEvent);//		}//	}//	//	private void subscribe(Broker broker){//		net.send(this,broker,new Subscription(this.filter));//	}//	//	private void unsubscribe(Broker broker){//		net.send(this,broker,new Unsubscription(this.filter));//	}}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -