📄 ccm.c
字号:
* This bug was noticed: when testing with partitioned * clusters. */ cookie = ccm_generate_random_cookie(); /* fill my new memlist and update the new cookie if any */ ccm_fill_memlist_from_bitmap(info, bitmap); bitmap_delete(bitmap); /* increment the major transition number and reset the * minor transition number */ CCM_INCREMENT_MAJORTRANS(info); CCM_RESET_MINORTRANS(info); /* if cookie has changed update it. */ if (cookie) { cl_log(LOG_INFO, "ccm_joining_to_joined: " "cookie changed "); CCM_SET_COOKIE(info, cookie); ccm_free_random_cookie(cookie); } /* check if any joiner is waiting for a response from us. * If so respond */ ccm_send_join_reply(hb, info); g_slist_free(CCM_GET_JOINERHEAD(info)); CCM_SET_JOINERHEAD(info, NULL); CCM_SET_CL(info, ccm_get_my_membership_index(info)); update_reset(CCM_GET_UPDATETABLE(info)); CCM_SET_STATE(info, CCM_STATE_JOINED); report_mbrs(info); if(!ccm_already_joined(info)) CCM_SET_JOINED_TRANSITION(info, 1); return;}/* *//* Move the state of this ccm node, from init state directly to *//* the joined state. *//* *//* NOTE: this is generally called when a node when it determines *//* that it is all alone in the cluster. *//* */static voidccm_init_to_joined(ccm_info_t *info){ int numBytes; unsigned char *bitlist; char *cookie; numBytes = bitmap_create(&bitlist, MAXNODE); bitmap_mark(LLM_GET_MYUUID(CCM_GET_LLM(info)), bitlist,MAXNODE); ccm_fill_memlist_from_bitmap(info, bitlist); bitmap_delete(bitlist); CCM_SET_MAJORTRANS(info, 1); CCM_SET_MINORTRANS(info, 0); cookie = ccm_generate_random_cookie(); CCM_SET_COOKIE(info, cookie); ccm_free_random_cookie(cookie); CCM_SET_CL(info, ccm_get_my_membership_index(info)); CCM_SET_STATE(info, CCM_STATE_JOINED); CCM_SET_JOINED_TRANSITION(info, 1); report_mbrs(info); return;}/* *//* The state machine that processes message when it is *//* the CCM_STATE_VERSION_REQUEST state *//* */static voidccm_state_version_request(enum ccm_type ccm_msg_type, struct ha_msg *reply, ll_cluster_t *hb, ccm_info_t *info){ const char *orig, *proto, *cookie, *trans, *clsize; uint trans_val; int proto_val, clsize_val; int try; int repeat; /* who sent this message */ if ((orig = ha_msg_value(reply, F_ORIG)) == NULL) { cl_log(LOG_WARNING, "ccm_state_version_request: " "received message from unknown"); return; } if(!llm_is_valid_node(CCM_GET_LLM(info), orig)) { cl_log(LOG_WARNING, "ccm_state_version_request: " "received message from unknown host %s", orig); return; } switch (ccm_msg_type) { case CCM_TYPE_PROTOVERSION_RESP: /* get the protocol version */ if ((proto = ha_msg_value(reply, CCM_PROTOCOL)) == NULL) { cl_log(LOG_WARNING, "ccm_state_version_request: " "no protocol information"); return; } proto_val = atoi(proto); /*TOBEDONE*/ if (proto_val >= CCM_VER_LAST) { cl_log(LOG_WARNING, "ccm_state_version_request: " "unknown protocol value"); ccm_reset(info); return; } /* if this reply has come from a node which is a member * of a larger cluster, we will try to join that cluster * else we will wait for some time, by dropping this * response. */ if(resp_can_i_drop()) { if ((clsize = ha_msg_value(reply, CCM_CLSIZE)) == NULL){ cl_log(LOG_WARNING, "ccm_state_version_request: " " no cookie information"); return; } clsize_val = atoi(clsize); if((clsize_val+1) <= ((llm_get_active_nodecount(CCM_GET_LLM(info))+1)/2)) { /* drop the response. We will wait for * a response from a bigger group */ resp_dropped(); cl_shortsleep(); /* sleep for a while */ /* send a fresh version request message */ version_reset(CCM_GET_VERSION(info)); CCM_SET_STATE(info, CCM_STATE_NONE); /* free all the joiners that we accumulated */ g_slist_free(CCM_GET_JOINERHEAD(info)); CCM_SET_JOINERHEAD(info, NULL); break; } } resp_reset(); /* get the cookie string */ if ((cookie = ha_msg_value(reply, CCM_COOKIE)) == NULL) { cl_log(LOG_WARNING, "ccm_state_version_request: no cookie " "information"); return; } /* get the major transition version */ if ((trans = ha_msg_value(reply, CCM_MAJORTRANS)) == NULL) { cl_log(LOG_WARNING, "ccm_state_version_request: " "no protocol information"); return; } trans_val = atoi(trans); /* send the alive message to the cluster The alive msg means: "I want to join this partition!"*/ CCM_SET_ACTIVEPROTO(info, proto_val); CCM_SET_MAJORTRANS(info, trans_val); CCM_SET_MINORTRANS(info, 0); CCM_SET_COOKIE(info, cookie); version_set_nresp(CCM_GET_VERSION(info),0); repeat = 0; while(ccm_send_alive_msg(hb, info) != HA_OK){ if(repeat < REPEAT_TIMES){ cl_log(LOG_WARNING, "ccm_state_version_request: failure to send alive"); cl_shortsleep(); repeat++; }else{ break; } } /* initialize the update table and set our state to NEW_NODE_WAIT_FOR_MEM_LIST */ update_reset(CCM_GET_UPDATETABLE(info)); new_node_mem_list_time_init(); CCM_SET_STATE(info, CCM_STATE_NEW_NODE_WAIT_FOR_MEM_LIST); /* free all the joiners that we accumulated */ g_slist_free(CCM_GET_JOINERHEAD(info)); CCM_SET_JOINERHEAD(info, NULL); break; case CCM_TYPE_TIMEOUT: try = version_retry(CCM_GET_VERSION(info), CCM_TMOUT_GET_VRS(info)); switch (try) { case VER_NO_CHANGE: break; case VER_TRY_AGAIN: CCM_SET_STATE(info, CCM_STATE_NONE); break; case VER_TRY_END: if(ccm_am_i_highest_joiner(info)) { if(global_debug) cl_log(LOG_DEBUG,"joined"); ccm_init_to_joined(info); } else { if(global_debug) cl_log(LOG_DEBUG, "joined but not really"); version_reset(CCM_GET_VERSION(info)); CCM_SET_STATE(info, CCM_STATE_NONE); } /* free all the joiners that we accumulated */ g_slist_free(CCM_GET_JOINERHEAD(info)); CCM_SET_JOINERHEAD(info, NULL); break; } break; case CCM_TYPE_PROTOVERSION: /* * cache this request. If we declare ourselves as * a single member group, and if we find that * somebody else also wanted to join the group. * we will restart the join. */ ccm_add_new_joiner(info, orig); break; case CCM_TYPE_JOIN: case CCM_TYPE_REQ_MEMLIST: case CCM_TYPE_RES_MEMLIST: case CCM_TYPE_FINAL_MEMLIST: case CCM_TYPE_ABORT: /* note down there is some activity going * on and we are not yet alone in the cluster */ version_some_activity(CCM_GET_VERSION(info)); case CCM_TYPE_LEAVE: case CCM_TYPE_ERROR: default: /* nothing to do. Just forget the message */ break; } return;}/* *//* The state machine that processes message when it is *//* CCM_STATE_JOINED state. *//* */static voidccm_state_joined(enum ccm_type ccm_msg_type, struct ha_msg *reply, ll_cluster_t *hb, ccm_info_t *info){ const char *orig, *trans, *uptime; uint trans_majorval=0, trans_minorval=0, uptime_val; int repeat; if ((orig = ha_msg_value(reply, F_ORIG)) == NULL) { cl_log(LOG_WARNING, "ccm_state_joined: received message " "from unknown"); return; } if(!llm_is_valid_node(CCM_GET_LLM(info), orig)) { cl_log(LOG_WARNING, "ccm_state_joined: received message " "from unknown host %s", orig); return; } if(ccm_msg_type != CCM_TYPE_PROTOVERSION) { if(strncmp(CCM_GET_COOKIE(info), ha_msg_value(reply, CCM_COOKIE), COOKIESIZE) != 0){ cl_log(LOG_WARNING, "ccm_state_joined: received message " "with unknown cookie, just dropping"); return; } /* get the major transition version */ if ((trans = ha_msg_value(reply, CCM_MAJORTRANS)) == NULL) { cl_log(LOG_WARNING, "ccm_state_joined: no transition major " "information"); return; } trans_majorval = atoi(trans); /*drop the message if it has lower major transition number */ if (CCM_TRANS_EARLIER(trans_majorval, CCM_GET_MAJORTRANS(info))) { cl_log(LOG_WARNING, "ccm_state_joined: received " "%s message with " "a earlier major transition number " "recv_trans=%d, mytrans=%d", ccm_type2string(ccm_msg_type), trans_majorval, CCM_GET_MAJORTRANS(info)); return; } /* get the minor transition version */ if ((trans = ha_msg_value(reply, CCM_MINORTRANS)) == NULL) { cl_log(LOG_WARNING, "ccm_state_joined: no transition minor " "information"); return; } trans_minorval = atoi(trans); } switch (ccm_msg_type) { int index; case CCM_TYPE_PROTOVERSION_RESP: cl_log(LOG_WARNING, "ccm_state_joined: dropping message " "of type %s. Is this a Byzantime failure?", ccm_type2string(ccm_msg_type)); break; case CCM_TYPE_PROTOVERSION: /* If we were leader in the last successful itteration, * then we shall respond with the neccessary information */ if (ccm_am_i_leader(info)){ repeat = 0; while (ccm_send_joiner_reply(hb, info, orig) != HA_OK) { if(repeat < REPEAT_TIMES){ cl_log(LOG_WARNING, "ccm_state_joined: " "failure to send join reply"); cl_shortsleep(); repeat++; }else{ break; } } } break; case CCM_TYPE_JOIN: /* get the update value */ if ((uptime = ha_msg_value(reply, CCM_UPTIME)) == NULL){ cl_log(LOG_WARNING, "ccm_state_joined: no update " "information"); return; } uptime_val = atoi(uptime); /* update the minor transition number if it is of * higher value and send a fresh JOIN message */ assert (trans_minorval >= CCM_GET_MINORTRANS(info)); update_reset(CCM_GET_UPDATETABLE(info)); update_add(CCM_GET_UPDATETABLE(info), CCM_GET_LLM(info), orig, uptime_val, TRUE); CCM_SET_MINORTRANS(info, trans_minorval); repeat = 0; while (ccm_send_join(hb, info) != HA_OK) { if(repeat < REPEAT_TIMES){ cl_log(LOG_WARNING, "ccm_state_joined: failure " "to send join"); cl_shortsleep(); repeat++; }else{ break; } } CCM_SET_STATE(info, CCM_STATE_JOINING); break; case CCM_TYPE_LEAVE: index = ccm_get_membership_index(info, orig); if (index == -1) break; /* If the dead node is the partition leader, go to * JOINING state */ if (index == CCM_GET_CL(info)){ update_reset(CCM_GET_UPDATETABLE(info)); repeat = 0; while (ccm_send_join(hb, info) != HA_OK) { if(repeat < REPEAT_TIMES){ cl_log(LOG_WARNING, "ccm_state_joined:" " failure to send join"); cl_shortsleep(); repeat++; }else{ break; } } CCM_SET_STATE(info, CCM_STATE_JOINING); return; } /* If I'm the leader, record this "I received the * LEAVE message" and transit to WAIT_FOR_CHANGE */ if(ccm_am_i_leader(info)){ reset_change_info(info); update_reset(CCM_GET_UPDATETABLE(info)); add_change_msg(info, orig, CCM_GET_MYNODE_ID(info), NODE_LEAVE); update_add(CCM_GET_UPDATETABLE(info), CCM_GET_LLM(info), CCM_GET_MYNODE_ID(info), CCM_GET_JOINED_TRANSITION(info), FALSE); if(received_all_change_msg(info)){ char *newcookie = ccm_generate_random_cookie(); update_membership(info, orig, NODE_LEAVE); CCM_SET_MAJORTRANS(info, trans_majorval+1); CCM_RESET_MINORTRANS(info); CCM_SET_COOKIE(info, newcookie); ccm_free_random_cookie(newcookie); report_mbrs(info); return; } change_time_init(); CCM_SET_STATE(info, CCM_STATE_WAIT_FOR_CHANGE); }else { /* I'm not leader, send CCM_TYPE_NODE_LEAVE to leader */ send_node_leave_to_leader(hb, info, orig); mem_list_time_init(); CCM_SET_STATE(info,CCM_STATE_WAIT_FOR_MEM_LIST); } break; case CCM_TYPE_NODE_LEAVE: if ((uptime = ha_msg_value(reply, CCM_UPTIME)) == NULL){ cl_log(LOG_WARNING, "ccm_state_joined: no update " "information"); return; } uptime_val = atoi(uptime); /* If I'm leader, record received LEAVE message by orig * and transition to WAIT_FOR_CHANGE state */ if(ccm_am_i_leader(info)){ const char *node = ha_msg_value(reply, F_NODE); reset_change_info(info); update_reset(CCM_GET_UPDATETABLE(info)); add_change_msg(info,node,orig,NODE_LEAVE); update_add(CCM_GET_UPDATETABLE(info), CCM_GET_LLM(info), orig, uptime_val, FALSE); change_time_init(); CCM_SET_STATE(info, CCM_STATE_WAIT_FOR_CHANGE); } break; case CCM_TYPE_ALIVE: /* If I'm leader, record I received the ALIVE message and * transit to WAIT_FOR_CHANGE */ if (ccm_am_i_leader(info)){ reset_change_info(info); update_reset(CCM_GET_UPDATETABLE(info)); add_change_msg(info,orig, CCM_GET_MYNODE_ID(inf
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -