⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 storage_sync.c

📁 文件系统源代码!!!!! 文件系统源代码
💻 C
📖 第 1 页 / 共 3 页
字号:
			return ENOENT;		}		break;	}	if (line[*record_length-1] != '\n')	{		if ((result=rewind_to_prev_rec_end(pReader, \				*record_length)) != 0)		{			return result;		}		logError("file: "__FILE__", line: %d, " \			"get a line from binlog file \"%s\" fail, " \			"file offset: "INT64_PRINTF_FORMAT", " \			"no new line char, line length: %d", \			__LINE__, get_binlog_readable_filename(pReader, NULL), \			pReader->binlog_offset, *record_length);		return ENOENT;	}	if ((result=splitEx(line, ' ', cols, 3)) < 3)	{		logError("file: "__FILE__", line: %d, " \			"read data from binlog file \"%s\" fail, " \			"file offset: "INT64_PRINTF_FORMAT", " \			"read item count: %d < 3", \			__LINE__, get_binlog_readable_filename(pReader, NULL), \			pReader->binlog_offset, result);		return ENOENT;	}	pRecord->timestamp = atoi(cols[0]);	pRecord->op_type = *(cols[1]);	pRecord->filename_len = strlen(cols[2]) - 1; //need trim new line \n	if (pRecord->filename_len > sizeof(pRecord->filename)-1)	{		logError("file: "__FILE__", line: %d, " \			"item \"filename\" in binlog " \			"file \"%s\" is invalid, file offset: " \			INT64_PRINTF_FORMAT", filename length: %d > %d", \			__LINE__, get_binlog_readable_filename(pReader, NULL), \			pReader->binlog_offset, \			pRecord->filename_len, sizeof(pRecord->filename)-1);		return EINVAL;	}	memcpy(pRecord->filename, cols[2], pRecord->filename_len);	*(pRecord->filename + pRecord->filename_len) = '\0';	pRecord->true_filename_len = pRecord->filename_len;	if ((result=storage_split_filename(pRecord->filename, \			&pRecord->true_filename_len, pRecord->true_filename, \			&pRecord->pBasePath)) != 0)	{		return result;	}	/*	//printf("timestamp=%d, op_type=%c, filename=%s(%d), line length=%d, " \		"offset=%d\n", \		pRecord->timestamp, pRecord->op_type, \		pRecord->filename, strlen(pRecord->filename), \		*record_length, pReader->binlog_offset);	*/	return 0;}static int storage_binlog_reader_skip(BinLogReader *pReader){	BinLogRecord record;	int result;	int record_len;	while (1)	{		result = storage_binlog_read(pReader, \				&record, &record_len);		if (result != 0)		{			if (result == ENOENT)			{				return 0;			}			return result;		}		if (record.timestamp >= pReader->until_timestamp)		{			result = rewind_to_prev_rec_end( \					pReader, record_len);			break;		}		pReader->binlog_offset += record_len;	}	return result;}static int storage_unlink_mark_file(BinLogReader *pReader){	char old_filename[MAX_PATH_SIZE];	char new_filename[MAX_PATH_SIZE];	time_t t;	struct tm tm;	t = time(NULL);	localtime_r(&t, &tm);	get_mark_filename(pReader, old_filename);	snprintf(new_filename, sizeof(new_filename), \		"%s.%04d%02d%02d%02d%02d%02d", old_filename, \		tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, \		tm.tm_hour, tm.tm_min, tm.tm_sec);	if (rename(old_filename, new_filename) != 0)	{		logError("file: "__FILE__", line: %d, " \			"rename file %s to %s fail" \			", errno: %d, error info: %s", \			__LINE__, old_filename, new_filename, \			errno, strerror(errno));		return errno != 0 ? errno : EACCES;	}	return 0;}static void storage_sync_get_start_end_times(time_t current_time, \	const FDFSTimeInfo *pStartTime, const FDFSTimeInfo *pEndTime, \	time_t *start_time, time_t *end_time){	struct tm tm_time;	//char buff[32];	localtime_r(&current_time, &tm_time);	tm_time.tm_sec = 0;	/*	strftime(buff, sizeof(buff), "%Y-%m-%d %H:%M:%S", &tm_time);	//printf("current time: %s\n", buff);	*/	tm_time.tm_hour = pStartTime->hour;	tm_time.tm_min = pStartTime->minute;	*start_time = mktime(&tm_time);	//end time < start time	if (pEndTime->hour < pStartTime->hour || (pEndTime->hour == \		pStartTime->hour && pEndTime->minute < pStartTime->minute))	{		current_time += 24 * 3600;		localtime_r(&current_time, &tm_time);		tm_time.tm_sec = 0;	}	tm_time.tm_hour = pEndTime->hour;	tm_time.tm_min = pEndTime->minute;	*end_time = mktime(&tm_time);}static void* storage_sync_thread_entrance(void* arg){	FDFSStorageBrief *pStorage;	BinLogReader reader;	BinLogRecord record;	TrackerServerInfo storage_server;	char local_ip_addr[IP_ADDRESS_SIZE];	int read_result;	int sync_result;	int conn_result;	int result;	int record_len;	int previousCode;	int nContinuousFail;	time_t current_time;	time_t start_time;	time_t end_time;	time_t last_check_sync_cache_time;		memset(local_ip_addr, 0, sizeof(local_ip_addr));	memset(&reader, 0, sizeof(reader));	current_time =  time(NULL);	start_time = 0;	end_time = 0;	pStorage = (FDFSStorageBrief *)arg;	last_check_sync_cache_time = time(NULL);	strcpy(storage_server.ip_addr, pStorage->ip_addr);	strcpy(storage_server.group_name, g_group_name);	storage_server.port = g_server_port;	storage_server.sock = -1;	while (g_continue_flag && \		pStorage->status != FDFS_STORAGE_STATUS_DELETED &&		pStorage->status != FDFS_STORAGE_STATUS_NONE)	{		if (storage_reader_init(pStorage, &reader) != 0)		{			logCrit("file: "__FILE__", line: %d, " \				"storage_reader_init fail, program exit!", \				__LINE__);			g_continue_flag = false;			break;		}		while (g_continue_flag && \			(pStorage->status != FDFS_STORAGE_STATUS_ACTIVE && \			pStorage->status != FDFS_STORAGE_STATUS_WAIT_SYNC && \			pStorage->status != FDFS_STORAGE_STATUS_SYNCING && \			pStorage->status != FDFS_STORAGE_STATUS_DELETED && \			pStorage->status != FDFS_STORAGE_STATUS_NONE))		{			sleep(1);		}		if (g_sync_part_time)		{			current_time = time(NULL);			storage_sync_get_start_end_times(current_time, \				&g_sync_end_time, &g_sync_start_time, \				&start_time, &end_time);			start_time += 60;			end_time -= 60;			while (g_continue_flag && (current_time >= start_time \					&& current_time <= end_time))			{				current_time = time(NULL);				sleep(1);			}		} 		previousCode = 0;		nContinuousFail = 0;		conn_result = 0;		while (g_continue_flag && \			pStorage->status != FDFS_STORAGE_STATUS_DELETED && \			pStorage->status != FDFS_STORAGE_STATUS_NONE)		{			storage_server.sock = \				socket(AF_INET, SOCK_STREAM, 0);			if(storage_server.sock < 0)			{				logCrit("file: "__FILE__", line: %d," \					" socket create fail, " \					"errno: %d, error info: %s. " \					"program exit!", __LINE__, \					errno, strerror(errno));				g_continue_flag = false;				break;			}			if ((conn_result=connectserverbyip(storage_server.sock,\				storage_server.ip_addr, g_server_port)) == 0)			{				char szFailPrompt[36];				if (nContinuousFail == 0)				{					*szFailPrompt = '\0';				}				else				{					sprintf(szFailPrompt, \						", continuous fail count: %d", \						nContinuousFail);				}				logInfo("file: "__FILE__", line: %d, " \					"successfully connect to " \					"storage server %s:%d%s", __LINE__, \					storage_server.ip_addr, \					g_server_port, szFailPrompt);				nContinuousFail = 0;				break;			}			if (previousCode != conn_result)			{				logError("file: "__FILE__", line: %d, " \					"connect to storage server %s:%d fail" \					", errno: %d, error info: %s", \					__LINE__, \					storage_server.ip_addr, g_server_port, \					conn_result, strerror(conn_result));				previousCode = conn_result;			}			nContinuousFail++;			close(storage_server.sock);			storage_server.sock = -1;			sleep(1);		}		if (nContinuousFail > 0)		{			logError("file: "__FILE__", line: %d, " \				"connect to storage server %s:%d fail, " \				"try count: %d, errno: %d, error info: %s", \				__LINE__, storage_server.ip_addr, \				g_server_port, nContinuousFail, \				conn_result, strerror(conn_result));		}		if (!g_continue_flag)		{			break;		}		getSockIpaddr(storage_server.sock, \			local_ip_addr, IP_ADDRESS_SIZE);		/*		//printf("file: "__FILE__", line: %d, " \			"storage_server.ip_addr=%s, " \			"local_ip_addr: %s\n", \			__LINE__, storage_server.ip_addr, local_ip_addr);		*/		if (strcmp(local_ip_addr, storage_server.ip_addr) == 0)		{			logError("file: "__FILE__", line: %d, " \				"ip_addr %s belong to the local host," \				" sync thread exit.", \				__LINE__, storage_server.ip_addr);			fdfs_quit(&storage_server);			break;		}		if (pStorage->status == FDFS_STORAGE_STATUS_WAIT_SYNC)		{			pStorage->status = FDFS_STORAGE_STATUS_SYNCING;			storage_report_storage_status(pStorage->ip_addr, \				pStorage->status);		}		if (pStorage->status == FDFS_STORAGE_STATUS_SYNCING)		{			if (reader.need_sync_old && reader.sync_old_done)			{				pStorage->status = FDFS_STORAGE_STATUS_ONLINE;				storage_report_storage_status(  \					pStorage->ip_addr, \					pStorage->status);			}		}		if (g_sync_part_time)		{			current_time = time(NULL);			storage_sync_get_start_end_times(current_time, \				&g_sync_start_time, &g_sync_end_time, \				&start_time, &end_time);		}		sync_result = 0;		while (g_continue_flag && (!g_sync_part_time || \			(current_time >= start_time && \			current_time <= end_time)) && \			pStorage->status != FDFS_STORAGE_STATUS_DELETED && \			pStorage->status != FDFS_STORAGE_STATUS_NONE)		{			if (g_sync_part_time)			{				current_time = time(NULL);			}			read_result = storage_binlog_read(&reader, \					&record, &record_len);			if (read_result == ENOENT)			{				if (reader.need_sync_old && \					!reader.sync_old_done)				{				reader.sync_old_done = true;				if (storage_write_to_mark_file(&reader) != 0)				{					logCrit("file: "__FILE__", line: %d, " \						"storage_write_to_mark_file " \						"fail, program exit!", \						__LINE__);					g_continue_flag = false;					break;				}				if (pStorage->status == \					FDFS_STORAGE_STATUS_SYNCING)				{					pStorage->status = \						FDFS_STORAGE_STATUS_ONLINE;					storage_report_storage_status(  \						pStorage->ip_addr, \						pStorage->status);				}				}				if (binlog_write_cache_len > 0 && \				    time(NULL)-last_check_sync_cache_time >= 60)				{					last_check_sync_cache_time = time(NULL);					storage_binlog_fsync(true);				}				usleep(g_sync_wait_usec);				continue;			}			else if (read_result != 0)			{				sleep(5);				continue;			}			if ((sync_result=storage_sync_data(&reader, \				&storage_server, &record)) != 0)			{				if (rewind_to_prev_rec_end( \					&reader, record_len) != 0)				{					logCrit("file: "__FILE__", line: %d, " \						"rewind_to_prev_rec_end fail, "\						"program exit!", __LINE__);					g_continue_flag = false;				}				break;			}			reader.binlog_offset += record_len;			reader.scan_row_count++;			if (reader.sync_row_count % 1000 == 0)			{				if (storage_write_to_mark_file(&reader) != 0)				{					logCrit("file: "__FILE__", line: %d, " \						"storage_write_to_mark_file " \						"fail, program exit!", \						__LINE__);					g_continue_flag = false;					break;				}			}			if (g_sync_interval > 0)			{				usleep(g_sync_interval);			}		}		if (reader.last_write_row_count != reader.scan_row_count)		{			if (storage_write_to_mark_file(&reader) != 0)			{				logCrit("file: "__FILE__", line: %d, " \					"storage_write_to_mark_file fail, " \					"program exit!", __LINE__);				g_continue_flag = false;				break;			}		}		close(storage_server.sock);		storage_server.sock = -1;		storage_reader_destroy(&reader);		if (!g_continue_flag)		{			break;		}		if (!(sync_result == ENOTCONN || sync_result == EIO))		{			sleep(1);		}	}	if (storage_server.sock >= 0)	{		close(storage_server.sock);	}	storage_reader_destroy(&reader);	if (pStorage->status == FDFS_STORAGE_STATUS_DELETED)	{		storage_unlink_mark_file(&reader);		if (strcmp(g_sync_src_ip_addr, pStorage->ip_addr) == 0)		{			g_sync_src_ip_addr[0] = '\0';			storage_write_to_sync_ini_file();		}		sleep(2 * g_heart_beat_interval + 1);		pStorage->status = FDFS_STORAGE_STATUS_NONE;	}	if ((result=pthread_mutex_lock(&sync_thread_lock)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"call pthread_mutex_lock fail, " \			"errno: %d, error info: %s", \			__LINE__, result, strerror(result));	}	g_storage_sync_thread_count--;	if ((result=pthread_mutex_unlock(&sync_thread_lock)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"call pthread_mutex_unlock fail, " \			"errno: %d, error info: %s", \			__LINE__, result, strerror(result));	}	return NULL;}int storage_sync_thread_start(const FDFSStorageBrief *pStorage){	int result;	pthread_attr_t pattr;	pthread_t tid;	if (pStorage->status == FDFS_STORAGE_STATUS_DELETED || \		pStorage->status == FDFS_STORAGE_STATUS_NONE)	{		return 0;	}	if (is_local_host_ip(pStorage->ip_addr)) //can't self sync to self	{		return 0;	}	if ((result=init_pthread_attr(&pattr)) != 0)	{		return result;	}	/*	//printf("start storage ip_addr: %s, g_storage_sync_thread_count=%d\n", 			pStorage->ip_addr, g_storage_sync_thread_count);	*/	if ((result=pthread_create(&tid, &pattr, storage_sync_thread_entrance, \		(void *)pStorage)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"create thread failed, errno: %d, " \			"error info: %s", \			__LINE__, result, strerror(result));		pthread_attr_destroy(&pattr);		return result;	}	if ((result=pthread_mutex_lock(&sync_thread_lock)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"call pthread_mutex_lock fail, " \			"errno: %d, error info: %s", \			__LINE__, result, strerror(result));	}	g_storage_sync_thread_count++;	sync_tids = (pthread_t *)realloc(sync_tids, sizeof(pthread_t) * \					g_storage_sync_thread_count);	if (sync_tids == NULL)	{		logError("file: "__FILE__", line: %d, " \			"malloc %d bytes fail, " \			"errno: %d, error info: %s", \			__LINE__, sizeof(pthread_t) * \			g_storage_sync_thread_count, \			errno, strerror(errno));	}	else	{		sync_tids[g_storage_sync_thread_count - 1] = tid;	}	if ((result=pthread_mutex_unlock(&sync_thread_lock)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"call pthread_mutex_unlock fail, " \			"errno: %d, error info: %s", \			__LINE__, result, strerror(result));	}	pthread_attr_destroy(&pattr);	return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -