📄 broker.java
字号:
if(environment.evaluate(path)){ //System.out.println("positive path evaluation."); sendMessage(this,new Request(reversePath(path),new double[path.length])); } }catch(NullPointerException e){ return new DestinationSet(); } } // message forwarding destinations = new DestinationSet(); if(path.length <= environmentSize){ destinations.setMessage(new Broadcast(path, bloomFilter)); destinations.addAll(neighbors); for(int i=0; i<path.length; i++) { destinations.remove(path[i]); } // destinations.remove(from); } return destinations; } /** * Extends the path of a broadcast message by adding the current broker. * @param path the original path. * @return the extended path. */ private Broker[] extendPath(Broker[] path){ Broker[] extendedPath; // the extended path extendedPath = new Broker[path.length+1]; System.arraycopy(path,0,extendedPath,0,path.length); extendedPath[path.length] = this; return extendedPath; } /** * Reverses the path of brokers. * @param path the original path. * @return the reversed path. */ private Broker[] reversePath(Broker[] path){ Broker[] reversedPath; reversedPath = new Broker[path.length]; for(int i=0, j=path.length-1; j>=0; i++, j--){ reversedPath[j]=path[i]; } return reversedPath; } // handle a request private DestinationSet handleRequest(Destination from, Request request) { DestinationSet destinations; Broker[] path; double[] costs; int min; // TODO remove //System.out.println("Request received at "+this); // Füge eigene Kosten hinzu, wenn Du nicht der issuer bist path = request.getPath(); costs = request.getCosts(); try{ environment.addCosts(path,costs); }catch(NullPointerException e){ return new DestinationSet(); } destinations = new DestinationSet(request); if (path[path.length-1] == this) { min = costs.length-1; for(int i=costs.length-1; i>=0; i--){ if(costs[min] > costs[i]){ min = i; } } if(min!=path.length-1){ // TODO remove: Lock issued// System.out.print("Kosten: ");// for(int x=0; x<costs.length; x++){// System.out.print(costs[x]+" ");// }// System.out.print("\n Dists: ");// for(int x=0; x<path.length-1; x++){// System.out.print(net.getCommunicationCosts(path[x],path[x+1])+" ");// }// System.out.println(" "+net.getCommunicationCosts(path[0],path[path.length-1])); handleMessage(this,new Lock(reversePath(path),path[min+1],path[min])); } }else{ destinations.addAll(neighbors); for(int i=0; i<path.length; i++){ if(path[i]==this){ destinations.restrictTo(path[i+1]); break; } } } return destinations; } // reconfiguration stuff private DestinationSet handleLock(Destination bSender, Lock lock){ DestinationSet destinations; // the returned addressed messages Broker[] path; // the path to lock Broker bX; // the first node of the old edge on the path Broker bY; // the second node of the old edge on the path int n; // index of the last path's node // TODO remove //System.out.println("Locking issued"); // get all information from the unlock message path = lock.getPath(); bX = lock.getBX(); bY = lock.getBY(); n = path.length-1; // initialize an empty destination set destinations = new DestinationSet(lock); // if broker is not already locked if(!locked){ // determine side, left and right neighbor side = LEFT; for(int i=0; i<path.length; i++){ if(path[i]==bY){ side = RIGHT; } if(path[i]==this){ bLeft = i==0 ? path[n] : path[i-1]; bRight = i==n ? path[0] : path[i+1]; break; } } // add the initiator of the lock message as neighbor of the new edge if(bRight==path[0]){ neighbors.add(bRight); lock.colorGray(); } // Does the path is stale? if(neighbors.contains(bRight)){ // determine bNew if(this == path[n]){ bNew = path[0]; }else if(this == path[0]) { bNew = path[n]; }else{ bNew = null; } // determine bOld if(this == bX){ bOld = bY; }else if(this == bY){ bOld = bX; }else{ bOld = null; } // determine bRelay if(this == path[0]){ bRelay = bX; }else if(this == path[n]){ bRelay = bY; }else if(this == bX){ bRelay = path[0]; }else if(this == bY){ bRelay = path[n]; }else{ bRelay = null; } // bGray is empty. bGray.clear(); // fill bQueued with brokers for wich we have to wait. bQueued.clear(); for(int i=0, currentSide=LEFT; i<path.length; i++){ if(path[i] == bY){ currentSide = RIGHT; } if(order == CAUSAL){ bQueued.add(path[i]); }else if(order == FIFO && side != currentSide){ bQueued.add(path[i]); } } // set flags and color relaying = false; locked = true; color = BLACK; // forward the lock message to right neighbor on the path destinations.add(bRight); } // the path is stale! else{ // TODO remove //System.out.println("path is stale!"); // reset everything bLeft = null; bRight = null; // and return the lock message to the sender if(bSender != this) { destinations.add(bSender); } } // the broker is already locked. } else { if (bSender == bLeft) { // reservation succeeded // TODO remove// System.out.println("Locking succeded at "+this+" bLeft ist "+bLeft);// for(int i=0; i<path.length; i++){// System.out.print(path[i]+" ");// }// System.out.println(); neighbors.add(bLeft); // TODO: remove// System.out.println("Begin send from "+this+" to "+bY+" and to "+bX); sendMessage(bY, new Begin()); sendMessage(bX, new Begin()); } else if(bSender == bRight){ // reservation failed if(bNew == null){ // TODO remove //System.out.println("Reservation failed at "+this+" bRight="+bRight); destinations.add(bLeft); } bQueued.clear(); bLeft = bRight = bNew = bOld = bRelay = null; locked = false; } else { // concurrent locking attempt // TODO remove //System.out.println("Concurrent locking at "+this); if(bSender != this){ destinations.add(bSender); } } } // TODO remove// if(net.getNumber(this)==73 || net.getNumber(this)==8) {// System.out.println("I am: "+this+" Bleft="+bLeft+" bRight="+bRight+" bSender="+bSender);// System.out.println("received path is ");// for(int i=0; i<path.length; i++){// System.out.print(path[i]+" ");// }// System.out.println();// } return destinations; } private DestinationSet handleBegin(Destination bSender, Begin begin){ Collection filters; // filters associated to the old edge Subscription subscription; // a subscription for each filter Separator separator; // TODO remove// System.out.println("Begin received at "+this+" from "+bSender); // start relaying relaying = true; // get associated filters and send a subscription for each of them filters = routingTable.getFilters(bOld); for(Iterator it = filters.iterator(); it.hasNext(); ) { subscription = new Subscription((Filter)it.next(),Message.RECONFIGURATION); subscription.colorGray(); subscription.setRelayed(); sendMessage(bRelay, subscription); } // mark their end by a separator separator = new Separator(); separator.setRelayed(); sendMessage(bRelay, separator); // reconfiguration already finished on the other side if(bGray.contains(bOld)){ sendMessage(bRelay, new End()); bRelay = null; relaying = false; } return new DestinationSet(); } private DestinationSet handleSeparator(Destination bSender, Separator separator){ DestinationSet destinations; DestinationSet.Entry entry; Collection filters; Message message; // TODO remove// System.out.println("Separator received at "+this); // message forwarding destinations = new DestinationSet(separator); if(bSender == bRight && bLeft != null){ destinations.add(bLeft); }else if(bSender == bLeft && bRight != null){ destinations.add(bRight); } // changing color if((bSender == bLeft && side == LEFT) || (bSender == bRight && side == RIGHT)){ color = GRAY; separator.add(this); if(bOld != null){ destinations.add(bSender); } } // store gray brokers bGray.addAll(separator.getBrokers()); bQueued.removeAll(separator.getBrokers()); // brokers at the new edge if(bNew != null && bQueued.isEmpty()){ while(!qNot.isEmpty()){ message = qNot.dequeue(); sim.collectStatisticsDequeue(this,message); handleMessage(qNot.getSender(),message); } } // brokers at the old edge if(bOld != null){ if(bSender == bOld){ if (relaying) { sendMessage(bRelay,new End()); // TODO: remove // System.out.println("Setting bRelay=null bei: "+this); bRelay = null; relaying = false; } }else{ // TODO: remove// System.out.println("Launching Unsubscriptions bei: "+this+" mit bSender="+bSender);// System.out.print("bGray=");// for(Iterator it = bGray.iterator(); it.hasNext(); ){// System.out.print(it.next()+" ");// }// System.out.println(); filters = routingTable.getFilters(bOld); // System.out.println("Broker "+this+" launches "+ routingTable.getFilters(bOld).size()+ " Unsubscription for "+bOld); for(Iterator it = filters.iterator(); it.hasNext(); ) { handleMessage(bOld,new Unsubscription((Filter)it.next(),Message.RECONFIGURATION)); } // TODO inspecting routing table// System.out.println("Broker "+this+" still contains "+ routingTable.getFilters(bOld).size()+ " filters for "+bOld); } // start unlock process if(bGray.contains(this) && bGray.contains(bOld)){ // TODO: remove// System.out.print("bGray=");// for(Iterator it = bGray.iterator(); it.hasNext(); ){// System.out.print(it.next()+" ");// }// System.out.println(); for(Iterator it = destinations.iterator(); it.hasNext(); ){ entry = (DestinationSet.Entry)it.next(); sendMessage(entry.getDestintion(), entry.getMessage()); } handleMessage(bOld, new Unlock()); neighbors.remove(bOld); // TODO change back // System.out.println("neighbor "+bOld+" removed at "+this+": "+neighbors.remove(bOld)); bOld = null; destinations.clear(); } } return destinations; } private DestinationSet handleEnd(Destination bSender, End end){ Message message; // a queued (un)subscription // TODO remove// System.out.println("End received at "+this+" from "+bSender); // stop relaying bRelay = null; // process queued (un)subscriptions while(!qSub.isEmpty()){ message = qSub.dequeue(); handleMessage(qSub.getSender(),message); } // unlock message (initiated by bRelay) already received if( (bLeft == null && bRight == null) || (bLeft == bNew && bRight == null) || (bRight == bNew && bLeft == null) ){ handleMessage(this, new Unlock()); } return new DestinationSet(); } private DestinationSet handleUnlock(Destination bSender, Unlock unlock){ DestinationSet destinations; // message forwarding destinations = new DestinationSet(unlock); if( (side == LEFT && bSender == bRight && bLeft != null) || (side == RIGHT && bSender == bLeft && bRight != null) ) { destinations.add(bLeft); destinations.add(bRight); } else if ( (side == LEFT && bSender == bRight && bNew != null) || (side == RIGHT && bSender == bLeft && bNew != null) ) { destinations.add(bSender); destinations.add(bNew); } else if (bSender == this) { destinations.add(bNew); } // if(bSender != bNew && bLeft != null && bRight != null) {// destinations.add(bLeft);// destinations.add(bRight);// }// else if(bNew != null && (bLeft == null || bRight == null) ){// destinations.add(bNew);// } if(bSender == bOld){ destinations.remove(bOld); } if(bRelay != null){ // TODO remove// System.out.println("sender: "+bSender+" bRelay: "+bRelay);// System.out.println("bOld: "+bOld+" bThis: "+this); destinations.remove(bNew); } // finalize if(bSender == bLeft){ bLeft = null; } if(bSender == bRight){ bRight = null; } if(bNew != null && destinations.contains(bNew)){ bNew = null; } if(bNew == null){ color = BLACK; } if(bLeft == null && bRight == null && bNew == null){ bGray.clear(); locked = false; // TODO remove //System.out.println("Reconfiguration finished at "+this+"!!!!!!!!!!!!!!!!!!!!!!!!!!"); } return destinations; } public void subscribe(Destination from, Subscription subscription) { receive(from, subscription); } public void unsubscribe(Destination from, Unsubscription unsubscription) { receive(from, unsubscription); } public void publish(Destination from, Notification notification) { receive(from, notification); } public double getInterest(Broker b) { return environment.getInterest(b); } public int queueSize(){ return qNot.size(); //+queueSub.size()+queueNot.size(); } public String toString() { return ""+net.getNumber(this); } public boolean isInCache(Notification n){ return cache.contains(n); } public void clearCache(){ cache.clear(); } public Collection getFilters(Broker b){ return routingTable.getFilters(b); } public Collection getFilters(){ return routingTable.getFilters(); } public void removeNeighbor(Broker b){ neighbors.remove(b); } public void addNeighbor(Broker b){ neighbors.add(b); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -