📄 client_lib.c
字号:
order_seq = next; } pi->order_seq_head.next = NULL;}static voidzap_order_queue(llc_private_t* pi){ order_queue_t * oq = pi->order_queue_head; order_queue_t * next; int i; while (oq != NULL) { next = oq->next; for (i = 0; i < MAXMSGHIST; i++){ if (oq->node.orderQ[i]){ ZAPMSG(oq->node.orderQ[i]); oq->node.orderQ[i] = NULL; } if (oq->cluster.orderQ[i]){ ZAPMSG(oq->cluster.orderQ[i]); oq->cluster.orderQ[i] = NULL; } } ha_free(oq); oq = next; } pi->order_queue_head = NULL;}/* * Create a new stringlist. */static struct stringlist*new_stringlist(const char *s){ struct stringlist* ret; char * cp; if (s == NULL) { return(NULL); } if ((cp = ha_strdup(s)) == NULL) { return(NULL); } if ((ret = MALLOCT(struct stringlist)) == NULL) { ha_free(cp); return(NULL); } ret->next = NULL; ret->value = cp; return(ret);}/* * Destroy (free) a stringlist. */static voiddestroy_stringlist(struct stringlist * s){ struct stringlist * this; struct stringlist * next; for (this=s; this; this=next) { next = this->next; ha_free(this->value); memset(this, 0, sizeof(*this)); ha_free(this); }}/* * Enqueue a message to be read later. */static intenqueue_msg(llc_private_t* pi, struct ha_msg* msg){ struct MsgQueue* newQelem; if (msg == NULL) { return(HA_FAIL); } if ((newQelem = MALLOCT(struct MsgQueue)) == NULL) { return(HA_FAIL); } newQelem->value = msg; newQelem->prev = pi->lastQdmsg; newQelem->next = NULL; if (pi->lastQdmsg != NULL) { pi->lastQdmsg->next = newQelem; } pi->lastQdmsg = newQelem; if (pi->firstQdmsg == NULL) { pi->firstQdmsg = newQelem; } return HA_OK;}/* * Dequeue a message. */static struct ha_msg *dequeue_msg(llc_private_t* pi){ struct MsgQueue* qret; struct ha_msg* ret = NULL; qret = pi->firstQdmsg; if (qret != NULL) { ret = qret->value; pi->firstQdmsg=qret->next; if (pi->firstQdmsg) { pi->firstQdmsg->prev = NULL; } memset(qret, 0, sizeof(*qret)); /* * The only two pointers to this element are the first pointer, * and the prev pointer of the next element in the queue. * (or possibly lastQdmsg... See below) */ ha_free(qret); } if (pi->firstQdmsg == NULL) { /* Zap lastQdmsg if it pointed at this Q element */ pi->lastQdmsg=NULL; } return(ret);}/* * Search the general callback list for the given message type */static gen_callback_t*search_gen_callback(const char * type, llc_private_t* lcp){ struct gen_callback* gcb; for (gcb=lcp->genlist; gcb != NULL; gcb=gcb->next) { if (strcmp(type, gcb->msgtype) == 0) { return(gcb); } } return(NULL);} /* * Add a general callback to the list of general callbacks. */static intadd_gen_callback(const char * msgtype, llc_private_t* lcp, llc_msg_callback_t funp, void* pd){ struct gen_callback* gcb; char * type; if ((gcb = search_gen_callback(msgtype, lcp)) == NULL) { gcb = MALLOCT(struct gen_callback); if (gcb == NULL) { return(HA_FAIL); } type = ha_strdup(msgtype); if (type == NULL) { ha_free(gcb); return(HA_FAIL); } gcb->msgtype = type; gcb->next = lcp->genlist; lcp->genlist = gcb; }else if (funp == NULL) { return(del_gen_callback(lcp, msgtype)); } gcb->cf = funp; gcb->pd = pd; return(HA_OK);}/* * Delete a general callback from the list of general callbacks. */static int del_gen_callback(llc_private_t* lcp, const char * msgtype){ struct gen_callback* gcb; struct gen_callback* prev = NULL; for (gcb=lcp->genlist; gcb != NULL; gcb=gcb->next) { if (strcmp(msgtype, gcb->msgtype) == 0) { if (prev) { prev->next = gcb->next; }else{ lcp->genlist = gcb->next; } ha_free(gcb->msgtype); gcb->msgtype = NULL; free(gcb); return(HA_OK); } prev = gcb; } return(HA_FAIL);} /* * Read an API message. All other messages are enqueued to be read later. */static struct ha_msg *read_api_msg(llc_private_t* pi){ for (;;) { struct ha_msg* msg; const char * type; pi->chan->ops->waitin(pi->chan); if ((msg=msgfromIPC(pi->chan)) == NULL) { ha_api_perror("read_api_msg: " "Cannot read reply from IPC channel"); continue; } if ((type=ha_msg_value(msg, F_TYPE)) != NULL && strcmp(type, T_APIRESP) == 0) { return(msg); } /* Got an unexpected non-api message */ /* Queue it up for reading later */ enqueue_msg(pi, msg); } /*NOTREACHED*/ return(NULL);}/* * Read a client status respond message either from local node or from * a remote node. All other messages are enqueued to be read later. */static struct ha_msg *read_cstatus_respond_msg(llc_private_t* pi, int timeout){ struct ha_msg* msg; const char * type; struct pollfd pfd; pfd.fd = pi->chan->ops->get_recv_select_fd(pi->chan); pfd.events = POLLIN; while ((pi->chan->ops->is_message_pending(pi->chan)) || (poll(&pfd, 1, timeout) > 0 && pfd.revents == POLLIN)) { while (pi->chan->ops->is_message_pending(pi->chan)) { if ((msg=msgfromIPC(pi->chan)) == NULL) { ha_api_perror("read_api_msg: " "Cannot read reply from IPC channel"); continue; } if (((type=ha_msg_value(msg, F_TYPE)) != NULL && strcmp(type, T_RCSTATUS) == 0) || ((type=ha_msg_value(msg, F_SUBTYPE)) != NULL && strcmp(type, T_RCSTATUS) == 0)) { return(msg); } /* Got an unexpected non-api message */ /* Queue it up for reading later */ enqueue_msg(pi, msg); } } /* Timeout or caught a signal */ return NULL;}/* * Pop up orderQ. */static struct ha_msg *pop_orderQ(struct orderQ * q){ struct ha_msg * msg; if (q->orderQ[q->curr_index]){ msg = q->orderQ[q->curr_index]; q->orderQ[q->curr_index] = NULL; q->curr_index = (q->curr_index + 1) % MAXMSGHIST; q->curr_seqno++; return msg; } return NULL;}/* * Process ordered message */static struct ha_msg *process_ordered_msg(struct orderQ* q, struct ha_msg* msg, seqno_t oseq){ int i; if (oseq < q->curr_seqno || oseq - q->curr_seqno >= MAXMSGHIST){ if (oseq < q->curr_seqno){ /* Sender restarted */ if (DEBUGORDER) cl_log(LOG_DEBUG, "Sender restarted!"); q->curr_seqno = 1; }else { /* * receives a very big sequence number, the * message is not reliable at this point */ if (DEBUGORDER) { cl_log(LOG_DEBUG , "lost at least one unretrievable " "packet! [%lx:%lx], force reset" , q->curr_seqno , oseq); } q->curr_seqno = oseq; } for (i = 0; i < MAXMSGHIST; i++) { /* Clear order queue, msg obsoleted */ if (q->orderQ[i]){ ha_free(q->orderQ[i]); q->orderQ[i] = NULL; } } q->curr_index = 0; } /* Put the new received packet in queue */ q->orderQ[(q->curr_index + oseq - q->curr_seqno) % MAXMSGHIST] = msg; /* Should we send the first ordered packets? */ if (oseq == q->curr_seqno || (q->curr_seqno == 1 && oseq - q->curr_seqno >= SEQGAP)){ if (oseq != q->curr_seqno){ /* * We probably missed first several packets * since the client on from_node restarted. */ if (DEBUGORDER) { cl_log(LOG_DEBUG , "Finally lost SEQGAP pkts, discard, " "oseq 0x%lx, currseq 0x%lx" , oseq, q->curr_seqno); } for (i = q->curr_index ; q->orderQ[i] == NULL ; i = (i+1) % MAXMSGHIST) q->curr_seqno++; q->curr_index = i; if (DEBUGORDER) { cl_log(LOG_DEBUG , "After discard, seqno 0x%lx, index %d" , q->curr_seqno , q->curr_index); } } return pop_orderQ(q); } return NULL;}/* * Process msg gotten from FIFO or msgQ. */static struct ha_msg *process_hb_msg(llc_private_t* pi, struct ha_msg* msg){ const char * coseq; const char * from_node; const char * to_node; seqno_t oseq; order_queue_t * oq; int i; if ((coseq = ha_msg_value(msg, F_ORDERSEQ)) != NULL && sscanf(coseq, "%lx", &oseq) == 1){ /* find the order queue by from_node */ if ((from_node = ha_msg_value(msg, F_ORIG)) == NULL){ ha_api_log(LOG_ERR , "%s: extract F_ORIG failed", __FUNCTION__); ZAPMSG(msg); return NULL; } for (oq = pi->order_queue_head; oq != NULL; oq = oq->next){ if (strcmp(oq->from_node, from_node) == 0) break; } if (oq == NULL){ oq = (order_queue_t *) ha_malloc(sizeof(order_queue_t)); if (oq == NULL){ ha_api_log(LOG_ERR , "%s: order_queue_t malloc failed" , __FUNCTION__); ZAPMSG(msg); return NULL; } strncpy(oq->from_node, from_node, HOSTLENG); oq->node.curr_index = 0; oq->node.curr_seqno = 1; oq->cluster.curr_index = 0; oq->cluster.curr_seqno = 1; for (i=0; i < MAXMSGHIST; i++){ oq->node.orderQ[i] = NULL; oq->cluster.orderQ[i] = NULL; } oq->next = pi->order_queue_head; pi->order_queue_head = oq; } if ((to_node = ha_msg_value(msg, F_TO)) == NULL) return process_ordered_msg(&oq->cluster, msg, oseq); else return process_ordered_msg(&oq->node, msg, oseq); }else /* Simply return no order required msg */ return msg;}/* * Read a heartbeat message. Read from the queue first. */static struct ha_msg *read_hb_msg(ll_cluster_t* llc, int blocking){ llc_private_t* pi; struct ha_msg* msg; struct ha_msg* retmsg; order_queue_t* oq; if (!ISOURS(llc)) { ha_api_log(LOG_ERR, "read_hb_msg: bad cinfo"); return NULL; } pi = (llc_private_t*)llc->ll_cluster_private; if (!pi->SignedOn) { return NULL; } /* Process msg from msgQ */ while ((msg = dequeue_msg(pi))){ if ((retmsg = process_hb_msg(pi, msg))) return retmsg; } for (oq = pi->order_queue_head; oq != NULL; oq = oq->next){ if ((retmsg = pop_orderQ(&oq->node))) return retmsg; if ((retmsg = pop_orderQ(&oq->cluster))) return retmsg; } /* Process msg from FIFO */ while (msgready(llc)){ msg = msgfromIPC(pi->chan); if (msg == NULL) { if (pi->chan->ch_status != IPC_CONNECT) { pi->SignedOn = FALSE; return NULL; } }else if ((retmsg = process_hb_msg(pi, msg))) { return retmsg; } } /* Process msg from orderQ */ if (!blocking) return NULL; /* If this is a blocking call, we keep on reading from FIFO, so * that we can finally return a non-NULL msg to user. */ for(;;) { pi->chan->ops->waitin(pi->chan); msg = msgfromIPC(pi->chan); if (msg == NULL) { if (pi->chan->ch_status != IPC_CONNECT) { pi->SignedOn = FALSE; } return NULL; } if ((retmsg = process_hb_msg(pi, msg))) { return retmsg; } }}/* * Add a callback for the given message type. */static intset_msg_callback(ll_cluster_t* ci, const char * msgtype, llc_msg_callback_t callback, void * p){ ClearLog(); if (!ISOURS(ci)) { ha_api_log(LOG_ERR, "set_msg_callback: bad cinfo"); return HA_FAIL; } return(add_gen_callback(msgtype, (llc_private_t*)ci->ll_cluster_private, callback, p));}/* * Set the node status change callback. */static intset_nstatus_callback (ll_cluster_t* ci, llc_nstatus_callback_t cbf, void * p){ llc_private_t* pi = ci->ll_cluster_private; pi->node_callback = cbf; pi->node_private = p; return(HA_OK);}/* * Set the interface status change callback. */static intset_ifstatus_callback (ll_cluster_t* ci, llc_ifstatus_callback_t cbf, void * p){ llc_private_t* pi = ci->ll_cluster_private; pi->if_callback = cbf; pi->if_private = p; return(HA_OK);}/* * Set the client status change callback. */static intset_cstatus_callback (ll_cluster_t* ci, llc_cstatus_callback_t cbf, void * p){ llc_private_t* pi = ci->ll_cluster_private; ClearLog(); if (!ISOURS(ci)) { ha_api_log(LOG_ERR, "%s: bad cinfo", __FUNCTION__); return HA_FAIL; } if (!pi->SignedOn) { ha_api_log(LOG_ERR, "not signed on"); return HA_FAIL; } pi->cstatus_callback = cbf; pi->client_private = p; return HA_OK;}/* * Call the callback associated with this message (if any) * Return TRUE if a callback was called. */static intCallbackCall(llc_private_t* p, struct ha_msg * msg){ const char * mtype= ha_msg_value(msg, F_TYPE); struct gen_callback* gcb; if (mtype == NULL) { return(0); } /* Special case: node status (change) */ if ((strcasecmp(mtype, T_STATUS) == 0 || strcasecmp(mtype, T_NS_STATUS) == 0)) { /* If DEADSTATUS, cleanup order queue for the node */ if (strcmp(ha_msg_value(msg, F_STATUS), DEADSTATUS) == 0) { order_queue_t * oq = p->order_queue_head; order_queue_t * prev; order_queue_t * next; int i; for (prev = NULL; oq != NULL; prev = oq, oq = oq->next){ if (strcmp(oq->from_node , ha_msg_value(msg, F_ORIG)) == 0) break; } if (oq){ next = oq->next; for (i = 0; i < MAXMSGHIST; i++){ if (oq->node.orderQ[i]) ZAPMSG(oq->node.orderQ[i]); if (oq->cluster.orderQ[i]) ZAPMSG(oq->cluster.orderQ[i]); } ha_free(oq); if (prev) prev->next = next; else p->order_queue_head = next; } } if (p->node_callback) { p->node_callback(ha_msg_value(msg, F_ORIG) , ha_msg_value(msg, F_STATUS), p->node_private); return(1); } } /* Special case: interface status (change) */ if (p->if_callback && strcasecmp(mtype, T_IFSTATUS) == 0) { p->if_callback(ha_msg_value(msg, F_NODE) , ha_msg_value(msg, F_IFNAME) , ha_msg_value(msg, F_STATUS) , p->if_private); return(1); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -