📄 ccm.c
字号:
llm_info_t *llm = CCM_GET_LLM(info); for ( i = 0 ; i < CCM_GET_MEMCOUNT(info) ; i++ ) { indx = CCM_GET_MEMINDEX(info, i); if(strncmp(LLM_GET_NODEID(llm, indx), node, LLM_GET_NODEIDSIZE(llm)) == 0){ return i; } } return -1;}static intccm_get_my_membership_index(ccm_info_t *info){ int i; llm_info_t *llm = CCM_GET_LLM(info); for ( i = 0 ; i < CCM_GET_MEMCOUNT(info) ; i++ ) { if (CCM_GET_MEMINDEX(info, i) == LLM_GET_MYNODE(llm)){ return i; } } assert(0); /* should never reach here */ return -1;}static intccm_am_i_leader(ccm_info_t *info){ if (ccm_get_my_membership_index(info) == CCM_GET_CL(info)){ return TRUE; } return FALSE;}static intccm_already_joined(ccm_info_t *info){ if (CCM_GET_JOINED_TRANSITION(info)) { return TRUE; } return FALSE;}/* *//* END OF FUNCTIONS that keep track of stablized membership list *//* *//* *//* BEGIN OF FUNCTIONS THAT KEEP TRACK of cluster nodes that have shown *//* interest in joining the cluster. *//* *//* *//* NOTE: when a new node wants to join the cluster, it multicasts a *//* message asking for the necessary information to send out a join *//* message. (it needs the current major transistion number, the context *//* string i.e cookie, the protocol number that everybody is operating *//* in). *//* *//* The functions below track these messages sent out by new potential *//* members showing interest in acquiring the initial context. *//* */static void ccm_add_new_joiner(ccm_info_t *info, const char *orig){ /* check if there is already a cached request for the * joiner */ int idx = llm_get_index(CCM_GET_LLM(info), orig)+1; if(!g_slist_find(CCM_GET_JOINERHEAD(info),GINT_TO_POINTER(idx))) { CCM_SET_JOINERHEAD(info, g_slist_append(CCM_GET_JOINERHEAD(info), GINT_TO_POINTER(idx))); } else { if(global_debug) cl_log(LOG_DEBUG,"add_joiner %s already done", orig); } return;}static intccm_am_i_highest_joiner(ccm_info_t *info){ int joiner; char * joiner_name; gpointer jptr; char * hname = ccm_get_my_hostname(info); GSList * head = CCM_GET_JOINERHEAD(info); while ( head ) { jptr = g_slist_nth_data(head, 0); joiner = GPOINTER_TO_INT(jptr)-1; joiner_name = LLM_GET_NODEID(CCM_GET_LLM(info), joiner); if (strncmp(hname, joiner_name, LLM_GET_NODEIDSIZE(CCM_GET_LLM(info))) < 0) { return FALSE; } head = g_slist_next(head); } return TRUE;}static void ccm_remove_new_joiner(ccm_info_t *info, const char *orig){ int idx = llm_get_index(CCM_GET_LLM(info), orig)+1; CCM_SET_JOINERHEAD(info, g_slist_remove(CCM_GET_JOINERHEAD(info), GINT_TO_POINTER(idx))); return;}/* *//* END OF FUNCTIONS THAT KEEP TRACK of cluster nodes that have shown *//* interest in joining the cluster. *//* *//*///////////////////////////////////////////////////////////////////////// BEGIN OF FUNCTIONS THAT SEND OUT messages to nodes of the cluster///////////////////////////////////////////////////////////////////////*/static voidccm_delay_random_interval(void){ struct timeval tmp; /* seed the random with a random value */ gettimeofday(&tmp, NULL); srandom((unsigned int)tmp.tv_usec); usleep(random()%MAXNODE); /*sleep some random microsecond interval*/}/* *//* compute the final membership list from the acquired connectivity *//* information from other nodes. And send out the consolidated *//* members of the cluster information to the all the members of *//* that have participated in the CCM protocol. *//* *//* NOTE: Called by the cluster leader only. *//* */static voidccm_compute_and_send_final_memlist(ll_cluster_t *hb, ccm_info_t *info){ unsigned char *bitmap; uint maxtrans; char *string; char *cookie = NULL; int numBytes; int strsize; int repeat; /* get the maxmimum membership list */ maxtrans = ccm_memcomp_get_maxmembership(info, &bitmap); /* create a string with the membership information */ numBytes = bitmap_size(MAXNODE); strsize = ccm_bitmap2str(bitmap, numBytes, &string); /* check if the membership has changed from that before. If so we * have to generate a new cookie. */ if(ccm_memlist_changed(info, bitmap)) { cookie = ccm_generate_random_cookie(); } repeat = 0; while (ccm_send_final_memlist(hb, info, cookie, string, maxtrans+1) != HA_OK) { if(repeat < REPEAT_TIMES){ cl_log(LOG_ERR, "ccm_compute_and_send_final_memlist: failure " "to send finalmemlist"); cl_shortsleep(); repeat++; }else{ bitmap_delete(bitmap); g_free(string); return; } } /* fill my new memlist and update the new cookie if any */ ccm_fill_memlist_from_bitmap(info, bitmap); bitmap_delete(bitmap); g_free(string); /* increment the major transition number and reset the * minor transition number */ CCM_SET_MAJORTRANS(info, maxtrans+1); CCM_RESET_MINORTRANS(info); /* if cookie has changed update it. */ if (cookie) { cl_log(LOG_INFO, "ccm_compute_and_send_final_list: " "cookie changed "); CCM_SET_COOKIE(info, cookie); ccm_free_random_cookie(cookie); } /* check if any joiner is waiting for a response from us. * If so respond and free all the joiners. */ ccm_send_join_reply(hb, info); g_slist_free(CCM_GET_JOINERHEAD(info)); CCM_SET_JOINERHEAD(info, NULL); CCM_SET_CL(info, ccm_get_my_membership_index(info)); report_mbrs(info);/* call this before update_reset() */ update_reset(CCM_GET_UPDATETABLE(info)); ccm_memcomp_reset(info); CCM_SET_STATE(info, CCM_STATE_JOINED); if(!ccm_already_joined(info)) { CCM_SET_JOINED_TRANSITION(info, CCM_GET_MAJORTRANS(info)); } return;}/* *//* send a reply to the potential joiner, containing the neccessary *//* context needed by the joiner, to initiate a new round of a ccm *//* protocol. *//* NOTE: This function is called by the cluster leader only. *//* */static int ccm_send_joiner_reply(ll_cluster_t *hb, ccm_info_t *info, const char *joiner){ struct ha_msg *m; char activeproto[3]; char clsize[5]; char majortrans[15]; /* 10 is the maximum number of digits in UINT_MAX , adding a buffer of 5 */ char *cookie; int rc; /*send the membership information to all the nodes of the cluster*/ if ((m=ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "Cannot send CCM version msg"); return(HA_FAIL); } snprintf(activeproto, sizeof(activeproto), "%d", CCM_GET_ACTIVEPROTO(info)); snprintf(majortrans, sizeof(majortrans), "%d", CCM_GET_MAJORTRANS(info)); snprintf(clsize, sizeof(clsize), "%d", CCM_GET_MEMCOUNT(info)); cookie = CCM_GET_COOKIE(info); assert(cookie && *cookie); if ((ha_msg_add(m, F_TYPE, ccm_type2string(CCM_TYPE_PROTOVERSION_RESP)) == HA_FAIL) ||(ha_msg_add(m, CCM_PROTOCOL, activeproto) == HA_FAIL) ||(ha_msg_add(m, CCM_MAJORTRANS, majortrans) == HA_FAIL) ||(ha_msg_add(m, CCM_CLSIZE, clsize) == HA_FAIL) ||(ha_msg_add(m, CCM_COOKIE, cookie) == HA_FAIL)) { cl_log(LOG_ERR, "ccm_send_joiner_reply: Cannot create JOIN " "reply message"); rc = HA_FAIL; } else { rc = hb->llc_ops->sendnodemsg(hb, m, joiner); } ha_msg_del(m); return(rc);}/* *//* browse through the list of interested joiners and reply to each of *//* them. *//* */static void ccm_send_join_reply(ll_cluster_t *hb, ccm_info_t *info){ int joiner; gpointer jptr; const char *joiner_name; GSList *head = CCM_GET_JOINERHEAD(info); int repeat; while(head) { jptr = g_slist_nth_data(head, 0); joiner = GPOINTER_TO_INT(jptr)-1; joiner_name = LLM_GET_NODEID(CCM_GET_LLM(info), joiner); /* send joiner the neccessary information */ repeat = 0; while (ccm_send_joiner_reply(hb, info, joiner_name)!=HA_OK) { if(repeat < REPEAT_TIMES){ cl_log(LOG_ERR, "ccm_send_join_reply: failure " "to send join reply"); cl_shortsleep(); repeat++; }else{ break; } } head = g_slist_next(head); }}/* *//* send a final membership list to all the members who have participated *//* in the ccm protocol. *//* NOTE: Called by the cluster leader on. *//* */static intccm_send_final_memlist(ll_cluster_t *hb, ccm_info_t *info, char *newcookie, char *finallist, uint32_t max_tran){ struct ha_msg *m; char activeproto[3]; char majortrans[15]; /* 10 is the maximum number of digits in UINT_MAX , adding a buffer of 5 */ char minortrans[15]; /* 10 is the maximum number of digits in UINT_MAX , adding a buffer of 5 */ char maxtrans[15]; /* 10 is the maximum number of digits in UINT_MAX , adding a buffer of 5 */ char *cookie; int rc; /*send the membership information to all the nodes of the cluster*/ if ((m=ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "Cannot send CCM version msg"); return(HA_FAIL); } snprintf(activeproto, sizeof(activeproto), "%d", CCM_GET_ACTIVEPROTO(info)); snprintf(majortrans, sizeof(majortrans), "%d", CCM_GET_MAJORTRANS(info)); snprintf(minortrans, sizeof(minortrans), "%d", CCM_GET_MINORTRANS(info)); snprintf(maxtrans, sizeof(maxtrans), "%d", max_tran); cookie = CCM_GET_COOKIE(info); assert(cookie && *cookie); assert(finallist); if ((ha_msg_add(m, F_TYPE, ccm_type2string(CCM_TYPE_FINAL_MEMLIST)) == HA_FAIL) ||(ha_msg_add(m, CCM_MAJORTRANS, majortrans) == HA_FAIL) ||(ha_msg_add(m, CCM_MINORTRANS, minortrans) == HA_FAIL) ||(ha_msg_add(m, CCM_MAXTRANS, maxtrans) == HA_FAIL) ||(ha_msg_add(m, CCM_COOKIE, cookie) == HA_FAIL) ||(ha_msg_add(m, CCM_MEMLIST, finallist) == HA_FAIL) ||(!newcookie? FALSE: (ha_msg_add(m, CCM_NEWCOOKIE, newcookie) ==HA_FAIL))) { cl_log(LOG_ERR, "ccm_send_final_memlist: Cannot create " "FINAL_MEMLIST message"); rc = HA_FAIL; } else { rc = hb->llc_ops->sendclustermsg(hb, m); } ha_msg_del(m); return(rc);}/* *//* send out a message to the cluster asking for the context *//* NOTE: this context is used to intiate a new instance of *//* a CCM protocol. *//* */static intccm_send_protoversion(ll_cluster_t *hb, ccm_info_t *info){ struct ha_msg *m; char version[3]; /* in the life time of ccm, do not expect protocol versions running to 100! */ int rc; if ((m=ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "Cannot send CCM version msg"); return(HA_FAIL); } snprintf(version, sizeof(version), "%d", CCM_GET_HIPROTO(info)); if ((ha_msg_add(m, F_TYPE, ccm_type2string(CCM_TYPE_PROTOVERSION)) == HA_FAIL)) { cl_log(LOG_ERR, "ccm_send_join: Cannot create PROTOVERSION " "message"); rc = HA_FAIL; } else { rc = hb->llc_ops->sendclustermsg(hb, m); } ha_msg_del(m); return(rc);}/* *//* send out a abort message to whoever has initiated a new instance *//* of ccm protocol. *//* */static intccm_send_abort(ll_cluster_t *hb, ccm_info_t *info, const char *dest, const int major, const int minor){ struct ha_msg *m; char majortrans[15]; /* 10 is the maximum number of digits in UINT_MAX , adding a buffer of 5 */ char minortrans[15]; /* ditto */ char *cookie; int rc; if ((m=ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "Cannot send CCM version msg"); return(HA_FAIL); } snprintf(majortrans, sizeof(majortrans), "%d", major); snprintf(minortrans, sizeof(minortrans), "%d", minor); cookie = CCM_GET_COOKIE(info); if ((ha_msg_add(m, F_TYPE, ccm_type2string(CCM_TYPE_ABORT)) == HA_FAIL) ||(ha_msg_add(m, CCM_COOKIE, cookie) == HA_FAIL) ||(ha_msg_add(m, CCM_MAJORTRANS, majortrans) == HA_FAIL) ||(ha_msg_add(m, CCM_MINORTRANS, minortrans) == HA_FAIL)){ cl_log(LOG_ERR, "ccm_send_abort: Cannot create ABORT " "message"); rc = HA_FAIL; } else { rc = hb->llc_ops->sendnodemsg(hb, m, dest); } ha_msg_del(m); return(rc);}/* *//* send out a leave message to indicate to everybody that it is leaving *//* the cluster. *//* */static intccm_send_leave(ll_cluster_t *hb, ccm_info_t *info){ struct ha_msg *m; char majortrans[15]; /* 10 is the maximum number of digits in UINT_MAX , adding a buffer of 5 */ char minortrans[15]; /* ditto */ char *cookie; int rc; if ((m=ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "Cannot send CCM version msg"); return(HA_FAIL); } snprintf(majortrans, sizeof(majortrans), "%d", CCM_GET_MAJORTRANS(info)); snprintf(minortrans, sizeof(minortrans), "%d", CCM_GET_MINORTRANS(info)); cookie = CCM_GET_COOKIE(info); assert(cookie && *cookie); if ((ha_msg_add(m, F_TYPE, ccm_type2string(CCM_TYPE_LEAVE)) == HA_FAIL) ||(ha_msg_add(m, CCM_COOKIE, cookie) == HA_FAIL) ||(ha_msg_add(m, CCM_MAJORTRANS, majortrans) == HA_FAIL) ||(ha_msg_add(m, CCM_MINORTRANS, minortrans) == HA_FAIL)){ cl_log(LOG_ERR, "ccm_send_leave: Cannot create leave " "message"); rc = HA_FAIL; } else { rc = hb->llc_ops->sendclustermsg(hb, m); } ha_msg_del(m); return(rc);}/* *//* send out a join message. THis message will initiate a new instance of *//* the ccm protocol. *//* */static int
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -