📄 leader.cc
字号:
// Invia il messaggio send_ACTION(fragment_identity, to, leData.round); // Memorizza il messaggio lanciato leData.buffer_action[leData.round] = pair<NodeAddress, NodeAddress>(fragment_identity, to); // Fa partire il timeout. timer->launchActionTimeouts(to, leData.round);}voidLEADER_Agent::send_ACTION(NodeAddress fragment_identity, NodeAddress to, int round) { Packet* p = allocpkt(); struct hdr_leader * leaderh = HDR_LEADER(p); // Common structures. leaderh->msg_type = LEADER_ACTION; // Message. leaderh->action_message.current_fragment.ID = fragment_identity; leaderh->action_message.round = round; size_t size = sizeof(ActionMessage) + sizeof(LeaderMessageType); // Invia il messaggio. sendDown(p, size, to, LEADER_Agent::max_delay);}voidLEADER_Agent::send_REQUEST(NodeAddress to, LeaderMessageType type, int round) { Packet* p = allocpkt(); struct hdr_leader * leaderh = HDR_LEADER(p); // Common structures. leaderh->msg_type = LEADER_REQUEST; // Message leaderh->request_message.type = type; leaderh->request_message.round = round; size_t size = sizeof(RequestMessage) + sizeof(LeaderMessageType); // Invia il messaggio. sendDown(p, size, to, LEADER_Agent::max_delay);}/****************************//* Utility. *//****************************/voidLEADER_Agent::emptyInfoBuffer(){ // // Effettua una copia di backup del buffer in modo che eventuali nuove bufferizzazioni // non corrompano il processo di svuotamento. // list<pair<NodeAddress, struct InfoMessage> > tmp(leData.buffer_info); // Svuota il buffer. leData.buffer_info.clear(); // Effettua la ricezione di tutti i messaggi bufferizzati. for (list<pair<NodeAddress, struct InfoMessage> >::iterator i = tmp.begin(); i != tmp.end(); i++) { if (_DEBUG_) printf("\tFROM BUFFER\n"); recvInfoMessage(i->first, i->second); }}voidLEADER_Agent::emptyFeedbackBuffer(){ // // Effettua una copia di backup del buffer in modo che eventuali nuove bufferizzazioni // non corrompano il processo di svuotamento. // list<pair<NodeAddress, struct FeedbackMessage> > tmp(leData.buffer_feedback); // Svuota il buffer. leData.buffer_feedback.clear(); // Effettua la ricezione di tutti i messaggi bufferizzati. for (list<pair<NodeAddress, struct FeedbackMessage> >::iterator i = tmp.begin(); i != tmp.end(); i++) { if (_DEBUG_) printf("\tFROM BUFFER\n"); recvFeedbackMessage(i->first, i->second); }}/****************************//* Algorithm procedures and/* events./****************************/voidLEADER_Agent::endModule(){ if (_DEBUG_) printf("%d FINITO: leader = %d, nodi = %d\n", myAddress, leData.leader, leData.tree_size); // Entra nello stato finale. status = LEADER_STATUS_END; if (utility) if (leData.leader == myAddress) ((LeaderUtility*)utility)->setLeader(myAddress); // Segnala la conclusione del processo di clustering. ClusteringModule::endModule(); // Start upper module if (upTarget) Separator::instance().endCurrentModule(myAddress);}//// Inizia il processo di leader election.//voidLEADER_Agent::startModule() { // Inizializza le informazioni comuni del clustering. ClusteringModule::startModule(); if (_DEBUG_) { printf("%d neighbors: ", myAddress); for (NodeList::iterator n = neighbors.begin(); n != neighbors.end(); n++) printf("%d ", *n); printf("\n"); } status = LEADER_STATUS_INIT; // Inizializza le strutture necessarie. initializeLeaderElection(); if (fake_leader != -1) { if (fake_leader == myAddress) { beginLeaderPropagation(); } else { } } else { // Il nodo invia informazioni sul suo stato. send_INFO(leData.fragment.ID, leData.fragment.ID, 1, -1); // Il nodo inizia un ciclo di INFO. init_INFO_cycle(); }}//// Procedura di gestione dei messaggi di INFO.// Questa procedura viene chiamata con le seguenti assunzioni:// 1) In ogni round ogni vicino invia uno e un solo messaggio di INFO.// 2) Tutti i vicini inviano quel messaggio.// 3) Non vengono ricevuti altri messaggi prima di aver ricevuto ogni messaggio di INFO per quel round.//voidLEADER_Agent::receive_INFO(NodeAddress from, struct InfoMessage info){ if (_DEBUG_) printf("%d receive INFO (ID=%d, NEW_ID=%d, SIZE=%d, PARENT=%d, ROUND=%d, FROM=%d)\n", myAddress, info.fragment.ID, info.new_fragment.ID, info.new_fragment.size, info.parent, info.round, from); // Elimina il timeout del messaggio di INFO. // timer->cancelInfo(from, .seq); // Elimina il nodo dalla lista di coloro che devono inviare un messaggio INFO. leData.info_messages.erase(from); // FIXME: // Se il messaggio di questo round proviene dal vicino che nel round // precedente e'stato il "maximal_fragment_node", memorizza il messaggio proveniente // da questo nodo nel caso il messaggio di ACTION giunga al nodo dopo il messaggio di INFO. // Questo fenomeno puo'accadere solo ai nodi che hanno come "stored_path" in quel round -1 // (e quindi erano degli "edge_node"). if (leData.round != 0) if (leData.stored_path[leData.round - 1] == -1) if (leData.maximal_fragment_node[leData.round - 1] == from) leData.maximal_fragment_node_info_message[leData.round] = info; if (belongToAnotherFragment(from, info)) { // Il mittente appartiene ad un frammento differente: // Aggiorna la conoscenza del vicino massimale. updateMaximalFragment(from, info); if (_DEBUG_) if (leData.maximal_fragment_available[leData.round]) printf("\t%d [MAXID=%d, SIZE=%d, PATH=%d, NODE=%d]\n", myAddress, leData.maximal_fragment[leData.round].ID, leData.maximal_fragment[leData.round].size, leData.stored_path[leData.round], leData.maximal_fragment_node[leData.round]); } else { // Il mittente appartiene allo stesso frammento. if (leData.tree.parent != -1) { // Il nodo conosce il suo genitore. if (leData.tree.parent == from) { // Il nodo genitore ha inviato il messaggio di INFO. if (!isInfoInBuffer(info.round)) { // Il nodo non aveva ancora mai inviato il messaggio di INFO per // questo round: il nodo nel round precendente si e'legato al frammento di // quel nodo tramite esso. // La vecchia identita'del nodo e'gia'impostata correttamente. // La nuova identita'viene agggiornata per quanto riguarda la dimensione del // frammento. // Il nodo edge legato invia un messaggio di INFO con la dimensione dettatagli // dal frammento a cui e'legato. leData.fragment = info.new_fragment; send_INFO(leData.old_fragment_identity, leData.fragment.ID, leData.fragment.size, from); } else { // Il genitore mi sta inviando un messaggio duplicato: // lo scarto ma, viste le ipotesi, questa parte non verra'mai // chiamata. } } else { // Un nodo mio vicino dello stesso frammento ha inviato un messaggio di INFO. if (info.parent == myAddress) { // Il nodo ha impostato il nodo corrente come genitore: // imposto il nodo mittente come figlio. leData.tree.childs.insert(from); if (_DEBUG_) printf("\t\t%d ADD CHILD %d\n", myAddress, from); } else { // Il messaggio proviene da un nodo vicino, dello stesso // frammento ma che non mi ha scelto come genitore. // printf("ERROR\n"); } } } else { if (leData.candidate) { if (info.parent == myAddress) { // Il nodo ha impostato il nodo corrente come genitore: // imposto il nodo mittente come figlio. leData.tree.childs.insert(from); if (_DEBUG_) printf("\t\t%d ADD CHILD %d\n", myAddress, from); } else { // Il nodo appartiene allo stesso frammento ma non sta inviando informazioni // diretamente al nodo corrente. // Ignora. } } else { if (info.parent == myAddress) { // Errore: un nodo non puo'ricevere un messaggio da un // figlio se prima non conosce il padre. } else { // Se il nodo non e'candidato allora vuol dire che non conosce ancora il suo genitore: // Ricevuto il messaggio di INFO lo propaga. leData.tree.parent = from; leData.old_fragment_identity = leData.fragment.ID; leData.fragment = info.new_fragment; send_INFO(leData.old_fragment_identity, leData.fragment.ID, leData.fragment.size, from); } } } } if (!leData.info_messages.empty()) return; // Ho ricevuto il messaggio di info da tutti i miei vicini // (e ho inviato il mio messaggio per questo round). // Comincia il ciclo di feedback init_FEEDBACK_cycle();}//// La funzione inizializza un ciclo di FEEDBACK://voidLEADER_Agent::init_FEEDBACK_cycle(){ if (_DEBUG_) printf("%d BEGIN FEEDBACK CYCLE\n", myAddress); // Imposta lo stato che caratterizza il ciclo di FEEDBACK. status = LEADER_STATUS_FEEDBACK; // Imposta la lista dei vicini che devono inviare un messaggio di FEEDBACK // ovvero tutti i figli del nodo nello stesso frammento. leData.feedback_messages = leData.tree.childs; // Imposta a zero la conoscenza della grandezza del frammento. leData.sub_tree_partial_count = 0; // // Effettua i calcoli per avanzare di fase. // Necessario nel caso in cui non si abbiano nodi figli. // if (leData.feedback_messages.empty()) { processLEStatus(); return; } // Svuota il buffer dei messaggi FEEDBACK. emptyFeedbackBuffer(); }//// Questa funzione viene lanciata dopo aver inviato il messaggio di INFO per un round:// prepara le strutture di dati per poter ricevere i messaggi di INFO da tutti i vicini.// I timeout verso i nodi che non hanno ancora risposto vanno inviati dopo l'invio del // messaggio di INFO.//voidLEADER_Agent::init_INFO_cycle(bool initialize) { if (_DEBUG_) printf("%d BEGIN INFO CYCLE\n", myAddress); // Imposta lo stato che caratterizza il ciclo di ricezione di INFO. status = LEADER_STATUS_INFO; // Imposta la lista dei vicini che devono ancora mandare messaggi di INFO. leData.info_messages = neighbors; // Resetta la conoscenza di frammenti vicini massimali. leData.maximal_fragment_available[leData.round] = false; // Resetta la conoscenza di frammenti vicini massimali. leData.maximal_fragment_node[leData.round] = -1; // Imposta a zero la conoscenza della struttura del frammento. // Il genitore va resettato solo se non e'stato ricevuto un messaggio di ACTION. if (initialize) leData.tree.parent = -1; leData.tree.childs.clear(); // Svuota il buffer dei messaggi INFO. emptyInfoBuffer();}//// Questa funzione viene lanciata dopo aver inviato il messaggio di INFO per un round:// prepara le strutture di dati per poter ricevere i messaggi di INFO da tutti i vicini.// I timeout verso i nodi che non hanno ancora risposto vanno inviati dopo l'invio del // messaggio di INFO.//voidLEADER_Agent::init_LEADER_cycle() { // Imposta lo stato che caratterizza il ciclo di ricezione di INFO. status = LEADER_STATUS_LEADER; // Imposta la lista dei vicini che devono ancora mandare messaggi di INFO. leData.leader_messages = neighbors; // Imposta a zero la conoscenza della struttura del frammento leData.parent = -1; leData.tree_size = 0; leData.level = 0; leData.leader = myAddress;}voidLEADER_Agent::receive_FEEDBACK(NodeAddress from, struct FeedbackMessage feedback){ if (_DEBUG_) printf("%d receive FEEDBACK (FROM=%d, MAX=%d, SIZE=%d, ROUND=%d)\n", myAddress, from, feedback.maximal_fragment.ID, feedback.maximal_fragment.size, feedback.round); // Elimina il nodo dalla lista di coloro che devono inviare un messaggio FEEDBACK. leData.feedback_messages.erase(from); // Aggiorna il conteggio del frammento corrente leData.sub_tree_partial_count += feedback.node_count; // Aggiorna la situazione sul frammento massimale. updateMaximalFragment(from, feedback); if (_DEBUG_) if (leData.maximal_fragment_available[leData.round]) printf("\t%d [MAXID=%d, SIZE=%d, PATH=%d, NODE=%d]\n", myAddress, leData.maximal_fragment[leData.round].ID, leData.maximal_fragment[leData.round].size, leData.stored_path[leData.round], leData.maximal_fragment_node[leData.round]); if (!leData.feedback_messages.empty()) return; // Sono giunti tutti i messaggi di FEEDBACK: si decide lo stato del nodo. processLEStatus();}//// Gestione dei messaggi di REQUEST//voidLEADER_Agent::receive_REQUEST(NodeAddress from, struct RequestMessage rm){ struct InfoMessage im; struct FeedbackMessage fm; switch (rm.type) { /* case LEADER_INFO : // Viene richiesto un messaggio di INFO ma il nodo non lo ha ancora // inviato. // Quando sara'in grado di mandarlo lo inviera'e si aspettera' // la ricevuta di ritorno. if (!isInfoInBuffer(rm.round)) { timer->addConfirmation(from, rm.round); send_CONFIRM(from, rm.round, LEADER_INFO, true); return; } // // Il messaggio è in mio possesso: lo invio senza richiedere la conferma. // for (BufferInfoMessages::iterator i = leData.stored_messages.info.begin(); i != leData.stored_messages.info.end(); i++) { if (i->first == rm.round) { im = i->second; send_INFO(im.fragment.ID, im.new_fragment.ID, im.new_fragment.size, im.parent, im.round, from); } } break; */ }}voidLEADER_Agent::receive_CONFIRM(NodeAddress from, struct ConfirmMessage cm){ switch (cm.type) { case LEADER_INFO : if (_DEBUG_) printf("%d receive CONFIRM (INFO) from %d (ROUND=%d)\n", myAddress, from, cm.round); // Nuovo CONFIRM: da modificare: timer->cancelInfo(from, cm.round); /* if (cm.send) { timer->cancelInfo(from); leData.confirmation_nodes_info[cm.round].insert(from); } else timer->cancelConfirmation(from, cm.round); */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -