📄 client_lib.c
字号:
/* * receives a very big sequence number, the * message is not reliable at this point */ if (DEBUGORDER) { cl_log(LOG_DEBUG , "lost at least one unretrievable " "packet! [%lx:%lx], force reset" , q->curr_oseqno , oseq); } q->curr_oseqno = oseq - 1; for (i = 0; i < MAXMSGHIST; i++) { /* Clear order queue, msg obsoleted */ if (q->orderQ[i]){ ha_msg_del(q->orderQ[i]); q->orderQ[i] = NULL; } } q->curr_index = 0; } out: /* Put the new received packet in queue */ q->orderQ[(q->curr_index + oseq - q->curr_oseqno -1 ) % MAXMSGHIST] = msg; /* if this is the packet we are expecting, pop it*/ if (popmsg && msg_oseq_compare(q->curr_oseqno + 1, q->curr_gen,oseq, gen) == 0){ return pop_orderQ(q); } return NULL; }static struct ha_msg* process_client_status_msg(llc_private_t* pi, struct ha_msg* msg, const char* from_node){ const char* status = ha_msg_value(msg, F_STATUS); order_queue_t * oq; struct ha_msg* retmsg; if (status && (strcmp(status, LEAVESTATUS) == 0 || strcmp(status, JOINSTATUS) == 0) ){ for (oq = pi->order_queue_head; oq != NULL; oq = oq->next){ if (strcmp(oq->from_node, from_node) == 0){ break; } } if (oq == NULL){ /*no ordered queue found, good, *simply return the message */ return msg; } if (strcmp(status, LEAVESTATUS) == 0 ){ if (oq->leave_msg != NULL){ cl_log(LOG_ERR, "process_client_status_msg: " " the previous leave msg " "is not delivered yet"); cl_log_message(LOG_ERR, oq->leave_msg); cl_log_message(LOG_ERR, msg); return NULL; } oq->leave_msg = msg; if ((retmsg = pop_orderQ(&oq->node))){ return retmsg; } if ((retmsg = pop_orderQ(&oq->cluster))){ return retmsg; } oq->leave_msg = NULL; moveup_backupQ(&oq->node); moveup_backupQ(&oq->cluster); return msg; }else { /*join message*/ return msg; } }else{ cl_log(LOG_ERR, "process_client_status_msg: " "no status found in client status msg"); cl_log_message(LOG_ERR, msg); return NULL; } return msg;}/* * Process msg gotten from FIFO or msgQ. 
*/
/*
 * process_hb_msg:
 *
 *	pi	private connection state holding the per-node order queues.
 *	msg	message just read.
 *
 * Returns a message ready for delivery, or NULL when there is nothing to
 * deliver yet:
 *   - a message lacking a parsable F_SEQ, F_HBGENERATION or
 *     F_CLIENT_GENERATION field is returned unchanged;
 *   - a message without F_ORIG is logged, released and NULL is returned;
 *   - a message with a parsable F_ORDERSEQ goes to process_ordered_msg()
 *     on the originating node's "cluster" queue (no F_TO field) or "node"
 *     queue (F_TO present); the order queue is created on first use;
 *   - a T_APICLISTAT message goes to process_client_status_msg();
 *   - anything else is returned unchanged.
 */
static struct ha_msg *
process_hb_msg(llc_private_t* pi, struct ha_msg* msg)
{
	const char *	from_node;
	order_queue_t *	oq;
	const char *	coseq;
	seqno_t		oseq;
	const char *	cgen;
	seqno_t		gen;
	const char *	cseq;
	seqno_t		seq;
	const char *	ccligen;
	seqno_t		cligen;

	if ((cseq = ha_msg_value(msg, F_SEQ)) == NULL
	||	sscanf(cseq, "%lx", &seq) != 1) {
		return msg;
	}
	if ((cgen = ha_msg_value(msg, F_HBGENERATION)) == NULL
	||	sscanf(cgen, "%lx", &gen) != 1) {
		return msg;
	}
	if ((ccligen = ha_msg_value(msg, F_CLIENT_GENERATION)) == NULL
	||	sscanf(ccligen, "%lx", &cligen) != 1) {
		return msg;
	}
	if ((from_node = ha_msg_value(msg, F_ORIG)) == NULL) {
		ha_api_log(LOG_ERR
		,	"%s: extract F_ORIG failed", __FUNCTION__);
		ZAPMSG(msg);
		return NULL;
	}

	coseq = ha_msg_value(msg, F_ORDERSEQ);
	if (coseq == NULL || sscanf(coseq, "%lx", &oseq) != 1) {
		const char *	type = ha_msg_value(msg, F_TYPE);

		if (type && strcmp(type, T_APICLISTAT) == 0) {
			return process_client_status_msg(pi, msg, from_node);
		}
		/* Simply return no order required msg */
		return msg;
	}

	/* Find the order queue by from_node */
	for (oq = pi->order_queue_head; oq != NULL; oq = oq->next) {
		if (strcmp(oq->from_node, from_node) == 0) {
			break;
		}
	}
	if (oq == NULL) {
		oq = (order_queue_t *) ha_malloc(sizeof(order_queue_t));
		if (oq == NULL) {
			ha_api_log(LOG_ERR
			,	"%s: order_queue_t malloc failed"
			,	__FUNCTION__);
			ZAPMSG(msg);
			return NULL;
		}
		memset(oq, 0, sizeof(*oq));
		/*
		 * Copy at most HOSTLENG-1 bytes: strncpy() does not add a
		 * terminator when the source fills the whole count, and
		 * from_node is later handed to strcmp().  The struct was
		 * just zeroed, so the last byte stays '\0'.
		 */
		strncpy(oq->from_node, from_node, HOSTLENG - 1);
		oq->next = pi->order_queue_head;
		pi->order_queue_head = oq;
	}

	/* No F_TO field: use the cluster queue, otherwise the node queue */
	if (ha_msg_value(msg, F_TO) == NULL) {
		return process_ordered_msg(&oq->cluster, msg, gen,
					   cligen, seq, oseq, 1);
	}
	return process_ordered_msg(&oq->node, msg, gen,
				   cligen, seq, oseq, 1);
}

/*
 * Read a heartbeat message.  Read from the queue first.
*/static struct ha_msg *read_hb_msg(ll_cluster_t* llc, int blocking){ llc_private_t* pi; struct ha_msg* msg; struct ha_msg* retmsg; order_queue_t* oq; if (!ISOURS(llc)) { ha_api_log(LOG_ERR, "read_hb_msg: bad cinfo"); return NULL; } pi = (llc_private_t*)llc->ll_cluster_private; if (!pi->SignedOn) { return NULL; } /* Process msg from msgQ */ while ((msg = dequeue_msg(pi))){ if ((retmsg = process_hb_msg(pi, msg))) return retmsg; } for (oq = pi->order_queue_head; oq != NULL; oq = oq->next){ process_oq: if ((retmsg = pop_orderQ(&oq->node))){ return retmsg; } if ((retmsg = pop_orderQ(&oq->cluster))){ return retmsg; } if (oq->leave_msg != NULL){ retmsg = oq->leave_msg; oq->leave_msg = NULL; oq->client_leaving = 1; return retmsg; } if (oq->client_leaving){ moveup_backupQ(&oq->node); moveup_backupQ(&oq->cluster); oq->client_leaving = 0; goto process_oq; } } /* Process msg from FIFO */ while (msgready(llc)){ msg = msgfromIPC(pi->chan); if (msg == NULL) { if (pi->chan->ch_status != IPC_CONNECT) { pi->SignedOn = FALSE; return NULL; } }else if ((retmsg = process_hb_msg(pi, msg))) { return retmsg; } } /* Process msg from orderQ */ if (!blocking) return NULL; /* If this is a blocking call, we keep on reading from FIFO, so * that we can finally return a non-NULL msg to user. */ for(;;) { pi->chan->ops->waitin(pi->chan); msg = msgfromIPC(pi->chan); if (msg == NULL) { if (pi->chan->ch_status != IPC_CONNECT) { pi->SignedOn = FALSE; } return NULL; } if ((retmsg = process_hb_msg(pi, msg))) { return retmsg; } }}/* * Add a callback for the given message type. */static intset_msg_callback(ll_cluster_t* ci, const char * msgtype, llc_msg_callback_t callback, void * p){ ClearLog(); if (!ISOURS(ci)) { ha_api_log(LOG_ERR, "set_msg_callback: bad cinfo"); return HA_FAIL; } return(add_gen_callback(msgtype, (llc_private_t*)ci->ll_cluster_private, callback, p));}/* * Set the node status change callback. 
*/static intset_nstatus_callback (ll_cluster_t* ci, llc_nstatus_callback_t cbf, void * p){ llc_private_t* pi = ci->ll_cluster_private; pi->node_callback = cbf; pi->node_private = p; return(HA_OK);}/* * Set the interface status change callback. */static intset_ifstatus_callback (ll_cluster_t* ci, llc_ifstatus_callback_t cbf, void * p){ llc_private_t* pi = ci->ll_cluster_private; pi->if_callback = cbf; pi->if_private = p; return(HA_OK);}/* * Set the client status change callback. */static intset_cstatus_callback (ll_cluster_t* ci, llc_cstatus_callback_t cbf, void * p){ llc_private_t* pi = ci->ll_cluster_private; ClearLog(); if (!ISOURS(ci)) { ha_api_log(LOG_ERR, "%s: bad cinfo", __FUNCTION__); return HA_FAIL; } if (!pi->SignedOn) { ha_api_log(LOG_ERR, "not signed on"); return HA_FAIL; } pi->cstatus_callback = cbf; pi->client_private = p; return HA_OK;}/* * Call the callback associated with this message (if any) * Return TRUE if a callback was called. */static intCallbackCall(llc_private_t* p, struct ha_msg * msg){ const char * mtype= ha_msg_value(msg, F_TYPE); struct gen_callback* gcb; if (mtype == NULL) { return(0); } /* Special case: node status (change) */ if ((strcasecmp(mtype, T_STATUS) == 0 || strcasecmp(mtype, T_NS_STATUS) == 0)) { /* If DEADSTATUS, cleanup order queue for the node */ if (strcmp(ha_msg_value(msg, F_STATUS), DEADSTATUS) == 0) { order_queue_t * oq = p->order_queue_head; order_queue_t * prev; order_queue_t * next; int i; for (prev = NULL; oq != NULL; prev = oq, oq = oq->next){ if (strcmp(oq->from_node , ha_msg_value(msg, F_ORIG)) == 0) break; } if (oq){ next = oq->next; for (i = 0; i < MAXMSGHIST; i++){ if (oq->node.orderQ[i]) ZAPMSG(oq->node.orderQ[i]); if (oq->cluster.orderQ[i]) ZAPMSG(oq->cluster.orderQ[i]); } ha_free(oq); if (prev) prev->next = next; else p->order_queue_head = next; } } if (p->node_callback) { p->node_callback(ha_msg_value(msg, F_ORIG) , ha_msg_value(msg, F_STATUS), p->node_private); return(1); } } /* Special case: interface 
status (change) */ if (p->if_callback && strcasecmp(mtype, T_IFSTATUS) == 0) { p->if_callback(ha_msg_value(msg, F_NODE) , ha_msg_value(msg, F_IFNAME) , ha_msg_value(msg, F_STATUS) , p->if_private); return(1); } /* Special case: client status (change) */ if (p->cstatus_callback && strcasecmp(mtype, T_APICLISTAT) == 0) { p->cstatus_callback(ha_msg_value(msg, F_ORIG) , ha_msg_value(msg, F_FROMID) , ha_msg_value(msg, F_STATUS) , p->client_private); return(1); } if (p->cstatus_callback && strcasecmp(mtype, T_RCSTATUS) == 0) { p->cstatus_callback(ha_msg_value(msg, F_ORIG) , ha_msg_value(msg, F_CLIENTNAME) , ha_msg_value(msg, F_CLIENTSTATUS) , p->client_private); return(1); } /* The general case: Any other message type */ if ((gcb = search_gen_callback(mtype, p)) != NULL) { gcb->cf(msg, gcb->pd); return 1; } return(0);}/* * Return the next message not handled by a callback. * Invoke callbacks for messages encountered along the way. */static struct ha_msg *read_msg_w_callbacks(ll_cluster_t* llc, int blocking){ struct ha_msg* msg = NULL; llc_private_t* pi; if (!ISOURS(llc)) { ha_api_log(LOG_ERR, "read_msg_w_callbacks: bad cinfo"); return HA_FAIL; } pi = (llc_private_t*) llc->ll_cluster_private; if (!pi->SignedOn) { ha_api_log(LOG_ERR, "read_msg_w_callbacks: Not signed on"); return NULL; } do { if (msg) { ZAPMSG(msg); } msg = read_hb_msg(llc, blocking); }while (msg && CallbackCall(pi, msg)); return(msg);}/* * Receive messages. Activate callbacks. Messages without callbacks * are ignored. Potentially several messages could be acted on. */static intrcvmsg(ll_cluster_t* llc, int blocking){ struct ha_msg* msg = NULL; msg=read_msg_w_callbacks(llc, blocking); if (msg) { ZAPMSG(msg); return(1); } return(0);}/* * Initialize nodewalk. 
(mainly retrieve list of nodes)
 */
/*
 * init_nodewalk: discard any node list held for this handle with
 * zap_nodelist(), then return the result of get_nodelist().
 * Returns HA_FAIL when ci fails the ISOURS() check or the client is not
 * signed on.
 */
static int
init_nodewalk (ll_cluster_t* ci)
{
	llc_private_t*	priv;

	ClearLog();
	if (!ISOURS(ci)) {
		ha_api_log(LOG_ERR, "init_nodewalk: bad cinfo");
		return HA_FAIL;
	}
	priv = (llc_private_t*)ci->ll_cluster_private;
	if (priv->SignedOn) {
		zap_nodelist(priv);
		return get_nodelist(priv);
	}
	ha_api_log(LOG_ERR, "not signed on");
	return HA_FAIL;
}

/*
 * Return the next node in the list, or NULL if none.
 *
 * Advances the walk position by one entry.  NULL is also returned when
 * ci fails the ISOURS() check or the client is not signed on.
 */
static const char *
nextnode (ll_cluster_t* ci)
{
	llc_private_t*	priv;
	const char *	name;

	ClearLog();
	if (!ISOURS(ci)) {
		ha_api_log(LOG_ERR, "nextnode: bad cinfo");
		return NULL;
	}
	priv = (llc_private_t*)ci->ll_cluster_private;
	if (!priv->SignedOn) {
		ha_api_log(LOG_ERR, "not signed on");
		return NULL;
	}
	if (priv->nextnode != NULL) {
		name = priv->nextnode->value;
		priv->nextnode = priv->nextnode->next;
		return name;
	}
	return NULL;
}

/*
 * Clean up after a nodewalk (throw away node list)
 *
 * Returns HA_OK, or HA_FAIL when ci fails the ISOURS() check or the
 * client is not signed on.
 */
static int
end_nodewalk(ll_cluster_t* ci)
{
	llc_private_t*	priv;

	ClearLog();
	if (!ISOURS(ci)) {
		ha_api_log(LOG_ERR, "end_nodewalk: bad cinfo");
		return HA_FAIL;
	}
	priv = ci->ll_cluster_private;
	if (priv->SignedOn) {
		zap_nodelist(priv);
		return HA_OK;
	}
	ha_api_log(LOG_ERR, "not signed on");
	return HA_FAIL;
}

/*
 * Initialize interface walk.  (mainly retrieve list of interfaces)
 *
 * Discards any interface list held for this handle with zap_iflist(),
 * then returns the result of get_iflist() for host.  Returns HA_FAIL
 * when ci fails the ISOURS() check or the client is not signed on.
 */
static int
init_ifwalk (ll_cluster_t* ci, const char * host)
{
	llc_private_t*	priv;

	ClearLog();
	if (!ISOURS(ci)) {
		ha_api_log(LOG_ERR, "init_ifwalk: bad cinfo");
		return HA_FAIL;
	}
	priv = (llc_private_t*)ci->ll_cluster_private;
	if (priv->SignedOn) {
		zap_iflist(priv);
		return get_iflist(priv, host);
	}
	ha_api_log(LOG_ERR, "not signed on");
	return HA_FAIL;
}

/* * Return the next interface in the iflist, or NULL if none. */static const char *nextif (ll_cluster_t* ci){ llc_private_t* pi = ci->ll_cluster_private; const char * ret; ClearLog(); if (!ISOURS(ci)) { ha_api_log(LOG_ERR, "nextif: bad cinfo"); return HA_FAIL; } if (!pi->SignedOn) { ha_api_log(LOG_ERR, "not signed on"); return HA_FAIL; } if (pi->nextif == NULL) { return(NULL);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -