⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 client_lib.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 5 页
字号:
		/*		 * receives a very big sequence number, the		 * message is not reliable at this point		 */		if (DEBUGORDER) {			cl_log(LOG_DEBUG			       ,	"lost at least one unretrievable "			       "packet! [%lx:%lx], force reset"			       ,	q->curr_oseqno			       ,	oseq);		}		q->curr_oseqno = oseq - 1;				for (i = 0; i < MAXMSGHIST; i++) {			/* Clear order queue, msg obsoleted */			if (q->orderQ[i]){				ha_msg_del(q->orderQ[i]);				q->orderQ[i] = NULL;			}		}		q->curr_index = 0;	}	 out:	/* Put the new received packet in queue */	q->orderQ[(q->curr_index + oseq - q->curr_oseqno -1 ) % MAXMSGHIST] = msg;		/* if this is the packet we are expecting, pop it*/	if (popmsg && msg_oseq_compare(q->curr_oseqno + 1, 				       q->curr_gen,oseq, gen) == 0){		return pop_orderQ(q);	}	return NULL;	}static struct ha_msg* process_client_status_msg(llc_private_t* pi, struct ha_msg* msg,			   const char* from_node){	const char*	status = ha_msg_value(msg, F_STATUS);	order_queue_t *	oq;	struct ha_msg*	retmsg;		if (status && (strcmp(status, LEAVESTATUS) == 0 		       || strcmp(status, JOINSTATUS) == 0) ){		for (oq = pi->order_queue_head; oq != NULL; oq = oq->next){			if (strcmp(oq->from_node, from_node) == 0){				break;			}		}		if (oq == NULL){			/*no ordered queue found, good, 			 *simply return the message			 */			return msg;		}		if (strcmp(status, LEAVESTATUS) == 0 ){						if (oq->leave_msg != NULL){				cl_log(LOG_ERR, "process_client_status_msg: "				       " the previous leave msg "				       "is not delivered yet");				cl_log_message(LOG_ERR, oq->leave_msg);				cl_log_message(LOG_ERR, msg);				return NULL;			}						oq->leave_msg = msg;						if ((retmsg = pop_orderQ(&oq->node))){								return retmsg;			}			if ((retmsg = pop_orderQ(&oq->cluster))){				return retmsg;			}						oq->leave_msg = NULL;			moveup_backupQ(&oq->node);			moveup_backupQ(&oq->cluster);			return msg;		}else { /*join message*/			return msg;					}						}else{		cl_log(LOG_ERR, "process_client_status_msg: "		       "no 
status found in client status msg");		cl_log_message(LOG_ERR, msg);		return NULL;	}				return msg;}/* *	Process msg gotten from FIFO or msgQ. */static struct ha_msg *process_hb_msg(llc_private_t* pi, struct ha_msg* msg){	const char *	from_node;	const char *	to_node;	order_queue_t * oq;	const char *	coseq;	seqno_t		oseq;	const char *	cgen;	seqno_t		gen;		const char*	cseq;	seqno_t		seq;	const char*	ccligen;	seqno_t		cligen;			if ((cseq = ha_msg_value(msg, F_SEQ)) == NULL	    ||	sscanf(cseq, "%lx", &seq) != 1){		return msg;	}			if ((cgen = ha_msg_value(msg, F_HBGENERATION)) == NULL	    ||	sscanf(cgen, "%lx", &gen) != 1){		return msg;	}			if ((ccligen = ha_msg_value(msg, F_CLIENT_GENERATION)) == NULL	    ||	sscanf(ccligen, "%lx", &cligen) != 1){		return msg;	}		if ((from_node = ha_msg_value(msg, F_ORIG)) == NULL){		ha_api_log(LOG_ERR			   ,	"%s: extract F_ORIG failed", __FUNCTION__);		ZAPMSG(msg);		return NULL;	}			if ((coseq = ha_msg_value(msg, F_ORDERSEQ)) != NULL	&&	sscanf(coseq, "%lx", &oseq) == 1){		/* find the order queue by from_node */		for (oq = pi->order_queue_head; oq != NULL; oq = oq->next){			if (strcmp(oq->from_node, from_node) == 0)				break;		}		if (oq == NULL){			oq = (order_queue_t *) 				ha_malloc(sizeof(order_queue_t));			if (oq == NULL){				ha_api_log(LOG_ERR				,	"%s: order_queue_t malloc failed"				,	__FUNCTION__);				ZAPMSG(msg);				return NULL;			}			memset(oq, 0, sizeof(*oq));			strncpy(oq->from_node, from_node, HOSTLENG);						oq->next = pi->order_queue_head;			pi->order_queue_head = oq;		}		if ((to_node = ha_msg_value(msg, F_TO)) == NULL)			return process_ordered_msg(&oq->cluster, msg, gen,						   cligen, seq, oseq, 1);		else			return process_ordered_msg(&oq->node, msg, gen, 						   cligen, seq, oseq, 1);	}else {		const char* type = ha_msg_value(msg, F_TYPE);				if ( type && strcmp(type, T_APICLISTAT) == 0){			return process_client_status_msg(pi, msg, from_node);		}				/* Simply return no order required msg */		return msg;	}}/* * Read a 
heartbeat message.  Read from the queue first.
 */
/*
 * Sources are consulted in this order, and the first deliverable message
 * wins:
 *   1. messages already queued locally (dequeue_msg),
 *   2. the per-node order queues, including any parked leave message,
 *   3. messages currently readable on the IPC channel,
 *   4. if 'blocking' is non-zero, further messages waited for on the
 *      IPC channel.
 *
 * Returns NULL if llc is not ours, if we are not signed on, if a
 * non-blocking call finds nothing deliverable, or if reading the IPC
 * channel yields no message.  SignedOn is cleared when the channel is
 * found to be no longer connected.
 */
static struct ha_msg *
read_hb_msg(ll_cluster_t* llc, int blocking)
{
	llc_private_t*	priv;
	struct ha_msg*	incoming;
	struct ha_msg*	ready;
	order_queue_t*	q;

	if (!ISOURS(llc)) {
		ha_api_log(LOG_ERR, "read_hb_msg: bad cinfo");
		return NULL;
	}
	priv = (llc_private_t*)llc->ll_cluster_private;

	if (!priv->SignedOn) {
		return NULL;
	}

	/* 1. Drain the local message queue */
	for (incoming = dequeue_msg(priv); incoming != NULL
	;	incoming = dequeue_msg(priv)) {
		ready = process_hb_msg(priv, incoming);
		if (ready != NULL) {
			return ready;
		}
	}

	/* 2. Look for deliverable messages in every order queue */
	for (q = priv->order_queue_head; q != NULL; q = q->next) {
		for (;;) {
			ready = pop_orderQ(&q->node);
			if (ready != NULL) {
				return ready;
			}
			ready = pop_orderQ(&q->cluster);
			if (ready != NULL) {
				return ready;
			}

			/* Ordered messages are drained: hand out the leave msg */
			if (q->leave_msg != NULL) {
				ready = q->leave_msg;
				q->leave_msg = NULL;
				q->client_leaving = 1;
				return ready;
			}

			if (!q->client_leaving) {
				break;
			}

			/*
			 * The leave message went out on an earlier call:
			 * move the backup queues up and scan this queue again.
			 */
			moveup_backupQ(&q->node);
			moveup_backupQ(&q->cluster);
			q->client_leaving = 0;
		}
	}

	/* 3. Take whatever is ready on the IPC channel right now */
	while (msgready(llc)) {
		incoming = msgfromIPC(priv->chan);
		if (incoming != NULL) {
			ready = process_hb_msg(priv, incoming);
			if (ready != NULL) {
				return ready;
			}
			continue;
		}
		if (priv->chan->ch_status != IPC_CONNECT) {
			priv->SignedOn = FALSE;
			return NULL;
		}
	}

	if (!blocking) {
		return NULL;
	}

	/*
	 * 4. Blocking call: keep waiting on the channel until a message
	 *    becomes deliverable to the user or a read yields nothing.
	 */
	do {
		priv->chan->ops->waitin(priv->chan);
		incoming = msgfromIPC(priv->chan);
		if (incoming == NULL) {
			if (priv->chan->ch_status != IPC_CONNECT) {
				priv->SignedOn = FALSE;
			}
			return NULL;
		}
		ready = process_hb_msg(priv, incoming);
	} while (ready == NULL);

	return ready;
}

/*
 * Add a callback for the given message type. 
*/static intset_msg_callback(ll_cluster_t* ci, const char * msgtype,			llc_msg_callback_t callback, void * p){	ClearLog();	if (!ISOURS(ci)) {		ha_api_log(LOG_ERR, "set_msg_callback: bad cinfo");		return HA_FAIL;	}	return(add_gen_callback(msgtype,	(llc_private_t*)ci->ll_cluster_private, callback, p));}/* * Set the node status change callback. */static intset_nstatus_callback (ll_cluster_t* ci,		llc_nstatus_callback_t cbf, 	void * p){	llc_private_t*	pi = ci->ll_cluster_private;	pi->node_callback = cbf;	pi->node_private = p;	return(HA_OK);}/* * Set the interface status change callback. */static intset_ifstatus_callback (ll_cluster_t* ci,		llc_ifstatus_callback_t cbf, void * p){	llc_private_t*	pi = ci->ll_cluster_private;	pi->if_callback = cbf;	pi->if_private = p;	return(HA_OK);}/* * Set the client status change callback. */static intset_cstatus_callback (ll_cluster_t* ci,		llc_cstatus_callback_t cbf, void * p){	llc_private_t*	pi = ci->ll_cluster_private;	ClearLog();	if (!ISOURS(ci)) {		ha_api_log(LOG_ERR, "%s: bad cinfo", __FUNCTION__);		return HA_FAIL;	}	if (!pi->SignedOn) {		ha_api_log(LOG_ERR, "not signed on");		return HA_FAIL;	}	pi->cstatus_callback = cbf;	pi->client_private = p;	return HA_OK;}/* * Call the callback associated with this message (if any) * Return TRUE if a callback was called. 
*/static intCallbackCall(llc_private_t* p, struct ha_msg * msg){	const char *	mtype=  ha_msg_value(msg, F_TYPE);	struct gen_callback*	gcb;	if (mtype == NULL) {		return(0);	}		/* Special case: node status (change) */	if ((strcasecmp(mtype, T_STATUS) == 0	||	strcasecmp(mtype, T_NS_STATUS) == 0)) {		/* If DEADSTATUS, cleanup order queue for the node */		if (strcmp(ha_msg_value(msg, F_STATUS), DEADSTATUS) == 0) {			order_queue_t *	oq = p->order_queue_head;			order_queue_t *	prev;			order_queue_t *	next;			int		i;			for (prev = NULL; oq != NULL; prev = oq, oq = oq->next){				if (strcmp(oq->from_node				,	ha_msg_value(msg, F_ORIG)) == 0)					break;			}			if (oq){				next = oq->next;				for (i = 0; i < MAXMSGHIST; i++){					if (oq->node.orderQ[i])						ZAPMSG(oq->node.orderQ[i]);					if (oq->cluster.orderQ[i])						ZAPMSG(oq->cluster.orderQ[i]);				}				ha_free(oq);				if (prev)					prev->next = next;				else					p->order_queue_head = next;			}		}		if (p->node_callback) {			p->node_callback(ha_msg_value(msg, F_ORIG)			,	ha_msg_value(msg, F_STATUS), p->node_private);			return(1);		}	}	/* Special case: interface status (change) */	if (p->if_callback && strcasecmp(mtype, T_IFSTATUS) == 0) {		p->if_callback(ha_msg_value(msg, F_NODE)		,	ha_msg_value(msg, F_IFNAME)		,	ha_msg_value(msg, F_STATUS)		,	p->if_private);		return(1);	}	/* Special case: client status (change) */	if (p->cstatus_callback && strcasecmp(mtype, T_APICLISTAT) == 0) {		p->cstatus_callback(ha_msg_value(msg, F_ORIG)		,	ha_msg_value(msg, F_FROMID)		,       ha_msg_value(msg, F_STATUS)		,       p->client_private);		return(1);	}	if (p->cstatus_callback && strcasecmp(mtype, T_RCSTATUS) == 0) {		p->cstatus_callback(ha_msg_value(msg, F_ORIG)		,	ha_msg_value(msg, F_CLIENTNAME)		,       ha_msg_value(msg, F_CLIENTSTATUS)		,       p->client_private);		return(1);	}	/* The general case: Any other message type */	if ((gcb = search_gen_callback(mtype, p)) != NULL) {		gcb->cf(msg, gcb->pd);		return 1;	}	return(0);}/* * Return 
the next message not handled by a callback. * Invoke callbacks for messages encountered along the way. */static struct ha_msg *read_msg_w_callbacks(ll_cluster_t* llc, int blocking){	struct ha_msg*	msg = NULL;	llc_private_t* pi;	if (!ISOURS(llc)) {		ha_api_log(LOG_ERR, "read_msg_w_callbacks: bad cinfo");		return HA_FAIL;	}	pi = (llc_private_t*) llc->ll_cluster_private;	if (!pi->SignedOn) {		ha_api_log(LOG_ERR, "read_msg_w_callbacks: Not signed on");		return NULL;	}	do {		if (msg) {			ZAPMSG(msg);		}		msg = read_hb_msg(llc, blocking);	}while (msg && CallbackCall(pi, msg));	return(msg);}/* * Receive messages.  Activate callbacks.  Messages without callbacks * are ignored.  Potentially several messages could be acted on. */static intrcvmsg(ll_cluster_t* llc, int blocking){	struct ha_msg*	msg = NULL;		msg=read_msg_w_callbacks(llc, blocking);	if (msg) {		ZAPMSG(msg);		return(1);	}	return(0);}/* * Initialize nodewalk. (mainly retrieve list of nodes) */static intinit_nodewalk (ll_cluster_t* ci){	llc_private_t*	pi;	ClearLog();	if (!ISOURS(ci)) {		ha_api_log(LOG_ERR, "init_nodewalk: bad cinfo");		return HA_FAIL;	}	pi = (llc_private_t*)ci->ll_cluster_private;	if (!pi->SignedOn) {		ha_api_log(LOG_ERR, "not signed on");		return HA_FAIL;	}	zap_nodelist(pi);	return(get_nodelist(pi));}/* * Return the next node in the list, or NULL if none. 
*/static const char *nextnode (ll_cluster_t* ci){	llc_private_t*	pi;	const char *	ret;	ClearLog();	if (!ISOURS(ci)) {		ha_api_log(LOG_ERR, "nextnode: bad cinfo");		return NULL;	}	pi = (llc_private_t*)ci->ll_cluster_private;	if (!pi->SignedOn) {		ha_api_log(LOG_ERR, "not signed on");		return NULL;	}	if (pi->nextnode == NULL) {		return(NULL);	}	ret = pi->nextnode->value;	pi->nextnode = pi->nextnode->next;	return(ret);}/* * Clean up after a nodewalk (throw away node list) */static intend_nodewalk(ll_cluster_t* ci){	llc_private_t*	pi;	ClearLog();	if (!ISOURS(ci)) {		ha_api_log(LOG_ERR, "end_nodewalk: bad cinfo");		return HA_FAIL;	}	pi = ci->ll_cluster_private;	if (!pi->SignedOn) {		ha_api_log(LOG_ERR, "not signed on");		return HA_FAIL;	}	zap_nodelist(pi);	return(HA_OK);}/* * Initialize interface walk. (mainly retrieve list of interfaces) */static intinit_ifwalk (ll_cluster_t* ci, const char * host){	llc_private_t*	pi;	ClearLog();	if (!ISOURS(ci)) {		ha_api_log(LOG_ERR, "init_ifwalk: bad cinfo");		return HA_FAIL;	}	pi = (llc_private_t*)ci->ll_cluster_private;	if (!pi->SignedOn) {		ha_api_log(LOG_ERR, "not signed on");		return HA_FAIL;	}	zap_iflist(pi);	return(get_iflist(pi, host));}/* * Return the next interface in the iflist, or NULL if none. */static const char *nextif (ll_cluster_t* ci){	llc_private_t*	pi = ci->ll_cluster_private;	const char *	ret;	ClearLog();	if (!ISOURS(ci)) {		ha_api_log(LOG_ERR, "nextif: bad cinfo");		return HA_FAIL;	}	if (!pi->SignedOn) {		ha_api_log(LOG_ERR, "not signed on");		return HA_FAIL;	}	if (pi->nextif == NULL) {		return(NULL);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -