📄 callbacks.c
字号:
/* $Id: callbacks.c,v 1.23 2005/02/19 18:11:03 andrew Exp $ *//* * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net> * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#include <sys/param.h>#include <stdio.h>#include <sys/types.h>#include <unistd.h>#include <stdlib.h>#include <errno.h>#include <fcntl.h>#include <hb_api.h>#include <clplumbing/uids.h>#include <crm/crm.h>#include <crm/cib.h>#include <crm/msg_xml.h>#include <crm/common/ipc.h>#include <crm/common/ctrl.h>#include <crm/common/xml.h>#include <crm/common/msg.h>#include <cibio.h>#include <callbacks.h>#include <cibmessages.h>#include <crm/dmalloc_wrapper.h>gint cib_GCompareFunc(gconstpointer a, gconstpointer b);gboolean cib_msg_timeout(gpointer data);void cib_GHFunc(gpointer key, gpointer value, gpointer user_data);GHashTable *peer_hash = NULL;int next_client_id = 0;gboolean cib_is_master = FALSE;gboolean cib_have_quorum = FALSE;GHashTable *client_list = NULL;extern const char *cib_our_uname;extern ll_cluster_t *hb_conn;/* technically bump does modify the cib... * but we want to split the "bump" from the "sync" */cib_operation_t cib_server_ops[] = { {NULL, FALSE, FALSE, FALSE, FALSE, cib_process_default}, {CRM_OP_NOOP, FALSE, FALSE, FALSE, FALSE, cib_process_default}, {CRM_OP_RETRIVE_CIB, FALSE, FALSE, FALSE, FALSE, cib_process_query}, {CRM_OP_CIB_SLAVE, FALSE, TRUE, FALSE, FALSE, cib_process_readwrite}, {CRM_OP_CIB_SLAVEALL,TRUE, TRUE, FALSE, FALSE, cib_process_readwrite}, {CRM_OP_CIB_MASTER, FALSE, TRUE, FALSE, FALSE, cib_process_readwrite}, {CRM_OP_CIB_ISMASTER,FALSE, TRUE, FALSE, FALSE, cib_process_readwrite}, {CRM_OP_CIB_BUMP, FALSE, TRUE, TRUE, FALSE, cib_process_bump}, {CRM_OP_CIB_REPLACE, TRUE, TRUE, TRUE, TRUE, cib_process_replace}, {CRM_OP_CIB_CREATE, TRUE, TRUE, TRUE, TRUE, cib_process_modify}, {CRM_OP_CIB_UPDATE, TRUE, TRUE, TRUE, TRUE, cib_process_modify}, {CRM_OP_JOINACK, TRUE, TRUE, TRUE, TRUE, cib_process_modify}, {CRM_OP_SHUTDOWN_REQ,TRUE, TRUE, TRUE, TRUE, cib_process_modify}, {CRM_OP_CIB_DELETE, TRUE, TRUE, TRUE, TRUE, cib_process_modify}, {CRM_OP_CIB_QUERY, FALSE, FALSE, TRUE, FALSE, cib_process_query}, {CRM_OP_QUIT, FALSE, TRUE, FALSE, FALSE, cib_process_quit}, {CRM_OP_PING, FALSE, FALSE, FALSE, FALSE, cib_process_ping}, {CRM_OP_CIB_ERASE, TRUE, TRUE, TRUE, FALSE, cib_process_erase}};int send_via_callback_channel(HA_Message *msg, const char *token);enum cib_errors cib_process_command( const HA_Message *request, HA_Message **reply, gboolean privileged);gboolean cib_common_callback( IPC_Channel *channel, gpointer user_data, gboolean privileged);enum cib_errors cib_get_operation_id(const HA_Message * msg, int *operation);gboolean cib_process_disconnect(IPC_Channel *channel, cib_client_t *cib_client);gbooleancib_client_connect(IPC_Channel *channel, gpointer user_data){ gboolean auth_failed = FALSE; gboolean can_connect = TRUE; gboolean (*client_callback)(IPC_Channel *channel, gpointer user_data) = NULL; cib_client_t *new_client = NULL; crm_devel("Connecting channel"); if (channel == NULL) { crm_err("Channel was NULL"); can_connect = FALSE; } else if (channel->ch_status == IPC_DISCONNECT) { crm_err("Channel was disconnected"); can_connect = FALSE; } else if(user_data == NULL) { crm_err("user_data must contain channel name"); can_connect = FALSE; } else { crm_malloc(new_client, sizeof(cib_client_t)); new_client->id = NULL; new_client->callback_id = NULL; new_client->source = NULL; new_client->channel = channel; new_client->channel_name = user_data; new_client->delegated_calls = NULL; crm_devel("Created channel %p for channel %s", new_client, new_client->channel_name); client_callback = NULL; /* choose callback and do auth based on channel_name */ if(safe_str_eq(new_client->channel_name, cib_channel_callback)) { client_callback = cib_null_callback; } else { uuid_t client_id; uuid_generate(client_id); crm_malloc(new_client->id, sizeof(char)*36); uuid_unparse(client_id, new_client->id); new_client->id[35] = EOS; uuid_generate(client_id); crm_malloc(new_client->callback_id, sizeof(char)*36); uuid_unparse(client_id, new_client->callback_id); new_client->callback_id[35] = EOS; client_callback = cib_ro_callback; if(safe_str_eq(new_client->channel_name, cib_channel_rw)) { client_callback = cib_rw_callback; } } } if(auth_failed) { crm_err("Connection to %s channel failed authentication", (char *)user_data); can_connect = FALSE; } if(can_connect == FALSE) { if(new_client) { crm_free(new_client->id); crm_free(new_client->callback_id); } crm_free(new_client); return FALSE; } channel->ops->set_recv_qlen(channel, 100); channel->ops->set_send_qlen(channel, 100); if(client_callback != NULL) { new_client->source = G_main_add_IPC_Channel( G_PRIORITY_LOW, channel, FALSE, client_callback, new_client, default_ipc_connection_destroy); } if(client_callback != cib_null_callback) { /* send msg to client with uuid to use when signing up for * callback channel */ HA_Message *reg_msg = ha_msg_new(3); ha_msg_add(reg_msg, F_CIB_OPERATION, CRM_OP_REGISTER); ha_msg_add(reg_msg, F_CIB_CLIENTID, new_client->id); ha_msg_add( reg_msg, F_CIB_CALLBACK_TOKEN, new_client->callback_id); msg2ipcchan(reg_msg, channel); crm_msg_del(reg_msg); /* make sure we can find ourselves later for sync calls * redirected to the master instance */ g_hash_table_insert(client_list, new_client->id, new_client); } crm_devel("Channel %s connected for client %s", new_client->channel_name, new_client->id); return TRUE;}gbooleancib_rw_callback(IPC_Channel *channel, gpointer user_data){ return cib_common_callback(channel, user_data, TRUE);}gbooleancib_ro_callback(IPC_Channel *channel, gpointer user_data){ return cib_common_callback(channel, user_data, FALSE);}gbooleancib_null_callback(IPC_Channel *channel, gpointer user_data){ gboolean did_disconnect = TRUE; HA_Message *op_request = NULL; cib_client_t *cib_client = user_data; cib_client_t *hash_client = NULL; const char *type = NULL; const char *uuid_ticket = NULL; if(cib_client == NULL) { crm_err("Discarding IPC message from unknown source" " on callback channel."); return FALSE; } while(channel->ops->is_message_pending(channel)) { if (channel->ch_status == IPC_DISCONNECT) { /* The message which was pending for us is that * the IPC status is now IPC_DISCONNECT */ break; } op_request = msgfromIPC_noauth(channel); type = cl_get_string(op_request, F_CIB_OPERATION); if(safe_str_neq(type, CRM_OP_REGISTER) ) { crm_warn("Discarding IPC message from %s on callback channel", cib_client->id); crm_msg_del(op_request); continue; } uuid_ticket = cl_get_string(op_request, F_CIB_CALLBACK_TOKEN); hash_client = g_hash_table_lookup(client_list, uuid_ticket); if(hash_client != NULL) { crm_err("Duplicate registration request... disconnecting"); crm_msg_del(op_request); return FALSE; } cib_client->id = crm_strdup(uuid_ticket); g_hash_table_insert(client_list, cib_client->id, cib_client); crm_info("Registered %s on %s channel", cib_client->id, cib_client->channel_name); crm_msg_del(op_request); op_request = ha_msg_new(2); ha_msg_add(op_request, F_CIB_OPERATION, CRM_OP_REGISTER); ha_msg_add(op_request, F_CIB_CLIENTID, cib_client->id); msg2ipcchan(op_request, channel); crm_msg_del(op_request); } did_disconnect = cib_process_disconnect(channel, cib_client); if(did_disconnect) { crm_info("Client disconnected"); } return did_disconnect;}gbooleancib_common_callback( IPC_Channel *channel, gpointer user_data, gboolean privileged){ int rc = cib_ok; int lpc = 0; int call_type = 0; int call_options = 0; const char *op = NULL; const char *host = NULL; HA_Message *op_request = NULL; HA_Message *op_reply = NULL; gboolean needs_processing = FALSE; cib_client_t *cib_client = user_data; if(cib_client == NULL) { crm_err("Receieved call from unknown source. Discarding."); return FALSE; } crm_verbose("Callback for %s on %s channel", cib_client->id, cib_client->channel_name); while(channel->ops->is_message_pending(channel)) { if (channel->ch_status == IPC_DISCONNECT) { /* The message which was pending for us is that * the IPC status is now IPC_DISCONNECT */ break; } op_request = msgfromIPC(channel); if (op_request == NULL) { perror("Receive failure:"); break; } crm_verbose("Processing IPC message from %s on %s channel", cib_client->id, cib_client->channel_name); crm_log_message(LOG_MSG, op_request); crm_log_message_adv(LOG_DEV, "cib.client-in.log", op_request); lpc++; rc = cib_ok; if(HA_OK != ha_msg_add( op_request, F_CIB_CLIENTID, cib_client->id)) { crm_err("Couldnt add F_CIB_CLIENTID to message"); rc = cib_msg_field_add; } if(rc == cib_ok) { ha_msg_value_int( op_request, F_CIB_CALLOPTS, &call_options); crm_devel("Call options: %.8lx", (long)call_options); host = cl_get_string(op_request, F_CIB_HOST); op = cl_get_string(op_request, F_CIB_OPERATION); rc = cib_get_operation_id(op_request, &call_type); } if(rc == cib_ok && cib_server_ops[call_type].needs_privileges && privileged == FALSE) { rc = cib_not_authorized; } needs_processing = FALSE; if(rc != cib_ok) { /* TODO: construct error reply */ crm_err("Pre-processing of command failed: %s", cib_error2string(rc)); } else if(host == NULL && cib_is_master && !(call_options & cib_scope_local)) { crm_devel("Processing master %s op locally", op); needs_processing = TRUE; } else if( (host == NULL && (call_options & cib_scope_local)) || safe_str_eq(host, cib_our_uname)) { crm_devel("Processing %s op locally", op); needs_processing = TRUE; } else { /* send via HA to other nodes */ ha_msg_add(op_request, F_CIB_DELEGATED, cib_our_uname); crm_log_message(LOG_MSG, op_request); if(host != NULL) { crm_devel("Forwarding %s op to %s", op, host); hb_conn->llc_ops->send_ordered_nodemsg( hb_conn, op_request, host); } else { crm_info("Forwarding %s op to master instance", op); hb_conn->llc_ops->sendclustermsg( hb_conn, op_request); } if(call_options & cib_discard_reply) { crm_trace("Client not interested in reply"); } else if(call_options & cib_sync_call) { /* keep track of the request so we can time it * out if required */ HA_Message *saved = ha_msg_copy(op_request); crm_devel("Registering delegated call from %s", cib_client->id); cib_client->delegated_calls = g_list_append( cib_client->delegated_calls, saved); } crm_msg_del(op_request); op_request = NULL; continue; } if(needs_processing) { crm_verbose("Processing %s op", op); rc = cib_process_command( op_request, &op_reply, privileged); crm_devel("Performing local processing: op=%s origin=%s/%s,%s (update=%s)", op, cib_our_uname, cib_client->id, cl_get_string(op_request, F_CIB_CALLID), (rc==cib_ok && cib_server_ops[call_type].modifies_cib)?"true":"false"); crm_devel("Processing complete"); } crm_devel("processing response cases"); if(rc != cib_ok) { crm_err("Input message"); crm_log_message(LOG_ERR, op_request); crm_err("Output message"); crm_log_message(LOG_ERR, op_reply); crm_log_message_adv(LOG_ERR, DEVEL_DIR"/cib.out.log", op_reply); } if(op_reply == NULL) { crm_trace("No reply is required for op %s", crm_str(op)); } else if(call_options & cib_sync_call) { crm_devel("Sending sync reply to %s op", crm_str(op)); crm_log_message(LOG_MSG, op_reply); if(msg2ipcchan(op_reply, channel) != HA_OK) { crm_err("Sync reply failed: %s", cib_error2string(cib_reply_failed)); if(rc == cib_ok) { rc = cib_reply_failed; } } } else { enum cib_errors local_rc = cib_ok; /* send reply via client's callback channel */ crm_devel("Sending async reply %p to %s op", op_reply, crm_str(op)); crm_log_message(LOG_MSG, op_reply); local_rc = send_via_callback_channel( op_reply, cib_client->callback_id); if(local_rc != cib_ok) { crm_err("ASync reply failed: %s", cib_error2string(local_rc)); if(rc != cib_ok) { local_rc = cib_reply_failed; } } } crm_devel("Cleaning up reply"); crm_msg_del(op_reply); op_reply = NULL; crm_devel("Processing forward cases"); if(rc == cib_ok && cib_server_ops[call_type].modifies_cib && !(call_options & cib_scope_local)) { /* send via HA to other nodes */ crm_info("Forwarding %s op to all instances", op); ha_msg_add(op_request, F_CIB_GLOBAL_UPDATE, XML_BOOLEAN_TRUE); cl_log_message(LOG_DEV, op_request); CRM_DEV_ASSERT(hb_conn->llc_ops->sendclustermsg( hb_conn, op_request) == HA_OK); } else { if(call_options & cib_scope_local ) { crm_devel("Request not broadcast : local scope"); } if(cib_server_ops[call_type].modifies_cib == FALSE) { crm_devel("Request not broadcast : R/O call"); } if(rc != cib_ok) { crm_devel("Request not broadcast : call failed : %s", cib_error2string(rc)); } } crm_devel("Cleaning up request"); crm_msg_del(op_request); op_request = NULL; } crm_verbose("Processed %d messages", lpc); return cib_process_disconnect(channel, cib_client);}enum cib_errorscib_process_command( const HA_Message *request, HA_Message **reply, gboolean privileged){ crm_data_t *output = NULL; crm_data_t *input = NULL; int call_type = 0; int call_options = 0; enum cib_errors rc = cib_ok; const char *op = NULL; const char *call_id = NULL; const char *section = NULL; const char *tmp = NULL; if(reply) { *reply = NULL; } /* Start processing the request... */ op = cl_get_string(request, F_CIB_OPERATION); call_id = cl_get_string(request, F_CIB_CALLID); ha_msg_value_int(request, F_CIB_CALLOPTS, &call_options);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -