⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 client_lib.c

📁 在LINUX下实现HA的源代码
💻 C
📖 第 1 页 / 共 4 页
字号:
		order_seq = next;	 	}	pi->order_seq_head.next = NULL;}static voidzap_order_queue(llc_private_t* pi){	order_queue_t *	oq = pi->order_queue_head;	order_queue_t *	next;	int		i;	while (oq != NULL) {		next = oq->next;		for (i = 0; i < MAXMSGHIST; i++){			if (oq->node.orderQ[i]){				ZAPMSG(oq->node.orderQ[i]);				oq->node.orderQ[i] = NULL;			}			if (oq->cluster.orderQ[i]){				ZAPMSG(oq->cluster.orderQ[i]);				oq->cluster.orderQ[i] = NULL;			}		}		ha_free(oq);		oq = next;     	}	pi->order_queue_head = NULL;}/* * Create a new stringlist. */static struct stringlist*new_stringlist(const char *s){	struct stringlist*	ret;	char *			cp;	if (s == NULL) {		return(NULL);	}	if ((cp = ha_strdup(s)) == NULL) {		return(NULL);	}	if ((ret = MALLOCT(struct stringlist)) == NULL) {		ha_free(cp);		return(NULL);	}	ret->next = NULL;	ret->value = cp;	return(ret);}/* * Destroy (free) a stringlist. */static voiddestroy_stringlist(struct stringlist * s){	struct stringlist *	this;	struct stringlist *	next;	for (this=s; this; this=next) {		next = this->next;		ha_free(this->value);		memset(this, 0, sizeof(*this));		ha_free(this);	}}/* * Enqueue a message to be read later. */static intenqueue_msg(llc_private_t* pi, struct ha_msg* msg){	struct MsgQueue*	newQelem;	if (msg == NULL) {		return(HA_FAIL);	}	if ((newQelem = MALLOCT(struct MsgQueue)) == NULL) {		return(HA_FAIL);	}	newQelem->value = msg;	newQelem->prev = pi->lastQdmsg;	newQelem->next = NULL;	if (pi->lastQdmsg != NULL) {		pi->lastQdmsg->next = newQelem;	}	pi->lastQdmsg = newQelem;	if (pi->firstQdmsg == NULL) {		pi->firstQdmsg = newQelem;	}	return HA_OK;}/* * Dequeue a message. 
*/static struct ha_msg *dequeue_msg(llc_private_t* pi){	struct MsgQueue*	qret;	struct ha_msg*		ret = NULL;		qret = pi->firstQdmsg;	if (qret != NULL) {		ret = qret->value;		pi->firstQdmsg=qret->next;		if (pi->firstQdmsg) {			pi->firstQdmsg->prev = NULL;		}		memset(qret, 0, sizeof(*qret));				/*		 * The only two pointers to this element are the first pointer,		 * and the prev pointer of the next element in the queue.		 * (or possibly lastQdmsg... See below)		 */		ha_free(qret);	}	if (pi->firstQdmsg == NULL) {		 /* Zap lastQdmsg if it pointed at this Q element */		pi->lastQdmsg=NULL;	}	return(ret);}/* * Search the general callback list for the given message type */static gen_callback_t*search_gen_callback(const char * type, llc_private_t* lcp){	struct gen_callback*	gcb;	for (gcb=lcp->genlist; gcb != NULL; gcb=gcb->next) {		if (strcmp(type, gcb->msgtype) == 0) {			return(gcb);		}	}	return(NULL);} /* * Add a general callback to the list of general callbacks. */static intadd_gen_callback(const char * msgtype, llc_private_t* lcp,	llc_msg_callback_t funp, void* pd){	struct gen_callback*	gcb;	char *			type;	if ((gcb = search_gen_callback(msgtype, lcp)) == NULL) {		gcb = MALLOCT(struct gen_callback);		if (gcb == NULL) {			return(HA_FAIL);		}		type = ha_strdup(msgtype);		if (type == NULL) {			ha_free(gcb);			return(HA_FAIL);		}		gcb->msgtype = type;		gcb->next = lcp->genlist;		lcp->genlist = gcb;	}else if (funp == NULL) {		return(del_gen_callback(lcp, msgtype));	}	gcb->cf = funp;	gcb->pd = pd;	return(HA_OK);}/* * Delete a general callback from the list of general callbacks. 
*/
/*
 * Unlinks and frees the first entry whose msgtype equals msgtype.
 * Returns HA_OK if an entry was removed, HA_FAIL if msgtype is NULL or
 * no entry matched.
 */
static int
del_gen_callback(llc_private_t* lcp, const char * msgtype)
{
	struct gen_callback*	gcb;
	struct gen_callback*	prev = NULL;

	if (msgtype == NULL) {
		/* strcmp() on a NULL pointer is undefined behaviour */
		return(HA_FAIL);
	}
	for (gcb=lcp->genlist; gcb != NULL; gcb=gcb->next) {
		if (strcmp(msgtype, gcb->msgtype) == 0) {
			if (prev) {
				prev->next = gcb->next;
			}else{
				lcp->genlist = gcb->next;
			}
			ha_free(gcb->msgtype);
			gcb->msgtype = NULL;
			/*
			 * Entries are allocated with MALLOCT and released
			 * with ha_free() elsewhere in this file, so use the
			 * same deallocator here rather than plain free().
			 */
			ha_free(gcb);
			return(HA_OK);
		}
		prev = gcb;
	}
	return(HA_FAIL);
}

/*
 * Read an API message.  All other messages are enqueued to be read later.
 *
 * Waits on pi's channel until a message of type T_APIRESP arrives and
 * returns it.  Returns NULL if a read fails and the channel is no longer
 * in the IPC_CONNECT state, since retrying could then never succeed.
 * NOTE(review): callers are outside this view; confirm they test for NULL.
 */
static struct ha_msg *
read_api_msg(llc_private_t* pi)
{
	for (;;) {
		struct ha_msg*	msg;
		const char *	type;

		pi->chan->ops->waitin(pi->chan);
		if ((msg=msgfromIPC(pi->chan)) == NULL) {
			ha_api_perror("read_api_msg: "
			"Cannot read reply from IPC channel");
			if (pi->chan->ch_status != IPC_CONNECT) {
				/* Avoid spinning forever on a dead channel */
				return(NULL);
			}
			continue;
		}
		if ((type=ha_msg_value(msg, F_TYPE)) != NULL
		&&	strcmp(type, T_APIRESP) == 0) {
			return(msg);
		}
		/* Got an unexpected non-api message */
		/* Queue it up for reading later */
		if (enqueue_msg(pi, msg) != HA_OK) {
			/* Nobody else references it: drop it, don't leak it */
			ZAPMSG(msg);
		}
	}
	/*NOTREACHED*/
	return(NULL);
}

/*
 * Read a client status respond message either from local node or from
 * a remote node. All other messages are enqueued to be read later. 
*/static struct ha_msg *read_cstatus_respond_msg(llc_private_t* pi, int timeout){	struct ha_msg*	msg;	const char *	type;	struct pollfd	pfd;	pfd.fd = pi->chan->ops->get_recv_select_fd(pi->chan);	pfd.events = POLLIN;	while ((pi->chan->ops->is_message_pending(pi->chan)) 	||	(poll(&pfd, 1, timeout) > 0 && pfd.revents == POLLIN)) {		while (pi->chan->ops->is_message_pending(pi->chan)) {			if ((msg=msgfromIPC(pi->chan)) == NULL) {				ha_api_perror("read_api_msg: "				"Cannot read reply from IPC channel");				continue;			}			if (((type=ha_msg_value(msg, F_TYPE)) != NULL			&&	strcmp(type, T_RCSTATUS) == 0)			||	((type=ha_msg_value(msg, F_SUBTYPE)) != NULL			&&	strcmp(type, T_RCSTATUS) == 0)) {				return(msg);			}			/* Got an unexpected non-api message */			/* Queue it up for reading later */			enqueue_msg(pi, msg);		}	}	/* Timeout or caught a signal */	return NULL;}/* * Pop up orderQ. */static struct ha_msg *pop_orderQ(struct orderQ * q){	struct ha_msg *	msg;	if (q->orderQ[q->curr_index]){		msg = q->orderQ[q->curr_index];		q->orderQ[q->curr_index] = NULL;		q->curr_index = (q->curr_index + 1) % MAXMSGHIST;		q->curr_seqno++;		return msg;	}	return NULL;}/* *	Process ordered message */static struct ha_msg *process_ordered_msg(struct orderQ* q, struct ha_msg* msg, seqno_t oseq){	int	i;	if (oseq < q->curr_seqno || oseq - q->curr_seqno >= MAXMSGHIST){		if (oseq < q->curr_seqno){			/* Sender restarted */			if (DEBUGORDER)				cl_log(LOG_DEBUG, "Sender restarted!");			q->curr_seqno = 1;		}else {			/*			 * receives a very big sequence number, the			 * message is not reliable at this point			 */			if (DEBUGORDER) {				cl_log(LOG_DEBUG				,	"lost at least one unretrievable "					"packet! 
[%lx:%lx], force reset"				,	q->curr_seqno				,	oseq);			}			q->curr_seqno = oseq;		}		for (i = 0; i < MAXMSGHIST; i++) {			/* Clear order queue, msg obsoleted */			if (q->orderQ[i]){				ha_free(q->orderQ[i]);				q->orderQ[i] = NULL;			}		}		q->curr_index = 0;	}	/* Put the new received packet in queue */	q->orderQ[(q->curr_index + oseq - q->curr_seqno) % MAXMSGHIST] = msg;	/* Should we send the first ordered packets? */	if (oseq == q->curr_seqno || (q->curr_seqno == 1 	&&	oseq - q->curr_seqno >= SEQGAP)){		if (oseq != q->curr_seqno){			/*			 * We probably missed first several packets			 * since the client on from_node restarted.			 */			if (DEBUGORDER) {				cl_log(LOG_DEBUG				,	"Finally lost SEQGAP pkts, discard, "					"oseq 0x%lx, currseq 0x%lx"				,	oseq, q->curr_seqno);			}			for (i = q->curr_index			;	q->orderQ[i] == NULL			;	i = (i+1) % MAXMSGHIST)				q->curr_seqno++;			q->curr_index = i;			if (DEBUGORDER) {				cl_log(LOG_DEBUG				,	"After discard, seqno 0x%lx, index %d"				,	q->curr_seqno				,	q->curr_index);			}		}		return pop_orderQ(q);	}	return NULL;}/* *	Process msg gotten from FIFO or msgQ. 
*/
/*
 * Messages without a parsable F_ORDERSEQ field are returned unchanged.
 * Ordered messages are routed to the order queue of their F_ORIG node
 * (created on first use): the cluster queue when the message has no F_TO
 * field, the node queue otherwise.
 *
 * Returns the next deliverable message, or NULL if none is ready yet.
 * NULL is also returned, and msg is freed, when F_ORIG is missing or the
 * order queue cannot be allocated.
 */
static struct ha_msg *
process_hb_msg(llc_private_t* pi, struct ha_msg* msg)
{
	const char *	coseq;
	const char *	from_node;
	seqno_t		oseq;
	order_queue_t * oq;
	int		i;

	if ((coseq = ha_msg_value(msg, F_ORDERSEQ)) == NULL
	||	sscanf(coseq, "%lx", &oseq) != 1) {
		/* Simply return no order required msg */
		return msg;
	}

	/* find the order queue by from_node */
	if ((from_node = ha_msg_value(msg, F_ORIG)) == NULL){
		ha_api_log(LOG_ERR
		,	"%s: extract F_ORIG failed", __FUNCTION__);
		ZAPMSG(msg);
		return NULL;
	}
	for (oq = pi->order_queue_head; oq != NULL; oq = oq->next){
		if (strcmp(oq->from_node, from_node) == 0) {
			break;
		}
	}
	if (oq == NULL){
		oq = (order_queue_t *) ha_malloc(sizeof(*oq));
		if (oq == NULL){
			ha_api_log(LOG_ERR
			,	"%s: order_queue_t malloc failed"
			,	__FUNCTION__);
			ZAPMSG(msg);
			return NULL;
		}
		/*
		 * strncpy() does not terminate the destination when the
		 * source fills it, and from_node is later handed to
		 * strcmp().  Copy one byte less and terminate explicitly.
		 * (Assumes from_node holds at least HOSTLENG bytes, as the
		 * original HOSTLENG-byte copy already did.)
		 */
		strncpy(oq->from_node, from_node, HOSTLENG - 1);
		oq->from_node[HOSTLENG - 1] = '\0';
		oq->node.curr_index = 0;
		oq->node.curr_seqno = 1;
		oq->cluster.curr_index = 0;
		oq->cluster.curr_seqno = 1;
		for (i=0; i < MAXMSGHIST; i++){
			oq->node.orderQ[i] = NULL;
			oq->cluster.orderQ[i] = NULL;
		}
		oq->next = pi->order_queue_head;
		pi->order_queue_head = oq;
	}

	/* No destination field: ordered within the cluster-wide stream */
	if (ha_msg_value(msg, F_TO) == NULL) {
		return process_ordered_msg(&oq->cluster, msg, oseq);
	}
	return process_ordered_msg(&oq->node, msg, oseq);
}

/*
 * Read a heartbeat message.  Read from the queue first. 
*/static struct ha_msg *read_hb_msg(ll_cluster_t* llc, int blocking){	llc_private_t*	pi;	struct ha_msg*	msg;	struct ha_msg*	retmsg;	order_queue_t*	oq;	if (!ISOURS(llc)) {		ha_api_log(LOG_ERR, "read_hb_msg: bad cinfo");		return NULL;	}	pi = (llc_private_t*)llc->ll_cluster_private;	if (!pi->SignedOn) {		return NULL;	}	/* Process msg from msgQ */	while ((msg = dequeue_msg(pi))){		if ((retmsg = process_hb_msg(pi, msg)))			return retmsg;	}	for (oq = pi->order_queue_head; oq != NULL; oq = oq->next){		if ((retmsg = pop_orderQ(&oq->node)))			return retmsg;		if ((retmsg = pop_orderQ(&oq->cluster)))			return retmsg;	}	/* Process msg from FIFO */	while (msgready(llc)){		msg = msgfromIPC(pi->chan);		if (msg == NULL) {			if (pi->chan->ch_status != IPC_CONNECT) {				pi->SignedOn = FALSE;				return NULL;			}		}else if ((retmsg = process_hb_msg(pi, msg))) {			return retmsg;		}	}	/* Process msg from orderQ */	if (!blocking)		return NULL;	/* If this is a blocking call, we keep on reading from FIFO, so         * that we can finally return a non-NULL msg to user.         */	for(;;) {		pi->chan->ops->waitin(pi->chan);		msg = msgfromIPC(pi->chan);		if (msg == NULL) {			if (pi->chan->ch_status != IPC_CONNECT) {				pi->SignedOn = FALSE;			}			return NULL;		}		if ((retmsg = process_hb_msg(pi, msg))) {			return retmsg;		}	}}/* * Add a callback for the given message type. */static intset_msg_callback(ll_cluster_t* ci, const char * msgtype,			llc_msg_callback_t callback, void * p){	ClearLog();	if (!ISOURS(ci)) {		ha_api_log(LOG_ERR, "set_msg_callback: bad cinfo");		return HA_FAIL;	}	return(add_gen_callback(msgtype,	(llc_private_t*)ci->ll_cluster_private, callback, p));}/* * Set the node status change callback. */static intset_nstatus_callback (ll_cluster_t* ci,		llc_nstatus_callback_t cbf, 	void * p){	llc_private_t*	pi = ci->ll_cluster_private;	pi->node_callback = cbf;	pi->node_private = p;	return(HA_OK);}/* * Set the interface status change callback. 
*/static intset_ifstatus_callback (ll_cluster_t* ci,		llc_ifstatus_callback_t cbf, void * p){	llc_private_t*	pi = ci->ll_cluster_private;	pi->if_callback = cbf;	pi->if_private = p;	return(HA_OK);}/* * Set the client status change callback. */static intset_cstatus_callback (ll_cluster_t* ci,		llc_cstatus_callback_t cbf, void * p){	llc_private_t*	pi = ci->ll_cluster_private;	ClearLog();	if (!ISOURS(ci)) {		ha_api_log(LOG_ERR, "%s: bad cinfo", __FUNCTION__);		return HA_FAIL;	}	if (!pi->SignedOn) {		ha_api_log(LOG_ERR, "not signed on");		return HA_FAIL;	}	pi->cstatus_callback = cbf;	pi->client_private = p;	return HA_OK;}/* * Call the callback associated with this message (if any) * Return TRUE if a callback was called. */static intCallbackCall(llc_private_t* p, struct ha_msg * msg){	const char *	mtype=  ha_msg_value(msg, F_TYPE);	struct gen_callback*	gcb;	if (mtype == NULL) {		return(0);	}		/* Special case: node status (change) */	if ((strcasecmp(mtype, T_STATUS) == 0	||	strcasecmp(mtype, T_NS_STATUS) == 0)) {		/* If DEADSTATUS, cleanup order queue for the node */		if (strcmp(ha_msg_value(msg, F_STATUS), DEADSTATUS) == 0) {			order_queue_t *	oq = p->order_queue_head;			order_queue_t *	prev;			order_queue_t *	next;			int		i;			for (prev = NULL; oq != NULL; prev = oq, oq = oq->next){				if (strcmp(oq->from_node				,	ha_msg_value(msg, F_ORIG)) == 0)					break;			}			if (oq){				next = oq->next;				for (i = 0; i < MAXMSGHIST; i++){					if (oq->node.orderQ[i])						ZAPMSG(oq->node.orderQ[i]);					if (oq->cluster.orderQ[i])						ZAPMSG(oq->cluster.orderQ[i]);				}				ha_free(oq);				if (prev)					prev->next = next;				else					p->order_queue_head = next;			}		}		if (p->node_callback) {			p->node_callback(ha_msg_value(msg, F_ORIG)			,	ha_msg_value(msg, F_STATUS), p->node_private);			return(1);		}	}	/* Special case: interface status (change) */	if (p->if_callback && strcasecmp(mtype, T_IFSTATUS) == 0) {		p->if_callback(ha_msg_value(msg, F_NODE)		,	ha_msg_value(msg, F_IFNAME)		
,	ha_msg_value(msg, F_STATUS)		,	p->if_private);		return(1);	}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -