📄 client_lib.c
字号:
ClearLog(); if (!ISOURS(lcl)) { ha_api_log(LOG_ERR, "get_ifstatus: bad cinfo"); return NULL; } pi = (llc_private_t*)lcl->ll_cluster_private; if (!pi->SignedOn) { ha_api_log(LOG_ERR, "not signed on"); return NULL; } if ((request = hb_api_boilerplate(API_IFSTATUS)) == NULL) { return NULL; } if (ha_msg_add(request, F_NODENAME, host) != HA_OK) { ha_api_log(LOG_ERR, "get_ifstatus: cannot add field"); ZAPMSG(request); return NULL; } if (ha_msg_add(request, F_IFNAME, ifname) != HA_OK) { ha_api_log(LOG_ERR, "get_ifstatus: cannot add field"); ZAPMSG(request); return NULL; } /* Send message */ if (msg2ipcchan(request, pi->chan) != HA_OK) { ZAPMSG(request); ha_api_perror("Can't send message to IPC Channel"); return NULL; } ZAPMSG(request); /* Read reply... */ if ((reply=read_api_msg(pi)) == NULL) { return NULL; } if ((result = ha_msg_value(reply, F_APIRESULT)) != NULL && strcmp(result, API_OK) == 0 && (status = ha_msg_value(reply,F_STATUS)) != NULL) { memset(statbuf, 0, sizeof(statbuf)); strncpy(statbuf, status, sizeof(statbuf) - 1); ret = statbuf; }else{ ret = NULL; } ZAPMSG(reply); return ret;}

/*
 * Zap our list of nodes.
 *
 * Releases the stringlist hanging off pi->nodelist and clears both
 * pi->nodelist and pi->nextnode so neither is left dangling.
 */
static void
zap_nodelist(llc_private_t* pi)
{
	destroy_stringlist(pi->nodelist);
	pi->nodelist = NULL;
	pi->nextnode = NULL;
}

/*
 * Zap our list of interfaces.
*/static voidzap_iflist(llc_private_t* pi){ destroy_stringlist(pi->iflist); pi->iflist=NULL; pi->nextif = NULL;}static voidzap_order_seq(llc_private_t* pi){ order_seq_t * order_seq = pi->order_seq_head.next; order_seq_t * next; while (order_seq != NULL){ next = order_seq->next; ha_free(order_seq); order_seq = next; } pi->order_seq_head.next = NULL;}static voidzap_order_queue(llc_private_t* pi){ order_queue_t * oq = pi->order_queue_head; order_queue_t * next; int i; while (oq != NULL) { next = oq->next; for (i = 0; i < MAXMSGHIST; i++){ if (oq->node.orderQ[i]){ ZAPMSG(oq->node.orderQ[i]); oq->node.orderQ[i] = NULL; } if (oq->cluster.orderQ[i]){ ZAPMSG(oq->cluster.orderQ[i]); oq->cluster.orderQ[i] = NULL; } } ha_free(oq); oq = next; } pi->order_queue_head = NULL;}/* * Create a new stringlist. */static struct stringlist*new_stringlist(const char *s){ struct stringlist* ret; char * cp; if (s == NULL) { return(NULL); } if ((cp = ha_strdup(s)) == NULL) { return(NULL); } if ((ret = MALLOCT(struct stringlist)) == NULL) { ha_free(cp); return(NULL); } ret->next = NULL; ret->value = cp; return(ret);}/* * Destroy (free) a stringlist. */static voiddestroy_stringlist(struct stringlist * s){ struct stringlist * this; struct stringlist * next; for (this=s; this; this=next) { next = this->next; ha_free(this->value); memset(this, 0, sizeof(*this)); ha_free(this); }}/* * Enqueue a message to be read later. */static intenqueue_msg(llc_private_t* pi, struct ha_msg* msg){ struct MsgQueue* newQelem; if (msg == NULL) { return(HA_FAIL); } if ((newQelem = MALLOCT(struct MsgQueue)) == NULL) { return(HA_FAIL); } newQelem->value = msg; newQelem->prev = pi->lastQdmsg; newQelem->next = NULL; if (pi->lastQdmsg != NULL) { pi->lastQdmsg->next = newQelem; } pi->lastQdmsg = newQelem; if (pi->firstQdmsg == NULL) { pi->firstQdmsg = newQelem; } return HA_OK;}/* * Dequeue a message. 
*/static struct ha_msg *dequeue_msg(llc_private_t* pi){ struct MsgQueue* qret; struct ha_msg* ret = NULL; qret = pi->firstQdmsg; if (qret != NULL) { ret = qret->value; pi->firstQdmsg=qret->next; if (pi->firstQdmsg) { pi->firstQdmsg->prev = NULL; } memset(qret, 0, sizeof(*qret)); /* * The only two pointers to this element are the first pointer, * and the prev pointer of the next element in the queue. * (or possibly lastQdmsg... See below) */ ha_free(qret); } if (pi->firstQdmsg == NULL) { /* Zap lastQdmsg if it pointed at this Q element */ pi->lastQdmsg=NULL; } return(ret);}/* * Search the general callback list for the given message type */static gen_callback_t*search_gen_callback(const char * type, llc_private_t* lcp){ struct gen_callback* gcb; for (gcb=lcp->genlist; gcb != NULL; gcb=gcb->next) { if (strcmp(type, gcb->msgtype) == 0) { return(gcb); } } return(NULL);} /* * Add a general callback to the list of general callbacks. */static intadd_gen_callback(const char * msgtype, llc_private_t* lcp, llc_msg_callback_t funp, void* pd){ struct gen_callback* gcb; char * type; if ((gcb = search_gen_callback(msgtype, lcp)) == NULL) { gcb = MALLOCT(struct gen_callback); if (gcb == NULL) { return(HA_FAIL); } type = ha_strdup(msgtype); if (type == NULL) { ha_free(gcb); return(HA_FAIL); } gcb->msgtype = type; gcb->next = lcp->genlist; lcp->genlist = gcb; }else if (funp == NULL) { return(del_gen_callback(lcp, msgtype)); } gcb->cf = funp; gcb->pd = pd; return(HA_OK);}/* * Delete a general callback from the list of general callbacks. */static int del_gen_callback(llc_private_t* lcp, const char * msgtype){ struct gen_callback* gcb; struct gen_callback* prev = NULL; for (gcb=lcp->genlist; gcb != NULL; gcb=gcb->next) { if (strcmp(msgtype, gcb->msgtype) == 0) { if (prev) { prev->next = gcb->next; }else{ lcp->genlist = gcb->next; } ha_free(gcb->msgtype); gcb->msgtype = NULL; free(gcb); return(HA_OK); } prev = gcb; } return(HA_FAIL);} /* * Read an API message. 
All other messages are enqueued to be read later.
 *
 * Loops until a message whose F_TYPE is T_APIRESP arrives and returns
 * it.  A failed read is reported and the loop simply waits again, so
 * this function only returns once an API response has been received.
 */
static struct ha_msg *
read_api_msg(llc_private_t* pi)
{
	for (;;) {
		struct ha_msg*	msg;
		const char *	type;

		pi->chan->ops->waitin(pi->chan);
		if ((msg=msgfromIPC(pi->chan)) == NULL) {
			ha_api_perror("read_api_msg: "
			"Cannot read reply from IPC channel");
			continue;
		}
		if ((type=ha_msg_value(msg, F_TYPE)) != NULL
		&&	strcmp(type, T_APIRESP) == 0) {
			return(msg);
		}
		/* Got an unexpected non-api message */
		/* Queue it up for reading later */
		if (enqueue_msg(pi, msg) != HA_OK) {
			/*
			 * The queue did not take the message, so nothing
			 * else refers to it: free it rather than leak it.
			 */
			ha_api_log(LOG_ERR, "read_api_msg: "
			"cannot enqueue message; dropping it");
			ZAPMSG(msg);
		}
	}
	/*NOTREACHED*/
	return(NULL);
}

/*
 * Read a client status respond message either from local node or from
 * a remote node.  All other messages are enqueued to be read later.
 *
 * A message qualifies when its F_TYPE or its F_SUBTYPE equals
 * T_RCSTATUS.  timeout is handed unchanged to poll(); every poll() call
 * gets the full timeout again, so the total wait can exceed it when
 * unrelated messages keep arriving.
 *
 * Returns the matching message, or NULL when poll() did not report
 * exactly POLLIN (timeout, signal or error condition).
 */
static struct ha_msg *
read_cstatus_respond_msg(llc_private_t* pi, int timeout)
{
	struct ha_msg*	msg;
	const char *	type;
	struct pollfd	pfd;

	pfd.fd = pi->chan->ops->get_recv_select_fd(pi->chan);
	pfd.events = POLLIN;

	/* Drain what is already buffered first; only then block in poll(). */
	while ((pi->chan->ops->is_message_pending(pi->chan))
	||	(poll(&pfd, 1, timeout) > 0 && pfd.revents == POLLIN)) {

		while (pi->chan->ops->is_message_pending(pi->chan)) {
			if ((msg=msgfromIPC(pi->chan)) == NULL) {
				ha_api_perror("read_cstatus_respond_msg: "
				"Cannot read reply from IPC channel");
				continue;
			}
			if (((type=ha_msg_value(msg, F_TYPE)) != NULL
			&&	strcmp(type, T_RCSTATUS) == 0)
			||	((type=ha_msg_value(msg, F_SUBTYPE)) != NULL
			&&	strcmp(type, T_RCSTATUS) == 0)) {
				return(msg);
			}
			/* Got an unexpected non-api message */
			/* Queue it up for reading later */
			if (enqueue_msg(pi, msg) != HA_OK) {
				/* Not queued, so free it rather than leak it. */
				ha_api_log(LOG_ERR
				,	"read_cstatus_respond_msg: "
				"cannot enqueue message; dropping it");
				ZAPMSG(msg);
			}
		}
	}
	/* Timeout or caught a signal */
	return NULL;
}

/* This is the place to handle out of order messages from a restarted
 * client. If we receive messages from a restarted client yet no leave
 * message has been received for the previous client, we need to
 * save the restarted client's messages in backup queue. When the leave
 * message is received, we then call moveup_backupQ() so that the backup
 * queue is promoted to our current queue, not backup any more.
*/static voidmoveup_backupQ(struct orderQ* q){ int i; if (q == NULL){ return; } if (q->backupQ){ struct orderQ* backup_q = q->backupQ; memcpy(q, backup_q, sizeof(struct orderQ)); if (backup_q->backupQ != NULL){ cl_log(LOG_ERR, "moveup_backupQ:" "backupQ in backupQ is not NULL"); } ha_free(backup_q); q->backupQ = NULL; }else { /*the queue must be empty*/ for (i = 0; i < MAXMSGHIST; i++) { if (q->orderQ[i]){ cl_log(LOG_ERR, "moveup_backupQ:" "queue is not empty" " possible memory leak"); cl_log_message(LOG_ERR, q->orderQ[i]); } } q->curr_oseqno = 0; } return ;}/* * Pop up orderQ. */static struct ha_msg *pop_orderQ(struct orderQ * q){ struct ha_msg * msg; if (q->orderQ[q->curr_index]){ msg = q->orderQ[q->curr_index]; q->orderQ[q->curr_index] = NULL; q->curr_index = (q->curr_index + 1) % MAXMSGHIST; q->curr_oseqno++; return msg; } return NULL;}static intmsg_oseq_compare(seqno_t oseq1, seqno_t gen1, seqno_t oseq2, seqno_t gen2){ int ret; if ( gen1 > gen2){ ret = 1; } else if (gen1 < gen2){ ret = -1; } else { if (oseq1 > oseq2){ ret = 1; } else if (oseq1 < oseq2){ ret = -1; } else{ ret = 0; } } return ret; }static voidreset_orderQ(struct orderQ* q){ int i; for (i =0 ;i < MAXMSGHIST; i++){ if (q->orderQ[i]){ ha_msg_del(q->orderQ[i]); q->orderQ[i] = 0; } } if (q->backupQ != NULL){ reset_orderQ(q->backupQ); ha_free(q->backupQ); q->backupQ = NULL; } memset(q, 0, sizeof(struct orderQ)); return;}/* * Process ordered message */static voiddisplay_orderQ(struct orderQ* q){ if(!q){ return; } cl_log(LOG_INFO, "curr_index=%x, curr_oseqno=%lx, " "curr_gen=%lx, curr_client_gen=%lx", q->curr_index, q->curr_oseqno, q->curr_gen, q->curr_client_gen); cl_log(LOG_INFO, "first_msg_seq =%lx, first_msg_gen = %lx," "first_msg_client_gen =%lx", q->first_msg_seq, q->first_msg_gen, q->first_msg_client_gen); if (q->backupQ == NULL){ cl_log(LOG_INFO, "q->backupQ is NULL"); }else{ display_orderQ(q->backupQ); } }static struct ha_msg *process_ordered_msg(struct orderQ* q, struct ha_msg* msg, seqno_t 
gen, seqno_t cligen, seqno_t seq, seqno_t oseq, int popmsg){ int i; /* display_orderQ(q); */ /*if this is the first packet, pop it*/ if ( q->first_msg_seq == 0){ q->first_msg_seq = seq; q->first_msg_client_gen = cligen; q->first_msg_gen = gen; q->curr_gen = gen; q->curr_client_gen = cligen; q->curr_oseqno = oseq -1 ; goto out; } /*any message with lower sequence than q->first_msg_seq will be dropped*/ if (q->first_msg_seq != 0 && msg_oseq_compare(q->first_msg_seq, q->first_msg_gen, seq, gen) > 0 ) { return NULL; } if ( q->curr_oseqno == 0){ q->curr_gen = gen; q->curr_client_gen = cligen; goto out; } if ( gen > q->curr_gen ){ /*heartbeat restart, clean everything up*/ reset_orderQ(q); q->first_msg_seq = seq; q->first_msg_client_gen = cligen; q->first_msg_gen = gen; q->curr_gen = gen; q->curr_client_gen = cligen; q->curr_oseqno = oseq - 1; goto out; } else if (gen < q->curr_gen){ /* * message from previous heartbeat generation, * drop the message */ return NULL; } else if(cligen > q->curr_client_gen ){ /*client restarted*/ if (q->backupQ == NULL){ if ( (q->backupQ = ha_malloc(sizeof(struct orderQ))) ==NULL ){ cl_log(LOG_ERR, "process_ordered_msg: " "allocating memory for backupQ failed"); return NULL; } memset(q->backupQ, 0, sizeof(struct orderQ)); } process_ordered_msg(q->backupQ, msg, gen, cligen, seq, oseq, 0); return NULL; } else if (cligen < q->curr_client_gen){ /*Message from a previous client*/ /*this should never happend*/ cl_log(LOG_ERR, "process_ordered_msg: Received message" " from previous client. This should never happen"); cl_log_message(LOG_ERR, msg); return NULL; }else if (oseq - q->curr_oseqno >= MAXMSGHIST){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -