📄 messages.c
字号:
/* * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net> * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#include <sys/param.h>#include <crm/crm.h>#include <string.h>#include <crmd_fsa.h>#include <hb_api.h>#include <lrm/lrm_api.h>#include <crm/msg_xml.h>#include <crm/common/xml.h>#include <crm/common/msg.h>#include <crm/cib.h>#include <crmd.h>#include <crmd_messages.h>#include <crm/dmalloc_wrapper.h>FILE *msg_out_strm = NULL;FILE *router_strm = NULL;GListPtr fsa_message_queue = NULL;extern void crm_shutdown(int nsig);gboolean send_ha_message(ll_cluster_t *hb_fd, crm_data_t *root);enum crmd_fsa_input handle_request(ha_msg_input_t *stored_msg);enum crmd_fsa_input handle_response(ha_msg_input_t *stored_msg);enum crmd_fsa_input handle_shutdown_request(HA_Message *stored_msg);ha_msg_input_t *copy_ha_msg_input(ha_msg_input_t *orig);#ifdef MSG_LOG# define ROUTER_RESULT(x) \ do_crm_log(LOG_DEV, __FUNCTION__, DEVEL_DIR"/router.log", x); \ crm_log_message_adv(LOG_MSG, DEVEL_DIR"/router.log", relay_message); \ crm_devel(x); \ crm_log_message(LOG_MSG, relay_message);#else# define ROUTER_RESULT(x) crm_devel("Router result: %s", x); \ crm_log_message(LOG_MSG, relay_message); #endif/* debug only, can wrap all it likes */int last_data_id = 0;voidregister_fsa_error_adv( enum crmd_fsa_cause cause, enum crmd_fsa_input input, fsa_data_t *cur_data, void *new_data, const char *raised_from){ /* save the current actions */ register_fsa_input_adv(cur_data?cur_data->fsa_cause:C_FSA_INTERNAL, I_NULL, cur_data?cur_data->data:NULL, fsa_actions, FALSE, __FUNCTION__); /* reset the action list */ fsa_actions = A_NOTHING; /* register the error */ register_fsa_input_adv( cause, input, new_data, A_NOTHING, FALSE, raised_from);}voidregister_fsa_input_adv( enum crmd_fsa_cause cause, enum crmd_fsa_input input, void *data, long long with_actions, gboolean after, const char *raised_from){ int old_len = g_list_length(fsa_message_queue); fsa_data_t *fsa_data = NULL; crm_debug("%s raised FSA input %s (cause=%s) %s data", raised_from,fsa_input2string(input), fsa_cause2string(cause), data?"with":"without"); if(input == I_WAIT_FOR_EVENT) { do_fsa_stall = TRUE; set_bit_inplace(fsa_actions, with_actions); with_actions = A_NOTHING; crm_debug("Stalling the FSA pending further input"); } if(input == I_NULL && with_actions == A_NOTHING /* && data == NULL */){ /* no point doing anything */ return; } crm_malloc(fsa_data, sizeof(fsa_data_t)); fsa_data->id = ++last_data_id; fsa_data->fsa_input = input; fsa_data->fsa_cause = cause; fsa_data->origin = raised_from; fsa_data->data = NULL; fsa_data->data_type = fsa_dt_none; fsa_data->actions = with_actions; if(with_actions != A_NOTHING) { crm_devel("Adding actions %.16llx to input", with_actions); } if(data != NULL) { switch(cause) { case C_FSA_INTERNAL: case C_CRMD_STATUS_CALLBACK: case C_IPC_MESSAGE: case C_HA_MESSAGE: crm_devel("Copying %s data from %s as a HA msg", fsa_cause2string(cause), raised_from); fsa_data->data = copy_ha_msg_input(data); fsa_data->data_type = fsa_dt_ha_msg; break; case C_LRM_OP_CALLBACK: crm_devel("Copying %s data from %s as lrm_op_t", fsa_cause2string(cause), raised_from); fsa_data->data = copy_lrm_op((lrm_op_t*)data); fsa_data->data_type = fsa_dt_lrm; break; case C_CCM_CALLBACK: crm_devel("Copying %s data from %s as CCM data", fsa_cause2string(cause), raised_from); fsa_data->data = copy_ccm_data(data); fsa_data->data_type = fsa_dt_ccm; break; case C_SUBSYSTEM_CONNECT: case C_LRM_MONITOR_CALLBACK: case C_TIMER_POPPED: case C_SHUTDOWN: case C_HEARTBEAT_FAILED: case C_HA_DISCONNECT: case C_ILLEGAL: case C_UNKNOWN: case C_STARTUP: crm_err("Copying %s data (from %s)" " not yet implemented", fsa_cause2string(cause), raised_from); exit(1); break; } crm_trace("%s data copied", fsa_cause2string(fsa_data->fsa_cause)); } /* make sure to free it properly later */ if(after) { crm_trace("Appending input"); fsa_message_queue = g_list_append(fsa_message_queue, fsa_data); } else { crm_trace("Prepending input"); fsa_message_queue = g_list_prepend(fsa_message_queue, fsa_data); } crm_verbose("Queue len: %d -> %d", old_len, g_list_length(fsa_message_queue)); fsa_dump_queue(LOG_VERBOSE); if(old_len == g_list_length(fsa_message_queue)){ crm_err("Couldnt add message to the queue"); } }voidfsa_dump_queue(int log_level) { if(log_level < crm_log_level) { return; } slist_iter( data, fsa_data_t, fsa_message_queue, lpc, do_crm_log(log_level, __FUNCTION__, NULL, "queue[%d(%d)]: input %s raised by %s()\t(cause=%s)", lpc, data->id, fsa_input2string(data->fsa_input), data->origin, fsa_cause2string(data->fsa_cause)); ); }ha_msg_input_t *copy_ha_msg_input(ha_msg_input_t *orig) { ha_msg_input_t *input_copy = NULL; crm_malloc(input_copy, sizeof(ha_msg_input_t)); if(orig != NULL) { crm_trace("Copy msg"); input_copy->msg = ha_msg_copy(orig->msg); if(orig->xml != NULL) { crm_trace("Copy xml"); input_copy->xml = copy_xml_node_recursive(orig->xml); } } else { crm_devel("No message to copy"); } return input_copy;}voiddelete_fsa_input(fsa_data_t *fsa_data) { lrm_op_t *op = NULL; crm_data_t *foo = NULL; struct crmd_ccm_data_s *ccm_input = NULL; if(fsa_data == NULL) { return; } crm_trace("About to free %s data", fsa_cause2string(fsa_data->fsa_cause)); if(fsa_data->data != NULL) { switch(fsa_data->data_type) { case fsa_dt_ha_msg: delete_ha_msg_input(fsa_data->data); break; case fsa_dt_xml: foo = fsa_data->data; free_xml(foo); break; case fsa_dt_lrm: op = (lrm_op_t*)fsa_data->data; crm_free(op->rsc->id); crm_free(op->rsc->type); crm_free(op->rsc->class); crm_free(op->rsc->provider); crm_free(op->rsc); crm_free(op->user_data); crm_free(op->output); crm_free(op->rsc_id); crm_free(op->app_name); crm_free(op); break; case fsa_dt_ccm: ccm_input = (struct crmd_ccm_data_s *) fsa_data->data; crm_free(ccm_input->oc); crm_free(ccm_input); break; case fsa_dt_none: if(fsa_data->data != NULL) { crm_err("Dont know how to free %s data from %s", fsa_cause2string(fsa_data->fsa_cause), fsa_data->origin); exit(1); } break; } crm_trace("%s data freed", fsa_cause2string(fsa_data->fsa_cause)); } crm_free(fsa_data);}/* returns the next message */fsa_data_t *get_message(void){ fsa_data_t* message = g_list_nth_data(fsa_message_queue, 0); fsa_message_queue = g_list_remove(fsa_message_queue, message); return message;}/* returns the current head of the FIFO queue */gbooleanis_message(void){ return (g_list_length(fsa_message_queue) > 0);}void *fsa_typed_data_adv( fsa_data_t *fsa_data, enum fsa_data_type a_type, const char *caller){ void *ret_val = NULL; if(fsa_data == NULL) { do_crm_log(LOG_ERR, caller, NULL, "No FSA data available"); } else if(fsa_data->data == NULL) { do_crm_log(LOG_ERR, caller, NULL, "No message data available"); } else if(fsa_data->data_type != a_type) { do_crm_log(LOG_CRIT, caller, NULL, "Message data was the wrong type! %d vs. requested=%d." " Origin: %s", fsa_data->data_type, a_type, fsa_data->origin); CRM_ASSERT(fsa_data->data_type == a_type); } else { ret_val = fsa_data->data; } return ret_val;}/* A_MSG_ROUTE */enum crmd_fsa_inputdo_msg_route(long long action, enum crmd_fsa_cause cause, enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input, fsa_data_t *msg_data){ enum crmd_fsa_input result = I_NULL; ha_msg_input_t *input = fsa_typed_data(fsa_dt_ha_msg); gboolean routed = FALSE; if(msg_data->fsa_cause != C_IPC_MESSAGE && msg_data->fsa_cause != C_HA_MESSAGE) { /* dont try and route these */ crm_warn("Can only process HA and IPC messages"); return I_NULL; } /* try passing the buck first */ crm_trace("Attempting to route message"); routed = relay_message(input->msg, cause==C_IPC_MESSAGE); if(routed == FALSE) { crm_trace("Message wasn't routed... try handling locally"); /* calculate defer */ result = handle_message(input); switch(result) { case I_NULL: break; case I_DC_HEARTBEAT: break; case I_CIB_OP: break; /* what else should go here? */ default: crm_trace("Defering local processing of message"); register_fsa_input_later( cause, result, msg_data->data); result = I_NULL; break; } if(result == I_NULL) { crm_trace("Message processed"); } else { register_fsa_input(cause, result, msg_data->data); } } else { crm_trace("Message routed..."); input->msg = NULL; } return I_NULL;}/* * This method frees msg */gbooleansend_request(HA_Message *msg, char **msg_reference){ gboolean was_sent = FALSE;/* crm_xml_devel(request, "Final request..."); */ if(msg_reference != NULL) { *msg_reference = crm_strdup( cl_get_string(msg, XML_ATTR_REFERENCE)); } was_sent = relay_message(msg, TRUE); if(was_sent == FALSE) { ha_msg_input_t *fsa_input = new_ha_msg_input(msg); register_fsa_input(C_IPC_MESSAGE, I_ROUTER, fsa_input); delete_ha_msg_input(fsa_input); crm_msg_del(msg); } return was_sent;}/* unless more processing is required, relay_message is freed */gbooleanrelay_message(HA_Message *relay_message, gboolean originated_locally){ int is_for_dc = 0; int is_for_dcib = 0; int is_for_crm = 0; int is_for_cib = 0; int is_local = 0; gboolean processing_complete = FALSE; const char *host_to = cl_get_string(relay_message, F_CRM_HOST_TO); const char *sys_to = cl_get_string(relay_message, F_CRM_SYS_TO); const char *type = cl_get_string(relay_message, F_TYPE); const char *msg_error = NULL; crm_devel("Routing message %s", cl_get_string(relay_message, XML_ATTR_REFERENCE)); if(relay_message == NULL) { msg_error = "Cannot route empty message"; } else if(safe_str_eq(CRM_OP_HELLO, cl_get_string(relay_message, F_CRM_TASK))){ /* quietly ignore */ processing_complete = TRUE; } else if(safe_str_neq(type, T_CRM)) { msg_error = "Bad message type"; } else if(sys_to == NULL) { msg_error = "Bad message destination: no subsystem"; } if(msg_error != NULL) { processing_complete = TRUE; crm_err("%s", msg_error); crm_log_message(LOG_ERR, relay_message); } if(processing_complete) { crm_msg_del(relay_message); return TRUE; } processing_complete = TRUE; is_for_dc = (strcmp(CRM_SYSTEM_DC, sys_to) == 0); is_for_dcib = (strcmp(CRM_SYSTEM_DCIB, sys_to) == 0); is_for_cib = (strcmp(CRM_SYSTEM_CIB, sys_to) == 0); is_for_crm = (strcmp(CRM_SYSTEM_CRMD, sys_to) == 0); is_local = 0; if(host_to == NULL || strlen(host_to) == 0) { if(is_for_dc) { is_local = 0; } else if(is_for_crm && originated_locally) { is_local = 0; } else { is_local = 1; } } else if(strcmp(fsa_our_uname, host_to) == 0) { is_local=1; } if(is_for_dc || is_for_dcib) { if(AM_I_DC) { ROUTER_RESULT("Message result: DC/CRMd process"); processing_complete = FALSE; /* more to be done by caller */ } else if(originated_locally) { ROUTER_RESULT("Message result: External relay to DC"); send_msg_via_ha(fsa_cluster_conn, relay_message); } else { /* discard */ ROUTER_RESULT("Message result: Discard, not DC"); crm_msg_del(relay_message); } } else if(is_local && (is_for_crm || is_for_cib)) { ROUTER_RESULT("Message result: CRMd process"); processing_complete = FALSE; /* more to be done by caller */ } else if(is_local) { ROUTER_RESULT("Message result: Local relay"); send_msg_via_ipc(relay_message, sys_to); } else { ROUTER_RESULT("Message result: External relay"); send_msg_via_ha(fsa_cluster_conn, relay_message); } return processing_complete;}gbooleancrmd_authorize_message(ha_msg_input_t *client_msg, crmd_client_t *curr_client){ /* check the best case first */ const char *sys_from = cl_get_string(client_msg->msg, F_CRM_SYS_FROM); char *uuid = NULL; char *client_name = NULL; char *major_version = NULL; char *minor_version = NULL; const char *filtered_from; gpointer table_key = NULL; gboolean auth_result = FALSE; struct crm_subsystem_s *the_subsystem = NULL; gboolean can_reply = FALSE; /* no-one has registered with this id */ const char *op = cl_get_string(client_msg->msg, F_CRM_TASK); if (safe_str_neq(CRM_OP_HELLO, op)) { if(sys_from == NULL) { crm_warn("Message [%s] was had no value for %s... discarding", cl_get_string(client_msg->msg, XML_ATTR_REFERENCE), F_CRM_SYS_FROM); return FALSE; } filtered_from = sys_from; /* The CIB can have two names on the DC */ if(strcmp(sys_from, CRM_SYSTEM_DCIB) == 0) filtered_from = CRM_SYSTEM_CIB; if (g_hash_table_lookup (ipc_clients, filtered_from) != NULL) { can_reply = TRUE; /* reply can be routed */ } crm_verbose("Message reply can%s be routed from %s.", can_reply?"":" not", sys_from); if(can_reply == FALSE) { crm_warn("Message [%s] not authorized", cl_get_string(client_msg->msg, XML_ATTR_REFERENCE)); } #if 0 if(ha_msg_value(msg, XML_ATTR_REFERENCE) == NULL) { ha_msg_add(new_input->msg, XML_ATTR_REFERENCE, seq); }#endif register_fsa_input(C_IPC_MESSAGE, I_ROUTER, client_msg);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -