📄 cms_cluster.c
字号:
mq->name = ha_strdup(name); mq->host = ha_strdup(host); mq->mqstat = MQ_STATUS_OPEN; mq->client = NULL; mq->policy = *policy; mqueue_table_insert(mq); } sa_mqueue_usage_decode(size_string, NULL, NULL , mq->status.saMsgQueueUsage); /* * set SaMsgQueueStatus in local database */ mq->status.sendingState = *sending_state; mq->status.creationFlags = *cflag; mq->status.openFlags = *oflag; mq->status.retentionTime = *retention; mq->status.headerLength = 0; mq_pending = g_hash_table_lookup(mq_open_pending_hash, name); if (mq_pending != NULL) { /* * This is the local node for this msg queue, * we have clients open pending, send out reply. */ client = mq_pending->client; handle = mqueue_handle_insert(mq); /* * insert this mq to client's opened_mqueue_list */ dprintf("lookup farside_pid [%d] in <%p>\n" , client->farside_pid, cmsdata->client_table); cms_client = g_hash_table_lookup(cmsdata->client_table, &(client->farside_pid)); assert(cms_client != NULL); cms_client->opened_mqueue_list = g_list_append(cms_client->opened_mqueue_list, mq); mq->client = (IPC_Channel *)ha_malloc(sizeof(IPC_Channel)); if (mq->client == NULL) { cl_log(LOG_ERR, "%s: ha_malloc failed\n", __FUNCTION__); return FALSE; } *mq->client = *mq_pending->client; mq_request.qname = ha_strdup(name); mq_request.invocation = *invocation; mq_request.request_type = cmsrequest_string2type(request); client_send_client_qopen(client, &mq_request, handle, flag); ha_free(mq_request.qname); g_hash_table_remove(mq_open_pending_hash, name); dprintf("%p %p %p\n", mq_pending->name, mq_pending->client , mq_pending); ha_free(mq_pending->name); ha_free(mq_pending->client); ha_free(mq_pending); /* * send out notify msg for migratable mq if any */ send_migrate_message_notify(mq); } return TRUE;}static intsend_undelivered_message(ll_cluster_t *hb, mqueue_t *mq, const char *node){ SaMsgMessageT * message; struct ha_msg *m; const char *type = mqname_type2string(MQNAME_TYPE_REOPEN_MSGFEED); int invalid = FALSE; const char * request_type; char size_string[PACKSTRSIZE]; sa_mqueue_usage_encode(size_string, NULL, NULL , mq->status.saMsgQueueUsage); dprintf("Timer: current - close %s retention\n" , get_current_satime() - mq->status.closeTime < mq->status.retentionTime ? "<" : ">="); if (RETENTION_TIME_EXPIRES(mq)) { /* retention timer expires */ cl_log(LOG_INFO, "Original node: open a expired queue [%s]" , mq->name); invalid = TRUE; goto end; } while ((message = dequeue_message(mq))) { if ((m = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(m, F_TYPE, type) == HA_FAIL || ha_msg_add(m, F_MQNAME, mq->name) == HA_FAIL || ha_msg_addbin(m, F_MQMSGTYPE, (char *) &message->type, sizeof(SaSizeT)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGVER, (char *) &message->version, sizeof(SaSizeT)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGSIZE, (char *) &message->size, sizeof(SaSizeT)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGPRI, (char *) &message->priority, sizeof(SaUint8T)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGDATA, (char *) message->data, message->size) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); ha_msg_del(m); return FALSE; } hb->llc_ops->sendnodemsg(hb, m, node); dprintf("Send 1 msgfeed %d size msg to %s\n" , message->size, node); ha_msg_del(m); }end: /* * send the MSGFEED_END message */ type = mqname_type2string(MQNAME_TYPE_MSGFEED_END); if ((m = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (invalid == TRUE) request_type = "invalid"; else request_type = "valid"; if (ha_msg_add(m, F_TYPE, type) == HA_FAIL || ha_msg_add(m, F_MQNAME, mq->name) == HA_FAIL || ha_msg_add(m, F_MQREQUEST, request_type) == HA_FAIL || ha_msg_addbin(m, F_MQPOLICY, &mq->policy, sizeof(int)) == HA_FAIL || ha_msg_addbin(m, F_MQCREATEFLAG, &mq->status.creationFlags, sizeof(SaMsgQueueCreationFlagsT)) == HA_FAIL || ha_msg_addbin(m, F_MQOPENFLAG, &mq->status.openFlags, sizeof(SaMsgQueueOpenFlagsT)) == HA_FAIL || ha_msg_addbin(m, F_MQRETENTION, &mq->status.retentionTime, sizeof(SaTimeT)) == HA_FAIL || ha_msg_add(m, F_MQSIZE, size_string) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); ha_msg_del(m); return FALSE; } hb->llc_ops->sendnodemsg(hb, m, node); dprintf("Send msgfeed_end msg to %s\n", node); ha_msg_del(m); return TRUE;}intprocess_mqname_reopen(struct ha_msg *msg, enum mqname_type type, cms_data_t * cmsdata){ static struct ha_msg * saved_msg = NULL; struct ha_msg * new_msg; const char *name, *node; ll_cluster_t *hb = cmsdata->hb_handle; mqueue_t * mq; SaMsgMessageT * message; const SaSizeT * msg_type, * msg_ver, * msg_size, * msg_pri, * data; size_t type_len, ver_len, pri_len, size_len, data_len; const char *valid; const char *size_string = NULL; const int *s_invocation, *policy; size_t s_invocation_size, cflag_size, oflag_size, retention_size; const SaTimeT * retention = NULL; char *s_request, *s_error; const SaMsgQueueCreationFlagsT *cflag = NULL, *oflag = NULL; SaMsgQueueSendingStateT sending_state = SA_MSG_QUEUE_AVAILABLE; dprintf("%s: type is %d\n", __FUNCTION__, type); if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) { cl_log(LOG_ERR, "%s: ha_msg_value failed",__FUNCTION__); return FALSE; } switch (type) { case MQNAME_TYPE_REOPEN: if ((node = ha_msg_value(msg, F_MQHOST)) == NULL) { cl_log(LOG_ERR, "%s: ha_msg_value failed",__FUNCTION__); return FALSE; } if (((mq = mqname_lookup(name, NULL)) != NULL) && mq->mqstat == MQ_STATUS_CLOSE) { /* Do I have the original mq? */ cl_log(LOG_INFO, "Original mq host is %s", mq->host); if (is_host_local(mq->host, cmsdata)) { /* Send undelivered message to new mq */ send_undelivered_message(hb, mq, node); } } dprintf("mq->mqstat = <%d>\n", mq->mqstat); if (!is_host_local(node, cmsdata)) return TRUE; saved_msg = ha_msg_copy(msg); cl_log(LOG_INFO, "%s: waiting for msgfeed...", __FUNCTION__); break; case MQNAME_TYPE_REOPEN_MSGFEED: if ((msg_type = cl_get_binary(msg, F_MQMSGTYPE, &type_len)) == NULL || (msg_ver = cl_get_binary(msg, F_MQMSGVER, &ver_len)) == NULL || (msg_pri = cl_get_binary(msg, F_MQMSGPRI, &pri_len)) == NULL || (msg_size = cl_get_binary(msg, F_MQMSGSIZE, &size_len)) == NULL || (data = cl_get_binary(msg, F_MQMSGDATA, &data_len)) == NULL) { cl_log(LOG_ERR, "received bad msgfeed msg."); return FALSE; } if ((mq = mqname_lookup(name, NULL)) == NULL || mq->mqstat != MQ_STATUS_CLOSE) { cl_log(LOG_ALERT, "State machine BUG"); return FALSE; } message = (SaMsgMessageT *) ha_malloc(sizeof(SaMsgMessageT) + data_len); if (!message) { cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__); return FALSE; } message->type = *msg_type; message->version = *msg_ver; message->size = *msg_size; message->priority = *(const SaUint8T *) msg_pri; message->data = (char *)message + sizeof(SaMsgMessageT); memcpy(message->data, data, data_len); enqueue_message(mq, message->priority, message); break; case MQNAME_TYPE_MSGFEED_END: if ((mq = mqname_lookup(name, NULL)) == NULL || mq->mqstat != MQ_STATUS_CLOSE) { cl_log(LOG_ALERT, "State machine BUG"); return FALSE; } mq->mqstat = MQ_STATUS_OPEN; dprintf("in feedend, used is [%d]\n" , mq->status.saMsgQueueUsage[3].queueUsed); /* * read saved_msg */ if ((s_request = ha_strdup(ha_msg_value(saved_msg, F_MQREQUEST))) == NULL || (s_invocation = cl_get_binary(saved_msg, F_MQINVOCATION, &s_invocation_size)) == NULL || (s_error = ha_strdup(ha_msg_value(saved_msg,F_MQERROR))) == NULL) { cl_log(LOG_ERR, "%s: ha_msg_value error", __FUNCTION__); return FALSE; } /* * If mq retention time not expired, we will receive * a valid mq status information, so that we need to * use this original mq status. */ if ((valid = ha_msg_value(msg, F_MQREQUEST)) == NULL) { cl_log(LOG_ERR, "%s: cannot read invalid bit" , __FUNCTION__); return FALSE; } if (strncmp(valid, "invalid", 7) == 0) goto invalid; /* * read msgfeedend */ if ((policy = cl_get_binary(msg, F_MQPOLICY, NULL)) == NULL || (cflag = cl_get_binary(msg, F_MQCREATEFLAG , &cflag_size)) == NULL || (oflag = cl_get_binary(msg, F_MQOPENFLAG , &oflag_size)) == NULL || (retention = cl_get_binary(msg, F_MQRETENTION , &retention_size)) == NULL || (size_string = ha_msg_value(msg, F_MQSIZE)) == NULL) { cl_log(LOG_ERR, "%s: cl_get_binary failed" , __FUNCTION__); } /* * create a new msg according to msgfeedend */ if ((new_msg = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: no memory", __FUNCTION__); return FALSE; } if (ha_msg_add(new_msg, F_TYPE , mqname_type2string(MQNAME_TYPE_GRANTED)) == HA_FAIL || ha_msg_add(new_msg, F_MQNAME, name) == HA_FAIL || ha_msg_add(new_msg, F_MQREQUEST, s_request) == HA_FAIL || ha_msg_addbin(new_msg, F_MQINVOCATION, s_invocation, s_invocation_size) == HA_FAIL || ha_msg_add(new_msg, F_MQHOST, mq->host) == HA_FAIL || ha_msg_addbin(new_msg, F_MQSTATUS, &sending_state, sizeof(SaMsgQueueSendingStateT)) == HA_FAIL || ha_msg_addbin(new_msg, F_MQPOLICY, policy, sizeof(int)) == HA_FAIL || ha_msg_addbin(new_msg, F_MQCREATEFLAG, cflag, cflag_size) == HA_FAIL || ha_msg_addbin(new_msg, F_MQOPENFLAG, oflag, oflag_size) == HA_FAIL || ha_msg_addbin(new_msg, F_MQRETENTION, retention, retention_size) == HA_FAIL || ha_msg_add(new_msg, F_MQSIZE, size_string) == HA_FAIL || ha_msg_add(new_msg, F_MQERROR, s_error) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); ha_msg_del(new_msg); return FALSE; } ha_msg_del(saved_msg); saved_msg = new_msg; goto sendmsg;invalid: /* * If the open request set SA_MSG_QUEUE_OPEN_ONLY, * deny this request, since retention timer expired. */ mq = g_hash_table_lookup(mq_open_pending_hash, name); if (!mq) { cl_log(LOG_ERR, "BUG: cannot find mq in pending hash"); return TRUE; } if ((mq->status.creationFlags & SA_MSG_QUEUE_OPEN_ONLY)) { mqueue_request_t reply; cl_log(LOG_INFO, "retention timer expired and " "SA_MSG_QUEUE_OPEN_ONLY is set, reject!"); reply.qname = ha_strdup(name); reply.gname = NULL; reply.request_type = cmsrequest_string2type(s_request); reply.invocation = *s_invocation; client_send_client_qopen(mq->client, &reply, -1 , SA_ERR_NOT_EXIST); g_hash_table_remove(mq_open_pending_hash, name); /* * We get the change to unlink the mqueue here. */ request_mqname_unlink(name, cmsdata); ha_free(mq); ha_free(reply.qname); return TRUE; }sendmsg: /* * Send granted message to all. */ ha_msg_mod(saved_msg, F_TYPE , mqname_type2string(MQNAME_TYPE_GRANTED)); hb->llc_ops->sendclustermsg(hb, saved_msg); ha_msg_del(saved_msg); saved_msg = NULL; return TRUE; default: break; } return TRUE;}/** * process_mqname_denied - process the denied message from the master node * for this message queue name * @msg: received message from heartbeat IPC Channel */intprocess_mqname_denied(struct ha_msg *msg){ const char * name, * error, * request; const int * invocation; size_t invocation_size; IPC_Channel *client = NULL; mqueue_request_t reply; mqueue_t * mq_pending; int flag; if ((name = ha_msg_value(msg, F_MQNAME)) == NULL || (error = ha_msg_value(msg, F_MQERROR)) == NULL || (request = ha_msg_value(msg, F_MQREQUEST)) == NULL || (invocation = cl_get_binary(msg, F_MQINVOCATION, &invocation_size)) == NULL ) { cl_log(LOG_ERR, "received NULL mq name or mq error reply"); return FALSE; } flag = saerror_string2type(error); mq_pending = g_hash_table_lookup(mq_open_pending_hash, name); if (mq_pending != NULL) { /* * we have clients open pending, send out reply */ client = mq_pending->client; cl_log(LOG_INFO, "%s: found client <%p>", __FUNCTION__, client); reply.qname = ha_strdup(name); reply.gname = NULL; reply.request_type = cmsrequest_string2type(request); reply.invocation = *invocation;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -