📄 queueing.java
字号:
package sim;import broker.*;import util.*;import java.io.*;import java.util.*;import com.Lock;import com.Message;public class Queueing extends Simulation{ int pathLength = 3; long simulationSeed = 2354; Random rand = null; SimpleHashtable brokers = new SimpleHashtable(); double delay = 0.0d; int count = 0; public Queueing(Properties properties){ super(properties); //System.out.println("Queueing started"); } public void initializeProperties(Properties properties){ super.initializeProperties(properties); // set the length of an reconfiguration path pathLength = properties.getProperty("pathLength") != null ? Integer.parseInt(properties.getProperty("pathLength")) : pathLength; // set the random seed for this simulation simulationSeed = properties.getProperty("simulationSeed") != null ? Long.parseLong(properties.getProperty("simulationSeed")) : simulationSeed; rand = new Random(simulationSeed); } public void setup() { // create the overlay and broker network try{ net = Network.loadOverlay(this); }catch(Exception e){ System.out.println(e); e.printStackTrace(); System.exit(-1); } net.scaleCommunicationCosts(); net.assignProcessingCosts(); net.scaleProcessingCosts(); net.assignLoad(); net.scaleLoad(); net.createRandomTree(); // create the applications application = Application.createApplication(this); application.assignTo(net); // schedule Measurement Events measureInterval = 100000; scheduleEventAt(1000, new MeasureStartEvent()); for(int i=1000; i<=100000; i+=1000){ scheduleEventAt(i,new Event(){ public void handle(){ reconfigure(); } }); } scheduleEventAt(101001, new StopEvent()); } private void reconfigure(){ LinkedList brokers; LinkedList path; Broker first; Broker old1; Broker old2; int index; Lock lock; brokers = new LinkedList(); for(Iterator it = net.brokerIterator(); it.hasNext(); ){ brokers.add(it.next()); } path = null; first = null; while(path==null){ if(brokers.isEmpty()){ System.err.println("No reconfiguration path of length "+pathLength+" available."); } first = (Broker)brokers.get(rand.nextInt(brokers.size())); brokers.remove(first); path=getPath(first); } index = rand.nextInt(path.size()-1); old1 = (Broker)path.get(index); old2 = (Broker)path.get(index+1); lock = new Lock((Broker[])path.toArray(new Broker[0]),old1,old2); first.receive(first,lock); } private LinkedList getPath(Broker from){ return getPath(null, from, new LinkedList()); } private LinkedList getPath(Broker previous, Broker current, LinkedList path){ LinkedList neighbors; LinkedList returnPath; Broker next; path.addLast(current); if(path.size()>pathLength){ return path; } neighbors = new LinkedList(); neighbors.addAll(current.getNeighbors()); if(previous != null){ neighbors.remove(previous); } while(!neighbors.isEmpty()){ next = (Broker)neighbors.get(rand.nextInt(neighbors.size())); neighbors.remove(next); returnPath = getPath(current, next, path); if(returnPath != null){ return returnPath; } } path.removeLast(); return null; } public void collectStatisticsEnqueue(Broker broker, Message message) { SimpleHashtable messages; messages = (SimpleHashtable)brokers.get(broker); if(messages==null){ messages = new SimpleHashtable(); brokers.put(broker,messages); } messages.put(message,new Double(time)); } public void collectStatisticsDequeue(Broker broker, Message message) { double oldTime; SimpleHashtable messages; messages = (SimpleHashtable)brokers.get(broker); oldTime = ((Double)messages.remove(message)).doubleValue(); delay += time-oldTime; count++; } public void measure(){ double average; average = delay/((double)count); out.println("# [delay] [count] [average]"); out.println(delay+"\t"+count+"\t"+average); } // Static Part static String networkDir = "top/"; static String[] networks = {"top_100_a", "top_100_b", "top_100_c", "top_100_d", "top_100_e"}; static long[] networkSeeds = { 2270230565136524320L, 6257246777718212578L, -4797579689239329253L, 4619567827633319463L, 6243054050344827830L}; static long[] applicationSeeds = {-8211000103172028775L, -5355342339496376175L, 2233538913574359138L, 6492458982992999932L, 8798255783319245673L}; static long[] brokerSeeds = { 4891060577807607276L, -5100385821711518917L, 1171470125667063812L, -6684844427176012196L, -540086213384839093L}; static long[] simulationSeeds = { 2524247607147087032L, 2812265702356832639L, -3311317507624494025L, 7711578664448145493L, 7704669936301513498L}; static String order = "FIFO"; static String orderShort = "fifo"; /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub try{ if(args.length>=2){ if(args[1].equalsIgnoreCase("CAU")){ order="CAUSAL"; orderShort="cau"; } } if(args[0].equalsIgnoreCase("MAKE_EXPERIMENT")){ makeExperiment(); }else if(args[0].equalsIgnoreCase("COLLECT_RESULTS")){ collectResults(); } }catch(Exception e){ System.err.println(e); e.printStackTrace(); System.exit(-1); } } private static void makeExperiment() throws Exception{ File f; String expDir; String expFile; PrintWriter out; expDir = "exp/queueing/"+orderShort+"/"; f = new File(expDir); f.mkdirs(); for(int net=0; net<networks.length; net++){ for(int i=2; i<16; i++){ expFile = expDir+networks[net]+"_"+orderShort+"_"+i; out = new PrintWriter(new FileWriter(expFile+".exp"),true); out.println("# experiment: reconfiguration costs "+i); out.println(); out.println("# general:"); out.println("simulation=sim.Queueing"); out.println("outFile="+expFile+".rst"); out.println("netFile="+networkDir+networks[net]); out.println("measureInterval=100000"); out.println("pathLength="+i); out.println(); out.println("# random seeds"); out.println("networkSeed="+networkSeeds[net]); out.println("applicationSeed="+applicationSeeds[net]); out.println("brokerSeed="+brokerSeeds[net]); out.println("simulationSeed="+simulationSeeds[net]); out.println(); out.println("# network"); out.println("minCommunicationCosts=0"); out.println("maxCommunicationCosts=10"); out.println("minProcessingCosts=0"); out.println("maxProcessingCosts=10"); out.println("minLoad=1"); out.println("maxLoad=10"); out.println("communicationDelayConstant=0"); out.println("communicationDelayFactor=1"); out.println("processingDelayConstant=0"); out.println("processingDelayFactor=0.001"); out.println(); out.println("# application"); out.println("numberOfJobs=50"); out.println("numberOfSubscribers=9"); out.println("publicationInterval=EXPONENTIAL"); out.println("publicationConstant=0.2"); out.println("reassignmentInterval=CONSTANT"); out.println("reassignmentConstant=1000000"); out.println("locality=0.75"); out.println("localityPower=2"); out.println(); out.println("# broker"); out.println("heuristic=COSTS_AND_INTERESTS"); out.println("cacheSize=8192"); out.println("filterSize=100000"); out.println("broadcastInterval=250"); out.println("updateInterval=400"); out.println("order="+order); out.println("environmentSize=4"); out.println("numberOfHashs=5"); out.flush(); out.close(); } } } private static void collectResults() throws Exception{ String expDir; String expFile; PrintWriter out; LineNumberReader in; String line; String[] tokens; double s; double sum; double average; expDir = "exp/queueing/"+orderShort+"/"; out = new PrintWriter(new FileWriter(expDir+"RESULTS"),true); out.println("# [pathlength] [sum]"); for(int i=2; i<16; i++){ sum = 0; for(int net = 0; net < networks.length; net++){ expFile = expDir+networks[net]+"_"+orderShort+"_"+i+".rst"; in = new LineNumberReader(new FileReader(expFile)); s=0; while((line=in.readLine())!=null){ if(line.startsWith("#") || line.equals("")){ continue; } tokens = line.split("\t"); s = Double.parseDouble(tokens[2]); } sum += s; in.close(); } average = sum/((double)networks.length); out.println(i+"\t"+average); } out.flush(); out.close(); } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -