⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 gsource.c

📁 在LINUX下实现HA的源代码
💻 C
字号:
/* $Id: GSource.c,v 1.13.2.3 2004/09/07 21:42:32 gshi Exp $ */#include <portability.h>#include <string.h>#include <clplumbing/cl_log.h>#include <clplumbing/GSource.h>#define	MAG_GFDSOURCE	0xfeed0001U#define	MAG_GCHSOURCE	0xfeed0002U#define	MAG_GWCSOURCE	0xfeed0003U#define	IS_FDSOURCE(p)	((p)->magno == MAG_GFDSOURCE)#define	IS_CHSOURCE(p)	((p)->magno == MAG_GCHSOURCE)#define	IS_WCSOURCE(p)	((p)->magno == MAG_GWCSOURCE)struct GFDSource_s {	unsigned	magno;	/* MAG_GFDSOURCE */	void*		udata;	gboolean	(*dispatch)(int fd, gpointer user_data);	GPollFD		gpfd;	GDestroyNotify	dnotify;	guint		gsourceid;};struct GCHSource_s {	unsigned	magno;	/* MAG_GCHSOURCE */	void*		udata;	IPC_Channel*	ch;	gboolean 	(*dispatch)(IPC_Channel* ch, gpointer user_data);	GDestroyNotify	dnotify;	gboolean	fd_fdx;	GPollFD		infd;	GPollFD		outfd;	guint		gsourceid;};struct GWCSource_s {	unsigned		magno;	/* MAG_GWCSOURCE */	void*			udata;	GPollFD			gpfd;	GDestroyNotify		dnotify;	IPC_WaitConnection*	wch;	IPC_Auth*		auth_info;	gboolean (*dispatch)(IPC_Channel* accept_ch, gpointer udata);	guint			gsourceid;};#define	DEF_EVENTS	(G_IO_IN|G_IO_PRI|G_IO_HUP|G_IO_ERR|G_IO_NVAL)#define	OUTPUT_EVENTS	(G_IO_OUT)static gboolean G_fd_prepare(gpointer source_data,       GTimeVal* current_time,       gint* timeout, gpointer user_data);static gboolean G_fd_check(gpointer source_data,       GTimeVal* current_time,       gpointer user_data);static gboolean G_fd_dispatch(gpointer source_data,       GTimeVal* current_time,       gpointer user_data);static void G_fd_destroy(gpointer user_data);static GSourceFuncs G_fd_SourceFuncs = {	G_fd_prepare,	G_fd_check,	G_fd_dispatch,	G_fd_destroy,};/* *	Add the given file descriptor to the gmainloop world. */GFDSource*G_main_add_fd(int priority, int fd, gboolean can_recurse,	gboolean (*dispatch)(int fd, gpointer user_data),	gpointer userdata,	GDestroyNotify notify){	GFDSource*	ret = g_new(GFDSource, 1);	memset(ret, 0, sizeof(*ret));	ret->magno = MAG_GFDSOURCE;	ret->udata = userdata;	ret->dispatch = dispatch;	ret->gpfd.fd = fd;	ret->gpfd.events = DEF_EVENTS;	ret->gpfd.revents = 0;	ret->dnotify = notify;	g_main_add_poll(&ret->gpfd, priority);	ret->gsourceid = g_source_add(priority, can_recurse	,	&G_fd_SourceFuncs	,	ret, ret, NULL);	if (ret->gsourceid == 0) {		g_main_remove_poll(&ret->gpfd);		memset(ret, 0, sizeof(*ret));		g_free(ret);		ret = NULL;	}	return ret;}gbooleanG_main_del_fd(GFDSource* fdp){	return g_source_remove(fdp->gsourceid);}voidg_main_output_is_blocked(GFDSource* fdp){	fdp->gpfd.events |= OUTPUT_EVENTS;}/* *	For pure file descriptor events, return FALSE because we *	have to poll to get events. * *	Note that we don't modify 'timeout' either. */static gbooleanG_fd_prepare(gpointer source_data,       GTimeVal* current_time,       gint* timeout, gpointer user_data){	GFDSource*	fdp = source_data;	g_assert(IS_FDSOURCE(fdp));	return FALSE;}/* *	Did we notice any I/O events? */static gbooleanG_fd_check(gpointer source_data,       GTimeVal* current_time,       gpointer user_data){	GFDSource*	fdp = source_data;	g_assert(IS_FDSOURCE(fdp));	return fdp->gpfd.revents != 0;}/* *	Some kind of event occurred - notify the user. */static gbooleanG_fd_dispatch(gpointer source_data,       GTimeVal* current_time,       gpointer user_data){	GFDSource*	fdp = source_data;	g_assert(IS_FDSOURCE(fdp));	/* Is output now unblocked? 	 *	 * If so, turn off OUTPUT_EVENTS to avoid going into	 * a tight poll(2) loop.	 */	if (fdp->gpfd.revents & OUTPUT_EVENTS) {		fdp->gpfd.events &= ~OUTPUT_EVENTS;	}	if(fdp->dispatch) {		return fdp->dispatch(fdp->gpfd.fd, fdp->udata);	}	return TRUE;}/* *	Free up our data, and notify the user process... */static voidG_fd_destroy(gpointer user_data){	GFDSource*	fdp = user_data;	g_assert(IS_FDSOURCE(fdp));	if (fdp->dnotify) {		fdp->dnotify(fdp->udata);	}	g_main_remove_poll(&fdp->gpfd);	g_source_remove(fdp->gsourceid);	memset(fdp, 0, sizeof(*fdp));	g_free(fdp);	fdp = NULL;}/************************************************************ *		Functions for IPC_Channels ***********************************************************/static gboolean G_CH_prepare(gpointer source_data,       GTimeVal* current_time,       gint* timeout, gpointer user_data);static gboolean G_CH_check(gpointer source_data,       GTimeVal* current_time,       gpointer user_data);static gboolean G_CH_dispatch(gpointer source_data,       GTimeVal* current_time,       gpointer user_data);static void G_CH_destroy(gpointer user_data);static GSourceFuncs G_CH_SourceFuncs = {	G_CH_prepare,	G_CH_check,	G_CH_dispatch,	G_CH_destroy,};/* *	Add an IPC_channel to the gmainloop world... */GCHSource*G_main_add_IPC_Channel(int priority, IPC_Channel* ch,	gboolean can_recurse,	gboolean (*dispatch)(IPC_Channel* source_data,		gpointer        user_data),	gpointer userdata,	GDestroyNotify notify){	GCHSource*	ret = g_new(GCHSource, 1);	int		rfd, wfd;	memset(ret, 0, sizeof(*ret));	ret->magno = MAG_GCHSOURCE;	ret->udata = userdata;	ret->ch = ch;	ret->dispatch = dispatch;	ret->dnotify = notify;	rfd = ch->ops->get_recv_select_fd(ch);	wfd = ch->ops->get_send_select_fd(ch);	ret->fd_fdx = (rfd == wfd);	ret->infd.fd      = rfd;	ret->infd.events  = DEF_EVENTS;	g_main_add_poll(&ret->infd, priority);	if (!ret->fd_fdx) {		ret->outfd.fd      = wfd;		ret->outfd.events  = DEF_EVENTS;		g_main_add_poll(&ret->outfd, priority);	}	ret->gsourceid = g_source_add(priority, can_recurse	,	&G_CH_SourceFuncs	,	ret, ret, NULL);	if (ret->gsourceid == 0) {		g_main_remove_poll(&ret->infd);		if (!ret->fd_fdx) {			g_main_remove_poll(&ret->outfd);		}		memset(ret, 0, sizeof(*ret));		g_free(ret);		ret = NULL;	}	return ret;}/* *	Delete an IPC_channel from the gmainloop world... */gboolean G_main_del_IPC_Channel(GCHSource* fdp){	gboolean	rc;	if (fdp->gsourceid <= 0) {		cl_log(LOG_CRIT, "Bad gsource in G_main_del_IPC_channel");		return FALSE;	}	rc = g_source_remove(fdp->gsourceid);	fdp->gsourceid = 0;	return rc;}/* *	For  IPC_CHANNEL events, enable output checking when needed *	and note when unread input is already queued. * *	Note that we don't modify 'timeout' either. */static gbooleanG_CH_prepare(gpointer source_data,       GTimeVal* current_time,       gint* timeout, gpointer user_data){	GCHSource*	chp = source_data;	g_assert(IS_CHSOURCE(chp));	if (chp->ch->ops->is_sending_blocked(chp->ch)) {		if (chp->fd_fdx) {			chp->infd.events |= OUTPUT_EVENTS;		}else{			chp->outfd.events |= OUTPUT_EVENTS;		}	}	return chp->ch->ops->is_message_pending(chp->ch);}/* *	Did we notice any I/O events? */static gbooleanG_CH_check(gpointer source_data,       GTimeVal* current_time,       gpointer user_data){	GCHSource*	chp = source_data;	g_assert(IS_CHSOURCE(chp));	return (chp->infd.revents != 0	||	(!chp->fd_fdx && chp->outfd.revents != 0)	||	chp->ch->ops->is_message_pending(chp->ch));}/* *	Some kind of event occurred - notify the user. */static gbooleanG_CH_dispatch(gpointer source_data,       GTimeVal* current_time,       gpointer user_data){	GCHSource*	chp = source_data;	g_assert(IS_CHSOURCE(chp));	/* Is output now unblocked? 	 *	 * If so, turn off OUTPUT_EVENTS to avoid going into	 * a tight poll(2) loop.	 */	if (chp->fd_fdx) {		if (chp->infd.revents & OUTPUT_EVENTS) {			chp->infd.events &= ~OUTPUT_EVENTS;		}	}else if (chp->outfd.revents & OUTPUT_EVENTS) {		chp->outfd.events &= ~OUTPUT_EVENTS;	}#if 0	/* If we got a HUP then mark channel as disconnected */	if ((chp->infd.revents|chp->outfd.revents) & G_IO_HUP) {		/* CHEAT!! */		chp->ch->ch_status = IPC_DISCONNECT;	}else{		chp->ch->ops->resume_io(chp->ch);	}#else	chp->ch->ops->resume_io(chp->ch);#endif	if(chp->dispatch) {		return chp->dispatch(chp->ch, chp->udata);	}	return TRUE;}/* *	Free up our data, and notify the user process... */static voidG_CH_destroy(gpointer user_data){	/* This was the source_data parameter passed to g_source_add */	/* It is a GCHSource* object */	GCHSource*	chp = user_data;	g_assert(IS_CHSOURCE(chp));	g_main_remove_poll(&chp->infd);	if (!chp->fd_fdx) {		g_main_remove_poll(&chp->outfd);	}	if (chp->dnotify) {		chp->dnotify(chp->udata);	}	g_source_remove(chp->gsourceid);	chp->ch->ops->destroy(chp->ch);	memset(chp, 0, sizeof(*chp));	g_free(chp);}/************************************************************ *		Functions for IPC_WaitConnections ***********************************************************/static gboolean G_WC_prepare(gpointer source_data,       GTimeVal* current_time,       gint* timeout, gpointer user_data);static gboolean G_WC_check(gpointer source_data,       GTimeVal* current_time,       gpointer user_data);static gboolean G_WC_dispatch(gpointer source_data,       GTimeVal* current_time,       gpointer user_data);static void G_WC_destroy(gpointer user_data);static GSourceFuncs G_WC_SourceFuncs = {	G_WC_prepare,	G_WC_check,	G_WC_dispatch,	G_WC_destroy,};/* *	Add an IPC_WaitConnection to the gmainloop world... */GWCSource*G_main_add_IPC_WaitConnection(int priority,	IPC_WaitConnection* wch,	IPC_Auth* auth_info,	gboolean can_recurse,	gboolean (*dispatch)(IPC_Channel* wch,		gpointer        user_data),	gpointer userdata,	GDestroyNotify notify){	GWCSource*	ret = g_new(GWCSource, 1);	memset(ret, 0, sizeof(*ret));	ret->magno = MAG_GWCSOURCE;	ret->udata = userdata;	ret->gpfd.fd = wch->ops->get_select_fd(wch);	ret->gpfd.events = DEF_EVENTS;	ret->gpfd.revents = 0;	ret->wch = wch;	ret->dnotify = notify;	ret->auth_info = auth_info;	ret->dispatch = dispatch;	g_main_add_poll(&ret->gpfd, priority);	ret->gsourceid = g_source_add(priority, can_recurse	,	&G_WC_SourceFuncs	,	ret, ret, NULL);	if (ret->gsourceid == 0) {		g_main_remove_poll(&ret->gpfd);		memset(ret, 0, sizeof(*ret));		g_free(ret);		ret = NULL;	}	return ret;}/* Delete the given IPC_WaitConnection from the gmainloop world */gboolean G_main_del_IPC_WaitConnection(GWCSource* wcp){	return g_source_remove(wcp->gsourceid);}/* *	For IPC_WaitConnection events, return FALSE because we *	have to poll to get events. * *	We don't modify 'timeout' either. */static gbooleanG_WC_prepare(gpointer source_data,       GTimeVal* current_time,       gint* timeout, gpointer user_data){	GWCSource*	wcp = source_data;	g_assert(IS_WCSOURCE(wcp));	return FALSE;}/* *	Did we notice any I/O (connection pending) events? */static gbooleanG_WC_check(gpointer source_data,       GTimeVal* current_time,       gpointer user_data){	GWCSource*	wcp = source_data;	g_assert(IS_WCSOURCE(wcp));	return wcp->gpfd.revents != 0;}/* *	Someone is trying to connect. *	Try to accept the connection and notify the user. */static gbooleanG_WC_dispatch(gpointer source_data,       GTimeVal* current_time,       gpointer user_data){	GWCSource*	wcp = source_data;	IPC_Channel*	ch;	gboolean	rc = TRUE;	int		count = 0;	g_assert(IS_WCSOURCE(wcp));               while(1) {		ch = wcp->wch->ops->accept_connection(wcp->wch, wcp->auth_info);		if (ch == NULL) {			break;	  	}		++count;		if(!wcp->dispatch) {			continue;		}		rc = wcp->dispatch(ch, wcp->udata);		if(!rc) {			break;		}	}	return rc;}/* *	Free up our data, and notify the user process... */static voidG_WC_destroy(gpointer user_data){	GWCSource*	wcp = user_data;	g_assert(IS_WCSOURCE(wcp));	g_main_remove_poll(&wcp->gpfd);	g_source_remove(wcp->gsourceid);	wcp->wch->ops->destroy(wcp->wch);	if (wcp->dnotify) {		wcp->dnotify(wcp->udata);	}	memset(wcp, 0, sizeof(*wcp));	g_free(wcp);	wcp = NULL;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -