📄 callbacks.c
字号:
crm_trace("Processing call id: %s", call_id); rc = cib_get_operation_id(request, &call_type); if(rc == cib_ok && cib_server_ops[call_type].needs_privileges && privileged == FALSE) { /* abort */ rc = cib_not_authorized; } if(rc == cib_ok && cib_server_ops[call_type].needs_section) { section = cl_get_string(request, F_CIB_SECTION); crm_trace("Unpacked section as: %s", section); } if(rc == cib_ok && cib_server_ops[call_type].needs_data) { crm_trace("Unpacking data in %s", F_CIB_CALLDATA); input = get_message_xml(request, F_CIB_CALLDATA); } if(rc == cib_ok) { rc = cib_server_ops[call_type].fn( op, call_options, section, input, &output); } crm_devel("Processing reply cases"); if((call_options & cib_discard_reply) || reply == NULL) { return rc; } crm_devel("Creating the reply"); /* make the basic reply */ *reply = ha_msg_new(8); ha_msg_add(*reply, F_TYPE, T_CIB); ha_msg_add(*reply, F_CIB_OPERATION, op); ha_msg_add(*reply, F_CIB_CALLID, call_id); ha_msg_add_int(*reply, F_CIB_RC, rc); tmp = cl_get_string(request, F_CIB_CLIENTID); ha_msg_add(*reply, F_CIB_CLIENTID, tmp); tmp = cl_get_string(request, F_CIB_CALLOPTS); ha_msg_add(*reply, F_CIB_CALLOPTS, tmp); /* attach the output if necessary */ if(output != NULL) { add_message_xml(*reply, F_CIB_CALLDATA, output); } crm_devel("Cleaning up"); free_xml(output); free_xml(input); return rc;}intsend_via_callback_channel(HA_Message *msg, const char *token) { cib_client_t *hash_client = NULL; GList *list_item = NULL; crm_devel("Delivering msg %p to client %s", msg, token); if(msg == NULL) { crm_err("No message to send"); return cib_reply_failed; } else if(token == NULL) { crm_err("No client id token, cant send message"); return cib_missing; } hash_client = g_hash_table_lookup(client_list, token); if(hash_client == NULL) { crm_err("Cannot find client for token %s", token); return cib_client_gone; } else if(hash_client->channel == NULL) { crm_err("Cannot find channel for client %s", token); return cib_client_corrupt; } list_item = g_list_find_custom( hash_client->delegated_calls, msg, cib_GCompareFunc); if(list_item != NULL) { /* remove it - no need to time it out */ HA_Message *orig_msg = list_item->data; crm_devel("Removing msg from delegated list"); hash_client->delegated_calls = g_list_remove( hash_client->delegated_calls, orig_msg); CRM_DEV_ASSERT(orig_msg != msg); crm_msg_del(orig_msg); } crm_devel("Delivering reply to client %s", token); cl_log_message(LOG_DEV, msg); if(msg2ipcchan(msg, hash_client->channel) != HA_OK) { crm_err("Delivery of reply to client %s failed", token); return cib_reply_failed; } return cib_ok;}gint cib_GCompareFunc(gconstpointer a, gconstpointer b){ const HA_Message *a_msg = a; const HA_Message *b_msg = b; int msg_a_id = 0; int msg_b_id = 0; ha_msg_value_int(a_msg, F_CIB_CALLID, &msg_a_id); ha_msg_value_int(b_msg, F_CIB_CALLID, &msg_b_id); if(msg_a_id == msg_b_id) { return 0; } else if(msg_a_id < msg_b_id) { return -1; } return 1;}gbooleancib_msg_timeout(gpointer data){ crm_trace("Checking if any clients have timed out messages"); g_hash_table_foreach(client_list, cib_GHFunc, NULL); return TRUE;}voidcib_GHFunc(gpointer key, gpointer value, gpointer user_data){ cib_client_t *client = value; GListPtr list = client->delegated_calls; HA_Message *msg = NULL; while(list != NULL) { int seen = 0; int timeout = 5; /* 1 iteration == 1 seconds */ HA_Message *reply = NULL; msg = list->data; ha_msg_value_int(msg, F_CIB_SEENCOUNT, &seen); ha_msg_value_int(msg, F_CIB_TIMEOUT, &timeout); crm_trace("Timeout %d, seen %d", timeout, seen); if(timeout > 0 && seen < timeout) { int seen2 = 0; crm_trace("Updating seen count for msg from client %s", client->id); seen++; ha_msg_mod_int(msg, F_CIB_SEENCOUNT, seen); ha_msg_value_int(msg, F_CIB_SEENCOUNT, &seen2); list = list->next; continue; } crm_warn("Sending operation timeout msg to client %s", client->id); reply = ha_msg_new(4); ha_msg_add(reply, F_TYPE, T_CIB); ha_msg_add(reply, F_CIB_OPERATION, cl_get_string(msg, F_CIB_OPERATION)); ha_msg_add(reply, F_CIB_CALLID, cl_get_string(msg, F_CIB_CALLID)); ha_msg_add_int(reply, F_CIB_RC, cib_master_timeout); msg2ipcchan(reply, client->channel); list = list->next; client->delegated_calls = g_list_remove( client->delegated_calls, msg); crm_msg_del(reply); crm_msg_del(msg); }}gbooleancib_process_disconnect(IPC_Channel *channel, cib_client_t *cib_client){ if (channel->ch_status == IPC_DISCONNECT && cib_client != NULL) { crm_info("Cleaning up after %s channel disconnect from client (%p) %s", cib_client->channel_name, cib_client, crm_str(cib_client->id)); if(cib_client->id != NULL) { g_hash_table_remove(client_list, cib_client->id); } if(cib_client->source != NULL) { crm_devel("deleting the IPC Channel"); G_main_del_IPC_Channel(cib_client->source); cib_client->source = NULL; } crm_devel("Freeing the cib client %s", crm_str(cib_client->id));#if 0 /* todo - put this back in once i recheck its safe */ crm_free(cib_client->callback_id); crm_free(cib_client->id);#endif crm_free(cib_client); crm_devel("Freed the cib client"); return FALSE; } else if (channel->ch_status == IPC_DISCONNECT) { crm_warn("Unknown client disconnected"); return FALSE; } return TRUE;}gbooleancib_ha_dispatch(IPC_Channel *channel, gpointer user_data){ int lpc = 0; ll_cluster_t *hb_cluster = (ll_cluster_t*)user_data; while(hb_cluster->llc_ops->msgready(hb_cluster)) { lpc++; /* invoke the callbacks but dont block */ hb_cluster->llc_ops->rcvmsg(hb_cluster, 0); } crm_trace("%d HA messages dispatched", lpc); if (channel && (channel->ch_status == IPC_DISCONNECT)) { crm_crit("Lost connection to heartbeat service... exiting"); exit(100); return FALSE; } return TRUE;}voidcib_peer_callback(const HA_Message * msg, void* private_data){ int is_done = 1; int call_type = 0; int call_options = 0; gboolean process = TRUE; gboolean needs_reply = TRUE; gboolean local_notify = FALSE; enum cib_errors rc = cib_ok; HA_Message *op_reply = NULL; const char *originator = cl_get_string(msg, F_ORIG); const char *request_to = cl_get_string(msg, F_CIB_HOST); const char *reply_to = cl_get_string(msg, F_CIB_ISREPLY); const char *update = cl_get_string(msg, F_CIB_GLOBAL_UPDATE); const char *delegated = cl_get_string(msg, F_CIB_DELEGATED); const char *client_id = NULL; if(safe_str_eq(originator, cib_our_uname)) { crm_devel("Discarding message %s from ourselves", cl_get_string(msg, F_SEQ)); return; } if(cib_get_operation_id(msg, &call_type) != cib_ok) { crm_err("Invalid operation... discarding msg %s", cl_get_string(msg, F_SEQ)); return; } crm_trace("%s Processing msg %s", cib_our_uname, cl_get_string(msg, F_SEQ)); if(request_to != NULL && strlen(request_to) == 0) { request_to = NULL; } if(cib_server_ops[call_type].modifies_cib || (reply_to == NULL && cib_is_master) || request_to != NULL) { is_done = 0; } crm_info("Processing message from peer to %s...", request_to); crm_log_message(LOG_DEV, msg); if(safe_str_eq(update, XML_BOOLEAN_TRUE) && safe_str_eq(reply_to, cib_our_uname)) { crm_devel("Processing global update that originated from us"); needs_reply = FALSE; local_notify = TRUE; } else if(safe_str_eq(update, XML_BOOLEAN_TRUE)) { crm_devel("Processing global update"); needs_reply = FALSE; } else if(request_to != NULL && safe_str_eq(request_to, cib_our_uname)) { crm_devel("Processing request sent to us"); } else if(delegated != NULL && cib_is_master == TRUE) { crm_devel("Processing request sent to master instance"); } else if(reply_to != NULL && safe_str_eq(reply_to, cib_our_uname)) { crm_devel("Forward reply sent from %s to local clients", originator); process = FALSE; needs_reply = FALSE; local_notify = TRUE; } else if(delegated != NULL) { crm_devel("Ignoring msg for master instance"); return; } else if(request_to != NULL) { /* this is for a specific instance and we're not it */ crm_devel("Ignoring msg for instance on %s", crm_str(request_to)); return; } else if(reply_to == NULL && cib_is_master == FALSE) { /* this is for the master instance and we're not it */ crm_devel("Ignoring reply to %s", crm_str(reply_to)); return; } else { crm_warn("Nothing for us to do?"); return; } crm_devel("Finished determining processing actions"); ha_msg_value_int(msg, F_CIB_CALLOPTS, &call_options); crm_trace("Retrieved call options: %d", call_options); if(process) { crm_devel("Performing local processing: op=%s origin=%s/%s,%s (update=%s)", cl_get_string(msg, F_CIB_OPERATION), originator, cl_get_string(msg, F_CIB_CLIENTID), cl_get_string(msg, F_CIB_CALLID), update); rc = cib_process_command(msg, &op_reply, TRUE); } if(local_notify) { /* send callback to originating child */ cib_client_t *client_obj = NULL; HA_Message *client_reply = NULL; crm_trace("find the client"); if(process == FALSE) { client_reply = ha_msg_copy(msg); } else { client_reply = ha_msg_copy(op_reply); } client_id = cl_get_string(msg, F_CIB_CLIENTID); if(client_id != NULL) { client_obj = g_hash_table_lookup( client_list, client_id); } else { crm_err("No client to sent the response to." " F_CIB_CLIENTID not set."); } crm_devel("Sending callback to originator of delegated request"); if(client_obj != NULL) { if(is_done == 0) { crm_devel("Sending local modify response"); } else { crm_devel("Sending master response"); } if(call_options & cib_sync_call) { crm_devel("Sending sync response: %d", call_options); send_via_callback_channel( client_reply, client_obj->id); } else { crm_devel("Sending async response"); send_via_callback_channel( client_reply, client_obj->callback_id); } } else { crm_warn("Client %s may have left us", crm_str(client_id)); } crm_msg_del(client_reply); } if(needs_reply == FALSE) { /* nothing more to do... * this was a non-originating slave update */ crm_devel("Completed slave update"); crm_msg_del(op_reply); return; } crm_trace("add the originator to message"); /* from now on we are the server */ if(rc == cib_ok && cib_server_ops[call_type].modifies_cib && !(call_options & cib_scope_local)) { /* this (successful) call modified the CIB _and_ the * change needs to be broadcast... * send via HA to other nodes */ HA_Message *op_bcast = ha_msg_copy(msg); crm_devel("Sending update request to everyone"); ha_msg_add(op_bcast, F_CIB_ISREPLY, originator); ha_msg_add(op_bcast, F_CIB_GLOBAL_UPDATE, XML_BOOLEAN_TRUE); crm_log_message(LOG_DEV, op_bcast); hb_conn->llc_ops->sendclustermsg(hb_conn, op_bcast); crm_msg_del(op_bcast); } else { /* send reply via HA to originating node */ crm_devel("Sending request result to originator only"); ha_msg_add(op_reply, F_CIB_ISREPLY, originator); crm_log_message(LOG_DEV, op_reply); hb_conn->llc_ops->send_ordered_nodemsg( hb_conn, op_reply, originator); } crm_msg_del(op_reply); return;}enum cib_errorscib_get_operation_id(const HA_Message * msg, int *operation) { int lpc = 0; int max_msg_types = DIMOF(cib_server_ops); const char *op = cl_get_string(msg, F_CIB_OPERATION); for (lpc = 0; lpc < max_msg_types; lpc++) { if (safe_str_eq(op, cib_server_ops[lpc].operation)) { *operation = lpc; return cib_ok; } } crm_err("Operation %s is not valid", op); *operation = -1; return cib_operation;}voidcib_client_status_callback(const char * node, const char * client, const char * status, void * private){ crm_notice("Status update: Client %s/%s now has status [%s]\n", node, client, status); g_hash_table_replace(peer_hash, crm_strdup(node), crm_strdup(status)); return;}gboolean cib_ccm_dispatch(int fd, gpointer user_data){ int rc = 0; oc_ev_t *ccm_token = (oc_ev_t*)user_data; crm_devel("received callback"); rc = oc_ev_handle_event(ccm_token); if(0 == rc) { return TRUE; } else { crm_err("CCM connection appears to have failed: rc=%d.", rc); return FALSE; }}void cib_ccm_msg_callback( oc_ed_t event, void *cookie, size_t size, const void *data){ crm_devel("received callback"); crm_info("event=%s", event==OC_EV_MS_NEW_MEMBERSHIP?"NEW MEMBERSHIP": event==OC_EV_MS_NOT_PRIMARY?"NOT PRIMARY": event==OC_EV_MS_PRIMARY_RESTORED?"PRIMARY RESTORED": event==OC_EV_MS_EVICTED?"EVICTED": "NO QUORUM MEMBERSHIP"); if(event==OC_EV_MS_NEW_MEMBERSHIP || event==OC_EV_MS_NOT_PRIMARY || event==OC_EV_MS_PRIMARY_RESTORED) { cib_have_quorum = TRUE; } else { cib_have_quorum = FALSE; } oc_ev_callback_done(cookie); return;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -