📄 cms_client.c
字号:
/* * cms_client.c: cms daemon client operation * * Copyright (c) 2004 Intel Corp. * * Author: Zou Yixiong (yixiong.zou@intel.com) * Author: Zhu Yi (yi.zhu@intel.com) * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * */#include <sys/types.h>#include <sys/stat.h>#include <string.h>#include <stdlib.h>#include <assert.h>#include <clplumbing/cl_log.h>#include <clplumbing/GSource.h>#include <heartbeat.h>#include "cms_common.h"extern GHashTable * mq_open_pending_hash;extern GHashTable * mq_status_pending_hash;extern GHashTable * mq_ack_pending_hash;extern GHashTable * cms_client_table;static unsigned long gSendSeqNo = 0;static gbooleandelete_cms_client(gpointer key, gpointer value, gpointer user_data){ /* memory leak here? */ return TRUE;}voidcms_client_input_destroy(gpointer user_data){ dprintf("%s: received HUP.\n", __FUNCTION__); return;}intcms_client_init(cms_data_t * cmsdata){ IPC_WaitConnection * wait_ch = NULL; mode_t mask; GHashTable * attrs; char path[] = IPC_PATH_ATTR; char domainsocket[] = IPC_DOMAIN_SOCKET; char cms_socket[] = CMS_DOMAIN_SOCKET; cl_log(LOG_INFO, "initialize client tables and wait channel."); attrs = g_hash_table_new(g_str_hash, g_str_equal); g_hash_table_insert(attrs, path, cms_socket); mask = umask(0); wait_ch = ipc_wait_conn_constructor(domainsocket, attrs); if (!wait_ch) { cl_perror("Can't create wait channel"); return HA_FAIL; } mask = umask(mask); g_hash_table_destroy(attrs); cmsdata->wait_ch = wait_ch; cmsdata->client_table = g_hash_table_new(g_int_hash, g_int_equal); return HA_OK;}intcms_client_add(GHashTable ** cms_client_table, struct IPC_CHANNEL * newclient) { cms_client_t * cms_client; pid_t * key; if (!cms_client_table) { cl_log(LOG_ERR, "cms: can't find client table"); return HA_FAIL; } cms_client = g_hash_table_lookup(*cms_client_table, &(newclient->farside_pid)); if (cms_client) { cms_client->channel_count++; dprintf("farside_pid [%d] already exists, channel_count [%d]\n" , newclient->farside_pid, cms_client->channel_count); return HA_OK; } cms_client = (cms_client_t *) ha_malloc(sizeof(cms_client_t)); dprintf("Add farside_pid [%d] to daemon <%p>\n" , newclient->farside_pid, *cms_client_table); cms_client->channel_count = 1; cms_client->opened_mqueue_list = NULL; key = (pid_t *)ha_malloc(sizeof(pid_t)); *key = newclient->farside_pid; g_hash_table_insert(*cms_client_table, key, cms_client); return HA_OK;}voidcms_client_close_all(GHashTable * cms_client_table){ dprintf("In Func %s ...\n", __FUNCTION__); if (g_hash_table_size(cms_client_table)) { g_hash_table_foreach_remove(cms_client_table, delete_cms_client, NULL); } return;}intclient_process_qstatus(IPC_Channel * client, client_header_t *msg, cms_data_t * cmsdata){ client_mqueue_status_t * qstatus_msg; mqueue_t * queue; SaErrorT error; client_header_t reply; char * mqname; qstatus_msg = (client_mqueue_status_t *) msg; mqname = saname2str(qstatus_msg->header.name); /* get the queue/host mapping * * this function should be deterministic based * only on the queue name and the list of cluster * membership available. */ if ((queue = mqname_lookup(mqname, NULL)) == NULL) { dprintf("%s, queue %s not found.\n", __FUNCTION__, mqname); error = SA_ERR_NOT_EXIST; reply.type = msg->type; reply.len = sizeof(client_header_t); reply.flag = error; reply.name = msg->name; client_send_msg(client, reply.len, &reply); return TRUE; }; dprintf("%s, queue %s found.\n", __FUNCTION__, mqname); if (!is_cms_online(queue->host)) { cl_log(LOG_WARNING, "%s, cms daemon offline on [%s], return BAD_HANDLE for queue status.", __FUNCTION__, queue->host); error = SA_ERR_BAD_HANDLE; reply.type = msg->type; reply.len = sizeof(client_header_t); reply.flag = error; reply.name = msg->name; client_send_msg(client, reply.len, &reply); return TRUE; } /* Get queueUsed and numberOfMessages for saMsgQueueUsage[4] * from mqueue open node. */ g_hash_table_insert(mq_status_pending_hash, mqname, client); request_mqueue_status(queue, cmsdata); return TRUE;}intclient_process_mqopen(IPC_Channel * client, client_header_t * msg, cms_data_t * cmsdata){ mqueue_request_t request; IPC_Channel *cli; client_mqueue_open_t * m = (client_mqueue_open_t *) msg; mqueue_t * mq; char * mqname; int i; SaErrorT error = SA_OK; mqname = saname2str(m->header.name); if (g_hash_table_lookup(mq_open_pending_hash, mqname)) { cl_log(LOG_WARNING, "%s: mqname [%s] open pending from local" , __FUNCTION__, mqname); request.qname = mqname; request.gname = NULL; request.request_type = m->header.type; request.invocation = 0; client_send_client_qopen(client, &request, -1, SA_ERR_EXIST); ha_free(mqname); return TRUE; } /* Search from local database firstly, if local database checking * fails, we don't continue. * * Don't worry about retention time here, the original mqueue * owner will deal with it when receiving REOPEN from hb. */ if ((mq = mqname_lookup(mqname, NULL)) != NULL) { if (mq->mqstat != MQ_STATUS_CLOSE) error = SA_ERR_EXIST; /* Client provide a creationAttributes, but it is * different from what we already have. */ else if ((m->attr.creationFlags != -1) && (mq->status.openFlags != m->openflag || mq->status.creationFlags != m->attr.creationFlags || mq->status.retentionTime != m->attr.retentionTime)) error = SA_ERR_EXIST; } if (!mq && (m->header.type != CMS_QUEUEGROUP_CREATE) && !(m->openflag & SA_MSG_QUEUE_CREATE)) { error = SA_ERR_NOT_EXIST; cl_log(LOG_INFO, "%s: SA_MSG_QUEUE_CREATE not provided for %s" , __FUNCTION__, mqname); } if (error == SA_OK) goto proceed; request.qname = mqname; request.gname = NULL; request.request_type = m->header.type; request.invocation = 0; client_send_client_qopen(client, &request, -1, error); ha_free(mqname); return TRUE;proceed: if ((cli = (IPC_Channel *) ha_malloc(sizeof(IPC_Channel))) == NULL) { cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__); ha_free(mqname); return FALSE; } *cli = *client; mq = (mqueue_t *) ha_malloc(sizeof(mqueue_t)); if (!mq) { cl_log(LOG_ERR, "%s: ha_malloc for mqueue_t failed.\n" , __FUNCTION__); ha_free(cli); ha_free(mqname); return FALSE; } memset(mq, 0, sizeof(mqueue_t)); mq->name = mqname; mq->policy = m->policy; mq->mqstat = MQ_STATUS_OPEN_PENDING; mq->client = cli; g_hash_table_insert(mq_open_pending_hash, mq->name, mq); request.qname = mqname; request.gname = NULL; request.request_type = m->header.type; request.invocation = m->invocation; request.policy = m->policy; request.create_flag = m->attr.creationFlags; request.retention = m->attr.retentionTime; request.ack = 1; for (i = SA_MSG_MESSAGE_HIGHEST_PRIORITY ; i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; i++) request.size[i] = m->attr.size[i]; cl_log(LOG_INFO, "%s: invocation = %d, policy = %d, type = %d" , __FUNCTION__, request.invocation, request.policy , request.request_type); if (request_mqname_open(&request, cmsdata) == FALSE) { cl_log(LOG_ERR, "%s: cluster_request_mqname failed" , __FUNCTION__); g_hash_table_remove(mq_open_pending_hash, mqname); ha_free(cli); ha_free(mq); ha_free(mqname); return FALSE; } return TRUE;}intclient_process_mqclose(IPC_Channel * client, client_header_t * msg, cms_data_t * cmsdata){ client_mqueue_close_t * m = (client_mqueue_close_t *) msg; mqueue_t *mq; client_header_t reply; cms_client_t * cms_client; CMS_TRACE(); mq = mqueue_handle_lookup(&(m->handle), NULL); if (mq == NULL) { reply.type = msg->type; reply.len = sizeof(client_header_t); reply.flag = SA_ERR_NOT_EXIST; reply.name = msg->name; client_send_msg(client, reply.len, &reply); cl_log(LOG_WARNING, "%s: Cannot find mq by handle [%u]" , __FUNCTION__, m->handle); return TRUE; } mq->status.closeTime = get_current_satime(); /* * close the mqueue in the cluster */ if (request_mqname_close(mq->name, cmsdata) == FALSE) { cl_log(LOG_ERR, "%s: mqname_close failed", __FUNCTION__); return FALSE; } mqueue_handle_remove(&(m->handle)); /* * remove this mq from client's opened_mqueue_list */ cms_client = g_hash_table_lookup(cmsdata->client_table, &client->farside_pid); assert(cms_client != NULL); cms_client->opened_mqueue_list = g_list_remove(cms_client->opened_mqueue_list, mq); #if 0 if (m->silent) return TRUE; #endif return TRUE;}intclient_process_mqunlink(IPC_Channel * client, client_header_t * msg, cms_data_t * cmsdata){ mqueue_t *mq; char * mqname; client_mqueue_unlink_t * m = (client_mqueue_unlink_t *) msg; client_header_t reply; cms_client_t * cms_client; mqname = saname2str(m->header.name); dprintf("%s: mqname=[%s]\n", __FUNCTION__, mqname); mq = mqueue_table_lookup(mqname, NULL); if (mq == NULL) { reply.type = msg->type; reply.len = sizeof(client_header_t); reply.flag = SA_ERR_NOT_EXIST; reply.name = msg->name; cl_log(LOG_WARNING, "%s: Cannot find mq by handle [%u]" , __FUNCTION__, m->handle); client_send_msg(client, reply.len, &reply); return TRUE; } if (mq->list != NULL) { /* * Remove the reference from my message queue */ } if (request_mqname_unlink(mqname, cmsdata) == FALSE) { cl_log(LOG_ERR, "%s: mqname_unlink failed", __FUNCTION__); ha_free(mqname); return FALSE; } mqueue_handle_remove(&(m->handle)); /* * remove this mq from client's opened_mqueue_list */ cms_client = g_hash_table_lookup(cmsdata->client_table, &client->farside_pid); assert(cms_client != NULL); cms_client->opened_mqueue_list = g_list_remove(cms_client->opened_mqueue_list, mq); ha_free(mqname); return TRUE;}intclient_process_mqsend(IPC_Channel * client, client_header_t * msg, cms_data_t * cmsdata){ mqueue_t *mq; char * mqname; mqueue_request_t request; IPC_Channel *cli = NULL; unsigned long * seq = NULL; client_message_t * m = (client_message_t *) msg; client_message_ack_t reply; SaErrorT error; mqname = saname2str(m->header.name); request.qname = mqname; request.gname = NULL; request.request_type = m->header.type; dprintf("request.request_type is %d\n", request.request_type); request.invocation = m->invocation; request.ack = m->ack; request.sendreceive = m->sendreceive; request.seq = gSendSeqNo++; dprintf("%s: mqname = %s\n", __FUNCTION__, mqname); m->data = (void *)((char *)msg + sizeof(client_message_t)); m->msg.data = m->data; mq = mqname_lookup(mqname, NULL); if (mq == NULL) { cl_log(LOG_WARNING, "%s: Cannot find mq by name [%s]" , __FUNCTION__, mqname); error = SA_ERR_NOT_EXIST; goto error; } /* * for message queue group */ if (IS_MQGROUP(mq)) { mqueue_t *mqg = mq; char *mqueue_name = NULL; request.gname = mqg->name; cl_log(LOG_INFO, "[%s] is a [Type %d] message queue group" , mqname, mqg->policy); cl_log(LOG_INFO, "MQ Group [%s] current is <%p>" , mqname, mqg->current->data); if (!(mqg->current) || !(mqueue_name = ((mqueue_t *)(mqg->current->data))->name) || !(mq = mqueue_table_lookup(mqueue_name, NULL))) { cl_log(LOG_ERR, "%s: Cannot find group current [%s]\n" , __FUNCTION__, mqueue_name); error = SA_ERR_NOT_EXIST; goto error; } dump_mqueue_list(mqg); mqg->current = CIRCLE_LIST_NEXT(mqg->list, mqg->current); request.qname = mqueue_name; request.gname = mqname; } if (request.ack) { dprintf("%s: insert ack packet ", __FUNCTION__); if ((cli = (IPC_Channel *) ha_malloc(sizeof(IPC_Channel))) == NULL || (seq = (unsigned long *) ha_malloc(sizeof(unsigned long))) == NULL ) { cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__); if (cli) ha_free(cli); error = SA_ERR_NO_MEMORY; goto error; } *cli = *client; *seq = request.seq; dprintf("seq = %ld, client = %p\n", *seq, cli); g_hash_table_insert(mq_ack_pending_hash, seq, cli); } if (request_mqname_send(&request, mq->host, NULL, &(m->msg) , cmsdata) == FALSE) { cl_log(LOG_ERR, "%s: mqname_send failed", __FUNCTION__); error = SA_ERR_LIBRARY; goto error; } ha_free(mqname); return TRUE;error: /* This is actually a error respond instead of a ACK. */ memset(&reply, 0, sizeof(client_message_ack_t)); reply.header.type = CMS_MSG_ACK; reply.header.len = sizeof(client_message_ack_t); reply.header.flag = error; reply.header.name = msg->name; reply.send_type = msg->type; reply.invocation = m->invocation; client_send_msg(client, reply.header.len, &reply); ha_free(mqname); return TRUE;}intclient_process_message_request(IPC_Channel * client, client_header_t * msg){ char * mqname; mqueue_t * mq; message_t * message; client_message_t * m; mqname = saname2str(msg->name); dprintf("%s: mqname is %s\n", __FUNCTION__, mqname);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -