📄 diff_rate.cc
字号:
(dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());
#endif // DEBUG_RATE
return;
}
if (org_type_ == UNICAST_ORG) {
for (cur_out = routing_table[dtype].active; cur_out!= NULL;
cur_out = OUT_NEXT(cur_out)) {
if (GRADIENT(cur_out) == ORIGINAL) {
cur_pkt = pkt->copy();
cur_iph = HDR_IP(cur_pkt);
cur_iph->dst_ = AGT_ADDR(cur_out);
cur_dfh = HDR_CDIFF(cur_pkt);
cur_dfh->forward_agent_id = here_;
cur_dfh->num_next = 1;
cur_dfh->next_nodes[0] = NODE_ADDR(cur_out);
cur_out->num_data_send++;
MACprepare(cur_pkt, NODE_ADDR(cur_out), NS_AF_INET,
MAC_RETRY_);
MACsend(cur_pkt, 0);
#ifdef DEBUG_RATE
cur_cmh = HDR_CMN(cur_pkt);
printf("DF node %x will send %s (%x, %x, %d) to %x\n",
THIS_NODE, MsgStr[cur_dfh->mess_type],
(cur_dfh->sender_id).addr_, (cur_dfh->sender_id).port_,
cur_dfh->pk_num, cur_cmh->next_hop());
#endif // DEBUG_RATE
} // endif
} // endfor
Packet::free(pkt);
return;
} // endif unicast original
}
// Forward a data packet according to the report rate carried in its
// diffusion header.
//
//  - SUB_SAMPLED: the packet is dropped when its totalhop is not greater than
//    node->goodhop; otherwise totalhop is decremented, the packet is handed
//    to FwdSubsample(), TriggerPosReinf() is invoked with the agent that
//    forwarded it to us, and the packet is freed here only if FwdSubsample()
//    reported that it did not forward it.
//  - otherwise (ORIGINAL): the packet goes to FwdOriginal() when an original
//    gradient exists.  With no original gradient the packet is freed; in
//    addition, if there is no local sink, the packet came from another node
//    and NEG_REINF_ is set, a negative reinforcement is broadcast and the
//    original-data counters are cleared.
void DiffusionRate::FwdData(Packet *pkt)
{
  hdr_cdiff *diff_hdr = HDR_CDIFF(pkt);
  unsigned int data_type = diff_hdr->data_type;

  // Remember who forwarded the packet before any callee touches the header.
  ns_addr_t prev_agent = diff_hdr->forward_agent_id;
  nsaddr_t prev_node = (diff_hdr->forward_agent_id).addr_;

  if (diff_hdr->report_rate == SUB_SAMPLED) {
    if (diff_hdr->totalhop <= node->goodhop) {
      Packet::free(pkt);
      return;
    }
    diff_hdr->totalhop = diff_hdr->totalhop - 1;

    bool was_forwarded = FwdSubsample(pkt);
    TriggerPosReinf(pkt, prev_agent);
    if (!was_forwarded) {
      Packet::free(pkt);
    }
    return;
  }

  // From here on the report rate is ORIGINAL.
  if (routing_table[data_type].ExistOriginalGradient() == false) {
    // Nobody downstream wants original-rate data.  If we are not a sink
    // either, tell upstream neighbours (unless we originated the packet).
    if (routing_table[data_type].sink == NULL &&
        THIS_NODE != prev_node && NEG_REINF_ == true) {
      BcastNeg(data_type);
      routing_table[data_type].new_org_counter = 0;
      routing_table[data_type].ClrAllNewOrg();
      routing_table[data_type].ClrAllOldOrg();
    }
    Packet::free(pkt);
    return;
  }

  FwdOriginal(pkt);
}
void DiffusionRate::DataReqAll(unsigned int dtype, int report_rate)
{
Agent_List *cur_agent;
Packet *pkt;
hdr_cdiff *dfh;
for (cur_agent=routing_table[dtype].source; cur_agent != NULL;
cur_agent = AGENT_NEXT(cur_agent) ) {
pkt = prepare_message(dtype, AGT_ADDR(cur_agent), DATA_REQUEST);
dfh = HDR_CDIFF(pkt);
dfh->report_rate = report_rate;
send_to_dmux(pkt, 0);
}
}
void DiffusionRate::GenNeg(int dtype)
{
In_List *cur;
if (neg_thr_type_ == NEG_ABSOLUTE) {
for (cur= routing_table[dtype].iif; cur != NULL; cur= IN_NEXT(cur)) {
if (NEW_ORG_RECV(cur) <= 0 && OLD_ORG_RECV(cur) > MAX_DUP_DATA) {
UcastNeg(dtype, AGT_ADDR(cur));
cur->num_neg_send++;
}
}
return;
}
if (neg_thr_type_ == NEG_RELATIVE) {
int most= routing_table[dtype].MostRecvOrg();
for (cur= routing_table[dtype].iif; cur != NULL; cur= IN_NEXT(cur)) {
if (OLD_ORG_RECV(cur) > MAX_DUP_DATA &&
NEW_ORG_RECV(cur) <= NEG_MIN_RATIO*most) {
UcastNeg(dtype, AGT_ADDR(cur));
cur->num_neg_send++;
}
}
return;
}
}
void DiffusionRate::BcastNeg(int dtype)
{
ns_addr_t bcast_addr;
bcast_addr.addr_ = MAC_BROADCAST;
bcast_addr.port_ = ROUTING_PORT;
Packet *pkt=prepare_message(dtype, bcast_addr, NEG_REINFORCE);
MACprepare(pkt, MAC_BROADCAST, NS_AF_ILINK, 0);
MACsend(pkt, 0);
overhead++;
num_neg_bcast_send++;
#ifdef DEBUG_RATE
hdr_cdiff *dfh = HDR_CDIFF(pkt);
hdr_cmn *cmh = HDR_CMN(pkt);
printf("DF node %d will send %s (%x, %x, %d) to %x\n",
THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_,
(dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());
#endif // DEBUG_RATE
}
void DiffusionRate::UcastNeg(int dtype, ns_addr_t to)
{
Packet *pkt=prepare_message(dtype, to, NEG_REINFORCE);
MACprepare(pkt, to.addr_, NS_AF_INET, 0);
MACsend(pkt, 0);
overhead++;
#ifdef DEBUG_RATE
hdr_cdiff *dfh = HDR_CDIFF(pkt);
hdr_cmn *cmh = HDR_CMN(pkt);
printf("DF node %d will send %s (%x, %x, %d) to %x\n",
THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_,
(dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());
#endif
}
// Handle a received negative reinforcement.
//
// The sender is looked up in the active (outgoing) list for the packet's
// data type.  If it is found and its gradient is not already SUB_SAMPLED,
// the gradient is downgraded to SUB_SAMPLED.  When that leaves no original
// gradient and there is no local sink, all sources are asked for
// SUB_SAMPLED data and, if NEG_REINF_ is set, the negative reinforcement is
// re-broadcast and the original-data counters are cleared.
// The packet is always freed before returning.
void DiffusionRate::ProcessNegReinf(Packet *pkt)
{
  hdr_cdiff *neg_hdr = HDR_CDIFF(pkt);
  unsigned int data_type = neg_hdr->data_type;

  PrvCurPtr found;
  found = INTF_FIND(routing_table[data_type].active,
                    neg_hdr->forward_agent_id);

  if (found.cur != NULL) {
    Out_List *grad = (Out_List *)(found.cur);

    if (GRADIENT(grad) != SUB_SAMPLED) {
      GRADIENT(grad) = SUB_SAMPLED;

      if (routing_table[data_type].ExistOriginalGradient() == false &&
          routing_table[data_type].sink == NULL) {
        DataReqAll(data_type, SUB_SAMPLED);

        if (NEG_REINF_ == true) {
          BcastNeg(data_type);
          routing_table[data_type].new_org_counter = 0;
          routing_table[data_type].ClrAllNewOrg();
          routing_table[data_type].ClrAllOldOrg();
        }
      }
    }
  }

  Packet::free(pkt);
}
// Handle a received positive reinforcement.
//
// Step 1 - gradient bookkeeping: the sender (forward_agent_id) is looked up
// in the active list for the packet's data type.  An existing entry is
// upgraded to ORIGINAL and its timeout extended to at least
// ts_ + INTEREST_TIMEOUT; otherwise a new Out_List entry is created with the
// report rate carried in the packet and inserted into the active list.
//
// Step 2 - all local sources are asked for ORIGINAL-rate data.
//
// Step 3 - the reinforcement is propagated upstream according to pos_type_:
//   POS_HASH: to the forwarder recorded in PktTable for (info.sender,
//             info.seq).  A missing hash entry is treated as fatal: a
//             message is written to stderr and the process exits with -1.
//   POS_LAST: to the neighbour returned by MostRecentIn(), if any.
//   POS_ALL : to every incoming neighbour with NEW_SUB_RECV > 0.
// In every mode a reinforcement is never sent to THIS_NODE itself.
//
// The packet is always freed before returning (or before exiting).
void DiffusionRate::ProcessPosReinf(Packet *pkt)
{
  hdr_cdiff *dfh = HDR_CDIFF(pkt);
  unsigned int dtype = dfh->data_type;
  Out_List *cur_out, *OutPtr;
  PrvCurPtr RetVal;

  RetVal = INTF_FIND(routing_table[dtype].active, dfh->forward_agent_id);
  if (RetVal.cur != NULL) {
    cur_out = (Out_List *)(RetVal.cur);
    GRADIENT(cur_out) = ORIGINAL;
    GRAD_TMOUT(RetVal.cur) = max(GRAD_TMOUT(RetVal.cur),
                                 dfh->ts_ + INTEREST_TIMEOUT);
    NUM_POS_RECV(cur_out)++;
  } else {
    OutPtr = new Out_List;
    AGT_ADDR(OutPtr) = dfh->forward_agent_id;
    GRADIENT(OutPtr) = dfh->report_rate;
    GRAD_TMOUT(OutPtr) = dfh->ts_ + INTEREST_TIMEOUT;
    INTF_INSERT(routing_table[dtype].active, OutPtr);
    routing_table[dtype].num_active ++;
    NUM_POS_RECV(OutPtr)++;
  }

  DataReqAll(dtype, ORIGINAL);

  Pkt_Hash_Entry *hashPtr;
  nsaddr_t next_node;
  In_List *recent_in;
  In_List *cur;

  switch (pos_type_) {
  case POS_HASH:
    hashPtr = PktTable.GetHash(dfh->info.sender, dfh->info.seq);
    if (hashPtr == NULL) {
      // Not an errno-style failure, so report with fprintf rather than
      // perror (which would append an unrelated strerror(errno) text).
      fprintf(stderr, "Hey! I've never seen that packet before.\n");
      Packet::free(pkt);
      exit(-1);
    }
    next_node = (hashPtr->forwarder_id).addr_;
    if (next_node == THIS_NODE) {
      break;
    }
    PosReinf(dtype, hashPtr->forwarder_id.addr_, dfh->info.sender,
             dfh->info.seq);
    routing_table[dtype].CntPosSend(hashPtr->forwarder_id);
    routing_table[dtype].ClrNewSub(hashPtr->forwarder_id);
#ifdef DEBUG_RATE
    printf("DF node %d will send %s to %x\n",
           THIS_NODE, MsgStr[dfh->mess_type], hashPtr->forwarder_id.addr_);
#endif // DEBUG_RATE
    break;

  case POS_LAST:
    recent_in = routing_table[dtype].MostRecentIn();
    if (recent_in == NULL) {
      break;
    }
    next_node = NODE_ADDR(recent_in);
    if (next_node == THIS_NODE) {
      break;
    }
    PosReinf(dtype, NODE_ADDR(recent_in), dfh->info.sender, dfh->info.seq);
    routing_table[dtype].CntPosSend(AGT_ADDR(recent_in));
    routing_table[dtype].ClrNewSub(AGT_ADDR(recent_in));
#ifdef DEBUG_RATE
    printf("DF node %d will send %s to %x\n",
           THIS_NODE, MsgStr[dfh->mess_type], NODE_ADDR(recent_in));
#endif // DEBUG_RATE
    break;

  case POS_ALL:
    for (cur = routing_table[dtype].iif; cur != NULL; cur = IN_NEXT(cur)) {
      if (NEW_SUB_RECV(cur) <= 0) {
        continue;
      }
      next_node = NODE_ADDR(cur);
      if (next_node == THIS_NODE) {
        continue;
      }
      PosReinf(dtype, NODE_ADDR(cur), dfh->info.sender, dfh->info.seq);
      routing_table[dtype].CntPosSend(AGT_ADDR(cur));
      routing_table[dtype].ClrNewSub(AGT_ADDR(cur));
#ifdef DEBUG_RATE
      printf("DF node %d will send %s to %x\n",
             THIS_NODE, MsgStr[dfh->mess_type], NODE_ADDR(cur));
#endif // DEBUG_RATE
    }
    break;

  default:
    break;
  }

  // Single release point for every non-fatal path above.
  Packet::free(pkt);
}
void DiffusionRate::PosReinf(int dtype, nsaddr_t to_node,
ns_addr_t info_sender, unsigned int info_seq)
{
ns_addr_t to_agent_addr;
to_agent_addr.addr_ = to_node;
to_agent_addr.port_ = ROUTING_PORT;
Packet *pkt=prepare_message(dtype, to_agent_addr, POS_REINFORCE);
hdr_cdiff *dfh = HDR_CDIFF(pkt);
dfh->report_rate = ORIGINAL;
dfh->info.sender = info_sender;
dfh->info.seq = info_seq;
MACprepare(pkt, to_node, NS_AF_INET, 1);
MACsend(pkt, 0);
overhead++;
#ifdef DEBUG_RATE
hdr_cmn *cmh = HDR_CMN(pkt);
printf("DF node %d will send %s (%x, %x, %d) to %x\n",
THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_,
(dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());
#endif
}
// Start the agent: run the base-class start-up, then create and arm the
// gradient timer (first expiry after INTEREST_TIMEOUT).  The negative
// reinforcement timer is created and armed (NEG_CHECK) only when negative
// reinforcement is enabled and its window type is NEG_TIMER.
void DiffusionRate::Start()
{
  DiffusionAgent::Start();

  gradient_timer = new GradientTimer(this);
  gradient_timer->resched(INTEREST_TIMEOUT);

  if (neg_win_type_ != NEG_TIMER || NEG_REINF_ != true) {
    return;
  }

  neg_reinf_timer = new NegativeReinforceTimer(this);
  neg_reinf_timer->resched(NEG_CHECK);
}
void DiffusionRate::reset()
{
DiffusionAgent::reset();
DataTable.reset();
}
void DiffusionRate::Print_IOlist()
{
Out_List *cur_out;
In_List *cur_in;
int i;
for (i=0; i<1; i++) {
printf("Node %d DATA TYPE %d: send bcast data %d, not send %d, rcv %d\n",
THIS_NODE, i, num_data_bcast_send, num_not_send_bcast_data,
num_data_bcast_rcv);
printf("Node %d neg bcast send %d, neg bcast rcv %d\n",
THIS_NODE, num_neg_bcast_send, num_neg_bcast_rcv);
for (cur_out = routing_table[i].active; cur_out != NULL;
cur_out = OUT_NEXT(cur_out) ) {
printf("DF node %d has oif %d (%f,%d) send data %d recv neg %d pos %d\n",
THIS_NODE, NODE_ADDR(cur_out), GRADIENT(cur_out),
routing_table[i].num_active, NUM_DATA_SEND(cur_out),
NUM_NEG_RECV(cur_out), NUM_POS_RECV(cur_out));
}
for (cur_in = routing_table[i].iif; cur_in != NULL;
cur_in = IN_NEXT(cur_in) ) {
printf("Diffusion node %d has iif for %d\n",
THIS_NODE, NODE_ADDR(cur_in));
printf("Node %d recv new sub %d,new org %d,old org %d:send neg %d pos %d\n",
THIS_NODE, TOTAL_NEW_SUB_RECV(cur_in), TOTAL_NEW_ORG_RECV(cur_in),
TOTAL_OLD_ORG_RECV(cur_in), NUM_NEG_SEND(cur_in),
NUM_POS_SEND(cur_in));
}
}
}
// Tcl command dispatcher for this agent.
//
// Two-word commands:   enable-suppression | disable-suppression
// Three-word commands: set-sub-tx-type, set-org-tx-type, set-pos-type,
//                      set-pos-node-type, set-neg-win-type,
//                      set-neg-thr-type, set-neg-max-type  <value>
// Command names are matched case-insensitively.  Recognised commands return
// TCL_OK; everything else is delegated to DiffusionAgent::command().
// The Parse*Type helpers terminate the process on an unknown <value>.
int DiffusionRate::command(int argc, const char*const*argv)
{
  switch (argc) {
  case 2: {
    const char *subcmd = argv[1];

    if (strcasecmp(subcmd, "enable-suppression") == 0) {
      DUP_SUP_ = true;
      return TCL_OK;
    }
    if (strcasecmp(subcmd, "disable-suppression") == 0) {
      DUP_SUP_ = false;
      return TCL_OK;
    }
    break;
  }

  case 3: {
    const char *subcmd = argv[1];
    const char *value = argv[2];

    if (strcasecmp(subcmd, "set-sub-tx-type") == 0) {
      sub_type_ = ParseSubType(value);
      return TCL_OK;
    }
    if (strcasecmp(subcmd, "set-org-tx-type") == 0) {
      org_type_ = ParseOrgType(value);
      return TCL_OK;
    }
    if (strcasecmp(subcmd, "set-pos-type") == 0) {
      pos_type_ = ParsePosType(value);
      return TCL_OK;
    }
    if (strcasecmp(subcmd, "set-pos-node-type") == 0) {
      pos_node_type_ = ParsePosNodeType(value);
      return TCL_OK;
    }
    if (strcasecmp(subcmd, "set-neg-win-type") == 0) {
      neg_win_type_ = ParseNegWinType(value);
      return TCL_OK;
    }
    if (strcasecmp(subcmd, "set-neg-thr-type") == 0) {
      neg_thr_type_ = ParseNegThrType(value);
      return TCL_OK;
    }
    if (strcasecmp(subcmd, "set-neg-max-type") == 0) {
      neg_max_type_ = ParseNegMaxType(value);
      return TCL_OK;
    }
    break;
  }

  default:
    break;
  }

  // Not one of ours: let the base class try.
  return DiffusionAgent::command(argc, argv);
}
// Translate a sub-sampled transmission keyword (case-insensitive) into its
// enum value: "BROADCAST" -> BCAST_SUB, "UNICAST" -> UNICAST_SUB.
// Any other string is fatal: an error is written to stderr and the process
// exits with status -1.
sub_t ParseSubType(const char* str)
{
  if (!strcasecmp(str, "BROADCAST"))
    return BCAST_SUB;

  if (!strcasecmp(str, "UNICAST"))
    return UNICAST_SUB;

  fprintf(stderr,"ParseSubType Error -- Only BROADCAST or UNICAST\n");
  exit(-1);
}
// Translate an original-rate transmission keyword (case-insensitive) into
// its enum value: "BROADCAST" -> BCAST_ORG, "UNICAST" -> UNICAST_ORG.
// Any other string is fatal: an error is written to stderr and the process
// exits with status -1.
org_t ParseOrgType(const char* str)
{
  if (!strcasecmp(str, "BROADCAST"))
    return BCAST_ORG;

  if (!strcasecmp(str, "UNICAST"))
    return UNICAST_ORG;

  fprintf(stderr,"ParseOrgType Error -- Only BROADCAST or UNICAST\n");
  exit(-1);
}
// Translate a positive-reinforcement strategy keyword (case-insensitive)
// into its enum value: "HASH" -> POS_HASH, "LAST" -> POS_LAST,
// "ALL" -> POS_ALL.
// Any other string is fatal: an error is written to stderr and the process
// exits with status -1.
pos_t ParsePosType(const char* str)
{
  if (!strcasecmp(str, "HASH"))
    return POS_HASH;

  if (!strcasecmp(str, "LAST"))
    return POS_LAST;

  if (!strcasecmp(str, "ALL"))
    return POS_ALL;

  fprintf(stderr,"ParsePosType Error -- Only HASH, LAST, or ALL\n");
  exit(-1);
}
// Translate a positive-reinforcement node-type keyword (case-insensitive)
// into its enum value: "END" -> END_POS, "INTM" -> INTM_POS.
// Any other string is fatal: an error is written to stderr and the process
// exits with status -1.
pos_ndt ParsePosNodeType(const char* str)
{
  if (!strcasecmp(str, "END"))
    return END_POS;

  if (!strcasecmp(str, "INTM"))
    return INTM_POS;

  fprintf(stderr,"ParsePosNodeType Error -- Only END or INTM\n");
  exit(-1);
}
// Translate a negative-reinforcement window keyword (case-insensitive) into
// its enum value: "COUNTER" -> NEG_COUNTER, "TIMER" -> NEG_TIMER.
// Any other string is fatal: an error is written to stderr and the process
// exits with status -1.
neg_wint ParseNegWinType(const char* str)
{
  if (!strcasecmp(str, "COUNTER"))
    return NEG_COUNTER;

  if (!strcasecmp(str, "TIMER"))
    return NEG_TIMER;

  fprintf(stderr,"ParseNegWinType Error -- Only COUNTER or TIMER\n");
  exit(-1);
}
// Translate a negative-reinforcement threshold keyword (case-insensitive)
// into its enum value: "ABSOLUTE" -> NEG_ABSOLUTE, "RELATIVE" -> NEG_RELATIVE.
// Any other string is fatal: an error is written to stderr and the process
// exits with status -1.
neg_tht ParseNegThrType(const char* str)
{
  if (!strcasecmp(str, "ABSOLUTE"))
    return NEG_ABSOLUTE;

  if (!strcasecmp(str, "RELATIVE"))
    return NEG_RELATIVE;

  fprintf(stderr,"ParseNegThrType Error -- Only ABSOLUTE or RELATIVE\n");
  exit(-1);
}
// Translate a negative-reinforcement maximum keyword (case-insensitive) into
// its enum value: "FIXED" -> NEG_FIXED_MAX, "SCALE" -> NEG_SCALE_MAX.
// Any other string is fatal: an error is written to stderr and the process
// exits with status -1.
neg_mxt ParseNegMaxType(const char* str)
{
  if (!strcasecmp(str, "FIXED"))
    return NEG_FIXED_MAX;

  if (!strcasecmp(str, "SCALE"))
    return NEG_SCALE_MAX;

  fprintf(stderr,"ParseNegMaxType Error -- Only FIXED or SCALE\n");
  exit(-1);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -