📄 cms_cluster.c
字号:
client_send_client_qopen(client, &reply, -1, flag); ha_free(reply.qname); g_hash_table_remove(mq_open_pending_hash, name); ha_free(mq_pending->name); ha_free(mq_pending); } return TRUE;}static voidgroup_mem_dispatch(gpointer data, gpointer user_data){ IPC_Message msg; client_mqgroup_notify_t * cmg = NULL; int size; client_mqgroup_track_t * track = (client_mqgroup_track_t *)data; notify_buffer_t * notify = (notify_buffer_t *)user_data; cmg = (client_mqgroup_notify_t *) ha_malloc(sizeof(client_mqgroup_notify_t)); if (cmg == NULL) { cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__); return; } cmg->header.type = CMS_QUEUEGROUP_NOTIFY; cmg->header.name = notify->name; cmg->policy = notify->policy; cmg->group_name = notify->name; switch (track->flag) { case SA_TRACK_CHANGES: size = notify->number * sizeof(SaMsgQueueGroupNotificationT); msg.msg_len = sizeof(client_mqgroup_notify_t) + size; cmg->number = notify->number; cmg = realloc(cmg, msg.msg_len); cmg->data = (char *)cmg + sizeof(client_mqgroup_notify_t); memcpy(cmg->data, notify->change_buff, size); break; case SA_TRACK_CHANGES_ONLY: size = sizeof(SaMsgQueueGroupNotificationT); msg.msg_len = sizeof(client_mqgroup_notify_t) + size; cmg->number = 1; cmg = realloc(cmg, msg.msg_len); cmg->data = (char *)cmg + sizeof(client_mqgroup_notify_t); *(SaMsgQueueGroupNotificationT *)(cmg->data) = notify->changeonly_buff; break; default: cl_log(LOG_ERR, "Unknown track flag [%d]", track->flag); ha_free(cmg); return; } msg.msg_body = cmg; msg.msg_private = &msg; msg.msg_done = NULL; /* TODO: msg.msg_done to free memory here */ dprintf("%s: Send Track information to my clients...\n", __FUNCTION__); track->ch->ops->send(track->ch, &msg);}intprocess_mqgroup_insert(struct ha_msg *msg){ const char *gname, *name; mqueue_t *mqg, *mq; notify_buffer_t buf; if ((gname = ha_msg_value(msg, F_MQGROUPNAME)) == NULL) { cl_log(LOG_ERR, "received NULL mq group name request"); return FALSE; } if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) { cl_log(LOG_ERR, "received NULL mq name request"); return FALSE; } /* * Check carefully again here in case there are mess * message in cluster. */ if ((mqg = mqname_lookup(gname, NULL)) == NULL) { cl_log(LOG_ERR , "group name [%s] doesn't exist in local database!" , gname); return FALSE; } if (mqg->policy == 0) { cl_log(LOG_ERR, "[%s] is a mq group name instead of a mq name" , gname); return FALSE; } if ((mq = mqname_lookup(name, NULL)) == NULL) { cl_log(LOG_ERR , "mq name [%s] doesn't exist in local database!" , name); return FALSE; } /* * The mqueue may already in the group, i.e this * node is the master name node. */ if (g_list_find(mqg->list, mq) == NULL) { mqg->list = g_list_append(mqg->list, mq); cl_log(LOG_INFO, "Adding mq <%p> to [%s] list", mq, gname); } /* * Update the mqueue list to point to append the group. */ if (g_list_find(mq->list, mqg) == NULL) { mq->list = g_list_append(mq->list, mqg); cl_log(LOG_INFO, "Adding mqg <%p> to [%s] list", mqg, name); } /* * Current Round Robin counter set to the first list, * we may want to set it as a random index to gain * more load balance. */ if (mqg->current == NULL) mqg->current = g_list_first(mqg->list); /* * Notify my clients who care about the group * membership change message. */ if (mqg->notify_list != NULL) { strcpy(buf.changeonly_buff.member.queueName.value, name); buf.changeonly_buff.member.queueName.length = strlen(name) + 1; buf.changeonly_buff.member.queueStatus = mq->status; buf.changeonly_buff.change = SA_MSG_QUEUE_GROUP_ADDED; buf.policy = mqg->policy; buf.number = 0; strcpy(buf.name.value, gname); buf.name.length = strlen(gname) + 1; buf.change_buff = NULL; g_list_foreach(mqg->list, mqueue_copy_notify_data, &buf); g_list_foreach(mqg->notify_list, group_mem_dispatch, &buf); } return SA_OK;}intprocess_mqgroup_remove(struct ha_msg *msg){ const char *gname, *name; mqueue_t *mqg, *mq; notify_buffer_t buf; if ((gname = ha_msg_value(msg, F_MQGROUPNAME)) == NULL) { cl_log(LOG_ERR, "received NULL mq group name request"); return FALSE; } if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) { cl_log(LOG_ERR, "received NULL mq name request"); return FALSE; } /* * Check carefully again here in case there are mess * message in cluster. */ if ((mqg = mqname_lookup(gname, NULL)) == NULL) { cl_log(LOG_ERR , "group name [%s] doesn't exist in local database!" , gname); return FALSE; } if (!IS_MQGROUP(mqg)) { cl_log(LOG_ERR, "[%s] is a mq name instead of a mq group name" , gname); return FALSE; } if ((mq = mqname_lookup(name, NULL)) == NULL) { cl_log(LOG_ERR , "mq name [%s] doesn't exist in local database!" , name); return FALSE; } /* * mqueue may already be removed from the group, i.e * this node is the master name node */ if (g_list_find(mqg->list, mq) != NULL) mqg->list = g_list_remove(mqg->list, mq); /* * remove the mqgroup from the mqueue list as well */ if (g_list_find(mq->list, mqg) != NULL) mq->list = g_list_remove(mq->list, mqg); /* * Notify my clients who care about the group * membership change message. */ if (mqg->notify_list != NULL) { strcpy(buf.changeonly_buff.member.queueName.value, name); buf.changeonly_buff.member.queueName.length = strlen(name) + 1; buf.changeonly_buff.member.queueStatus = mq->status; buf.changeonly_buff.change = SA_MSG_QUEUE_GROUP_REMOVED; buf.policy = mqg->policy; buf.number = 0; strcpy(buf.name.value, gname); buf.name.length = strlen(gname) + 1; g_list_foreach(mqg->list, mqueue_copy_notify_data, &buf); g_list_foreach(mqg->notify_list, group_mem_dispatch, &buf); }#if DEBUG_CLUSTER cl_log_message(msg);#endif return SA_OK;}intprocess_mqname_ack(struct ha_msg *msg){ const char * qname, * error, * request; const SaInvocationT * invocation; size_t invocation_len, seq_len; const unsigned long * seq; gpointer orig_seq, client; mqueue_request_t ack; gboolean found; if ((qname = ha_msg_value(msg, F_MQNAME)) == NULL || (request = ha_msg_value(msg, F_MQREQUEST)) == NULL || (error = ha_msg_value(msg, F_MQERROR)) == NULL || (seq = cl_get_binary(msg, F_MQMSGSEQ, &seq_len)) == NULL || (invocation = cl_get_binary(msg, F_MQINVOCATION, &invocation_len)) == NULL ) { cl_log(LOG_ERR, "received NULL mq name request"); return FALSE; } found = g_hash_table_lookup_extended(mq_ack_pending_hash, seq, &orig_seq, &client); if (found) { /* * we have clients waiting for acks, send out reply */ dprintf("%s: found client <%p>\n", __FUNCTION__, client); ack.qname = ha_strdup(qname); ack.request_type = cmsrequest_string2type(request); ack.invocation = *invocation; client_send_ack_msg((IPC_Channel *) client, &ack, -1, saerror_string2type(error)); ha_free(ack.qname); g_hash_table_remove(mq_open_pending_hash, seq); ha_free((unsigned long *) orig_seq); ha_free((IPC_Channel *) client); } else { cl_log(LOG_ERR, "client is not found. " "nobody to send the ack to. mqname = %s", qname); } return TRUE;}intprocess_mqname_update(struct ha_msg *msg, cms_data_t * cmsdata){ const struct mq_info * info; size_t info_len; if ((info = cl_get_binary(msg, F_MQUPDATE, &info_len)) == NULL) { cl_log(LOG_ERR, "received NULL mq info update"); return HA_FAIL; } /* * turn on the ready bit and start accepting client request */ if (mqueue_table_unpack(info, info_len) == HA_OK) { cmsdata->cms_ready = 1; } return HA_OK;}intrequest_mqueue_status(mqueue_t * mqueue, cms_data_t * cmsdata){ struct ha_msg * msg; ll_cluster_t * hb; dprintf("in request status, used is [%d]\n" , mqueue->status.saMsgQueueUsage[3].queueUsed); if ((msg = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(msg, F_TYPE, mqname_type2string(MQNAME_TYPE_STATUS_REQUEST)) == HA_FAIL || ha_msg_add(msg, F_MQNAME, mqueue->name) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; } hb = cmsdata->hb_handle; hb->llc_ops->sendnodemsg(hb, msg, mqueue->host); ha_msg_del(msg); return TRUE;}intreply_mqueue_status(struct ha_msg *msg, cms_data_t * cmsdata){ char usedstring[PACKSTRSIZE], numstring[PACKSTRSIZE]; const char * name; mqueue_t * mq; struct ha_msg * m; ll_cluster_t * hb; const char * expire = "FALSE"; if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) { cl_log(LOG_ERR, "%s: ha_msg_value failed", __FUNCTION__); return FALSE; } if ((mq = mqname_lookup(name, NULL)) == NULL) { cl_log(LOG_ERR, "%s: cannot find mqname [%s]" , __FUNCTION__, name); return FALSE; } if (RETENTION_TIME_EXPIRES(mq)) { cl_log(LOG_WARNING, "%s: retention time expires [%s]" , __FUNCTION__, name); expire = "TRUE"; request_mqname_unlink(name, cmsdata); } sa_mqueue_usage_encode(NULL, usedstring, numstring , mq->status.saMsgQueueUsage); dprintf("queueUsed [%s], numberOfMessages [%s]\n" , usedstring, numstring); /* * send reply to the request node */ if ((m = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(m, F_TYPE, mqname_type2string(MQNAME_TYPE_STATUS_REPLY)) == HA_FAIL || ha_msg_add(m, F_MQNAME, name) == HA_FAIL || ha_msg_add(m, F_MQEXPIRE, expire) == HA_FAIL || ha_msg_add(m, F_MQUSED, usedstring) == HA_FAIL || ha_msg_add(m, F_MQMSGNUM, numstring) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; } hb = cmsdata->hb_handle; hb->llc_ops->sendnodemsg(hb, m, mq->host); ha_msg_del(m); return TRUE;}intprocess_mqueue_status(struct ha_msg *msg){ const char *name, *expire = NULL, *usedstring = NULL, *numstring = NULL; SaErrorT ret = SA_OK; IPC_Channel * client; mqueue_t * mq; gpointer orig_key, value; if ((name = ha_msg_value(msg, F_MQNAME)) == NULL || (expire = ha_msg_value(msg, F_MQEXPIRE)) == NULL || (usedstring = ha_msg_value(msg, F_MQUSED)) == NULL || (numstring = ha_msg_value(msg, F_MQMSGNUM)) == NULL) { cl_log(LOG_ERR, "%s: ha_msg_value failed", __FUNCTION__); ret = SA_ERR_LIBRARY; } if (!strncmp(expire, "TRUE", 4)) ret = SA_ERR_NOT_EXIST; /* TODO: currently there can be only one client for a mqueue * name in the hash table. But there might be more clients * query the mqueue status at the same time. Need to make a * GList for each mqueue name. */ if ((g_hash_table_lookup_extended(mq_status_pending_hash, name, &orig_key, &value)) == FALSE) { cl_log(LOG_ALERT, "%s: cannot find mqname [%s]" , __FUNCTION__, name); return FALSE; } g_hash_table_remove(mq_status_pending_hash, name); client = value; /* * update local mqueue hash table */ if ((mq = mqueue_table_lookup(name, NULL))) { sa_mqueue_usage_decode(NULL, usedstring, numstring , mq->status.saMsgQueueUsage); } /* * response to my client */ dprintf("%s: before respond to my client, ret = [%d]\n" , __FUNCTION__, ret); if (ret != SA_OK) client_send_error_msg(client, name, CMS_QUEUE_STATUS, ret); else ret = client_send_qstatus(client, mq, SA_OK); ha_free(orig_key); return ret;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -