📄 cms_cluster.c
字号:
g_list_foreach(mqg->notify_list, group_mem_dispatch, &buf); } return SA_OK;}intprocess_mqgroup_remove(struct ha_msg *msg){ const char *gname, *name; mqueue_t *mqg, *mq; notify_buffer_t buf; if ((gname = ha_msg_value(msg, F_MQGROUPNAME)) == NULL) { cl_log(LOG_ERR, "received NULL mq group name request"); return FALSE; } if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) { cl_log(LOG_ERR, "received NULL mq name request"); return FALSE; } /* * Check carefully again here in case there are mess * message in cluster. */ if ((mqg = mqname_lookup(gname, NULL)) == NULL) { cl_log(LOG_ERR , "group name [%s] doesn't exist in local database!" , gname); return FALSE; } if (!IS_MQGROUP(mqg)) { cl_log(LOG_ERR, "[%s] is a mq name instead of a mq group name" , gname); return FALSE; } if ((mq = mqname_lookup(name, NULL)) == NULL) { cl_log(LOG_ERR , "mq name [%s] doesn't exist in local database!" , name); return FALSE; } /* * mqueue may already be removed from the group, i.e * this node is the master name node */ if (g_list_find(mqg->list, mq) != NULL) mqg->list = g_list_remove(mqg->list, mq); /* * remove the mqgroup from the mqueue list as well */ if (g_list_find(mq->list, mqg) != NULL) mq->list = g_list_remove(mq->list, mqg); /* * Notify my clients who care about the group * membership change message. */ if (mqg->notify_list != NULL) { strcpy(buf.changeonly_buff.member.queueName.value, name); buf.changeonly_buff.member.queueName.length = strlen(name) + 1; buf.changeonly_buff.member.queueStatus = mq->status; buf.changeonly_buff.change = SA_MSG_QUEUE_GROUP_REMOVED; buf.policy = mqg->policy; buf.number = 0; strcpy(buf.name.value, gname); buf.name.length = strlen(gname) + 1; g_list_foreach(mqg->list, mqueue_copy_notify_data, &buf); g_list_foreach(mqg->notify_list, group_mem_dispatch, &buf); }#if DEBUG_CLUSTER cl_log_message(msg);#endif return SA_OK;}intprocess_mqname_ack(struct ha_msg *msg){ const char * qname, * error, * request; const SaInvocationT * invocation; size_t invocation_len, seq_len; const unsigned long * seq; gpointer orig_seq, client; mqueue_request_t ack; gboolean found; if ((request = ha_msg_value(msg, F_MQREQUEST)) == NULL || (error = ha_msg_value(msg, F_MQERROR)) == NULL || (seq = cl_get_binary(msg, F_MQMSGSEQ, &seq_len)) == NULL || (invocation = cl_get_binary(msg, F_MQINVOCATION, &invocation_len)) == NULL ) { cl_log(LOG_ERR, "received NULL mq name request"); return FALSE; } qname = ha_msg_value(msg, F_MQNAME); found = g_hash_table_lookup_extended(mq_ack_pending_hash, seq, &orig_seq, &client); if (found) { /* * we have clients waiting for acks, send out reply */ dprintf("%s: found client <%p>\n", __FUNCTION__, client); if (qname) { ack.qname = ha_strdup(qname); } ack.request_type = cmsrequest_string2type(request); ack.invocation = *invocation; /* we don't ack for sendreceive here because we are waiting for the CMS_MSG_RECEIVE */ if (ack.request_type != CMS_MSG_SEND_RECEIVE) { client_send_ack_msg((IPC_Channel *) client, &ack, -1, saerror_string2type(error)); } ha_free(ack.qname); g_hash_table_remove(mq_open_pending_hash, seq); ha_free((unsigned long *) orig_seq); } else { cl_log(LOG_ERR, "client is not found. " "nobody to send the ack to. mqname = %s, seq = %ld", qname, *seq); } return TRUE;}intprocess_mqname_update(struct ha_msg *msg, cms_data_t * cmsdata){ const struct mq_info * info; size_t info_len; if ((info = cl_get_binary(msg, F_MQUPDATE, &info_len)) == NULL) { cl_log(LOG_INFO, "received NULL mq info update"); // cmsdata->cms_ready = 1; } /* * turn on the ready bit and start accepting client request */ if (mqueue_table_unpack(info, info_len) == HA_OK) { cmsdata->cms_ready = 1; } return HA_OK;}intrequest_mqueue_status(mqueue_t * mqueue, cms_data_t * cmsdata){ struct ha_msg * msg; ll_cluster_t * hb; dprintf("in request status, used is [%d]\n" , mqueue->status.saMsgQueueUsage[3].queueUsed); if ((msg = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(msg, F_TYPE, mqname_type2string(MQNAME_TYPE_STATUS_REQUEST)) == HA_FAIL || ha_msg_add(msg, F_MQNAME, mqueue->name) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; } hb = cmsdata->hb_handle; hb->llc_ops->sendnodemsg(hb, msg, mqueue->host); ha_msg_del(msg); return TRUE;}intreply_mqueue_status(struct ha_msg *msg, cms_data_t * cmsdata){ char usedstring[PACKSTRSIZE], numstring[PACKSTRSIZE]; const char *name, *host; mqueue_t * mq; struct ha_msg * m; ll_cluster_t * hb; const char * expire = "FALSE"; if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) { cl_log(LOG_ERR, "%s: ha_msg_value failed", __FUNCTION__); return FALSE; } if ((mq = mqname_lookup(name, NULL)) == NULL) { cl_log(LOG_INFO, "%s: cannot find mqname [%s], return." , __FUNCTION__, name); return FALSE; } if ((host = ha_msg_value(msg, F_ORIG)) == NULL) { cl_log(LOG_ERR, "%s: ha_msg_value failed", __FUNCTION__); return FALSE; } if (RETENTION_TIME_EXPIRES(mq)) { cl_log(LOG_WARNING, "%s: retention time expires [%s]" , __FUNCTION__, name); expire = "TRUE"; request_mqname_unlink(name, cmsdata); } sa_mqueue_usage_encode(NULL, usedstring, numstring , mq->status.saMsgQueueUsage); dprintf("queueUsed [%s], numberOfMessages [%s]\n" , usedstring, numstring); /* * send reply to the request node */ if ((m = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(m, F_TYPE, mqname_type2string(MQNAME_TYPE_STATUS_REPLY)) == HA_FAIL || ha_msg_add(m, F_MQNAME, name) == HA_FAIL || ha_msg_add(m, F_MQEXPIRE, expire) == HA_FAIL || ha_msg_add(m, F_MQUSED, usedstring) == HA_FAIL || ha_msg_add(m, F_MQMSGNUM, numstring) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; } hb = cmsdata->hb_handle; hb->llc_ops->sendnodemsg(hb, m, host); ha_msg_del(m); return TRUE;}intprocess_mqueue_status(struct ha_msg *msg){ const char *name, *expire = NULL, *usedstring = NULL, *numstring = NULL; SaErrorT ret = SA_OK; IPC_Channel * client; mqueue_t * mq; gpointer orig_key, value; if ((name = ha_msg_value(msg, F_MQNAME)) == NULL || (expire = ha_msg_value(msg, F_MQEXPIRE)) == NULL || (usedstring = ha_msg_value(msg, F_MQUSED)) == NULL || (numstring = ha_msg_value(msg, F_MQMSGNUM)) == NULL) { cl_log(LOG_ERR, "%s: ha_msg_value failed", __FUNCTION__); ret = SA_ERR_LIBRARY; } if (!strncmp(expire, "TRUE", 4)) ret = SA_ERR_NOT_EXIST; /* TODO: currently there can be only one client for a mqueue * name in the hash table. But there might be more clients * query the mqueue status at the same time. Need to make a * GList for each mqueue name. */ if ((g_hash_table_lookup_extended(mq_status_pending_hash, name, &orig_key, &value)) == FALSE) { cl_log(LOG_ALERT, "%s: cannot find mqname [%s]" , __FUNCTION__, name); return FALSE; } g_hash_table_remove(mq_status_pending_hash, name); client = value; /* * update local mqueue hash table */ if ((mq = mqueue_table_lookup(name, NULL))) { sa_mqueue_usage_decode(NULL, usedstring, numstring , mq->status.saMsgQueueUsage); } /* * response to my client */ dprintf("%s: before respond to my client, ret = [%d]\n" , __FUNCTION__, ret); dprintf("queue status = %d\n", (mq->status).sendingState); if (ret != SA_OK) client_send_error_msg(client, name, CMS_QUEUE_STATUS, ret); else ret = client_send_qstatus(client, mq, SA_OK); ha_free(orig_key); return ret;}intrequest_mqinfo_update(cms_data_t * cmsdata){ int ret; ll_cluster_t *hb; const char * type; struct ha_msg *msg; type = mqname_type2string(MQNAME_TYPE_UPDATE_REQUEST); if ((msg = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); ret = FALSE; } else { hb = cmsdata->hb_handle; hb->llc_ops->sendclustermsg(hb, msg); ret = TRUE; } ha_msg_del(msg); return ret;}intprocess_mqinfo_update_request(struct ha_msg *msg, cms_data_t * cmsdata){ const char * host; const char * node; if ((node = ha_msg_value(msg, F_ORIG)) == NULL) { cl_log(LOG_ERR, "%s: cannot find node name", __FUNCTION__); return FALSE; } /* we ourselves just joined, no update needed. */ if (g_list_length(mqmember_list) <= 1) { return HA_OK; } /* * always the first node in the list should send out the mq * update. in case that the new node is the first node, * choose the second node to send out the mq update */ host = g_list_nth_data(mqmember_list, 0); if (strcmp(host, node) == 0) { host = g_list_nth_data(mqmember_list, 1); } /* are we the one that should send out the mq update? */ if (strcmp(host, cmsdata->my_nodeid) != 0) { return HA_OK; } cl_log(LOG_INFO, "%s: host is %s", __FUNCTION__, host); return reply_mqinfo_update(node, cmsdata);}intreply_mqinfo_update(const char * node, cms_data_t * cmsdata){ struct mq_info * mqinfo; size_t mqinfo_len; const char * type; struct ha_msg *msg; ll_cluster_t *hb; if (mqueue_table_pack(&mqinfo, &mqinfo_len) != HA_OK) { return HA_FAIL; } if ((msg = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return HA_FAIL; } type = mqname_type2string(MQNAME_TYPE_UPDATE); if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL || ha_msg_addbin(msg, F_MQUPDATE, mqinfo, mqinfo_len) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return HA_FAIL; } hb = cmsdata->hb_handle;#if DEBUG_CLUSTER cl_log_message(msg);#endif hb->llc_ops->sendnodemsg(hb, msg, node); ha_free(mqinfo); ha_msg_del(msg); return HA_OK;}intprocess_mqsend_reply(struct ha_msg * msg, cms_data_t * cmsdata){ SaErrorT ret = SA_OK; const char * request_type, * ack_type; const void * data; const int * ack, * invocation, * msg_pri; const unsigned long * seq, * reply_seq; const SaSizeT * msg_type, * msg_ver, * msg_size, * sendreceive; const char *node; size_t data_len, ack_len, invocation_len, seq_len, sendreceive_len, reply_seq_len; size_t type_len, ver_len, pri_len, size_len; client_message_t * m; gboolean found; gpointer orig_seq, client; ll_cluster_t *hb; struct ha_msg * ack_msg; if ((request_type = ha_msg_value(msg, F_MQREQUEST)) == NULL || (msg_type = cl_get_binary(msg, F_MQMSGTYPE, &type_len)) == NULL || (msg_ver = cl_get_binary(msg, F_MQMSGVER, &ver_len)) == NULL || (msg_pri = cl_get_binary(msg, F_MQMSGPRI, &pri_len)) == NULL || (msg_size = cl_get_binary(msg, F_MQMSGSIZE, &size_len)) == NULL || (data = cl_get_binary(msg, F_MQMSGDATA, &data_len)) == NULL || (invocation = cl_get_binary(msg, F_MQINVOCATION, &invocation_len)) == NULL || (sendreceive = cl_get_binary(msg, F_SENDRECEIVE, &sendreceive_len)) == NULL || (reply_seq = cl_get_binary(msg, F_MQMSGREPLYSEQ, &reply_seq_len)) == NULL || (seq = cl_get_binary(msg, F_MQMSGSEQ, &seq_len)) == NULL || (ack = cl_get_binary(msg, F_MQMSGACK, &ack_len)) == NULL || (node = ha_msg_value(msg, F_ORIG)) == NULL || *msg_size != data_len ) { cl_log(LOG_ERR, "received bad mqname_send request."); return FALSE; } found = g_hash_table_lookup_extended(mq_ack_pending_hash, seq, &orig_seq, &client); if (found) { /* * we have clients waiting for reply, send it out */ dprintf("%s: found client <%p>\n", __FUNCTION__, client); m = (client_message_t *) ha_malloc(sizeof(client_message_t) + data_len); m->header.type = CMS_MSG_RECEIVE; m->header.len = sizeof(client_message_t) + data_len; m->header.flag = SA_OK; m->header.name.length = 0; m->handle = 0; m->msg.type = *msg_type; m->msg.version = *msg_ver; m->msg.size = *msg_size; m->msg.priority = * ((const SaUint8T *) msg_pri); m->msg.data = m + 1; m->invocation = 0; m->ack = 0; m->senderId = 0; m->data = m + 1; memcpy(m->data, data, data_len); ret = client_send_msg((IPC_Channel *) client, sizeof(client_message_t) + data_len, m); g_hash_table_remove(mq_open_pending_hash, seq); ha_free((unsigned long *) orig_seq); } else { cl_log(LOG_ERR, "client is not found. " "nobody to send the reply msg to. "); } if (ret == SA_OK && ack) { ack_type = mqname_type2string(MQNAME_TYPE_ACK); if ((ack_msg = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(ack_msg, F_TYPE, ack_type) == HA_FAIL || ha_msg_add(ack_msg, F_MQREQUEST, request_type) == HA_FAIL || ha_msg_addbin(ack_msg, F_MQINVOCATION, invocation, sizeof(int)) == HA_FAIL || ha_msg_addbin(ack_msg, F_MQMSGSEQ, reply_seq, sizeof(unsigned long)) == HA_FAIL || ha_msg_add(ack_msg, F_MQERROR, saerror_type2string(ret)) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; } hb = cmsdata->hb_handle; #if DEBUG_CLUSTER cl_log_message(ack_msg);#endif hb->llc_ops->sendnodemsg(hb, ack_msg, node); ha_msg_del(ack_msg); dprintf("send the ack, ret = %d\n", ret); } return TRUE;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -