⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 client_lib.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 5 页
字号:
	ClearLog();	if (!ISOURS(lcl)) {		ha_api_log(LOG_ERR, "get_ifstatus: bad cinfo");		return NULL;	}	pi = (llc_private_t*)lcl->ll_cluster_private;	if (!pi->SignedOn) {		ha_api_log(LOG_ERR, "not signed on");		return NULL;	}	if ((request = hb_api_boilerplate(API_IFSTATUS)) == NULL) {		return NULL;	}	if (ha_msg_add(request, F_NODENAME, host) != HA_OK) {		ha_api_log(LOG_ERR, "get_ifstatus: cannot add field");		ZAPMSG(request);		return NULL;	}	if (ha_msg_add(request, F_IFNAME, ifname) != HA_OK) {		ha_api_log(LOG_ERR, "get_ifstatus: cannot add field");		ZAPMSG(request);		return NULL;	}	/* Send message */	if (msg2ipcchan(request, pi->chan) != HA_OK) {		ZAPMSG(request);		ha_api_perror("Can't send message to IPC Channel");		return NULL;	}	ZAPMSG(request);	/* Read reply... */	if ((reply=read_api_msg(pi)) == NULL) {		return NULL;	}	if ((result = ha_msg_value(reply, F_APIRESULT)) != NULL	&&	strcmp(result, API_OK) == 0	&&	(status = ha_msg_value(reply,F_STATUS)) != NULL) {                memset(statbuf, 0, sizeof(statbuf));		strncpy(statbuf, status, sizeof(statbuf) - 1);		ret = statbuf;	}else{		ret = NULL;	}	ZAPMSG(reply);	return ret;}

/*
 * Release the node list held in pi and clear both the list pointer and
 * the "next node" cursor so neither refers to freed memory.
 */
static void
zap_nodelist(llc_private_t* pi)
{
	destroy_stringlist(pi->nodelist);
	pi->nextnode = NULL;
	pi->nodelist = NULL;
}

/*
 * Zap our list of interfaces. 
*/static voidzap_iflist(llc_private_t* pi){	destroy_stringlist(pi->iflist);	pi->iflist=NULL;	pi->nextif = NULL;}static voidzap_order_seq(llc_private_t* pi){	order_seq_t * order_seq = pi->order_seq_head.next;	order_seq_t * next;	while (order_seq != NULL){		next = order_seq->next;		ha_free(order_seq);		order_seq = next;	 	}	pi->order_seq_head.next = NULL;}static voidzap_order_queue(llc_private_t* pi){	order_queue_t *	oq = pi->order_queue_head;	order_queue_t *	next;	int		i;	while (oq != NULL) {		next = oq->next;		for (i = 0; i < MAXMSGHIST; i++){			if (oq->node.orderQ[i]){				ZAPMSG(oq->node.orderQ[i]);				oq->node.orderQ[i] = NULL;			}			if (oq->cluster.orderQ[i]){				ZAPMSG(oq->cluster.orderQ[i]);				oq->cluster.orderQ[i] = NULL;			}		}		ha_free(oq);		oq = next;     	}	pi->order_queue_head = NULL;}/* * Create a new stringlist. */static struct stringlist*new_stringlist(const char *s){	struct stringlist*	ret;	char *			cp;	if (s == NULL) {		return(NULL);	}	if ((cp = ha_strdup(s)) == NULL) {		return(NULL);	}	if ((ret = MALLOCT(struct stringlist)) == NULL) {		ha_free(cp);		return(NULL);	}	ret->next = NULL;	ret->value = cp;	return(ret);}/* * Destroy (free) a stringlist. */static voiddestroy_stringlist(struct stringlist * s){	struct stringlist *	this;	struct stringlist *	next;	for (this=s; this; this=next) {		next = this->next;		ha_free(this->value);		memset(this, 0, sizeof(*this));		ha_free(this);	}}/* * Enqueue a message to be read later. */static intenqueue_msg(llc_private_t* pi, struct ha_msg* msg){	struct MsgQueue*	newQelem;	if (msg == NULL) {		return(HA_FAIL);	}	if ((newQelem = MALLOCT(struct MsgQueue)) == NULL) {		return(HA_FAIL);	}	newQelem->value = msg;	newQelem->prev = pi->lastQdmsg;	newQelem->next = NULL;	if (pi->lastQdmsg != NULL) {		pi->lastQdmsg->next = newQelem;	}	pi->lastQdmsg = newQelem;	if (pi->firstQdmsg == NULL) {		pi->firstQdmsg = newQelem;	}	return HA_OK;}/* * Dequeue a message. 
*/static struct ha_msg *dequeue_msg(llc_private_t* pi){	struct MsgQueue*	qret;	struct ha_msg*		ret = NULL;		qret = pi->firstQdmsg;	if (qret != NULL) {		ret = qret->value;		pi->firstQdmsg=qret->next;		if (pi->firstQdmsg) {			pi->firstQdmsg->prev = NULL;		}		memset(qret, 0, sizeof(*qret));				/*		 * The only two pointers to this element are the first pointer,		 * and the prev pointer of the next element in the queue.		 * (or possibly lastQdmsg... See below)		 */		ha_free(qret);	}	if (pi->firstQdmsg == NULL) {		 /* Zap lastQdmsg if it pointed at this Q element */		pi->lastQdmsg=NULL;	}	return(ret);}/* * Search the general callback list for the given message type */static gen_callback_t*search_gen_callback(const char * type, llc_private_t* lcp){	struct gen_callback*	gcb;	for (gcb=lcp->genlist; gcb != NULL; gcb=gcb->next) {		if (strcmp(type, gcb->msgtype) == 0) {			return(gcb);		}	}	return(NULL);} /* * Add a general callback to the list of general callbacks. */static intadd_gen_callback(const char * msgtype, llc_private_t* lcp,	llc_msg_callback_t funp, void* pd){	struct gen_callback*	gcb;	char *			type;	if ((gcb = search_gen_callback(msgtype, lcp)) == NULL) {		gcb = MALLOCT(struct gen_callback);		if (gcb == NULL) {			return(HA_FAIL);		}		type = ha_strdup(msgtype);		if (type == NULL) {			ha_free(gcb);			return(HA_FAIL);		}		gcb->msgtype = type;		gcb->next = lcp->genlist;		lcp->genlist = gcb;	}else if (funp == NULL) {		return(del_gen_callback(lcp, msgtype));	}	gcb->cf = funp;	gcb->pd = pd;	return(HA_OK);}/* * Delete a general callback from the list of general callbacks. 
*/
/*
 * Unlinks the entry of lcp->genlist whose msgtype equals "msgtype" and
 * frees both its type string and the entry itself.
 *
 * Returns HA_OK when an entry was removed, HA_FAIL when none matched.
 */
static int
del_gen_callback(llc_private_t* lcp, const char * msgtype)
{
	struct gen_callback*	gcb;
	struct gen_callback*	prev = NULL;

	for (gcb=lcp->genlist; gcb != NULL; gcb=gcb->next) {
		if (strcmp(msgtype, gcb->msgtype) == 0) {
			if (prev) {
				prev->next = gcb->next;
			}else{
				lcp->genlist = gcb->next;
			}
			ha_free(gcb->msgtype);
			gcb->msgtype = NULL;
			/*
			 * Entries are allocated with MALLOCT and released
			 * with ha_free() in add_gen_callback(); use the same
			 * deallocator here rather than plain free().
			 */
			ha_free(gcb);
			return(HA_OK);
		}
		prev = gcb;
	}
	return(HA_FAIL);
}

/*
 * Read an API message.  All other messages are enqueued to be read later.
 *
 * Blocks (via the channel's waitin operation) until a message whose
 * F_TYPE is T_APIRESP arrives and returns it.  Any other message is
 * appended to pi's queue; if it cannot be queued it is logged and
 * destroyed instead of being leaked.
 *
 * NOTE(review): a NULL from msgfromIPC() is logged and the read is
 * retried unconditionally, so this loop has no exit when the channel
 * can never deliver again - confirm whether a disconnect check is
 * needed here.
 */
static struct ha_msg *
read_api_msg(llc_private_t* pi)
{
	for (;;) {
		struct ha_msg*	msg;
		const char *	type;

		pi->chan->ops->waitin(pi->chan);
		if ((msg=msgfromIPC(pi->chan)) == NULL) {
			ha_api_perror("read_api_msg: "
			"Cannot read reply from IPC channel");
			continue;
		}
		if ((type=ha_msg_value(msg, F_TYPE)) != NULL
		&&	strcmp(type, T_APIRESP) == 0) {
			return(msg);
		}
		/* Got an unexpected non-api message */
		/* Queue it up for reading later */
		if (enqueue_msg(pi, msg) != HA_OK) {
			/* Nobody else references msg: drop it explicitly. */
			ha_api_log(LOG_ERR, "read_api_msg: "
			"cannot enqueue message; message dropped");
			ZAPMSG(msg);
		}
	}
	/*NOTREACHED*/
	return(NULL);
}

/*
 * Read a client status respond message either from local node or from
 * a remote node. All other messages are enqueued to be read later. 
*/static struct ha_msg *read_cstatus_respond_msg(llc_private_t* pi, int timeout){	struct ha_msg*	msg;	const char *	type;	struct pollfd	pfd;	pfd.fd = pi->chan->ops->get_recv_select_fd(pi->chan);	pfd.events = POLLIN;	while ((pi->chan->ops->is_message_pending(pi->chan)) 	||	(poll(&pfd, 1, timeout) > 0 && pfd.revents == POLLIN)) {		while (pi->chan->ops->is_message_pending(pi->chan)) {			if ((msg=msgfromIPC(pi->chan)) == NULL) {				ha_api_perror("read_api_msg: "				"Cannot read reply from IPC channel");				continue;			}			if (((type=ha_msg_value(msg, F_TYPE)) != NULL			&&	strcmp(type, T_RCSTATUS) == 0)			||	((type=ha_msg_value(msg, F_SUBTYPE)) != NULL			&&	strcmp(type, T_RCSTATUS) == 0)) {				return(msg);			}			/* Got an unexpected non-api message */			/* Queue it up for reading later */			enqueue_msg(pi, msg);		}	}	/* Timeout or caught a signal */	return NULL;}/* This is the place to handle out of order messages from a restarted  * client. If we receive messages from a restarted client yet no leave * message has been received for the previous client, we need to  * save the restarted client's messages in backup queue. When the leave * message is received, we then call moveup_backupQ()  so that the backup * queue is promoted to our current queue, not backup any more. */static voidmoveup_backupQ(struct orderQ* q){	int	i;	if (q == NULL){		return;	}		if (q->backupQ){		struct orderQ* backup_q = q->backupQ;				memcpy(q, backup_q, sizeof(struct orderQ));				if (backup_q->backupQ != NULL){			cl_log(LOG_ERR, "moveup_backupQ:"			       "backupQ in backupQ is not NULL");  		}		ha_free(backup_q);		q->backupQ = NULL;	}else {		/*the queue must be empty*/		for (i = 0; i < MAXMSGHIST; i++) {			if (q->orderQ[i]){				cl_log(LOG_ERR, "moveup_backupQ:"				       "queue is not empty"				       " possible memory leak");				cl_log_message(LOG_ERR, q->orderQ[i]);			}		}				q->curr_oseqno = 0;			}	return ;}/* * Pop up orderQ. 
*/static struct ha_msg *pop_orderQ(struct orderQ * q){	struct ha_msg *	msg;	if (q->orderQ[q->curr_index]){		msg = q->orderQ[q->curr_index];		q->orderQ[q->curr_index] = NULL;		q->curr_index = (q->curr_index + 1) % MAXMSGHIST;		q->curr_oseqno++;		return msg;	}		return NULL;}static intmsg_oseq_compare(seqno_t oseq1, seqno_t gen1,  		 seqno_t oseq2,  seqno_t gen2){	int ret;	if ( gen1 > gen2){		ret = 1;	} else if (gen1 < gen2){		ret = -1;	} else {				if (oseq1 > oseq2){			ret = 1;		} else if (oseq1 < oseq2){			ret = -1;		} else{			ret =  0;		}			}		return ret;	}static voidreset_orderQ(struct orderQ* q){	int i;	for (i =0 ;i < MAXMSGHIST; i++){		if (q->orderQ[i]){			ha_msg_del(q->orderQ[i]);			q->orderQ[i] = 0;		}	}		if (q->backupQ != NULL){		reset_orderQ(q->backupQ);		ha_free(q->backupQ);		q->backupQ = NULL;	}		memset(q, 0, sizeof(struct orderQ));	return;}/* *	Process ordered message */static voiddisplay_orderQ(struct orderQ* q){	if(!q){		return;	}	cl_log(LOG_INFO, "curr_index=%x,  curr_oseqno=%lx, "	       "curr_gen=%lx, curr_client_gen=%lx",	       q->curr_index, q->curr_oseqno,	       q->curr_gen, q->curr_client_gen);		cl_log(LOG_INFO, "first_msg_seq =%lx, first_msg_gen = %lx,"	       "first_msg_client_gen =%lx",	       q->first_msg_seq, q->first_msg_gen, 	       q->first_msg_client_gen);	if (q->backupQ == NULL){		cl_log(LOG_INFO, "q->backupQ is NULL");	}else{		display_orderQ(q->backupQ);	}	}static struct ha_msg *process_ordered_msg(struct orderQ* q, struct ha_msg* msg,		    seqno_t gen, seqno_t cligen,		    seqno_t seq, seqno_t oseq,		    int popmsg){	int		i;			/*    	display_orderQ(q);  */		/*if this is the first packet, pop it*/	if ( q->first_msg_seq ==  0){		q->first_msg_seq = seq;		q->first_msg_client_gen = cligen;		q->first_msg_gen = gen;		q->curr_gen = gen;		q->curr_client_gen = cligen;		q->curr_oseqno = oseq -1 ;				goto out;	}			/*any message with lower sequence than q->first_msg_seq	  will be dropped*/	if (q->first_msg_seq != 0 && 
msg_oseq_compare(q->first_msg_seq,						      q->first_msg_gen,						      seq, gen) > 0 ) {		return NULL;	}		if ( q->curr_oseqno == 0){		q->curr_gen = gen;		q->curr_client_gen = cligen;		goto out;	}	if ( gen > q->curr_gen ){		/*heartbeat restart, clean everything up*/						reset_orderQ(q);				q->first_msg_seq = seq;		q->first_msg_client_gen = cligen;		q->first_msg_gen = gen;		q->curr_gen = gen;		q->curr_client_gen = cligen;		q->curr_oseqno = oseq - 1;				goto out;			} else if (gen < q->curr_gen){		/*		 * message from previous heartbeat generation, 		 * drop the message		 */		return NULL;	} else if(cligen > q->curr_client_gen ){		/*client restarted*/				if (q->backupQ == NULL){			if ( (q->backupQ = ha_malloc(sizeof(struct orderQ))) 			     ==NULL  ){								cl_log(LOG_ERR, "process_ordered_msg: "				       "allocating memory for backupQ failed");				return NULL;						}			memset(q->backupQ, 0, sizeof(struct orderQ));		}				process_ordered_msg(q->backupQ, msg, gen, cligen, seq, oseq, 0);				return NULL;	} else if (cligen < q->curr_client_gen){		/*Message from a previous client*/		/*this should never happend*/				cl_log(LOG_ERR, "process_ordered_msg: Received message"		       " from previous client. This should never happen");		cl_log_message(LOG_ERR, msg);		return NULL;	}else if (oseq - q->curr_oseqno >= MAXMSGHIST){

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -