📄 job.java
字号:
package sim;import broker.*;import graph.*;import java.util.*;import com.Message;import com.Notification;import com.Subscription;import com.Unsubscription;/** * This class represents a job (load fraction) in a publish/subscribe system. * A job consists of one publisher producing notifications and several * subscribers which are interested in them. */public class Job implements Destination { /** * The current simulation for scheduling events. */ private Simulation sim = null; /** * The job's publisher. */ private Broker publisher = null; /** * The job's subscriber. */ private Broker[] subscribers = null; /** * The filter belonging to this job. */ private Filter filter = new Filter(this); /** * The distribution of publication intervals. */ private Distribution publicationDistribution = null; /** * The next (scheduled) publication event. */ private Event publicationEvent = null; // TODO weiss noch nicht// private LinkedList nots = new LinkedList(); private int num = 0; /** * Creates a new unassigned job. * @param sim the current simulation. * @param publicationDistribution the distribution of publication intervals. */ public Job(Simulation sim, Distribution publicationDistribution) { this.sim = sim; this.publicationDistribution = publicationDistribution; } /** * Returns false. */ public boolean isBroker() { return false; } /** * Returns true. */ public boolean isClient() { return true; } /** * Receives an notification the job is subscribed to. */ public void receive(Destination sender, Message message) { // TODO remove// Notification n;// n = (Notification)message; // if(!((Broker)sender).isInCache(n)){// System.out.println("Not in Cache ?????????????????????????");// } // if(n.getJob()!=this) {// System.out.println("falscher Job");// }// LinkedList l = (LinkedList)((Broker)sender).getProperty(missing);// if(!nots.remove(new Integer(n.getNumber()))) {// System.out.println("Notification duplicate at "+sender+"?????????????????");// }// if(l.size() > 100) {// System.out.println("Notifications are missing !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");// } System.out.println("")// Integer num = (Integer)((Broker)sender).getProperty(number);// if(num.intValue() != n.getNumber()-1) {// System.out.println("No FIFO order !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");// sim.getNetwork().checkSize(); // }// ((Broker)sender).setProperty(number,new Integer(n.getNumber())); } /** * Sets the publishing broker. * @param publisher the publishing broker. */ public void setPublisher(Broker publisher) { this.publisher = publisher; // TODO remove //System.out.println("Job: "+this+" Publisher: "+publisher); } public Broker getPublisher(){ return publisher; } /** * Sets the subscribing brokers. Existing subscriptions are cancelled * at the old brokers and new subscriptions are issued at the new brokers. * @param subscribers the subscribing brokers. */ public void setSubscribers(Broker[] subscribers) { // unsubscribe if(this.subscribers != null) { for(int i=0; i<this.subscribers.length; i++) { this.subscribers[i].unsubscribe(this, new Unsubscription(filter)); } } // set brokers this.subscribers = subscribers; // subscribe if(this.subscribers != null) { for(int i=0; i<this.subscribers.length; i++) { this.subscribers[i].subscribe(this, new Subscription(filter)); } } // TODO remove// System.out.print("Job: "+this+" Subscriber: ");// for(int i=0; i<this.subscribers.length; i++) {// System.out.print(""+this.subscribers[i]+" ");// }// System.out.println(); // TODO weis noch nicht// for(int i=0; i<this.subscribers.length; i++) {// this.subscribers[i].setProperty(missing, new LinkedList());// this.subscribers[i].setProperty(number, new Integer(0));// } } /** * Starts the publishing of notifications. */ public void start() { double interval; // the interval to wait // publishing already scheduled? if(publicationEvent != null) { return; } // create and schedule a new publication event publicationEvent = new Event() { public void handle() { Job.this.publish(); } }; interval = publicationDistribution.getValue(); // TODO remove interval = 0; sim.scheduleEventIn(interval, publicationEvent); } /** * Publishes a new notification and schedules the next one. */ private void publish() { double interval; Notification n; n = new Notification(this,num); publisher.publish(this,n); // TODO weis noch nicht// for(int i=0; i<subscribers.length; i++) {// nots.add(new Integer(num));// } num++;// if(nots.size()>500) {// System.out.println("missing notifications ??????????????");// }// LinkedList l = (LinkedList)subscribers[i].getProperty(missing);// l.add(n);// } publicationEvent = new Event() { public void handle() { Job.this.publish(); } }; interval = publicationDistribution.getValue(); sim.scheduleEventIn(interval, publicationEvent); } // TODO: weis noch nicht// private Property missing = new Property();// private Property number = new Property(); //private int num = 1; // TODO: weis noch nicht public Broker[] getSubscribers(){ return subscribers; } // public void assign(){// // Broker[] brokers;// double[] probs;// double[] loads;// double[] costs;// //// int n;// int pos;// Iterator it;//// // n = net.numberOfBrokers();// brokers = new Broker[n];// loads = new double[n];// probs = new double[n];// costs = new double[n];// // it = net.getBrokerIterator();// for(int i=0; i<n; i++){ // brokers[i] = (Broker)it.next();// loads[i] = net.getLoadFraction(brokers[i]);// }// // this.publisher = pickBroker(brokers, loads);// // pos = 0;// for(int i = 0; i<n; i++){// if(this.publisher==brokers[i]){// pos = i;// }// loads[i] = net.getLoadFraction(brokers[i]);// costs[i] = costs[i] = net.getLinkCost(publisher, brokers[i]);// }//// localize(costs);// probs = merge(locality,costs,loads);// probs[pos]=0;// for(int i=0; i<this.subscribers.length; i++){// if(this.subscribers[i]!=null){// unsubscribe(subscribers[i]);// }// subscribers[i] = pickBroker(brokers, probs);// subscribe(subscribers[i]);// }// if(publishing==null){// publishing = new Event(){// public void handle(){// Job.this.publish();// } // }; // sim.scheduleEventNow(publishing);// }// }// // private Broker pickBroker(Broker[] brokers, double[] probs){// // double sum; // the sum of all probability values// double rand; // equal distributed random number between 0 and sum.// // // calculate sum// sum = 0;// for(int i=0; i< probs.length; i++){// sum += probs[i];// }// // // get next random number// rand = sum*random.nextDouble();// // // find associated broker// sum = 0;// for(int i=0; i<probs.length; i++){// sum += probs[i];// if(rand <= sum){// // delete its probability to get picked the next time// probs[i] = 0;// return brokers[i];// }// }// // neutral value// return brokers[brokers.length-1];// }// // private void localize(double[] costs){//// double min; // minimal costs;// double max; // maximal costs;// // // determine minimal and maximal costs// min = Double.MAX_VALUE;// max = 0;// for(int i=0; i < costs.length; i++){// if(costs[i] < min){// min = costs[i];// }// if(costs[i] > max){// max = costs[i];// }// }// // // mirror values at (min+max)/2// for(int i=0; i < costs.length; i++){// costs[i] = min + max - costs[i];// }// }// // // private double[] merge(double factor, double[] probs1, double[] probs2){//// int n; // arraysize// double sum1; // sum of all probabilities of array 1// double sum2; // sum of all probabilities of array 2// double[] rst; // the merged probabilities// // // get the size of the smallest array// n = Math.min(probs1.length,probs2.length);// // // determine sums// sum1 = sum2 = 0;// for(int i=0; i<n; i++){// sum1 += probs1[i];// sum2 += probs2[i];// }// // // merge both probabilities and respect their fraction// rst = new double[n];// for(int i=0; i<n; i++){// rst[i]= probs1[i]*factor/sum1 + probs2[i]*(1.0d-factor)/sum2; // }// // return rst;// }// // private void publish(){// net.send(this,publisher,new Notification(this));// if(publishEvent!=null){// publishEvent = new Event(){// public void handle(){// Job.this.publish();// }// };// sim.scheduleEvent(publishInterval.next(), publishEvent);// }// }// // private void subscribe(Broker broker){// net.send(this,broker,new Subscription(this.filter));// }// // private void unsubscribe(Broker broker){// net.send(this,broker,new Unsubscription(this.filter));// }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -