📄 diff_rate.cc
字号:
} if (org_type_ == UNICAST_ORG) { for (cur_out = routing_table[dtype].active; cur_out!= NULL; cur_out = OUT_NEXT(cur_out)) { if (GRADIENT(cur_out) == ORIGINAL) { cur_pkt = pkt->copy(); cur_iph = HDR_IP(cur_pkt); cur_iph->dst_ = AGT_ADDR(cur_out); cur_dfh = HDR_CDIFF(cur_pkt); cur_dfh->forward_agent_id = here_; cur_dfh->num_next = 1; cur_dfh->next_nodes[0] = NODE_ADDR(cur_out); cur_out->num_data_send++; MACprepare(cur_pkt, NODE_ADDR(cur_out), NS_AF_INET, MAC_RETRY_); MACsend(cur_pkt, 0); #ifdef DEBUG_RATE cur_cmh = HDR_CMN(cur_pkt); printf("DF node %x will send %s (%x, %x, %d) to %x\n", THIS_NODE, MsgStr[cur_dfh->mess_type], (cur_dfh->sender_id).addr_, (cur_dfh->sender_id).port_, cur_dfh->pk_num, cur_cmh->next_hop());#endif // DEBUG_RATE } // endif } // endfor Packet::free(pkt); return; } // endif unicast original}void DiffusionRate::FwdData(Packet *pkt){ hdr_cdiff *dfh = HDR_CDIFF(pkt); unsigned int dtype = dfh->data_type; nsaddr_t forwarder_node; ns_addr_t forward_agent; bool forward_flag; forwarder_node = (dfh->forward_agent_id).addr_; forward_agent = dfh->forward_agent_id; if (dfh->report_rate == SUB_SAMPLED) { forward_flag = FwdSubsample(pkt); TriggerPosReinf(pkt, forward_agent); if (forward_flag == false) { Packet::free(pkt); } return; } // Then, report rate is ORIGINAL here. if (routing_table[dtype].ExistOriginalGradient() == false && routing_table[dtype].sink == NULL) { if (THIS_NODE != forwarder_node && NEG_REINF_ == true) { BcastNeg(dtype); routing_table[dtype].new_org_counter = 0; routing_table[dtype].ClrAllNewOrg(); routing_table[dtype].ClrAllOldOrg(); } Packet::free(pkt); return; } if (routing_table[dtype].ExistOriginalGradient() == false) { Packet::free(pkt); return; } FwdOriginal(pkt);}void DiffusionRate::DataReqAll(unsigned int dtype, int report_rate){ Agent_List *cur_agent; Packet *pkt; hdr_cdiff *dfh; for (cur_agent=routing_table[dtype].source; cur_agent != NULL; cur_agent = AGENT_NEXT(cur_agent) ) { pkt = prepare_message(dtype, AGT_ADDR(cur_agent), DATA_REQUEST); dfh = HDR_CDIFF(pkt); dfh->report_rate = report_rate; send_to_dmux(pkt, 0); }}void DiffusionRate::GenNeg(int dtype){ In_List *cur; if (neg_thr_type_ == NEG_ABSOLUTE) { for (cur= routing_table[dtype].iif; cur != NULL; cur= IN_NEXT(cur)) { if (NEW_ORG_RECV(cur) <= 0 && OLD_ORG_RECV(cur) > MAX_DUP_DATA) { UcastNeg(dtype, AGT_ADDR(cur)); cur->num_neg_send++; } } return; } if (neg_thr_type_ == NEG_RELATIVE) { int most= routing_table[dtype].MostRecvOrg(); for (cur= routing_table[dtype].iif; cur != NULL; cur= IN_NEXT(cur)) { if (OLD_ORG_RECV(cur) > MAX_DUP_DATA && NEW_ORG_RECV(cur) <= NEG_MIN_RATIO*most) { UcastNeg(dtype, AGT_ADDR(cur)); cur->num_neg_send++; } } return; }}void DiffusionRate::BcastNeg(int dtype){ ns_addr_t bcast_addr; bcast_addr.addr_ = MAC_BROADCAST; bcast_addr.port_ = ROUTING_PORT; Packet *pkt=prepare_message(dtype, bcast_addr, NEG_REINFORCE); MACprepare(pkt, MAC_BROADCAST, NS_AF_ILINK, 0); MACsend(pkt, 0); overhead++; num_neg_bcast_send++;#ifdef DEBUG_RATE hdr_cdiff *dfh = HDR_CDIFF(pkt); hdr_cmn *cmh = HDR_CMN(pkt); printf("DF node %d will send %s (%x, %x, %d) to %x\n", THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_, (dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());#endif // DEBUG_RATE}void DiffusionRate::UcastNeg(int dtype, ns_addr_t to){ Packet *pkt=prepare_message(dtype, to, NEG_REINFORCE); MACprepare(pkt, to.addr_, NS_AF_INET, 0); MACsend(pkt, 0); overhead++;#ifdef DEBUG_RATE hdr_cdiff *dfh = HDR_CDIFF(pkt); hdr_cmn *cmh = HDR_CMN(pkt); printf("DF node %d will send %s (%x, %x, %d) to %x\n", THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_, (dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());#endif}void DiffusionRate::ProcessNegReinf(Packet *pkt){ hdr_cdiff *dfh = HDR_CDIFF(pkt); unsigned int dtype = dfh->data_type; Out_List *cur_out; PrvCurPtr RetVal; RetVal=INTF_FIND(routing_table[dtype].active, dfh->forward_agent_id); if (RetVal.cur == NULL) { Packet::free(pkt); return; } cur_out = (Out_List *)(RetVal.cur); if (GRADIENT(cur_out) == SUB_SAMPLED) { Packet::free(pkt); return; } GRADIENT(cur_out) = SUB_SAMPLED; if (routing_table[dtype].ExistOriginalGradient() == false && routing_table[dtype].sink == NULL) { DataReqAll(dtype, SUB_SAMPLED); if (NEG_REINF_ == true) { BcastNeg(dtype); routing_table[dtype].new_org_counter = 0; routing_table[dtype].ClrAllNewOrg(); routing_table[dtype].ClrAllOldOrg(); } } Packet::free(pkt);}void DiffusionRate::ProcessPosReinf(Packet *pkt){ hdr_cdiff *dfh= HDR_CDIFF(pkt); unsigned int dtype = dfh->data_type; Out_List *cur_out, *OutPtr; PrvCurPtr RetVal; RetVal=INTF_FIND(routing_table[dtype].active, dfh->forward_agent_id); if (RetVal.cur != NULL) { cur_out = (Out_List *)(RetVal.cur); GRADIENT(cur_out) = ORIGINAL; GRAD_TMOUT(RetVal.cur) = max(GRAD_TMOUT(RetVal.cur), dfh->ts_ + INTEREST_TIMEOUT); NUM_POS_RECV(cur_out)++; } else { OutPtr = new Out_List; AGT_ADDR(OutPtr) = dfh->forward_agent_id; GRADIENT(OutPtr) = dfh->report_rate; GRAD_TMOUT(OutPtr) = dfh->ts_ + INTEREST_TIMEOUT; INTF_INSERT(routing_table[dtype].active, OutPtr); routing_table[dtype].num_active ++; NUM_POS_RECV(OutPtr)++; } DataReqAll(dtype, ORIGINAL); Pkt_Hash_Entry *hashPtr; nsaddr_t next_node; In_List *recent_in; In_List *cur; switch(pos_type_) { case POS_HASH: hashPtr=PktTable.GetHash(dfh->info.sender, dfh->info.seq); if (hashPtr == NULL) { perror("Hey! I've never seen that packet before.\n"); Packet::free(pkt); exit(-1); } next_node = (hashPtr->forwarder_id).addr_; if (next_node == THIS_NODE) { Packet::free(pkt); return; } PosReinf(dtype, hashPtr->forwarder_id.addr_, dfh->info.sender, dfh->info.seq); routing_table[dtype].CntPosSend(hashPtr->forwarder_id); routing_table[dtype].ClrNewSub(hashPtr->forwarder_id);#ifdef DEBUG_RATE printf("DF node %d will send %s to %x\n", THIS_NODE, MsgStr[dfh->mess_type], hashPtr->forwarder_id.addr_);#endif // DEBUG_RATE Packet::free(pkt); return; case POS_LAST: recent_in = routing_table[dtype].MostRecentIn(); if (recent_in == NULL) { Packet::free(pkt); return; } next_node = NODE_ADDR(recent_in); if (next_node == THIS_NODE) { Packet::free(pkt); return; } PosReinf(dtype, NODE_ADDR(recent_in), dfh->info.sender, dfh->info.seq); routing_table[dtype].CntPosSend(AGT_ADDR(recent_in)); routing_table[dtype].ClrNewSub(AGT_ADDR(recent_in));#ifdef DEBUG_RATE printf("DF node %d will send %s to %x\n", THIS_NODE, MsgStr[dfh->mess_type], NODE_ADDR(recent_in));#endif // DEBUG_RATE Packet::free(pkt); return; case POS_ALL: for (cur = routing_table[dtype].iif; cur!=NULL; cur = IN_NEXT(cur)) { if (NEW_SUB_RECV(cur) <= 0) { continue; } next_node = NODE_ADDR(cur); if (next_node == THIS_NODE) { continue; } PosReinf(dtype, NODE_ADDR(cur), dfh->info.sender, dfh->info.seq); routing_table[dtype].CntPosSend(AGT_ADDR(cur)); routing_table[dtype].ClrNewSub(AGT_ADDR(cur));#ifdef DEBUG_RATE printf("DF node %d will send %s to %x\n", THIS_NODE, MsgStr[dfh->mess_type], NODE_ADDR(cur));#endif // DEBUG_RATE } Packet::free(pkt); return; default: Packet::free(pkt); return; }}void DiffusionRate::PosReinf(int dtype, nsaddr_t to_node, ns_addr_t info_sender, unsigned int info_seq){ ns_addr_t to_agent_addr; to_agent_addr.addr_ = to_node; to_agent_addr.port_ = ROUTING_PORT; Packet *pkt=prepare_message(dtype, to_agent_addr, POS_REINFORCE); hdr_cdiff *dfh = HDR_CDIFF(pkt); dfh->report_rate = ORIGINAL; dfh->info.sender = info_sender; dfh->info.seq = info_seq; MACprepare(pkt, to_node, NS_AF_INET, 1); MACsend(pkt, 0); overhead++;#ifdef DEBUG_RATE hdr_cmn *cmh = HDR_CMN(pkt); printf("DF node %d will send %s (%x, %x, %d) to %x\n", THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_, (dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());#endif}void DiffusionRate::Start(){ DiffusionAgent::Start(); gradient_timer = new GradientTimer(this); gradient_timer->resched(INTEREST_TIMEOUT); if ( neg_win_type_ == NEG_TIMER && NEG_REINF_ == true) { neg_reinf_timer = new NegativeReinforceTimer(this); neg_reinf_timer->resched(NEG_CHECK); }}void DiffusionRate::reset(){ DiffusionAgent::reset(); DataTable.reset();}void DiffusionRate::Print_IOlist(){ Out_List *cur_out; In_List *cur_in; int i; for (i=0; i<1; i++) { printf("Node %d DATA TYPE %d: send bcast data %d, not send %d, rcv %d\n", THIS_NODE, i, num_data_bcast_send, num_not_send_bcast_data, num_data_bcast_rcv); printf("Node %d neg bcast send %d, neg bcast rcv %d\n", THIS_NODE, num_neg_bcast_send, num_neg_bcast_rcv); for (cur_out = routing_table[i].active; cur_out != NULL; cur_out = OUT_NEXT(cur_out) ) { printf("DF node %d has oif %d (%f,%d) send data %d recv neg %d pos %d\n", THIS_NODE, NODE_ADDR(cur_out), GRADIENT(cur_out), routing_table[i].num_active, NUM_DATA_SEND(cur_out), NUM_NEG_RECV(cur_out), NUM_POS_RECV(cur_out)); } for (cur_in = routing_table[i].iif; cur_in != NULL; cur_in = IN_NEXT(cur_in) ) { printf("Diffusion node %d has iif for %d\n", THIS_NODE, NODE_ADDR(cur_in)); printf("Node %d recv new sub %d,new org %d,old org %d:send neg %d pos %d\n", THIS_NODE, TOTAL_NEW_SUB_RECV(cur_in), TOTAL_NEW_ORG_RECV(cur_in), TOTAL_OLD_ORG_RECV(cur_in), NUM_NEG_SEND(cur_in), NUM_POS_SEND(cur_in)); } }}int DiffusionRate::command(int argc, const char*const*argv) { if (argc == 2) { if (strcasecmp(argv[1], "enable-suppression") == 0) { DUP_SUP_ = true; return TCL_OK; } if (strcasecmp(argv[1], "disable-suppression") == 0) { DUP_SUP_ = false; return TCL_OK; } } else if (argc == 3) { if (strcasecmp(argv[1], "set-sub-tx-type") == 0 ) { sub_type_ = ParseSubType(argv[2]); return TCL_OK; } if (strcasecmp(argv[1], "set-org-tx-type") == 0 ) { org_type_ = ParseOrgType(argv[2]); return TCL_OK; } if (strcasecmp(argv[1], "set-pos-type") == 0 ) { pos_type_ = ParsePosType(argv[2]); return TCL_OK; } if (strcasecmp(argv[1], "set-pos-node-type") == 0 ) { pos_node_type_ = ParsePosNodeType(argv[2]); return TCL_OK; } if (strcasecmp(argv[1], "set-neg-win-type") == 0 ) { neg_win_type_ = ParseNegWinType(argv[2]); return TCL_OK; } if (strcasecmp(argv[1], "set-neg-thr-type") == 0 ) { neg_thr_type_ = ParseNegThrType(argv[2]); return TCL_OK; } if (strcasecmp(argv[1], "set-neg-max-type") == 0 ) { neg_max_type_ = ParseNegMaxType(argv[2]); return TCL_OK; } } return DiffusionAgent::command(argc, argv);}sub_t ParseSubType(const char* str){ if (strcasecmp(str, "BROADCAST") == 0) { return BCAST_SUB; } if (strcasecmp(str, "UNICAST") == 0) { return UNICAST_SUB; } fprintf(stderr,"ParseSubType Error -- Only BROADCAST or UNICAST\n"); exit(-1);}org_t ParseOrgType(const char* str){ if (strcasecmp(str, "BROADCAST") == 0) { return BCAST_ORG; } if (strcasecmp(str, "UNICAST") == 0) { return UNICAST_ORG; } fprintf(stderr,"ParseOrgType Error -- Only BROADCAST or UNICAST\n"); exit(-1);}pos_t ParsePosType(const char* str){ if (strcasecmp(str, "HASH") == 0) { return POS_HASH; } if (strcasecmp(str, "LAST") == 0) { return POS_LAST; } if (strcasecmp(str, "ALL") == 0) { return POS_ALL; } fprintf(stderr,"ParsePosType Error -- Only HASH, LAST, or ALL\n"); exit(-1);}pos_ndt ParsePosNodeType(const char* str){ if (strcasecmp(str, "END") == 0) { return END_POS; } if (strcasecmp(str, "INTM") == 0) { return INTM_POS; } fprintf(stderr,"ParsePosNodeType Error -- Only END or INTM\n"); exit(-1);}neg_wint ParseNegWinType(const char* str){ if (strcasecmp(str, "COUNTER") == 0) { return NEG_COUNTER; } if (strcasecmp(str, "TIMER") == 0) { return NEG_TIMER; } fprintf(stderr,"ParseNegWinType Error -- Only COUNTER or TIMER\n"); exit(-1);}neg_tht ParseNegThrType(const char* str){ if (strcasecmp(str, "ABSOLUTE") == 0) { return NEG_ABSOLUTE; } if (strcasecmp(str, "RELATIVE") == 0) { return NEG_RELATIVE; } fprintf(stderr,"ParseNegThrType Error -- Only ABSOLUTE or RELATIVE\n"); exit(-1);}neg_mxt ParseNegMaxType(const char* str){ if (strcasecmp(str, "FIXED") == 0) { return NEG_FIXED_MAX; } if (strcasecmp(str, "SCALE") == 0) { return NEG_SCALE_MAX; } fprintf(stderr,"ParseNegMaxType Error -- Only FIXED or SCALE\n"); exit(-1);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -