⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 storage_sync.c

📁 文件系统源代码!!!!! 文件系统源代码
💻 C
📖 第 1 页 / 共 3 页
字号:
			"write to binlog file \"%s\" fail, " \			"errno: %d, error info: %s",  \			__LINE__, get_writable_binlog_filename(NULL), \			errno, strerror(errno));		write_ret = errno != 0 ? errno : EIO;	}	else if (fsync(g_binlog_fd) != 0)	{		logError("file: "__FILE__", line: %d, " \			"sync to binlog file \"%s\" fail, " \			"errno: %d, error info: %s",  \			__LINE__, get_writable_binlog_filename(NULL), \			errno, strerror(errno));		write_ret = errno != 0 ? errno : EIO;	}	else	{		binlog_file_size += binlog_write_cache_len;		if (binlog_file_size >= SYNC_BINLOG_FILE_MAX_SIZE)		{			g_binlog_index++;			if ((write_ret=write_to_binlog_index()) == 0)			{				write_ret = open_next_writable_binlog();			}			binlog_file_size = 0;			if (write_ret != 0)			{				g_continue_flag = false;				logCrit("file: "__FILE__", line: %d, " \					"open binlog file \"%s\" fail, " \					"program exit!", \					__LINE__, \					get_writable_binlog_filename(NULL));			}		}		else		{			write_ret = 0;		}	}	binlog_write_cache_len = 0;  //reset cache buff	if (bNeedLock && (result=pthread_mutex_unlock(&sync_thread_lock)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"call pthread_mutex_unlock fail, " \			"errno: %d, error info: %s", \			__LINE__, result, strerror(result));	}	return write_ret;}int storage_binlog_write(const int timestamp, const char op_type, \		const char *filename){	int result;	int write_ret;	if ((result=pthread_mutex_lock(&sync_thread_lock)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"call pthread_mutex_lock fail, " \			"errno: %d, error info: %s", \			__LINE__, result, strerror(result));	}	binlog_write_cache_len += sprintf(binlog_write_cache_buff + \				binlog_write_cache_len, "%d %c %s\n", \				timestamp, op_type, filename);	//check if buff full	if (sizeof(binlog_write_cache_buff) - binlog_write_cache_len < 128)	{		write_ret = storage_binlog_fsync(false);  //sync to disk	}	else	{		write_ret = 0;	}	if ((result=pthread_mutex_unlock(&sync_thread_lock)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"call pthread_mutex_unlock fail, " \			"errno: %d, error info: %s", \			__LINE__, result, strerror(result));	}	return write_ret;}static char *get_binlog_readable_filename(BinLogReader *pReader, \		char *full_filename){	static char buff[MAX_PATH_SIZE];	if (full_filename == NULL)	{		full_filename = buff;	}	snprintf(full_filename, MAX_PATH_SIZE, \			"%s/data/"SYNC_DIR_NAME"/"SYNC_BINLOG_FILE_PREFIX"" \			SYNC_BINLOG_FILE_EXT_FMT, \			g_base_path, pReader->binlog_index);	return full_filename;}static int storage_open_readable_binlog(BinLogReader *pReader){	char full_filename[MAX_PATH_SIZE];	if (pReader->binlog_fd >= 0)	{		close(pReader->binlog_fd);	}	get_binlog_readable_filename(pReader, full_filename);	pReader->binlog_fd = open(full_filename, O_RDONLY);	if (pReader->binlog_fd < 0)	{		logError("file: "__FILE__", line: %d, " \			"open binlog file \"%s\" fail, " \			"errno: %d, error info: %s", \			__LINE__, full_filename, \			errno, strerror(errno));		return errno != 0 ? errno : ENOENT;	}	if (pReader->binlog_offset > 0 && \	    lseek(pReader->binlog_fd, pReader->binlog_offset, SEEK_SET) < 0)	{		logError("file: "__FILE__", line: %d, " \			"seek binlog file \"%s\" fail, file offset="INT64_PRINTF_FORMAT", " \			"errno: %d, error info: %s", \			__LINE__, full_filename, pReader->binlog_offset, \			errno, strerror(errno));		close(pReader->binlog_fd);		pReader->binlog_fd = -1;		return errno != 0 ? errno : ESPIPE;	}	return 0;}static char *get_mark_filename(const void *pArg, \			char *full_filename){	const BinLogReader *pReader;	static char buff[MAX_PATH_SIZE];	pReader = (const BinLogReader *)pArg;	if (full_filename == NULL)	{		full_filename = buff;	}	snprintf(full_filename, MAX_PATH_SIZE, \			"%s/data/"SYNC_DIR_NAME"/%s_%d%s", g_base_path, \			pReader->ip_addr, g_server_port, SYNC_MARK_FILE_EXT);	return full_filename;}static int storage_report_storage_status(const char *ip_addr, \			const char status){	FDFSStorageBrief briefServers[1];	TrackerServerInfo trackerServer;	TrackerServerInfo *pGlobalServer;	TrackerServerInfo *pTServer;	TrackerServerInfo *pTServerEnd;	int result;	int i;	strcpy(briefServers[0].ip_addr, ip_addr);	briefServers[0].status = status;	result = 0;	pTServer = &trackerServer;	pTServerEnd = g_tracker_servers + g_tracker_server_count;	for (pGlobalServer=g_tracker_servers; pGlobalServer < pTServerEnd; \			pGlobalServer++)	{		memcpy(pTServer, pGlobalServer, sizeof(TrackerServerInfo));		for (i=0; i < 3; i++)		{			pTServer->sock = socket(AF_INET, SOCK_STREAM, 0);			if(pTServer->sock < 0)			{				result = errno != 0 ? errno : EPERM;				logError("file: "__FILE__", line: %d, " \					"socket create failed, errno: %d, " \					"error info: %s.", \					__LINE__, result, strerror(result));				break;			}			if ((result=connectserverbyip(pTServer->sock, \				pTServer->ip_addr, pTServer->port)) == 0)			{				break;			}			close(pTServer->sock);			pTServer->sock = -1;			sleep(5);		}		if (pTServer->sock < 0)		{			logError("file: "__FILE__", line: %d, " \				"connect to tracker server %s:%d fail, " \				"errno: %d, error info: %s", \				__LINE__, pTServer->ip_addr, pTServer->port, \				result, strerror(result));			continue;		}		if (tracker_report_join(pTServer) != 0)		{			close(pTServer->sock);			continue;		}		if ((result=tracker_sync_diff_servers(pTServer, \			briefServers, 1)) != 0)		{		}		fdfs_quit(pTServer);		close(pTServer->sock);	}	return 0;}static int storage_reader_sync_init_req(BinLogReader *pReader){	TrackerServerInfo *pTrackerServers;	TrackerServerInfo *pTServer;	TrackerServerInfo *pTServerEnd;	char tracker_client_ip[IP_ADDRESS_SIZE];	int result;	int conn_ret;	pTrackerServers = (TrackerServerInfo *)malloc( \		sizeof(TrackerServerInfo) * g_tracker_server_count);	if (pTrackerServers == NULL)	{		logError("file: "__FILE__", line: %d, " \			"malloc %d bytes fail", __LINE__, \			sizeof(TrackerServerInfo) * g_tracker_server_count);		return errno != 0 ? errno : ENOMEM;	}	memcpy(pTrackerServers, g_tracker_servers, \		sizeof(TrackerServerInfo) * g_tracker_server_count);	pTServerEnd = pTrackerServers + g_tracker_server_count;	for (pTServer=pTrackerServers; pTServer<pTServerEnd; pTServer++)	{		pTServer->sock = -1;	}	result = EINTR;	pTServer = pTrackerServers;	while (1)	{		while (g_continue_flag)		{			pTServer->sock = socket(AF_INET, SOCK_STREAM, 0);			if(pTServer->sock < 0)			{				logCrit("file: "__FILE__", line: %d, " \					"socket create failed, errno: %d, " \					"error info: %s. program exit!", \					__LINE__, errno, strerror(errno));				g_continue_flag = false;				result = errno != 0 ? errno : EPERM;				break;			}			if ((conn_ret=connectserverbyip(pTServer->sock, \				pTServer->ip_addr, pTServer->port)) == 0)			{				break;			}			logError("file: "__FILE__", line: %d, " \				"connect to tracker server %s:%d fail, " \				"errno: %d, error info: %s", \				__LINE__, pTServer->ip_addr, pTServer->port, \				conn_ret, strerror(conn_ret));			close(pTServer->sock);			pTServer++;			if (pTServer >= pTServerEnd)			{				pTServer = pTrackerServers;			}			sleep(g_heart_beat_interval);		}		if (!g_continue_flag)		{			break;		}		getSockIpaddr(pTServer->sock, \				tracker_client_ip, IP_ADDRESS_SIZE);		insert_into_local_host_ip(tracker_client_ip);		/*		//printf("file: "__FILE__", line: %d, " \			"tracker_client_ip: %s\n", \			__LINE__, tracker_client_ip);		//print_local_host_ip_addrs();		*/		if (tracker_report_join(pTServer) != 0)		{			close(pTServer->sock);			sleep(g_heart_beat_interval);			continue;		}		if ((result=tracker_sync_src_req(pTServer, pReader)) != 0)		{			fdfs_quit(pTServer);			close(pTServer->sock);			sleep(g_heart_beat_interval);			continue;		}		fdfs_quit(pTServer);		close(pTServer->sock);		break;	}	/*	//printf("need_sync_old=%d, until_timestamp=%d\n", \		pReader->need_sync_old, pReader->until_timestamp);	*/	return result;}static int storage_reader_init(FDFSStorageBrief *pStorage, \			BinLogReader *pReader){	char full_filename[MAX_PATH_SIZE];	IniItemInfo *items;	int nItemCount;	int result;	bool bFileExist;	memset(pReader, 0, sizeof(BinLogReader));	pReader->mark_fd = -1;	pReader->binlog_fd = -1;	strcpy(pReader->ip_addr, pStorage->ip_addr);	get_mark_filename(pReader, full_filename);	bFileExist = fileExists(full_filename);	if (bFileExist)	{		if ((result=iniLoadItems(full_filename, &items, &nItemCount)) \			 != 0)		{			logError("file: "__FILE__", line: %d, " \				"load from mark file \"%s\" fail, " \				"error code: %d", \				__LINE__, full_filename, result);			return result;		}		if (nItemCount < 7)		{			iniFreeItems(items);			logError("file: "__FILE__", line: %d, " \				"in mark file \"%s\", item count: %d < 7", \				__LINE__, full_filename, nItemCount);			return ENOENT;		}		pReader->binlog_index = iniGetIntValue( \				MARK_ITEM_BINLOG_FILE_INDEX, \				items, nItemCount, -1);		pReader->binlog_offset = iniGetInt64Value( \				MARK_ITEM_BINLOG_FILE_OFFSET, \				items, nItemCount, -1);		pReader->need_sync_old = iniGetBoolValue(   \				MARK_ITEM_NEED_SYNC_OLD, \				items, nItemCount);		pReader->sync_old_done = iniGetBoolValue(  \				MARK_ITEM_SYNC_OLD_DONE, \				items, nItemCount);		pReader->until_timestamp = iniGetIntValue( \				MARK_ITEM_UNTIL_TIMESTAMP, \				items, nItemCount, -1);		pReader->scan_row_count = iniGetInt64Value( \				MARK_ITEM_SCAN_ROW_COUNT, \				items, nItemCount, 0);		pReader->sync_row_count = iniGetInt64Value( \				MARK_ITEM_SYNC_ROW_COUNT, \				items, nItemCount, 0);		if (pReader->binlog_index < 0)		{			iniFreeItems(items);			logError("file: "__FILE__", line: %d, " \				"in mark file \"%s\", " \				"binlog_index: %d < 0", \				__LINE__, full_filename, \				pReader->binlog_index);			return EINVAL;		}		if (pReader->binlog_offset < 0)		{			iniFreeItems(items);			logError("file: "__FILE__", line: %d, " \				"in mark file \"%s\", " \				"binlog_offset: "INT64_PRINTF_FORMAT" < 0", \				__LINE__, full_filename, \				pReader->binlog_offset);			return EINVAL;		}		iniFreeItems(items);	}	else	{		if ((result=storage_reader_sync_init_req(pReader)) != 0)		{			return result;		}	}	pReader->last_write_row_count = pReader->scan_row_count;	pReader->mark_fd = open(full_filename, O_WRONLY | O_CREAT, 0644);	if (pReader->mark_fd < 0)	{		logError("file: "__FILE__", line: %d, " \			"open mark file \"%s\" fail, " \			"error no: %d, error info: %s", \			__LINE__, full_filename, \			errno, strerror(errno));		return errno != 0 ? errno : ENOENT;	}	if ((result=storage_open_readable_binlog(pReader)) != 0)	{		close(pReader->mark_fd);		pReader->mark_fd = -1;		return result;	}	if (!bFileExist)	{        	if (!pReader->need_sync_old && pReader->until_timestamp > 0)		{			if ((result=storage_binlog_reader_skip(pReader)) != 0)			{				storage_reader_destroy(pReader);				return result;			}		}		if ((result=storage_write_to_mark_file(pReader)) != 0)		{			storage_reader_destroy(pReader);			return result;		}	}	return 0;}static void storage_reader_destroy(BinLogReader *pReader){	if (pReader->mark_fd >= 0)	{		close(pReader->mark_fd);		pReader->mark_fd = -1;	}	if (pReader->binlog_fd >= 0)	{		close(pReader->binlog_fd);		pReader->binlog_fd = -1;	}}static int storage_write_to_mark_file(BinLogReader *pReader){	char buff[256];	int len;	int result;	len = sprintf(buff, 		"%s=%d\n"  \		"%s="INT64_PRINTF_FORMAT"\n"  \		"%s=%d\n"  \		"%s=%d\n"  \		"%s=%d\n"  \		"%s="INT64_PRINTF_FORMAT"\n"  \		"%s="INT64_PRINTF_FORMAT"\n", \		MARK_ITEM_BINLOG_FILE_INDEX, pReader->binlog_index, \		MARK_ITEM_BINLOG_FILE_OFFSET, pReader->binlog_offset, \		MARK_ITEM_NEED_SYNC_OLD, pReader->need_sync_old, \		MARK_ITEM_SYNC_OLD_DONE, pReader->sync_old_done, \		MARK_ITEM_UNTIL_TIMESTAMP, (int)pReader->until_timestamp, \		MARK_ITEM_SCAN_ROW_COUNT, pReader->scan_row_count, \		MARK_ITEM_SYNC_ROW_COUNT, pReader->sync_row_count);	if ((result=storage_write_to_fd(pReader->mark_fd, get_mark_filename, \		pReader, buff, len)) == 0)	{		pReader->last_write_row_count = pReader->scan_row_count;	}	return result;}static int rewind_to_prev_rec_end(BinLogReader *pReader, \			const int record_length){	if (lseek(pReader->binlog_fd, -1 * record_length, \			SEEK_CUR) < 0)	{		logError("file: "__FILE__", line: %d, " \			"seek binlog file \"%s\"fail, " \			"file offset: "INT64_PRINTF_FORMAT", " \			"errno: %d, error info: %s", \			__LINE__, get_binlog_readable_filename(pReader, NULL), \			pReader->binlog_offset, \			errno, strerror(errno));		return errno != 0 ? errno : ENOENT;	}	return 0;}static int storage_binlog_read(BinLogReader *pReader, \			BinLogRecord *pRecord, int *record_length){	char line[256];	char *cols[3];	int result;	while (1)	{		if ((*record_length=fd_gets(pReader->binlog_fd, line, \			sizeof(line), 49 + FDFS_FILE_EXT_NAME_MAX_LEN)) < 0)		{			logError("file: "__FILE__", line: %d, " \				"read a line from binlog file \"%s\" fail, " \				"file offset: "INT64_PRINTF_FORMAT", " \				"error no: %d, error info: %s", \				__LINE__, \				get_binlog_readable_filename(pReader, NULL), \				pReader->binlog_offset, \				errno, strerror(errno));			return errno != 0 ? errno : ENOENT;		}		if (*record_length == 0)		{			if (pReader->binlog_index < g_binlog_index) //rotate			{				pReader->binlog_index++;				pReader->binlog_offset = 0;				if ((result=storage_open_readable_binlog( \						pReader)) != 0)				{					return result;				}				if ((result=storage_write_to_mark_file( \						pReader)) != 0)				{					return result;				}				continue;  //read next binlog			}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -