⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 checkpointd.c

📁 在LINUX下实现HA的源代码
💻 C
字号:
/* $Id: checkpointd.c,v 1.11.2.6 2004/11/18 06:54:23 yixiong Exp $ *//*  * checkpointd.c: data checkpoint service * * Copyright (C) 2003 Deng Pan <deng.pan@intel.com> *  * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. *  * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * General Public License for more details. *  * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */#ifdef HAVE_CONFIG_H#include "config.h"#endif#ifdef HAVE_GETOPT_H#include <getopt.h>#endif#include <stdio.h>#include <stdlib.h>#ifdef HAVE_UNISTD_H#include <unistd.h>#endif#include <errno.h>#include <string.h>#include <sys/types.h>#include <sys/stat.h>#include <glib.h>#include <clplumbing/cl_log.h>#include <clplumbing/cl_signal.h>#include <clplumbing/ipc.h>#include <clplumbing/Gmain_timeout.h>#include <hb_api_core.h>#include <hb_api.h>#include <ha_msg.h>#include <heartbeat.h>#include <saf/ais.h>#include <checkpointd/clientrequest.h>#include "checkpointd.h"#include "client.h"#include "replica.h"#include "message.h"#include "request.h"#include "response.h"#include "operation.h"#include "utils.h"#ifdef USE_DMALLOC#include <dmalloc.h>#endifSaCkptServiceT	*saCkptService = NULL;GMainLoop	*mainloop = NULL;static void usage(void);static void SaCkptCheckpointdInit(void);static gboolean SaCkptHbInputDispatch(IPC_Channel*, gpointer);static void SaCkptHbInputDestroy(gpointer);static gboolean SaCkptClientChannelDispatch(IPC_Channel*, gpointer);static void SaCkptClientChannelDestroy(gpointer);static gboolean SaCkptWaitChannelDispatch(IPC_Channel *, gpointer);static void SaCkptWaitChannelDestroy(gpointer);static IPC_WaitConnection* SaCkptWaitChannelInit(char*);static int NodeHBStatusArray[] = {	HB_INIT,	HB_UP,	HB_ACTIVE,	HB_DEAD,	HB_UNKNOWN };const char * HeartbeatStatusArray[]={	INITSTATUS,	UPSTATUS,	ACTIVESTATUS,	DEADSTATUS,	""};static voidSaCkptDmalloc(int signum) {	cl_log(LOG_INFO, "Receive signal SIGINT");		g_main_quit(mainloop);}voidSaCkptCheckpointdInit(void){	ll_cluster_t*	hb = NULL;	const char* strNode = NULL;	/* set log options */	cl_log_set_entity("CKPT");	if (saCkptService->flagDaemon) {		cl_log_enable_stderr(FALSE);	} else {		cl_log_enable_stderr(TRUE);	}/*	cl_log_set_facility(LOG_LOCAL1); */	cl_log(LOG_INFO, "=== Start checkpointd ===");		hb = ll_cluster_new("heartbeat");	/* sign on with heartbeat */	if (hb->llc_ops->signon(hb, "checkpointd") != HA_OK) {		cl_log(LOG_ERR, "Cannot sign on with heartbeat\n");		cl_log(LOG_ERR, "REASON: %s\n", 			hb->llc_ops->errmsg(hb));		exit(1);	}	cl_log(LOG_INFO, "Sign on with heartbeat");	strNode = hb->llc_ops->get_mynodeid(hb);	cl_log(LOG_INFO, "Node id: %s", strNode);		CL_SIGNAL(SIGINT, SaCkptDmalloc);	CL_SIGNAL(SIGTERM, SaCkptDmalloc);	saCkptService->heartbeat = hb;	strcpy(saCkptService->nodeName, strNode);	saCkptService->clientHash = 		g_hash_table_new(g_int_hash, g_int_equal);	saCkptService->replicaHash = 		g_hash_table_new(g_str_hash, g_str_equal);	saCkptService->openCheckpointHash = 		g_hash_table_new(g_int_hash, g_int_equal);	saCkptService->unlinkedCheckpointHash = 		g_hash_table_new(g_str_hash, g_str_equal);	saCkptService->openRequestHash =		g_hash_table_new(g_str_hash, g_str_equal);	saCkptService->nodeStatusHash = 		g_hash_table_new(g_str_hash, g_str_equal);		checkpointNodeStatusInit();		saCkptService->nextClientHandle = 0;	saCkptService->nextCheckpointHandle = 0;		saCkptService->version.major = saCkptMajorVersion;	saCkptService->version.minor = saCkptMinorVersion;	saCkptService->version.releaseCode = 'A';	return;}voidusage(){	printf("checkpointd - data checkpoint service daemon\n");	printf("Usage: checkpointd [options...]\n");	printf("Options:\n");	printf("\t--help, -h, -?\t\tshow this help\n");	printf("\t--daemon, -d\trun in daemon mode\n");	printf("\t--verbose, -v\trun in verbose mode\n");}static gbooleanSaCkptHbInputDispatch(IPC_Channel* chan, gpointer user_data){	SaCkptClusterMsgProcess();	return TRUE;}static voidSaCkptHbInputDestroy(gpointer user_data){	return;}static gbooleanSaCkptClientChannelDispatch(IPC_Channel *clientChannel, gpointer user_data){	 return SaCkptRequestProcess(clientChannel);}static voidSaCkptClientChannelDestroy(gpointer user_data){	cl_log(LOG_INFO, "Client disconnected");	/* FIXME: delete client and remove all its requests and timers */	return;}static gbooleanSaCkptWaitChannelDispatch(IPC_Channel *newclient, gpointer user_data){	G_main_add_IPC_Channel(G_PRIORITY_LOW, newclient, FALSE, 				SaCkptClientChannelDispatch, newclient, 				SaCkptClientChannelDestroy);	return TRUE;}static voidSaCkptWaitChannelDestroy(gpointer user_data){	IPC_WaitConnection *waitChannel = 			(IPC_WaitConnection *)user_data;	waitChannel->ops->destroy(waitChannel);	return;}static IPC_WaitConnection *SaCkptWaitChannelInit(char* pathname){	IPC_WaitConnection *waitConnection = NULL;	mode_t mask;	char path[] = IPC_PATH_ATTR;	char domainsocket[] = IPC_DOMAIN_SOCKET;	GHashTable *attrs = g_hash_table_new(g_str_hash,g_str_equal);	g_hash_table_insert(attrs, path, pathname);	mask = umask(0);	waitConnection = ipc_wait_conn_constructor(domainsocket, attrs);	if (waitConnection == NULL){		cl_perror("Can't create wait connection");		exit(1);	}	mask = umask(mask);	g_hash_table_destroy(attrs);	return waitConnection;}#if 0/* begin :added by steve*/static gboolean SaCkptClientDebugDispatch(IPC_Channel*, gpointer);static void SaCkptClientDebugDestroy(gpointer);static gbooleanSaCkptClientDebugDispatch(IPC_Channel *clientChannel, gpointer user_data){	return SaCkptDebugProcess(clientChannel);}static voidSaCkptClientDebugDestroy(gpointer user_data){	cl_log(LOG_INFO, "clntDebug_input_destroy:received HUP");	return;}static gboolean SaCkptWaitDebugdispatch(IPC_Channel *, gpointer);static void SaCkptWaitDebugDestroy(gpointer);static gbooleanSaCkptWaitDebugDispatch(IPC_Channel *newclient, gpointer user_data){/*	client_add(newclient); */	G_main_add_IPC_Channel(G_PRIORITY_LOW, newclient, FALSE, 				SaCkptClientDebugDispatch, newclient, 				SaCkptClientDebuugDestroy);	return TRUE;}static voidSaWaitDebugDestroy(gpointer user_data){	IPC_WaitConnection *wait_debug = 			(IPC_WaitConnection *)user_data;	wait_debug->ops->destroy(wait_debug);	return;}/* end: added by steve */#endif#define OPTARGS "?dvh:"intmain(int argc, char ** argv){	IPC_WaitConnection 	*waitConnection = NULL;	ll_cluster_t		*hb = NULL;	char			pathname[64] = {0};	IPC_Channel*		chan;	int c;#ifdef HAVE_GETOPT_H	static struct option long_options[] = {		{"daemon", 0, 0, 'd'},		{"verbose", 0, 0, 'v'},		{"help", 0, 0, 'h'},		{0, 0, 0, 0}	};#endif	saCkptService = (SaCkptServiceT*)SaCkptMalloc(sizeof(SaCkptServiceT));	if (saCkptService == NULL) {		printf("Cannot allocat memory\n");	}	/* get options */	while (1) {#ifdef HAVE_GETOPT_H		c = getopt_long(argc, argv, OPTARGS, long_options, NULL);#else		c = getopt(argc, argv, OPTARGS);#endif		if (c == -1) break;		switch (c) {		case 'd': 			saCkptService->flagDaemon= 1;			break;		case 'v': 			saCkptService->flagVerbose = 1;			break;		case 'h': 		case '?': 		default :			usage();			exit(0);		}	}		if (saCkptService->flagDaemon) {		daemon(0, 0);	} 	SaCkptCheckpointdInit();	mainloop = g_main_new(TRUE);	/* heartbeat message process */	hb = saCkptService->heartbeat;	chan = hb->llc_ops->ipcchan(hb);	G_main_add_IPC_Channel(G_PRIORITY_HIGH, 		chan, 		FALSE, 		SaCkptHbInputDispatch, 		NULL, 		SaCkptHbInputDestroy);		serviceBeginNotify();	/* 	 * the clients wait channel is the other source of events.	 * This source delivers the clients connection events.	 * listen to this source at a relatively lower priority.	 */	memset (pathname, 0, sizeof(pathname));	strcpy (pathname, CKPTIPC) ;	waitConnection = SaCkptWaitChannelInit(pathname);	G_main_add_IPC_WaitConnection(G_PRIORITY_LOW, 		waitConnection, 		NULL,		FALSE, 		SaCkptWaitChannelDispatch, 		waitConnection,		SaCkptWaitChannelDestroy);	g_main_run(mainloop);	g_main_destroy(mainloop);	/*	 * FIXME: 	 * free all its elements before destory the hash table	 */	g_hash_table_destroy(saCkptService->replicaHash);	g_hash_table_destroy(saCkptService->clientHash);	g_hash_table_destroy(saCkptService->openCheckpointHash);	g_hash_table_destroy(saCkptService->unlinkedCheckpointHash);	g_hash_table_destroy(saCkptService->openRequestHash);	g_hash_table_destroy(saCkptService->nodeStatusHash);	SaCkptFree((void*)&saCkptService);#ifdef USE_DMALLOC	dmalloc_shutdown();#endif	cl_log(LOG_INFO, "=== Checkpointd exited ===");	return 0;}gintcheckpointNodeStatusInit(void){	saCkptNodeInfo				*nodeInfo = NULL;	saCkptNodeHBStatus hbStatus =		HB_UNKNOWN;	saCkptNodeCkptStatus ckptStatus =	CKPT_NOT_INIT;	ll_cluster_t*	hb = NULL;	const char * nodeName = NULL,*status = NULL;		hb = saCkptService->heartbeat;	if(saCkptService->flagVerbose){		cl_log(LOG_INFO,"init checkpoint node status\n");	}	if( hb->llc_ops->init_nodewalk(hb) == HA_OK){		nodeName = (hb->llc_ops->nextnode(hb));		while(nodeName != NULL){			nodeInfo = (saCkptNodeInfo *)ha_malloc(					sizeof(saCkptNodeInfo));						status = hb->llc_ops->node_status(hb,nodeName);						hbStatus = transHbNodeStatus(status);						if(saCkptService->flagVerbose){				cl_log(LOG_INFO,"node : %s, status : %s\n",					nodeName,status);			}						strncpy(nodeInfo->nodeName ,nodeName,SA_MAX_NAME_LENGTH);			nodeInfo->nodeName[SA_MAX_NAME_LENGTH -1] = '\0';			nodeInfo->nodeHbStatus = hbStatus;			nodeInfo->ckptStatus = ckptStatus;			g_hash_table_insert(saCkptService->nodeStatusHash,					(gpointer)nodeInfo->nodeName,					(gpointer)nodeInfo);			nodeName = (hb->llc_ops->nextnode(hb));		}		if(hb->llc_ops->end_nodewalk(hb) != HA_OK){			cl_log(LOG_ERR,"heartbeat end_nodewalk error on checkpointNodeStatusInit\n");			return HA_FAIL;			}else			return HA_OK;	}else{		cl_log(LOG_ERR,			"heartbeat nodewalk error on checkpointNodeStatusInit\n");		return HA_FAIL;	}	}saCkptNodeHBStatustransHbNodeStatus(const char *hbStatus){	int i = 0;	int found = 0;	if(hbStatus == NULL) {		cl_log(LOG_ERR,"NULL hbStatus in transHbNodeStatus\n");		return HB_UNKNOWN;	}	while(strlen( HeartbeatStatusArray[i])){		if(strncmp( hbStatus,HeartbeatStatusArray[i],					strlen( HeartbeatStatusArray[i]))==0){			found =1 ;			break;		}		i++;	}		if(found)		return NodeHBStatusArray[i];	else 		return HB_UNKNOWN;}gintserviceBeginNotify(void){	SaCkptMessageT* ckptMsg = NULL;	ckptMsg = (SaCkptMessageT*)SaCkptMalloc(sizeof(SaCkptMessageT));	if(ckptMsg == NULL){		cl_log(LOG_ERR,"Memory alloc fail on serviceBeginNotify\n");		return HA_FAIL;	}	strcpy(ckptMsg->msgType, T_CKPT);	ckptMsg->msgSubtype = M_CKPT_CREATED;	ckptMsg->msgVersion = saCkptService->version;	ckptMsg->retVal = SA_OK;	strncpy(ckptMsg->fromNodeName, saCkptService->nodeName,SA_MAX_NAME_LENGTH);	SaCkptMessageBroadcast(ckptMsg);	SaCkptFree((void*)&ckptMsg);	return HA_OK;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -