📄 cms_mqueue.c
字号:
/*
 * cms_mqueue.c: cms daemon message queue operation
 *
 * Copyright (c) 2004 Intel Corp.
 *
 * Author: Zou Yixiong (yixiong.zou@intel.com)
 * Author: Zhu Yi (yi.zhu@intel.com)
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 */

#include <portability.h>

#include <string.h>
#include <stdlib.h>

#include "hb_api.h"
#include "clplumbing/GSource.h"
#include "saf/ais_message.h"
#include "heartbeat.h"
#include "cms_common.h"

static GHashTable * mqtable_name_hash;   /* mq hash key by name */
static GHashTable * mqtable_handle_hash; /* mq hash key by handle, ditto */
static guint __msghandle_counter = 0;    /* mqtable_handle_hash handle key */

/**
 * mqueue_table_init - create the two lookup tables used by this file
 *
 * One table is keyed by queue name (string), the other by handle (guint).
 * Always returns HA_OK.
 */
int
mqueue_table_init(void)
{
	mqtable_name_hash = g_hash_table_new(g_str_hash, g_str_equal);
	mqtable_handle_hash = g_hash_table_new(g_int_hash, g_int_equal);

	return HA_OK;
}

/**
 * mqueue_handle_insert - allocate a new handle for @mq and register it
 * @mq: message queue to register; its handle field is set on success
 *
 * Returns the new handle, or 0 if the key could not be allocated.
 * Handles are numbered from 1 so that a valid handle can never be
 * confused with the failure value 0.
 */
guint
mqueue_handle_insert(mqueue_t *mq)
{
	guint * handle;

	handle = (guint *) ha_malloc(sizeof(guint));
	if (!handle) {
		cl_log(LOG_CRIT, "malloc handle failed for mqueue_handle_insert.");
		return 0;
	}

	/* pre-increment: the first handle handed out is 1, never 0 */
	*handle = ++__msghandle_counter;
	g_hash_table_insert(mqtable_handle_hash, handle, mq);
	mq->handle = *handle;

	return *handle;
}

/**
 * mqueue_table_insert - register @mq in the name table
 * @mq: message queue; mq->name is used as the key and is not copied
 *
 * Always returns SA_OK.
 */
int
mqueue_table_insert(mqueue_t *mq)
{
	g_hash_table_insert(mqtable_name_hash, mq->name, mq);

	return SA_OK;
}

/**
 * dump_mqueue_list - debug print of the list pointers of @mq
 */
void
dump_mqueue_list(mqueue_t * mq)
{
	dprintf("Begin to dump mq%s list for [%s]...\n"
	,	IS_MQGROUP(mq) ? "group" : "queue", mq->name);

	dprintf("first <%p>, current %s <%p>, last <%p>\n"
	,	g_list_first(mq->list), mq->name, mq->current
	,	g_list_last(mq->list));
}

/*
 * GFunc adapter for g_list_foreach(): @data is a queue group and
 * @user_data the queue that has to be dropped from it.
 */
static void
unref_mqgroup(gpointer data, gpointer user_data)
{
	mqueue_t *mqg = (mqueue_t *)data;
	mqueue_t *mq = (mqueue_t *)user_data;

	mqgroup_unref_mqueue(mqg, mq);
}

/**
 * mqgroup_unref_mqueue - remove @mq from the list of @mqg
 * @mqg: the queue whose list is modified
 * @mq: the list entry to remove
 *
 * The "current" cursor of @mqg is advanced first so that it does not
 * keep pointing at the list node that is about to be removed; it is
 * reset to NULL when the list becomes empty.  Always returns TRUE.
 */
int
mqgroup_unref_mqueue(mqueue_t * mqg, mqueue_t * mq)
{
	dump_mqueue_list(mqg);
	dprintf("%s: mq <%p>\n", __FUNCTION__, mq);

	mqg->current = CIRCLE_LIST_NEXT(mqg->list, mqg->current);

	/* the cursor may be NULL here (e.g. empty list): do not touch it */
	if (mqg->current != NULL && mq == mqg->current->data)
		mqg->current = CIRCLE_LIST_NEXT(mqg->list, mqg->current);

	mqg->list = g_list_remove(mqg->list, mq);

	if (G_LIST_EMPTY(mqg->list))
		mqg->current = NULL;

	dump_mqueue_list(mqg);

	return TRUE;
}

/**
 * mqueue_table_remove - remove a queue from the name table and free it
 * @qname: name of the queue to remove
 *
 * Returns HA_FAIL if no queue with that name is registered, HA_OK
 * otherwise.  The handle table is not touched by this function.
 */
int
mqueue_table_remove(const char * qname)
{
	mqueue_t * mq;

	CMS_TRACE();

	mq = g_hash_table_lookup(mqtable_name_hash, qname);
	if (!mq)
		return HA_FAIL;

	/*
	 * Remove the table entry before freeing mq->name: the table key
	 * is mq->name itself (see mqueue_table_insert).
	 */
	g_hash_table_remove(mqtable_name_hash, qname);

	/* TODO: list memory needs to be freed as well */
	ha_free(mq->name);
	ha_free(mq->host);

	if (mq->client)
		ha_free(mq->client);

	if (mq->list) {
		/*
		 * unreference this mqueue from mqgroups include it
		 */
		g_list_foreach(mq->list, unref_mqgroup, mq);
		g_list_free(mq->list);
	}

	if (mq->notify_list)
		g_list_free(mq->notify_list);

	ha_free(mq);

	return HA_OK;
}

/**
 * mqueue_handle_remove - drop a handle from the handle table
 * @hd: pointer to the handle value to remove
 *
 * Frees the key that mqueue_handle_insert() allocated.  Returns HA_FAIL
 * if the handle is unknown, HA_OK otherwise.
 */
int
mqueue_handle_remove(guint *hd)
{
	gpointer orig_hd, mq;
	gboolean found;

	found = g_hash_table_lookup_extended(mqtable_handle_hash, hd,
					     &orig_hd, &mq);
	if (!found)
		return HA_FAIL;

	g_hash_table_remove(mqtable_handle_hash, hd);
	ha_free((guint *)orig_hd);

	/*
	 * mq is not freed here. in case the queue might be reopened later
	 */
	return HA_OK;
}

/**
 * mqueue_table_lookup - find a queue by name in the local name table
 * @qname: queue name
 * @group: optional; when non-NULL and the queue is found it is set to
 *	FALSE if the queue's policy is 0 and to TRUE otherwise
 *
 * Returns the queue, or NULL when the name is not registered (in which
 * case @group is left untouched).
 */
mqueue_t *
mqueue_table_lookup(const char * qname, int * group)
{
	mqueue_t *mq;

	mq = g_hash_table_lookup(mqtable_name_hash, qname);
	if (mq == NULL)
		return NULL;

	if (group != NULL)
		*group = (mq->policy == 0) ? FALSE : TRUE;

	return mq;
}

/**
 * mqname_lookup - lookup a message queue in the cluster
 * @name: message queue name
 * @group: It will be set to TRUE if the name belongs to a message
 *	queue group, otherwise set to FALSE.
 *
 * If the message queue is found, returns the message queue; otherwise
 * returns NULL. This call is non-blocking.
 */
mqueue_t *
mqname_lookup(const char *name, int *group)
{
	return mqueue_table_lookup(name, group);
}

/**
 * mqueue_handle_lookup - find a queue by handle
 * @handle: pointer to the handle value
 * @group: unused
 *
 * Returns the queue registered under that handle, or NULL.
 */
mqueue_t *
mqueue_handle_lookup(guint *handle, int * group)
{
	return g_hash_table_lookup(mqtable_handle_hash, handle);
}

/*
 * Debug print of a packed mq_info buffer.  The buffer is a sequence of
 * struct mq_info records, each immediately followed by
 * mq_groupinfo_count struct mq_groupinfo records.
 */
static void
dump_mqinfo(const struct mq_info * buf, size_t buflen)
{
	const char * end = (const char *) buf + buflen;
	const struct mq_info * pbuf = buf;
	const struct mq_groupinfo * pgbuf;
	int gcount = 0;
	int i, j = 0;

	dprintf("mq info update: \n");

	/* '<' rather than '!=' so a bad length cannot make us walk forever */
	while ((const char *) pbuf < end) {
		j++;
		dprintf(" mq_info No. %d: \n", j);
		dprintf(" qname = %s\n", pbuf->qname.value);
		dprintf(" host = %s\n", pbuf->host.value);
		dprintf(" mqstat = %d\n", pbuf->mqstat);
		dprintf(" policy = %d\n", pbuf->policy);
		dprintf(" mq_groupinfo_count = %d\n"
		,	pbuf->mq_groupinfo_count);

		gcount = pbuf->mq_groupinfo_count;
		++pbuf;

		if (gcount > 0) {
			pgbuf = (const struct mq_groupinfo *) pbuf;
			for (i = 0; i < gcount; i++) {
				dprintf(" name = %s\n", pgbuf->name.value);
				pgbuf++;
			}
			/* next mq_info starts right after the group records */
			pbuf = (const struct mq_info *) pgbuf;
		}
	}
}

/*
 * GHFunc: adds the list length of the queue in @value to the size_t
 * counter that @user_data points to.
 */
static void
count_group(gpointer key, gpointer value, gpointer user_data)
{
	mqueue_t * mq = (mqueue_t *) value;
	size_t * count = (size_t *) user_data;

	/* g_list_length(NULL) is 0, so no separate NULL test is needed */
	*count += g_list_length(mq->list);
}

/*
 * GHFunc: serialises the queue in @value at the write cursor that
 * @mqinfo points to (a struct mq_info **), followed by one
 * struct mq_groupinfo per entry of the queue's list, and advances the
 * cursor past what was written.  The caller (mqueue_table_pack) sizes
 * the buffer.
 */
static void
pack_mqinfo(gpointer key, gpointer value, gpointer mqinfo)
{
	mqueue_t * mq = (mqueue_t *) value;
	struct mq_info ** cursor = (struct mq_info **) mqinfo;
	struct mq_info * info = *cursor;
	struct mq_groupinfo * group;
	mqueue_t * mqg;
	GList * node;

	info->qname.length = strlen(mq->name) + 1;
	strncpy(info->qname.value, mq->name, SA_MAX_NAME_LENGTH);
	/* strncpy does not terminate when the source fills the field */
	info->qname.value[SA_MAX_NAME_LENGTH - 1] = '\0';

	info->host.length = strlen(mq->host) + 1;
	strncpy(info->host.value, mq->host, SA_MAX_NAME_LENGTH);
	info->host.value[SA_MAX_NAME_LENGTH - 1] = '\0';

	info->mqstat = mq->mqstat;
	info->policy = mq->policy;
	info->mq_groupinfo_count = g_list_length(mq->list);

	info++;

	/* the group records are written directly behind the mq_info */
	group = (struct mq_groupinfo *) info;
	for (node = mq->list; node != NULL; node = g_list_next(node)) {
		mqg = (mqueue_t *) node->data;

		group->name.length = strlen(mqg->name) + 1;
		strncpy(group->name.value, mqg->name, SA_MAX_NAME_LENGTH);
		group->name.value[SA_MAX_NAME_LENGTH - 1] = '\0';
		group++;
	}

	/* when the list is empty this is simply the incremented info */
	*cursor = (struct mq_info *) group;
}

/**
 * mqueue_table_pack - serialise the whole name table into one buffer
 * @mqinfo_buf: receives the newly allocated buffer (ha_malloc); the
 *	caller owns it
 * @mqinfo_len: receives the buffer length in bytes
 *
 * Both outputs are set to NULL / 0 first.  Returns HA_OK on success and
 * HA_FAIL if the buffer cannot be allocated.
 */
int
mqueue_table_pack(struct mq_info * * mqinfo_buf, size_t * mqinfo_len)
{
	size_t buflen, count, gcount = 0;
	struct mq_info * buf, * pbuf;

	*mqinfo_buf = NULL;
	*mqinfo_len = 0;

	count = g_hash_table_size(mqtable_name_hash);
	g_hash_table_foreach(mqtable_name_hash, count_group, &gcount);

	buflen = sizeof(struct mq_info) * count
		+ sizeof(struct mq_groupinfo) * gcount;

	buf = (struct mq_info *) ha_malloc(buflen);
	if (buf == NULL && buflen != 0) {
		cl_log(LOG_CRIT, "%s: ha_malloc failed for mqinfo buffer"
		,	__FUNCTION__);
		return HA_FAIL;
	}

	/* do not hand uninitialised padding bytes to the caller */
	if (buf != NULL)
		memset(buf, 0, buflen);

	pbuf = buf;
	g_hash_table_foreach(mqtable_name_hash, pack_mqinfo, &pbuf);

	dump_mqinfo(buf, buflen);

	*mqinfo_buf = buf;
	*mqinfo_len = buflen;

	return HA_OK;
}

/**
 * mqueue_table_unpack - rebuild name table entries from a packed buffer
 * @buf: buffer in the layout produced by mqueue_table_pack()
 * @buflen: length of @buf in bytes
 *
 * Pass 1 creates one mqueue_t per mq_info record and inserts it into
 * the name table.  Pass 2 (only when at least one record carries group
 * records) resolves the names in the group records and appends the
 * resolved queues to the list of the record's own queue.
 *
 * Returns HA_OK, or HA_FAIL on allocation failure.
 */
int
mqueue_table_unpack(const struct mq_info * buf, size_t buflen)
{
	const char * end = (const char *) buf + buflen;
	const struct mq_info * pbuf;
	const struct mq_info * rec;
	const struct mq_groupinfo * pgbuf;
	size_t gcount = 0;
	size_t i;
	int j = 0;
	mqueue_t * mq, * mqg;
	int group_listing = 0;

	pbuf = buf;

	while ((const char *) pbuf < end) {
		j++;
		dprintf(" mq_info No. %d: \n", j);

		if ((mq = (mqueue_t *) ha_malloc(sizeof(mqueue_t))) == NULL) {
			cl_log(LOG_ERR, "%s: ha_malloc failed for mqueue "
			,	__FUNCTION__);
			return HA_FAIL;
		}
		memset(mq, 0, sizeof(mqueue_t));

		dprintf(" qname = %s\n", pbuf->qname.value);
		mq->name = ha_strdup(pbuf->qname.value);
		dprintf(" host = %s\n", pbuf->host.value);
		mq->host = ha_strdup(pbuf->host.value);

		if (mq->name == NULL || mq->host == NULL) {
			cl_log(LOG_ERR, "%s: ha_strdup failed for mqueue "
			,	__FUNCTION__);
			if (mq->name)
				ha_free(mq->name);
			if (mq->host)
				ha_free(mq->host);
			ha_free(mq);
			return HA_FAIL;
		}

		dprintf(" mqstat = %d\n", pbuf->mqstat);
		mq->mqstat = pbuf->mqstat;
		dprintf(" policy = %d\n", pbuf->policy);
		mq->policy = pbuf->policy;
		dprintf(" mq_groupinfo_count = %d\n"
		,	pbuf->mq_groupinfo_count);

		gcount = pbuf->mq_groupinfo_count;
		++pbuf;

		if (gcount > 0) {
			group_listing = 1;

			/* pass 1 only skips over the group records */
			pgbuf = (const struct mq_groupinfo *) pbuf;
			for (i = 0; i < gcount; i++) {
				dprintf(" name = %s\n", pgbuf->name.value);
				pgbuf++;
			}
			pbuf = (const struct mq_info *) pgbuf;
		}

		mqueue_table_insert(mq);
	}

	if (!group_listing)
		return HA_OK;

	/*
	 * mq_info unpacking is a little bit more complicated.  we have to
	 * do a two pass thing to get all the group listing right.  Is
	 * there any other way of doing this?
	 */
	pbuf = buf;

	while ((const char *) pbuf < end) {
		/* remember the record itself before stepping over it */
		rec = pbuf;
		gcount = rec->mq_groupinfo_count;
		++pbuf;

		if (gcount == 0) {
			continue;
		}

		pgbuf = (const struct mq_groupinfo *) pbuf;

		mqg = mqueue_table_lookup(rec->qname.value, NULL);
		if (mqg == NULL) {
			cl_log(LOG_ERR, "%s: queue [%s] not found in local database"
			,	__FUNCTION__, rec->qname.value);
			/* still step over its group records */
			pbuf = (const struct mq_info *) (pgbuf + gcount);
			continue;
		}

		/* pgbuf advances in the for header so 'continue' cannot skip it */
		for (i = 0; i < gcount; i++, pgbuf++) {
			dprintf(" name = %s\n", pgbuf->name.value);

			mq = mqueue_table_lookup(pgbuf->name.value, NULL);
			if (!mq) {
				cl_log(LOG_ERR, "queue name does not "
					"exists in current local database.");
				continue;
			}

			mqg->list = g_list_append(mqg->list, mq);
			cl_log(LOG_INFO, "Adding mq <%s> to [%s] list"
			,	mq->name, mqg->name);
		}

		if (mqg->current == NULL)
			mqg->current = g_list_first(mqg->list);

		pbuf = (const struct mq_info *) pgbuf;
	}

	return HA_OK;
}

/*
 * GHFunc: requests closing of the queue in @value when its host name
 * equals the node name passed in @user_data.
 */
static void
close_mqueue(gpointer key, gpointer value, gpointer user_data)
{
	char * node = (char *)user_data;
	mqueue_t * mq = (mqueue_t *)value;

	dprintf("%s: node is %s\n", __FUNCTION__, node);

	if (strcmp(node, mq->host) == 0) {
		dprintf("close mqueue [%s] on Node [%s]\n", mq->name, node);
		request_mqname_close(mq->name, &cms_data);
	}
}

/**
 * mqueue_close_node - request closing of every queue hosted on @node
 * @node: host name compared against each queue's host field
 */
void
mqueue_close_node(char * node)
{
	g_hash_table_foreach(mqtable_name_hash, close_mqueue, node);
}

/**
 * enqueue_message - append @msg to the buffer of priority @prio
 * @mq: destination queue
 * @prio: index into mq->message_buffer; not range-checked here
 * @msg: message to append; the pointer is stored, not copied
 *
 * Also accounts msg->size in the queue usage statistics.
 */
void
enqueue_message(mqueue_t * mq, SaUint8T prio, SaMsgMessageT * msg)
{
	dprintf("%s: mq = [%s], priority = %u\n", __FUNCTION__, mq->name, prio);

	mq->message_buffer[prio] = g_list_append(mq->message_buffer[prio], msg);
	mqueue_update_usage(mq, prio, msg->size);
}

/**
 * dequeue_message - take the oldest message of the best priority
 * @mq: source queue
 *
 * Priorities are scanned from SA_MSG_MESSAGE_HIGHEST_PRIORITY to
 * SA_MSG_MESSAGE_LOWEST_PRIORITY; the first message of the first
 * non-empty buffer is unlinked and returned.  Returns NULL when all
 * buffers are empty.
 */
SaMsgMessageT *
dequeue_message(mqueue_t * mq)
{
	SaMsgMessageT * message = NULL;
	GList *head, *queue;
	SaUint8T i;

	for (i = SA_MSG_MESSAGE_HIGHEST_PRIORITY
	;	i <= SA_MSG_MESSAGE_LOWEST_PRIORITY
	;	i++) {

		queue = mq->message_buffer[i];

		/* an empty GList is NULL; no need to count its elements */
		if (queue != NULL) {
			head = g_list_first(queue);
			mq->message_buffer[i] = g_list_remove_link(queue, head);
			message = head->data;
			g_list_free_1(head);

			mqueue_update_usage(mq, i, -message->size);

			return message;
		}

		dprintf("%s: mq [%s] priority %u is NULL\n"
		,	__FUNCTION__, mq->name, i);
	}

	return NULL;
}

/**
 * mqueue_update_usage - account an enqueue or dequeue in the statistics
 * @mq: queue whose status is updated
 * @priority: priority area, 0 .. SA_MSG_MESSAGE_LOWEST_PRIORITY
 * @size: byte delta; dequeue_message() passes the negated message size
 *
 * queueUsed is adjusted by @size and kept inside [0, queueSize];
 * numberOfMessages is moved by one in the direction of @size's sign and
 * not allowed to go below 0.  An out-of-range @priority is logged and
 * ignored.
 *
 * NOTE(review): the sign test on @size only works if SaSizeT is a
 * signed type, and a zero-size message is counted as an enqueue even
 * when it is dequeued - confirm against the SaSizeT definition.
 */
void
mqueue_update_usage(mqueue_t * mq, int priority, SaSizeT size)
{
	SaMsgQueueUsageT * usage;

	if (priority < 0 || priority > SA_MSG_MESSAGE_LOWEST_PRIORITY) {
		cl_log(LOG_ALERT, "%s: Invalid priority [%d]"
		,	__FUNCTION__, priority);
		return;
	}

	usage = &mq->status.saMsgQueueUsage[priority];

	usage->queueUsed += size;
	usage->numberOfMessages += size < 0 ? -1 : 1;

	if (usage->queueUsed < 0)
		usage->queueUsed = 0;

	if (usage->queueUsed > usage->queueSize)
		usage->queueUsed = usage->queueSize;

	if ((SaInt32T) (usage->numberOfMessages) < 0)
		usage->numberOfMessages = 0;

	dprintf("%s: queueUsed [%d], numberOfMessages [%lu]\n"
	,	__FUNCTION__
	,	usage->queueUsed
	,	usage->numberOfMessages);
}

/**
 * sa_mqueue_usage_encode - format the usage array as hex text
 * @size: output for the queueSize values, or NULL to skip
 * @used: output for the queueUsed values, or NULL to skip
 * @number: output for the numberOfMessages values, or NULL to skip
 * @usage: array with SA_MSG_MESSAGE_LOWEST_PRIORITY + 1 entries
 *
 * Each output receives one "<hex>:" field per priority.  The output
 * buffers are written with sprintf() and no length is passed in, so the
 * caller must make them large enough.  Always returns TRUE.
 */
int
sa_mqueue_usage_encode(char *size, char *used, char * number,
		       SaMsgQueueUsageT * usage)
{
	char *p = size, *q = used, *r = number;
	int i;

	for (i = 0; i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; i++) {
		if (p) {
			p += sprintf(p, "%lx:", usage[i].queueSize);
		}
		if (q) {
			q += sprintf(q, "%x:", usage[i].queueUsed);
		}
		if (r) {
			r += sprintf(r, "%lx:", usage[i].numberOfMessages);
		}
	}

	return TRUE;
}

/*
 * Parses one "<hex>:" field at *cursor.  On success stores the value in
 * *out, moves *cursor behind the ':' and returns TRUE; returns FALSE
 * without touching either when the digits are not followed by ':'.
 */
static int
usage_decode_field(const char ** cursor, unsigned long * out)
{
	char * stop;
	unsigned long parsed;

	parsed = strtoul(*cursor, &stop, 16);
	if (*stop != ':')
		return FALSE;

	*out = parsed;
	*cursor = stop + 1;

	return TRUE;
}

/**
 * sa_mqueue_usage_decode - parse text written by sa_mqueue_usage_encode
 * @size: text with the queueSize fields, or NULL to skip
 * @used: text with the queueUsed fields, or NULL to skip
 * @number: text with the numberOfMessages fields, or NULL to skip
 * @usage: array with SA_MSG_MESSAGE_LOWEST_PRIORITY + 1 entries to fill
 *
 * Returns TRUE when every given string supplied a field for every
 * priority; otherwise logs and returns FALSE (entries parsed up to that
 * point stay written).
 */
int
sa_mqueue_usage_decode(const char *size, const char *used,
		       const char * number, SaMsgQueueUsageT * usage)
{
	const char *p = size, *q = used, *r = number;
	unsigned long field;
	int i;

	for (i = 0; i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; i++) {
		if (p) {
			if (!usage_decode_field(&p, &field))
				break;
			usage[i].queueSize = field;
		}
		if (q) {
			if (!usage_decode_field(&q, &field))
				break;
			usage[i].queueUsed = field;
		}
		if (r) {
			if (!usage_decode_field(&r, &field))
				break;
			usage[i].numberOfMessages = field;
		}
	}

	if (i != SA_MSG_MESSAGE_LOWEST_PRIORITY + 1) {
		cl_log(LOG_ALERT, "%s: Invalid priority [%d]"
		,	__FUNCTION__, i);
		return FALSE;
	}

	return TRUE;
}

/**
 * mqueue_copy_notify_data - GFunc appending one notification entry
 * @data: the queue (mqueue_t *) to describe
 * @user_data: the notify_buffer_t being filled
 *
 * Grows buf->change_buff by one SaMsgQueueGroupNotificationT and fills
 * it with the queue's name and status.  The change field is copied from
 * buf->changeonly_buff when the names match, otherwise it is
 * SA_MSG_QUEUE_GROUP_NO_CHANGE.  On allocation failure the buffer and
 * its element count are left as they were.
 */
void
mqueue_copy_notify_data(gpointer data, gpointer user_data)
{
	mqueue_t * mq = (mqueue_t *)data;
	notify_buffer_t * buf = (notify_buffer_t *)user_data;
	SaMsgQueueGroupNotificationT * grown;
	SaMsgQueueGroupNotificationT * current;

	/* keep the old pointer until realloc is known to have succeeded */
	grown = realloc(buf->change_buff
	,	(buf->number + 1) * sizeof(SaMsgQueueGroupNotificationT));

	if (!grown) {
		cl_log(LOG_CRIT, "realloc failed for mqueue_copy_notify_data.");
		return;
	}

	buf->change_buff = grown;
	buf->number++;

	dprintf("%s: mqname is [%s]\n", __FUNCTION__, mq->name);

	current = buf->change_buff + buf->number - 1;

	strncpy(current->member.queueName.value, mq->name
	,	SA_MAX_NAME_LENGTH);
	/* strncpy does not terminate when the source fills the field */
	current->member.queueName.value[SA_MAX_NAME_LENGTH - 1] = '\0';

	current->member.queueName.length = strlen(mq->name) + 1;
	current->member.queueStatus = mq->status;

	if (strcmp(mq->name
	,	buf->changeonly_buff.member.queueName.value) == 0)
		current->change = buf->changeonly_buff.change;
	else
		current->change = SA_MSG_QUEUE_GROUP_NO_CHANGE;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -