📄 leader.cc
字号:
/** * Copyright (c) 2006 Michele Mastrogiovanni. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include "leader.h"#include "Separator.h"int hdr_leader::offset_;int LEADER_Agent::_DEBUG_;NodeAddress LEADER_Agent::fake_leader;double LEADER_Agent::max_delay;int LEADER_Agent::max_timeout_confirm;double LEADER_Agent::jitter_timeout_confirm;double LEADER_Agent::timeout_confirm; int LEADER_Agent::max_timeout_info;double LEADER_Agent::jitter_timeout_info;double LEADER_Agent::timeout_info; int LEADER_Agent::max_timeout_action;double LEADER_Agent::jitter_timeout_action;double LEADER_Agent::timeout_action; int LEADER_Agent::max_timeout_feedback;double LEADER_Agent::jitter_timeout_feedback;double LEADER_Agent::timeout_feedback; int LEADER_Agent::max_timeout_leader;double LEADER_Agent::jitter_timeout_leader;double LEADER_Agent::timeout_leader; static class LeaderHeaderClass : public PacketHeaderClass {public: LeaderHeaderClass() : PacketHeaderClass("PacketHeader/LEADER", sizeof(hdr_leader)) { bind_offset(&hdr_leader::offset_); }} class_leaderhdr;static class LeaderClass : public TclClass {public: LeaderClass() : TclClass("Agent/LEADER") {} TclObject* create(int , const char*const* ) { return(new LEADER_Agent()); }} class_leader;LEADER_Agent::LEADER_Agent() : ClusteringModule(PT_LEADER){ timer = new LeaderTimer(this); bind_bool("debug", &_DEBUG_); bind("fake-leader", &fake_leader); bind("max-delay", &max_delay); bind("max-timeout-confirm", &max_timeout_confirm); bind("jitter-timeout-confirm", &jitter_timeout_confirm); bind("timeout-confirm", &timeout_confirm); bind("max-timeout-info", &max_timeout_info); bind("jitter-timeout-info", &jitter_timeout_info); bind("timeout-info", &timeout_info); bind("max-timeout-action", &max_timeout_action); bind("jitter-timeout-action", &jitter_timeout_action); bind("timeout-action", &timeout_action); bind("max-timeout-feedback", &max_timeout_feedback); bind("jitter-timeout-feedback", &jitter_timeout_feedback); bind("timeout-feedback", &timeout_feedback); bind("max-timeout-leader", &max_timeout_leader); bind("jitter-timeout-leader", &jitter_timeout_leader); bind("timeout-leader", &timeout_leader);}//// Procedura di inizializzazione delle strutture dati // della fase di Leader Election.//voidLEADER_Agent::initializeLeaderElection(){ // Ogni nodo inizialmente e'un candidato leData.candidate = true; // Il frammento di appartenenza del nodo e'costituito dal nodo stesso. leData.fragment.ID = myAddress; leData.fragment.size = 1; // La struttura ad albero del frammento si limita al solo nodo, // senza genitore e senza figli. leData.tree.parent = -1; leData.tree.childs.clear(); // Il nodo e'un nodo "edge" per il suo frammento. leData.stored_path[0] = -1; leData.maximal_fragment_node[0] = -1; leData.maximal_fragment_available[0] = false; // L'identita'precedente del frammento di appartenenza e'indefinita. leData.old_fragment_identity = -1; // Svuota i buffer dei messaggi ricevuti leData.buffer_info.clear(); leData.buffer_feedback.clear(); leData.buffer_action.clear(); // Svuota i buffer dei messaggi inviati leData.stored_messages.info.clear(); leData.stored_messages.feedback.clear(); // Imposta il numero di round. leData.round = 0; for (NodeList::iterator n = neighbors.begin(); n != neighbors.end(); n++) leData.levels[*n] = -1; // Svuota l'elenco dei nodi ai quali deve confermare. leData.confirmation_nodes_info.clear();}//// Funzione di ricezione di un pacchetto di clustering.//void LEADER_Agent::receive(Packet *p, Handler *h) { struct hdr_leader *leaderh = HDR_LEADER(p); struct hdr_ip *iph = HDR_IP(p); NodeAddress sender_address = iph->saddr(); // Destinazione. NodeAddress destination_address = iph->daddr(); bool result; switch (leaderh->msg_type) { case LEADER_INFO : // NUOVO CONFIRM: da modificare: send_CONFIRM(sender_address, leaderh->info_message.round, LEADER_INFO, true); // Se l'algoritmo e'terminato si ignora il messaggio if (status == LEADER_STATUS_END) return; // // Se non sono nella fase di ricezione dei messaggi di INFO, // metto nel buffer il messaggio di INFO. // if (status != LEADER_STATUS_INFO) { leData.buffer_info.push_back(pair<NodeAddress, struct InfoMessage>(sender_address, leaderh->info_message)); return; } // Riceve il messaggio recvInfoMessage(sender_address, leaderh->info_message); break; case LEADER_FEEDBACK : // Invia un messaggio di conferma. send_CONFIRM(sender_address, leaderh->feedback_message.round, LEADER_FEEDBACK, true); // Se l'algoritmo e'terminato si ignora il messaggio if (status == LEADER_STATUS_END) return; // // Se il messaggio è passato, viene ignorato. // if (leaderh->feedback_message.round < leData.round) return; // // Se non sono nella fase di ricezione dei messaggi di FEEDBACK, // metto nel buffer il messaggio di FEEDBACK. // if (status != LEADER_STATUS_FEEDBACK) { leData.buffer_feedback.push_back(pair<NodeAddress, struct FeedbackMessage>(sender_address, leaderh->feedback_message)); return; } // Riceve il messaggio recvFeedbackMessage(sender_address, leaderh->feedback_message); break; case LEADER_ACTION : // Conferma la ricezione send_CONFIRM(sender_address, leaderh->action_message.round, LEADER_ACTION, false); // Se il messaggio e'passato lo ignora. if (leaderh->action_message.round < leData.round) return; // Solo se si trova nella fase di INFO puo'ricevere messaggi di ACTION. if (status != LEADER_STATUS_INFO) return; // Se il messaggio e'gia'stato ricevuto viene scartato if (leData.action_for_round[leData.round]) return; // Riceve il messaggio di ACTION. receive_ACTION(sender_address, leaderh->action_message); break; case LEADER_LEADER : // Se il messaggio e' UNICAST allora puo'essere o una ritrasmissione // oppure una risposta ad una ritrasmissione. if (destination_address == myAddress) // Se desidera che confermi il messaggio. if (leaderh->leader_message.confirm) // Invio nuovamente il mio messaggio di LEADER per conferma // (se la fase e'la leader election o una successiva). switch (status) { case LEADER_STATUS_LEADER : case LEADER_STATUS_END : send_LEADER(leData.leader, leData.tree_size, leData.parent, leData.level, false, sender_address); break; } // Se l'algoritmo e'terminato si ignora il messaggio switch (status) { case LEADER_STATUS_END : return; default : break; } // Se il nodo mi ha gia'inviato un messaggio LEADER. if (status == LEADER_STATUS_LEADER) if (leData.leader_messages.find(sender_address) == leData.leader_messages.end()) return; receive_LEADER(sender_address, leaderh->leader_message, destination_address); break; case LEADER_REQUEST : receive_REQUEST(sender_address, leaderh->request_message); break; case LEADER_CONFIRM : receive_CONFIRM(sender_address, leaderh->confirm_message); break; }}/********************//* Messages *//********************///// Funzione per inviare un nuovo messaggio di INFO.// Fa partire i timeout per le conferme.//voidLEADER_Agent::send_INFO(NodeAddress fragment_identity, NodeAddress new_fragment_identity, int new_fragment_size, NodeAddress parent){ if (_DEBUG_) printf("\t\t%d __send__ INFO (ID=%d, NEW_ID=%d, SIZE=%d, PARENT=%d, ROUND=%d, FROM=%d)\n", myAddress, fragment_identity, new_fragment_identity, new_fragment_size, parent, leData.round, -1); // Invia il messaggio di INFO. send_INFO(fragment_identity, new_fragment_identity, new_fragment_size, parent, leData.round, -1); // Lancia i timeout verso i nodi che devono ancora inviare messaggi di INFO. // Puo'succedere che un nodo invii il proprio INFO dopo aver ricevuto alcuni messaggi di INFO. // La variabile leData.info_messages indica il numero di vicini che devono ancora inviare messaggi di INFO // per quel round. timer->launchInfoTimeouts(neighbors, leData.round); /* if (!leData.info_messages.empty()) timer->launchInfoTimeouts(leData.info_messages); */}voidLEADER_Agent::send_INFO(NodeAddress fragment_identity, NodeAddress new_fragment_identity, int new_fragment_size, NodeAddress parent, int round, NodeAddress to){ Packet* p = allocpkt(); struct hdr_leader * leaderh = HDR_LEADER(p); // Strutture comuni. leaderh->msg_type = LEADER_INFO; // Imposta il contenuto del messaggio. leaderh->info_message.fragment.ID = fragment_identity; leaderh->info_message.new_fragment.ID = new_fragment_identity; leaderh->info_message.new_fragment.size = new_fragment_size; leaderh->info_message.parent = parent; leaderh->info_message.round = round; // Calcola la dimensione del messaggio. size_t size = sizeof(InfoMessage) + sizeof(LeaderMessageType); // Se sto inviando un messaggio di INFO mai inviato... if (!isInfoInBuffer(round)) { // Lancia i timeout verso tutti i nodi che devono confermarmi // l'arrivo del messaggio che sto per inviare. // timer->launchConfirmation(round); // Memorizza il messaggio da inviare. leData.stored_messages.info[round] = leaderh->info_message; } // Invia il messaggio. sendDown(p, size, to, LEADER_Agent::max_delay); /* InfoMessage info = leaderh->info_message; if (_DEBUG_) printf("\t\t%d send INFO (ID=%d, NEW_ID=%d, SIZE=%d, PARENT=%d, ROUND=%d, TO=%d)\n", myAddress, info.fragment.ID, info.new_fragment.ID, info.new_fragment.size, info.parent, info.round, to); */}voidLEADER_Agent::send_FEEDBACK(NodeAddress to, NodeAddress fragment_identity, bool internal_flag, NodeAddress maximal_fragment_identity, int maximal_fragment_size, int node_count){ if (_DEBUG_) printf("\t\t%d __send__ FEEDBACK (TO=%d, COUNT=%d, ROUND=%d)\n", myAddress, to, node_count, leData.round); send_FEEDBACK(to, fragment_identity, internal_flag, maximal_fragment_identity, maximal_fragment_size, node_count, leData.round);}voidLEADER_Agent::send_FEEDBACK(NodeAddress to, NodeAddress fragment_identity, bool internal_flag, NodeAddress maximal_fragment_identity, int maximal_fragment_size, int node_count, int round) { Packet* p = allocpkt(); struct hdr_leader * leaderh = HDR_LEADER(p); // Common structure leaderh->msg_type = LEADER_FEEDBACK; // Message leaderh->feedback_message.fragment.ID = fragment_identity; leaderh->feedback_message.maximal_fragment_available = internal_flag; leaderh->feedback_message.maximal_fragment.ID = maximal_fragment_identity; leaderh->feedback_message.maximal_fragment.size = maximal_fragment_size; leaderh->feedback_message.node_count = node_count; leaderh->feedback_message.round = round; // Record message. leData.stored_messages.feedback[round] = leaderh->feedback_message; size_t size = sizeof(FeedbackMessage) + sizeof(LeaderMessageType); // Invia il messaggio. sendDown(p, size, to, LEADER_Agent::max_delay);}void LEADER_Agent::send_LEADER(NodeAddress leader, int fragment_size, NodeAddress parent, int level, bool confirm, NodeAddress to){ Packet* p = allocpkt(); struct hdr_leader * leaderh = HDR_LEADER(p); // Common structures. leaderh->msg_type = LEADER_LEADER; // Message leaderh->leader_message.leader.ID = leader; leaderh->leader_message.leader.size = fragment_size; leaderh->leader_message.parent = parent; leaderh->leader_message.level = level; leaderh->leader_message.confirm = confirm; size_t size = sizeof(LeaderMessage) + sizeof(LeaderMessageType); // Invia il messaggio. sendDown(p, size, to, LEADER_Agent::max_delay);}//// Invia la conferma di ricezione per un certo messaggio di un certo round.// Il messaggio viene utilizzato per confermare i messaggi di INFO, di FEEDBACK e di ACTION.// Il flag "send" serve per indicare se la conferma fa parte di un invio o di una risposta.//voidLEADER_Agent::send_CONFIRM(NodeAddress to, int round, LeaderMessageType type, bool send) { Packet* p = allocpkt(); struct hdr_leader * leaderh = HDR_LEADER(p); // Common structures. leaderh->msg_type = LEADER_CONFIRM; // Message. leaderh->confirm_message.send = send; leaderh->confirm_message.round = round; leaderh->confirm_message.type = type; size_t size = sizeof(ConfirmMessage) + sizeof(LeaderMessageType); // Invia il messaggio. sendDown(p, size, to, LEADER_Agent::max_delay);}voidLEADER_Agent::send_ACTION(NodeAddress fragment_identity, NodeAddress to) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -