📄 swarm_rtable.cc
字号:
#include <swarm/swarm_rtable.h>
#include <swarm/swarm_packet.h>
#include <swarm/swarm.h>
#include <random.h>

//TODO: buffer incoming packets to reorder correctly.
//configurable delay, buffer size. need per flow sequencing?

// Schedules a maintenance callback for `address` after `delay`.
// The event object is allocated here and released in handle().
void
SwarmEnvironment::scheduleCallback(nsaddr_t address, double delay)
{
    SwarmEnvironmentMaintenanceEvent *e = new SwarmEnvironmentMaintenanceEvent();
    e->dest = address;
    if (SWARM::DEBUG) {
        stringstream o;
        o << "Scheduling maintenance callback for " << address;
        log(o.str());
    }
    Scheduler::instance().schedule(this, e, delay);
}

//TODO: I THINK THAT THE AODV RQUEUE MAY NOT CORRECTLY DROP PACKETS

// Drains every packet queued for `address`: a next hop is chosen for each one
// and the packets are sent with a send delay that grows by a fixed step.
void
SwarmEnvironment::releasePacketsFor(nsaddr_t address)
{
    //THIS IS THE DELAY THAT AODV USES: ARP_DELAY=0.01
    const double delayStep = 0.01;

    double delay = 0;
    Packet *p;
    while ((p = rqueue.deque(address)) != 0) {
        hdr_swarm *swarmPacket = HDR_SWARM(p);
        swarmPacket->chooseNextHop(*this, p);
        sendPacket(p, delay);
        delay += delayStep;
    }
}

//NOTE: the aodv queue indexes by IP->dst

// Returns true when the queue reports a packet waiting for `address`.
bool
SwarmEnvironment::hasPacketsQueuedFor(nsaddr_t address)
{
    return rqueue.find(address);
}

// Appends `p` to the queue of packets waiting to be released.
void
SwarmEnvironment::queuePacket(Packet *p)
{
    rqueue.enque(p);
}

// Builds a SWARMTYPE_RESPONSE control packet addressed to `dest`, lets the
// swarm header choose its next hop, and sends it without delay.
void
SwarmEnvironment::sendControlPacketTo(nsaddr_t dest)
{
    // The returned entry is not used here; the call is kept because
    // getRoutingEntry() creates and initialises the entry on first use.
    getRoutingEntry(dest);

    Packet *response = Packet::alloc();
    agent->initpkt(response);
    HDR_IP(response)->daddr() = dest;
    HDR_CMN(response)->ptype() = PT_SWARM;
    if (SWARM::DEBUG) {
        stringstream o;
        o << "Sending control packet to " << dest;
        log(o.str());
    }
    hdr_swarm *swarmPacket = HDR_SWARM(response);
    swarmPacket->initializeFromIpPacket(*this, response);
    swarmPacket->chooseNextHop(*this, response);
    swarmPacket->swarm_type = SWARMTYPE_RESPONSE;
    sendPacket(response, 0);
}

// Callback for events created by scheduleCallback(). Sends a control packet
// to the event's destination when its routing entry asks for a proactive
// response, then frees the event.
void
SwarmEnvironment::handle(Event *event)
{
    SwarmEnvironmentMaintenanceEvent &e =
        static_cast<SwarmEnvironmentMaintenanceEvent &>(*event);
    swarm_rt_entry &routingEntry = getRoutingEntry(e.dest);
    if (SWARM::DEBUG) {
        stringstream o;
        o << "got callback for " << e.dest << ". my last advertised is "
          << routingEntry.lastAdvertised << " numB4 is "
          << routingEntry.numberReceiveBeforeProactiveResponse;
        log(o.str());
    }
    if (routingEntry.isProactiveResponseNeeded()) {
        sendControlPacketTo(e.dest);
    }
    delete event;
}

// Returns the routing entry for `dest`, creating it on first use. A new entry
// is bound to this environment; the entry for this node's own address is
// flagged as local and given an estimate of 0.
swarm_rt_entry &
SwarmEnvironment::getRoutingEntry(nsaddr_t dest)
{
    const bool isNewEntry = (routingTable.count(dest) == 0);
    swarm_rt_entry &routingEntry = routingTable[dest];
    if (isNewEntry) {
        routingEntry.env = this;
        if (dest == networkId) {
            routingEntry.isThisNode = true;
            routingEntry.currentEstimate = 0;
        }
    }
    return routingEntry;
}

// Forwards a log line to the owning agent.
void
SwarmEnvironment::log(string s)
{
    agent->log(s);
}

// Logs the routing state for `dest`: one summary line for the entry, then one
// line per known neighbour.
void
SwarmEnvironment::dump(nsaddr_t dest)
{
    swarm_rt_entry &routingEntry = getRoutingEntry(dest);
    routingEntry.update();

    stringstream o;
    o << "-dest " << dest
      << " -current " << routingEntry.currentEstimate
      << " -congestionProb " << getCongestionRelatedSuccessProbability()
      << " -packetRate " << getPacketReceiveRate()
      << " -lastAdvert " << routingEntry.lastAdvertised
      << " -lastAdvertAge " << (CURRENT_TIME - routingEntry.lastAdvertisedTime)
      << " -lastForwardAge " << (CURRENT_TIME - routingEntry.lastForwardedPacketTowards);
    log(o.str());

    map < nsaddr_t, swarm_neighbour >::iterator iter = neighbours.begin();
    for (; iter != neighbours.end(); ++iter) {
        const nsaddr_t via = iter->first;
        swarm_neighbour &neighbour = iter->second;
        // operator[] creates a default next-hop record for neighbours that
        // have not advertised anything for this destination yet.
        swarm_nexthop &next_hop = routingEntry.next_hops[via];

        stringstream s;
        s << "-dest " << dest
          << " -via " << via
          << " -prob " << neighbour.getProbabilitySuccessfulUnicast()
          << " -sent " << neighbour.numberAttemptedSend.getCurrentValue()
          << " -failed " << neighbour.numberFailedSend.getCurrentValue()
          << " -received " << neighbour.numberReceived.getCurrentValue()
          << " -estimatedValue " << next_hop.getCurrentEstimate()
          << " -qvalue " << routingEntry.getQValue(via);
        log(s.str());
    }
}

// Hands a packet addressed to this node to the owning agent.
void
SwarmEnvironment::receivePacketLocally(Packet *p)
{
    agent->receivePacketLocally(p);
}

// Asks the owning agent to transmit `p` after `delay`.
void
SwarmEnvironment::sendPacket(Packet *p, double delay)
{
    agent->sendPacket(p, delay);
}

// Asks the owning agent to drop `p`, recording `reason`.
void
SwarmEnvironment::drop(Packet *p, const char *reason)
{
    agent->drop(p, reason);
}

// Counts one more received packet in the receive-rate counter.
void
SwarmEnvironment::incrementPacketsReceived()
{
    packetsReceived.increment();
}

// Returns received packets divided by the length of history they cover.
// Returns 0 when nothing has been received, so that an empty history can no
// longer produce 0/0 (NaN) and trip the assertions in
// getCongestionRelatedSuccessProbability().
double
SwarmEnvironment::getPacketReceiveRate()
{
    // Read the count first: getCurrentValue() runs check(), which updates the
    // number of stored samples that getHistoryLength() reports.
    const int received = packetsReceived.getCurrentValue();
    if (received == 0) {
        return 0;
    }
    // NOTE(review): with packets counted but no sample stored yet the history
    // length is 0; as before, the division then yields +infinity on IEEE
    // platforms, which the caller handles via its normalizedRate >= 1 branch.
    return static_cast<double>(received) / packetsReceived.getHistoryLength();
}

// Maps the current receive rate to a success probability in
// [1 - CONGESTION_PROBABILITY_WEIGHT, 1]: the receive rate is normalised by
// MAX_PACKET_RECEIVE_RATE, and the weighted part falls to 0 once the
// normalised rate reaches 1.
double
SwarmEnvironment::getCongestionRelatedSuccessProbability()
{
    const double receiveRate = getPacketReceiveRate();
    const double normalizedRate = receiveRate / SWARM::MAX_PACKET_RECEIVE_RATE;

    double exponentialPart = 0;
    if (normalizedRate < 1) {
        exponentialPart = pow((1 - normalizedRate),
                              SWARM::CONGESTION_PROBABILITY_EXPONENTIAL);
    }
    assert(exponentialPart >= 0);
    assert(exponentialPart <= 1);

    return ((1 - SWARM::CONGESTION_PROBABILITY_WEIGHT)
            + (SWARM::CONGESTION_PROBABILITY_WEIGHT * exponentialPart));
}

SwarmEnvironment::SwarmEnvironment()
    : seqno(1),
      rqueue(),
      packetsReceived(SWARM::PROBABILITY_HISTORY, SWARM::PROBABILITY_SAMPLES)
{
}

// Returns the value of reaching this entry's destination via `hop`, combining
// the link and congestion success probabilities with the hop's advertised
// estimate. Returns COST_MAX when success is practically impossible.
double
swarm_rt_entry::getQValue(nsaddr_t hop)
{
    // Below this success probability the hop is treated as unusable, which
    // also keeps the division below away from zero.
    const double minimumUsableProbability = 0.001;

    swarm_neighbour &neighbour = env->neighbours[hop];
    const double linkSuccessProbability = neighbour.getProbabilitySuccessfulUnicast();
    const double congestionSuccessProbability = env->getCongestionRelatedSuccessProbability();
    const double successProbability = linkSuccessProbability * congestionSuccessProbability;

    // Looked up before the early return on purpose, as in the original code:
    // operator[] creates the next-hop record if it does not exist yet.
    swarm_nexthop &next_hop = next_hops[hop];
    if (successProbability < minimumUsableProbability) {
        return SWARM::COST_MAX;
    }

    return SWARM::UNICAST_FAIL_REWARD * (1 - successProbability) / successProbability
           + (next_hop.getCurrentEstimate() + SWARM::UNICAST_SUCCESS_REWARD);
}

// Records the value advertised by this hop and stamps it with the current time.
void
swarm_nexthop::setLastAdvertised(double value)
{
    lastAdvertised = value;
    lastAdvertisedTime = CURRENT_TIME;
}

swarm_nexthop::swarm_nexthop()
{
    lastAdvertised = (SWARM::COST_MAX);
    lastAdvertisedTime = (0);
}

// USE multiplicative for the moment since it commutes with multiple hops.

// Returns the advertised value scaled by DECAY_PER_SECOND raised to its age,
// or COST_MAX once the advertisement is older than ROUTE_TIMEOUT.
double
swarm_nexthop::getCurrentEstimate()
{
    const double timeElapsed = CURRENT_TIME - lastAdvertisedTime;
    if (timeElapsed > SWARM::ROUTE_TIMEOUT) {
        return SWARM::COST_MAX;
    }
    return lastAdvertised * pow(SWARM::DECAY_PER_SECOND, timeElapsed);
}

// Recomputes currentEstimate as the best (largest) Q-value over all next hops.
// If no hop has advertised within ROUTE_TIMEOUT the entry is reset to COST_MAX.
void
swarm_rt_entry::update()
{
    if (isThisNode) {
        currentEstimate = 0;
        return;
    }

    typedef map < nsaddr_t, swarm_nexthop >::iterator hop_iterator;

    // Invalidate timed-out hops in place. The previous code modified a copy of
    // each map element, so the stored hops were never actually reset.
    for (hop_iterator iter = next_hops.begin(); iter != next_hops.end(); ++iter) {
        swarm_nexthop &hop = iter->second;
        if ((CURRENT_TIME - hop.lastAdvertisedTime) > SWARM::ROUTE_TIMEOUT) {
            hop.lastAdvertised = SWARM::COST_MAX;
            hop.lastAdvertisedTime = -SWARM::ROUTE_TIMEOUT;
        }
    }

    // Repeat until the estimate moves by no more than 0.1 between passes.
    double oldEstimate;
    do {
        oldEstimate = currentEstimate;
        double newEstimate = SWARM::COST_MAX;
        double latestUpdateTime = -SWARM::ROUTE_TIMEOUT;

        for (hop_iterator iter = next_hops.begin(); iter != next_hops.end(); ++iter) {
            const double valueUsingHop = getQValue(iter->first);
            newEstimate = max(newEstimate, valueUsingHop);
            latestUpdateTime = max(latestUpdateTime, iter->second.lastAdvertisedTime);
        }

        if ((CURRENT_TIME - latestUpdateTime) > SWARM::ROUTE_TIMEOUT) {
            //TODO: purge tables in this case.
            currentEstimate = SWARM::COST_MAX;
            lastAdvertised = SWARM::COST_MAX;
            return;
        }
        currentEstimate = newEstimate;
    } while (fabs(currentEstimate - oldEstimate) > 0.1);
}

// Handles a packet received from its previous hop: stores the value that hop
// carried from the source, refreshes the estimate, tracks the packet's source
// sequence number, and counts down towards the next proactive response.
void
swarm_rt_entry::onReceive(const Packet *p)
{
    struct hdr_swarm *swarmPacket = HDR_SWARM(p);
    struct hdr_cmn *ch = HDR_CMN(p);

    const nsaddr_t prevHopAddress = ch->prev_hop_;
    swarm_nexthop &prevHop = next_hops[prevHopAddress];
    prevHop.setLastAdvertised(swarmPacket->valueFromSource);

    const int srcSeqno = swarmPacket->src_seqno;
    update();

    if (srcSeqno > maxSequenceNumberSeen) {
        maxSequenceNumberSeen = srcSeqno;
    }
    // A sequence number inside the remembered window that has not been seen
    // before starts with the worst possible best-value.
    if (srcSeqno >= minSequenceNumberSeen
        && sequenceNumberBestValue.count(srcSeqno) == 0) {
        sequenceNumberBestValue[srcSeqno] = SWARM::COST_MAX;
    }
    numberReceiveBeforeProactiveResponse--;
}

// Records that the packet's sequence number is being forwarded with the
// current estimate, widening the seen-range as needed and trimming the
// per-sequence-number memory.
void
swarm_rt_entry::onForwardingFrom(const Packet *p)
{
    struct hdr_swarm *swarmPacket = HDR_SWARM(p);
    const int srcSeqno = swarmPacket->src_seqno;

    if (srcSeqno > maxSequenceNumberSeen) {
        maxSequenceNumberSeen = srcSeqno;
    }
    if (srcSeqno < minSequenceNumberSeen) {
        minSequenceNumberSeen = srcSeqno;
    }
    sequenceNumberBestValue[srcSeqno] = currentEstimate;
    fixSeqnoMemory();
}

// Returns true when the current estimate beats the best value remembered for
// the packet's sequence number by more than IMPROVEMENT_FOR_RESEND. Sequence
// numbers below the remembered window are never new or improved.
bool
swarm_rt_entry::isNewOrImproved(const Packet *p)
{
    struct hdr_swarm *swarmPacket = HDR_SWARM(p);
    const int srcSeqno = swarmPacket->src_seqno;

    if (srcSeqno < minSequenceNumberSeen) {
        return false;
    }

    // An unseen sequence number is compared against COST_MAX, the same
    // baseline onReceive() stores for it. The previous operator[] lookup
    // silently inserted 0.0 instead and made this query modify the map.
    double bestPrevious = SWARM::COST_MAX;
    if (sequenceNumberBestValue.count(srcSeqno) != 0) {
        bestPrevious = sequenceNumberBestValue[srcSeqno];
    }
    return (currentEstimate - bestPrevious) > SWARM::IMPROVEMENT_FOR_RESEND;
}

// A proactive response is due once the receive budget is used up or when
// nothing better than COST_MAX has been advertised.
bool
swarm_rt_entry::isProactiveResponseNeeded()
{
    return numberReceiveBeforeProactiveResponse <= 0
           || lastAdvertised == SWARM::COST_MAX;
}

// Records that a packet was just sent towards the destination: the current
// estimate becomes the advertised one and the receive budget is reset.
void
swarm_rt_entry::onSendToDestination()
{
    lastForwardedPacketTowards = CURRENT_TIME;
    lastAdvertised = currentEstimate;
    lastAdvertisedTime = CURRENT_TIME;
    numberReceiveBeforeProactiveResponse = SWARM::MAXIMUM_RECEIVES_WITHOUT_SEND;
}

// Overheard packet, source side: stores the previous hop's valueFromSource and
// refreshes the estimate.
void
swarm_rt_entry::onPromiscuousReceiveFrom(const Packet *p)
{
    struct hdr_cmn *ch = HDR_CMN(p);
    struct hdr_swarm *swarmPacket = HDR_SWARM(p);

    const nsaddr_t prevHopAddress = ch->prev_hop_;
    swarm_nexthop &prevHop = next_hops[prevHopAddress];
    prevHop.setLastAdvertised(swarmPacket->valueFromSource);
    update();
}

// Overheard packet, destination side: stores the previous hop's valueToDest
// and refreshes the estimate.
void
swarm_rt_entry::onPromiscuousReceiveTo(const Packet *p)
{
    struct hdr_cmn *ch = HDR_CMN(p);
    struct hdr_swarm *swarmPacket = HDR_SWARM(p);

    const nsaddr_t prevHopAddress = ch->prev_hop_;
    swarm_nexthop &prevHop = next_hops[prevHopAddress];
    prevHop.setLastAdvertised(swarmPacket->valueToDest);
    update();
}

// Forgets `nextHop` after a failed send, if it is known.
void
swarm_rt_entry::onNextHopSendFailed(nsaddr_t nextHop)
{
    if (next_hops.count(nextHop) > 0) {
        removeNextHop(nextHop);
    }
}

// Randomly picks where to send next. Broadcasting and every eligible next hop
// are weighted by exp(value / TEMPERATURE); a hop is eligible when its
// estimate exceeds the entry's own estimate by at least MINIMUM_REWARD.
// Returns IP_BROADCAST when broadcasting wins the draw.
nsaddr_t
swarm_rt_entry::pickNextHop()
{
    typedef map < nsaddr_t, swarm_nexthop >::iterator hop_iterator;

    update();

    // First pass: total weight of all eligible unicast hops.
    double totalExpectedReward = 0;
    for (hop_iterator iter = next_hops.begin(); iter != next_hops.end(); ++iter) {
        swarm_nexthop &next_hop = iter->second;
        const double valueUsingHop = getQValue(iter->first);
        if ((next_hop.getCurrentEstimate() - currentEstimate) >= SWARM::MINIMUM_REWARD) {
            totalExpectedReward += exp(valueUsingHop / SWARM::TEMPERATURE);
        }
    }

    const double broadcastReward =
        exp((currentEstimate + SWARM::BROADCAST_REWARD) / SWARM::TEMPERATURE);
    totalExpectedReward += broadcastReward;

    // Draw a point in [0, total weight] and walk the weights until it is used
    // up; the broadcast weight comes first.
    double x = Random::uniform() * totalExpectedReward;
    x -= broadcastReward;
    if (x <= 0) {
        return IP_BROADCAST;
    }

    for (hop_iterator iter = next_hops.begin(); iter != next_hops.end(); ++iter) {
        swarm_nexthop &next_hop = iter->second;
        const double valueUsingHop = getQValue(iter->first);
        if ((next_hop.getCurrentEstimate() - currentEstimate) >= SWARM::MINIMUM_REWARD) {
            x -= exp(valueUsingHop / SWARM::TEMPERATURE);
            if (x <= 0) {
                return iter->first;
            }
        }
    }
    // Reached only if the walk above did not consume the draw.
    return IP_BROADCAST;
}

// Removes the next-hop record for `address`.
void
swarm_rt_entry::removeNextHop(nsaddr_t address)
{
    next_hops.erase(address);
}

// Trims the per-sequence-number memory to NUMBER_OF_SEQNOS_TO_STORE entries by
// erasing from the low end and advancing minSequenceNumberSeen.
void
swarm_rt_entry::fixSeqnoMemory()
{
    while (sequenceNumberBestValue.size() > SWARM::NUMBER_OF_SEQNOS_TO_STORE) {
        sequenceNumberBestValue.erase(minSequenceNumberSeen++);
    }
}

swarm_rt_entry::swarm_rt_entry()
{
    // Null until SwarmEnvironment::getRoutingEntry() binds the entry to its
    // environment; previously this pointer was left uninitialised.
    env = 0;
    currentEstimate = lastAdvertised = SWARM::COST_MAX;
    isThisNode = false;
    maxSequenceNumberSeen = 0;
    minSequenceNumberSeen = 0;
    lastAdvertisedTime = 0;
    lastForwardedPacketTowards = 0;
    numberReceiveBeforeProactiveResponse = SWARM::MAXIMUM_RECEIVES_WITHOUT_SEND;
}

// Returns true when the current estimate is better than the COST_MAX sentinel.
bool
swarm_rt_entry::hasSomeRoute()
{
    return currentEstimate > SWARM::COST_MAX;
}

swarm_neighbour::swarm_neighbour()
    : numberAttemptedSend(SWARM::PROBABILITY_HISTORY, SWARM::PROBABILITY_SAMPLES),
      numberFailedSend(SWARM::PROBABILITY_HISTORY, SWARM::PROBABILITY_SAMPLES),
      numberReceived(SWARM::PROBABILITY_HISTORY, SWARM::PROBABILITY_SAMPLES)
{
}

// Estimates the probability that a unicast to this neighbour succeeds, from
// the attempted/failed send counters plus a weighted contribution from
// packets received from it. The result is clamped to [0, 1]; 0 is returned
// when there is no evidence at all.
double
swarm_neighbour::getProbabilitySuccessfulUnicast()
{
    const int attempted = numberAttemptedSend.getCurrentValue();
    const int failed = numberFailedSend.getCurrentValue();
    const int received = numberReceived.getCurrentValue();

    const double numerator = static_cast<double>(
        (attempted - failed)
        + SWARM::CONFIDENCE_FROM_RECEIVE * SWARM::RECEIVE_SIGNIFICANCE * received);
    const double divisor = static_cast<double>(
        attempted + SWARM::RECEIVE_SIGNIFICANCE * received);

    if (divisor <= 0) {
        return 0;
    }
    const double result = numerator / divisor;
    if (result < 0) {
        return 0;
    }
    if (result > 1) {
        return 1;
    }
    return result;
}

// Brings the counter up to date with the current time: closes every sample
// interval that has elapsed, then discards samples beyond numberSamples and
// removes their counts from the total.
void
timed_counter::check()
{
    const double sampleInterval = historyToKeep / numberSamples;
    // A non-positive (or NaN) interval could never catch up with the clock;
    // bail out instead of looping forever.
    if (!(sampleInterval > 0)) {
        return;
    }

    while (lastPushBack + sampleInterval < CURRENT_TIME) {
        data.push_front(currentCount);
        samplesStored++;
        currentCount = 0;
        lastPushBack += sampleInterval;
    }
    while (samplesStored > numberSamples) {
        totalCount -= data.back();
        data.pop_back();
        samplesStored--;
    }
}

// Creates a counter that keeps `historyToKeep` worth of history split into
// `numberSamples` samples, starting at the current time.
timed_counter::timed_counter(double historyToKeep, int numberSamples)
    : numberSamples(numberSamples),
      historyToKeep(historyToKeep)
{
    currentCount = 0;
    totalCount = 0;
    lastPushBack = CURRENT_TIME;
    samplesStored = 0;
}

// Counts one event in the current sample and in the running total.
void
timed_counter::increment()
{
    check();
    currentCount++;
    totalCount++;
}

// Returns the number of events counted within the retained history.
int
timed_counter::getCurrentValue()
{
    check();
    return totalCount;
}

// Returns the length of history covered by the samples stored so far.
double
timed_counter::getHistoryLength()
{
    return historyToKeep * samplesStored / numberSamples;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -