📄 reconfiguration.java
字号:
package sim;import broker.*;import java.io.*;import java.util.*;import com.Lock;import com.Message;import com.Subscription;import com.Unsubscription;public class Reconfiguration extends Simulation { static final int IMPROVED = 0; static final int STRAWMAN = 1; int pathLength = 3; long simulationSeed = 2354; Random rand = null; int reconfigurationMethod = IMPROVED; double messagesSent = 0.0; double messagesProcessed = 0.0; double messages; public Reconfiguration(Properties properties){ super(properties); //System.out.println("Reconfiguration started."); } public void initializeProperties(Properties properties){ super.initializeProperties(properties); // parse the requested reconfiguration method if(properties.getProperty("reconfigurationMethod") != null){ if(properties.getProperty("reconfigurationMethod").equalsIgnoreCase("STRAWMAN")){ reconfigurationMethod = STRAWMAN; } } // set the length of an reconfiguration path pathLength = properties.getProperty("pathLength") != null ? Integer.parseInt(properties.getProperty("pathLength")) : pathLength; // set the random seed for this simulation simulationSeed = properties.getProperty("simulationSeed") != null ? Long.parseLong(properties.getProperty("simulationSeed")) : simulationSeed; rand = new Random(simulationSeed); } public void setup() { // create the overlay and broker network try{ net = Network.loadOverlay(this); }catch(Exception e){ System.out.println(e); e.printStackTrace(); System.exit(-1); } //System.out.println("Things are scaled."); net.scaleCommunicationCosts(); net.assignProcessingCosts(); net.scaleProcessingCosts(); net.assignLoad(); net.scaleLoad(); net.createRandomTree(); // create the applications application = Application.createApplication(this); application.assignTo(net); // schedule Measurement Events measureInterval = 100000; scheduleEventAt(1000, new Event() { public void handle() { if(measureEvent == null) { costs = 0; processingCosts = 0; communicationCosts = 0; messages = 0; messagesSent = 0; messagesProcessed = 0; measureEvent = new MeasureEvent(); scheduleEventIn(measureInterval, measureEvent); } } }); for(int i=1000; i<=100000; i+=1000){ scheduleEventAt(i,new Event(){ public void handle(){ reconfigure(); } }); } scheduleEventAt(101001, new StopEvent()); } private void reconfigure(){ LinkedList brokers; LinkedList path; Broker first; Broker last; Broker old1; Broker old2; int index; Lock lock; Collection filters; Filter filter;// Broker neighbor;// LinkedList neighbors; brokers = new LinkedList(); for(Iterator it = net.brokerIterator(); it.hasNext(); ){ brokers.add(it.next()); } path = null; first = null; while(path==null){ if(brokers.isEmpty()){ System.err.println("No reconfiguration path of length "+pathLength+" available."); } first = (Broker)brokers.get(rand.nextInt(brokers.size())); brokers.remove(first); path=getPath(first); } last = (Broker)path.getLast(); index = rand.nextInt(path.size()-1); old1 = (Broker)path.get(index); old2 = (Broker)path.get(index+1); // TODO remove// System.out.print("Reconfiguration: ");// for(Iterator it = path.iterator(); it.hasNext(); ){// System.out.print(it.next()+", ");// }// System.out.println(" old: ("+old1+", "+old2+")"); if(reconfigurationMethod==IMPROVED){ lock = new Lock((Broker[])path.toArray(new Broker[0]),old1,old2); first.receive(first,lock); } else if (reconfigurationMethod==STRAWMAN){ first.addNeighbor(last); filters = first.getFilters(); for(Iterator it = filters.iterator(); it.hasNext(); ){ filter = (Filter)it.next(); net.send(first, last, new Subscription(filter,Message.RECONFIGURATION)); } // neighbors = new LinkedList();// neighbors.addAll(first.getNeighbors());// for(Iterator it = neighbors.iterator(); it.hasNext(); ){// neighbor = (Broker)it.next();// filters = first.getFilters(neighbor);// for(Iterator jt = filters.iterator(); jt.hasNext(); ){// filter = (Filter)jt.next();// last.receive(first, new Subscription(filter,Message.RECONFIGURATION));// }// } last.addNeighbor(first); filters = last.getFilters(); for(Iterator it = filters.iterator(); it.hasNext(); ){ filter = (Filter)it.next(); net.send(last, first, new Subscription(filter,Message.RECONFIGURATION)); } // neighbors = new LinkedList();// neighbors.addAll(last.getNeighbors());// for(Iterator it = neighbors.iterator(); it.hasNext(); ){// neighbor = (Broker)it.next();// filters = last.getFilters(neighbor);// for(Iterator jt = filters.iterator(); jt.hasNext(); ){// filter = (Filter)jt.next();// first.receive(first, new Subscription(filter,Message.RECONFIGURATION));// }// } filters = old1.getFilters(old2); for(Iterator it = filters.iterator(); it.hasNext(); ){ filter = (Filter)it.next(); old1.receive(old2, new Unsubscription(filter,Message.RECONFIGURATION)); } old1.removeNeighbor(old2); filters = old2.getFilters(old1); for(Iterator it = filters.iterator(); it.hasNext(); ){ filter = (Filter)it.next(); old2.receive(old1, new Unsubscription(filter,Message.RECONFIGURATION)); } old2.removeNeighbor(old1); } } private LinkedList getPath(Broker from){ return getPath(null, from, new LinkedList()); } private LinkedList getPath(Broker previous, Broker current, LinkedList path){ LinkedList neighbors; LinkedList returnPath; Broker next; path.addLast(current); if(path.size()>pathLength){ return path; } neighbors = new LinkedList(); neighbors.addAll(current.getNeighbors()); if(previous != null){ neighbors.remove(previous); } while(!neighbors.isEmpty()){ next = (Broker)neighbors.get(rand.nextInt(neighbors.size())); neighbors.remove(next); returnPath = getPath(current, next, path); if(returnPath != null){ return returnPath; } } path.removeLast(); return null; } public void collectStatisticsSending(Destination from, Destination to, Message message) { double c; if(from.isClient() || to.isClient()){ return; } // just count reconfiguration messages if(message.getType()!=Message.RECONFIGURATION){ return; } //System.out.println("Sending: "+from+"->"+to+": "+message); c = net.getCommunicationCosts((Broker)from,(Broker)to); costs += c; communicationCosts += c; messages += 1.0; messagesSent += 1.0; } public void collectStatisticsProcessing(Broker broker, Message message) { double p; // just count reconfiguration messages if(message.getType()!=Message.RECONFIGURATION){ return; } //System.out.println("Processing at "+broker+": "+message); p = net.getProcessingCosts(broker); costs += p; processingCosts += p; messages += 1.0; messagesProcessed += 1.0; } public void measure(){ out.println("# [communicationCosts]\t[processingCosts]\t[cost]\t[sent]\t[processed]\t[sum]"); out.println(communicationCosts+"\t"+processingCosts+"\t"+costs+"\t"+messagesSent+"\t"+messagesProcessed+"\t"+messages); } // Static Part static String networkDir = "top/"; static String[] networks = {"top_100_a", "top_100_b", "top_100_c", "top_100_d", "top_100_e"}; static long[] networkSeeds = {-6255810370950998874L, -8051342473365194286L, -1390916269166494642L, 8080423594414472518L, 5547521982829248484L}; static long[] applicationSeeds = {-2067381050185900006L, 4362103324546593174L, -1943519645000524770L, 1924503829199384093L, -7208959892574227353L}; static long[] brokerSeeds = { 6378216616296692392L, -4233968643976491332L, 4589043698922687000L, 1629433751663455621L, 7965684433806821695L}; static long[] simulationSeeds = { 2415418943316428194L, 6261982696546457172L, 5624361009926391953L, -5665391500306727729L, 3949610851384898079L}; static String method = "IMPROVED"; static String methodShort = "imp"; /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub try{ if(args.length>=2){ if(args[1].equalsIgnoreCase("STR")){ method="STRAWMAN"; methodShort="str"; } } if(args[0].equalsIgnoreCase("MAKE_EXPERIMENT")){ makeExperiment(); }else if(args[0].equalsIgnoreCase("COLLECT_RESULTS")){ collectResults(); } }catch(Exception e){ System.err.println(e); e.printStackTrace(); System.exit(-1); } } private static void makeExperiment() throws Exception{ File f; String expDir; String expFile; PrintWriter out; expDir = "exp/reconfiguration/"+methodShort+"/"; f = new File(expDir); f.mkdirs(); for(int net=0; net<networks.length; net++){ for(int i=2; i<16; i++){ expFile = expDir+networks[net]+"_"+methodShort+"_"+i; out = new PrintWriter(new FileWriter(expFile+".exp"),true); out.println("# experiment: reconfiguration costs "+i); out.println(); out.println("# general:"); out.println("simulation=sim.Reconfiguration"); out.println("outFile="+expFile+".rst"); out.println("netFile="+networkDir+networks[net]); out.println("measureInterval=100000"); out.println("reconfigurationMethod="+method); out.println("pathLength="+i); out.println(); out.println("# random seeds"); out.println("networkSeed="+networkSeeds[net]); out.println("applicationSeed="+applicationSeeds[net]); out.println("brokerSeed="+brokerSeeds[net]); out.println("simulationSeed="+simulationSeeds[net]); out.println(); out.println("# network"); out.println("minCommunicationCosts=0"); out.println("maxCommunicationCosts=10"); out.println("minProcessingCosts=0"); out.println("maxProcessingCosts=10"); out.println("minLoad=1"); out.println("maxLoad=10"); out.println("communicationDelayConstant=0"); out.println("communicationDelayFactor=1"); out.println("processingDelayConstant=0"); out.println("processingDelayFactor=0.001"); out.println(); out.println("# application"); out.println("numberOfJobs=50"); out.println("numberOfSubscribers=9"); out.println("publicationInterval=EXPONENTIAL"); out.println("publicationConstant=0.2"); out.println("reassignmentInterval=CONSTANT"); out.println("reassignmentConstant=1000000"); out.println("locality=0.75"); out.println("localityPower=2"); out.println(); out.println("# broker"); out.println("heuristic=COSTS_AND_INTERESTS"); out.println("cacheSize=8192"); out.println("filterSize=100000"); out.println("broadcastInterval=250"); out.println("updateInterval=400"); out.println("order=UNORDERED"); out.println("environmentSize=4"); out.println("numberOfHashs=5"); out.flush(); out.close(); } } } private static void collectResults() throws Exception{ String expDir; String expFile; PrintWriter out; LineNumberReader in; String line; String[] tokens; double s; double sum; double average; expDir = "exp/reconfiguration/"+methodShort+"/"; out = new PrintWriter(new FileWriter(expDir+"RESULTS"),true); out.println("# [pathlength] [sum]"); for(int i=2; i<16; i++){ sum = 0; for(int net = 0; net < networks.length; net++){ expFile = expDir+networks[net]+"_"+methodShort+"_"+i+".rst"; in = new LineNumberReader(new FileReader(expFile)); s=0; while((line=in.readLine())!=null){ if(line.startsWith("#") || line.equals("")){ continue; } tokens = line.split("\t"); s = Double.parseDouble(tokens[5]); } sum += s; in.close(); } average = sum/((double)networks.length); out.println(i+"\t"+average); } out.flush(); out.close(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -