📄 cms_cluster.c
字号:
mqueue_t *mq; struct ha_msg *reply; SaErrorT error = SA_OK; request = NULL; CMS_TRACE(); if ((name = ha_msg_value(msg, F_MQNAME)) == NULL || (request = ha_msg_value(msg, F_MQREQUEST)) == NULL || (invocation = cl_get_binary(msg, F_MQINVOCATION, &invocation_size)) == NULL || (policy = cl_get_binary(msg, F_MQPOLICY, NULL)) == NULL || (cflag = cl_get_binary(msg, F_MQCREATEFLAG, &cflag_size)) == NULL || (oflag = cl_get_binary(msg, F_MQOPENFLAG, &oflag_size)) == NULL || (retention = cl_get_binary(msg, F_MQRETENTION, &retention_size)) == NULL || (size_string = ha_msg_value(msg, F_MQSIZE)) == NULL) { cl_log(LOG_ERR, "received bad mq request: name = %s, request" " = %s, invo = %d, policy = %d", name, request , *invocation, *policy); return FALSE; } dprintf("queue (group) name = %s\n", name); if ((reply = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (((mq = mqname_lookup(name, NULL)) != NULL) && ((mq->mqstat != MQ_STATUS_CLOSE) || (mq->mqstat == MQ_STATUS_CLOSE && mq->policy != *policy))) { cl_log(LOG_INFO, "mq name [%s] already exists", name); error = SA_ERR_EXIST; type = mqname_type2string(MQNAME_TYPE_DENIED); if (ha_msg_add(reply, F_TYPE, type) == HA_FAIL || ha_msg_add(reply, F_MQREQUEST, request) == HA_FAIL || ha_msg_add(reply, F_MQNAME, name) == HA_FAIL || ha_msg_addbin(reply, F_MQINVOCATION, invocation, invocation_size) == HA_FAIL || ha_msg_add(reply, F_MQERROR, saerror_type2string(error)) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; } dprintf("error is %d\n", error); goto send_msg; } else if ((mq != NULL) && mq->mqstat == MQ_STATUS_CLOSE) { /* * The mq is closed, here need to be reopened. */ type = mqname_type2string(MQNAME_TYPE_REOPEN); error = SA_OK; /* we must not set mq->mqstat to MQ_STATUS_OPEN here * because on reopen case, the original master name * server need to check this bit before msgfeed. */ //mq->mqstat = MQ_STATUS_OPEN; mqhost = ha_msg_value(msg, F_ORIG); mq->list = NULL; mq->current = NULL; mq->notify_list = NULL; } else { type = mqname_type2string(MQNAME_TYPE_GRANTED); sending_state = SA_MSG_QUEUE_AVAILABLE; mq = (mqueue_t *) ha_malloc(sizeof(mqueue_t)); if (!mq) { cl_log(LOG_ERR, "%s: ha_malloc failed\n", __FUNCTION__); return FALSE; } memset(mq, 0, sizeof(mqueue_t)); mq->name = ha_strdup(name); mq->host = ha_strdup(ha_msg_value(msg, F_ORIG)); mq->mqstat = MQ_STATUS_OPEN; mq->policy = *policy; error = mqueue_table_insert(mq); } /* * master node broadcast the result in the cluster */ if (ha_msg_add(reply, F_TYPE, type) == HA_FAIL || ha_msg_add(reply, F_MQNAME, name) == HA_FAIL || ha_msg_add(reply, F_MQREQUEST, request) == HA_FAIL || ha_msg_addbin(reply, F_MQINVOCATION, invocation, invocation_size) == HA_FAIL || ha_msg_add(reply, F_MQHOST, (mqhost == NULL ? mq->host : mqhost)) == HA_FAIL || ha_msg_addbin(reply, F_MQSTATUS, &sending_state, sizeof(SaMsgQueueSendingStateT)) == HA_FAIL || ha_msg_addbin(reply, F_MQPOLICY, policy, sizeof(int)) == HA_FAIL || ha_msg_addbin(reply, F_MQCREATEFLAG, cflag, cflag_size) == HA_FAIL || ha_msg_addbin(reply, F_MQOPENFLAG, oflag, oflag_size) == HA_FAIL || ha_msg_addbin(reply, F_MQRETENTION, retention, retention_size) == HA_FAIL || ha_msg_add(reply, F_MQSIZE, size_string) == HA_FAIL || ha_msg_add(reply, F_MQERROR, saerror_type2string(error)) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; }send_msg: hb->llc_ops->sendclustermsg(hb, reply); ha_msg_del(reply); return TRUE;}intprocess_mqname_close(struct ha_msg *msg){ const char *name; mqueue_t *mq; client_header_t reply; CMS_TRACE(); if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) { cl_log(LOG_ERR, "received NULL mq name request"); return FALSE; } if (((mq = mqname_lookup(name, NULL)) != NULL) && (mq->mqstat != MQ_STATUS_CLOSE)) { mq->mqstat = MQ_STATUS_CLOSE; cl_log(LOG_INFO, "%s: Set mq [%s] status to [%d]" , __FUNCTION__, name, mq->mqstat); } /* this is the node where the queue is opened */ if (mq->client) { cl_log(LOG_INFO, "%s, sending close reply to the client. ", __FUNCTION__); reply.type = CMS_QUEUE_CLOSE; reply.len = sizeof(client_header_t); reply.flag = SA_OK; reply.name.length = strlen(name); strncpy(reply.name.value, name, SA_MAX_NAME_LENGTH); reply.name.value[reply.name.length] = '\0'; client_send_msg(mq->client, reply.len, &reply); }#if DEBUG_CLUSTER cl_log_message(msg);#endif return SA_OK;}intprocess_mqname_unlink(struct ha_msg *msg){ const char *name; mqueue_t *mq; IPC_Channel * client = NULL; client_header_t reply; if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) { cl_log(LOG_ERR, "received NULL mq name request"); return FALSE; } if ((mq = mqname_lookup(name, NULL)) != NULL) { /* this is the node where the queue is opened */ if (mq->client) { client = (IPC_Channel *)ha_malloc(sizeof(IPC_Channel)); *client = *mq->client; } mqueue_table_remove(name); /* * TODO: remove handle hash also */ } if (client) { cl_log(LOG_INFO, "%s, sending unlink reply to the client. ", __FUNCTION__); reply.type = CMS_QUEUE_UNLINK; reply.len = sizeof(client_header_t); reply.flag = SA_OK; reply.name.length = strlen(name); strncpy(reply.name.value, name, SA_MAX_NAME_LENGTH); reply.name.value[reply.name.length] = '\0'; client_send_msg(client, reply.len, &reply); ha_free(client); }#if DEBUG_CLUSTER cl_log_message(msg);#endif return SA_OK;}intprocess_mqname_send(struct ha_msg *msg, cms_data_t * cmsdata){ const char *name, * gname, * request_type; const void * data, * ack, *invocation, *msg_pri; const SaSizeT * msg_type, * msg_ver, * msg_size, * sendreceive; const char *node; const unsigned long * seq; mqueue_request_t request; message_t * message; SaErrorT ret = SA_OK; size_t data_len, ack_len, invocation_len, seq_len, sendreceive_len; size_t type_len, ver_len, pri_len, size_len; mqueue_t *mq; client_mqueue_notify_t m; const SaUint8T * priority; enum cms_client_msg_type req_type; if ((name = ha_msg_value(msg, F_MQNAME)) == NULL || (request_type = ha_msg_value(msg, F_MQREQUEST)) == NULL || (msg_type = cl_get_binary(msg, F_MQMSGTYPE, &type_len)) == NULL || (msg_ver = cl_get_binary(msg, F_MQMSGVER, &ver_len)) == NULL || (msg_pri = cl_get_binary(msg, F_MQMSGPRI, &pri_len)) == NULL || (msg_size = cl_get_binary(msg, F_MQMSGSIZE, &size_len)) == NULL || (data = cl_get_binary(msg, F_MQMSGDATA, &data_len)) == NULL || (invocation = cl_get_binary(msg, F_MQINVOCATION, &invocation_len)) == NULL || (sendreceive = cl_get_binary(msg, F_SENDRECEIVE, &sendreceive_len)) == NULL || (seq = cl_get_binary(msg, F_MQMSGSEQ, &seq_len)) == NULL || (ack = cl_get_binary(msg, F_MQMSGACK, &ack_len)) == NULL || (node = ha_msg_value(msg, F_ORIG)) == NULL || *msg_size != data_len ) { cl_log(LOG_ERR, "received bad mqname_send request."); return FALSE; } gname = ha_msg_value(msg, F_MQGROUPNAME); priority = (const SaUint8T *) msg_pri; if ((*priority) > SA_MSG_MESSAGE_LOWEST_PRIORITY) { cl_log(LOG_ALERT, "Wrong priorty [%u]\n", *priority); return FALSE; } dprintf("%s: going to send to %s\n", __FUNCTION__, name); if ((mq = mqname_lookup(name, NULL)) != NULL) { dprintf("%s: data_len = %d\n", __FUNCTION__, (int)data_len); dprintf("buff_avai[%d] = %lu\n", *priority, BUFFER_AVAILABLE(mq, *priority)); /* * don't deliver the msg if no buffer */ if ((BUFFER_AVAILABLE(mq, *priority) - data_len - sizeof(SaMsgMessageT)) < 0) { cl_log(LOG_DEBUG, "%s: buffer over flow, msg not " "delivered. ", __FUNCTION__); ret = SA_ERR_QUEUE_FULL; } else { /* * save message to mq->message_buffer */ message = (message_t *) ha_malloc(sizeof(message_t) + data_len); memset(message, 0, sizeof(message_t) + data_len); if (*sendreceive) { message->msgInfo.senderId = get_senderId_by_name(node, *seq); } message->msg.type = *msg_type; message->msg.version = *msg_ver; message->msg.size = *msg_size; message->msg.priority = * priority; message->msg.data = (char *)message + sizeof(message_t); memcpy(message->msg.data, data, data_len); enqueue_message(mq, *priority, message); /* * send only limited info to client */ m.header.type = CMS_MSG_NOTIFY; m.header.len = sizeof(client_header_t); m.header.flag = SA_OK; m.header.name.length = strlen(name) + 1; m.handle = mq->handle; strncpy(m.header.name.value, name, SA_MAX_NAME_LENGTH); /* * send the info to the client */ ret = client_send_msg(mq->client, sizeof(client_mqueue_notify_t), &m); mq->notified = TRUE; ret = SA_OK; } /* * send the ack back, but not for sendreceive */ req_type = cmsrequest_string2type(request_type); if (req_type != CMS_MSG_SEND_RECEIVE && ret == SA_OK && ack) { if (gname) { request.qname = ha_strdup(gname); } else { request.qname = ha_strdup(name); } request.gname = NULL; request.request_type = cmsrequest_string2type(request_type); request.invocation = *(const int *)invocation; request.ack = *(const int *)ack; request.seq = *(const unsigned long *) seq; mqname_send_ack(&request, node, NULL, ret, cmsdata); ha_free(request.qname); dprintf("send the ack, ret = %d\n", ret); } } else { node = ha_msg_value(msg, F_ORIG); cl_log(LOG_ERR, "%s: msg queue not found. the name server" " database on node %s is bad.", __FUNCTION__, node); } return SA_OK;}static voidsend_msg_notify(gpointer data, gpointer user_data){ client_mqueue_notify_t m; mqueue_t * mq = user_data; m.header.type = CMS_MSG_NOTIFY; m.header.len = sizeof(client_header_t); m.header.flag = SA_OK; m.header.name.length = strlen(mq->name) + 1; m.handle = mq->handle; strncpy(m.header.name.value, mq->name, SA_MAX_NAME_LENGTH); client_send_msg(mq->client, sizeof(client_mqueue_notify_t), &m);}static voidsend_migrate_message_notify(mqueue_t * mq){ SaUint8T i; for (i = SA_MSG_MESSAGE_HIGHEST_PRIORITY ; i <= SA_MSG_MESSAGE_LOWEST_PRIORITY ; i++) g_list_foreach(mq->message_buffer[i], send_msg_notify, mq);}/** * process_mqname_granted - process the granted message from the master node * for this message queue name * @msg: received message from heartbeat IPC Channel */intprocess_mqname_granted(struct ha_msg *msg, cms_data_t * cmsdata){ const char *name, *host, *error, *request; const int * invocation, *policy; size_t invocation_size; const SaMsgQueueCreationFlagsT *cflag = NULL, *oflag = NULL; const SaMsgQueueSendingStateT * sending_state; const SaTimeT * retention = NULL; const char *size_string; IPC_Channel *client; mqueue_t *mq, *mq_pending; mqueue_request_t mq_request; cms_client_t * cms_client; guint handle; int flag; if ((name = ha_msg_value(msg, F_MQNAME)) == NULL || (request = ha_msg_value(msg, F_MQREQUEST)) == NULL || (invocation = cl_get_binary(msg, F_MQINVOCATION, &invocation_size)) == NULL || (host = ha_msg_value(msg, F_MQHOST)) == NULL || (policy = cl_get_binary(msg, F_MQPOLICY, NULL)) == NULL || (cflag = cl_get_binary(msg, F_MQCREATEFLAG, NULL)) == NULL || (oflag = cl_get_binary(msg, F_MQOPENFLAG, NULL)) == NULL || (retention = cl_get_binary(msg, F_MQRETENTION, NULL)) == NULL || (sending_state = cl_get_binary(msg, F_MQSTATUS, NULL)) == NULL || (size_string = ha_msg_value(msg, F_MQSIZE)) == NULL || (error = ha_msg_value(msg, F_MQERROR)) == NULL) { cl_log(LOG_ERR, "%s: ha_msg_value error", __FUNCTION__); return FALSE; } flag = saerror_string2type(error); /* * This node might be the mqname master node, so make sure don't * duplicate insertion. */ if ((mq = mqueue_table_lookup(name, NULL)) == NULL) { /* * this is not the master node */ mq = (mqueue_t *) ha_malloc(sizeof(mqueue_t)); if (!mq) { cl_log(LOG_ERR, "%s: ha_malloc failed\n", __FUNCTION__); return FALSE; } memset(mq, 0, sizeof(mqueue_t)); mq->name = ha_strdup(name); mq->host = ha_strdup(host); mq->mqstat = MQ_STATUS_OPEN; mq->client = NULL; mq->policy = *policy; mqueue_table_insert(mq); } sa_mqueue_usage_decode(size_string, NULL, NULL , mq->status.saMsgQueueUsage); /* * set SaMsgQueueStatus in local database */ mq->status.sendingState = *sending_state; mq->status.creationFlags = *cflag; mq->status.openFlags = *oflag; mq->status.retentionTime = *retention; mq->status.headerLength = 0; mq_pending = g_hash_table_lookup(mq_open_pending_hash, name); if (mq_pending != NULL) { /* * This is the local node for this msg queue, * we have clients open pending, send out reply. */ client = mq_pending->client; handle = mqueue_handle_insert(mq); /* * insert this mq to client's opened_mqueue_list */ dprintf("lookup farside_pid [%d] in <%p>\n" , client->farside_pid, cmsdata->client_table); cms_client = g_hash_table_lookup(cmsdata->client_table, &(client->farside_pid)); if (cms_client == NULL){ /* this happens when the impatient client quit before it the response is received. */ cl_log(LOG_WARNING, "the client who requested queue [%s] to be opened does not exist any more.", name); g_hash_table_remove(mq_open_pending_hash, name); ha_free(mq_pending->name); ha_free(mq_pending->client); ha_free(mq_pending); request_mqname_close(name, cmsdata); /* todo: need better error handling here. we should be able to unlink this queue from the client side. request_mqname_unlink(name, cmsdata); */ return FALSE; } cms_client->opened_mqueue_list = g_list_append(cms_client->opened_mqueue_list, mq); mq->client = (IPC_Channel *)ha_malloc(sizeof(IPC_Channel)); if (mq->client == NULL) { cl_log(LOG_ERR, "%s: ha_malloc failed\n", __FUNCTION__); return FALSE; } *mq->client = *mq_pending->client; mq_request.qname = ha_strdup(name); mq_request.invocation = *invocation; mq_request.request_type = cmsrequest_string2type(request); client_send_client_qopen(client, &mq_request, handle, flag); ha_free(mq_request.qname); g_hash_table_remove(mq_open_pending_hash, name); dprintf("%p %p %p\n", mq_pending->name, mq_pending->client , mq_pending); ha_free(mq_pending->name); ha_free(mq_pending->client); ha_free(mq_pending); /* * send out notify msg for migratable mq if any */ send_migrate_message_notify(mq); } return TRUE;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -