📄 cms_cluster.c
字号:
}intrequest_mqname_update(const char * node, cms_data_t * cmsdata){ struct mq_info * mqinfo; size_t mqinfo_len; const char * type; struct ha_msg *msg; ll_cluster_t *hb; if (mqueue_table_pack(&mqinfo, &mqinfo_len) != HA_OK) { return HA_FAIL; } if ((msg = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return HA_FAIL; } type = mqname_type2string(MQNAME_TYPE_UPDATE); if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL || ha_msg_addbin(msg, F_MQUPDATE, mqinfo, mqinfo_len) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return HA_FAIL; } hb = cmsdata->hb_handle;#if DEBUG_CLUSTER cl_log_message(msg);#endif hb->llc_ops->sendnodemsg(hb, msg, node); ha_free(mqinfo); ha_msg_del(msg); return HA_OK;}/** * reply_mqname_open - process the request message as the master node * for this message queue name * * @hb: heartbeat IPC Channel handle * @msg: received message from heartbeat IPC Channel */intreply_mqname_open(ll_cluster_t *hb, struct ha_msg *msg){ const char *name, *type, *request; size_t invocation_size, cflag_size, oflag_size, retention_size; const SaInvocationT * invocation = NULL; const SaMsgQueueCreationFlagsT *cflag = NULL, *oflag = NULL; const SaTimeT * retention = NULL; const int * policy = NULL; const char * size_string; SaMsgQueueSendingStateT sending_state; mqueue_t *mq; struct ha_msg *reply; SaErrorT error = SA_OK; request = NULL; if ((name = ha_msg_value(msg, F_MQNAME)) == NULL || (request = ha_msg_value(msg, F_MQREQUEST)) == NULL || (invocation = cl_get_binary(msg, F_MQINVOCATION, &invocation_size)) == NULL || (policy = cl_get_binary(msg, F_MQPOLICY, NULL)) == NULL || (cflag = cl_get_binary(msg, F_MQCREATEFLAG, &cflag_size)) == NULL || (oflag = cl_get_binary(msg, F_MQOPENFLAG, &oflag_size)) == NULL || (retention = cl_get_binary(msg, F_MQRETENTION, &retention_size)) == NULL || (size_string = ha_msg_value(msg, F_MQSIZE)) == NULL) { cl_log(LOG_ERR, "received bad mq request: name = %s, request" " = %s, invo = %d, policy = %d", name, request , *invocation, *policy); return FALSE; } if ((reply = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (((mq = mqname_lookup(name, NULL)) != NULL) && ((mq->mqstat != MQ_STATUS_CLOSE) || (mq->mqstat == MQ_STATUS_CLOSE && mq->policy != *policy))) { cl_log(LOG_INFO, "mq name [%s] already exists", name); error = SA_ERR_EXIST; type = mqname_type2string(MQNAME_TYPE_DENIED); if (ha_msg_add(reply, F_TYPE, type) == HA_FAIL || ha_msg_add(reply, F_MQREQUEST, request) == HA_FAIL || ha_msg_add(reply, F_MQNAME, name) == HA_FAIL || ha_msg_addbin(reply, F_MQINVOCATION, invocation, invocation_size) == HA_FAIL || ha_msg_add(reply, F_MQERROR, saerror_type2string(error)) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; } dprintf("error is %d\n", error); goto send_msg; } else if ((mq != NULL) && mq->mqstat == MQ_STATUS_CLOSE) { /* * The mq is closed, here need to be reopened. */ type = mqname_type2string(MQNAME_TYPE_REOPEN); error = SA_OK; ha_free(mq->host); mq->host = ha_strdup(ha_msg_value(msg, F_ORIG)); /* we must not set mq->mqstat to MQ_STATUS_OPEN here * because on reopen case, the original master name * server need to check this bit before msgfeed. */ /* mq->mqstat = MQ_STATUS_OPEN; */ mq->list = NULL; mq->current = NULL; mq->notify_list = NULL; } else { type = mqname_type2string(MQNAME_TYPE_GRANTED); sending_state = SA_MSG_QUEUE_AVAILABLE; mq = (mqueue_t *) ha_malloc(sizeof(mqueue_t)); if (!mq) { cl_log(LOG_ERR, "%s: ha_malloc failed\n", __FUNCTION__); return FALSE; } memset(mq, 0, sizeof(mqueue_t)); mq->name = ha_strdup(name); mq->host = ha_strdup(ha_msg_value(msg, F_ORIG)); mq->mqstat = MQ_STATUS_OPEN; mq->policy = *policy; error = mqueue_table_insert(mq); } /* * master node broadcast the result in the cluster */ if (ha_msg_add(reply, F_TYPE, type) == HA_FAIL || ha_msg_add(reply, F_MQNAME, name) == HA_FAIL || ha_msg_add(reply, F_MQREQUEST, request) == HA_FAIL || ha_msg_addbin(reply, F_MQINVOCATION, invocation, invocation_size) == HA_FAIL || ha_msg_add(reply, F_MQHOST, mq->host) == HA_FAIL || ha_msg_addbin(reply, F_MQSTATUS, &sending_state, sizeof(SaMsgQueueSendingStateT)) == HA_FAIL || ha_msg_addbin(reply, F_MQPOLICY, policy, sizeof(int)) == HA_FAIL || ha_msg_addbin(reply, F_MQCREATEFLAG, cflag, cflag_size) == HA_FAIL || ha_msg_addbin(reply, F_MQOPENFLAG, oflag, oflag_size) == HA_FAIL || ha_msg_addbin(reply, F_MQRETENTION, retention, retention_size) == HA_FAIL || ha_msg_add(reply, F_MQSIZE, size_string) == HA_FAIL || ha_msg_add(reply, F_MQERROR, saerror_type2string(error)) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; }send_msg: hb->llc_ops->sendclustermsg(hb, reply); ha_msg_del(reply); return TRUE;}intprocess_mqname_close(struct ha_msg *msg){ const char *name; mqueue_t *mq; CMS_TRACE(); if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) { cl_log(LOG_ERR, "received NULL mq name request"); return FALSE; } if (((mq = mqname_lookup(name, NULL)) != NULL) && (mq->mqstat != MQ_STATUS_CLOSE)) { mq->mqstat = MQ_STATUS_CLOSE; cl_log(LOG_INFO, "%s: Set mq [%s] status to [%d]" , __FUNCTION__, name, mq->mqstat); }#if DEBUG_CLUSTER cl_log_message(msg);#endif return SA_OK;}intprocess_mqname_unlink(struct ha_msg *msg){ const char *name; mqueue_t *mq; if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) { cl_log(LOG_ERR, "received NULL mq name request"); return FALSE; } if ((mq = mqname_lookup(name, NULL)) != NULL) { mqueue_table_remove(name); /* * TODO: remove handle hash also */ }#if DEBUG_CLUSTER cl_log_message(msg);#endif return SA_OK;}intprocess_mqname_send(struct ha_msg *msg, cms_data_t * cmsdata){ const char *name, * gname, * request_type; const void * data, * ack, *invocation, *seq, *msg_pri; const SaSizeT * msg_type, * msg_ver, * msg_size; const char *node; mqueue_request_t request; SaMsgMessageT * message; SaErrorT ret = SA_OK; size_t data_len, ack_len, invocation_len, seq_len; size_t type_len, ver_len, pri_len, size_len; mqueue_t *mq; client_mqueue_notify_t m; const SaUint8T * priority; if ((name = ha_msg_value(msg, F_MQNAME)) == NULL || (request_type = ha_msg_value(msg, F_MQREQUEST)) == NULL || (msg_type = cl_get_binary(msg, F_MQMSGTYPE, &type_len)) == NULL || (msg_ver = cl_get_binary(msg, F_MQMSGVER, &ver_len)) == NULL || (msg_pri = cl_get_binary(msg, F_MQMSGPRI, &pri_len)) == NULL || (msg_size = cl_get_binary(msg, F_MQMSGSIZE, &size_len)) == NULL || (data = cl_get_binary(msg, F_MQMSGDATA, &data_len)) == NULL || (invocation = cl_get_binary(msg, F_MQINVOCATION, &invocation_len)) == NULL || (seq = cl_get_binary(msg, F_MQMSGSEQ, &seq_len)) == NULL || (ack = cl_get_binary(msg, F_MQMSGACK, &ack_len)) == NULL || *msg_size != data_len ) { cl_log(LOG_ERR, "received bad mqname_send request."); return FALSE; } gname = ha_msg_value(msg, F_MQGROUPNAME); priority = (const SaUint8T *) msg_pri; if ((*priority) > SA_MSG_MESSAGE_LOWEST_PRIORITY) { cl_log(LOG_ALERT, "Wrong priorty [%u]\n", *priority); return FALSE; } dprintf("%s: going to send to %s\n", __FUNCTION__, name); if ((mq = mqname_lookup(name, NULL)) != NULL) { dprintf("%s: data_len = %d\n", __FUNCTION__, (int)data_len); dprintf("buff_avai[%d] = %lu\n", *priority, BUFFER_AVAILABLE(mq, *priority)); /* * don't deliver the msg if no buffer */ if ((BUFFER_AVAILABLE(mq, *priority) - data_len - sizeof(SaMsgMessageT)) < 0) { cl_log(LOG_DEBUG, "%s: buffer over flow, msg not " "delivered. ", __FUNCTION__); ret = SA_ERR_QUEUE_FULL; } else { /* * save message to mq->message_buffer */ message = (SaMsgMessageT*) ha_malloc(sizeof(SaMsgMessageT) + data_len); if (!message) return SA_ERR_NO_MEMORY; message->type = *msg_type; message->version = *msg_ver; message->size = *msg_size; message->priority = * priority; message->data = (char *)message + sizeof(SaMsgMessageT); memcpy(message->data, data, data_len); enqueue_message(mq, *priority, message); /* * send only limited info to client */ m.header.type = CMS_MSG_NOTIFY; m.header.len = sizeof(client_header_t); m.header.flag = SA_OK; m.header.name.length = strlen(name) + 1; m.handle = mq->handle; strncpy(m.header.name.value, name, SA_MAX_NAME_LENGTH); /* * send the info to the client */ ret = client_send_msg(mq->client, sizeof(client_mqueue_notify_t), &m); mq->notified = TRUE; ret = SA_OK; } /* * send the ack back */ if (ret == SA_OK && ack) { node = ha_msg_value(msg, F_ORIG); if (gname) { request.qname = ha_strdup(gname); } else { request.qname = ha_strdup(name); } request.gname = NULL; request.request_type = cmsrequest_string2type(request_type); request.invocation = *(const int *)invocation; request.ack = *(const int *)ack; request.seq = *(const unsigned long *) seq; mqname_send_ack(&request, node, NULL, ret, cmsdata); ha_free(request.qname); dprintf("send the ack, ret = %d\n", ret); } } else { node = ha_msg_value(msg, F_ORIG); cl_log(LOG_ERR, "%s: msg queue not found. the name server" " database on node %s is bad.", __FUNCTION__, node); } return SA_OK;}static voidsend_msg_notify(gpointer data, gpointer user_data){ client_mqueue_notify_t m; mqueue_t * mq = user_data; m.header.type = CMS_MSG_NOTIFY; m.header.len = sizeof(client_header_t); m.header.flag = SA_OK; m.header.name.length = strlen(mq->name) + 1; m.handle = mq->handle; strncpy(m.header.name.value, mq->name, SA_MAX_NAME_LENGTH); client_send_msg(mq->client, sizeof(client_mqueue_notify_t), &m);}static voidsend_migrate_message_notify(mqueue_t * mq){ SaUint8T i; for (i = SA_MSG_MESSAGE_HIGHEST_PRIORITY ; i <= SA_MSG_MESSAGE_LOWEST_PRIORITY ; i++) g_list_foreach(mq->message_buffer[i], send_msg_notify, mq);}/** * process_mqname_granted - process the granted message from the master node * for this message queue name * @msg: received message from heartbeat IPC Channel */intprocess_mqname_granted(struct ha_msg *msg, cms_data_t * cmsdata){ const char *name, *host, *error, *request; const int * invocation, *policy; size_t invocation_size; const SaMsgQueueCreationFlagsT *cflag = NULL, *oflag = NULL; const SaMsgQueueSendingStateT * sending_state; const SaTimeT * retention = NULL; const char *size_string; IPC_Channel *client; mqueue_t *mq, *mq_pending; mqueue_request_t mq_request; cms_client_t * cms_client; guint handle; int flag; if ((name = ha_msg_value(msg, F_MQNAME)) == NULL || (request = ha_msg_value(msg, F_MQREQUEST)) == NULL || (invocation = cl_get_binary(msg, F_MQINVOCATION, &invocation_size)) == NULL || (host = ha_msg_value(msg, F_MQHOST)) == NULL || (policy = cl_get_binary(msg, F_MQPOLICY, NULL)) == NULL || (cflag = cl_get_binary(msg, F_MQCREATEFLAG, NULL)) == NULL || (oflag = cl_get_binary(msg, F_MQOPENFLAG, NULL)) == NULL || (retention = cl_get_binary(msg, F_MQRETENTION, NULL)) == NULL || (sending_state = cl_get_binary(msg, F_MQSTATUS, NULL)) == NULL || (size_string = ha_msg_value(msg, F_MQSIZE)) == NULL || (error = ha_msg_value(msg, F_MQERROR)) == NULL) { cl_log(LOG_ERR, "%s: ha_msg_value error", __FUNCTION__); return FALSE; } flag = saerror_string2type(error); /* * This node might be the mqname master node, so make sure don't * duplicate insertion. */ if ((mq = mqueue_table_lookup(name, NULL)) == NULL) { /* * this is not the master node */ mq = (mqueue_t *) ha_malloc(sizeof(mqueue_t)); if (!mq) { cl_log(LOG_ERR, "%s: ha_malloc failed\n", __FUNCTION__); return FALSE; } memset(mq, 0, sizeof(mqueue_t));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -