📄 messages.c
字号:
/*
 * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
#include <crm/crm.h>
#include <string.h>
#include <crmd_fsa.h>
#include <libxml/tree.h>

#include <crm/msg_xml.h>
#include <crm/common/xmlutils.h>
#include <crm/common/msgutils.h>
#include <crm/cib.h>

#include <crmd.h>
#include <crmd_messages.h>

#include <crm/dmalloc_wrapper.h>

/* Debug log streams, only opened when MSG_LOG is defined */
FILE *msg_in_strm = NULL;
FILE *router_strm = NULL;

/* Head of the singly-linked FIFO of messages whose processing was deferred */
fsa_message_queue_t fsa_message_queue = NULL;

gboolean relay_message(
	xmlNodePtr xml_relay_message, gboolean originated_locally);

/*
 * ROUTER_RESULT(x): record the routing decision `x` for the message held in
 * the local variable `xml_relay_message` of the expanding function.
 * With MSG_LOG it declares a local `msg_text`, so it can be expanded at most
 * once per scope.
 */
#ifdef MSG_LOG

#    define ROUTER_RESULT(x)	char *msg_text = dump_xml(xml_relay_message);\
	if(router_strm == NULL) {					\
		router_strm = fopen("/tmp/router.log", "w");		\
	}								\
	fprintf(router_strm, "[%d RESULT (%s)]\t%s\t%s\n",		\
		AM_I_DC,						\
		xmlGetProp(xml_relay_message, XML_ATTR_REFERENCE),\
		x, msg_text);						\
	fflush(router_strm);						\
	cl_free(msg_text);
#else
#    define ROUTER_RESULT(x)	CRM_DEBUG(x);
#endif


/*
 * Append new_message to the tail of the FIFO queue.
 *
 * The queue stores the pointer itself (no copy is made), so the node must
 * stay valid until it has been dequeued and processed.
 *
 * Returns the current head of the FIFO queue.  If the queue entry cannot be
 * allocated the message is not queued, an error is logged and the unchanged
 * head is returned.
 */
fsa_message_queue_t
put_message(xmlNodePtr new_message)
{
	fsa_message_queue_t tail = NULL;
	fsa_message_queue_t next_message = (fsa_message_queue_t)
		cl_malloc(sizeof(struct fsa_message_queue_s));

	if(next_message == NULL) {
		cl_log(LOG_ERR, "Out of memory: message was not queued");
		return fsa_message_queue;
	}

	CRM_DEBUG("Adding msg to queue");

	next_message->message = new_message;
	next_message->next = NULL;

	if(fsa_message_queue == NULL) {
		fsa_message_queue = next_message;
	} else {
		/* Walk to the last entry.  Linking directly after the head
		 * would overwrite (and lose) every entry behind it.
		 */
		tail = fsa_message_queue;
		while(tail->next != NULL) {
			tail = tail->next;
		}
		tail->next = next_message;
	}

	CRM_DEBUG("Added msg to queue");

	return fsa_message_queue;
}

/*
 * Detach and return the entry at the head of the FIFO queue, or NULL if the
 * queue is empty.  The returned entry's `next` link is cleared.
 */
fsa_message_queue_t
get_message(void)
{
	fsa_message_queue_t next_message = NULL;

	if(fsa_message_queue != NULL) {
		next_message = fsa_message_queue;
		fsa_message_queue = fsa_message_queue->next;
		next_message->next = NULL;
	}

	return next_message;
}

/*
 * Returns TRUE when the queue is non-empty and its head entry carries a
 * message, FALSE otherwise.
 */
gboolean
is_message(void)
{
	return (fsa_message_queue != NULL
		&& fsa_message_queue->message != NULL);
}


/*	 A_MSG_STORE	*/
/*
 * Currently a no-op: the code that would queue `data` is commented out.
 * Always yields I_NULL.
 */
enum crmd_fsa_input
do_msg_store(long long action,
	     enum crmd_fsa_cause cause,
	     enum crmd_fsa_state cur_state,
	     enum crmd_fsa_input current_input,
	     void *data)
{
/*	xmlNodePtr new_message = (xmlNodePtr)data; */
	FNIN();

/*	put_message(new_message); */

	FNRET(I_NULL);
}


/*	 A_MSG_ROUTE	*/
/*
 * Route the XML message passed in `data`.
 *
 * The message is first offered to relay_message().  If it was not relayed it
 * is handed to handle_message(); when that yields anything other than I_NULL
 * or I_DC_HEARTBEAT the message is queued with put_message() and I_REQUEST is
 * returned instead.
 */
enum crmd_fsa_input
do_msg_route(long long action,
	     enum crmd_fsa_cause cause,
	     enum crmd_fsa_state cur_state,
	     enum crmd_fsa_input current_input,
	     void *data)
{
	enum crmd_fsa_input result = I_NULL;
	xmlNodePtr xml_message = (xmlNodePtr)data;
	gboolean routed = FALSE, defer = TRUE, do_process = TRUE;

	FNIN();

#if 0
/*	if(cause == C_IPC_MESSAGE) { */
		if (crmd_authorize_message(root_xml_node, msg, curr_client) == FALSE) {
			CRM_DEBUG("Message not authorized");
			do_process = FALSE;
		}
/*	} */
#endif
	if(do_process) {
		/* try passing the buck first */
		routed = relay_message(xml_message, cause==C_IPC_MESSAGE);

		if(routed == FALSE) {

			defer = TRUE;
			/* calculate defer */
			result = handle_message(xml_message);
			switch(result) {
				case I_NULL:
					defer = FALSE;
					break;
				case I_DC_HEARTBEAT:
					defer = FALSE;
					break;

					/* what else should go here? */
				default:
					CRM_DEBUG("Defering local processing of message");

					/* NOTE(review): put_message() keeps
					 * this pointer, yet both input
					 * callbacks in this file free their
					 * node after s_crmd_fsa() returns.
					 * If s_crmd_fsa() passes that same
					 * node here, the queue is left with a
					 * dangling pointer - confirm and
					 * queue a copy if so.
					 */
					put_message(xml_message);
					result = I_REQUEST;
					break;
			}
		}
	}

	FNRET(result);
}


/*
 * Callback for messages arriving from the HA (cluster) layer.
 *
 * Messages without an originator, messages that originated from this node
 * and messages addressed to another host are discarded.  Everything else is
 * stamped with its originator and fed to the FSA as I_ROUTER input.  The
 * parsed XML is released before returning.
 */
void
crmd_ha_input_callback(const struct ha_msg* msg, void* private_data)
{
	xmlNodePtr root_xml_node = NULL;
	xmlChar *to = NULL;
	gboolean for_us = TRUE;
	const char *from = ha_msg_value(msg, F_ORIG);

	FNIN();

#ifdef MSG_LOG
	if(msg_in_strm == NULL) {
		msg_in_strm = fopen("/tmp/inbound.log", "w");
	}
#endif

	if(from == NULL || strcmp(from, fsa_our_uname) == 0) {
#ifdef MSG_LOG
		fprintf(msg_in_strm,
			"Discarded message [F_SEQ=%s] from ourselves.\n",
			ha_msg_value(msg, F_SEQ));
#endif
		FNOUT();
	}

#ifdef MSG_LOG
	fprintf(msg_in_strm, "[%s (%s:%s)]\t%s\n",
		from,
		ha_msg_value(msg, F_SEQ),
		ha_msg_value(msg, F_TYPE),
		ha_msg_value(msg, "xml")
		);
	fflush(msg_in_strm);
#endif

	root_xml_node = find_xml_in_hamessage(msg);
	if(root_xml_node == NULL) {
		/* Same policy as the IPC callback: nothing usable to route */
		cl_log(LOG_INFO,
		       "HA Message was not valid... discarding.");
		FNOUT();
	}

	/* xmlGetProp() hands back a freshly allocated string that the caller
	 * has to release, so evaluate it and free it straight away.
	 */
	to = xmlGetProp(root_xml_node, XML_ATTR_HOSTTO);
	if(to != NULL) {
		if(strlen((const char*)to) > 0
		   && strcmp((const char*)to, fsa_our_uname) != 0) {
			for_us = FALSE;
		}
		xmlFree(to);
		to = NULL;
	}

	if(for_us == FALSE) {
#ifdef MSG_LOG
		fprintf(msg_in_strm,
			"Discarding message [F_SEQ=%s] for someone else.",
			ha_msg_value(msg, F_SEQ));
#endif
		/* the parsed tree is ours; do not leak it on the early exit */
		free_xml(root_xml_node);
		FNOUT();
	}

	set_xml_property_copy(root_xml_node, XML_ATTR_HOSTFROM, from);
	s_crmd_fsa(C_HA_MESSAGE, I_ROUTER, root_xml_node);

	free_xml(root_xml_node);

	FNOUT();
}

/*
 * Callback for input on a client IPC channel.
 *
 * Drains every pending message: each one is parsed, checked with
 * crmd_authorize_message() and, if authorized, fed to the FSA as I_ROUTER
 * input.  When the channel is found to be disconnected the client's
 * subsystem (if any) is cleaned up and the client record is released.
 *
 * Apparently returning TRUE means "stay connected, keep doing stuff".
 * Returning FALSE means "we're all done, close the connection"
 */
gboolean
crmd_ipc_input_callback(IPC_Channel *client, gpointer user_data)
{
	int lpc = 0;
	char *buffer = NULL;
	IPC_Message *msg = NULL;
	gboolean hack_return_good = TRUE;
	xmlNodePtr root_xml_node = NULL;
	crmd_client_t *curr_client = (crmd_client_t*)user_data;
	/* Name used in log output only.  The disconnect path below allows for
	 * a NULL client (and a NULL key), so never dereference them blindly.
	 */
	const char *client_name = "(unknown)";

	FNIN();

	if(curr_client != NULL && curr_client->table_key != NULL) {
		client_name = curr_client->table_key;
	}

	CRM_DEBUG2("Processing IPC message from %s",
		   client_name);

	while(client->ops->is_message_pending(client)) {
		if (client->ch_status == IPC_DISCONNECT) {
			/* The message which was pending for us is that
			 * the IPC status is now IPC_DISCONNECT */
			break;
		}
		if (client->ops->recv(client, &msg) != IPC_OK) {
			perror("Receive failure:");
			FNRET(!hack_return_good);
		}
		if (msg == NULL) {
			CRM_DEBUG("No message this time");
			continue;
		}

		lpc++;
		buffer = (char*)msg->msg_body;
		CRM_DEBUG3("Processing xml from %s [text=%s]",
			   client_name, buffer);

		root_xml_node =
			find_xml_in_ipcmessage(msg, FALSE);
		if (root_xml_node != NULL) {

			if (crmd_authorize_message(root_xml_node,
						   msg,
						   curr_client)) {
				CRM_DEBUG("Message authorized,about to relay");
				s_crmd_fsa(C_IPC_MESSAGE,
					   I_ROUTER,
					   root_xml_node);

			} else {
				CRM_DEBUG("Message not authorized");
			}

		} else {
			cl_log(LOG_INFO,
			       "IPC Message was not valid... discarding.");
		}
		free_xml(root_xml_node);
		msg->msg_done(msg);

		msg = NULL;
		buffer = NULL;
		root_xml_node = NULL;
	}

	CRM_DEBUG2("Processed %d messages", lpc);

	if (client->ch_status == IPC_DISCONNECT)
	{
		cl_log(LOG_INFO,
		       "received HUP from %s",
		       client_name);
		if (curr_client != NULL) {
			struct crm_subsystem_s *the_subsystem = NULL;

			if (curr_client->sub_sys == NULL)
				CRM_DEBUG("Client had not registered with us yet");
			else if (strcmp(CRM_SYSTEM_PENGINE,
					curr_client->sub_sys) == 0)
				the_subsystem = pe_subsystem;
			else if (strcmp(CRM_SYSTEM_TENGINE,
					curr_client->sub_sys) == 0)
				the_subsystem = te_subsystem;
			else if (strcmp(CRM_SYSTEM_CIB,
					curr_client->sub_sys) == 0)
				the_subsystem = cib_subsystem;

			if(the_subsystem != NULL) {
				cleanup_subsystem(the_subsystem);
			} /* else that was a transient client */

			if (curr_client->table_key != NULL) {
				/*
				 * Key is destroyed below:
				 *	curr_client->table_key
				 * Value is cleaned up by
				 *	G_main_del_IPC_Channel
				 */
				g_hash_table_remove(
					ipc_clients, curr_client->table_key);
			}

			if(curr_client->client_source != NULL) {
				gboolean det =
					G_main_del_IPC_Channel(
						curr_client->client_source);

				CRM_DEBUG2("crm_client was %s detached",
					   det?"successfully":"not");
			}

			/* client_name may alias table_key: it must not be
			 * used past this point.
			 */
			cl_free(curr_client->table_key);
			cl_free(curr_client->sub_sys);
			cl_free(curr_client->uuid);
			cl_free(curr_client);
		}
		CRM_DEBUG("this client has now left the building.");
		FNRET(!hack_return_good);
	}

	FNRET(hack_return_good);
}


/*
 * Build a request for `operation` addressed to sys_to on host_to and try to
 * relay it immediately.
 *
 * If the request cannot be relayed it is placed on the FIFO queue, which
 * then holds the request node.  Otherwise the request is released here.
 *
 * Returns TRUE if relay_message() accepted the request, FALSE if it had to
 * be queued.
 */
gboolean
send_request(xmlNodePtr msg_options, xmlNodePtr msg_data,
	     const char *operation, const char *host_to, const char *sys_to)
{
	gboolean was_sent = FALSE;
	xmlNodePtr request = NULL;
	FNIN();

	msg_options = set_xml_attr(msg_options, XML_TAG_OPTIONS,
				   XML_ATTR_OP, operation, TRUE);

	request = create_request(msg_options,
				 msg_data,
				 host_to,
				 sys_to,
				 AM_I_DC?CRM_SYSTEM_DC:CRM_SYSTEM_CRMD,
				 NULL,
				 NULL);

/*	xml_message_debug(request, "Final request..."); */

	was_sent = relay_message(request, TRUE);

	if(was_sent == FALSE) {
		/* put_message() stores this very pointer, so freeing the
		 * request here would leave a dangling entry in the queue.
		 */
		put_message(request);
	} else {
		free_xml(request);
	}

	FNRET(was_sent);
}

/*
 * Build a request for `operation` addressed to sys_to on host_to and place
 * it on the FIFO queue without attempting to relay it first.  The queue
 * holds the request node afterwards.  Always returns TRUE.
 */
gboolean
store_request(xmlNodePtr msg_options, xmlNodePtr msg_data,
	      const char *operation, const char *host_to, const char *sys_to)
{
	xmlNodePtr request = NULL;
	FNIN();

	msg_options = set_xml_attr(msg_options, XML_TAG_OPTIONS,
				   XML_ATTR_OP, operation, TRUE);

	request = create_request(msg_options,
				 msg_data,
				 host_to,
				 sys_to,
				 AM_I_DC?CRM_SYSTEM_DC:CRM_SYSTEM_CRMD,
				 NULL,
				 NULL);

	put_message(request);

	FNRET(TRUE);
}

gboolean
relay_message(xmlNodePtr xml_relay_message, gboolean originated_locally)
{
	int is_for_dc	= 0;
	int is_for_dcib	= 0;
	int is_for_crm	= 0;
	int is_for_cib	= 0;
	int is_local    = 0;
	gboolean dont_cc= TRUE;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -