⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 storage_sync.c

📁 文件系统源代码!!!!! 文件系统源代码
💻 C
📖 第 1 页 / 共 3 页
字号:
/*** Copyright (C) 2008 Happy Fish / YuQing** FastDFS may be copied only under the terms of the GNU General* Public License V3, which may be found in the FastDFS source kit.* Please visit the FastDFS Home Page http://www.csource.org/ for more detail.**///storage_sync.c#include <sys/types.h>#include <sys/stat.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <unistd.h>#include <fcntl.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include <errno.h>#include <time.h>#include <unistd.h>#include "fdfs_define.h"#include "logger.h"#include "fdfs_global.h"#include "sockopt.h"#include "shared_func.h"#include "ini_file_reader.h"#include "tracker_types.h"#include "tracker_proto.h"#include "storage_global.h"#include "storage_func.h"#include "storage_sync.h"#include "tracker_client_thread.h"#define SYNC_BINLOG_FILE_MAX_SIZE	1024 * 1024 * 1024#define SYNC_BINLOG_FILE_PREFIX		"binlog"#define SYNC_BINLOG_INDEX_FILENAME	SYNC_BINLOG_FILE_PREFIX".index"#define SYNC_MARK_FILE_EXT		".mark"#define SYNC_BINLOG_FILE_EXT_FMT	".%03d"#define SYNC_DIR_NAME			"sync"#define MARK_ITEM_BINLOG_FILE_INDEX	"binlog_index"#define MARK_ITEM_BINLOG_FILE_OFFSET	"binlog_offset"#define MARK_ITEM_NEED_SYNC_OLD		"need_sync_old"#define MARK_ITEM_SYNC_OLD_DONE		"sync_old_done"#define MARK_ITEM_UNTIL_TIMESTAMP	"until_timestamp"#define MARK_ITEM_SCAN_ROW_COUNT	"scan_row_count"#define MARK_ITEM_SYNC_ROW_COUNT	"sync_row_count"int g_binlog_fd = -1;int g_binlog_index = 0;static off_t binlog_file_size = 0;int g_storage_sync_thread_count = 0;static pthread_mutex_t sync_thread_lock;static char binlog_write_cache_buff[16 * 1024];static int binlog_write_cache_len = 0;/* save sync thread ids */static pthread_t *sync_tids = NULL;static int storage_write_to_mark_file(BinLogReader *pReader);static int storage_binlog_reader_skip(BinLogReader *pReader);static void storage_reader_destroy(BinLogReader *pReader);static int storage_binlog_fsync(const bool bNeedLock);/**8 bytes: filename bytes8 bytes: file size4 bytes: source op timestampFDFS_GROUP_NAME_MAX_LEN bytes: group_namefilename bytes : filenamefile size bytes: file content**/static int storage_sync_copy_file(TrackerServerInfo *pStorageServer, \			const BinLogRecord *pRecord, const char proto_cmd){	TrackerHeader header;	int result;	int64_t in_bytes;	char *p;	char *pBuff;	char full_filename[MAX_PATH_SIZE];	char out_buff[sizeof(TrackerHeader)+FDFS_GROUP_NAME_MAX_LEN+256];	char in_buff[1];	struct stat stat_buf;	snprintf(full_filename, sizeof(full_filename), \		"%s/data/%s", pRecord->pBasePath, pRecord->true_filename);	if (stat(full_filename, &stat_buf) != 0)	{		if (errno == ENOENT)		{			if(pRecord->op_type==STORAGE_OP_TYPE_SOURCE_CREATE_FILE)			{				logWarning("file: "__FILE__", line: %d, " \					"sync data file, file: %s not exists, "\					"maybe deleted later?", \					__LINE__, full_filename);			}		}		else		{			logError("file: "__FILE__", line: %d, " \				"call stat fail, file: %s, "\				"error no: %d, error info: %s", \				__LINE__, full_filename, \				errno, strerror(errno));		}		return 0;	}	//printf("sync create file: %s\n", pRecord->filename);	while (1)	{		memset(&header, 0, sizeof(header));		long2buff(2 * FDFS_PROTO_PKG_LEN_SIZE + \				4 + FDFS_GROUP_NAME_MAX_LEN + \				pRecord->filename_len + stat_buf.st_size,\				header.pkg_len);		header.cmd = proto_cmd;		memcpy(out_buff, &header, sizeof(TrackerHeader));		p = out_buff + sizeof(TrackerHeader);		long2buff(pRecord->filename_len, p);		p += FDFS_PROTO_PKG_LEN_SIZE;		long2buff(stat_buf.st_size, p);		p += FDFS_PROTO_PKG_LEN_SIZE;		int2buff(pRecord->timestamp, p);		p += 4;		sprintf(p, "%s", pStorageServer->group_name);		p += FDFS_GROUP_NAME_MAX_LEN;		memcpy(p, pRecord->filename, pRecord->filename_len);		p += pRecord->filename_len;		if((result=tcpsenddata(pStorageServer->sock, out_buff, \			p - out_buff, g_network_timeout)) != 0)		{			logError("file: "__FILE__", line: %d, " \				"sync data to storage server %s:%d fail, " \				"errno: %d, error info: %s", \				__LINE__, pStorageServer->ip_addr, \				pStorageServer->port, \				result, strerror(result));			break;		}		if((stat_buf.st_size > 0) && ((result=tcpsendfile( \			pStorageServer->sock, full_filename, \			stat_buf.st_size)) != 0))		{			logError("file: "__FILE__", line: %d, " \				"sync data to storage server %s:%d fail, " \				"errno: %d, error info: %s", \				__LINE__, pStorageServer->ip_addr, \				pStorageServer->port, \				result, strerror(result));			break;		}		pBuff = in_buff;		if ((result=fdfs_recv_response(pStorageServer, \			&pBuff, 0, &in_bytes)) != 0)		{			break;		}		break;	}	//printf("sync create file end!\n");	if (result == EEXIST)	{		if (pRecord->op_type == STORAGE_OP_TYPE_SOURCE_CREATE_FILE)		{			logWarning("file: "__FILE__", line: %d, " \				"storage server ip: %s:%d, data file: %s " \				"already exists, maybe some mistake?", \				__LINE__, pStorageServer->ip_addr, \				pStorageServer->port, pRecord->filename);		}		return 0;	}	else	{		return result;	}}/**send pkg format:4 bytes: source delete timestampFDFS_GROUP_NAME_MAX_LEN bytes: group_nameremain bytes: filename**/static int storage_sync_delete_file(TrackerServerInfo *pStorageServer, \			const BinLogRecord *pRecord){	TrackerHeader header;	int result;	char full_filename[MAX_PATH_SIZE];	char out_buff[sizeof(TrackerHeader)+FDFS_GROUP_NAME_MAX_LEN+64];	char in_buff[1];	char *pBuff;	int64_t in_bytes;	snprintf(full_filename, sizeof(full_filename), \		"%s/data/%s", pRecord->pBasePath, pRecord->true_filename);	if (fileExists(full_filename))	{		if (pRecord->op_type == STORAGE_OP_TYPE_SOURCE_DELETE_FILE)		{			logWarning("file: "__FILE__", line: %d, " \				"sync data file, file: %s exists, " \				"maybe created later?", \				__LINE__, full_filename);		}		return 0;	}	while (1)	{	memset(out_buff, 0, sizeof(out_buff));	int2buff(pRecord->timestamp, out_buff + sizeof(TrackerHeader));	snprintf(out_buff + sizeof(TrackerHeader) + 4, sizeof(out_buff) - \		sizeof(TrackerHeader),  "%s", g_group_name);	memcpy(out_buff + sizeof(TrackerHeader) + 4 + FDFS_GROUP_NAME_MAX_LEN, \		pRecord->filename, pRecord->filename_len);	memset(&header, 0, sizeof(header));	long2buff(4 + FDFS_GROUP_NAME_MAX_LEN + pRecord->filename_len, \			header.pkg_len);	header.cmd = STORAGE_PROTO_CMD_SYNC_DELETE_FILE;	memcpy(out_buff, &header, sizeof(TrackerHeader));	if ((result=tcpsenddata(pStorageServer->sock, out_buff, \		sizeof(TrackerHeader) + 4 + FDFS_GROUP_NAME_MAX_LEN + \		pRecord->filename_len, g_network_timeout)) != 0)	{		logError("FILE: "__FILE__", line: %d, " \			"send data to storage server %s:%d fail, " \			"errno: %d, error info: %s", \			__LINE__, pStorageServer->ip_addr, \			pStorageServer->port, \			result, strerror(result));		break;	}	pBuff = in_buff;	result = fdfs_recv_response(pStorageServer, &pBuff, 0, &in_bytes);	if (result == ENOENT)	{		result = 0;	}		break;	}	return result;}#define STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord) \	if ((!pReader->need_sync_old) || pReader->sync_old_done || \		(pRecord->timestamp > pReader->until_timestamp)) \	{ \		return 0; \	} \static int storage_sync_data(BinLogReader *pReader, \			TrackerServerInfo *pStorageServer, \			const BinLogRecord *pRecord){	int result;	switch(pRecord->op_type)	{		case STORAGE_OP_TYPE_SOURCE_CREATE_FILE:			result = storage_sync_copy_file(pStorageServer, \				pRecord, STORAGE_PROTO_CMD_SYNC_CREATE_FILE);			break;		case STORAGE_OP_TYPE_SOURCE_DELETE_FILE:			result = storage_sync_delete_file( \				pStorageServer, pRecord);			break;		case STORAGE_OP_TYPE_SOURCE_UPDATE_FILE:			result = storage_sync_copy_file(pStorageServer, \				pRecord, STORAGE_PROTO_CMD_SYNC_UPDATE_FILE);			break;		case STORAGE_OP_TYPE_REPLICA_CREATE_FILE:			STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord)			result = storage_sync_copy_file(pStorageServer, \				pRecord, STORAGE_PROTO_CMD_SYNC_CREATE_FILE);			break;		case STORAGE_OP_TYPE_REPLICA_DELETE_FILE:			STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord)			result = storage_sync_delete_file( \				pStorageServer, pRecord);			break;		case STORAGE_OP_TYPE_REPLICA_UPDATE_FILE:			STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord)			result = storage_sync_copy_file(pStorageServer, \				pRecord, STORAGE_PROTO_CMD_SYNC_UPDATE_FILE);			break;		default:			return EINVAL;	}	if (result == 0)	{		pReader->sync_row_count++;	}	return result;}static int write_to_binlog_index(){	char full_filename[MAX_PATH_SIZE];	char buff[16];	int fd;	int len;	snprintf(full_filename, sizeof(full_filename), \			"%s/data/"SYNC_DIR_NAME"/%s", g_base_path, \			SYNC_BINLOG_INDEX_FILENAME);	if ((fd=open(full_filename, O_WRONLY | O_CREAT | O_TRUNC, 0644)) < 0)	{		logError("file: "__FILE__", line: %d, " \			"open file \"%s\" fail, " \			"errno: %d, error info: %s", \			__LINE__, full_filename, \			errno, strerror(errno));		return errno != 0 ? errno : ENOENT;	}	len = sprintf(buff, "%d", g_binlog_index);	if (write(fd, buff, len) != len)	{		logError("file: "__FILE__", line: %d, " \			"write to file \"%s\" fail, " \			"errno: %d, error info: %s",  \			__LINE__, full_filename, \			errno, strerror(errno));		close(fd);		return errno != 0 ? errno : EIO;	}	close(fd);	return 0;}static char *get_writable_binlog_filename(char *full_filename){	static char buff[MAX_PATH_SIZE];	if (full_filename == NULL)	{		full_filename = buff;	}	snprintf(full_filename, MAX_PATH_SIZE, \			"%s/data/"SYNC_DIR_NAME"/"SYNC_BINLOG_FILE_PREFIX"" \			SYNC_BINLOG_FILE_EXT_FMT, \			g_base_path, g_binlog_index);	return full_filename;}static int open_next_writable_binlog(){	char full_filename[MAX_PATH_SIZE];	if (g_binlog_fd >= 0)	{		close(g_binlog_fd);		g_binlog_fd = -1;	}	get_writable_binlog_filename(full_filename);	if (fileExists(full_filename))	{		if (unlink(full_filename) != 0)		{			logError("file: "__FILE__", line: %d, " \				"unlink file \"%s\" fail, " \				"errno: %d, error info: %s", \				__LINE__, full_filename, \				errno, strerror(errno));			return errno != 0 ? errno : ENOENT;		}		logError("file: "__FILE__", line: %d, " \			"binlog file \"%s\" already exists, truncate", \			__LINE__, full_filename);	}	g_binlog_fd = open(full_filename, O_WRONLY | O_CREAT | O_APPEND, 0644);	if (g_binlog_fd < 0)	{		logError("file: "__FILE__", line: %d, " \			"open file \"%s\" fail, " \			"errno: %d, error info: %s", \			__LINE__, full_filename, \			errno, strerror(errno));		return errno != 0 ? errno : EACCES;	}	return 0;}int storage_sync_init(){	char data_path[MAX_PATH_SIZE];	char sync_path[MAX_PATH_SIZE];	char full_filename[MAX_PATH_SIZE];	char file_buff[64];	int bytes;	int result;	int fd;	snprintf(data_path, sizeof(data_path), "%s/data", g_base_path);	if (!fileExists(data_path))	{		if (mkdir(data_path, 0755) != 0)		{			logError("file: "__FILE__", line: %d, " \				"mkdir \"%s\" fail, " \				"errno: %d, error info: %s", \				__LINE__, data_path, \				errno, strerror(errno));			return errno != 0 ? errno : ENOENT;		}	}	snprintf(sync_path, sizeof(sync_path), \			"%s/"SYNC_DIR_NAME, data_path);	if (!fileExists(sync_path))	{		if (mkdir(sync_path, 0755) != 0)		{			logError("file: "__FILE__", line: %d, " \				"mkdir \"%s\" fail, " \				"errno: %d, error info: %s", \				__LINE__, sync_path, \				errno, strerror(errno));			return errno != 0 ? errno : ENOENT;		}	}	snprintf(full_filename, sizeof(full_filename), \			"%s/%s", sync_path, SYNC_BINLOG_INDEX_FILENAME);	if ((fd=open(full_filename, O_RDONLY)) >= 0)	{		bytes = read(fd, file_buff, sizeof(file_buff) - 1);		close(fd);		if (bytes <= 0)		{			logError("file: "__FILE__", line: %d, " \				"read file \"%s\" fail, bytes read: %d", \				__LINE__, full_filename, bytes);			return errno != 0 ? errno : EIO;		}		file_buff[bytes] = '\0';		g_binlog_index = atoi(file_buff);		if (g_binlog_index < 0)		{			logError("file: "__FILE__", line: %d, " \				"in file \"%s\", binlog_index: %d < 0", \				__LINE__, full_filename, g_binlog_index);			return EINVAL;		}	}	else	{		g_binlog_index = 0;		if ((result=write_to_binlog_index()) != 0)		{			return result;		}	}	get_writable_binlog_filename(full_filename);	g_binlog_fd = open(full_filename, O_WRONLY | O_CREAT | O_APPEND, 0644);	if (g_binlog_fd < 0)	{		logError("file: "__FILE__", line: %d, " \			"open file \"%s\" fail, " \			"errno: %d, error info: %s", \			__LINE__, full_filename, \			errno, strerror(errno));		return errno != 0 ? errno : EACCES;	}	binlog_file_size = lseek(g_binlog_fd, 0, SEEK_END);	if (binlog_file_size < 0)	{		logError("file: "__FILE__", line: %d, " \			"ftell file \"%s\" fail, " \			"errno: %d, error info: %s", \			__LINE__, full_filename, \			errno, strerror(errno));		storage_sync_destroy();		return errno != 0 ? errno : EIO;	}	/*	//printf("full_filename=%s, binlog_file_size=%d\n", \			full_filename, binlog_file_size);	*/		if ((result=init_pthread_lock(&sync_thread_lock)) != 0)	{		return result;	}	load_local_host_ip_addrs();	return 0;}int storage_sync_destroy(){	int result;	if (g_binlog_fd >= 0)	{		storage_binlog_fsync(true);		close(g_binlog_fd);		g_binlog_fd = -1;	}	if ((result=pthread_mutex_destroy(&sync_thread_lock)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"call pthread_mutex_destroy fail, " \			"errno: %d, error info: %s", \			__LINE__, result, strerror(result));		return result;	}	return 0;}int kill_storage_sync_threads(){	int result;	if (sync_tids != NULL)	{		result = kill_work_threads(sync_tids, \				g_storage_sync_thread_count);		free(sync_tids);		sync_tids = NULL;	}	else	{		result = 0;	}	return result;}static int storage_binlog_fsync(const bool bNeedLock){	int result;	int write_ret;	if (bNeedLock && (result=pthread_mutex_lock(&sync_thread_lock)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"call pthread_mutex_lock fail, " \			"errno: %d, error info: %s", \			__LINE__, result, strerror(result));	}	if (binlog_write_cache_len == 0) //ignore	{		write_ret = 0;  //skip	}	else if (write(g_binlog_fd, binlog_write_cache_buff, \		binlog_write_cache_len) != binlog_write_cache_len)	{		logError("file: "__FILE__", line: %d, " \

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -