📄 cms_cluster.c
字号:
static intsend_undelivered_message(ll_cluster_t *hb, mqueue_t *mq, const char *node){ message_t * message; struct ha_msg *m; const char *type = mqname_type2string(MQNAME_TYPE_REOPEN_MSGFEED); int invalid = FALSE; const char * request_type; char size_string[PACKSTRSIZE]; sa_mqueue_usage_encode(size_string, NULL, NULL , mq->status.saMsgQueueUsage); dprintf("Timer: current - close %s retention\n" , get_current_satime() - mq->status.closeTime < mq->status.retentionTime ? "<" : ">="); if (RETENTION_TIME_EXPIRES(mq)) { /* retention timer expires */ cl_log(LOG_INFO, "Original node: open a expired queue [%s]" , mq->name); invalid = TRUE; goto end; } while ((message = dequeue_message(mq))) { if ((m = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(m, F_TYPE, type) == HA_FAIL || ha_msg_add(m, F_MQNAME, mq->name) == HA_FAIL || ha_msg_addbin(m, F_SENDRECEIVE, (char *) &(message->msgInfo.senderId), sizeof(int)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGTYPE, (char *) &message->msg.type, sizeof(SaSizeT)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGVER, (char *) &message->msg.version, sizeof(SaSizeT)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGSIZE, (char *) &message->msg.size, sizeof(SaSizeT)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGPRI, (char *) &message->msg.priority, sizeof(SaUint8T)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGDATA, (char *) message->msg.data, message->msg.size) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); ha_msg_del(m); return FALSE; } hb->llc_ops->sendnodemsg(hb, m, node); dprintf("Send 1 msgfeed %d size msg to %s\n" , message->msg.size, node); ha_msg_del(m); }end: /* * send the MSGFEED_END message */ type = mqname_type2string(MQNAME_TYPE_MSGFEED_END); if ((m = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (invalid == TRUE) request_type = "invalid"; else request_type = "valid"; if (ha_msg_add(m, F_TYPE, type) == HA_FAIL || ha_msg_add(m, F_MQNAME, mq->name) == HA_FAIL || ha_msg_add(m, F_MQREQUEST, request_type) == HA_FAIL || ha_msg_addbin(m, F_MQPOLICY, &mq->policy, sizeof(int)) == HA_FAIL || ha_msg_addbin(m, F_MQCREATEFLAG, &mq->status.creationFlags, sizeof(SaMsgQueueCreationFlagsT)) == HA_FAIL || ha_msg_addbin(m, F_MQOPENFLAG, &mq->status.openFlags, sizeof(SaMsgQueueOpenFlagsT)) == HA_FAIL || ha_msg_addbin(m, F_MQRETENTION, &mq->status.retentionTime, sizeof(SaTimeT)) == HA_FAIL || ha_msg_add(m, F_MQSIZE, size_string) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); ha_msg_del(m); return FALSE; } hb->llc_ops->sendnodemsg(hb, m, node); dprintf("Send msgfeed_end msg to %s\n", node); ha_msg_del(m); return TRUE;}intprocess_mqname_reopen(struct ha_msg *msg, enum mqname_type type, cms_data_t * cmsdata){ static struct ha_msg * saved_msg = NULL; struct ha_msg * new_msg; const char *name, *node; ll_cluster_t *hb = cmsdata->hb_handle; mqueue_t * mq; message_t * message; const SaSizeT * msg_type, * msg_ver, * msg_size, * msg_pri, * data, * sendreceive; size_t type_len, ver_len, pri_len, size_len, data_len, sendreceive_len; const char *valid; const char *size_string = NULL; const int *s_invocation, *policy; size_t s_invocation_size, cflag_size, oflag_size, retention_size; const SaTimeT * retention = NULL; char *s_request, *s_error; const SaMsgQueueCreationFlagsT *cflag = NULL, *oflag = NULL; SaMsgQueueSendingStateT sending_state = SA_MSG_QUEUE_AVAILABLE; dprintf("%s: type is %d\n", __FUNCTION__, type); if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) { cl_log(LOG_ERR, "%s: ha_msg_value failed",__FUNCTION__); return FALSE; } switch (type) { case MQNAME_TYPE_REOPEN: if ((node = ha_msg_value(msg, F_MQHOST)) == NULL) { cl_log(LOG_ERR, "%s: ha_msg_value failed",__FUNCTION__); return FALSE; } if (((mq = mqname_lookup(name, NULL)) != NULL) && mq->mqstat == MQ_STATUS_CLOSE) { /* Do I have the original mq? */ cl_log(LOG_INFO, "Original mq host is %s", mq->host); if (is_host_local(mq->host, cmsdata)) { /* Send undelivered message to new mq */ send_undelivered_message(hb, mq, node); } } dprintf("mq->mqstat = <%d>\n", mq->mqstat); if (!is_host_local(node, cmsdata)) return TRUE; saved_msg = ha_msg_copy(msg); cl_log(LOG_INFO, "%s: waiting for msgfeed...", __FUNCTION__); break; case MQNAME_TYPE_REOPEN_MSGFEED: if ((sendreceive = cl_get_binary(msg, F_SENDRECEIVE, &sendreceive_len)) == NULL || (msg_type = cl_get_binary(msg, F_MQMSGTYPE, &type_len)) == NULL || (msg_ver = cl_get_binary(msg, F_MQMSGVER, &ver_len)) == NULL || (msg_pri = cl_get_binary(msg, F_MQMSGPRI, &pri_len)) == NULL || (msg_size = cl_get_binary(msg, F_MQMSGSIZE, &size_len)) == NULL || (data = cl_get_binary(msg, F_MQMSGDATA, &data_len)) == NULL) { cl_log(LOG_ERR, "received bad msgfeed msg."); return FALSE; } if ((mq = mqname_lookup(name, NULL)) == NULL && mq->mqstat != MQ_STATUS_CLOSE) { cl_log(LOG_ALERT, "State machine BUG"); return FALSE; } message = (message_t *) ha_malloc(sizeof(SaMsgMessageT) + data_len); if (!message) { cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__); return FALSE; } memset(message, 0, sizeof(message_t)); message->msgInfo.senderId = *sendreceive; message->msg.type = *msg_type; message->msg.version = *msg_ver; message->msg.size = *msg_size; message->msg.priority = *(const SaUint8T *) msg_pri; message->msg.data = (char *)message + sizeof(message_t); memcpy(message->msg.data, data, data_len); enqueue_message(mq, message->msg.priority, message); break; case MQNAME_TYPE_MSGFEED_END: if ((mq = mqname_lookup(name, NULL)) == NULL && mq->mqstat != MQ_STATUS_CLOSE) { cl_log(LOG_ALERT, "State machine BUG"); return FALSE; } mq->mqstat = MQ_STATUS_OPEN; dprintf("in feedend, used is [%d]\n" , mq->status.saMsgQueueUsage[3].queueUsed); /* * read saved_msg */ if (!saved_msg) { /* somehow we received another msg_feedend out of order */ return FALSE; } if ((s_request = ha_strdup(ha_msg_value(saved_msg, F_MQREQUEST))) == NULL || (s_invocation = cl_get_binary(saved_msg, F_MQINVOCATION, &s_invocation_size)) == NULL || (s_error = ha_strdup(ha_msg_value(saved_msg,F_MQERROR))) == NULL) { cl_log(LOG_ERR, "%s: ha_msg_value error", __FUNCTION__); return FALSE; } /* * If mq retention time not expired, we will receive * a valid mq status information, so that we need to * use this original mq status. */ if ((valid = ha_msg_value(msg, F_MQREQUEST)) == NULL) { cl_log(LOG_ERR, "%s: cannot read invalid bit" , __FUNCTION__); return FALSE; } if (strncmp(valid, "invalid", 7) == 0) goto invalid; /* * read msgfeedend */ if ((policy = cl_get_binary(msg, F_MQPOLICY, NULL)) == NULL || (cflag = cl_get_binary(msg, F_MQCREATEFLAG , &cflag_size)) == NULL || (oflag = cl_get_binary(msg, F_MQOPENFLAG , &oflag_size)) == NULL || (retention = cl_get_binary(msg, F_MQRETENTION , &retention_size)) == NULL || (size_string = ha_msg_value(msg, F_MQSIZE)) == NULL) { cl_log(LOG_ERR, "%s: cl_get_binary failed" , __FUNCTION__); } /* * create a new msg according to msgfeedend */ if ((new_msg = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: no memory", __FUNCTION__); return FALSE; } if (ha_msg_add(new_msg, F_TYPE , mqname_type2string(MQNAME_TYPE_GRANTED)) == HA_FAIL || ha_msg_add(new_msg, F_MQNAME, name) == HA_FAIL || ha_msg_add(new_msg, F_MQREQUEST, s_request) == HA_FAIL || ha_msg_addbin(new_msg, F_MQINVOCATION, s_invocation, s_invocation_size) == HA_FAIL || ha_msg_add(new_msg, F_MQHOST, mq->host) == HA_FAIL || ha_msg_addbin(new_msg, F_MQSTATUS, &sending_state, sizeof(SaMsgQueueSendingStateT)) == HA_FAIL || ha_msg_addbin(new_msg, F_MQPOLICY, policy, sizeof(int)) == HA_FAIL || ha_msg_addbin(new_msg, F_MQCREATEFLAG, cflag, cflag_size) == HA_FAIL || ha_msg_addbin(new_msg, F_MQOPENFLAG, oflag, oflag_size) == HA_FAIL || ha_msg_addbin(new_msg, F_MQRETENTION, retention, retention_size) == HA_FAIL || ha_msg_add(new_msg, F_MQSIZE, size_string) == HA_FAIL || ha_msg_add(new_msg, F_MQERROR, s_error) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); ha_msg_del(new_msg); return FALSE; } ha_msg_del(saved_msg); saved_msg = new_msg; goto sendmsg;invalid: /* * If the open request is not set SA_MSG_QUEUE_CREATE, * deny this request, since retention timer expired. */ mq = g_hash_table_lookup(mq_open_pending_hash, name); if (!mq) { cl_log(LOG_ERR, "BUG: cannot find mq in pending hash"); return TRUE; } if (!(mq->status.creationFlags & SA_MSG_QUEUE_CREATE)) { mqueue_request_t reply; cl_log(LOG_INFO, "retention timer expired and " "SA_MSG_QUEUE_CREATE is not set, reject!"); reply.qname = ha_strdup(name); reply.gname = NULL; reply.request_type = cmsrequest_string2type(s_request); reply.invocation = *s_invocation; client_send_client_qopen(mq->client, &reply, -1 , SA_ERR_NOT_EXIST); g_hash_table_remove(mq_open_pending_hash, name); /* * We get the change to unlink the mqueue here. */ request_mqname_unlink(name, cmsdata); ha_free(mq); ha_free(reply.qname); return TRUE; }sendmsg: /* * Send granted message to all. */ ha_msg_mod(saved_msg, F_TYPE , mqname_type2string(MQNAME_TYPE_GRANTED)); hb->llc_ops->sendclustermsg(hb, saved_msg); ha_msg_del(saved_msg); saved_msg = NULL; return TRUE; default: break; } return TRUE;}/** * process_mqname_denied - process the denied message from the master node * for this message queue name * @msg: received message from heartbeat IPC Channel */intprocess_mqname_denied(struct ha_msg *msg){ const char * name, * error, * request; const int * invocation; size_t invocation_size; IPC_Channel *client = NULL; mqueue_request_t reply; mqueue_t * mq_pending; int flag; if ((name = ha_msg_value(msg, F_MQNAME)) == NULL || (error = ha_msg_value(msg, F_MQERROR)) == NULL || (request = ha_msg_value(msg, F_MQREQUEST)) == NULL || (invocation = cl_get_binary(msg, F_MQINVOCATION, &invocation_size)) == NULL ) { cl_log(LOG_ERR, "received NULL mq name or mq error reply"); return FALSE; } flag = saerror_string2type(error); mq_pending = g_hash_table_lookup(mq_open_pending_hash, name); if (mq_pending != NULL) { /* * we have clients open pending, send out reply */ client = mq_pending->client; cl_log(LOG_INFO, "%s: found client <%p>", __FUNCTION__, client); reply.qname = ha_strdup(name); reply.gname = NULL; reply.request_type = cmsrequest_string2type(request); reply.invocation = *invocation; client_send_client_qopen(client, &reply, -1, flag); ha_free(reply.qname); g_hash_table_remove(mq_open_pending_hash, name); ha_free(mq_pending->name); ha_free(mq_pending); } return TRUE;}static voidgroup_mem_dispatch(gpointer data, gpointer user_data){ IPC_Message msg; client_mqgroup_notify_t * cmg = NULL; int size; client_mqgroup_track_t * track = (client_mqgroup_track_t *)data; notify_buffer_t * notify = (notify_buffer_t *)user_data; cmg = (client_mqgroup_notify_t *) malloc(sizeof(client_mqgroup_notify_t)); if (cmg == NULL) { cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__); return; } cmg->header.type = CMS_QUEUEGROUP_NOTIFY; cmg->header.name = notify->name; cmg->policy = notify->policy; cmg->group_name = notify->name; switch (track->flag) { case SA_TRACK_CHANGES: dprintf("group_mem_dispatch: SA_TRACK_CHANGES\n"); size = notify->number * sizeof(SaMsgQueueGroupNotificationT); msg.msg_len = sizeof(client_mqgroup_notify_t) + size; cmg->number = notify->number; cmg = realloc(cmg, msg.msg_len); cmg->data = (char *)cmg + sizeof(client_mqgroup_notify_t); memcpy(cmg->data, notify->change_buff, size); break; case SA_TRACK_CHANGES_ONLY: dprintf("group_mem_dispatch: SA_TRACK_CHANGES_ONLY\n"); size = sizeof(SaMsgQueueGroupNotificationT); msg.msg_len = sizeof(client_mqgroup_notify_t) + size; cmg->number = 1; cmg = realloc(cmg, msg.msg_len); cmg->data = (char *)cmg + sizeof(client_mqgroup_notify_t); *(SaMsgQueueGroupNotificationT *)(cmg->data) = notify->changeonly_buff; break; default: cl_log(LOG_ERR, "Unknown track flag [%d]", track->flag); return; } msg.msg_body = cmg; msg.msg_private = &msg; msg.msg_done = NULL; msg.msg_buf = NULL; /* TODO: msg.msg_done to free memory here */ dprintf("%s: Send Track information to my clients...\n", __FUNCTION__); track->ch->ops->send(track->ch, &msg);}intprocess_mqgroup_insert(struct ha_msg *msg){ const char *gname, *name; mqueue_t *mqg, *mq; notify_buffer_t buf; if ((gname = ha_msg_value(msg, F_MQGROUPNAME)) == NULL) { cl_log(LOG_ERR, "received NULL mq group name request"); return FALSE; } if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) { cl_log(LOG_ERR, "received NULL mq name request"); return FALSE; } dprintf("%s: gname = %s, qname = %s\n", __FUNCTION__, gname, name); /* * Check carefully again here in case there are mess * message in cluster. */ if ((mqg = mqname_lookup(gname, NULL)) == NULL) { cl_log(LOG_ERR , "group name [%s] doesn't exist in local database!" , gname); return FALSE; } if (mqg->policy == 0) { cl_log(LOG_ERR, "[%s] is a mq group name instead of a mq name" , gname); return FALSE; } if ((mq = mqname_lookup(name, NULL)) == NULL) { cl_log(LOG_ERR , "mq name [%s] doesn't exist in local database!" , name); return FALSE; } /* * The mqueue may already in the group, i.e this * node is the master name node. */ if (g_list_find(mqg->list, mq) == NULL) { mqg->list = g_list_append(mqg->list, mq); cl_log(LOG_INFO, "Adding mq <%p> to [%s] list", mq, gname); } /* * Update the mqueue list to point to append the group. */ if (g_list_find(mq->list, mqg) == NULL) { mq->list = g_list_append(mq->list, mqg); cl_log(LOG_INFO, "Adding mqg <%p> to [%s] list", mqg, name); } /* * Current Round Robin counter set to the first list, * we may want to set it as a random index to gain * more load balance. */ if (mqg->current == NULL) mqg->current = g_list_first(mqg->list); /* * Notify my clients who care about the group * membership change message. */ if (mqg->notify_list != NULL) { strcpy(buf.changeonly_buff.member.queueName.value, name); buf.changeonly_buff.member.queueName.length = strlen(name) + 1; buf.changeonly_buff.member.queueStatus = mq->status; buf.changeonly_buff.change = SA_MSG_QUEUE_GROUP_ADDED; buf.policy = mqg->policy; buf.number = 0; strcpy(buf.name.value, gname); buf.name.length = strlen(gname) + 1; buf.change_buff = NULL; g_list_foreach(mqg->list, mqueue_copy_notify_data, &buf);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -