⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tracker_client_thread.c

📁 文件系统源代码!!!!! 文件系统源代码
💻 C
📖 第 1 页 / 共 2 页
字号:
/*** Copyright (C) 2008 Happy Fish / YuQing** FastDFS may be copied only under the terms of the GNU General* Public License V3, which may be found in the FastDFS source kit.* Please visit the FastDFS Home Page http://www.csource.org/ for more detail.**/#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include <errno.h>#include <time.h>#ifdef OS_LINUX#include <sys/vfs.h>#include <sys/statfs.h>#endif#include <sys/param.h>#include <sys/mount.h>#include "fdfs_define.h"#include "logger.h"#include "fdfs_global.h"#include "sockopt.h"#include "shared_func.h"#include "tracker_types.h"#include "tracker_proto.h"#include "tracker_client_thread.h"#include "storage_global.h"#include "storage_sync.h"#include "storage_func.h"static pthread_mutex_t reporter_thread_lock;/* save report thread ids */static pthread_t *report_tids = NULL;static int tracker_heart_beat(TrackerServerInfo *pTrackerServer, \			int *pstat_chg_sync_count);static int tracker_report_df_stat(TrackerServerInfo *pTrackerServer);static int tracker_sync_dest_req(TrackerServerInfo *pTrackerServer);static int tracker_sync_notify(TrackerServerInfo *pTrackerServer);static int tracker_report_sync_timestamp(TrackerServerInfo *pTrackerServer);int tracker_report_init(){	int result;	memset(g_storage_servers, 0, sizeof(g_storage_servers));	memset(g_sorted_storages, 0, sizeof(g_sorted_storages));	if ((result=init_pthread_lock(&reporter_thread_lock)) != 0)	{		return result;	}	return 0;} int tracker_report_destroy(){	int result;	if ((result=pthread_mutex_destroy(&reporter_thread_lock)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"call pthread_mutex_destroy fail, " \			"errno: %d, error info: %s", \			__LINE__, result, strerror(result));		return result;	}	return 0;}int kill_tracker_report_threads(){	int result;	if (report_tids != NULL)	{		result = kill_work_threads(report_tids, \				g_tracker_server_count);		free(report_tids);		report_tids = NULL;	}	else	{		result = 0;	}	return result;}static void* tracker_report_thread_entrance(void* arg){	TrackerServerInfo *pTrackerServer;	char tracker_client_ip[IP_ADDRESS_SIZE];	char szFailPrompt[36];	bool sync_old_done;	int stat_chg_sync_count;	int sync_time_chg_count;	int sleep_secs;	time_t current_time;	time_t last_df_report_time;	time_t last_sync_report_time;	time_t last_beat_time;	int result;	int previousCode;	int nContinuousFail;	stat_chg_sync_count = 0;	pTrackerServer = (TrackerServerInfo *)arg;	pTrackerServer->sock = -1;	sync_old_done = g_sync_old_done;	while (g_continue_flag &&  \		g_tracker_reporter_count < g_tracker_server_count)	{		sleep(1); //waiting for all thread started	}	/*	//printf("tracker_report_thread %s:%d start.\n", \		pTrackerServer->ip_addr, pTrackerServer->port);	*/	result = 0;	previousCode = 0;	nContinuousFail = 0;	sleep_secs = g_heart_beat_interval < g_stat_report_interval ? \			g_heart_beat_interval : g_stat_report_interval;	while (g_continue_flag)	{		if (pTrackerServer->sock >= 0)		{			close(pTrackerServer->sock);		}		pTrackerServer->sock = socket(AF_INET, SOCK_STREAM, 0);		if(pTrackerServer->sock < 0)		{			logCrit("file: "__FILE__", line: %d, " \				"socket create failed, errno: %d, " \				"error info: %s. program exit!", \				__LINE__, errno, strerror(errno));			g_continue_flag = false;			break;		}		if ((result=connectserverbyip(pTrackerServer->sock, \			pTrackerServer->ip_addr, \			pTrackerServer->port)) != 0)		{			if (previousCode != result)			{				logError("file: "__FILE__", line: %d, " \					"connect to tracker server %s:%d fail" \					", errno: %d, error info: %s", \					__LINE__, pTrackerServer->ip_addr, \					pTrackerServer->port, \					result, strerror(result));				previousCode = result;			}			nContinuousFail++;			if (g_continue_flag)			{				sleep(g_heart_beat_interval);				continue;			}			else			{				break;			}		}		if (nContinuousFail == 0)		{			*szFailPrompt = '\0';		}		else		{			sprintf(szFailPrompt, ", continuous fail count: %d", \				nContinuousFail);		}		logInfo("file: "__FILE__", line: %d, " \			"successfully connect to tracker server %s:%d%s", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, szFailPrompt);		previousCode = 0;		nContinuousFail = 0;		getSockIpaddr(pTrackerServer->sock, \				tracker_client_ip, IP_ADDRESS_SIZE);		insert_into_local_host_ip(tracker_client_ip);		/*		//printf("file: "__FILE__", line: %d, " \			"tracker_client_ip: %s\n", \			__LINE__, tracker_client_ip);		//print_local_host_ip_addrs();		*/		if (tracker_report_join(pTrackerServer) != 0)		{			sleep(g_heart_beat_interval);			continue;		}		if (!sync_old_done)		{			if ((result=pthread_mutex_lock(&reporter_thread_lock)) \					 != 0)			{				logError("file: "__FILE__", line: %d, " \					"call pthread_mutex_lock fail, " \					"errno: %d, error info: %s", \					__LINE__, result, strerror(result));				fdfs_quit(pTrackerServer);				sleep(g_heart_beat_interval);				continue;			}			if (!g_sync_old_done)			{				if (tracker_sync_dest_req(pTrackerServer) == 0)				{					g_sync_old_done = true;					if (storage_write_to_sync_ini_file() \						!= 0)					{					logCrit("file: "__FILE__", line: %d, " \						"storage_write_to_sync_ini_file"\						"  fail, program exit!", \						__LINE__);						g_continue_flag = false;						pthread_mutex_unlock( \							&reporter_thread_lock);						break;					}				}				else //request failed or need to try again				{					pthread_mutex_unlock( \						&reporter_thread_lock);					fdfs_quit(pTrackerServer);					sleep(g_heart_beat_interval);					continue;				}			}			if ((result=pthread_mutex_unlock(&reporter_thread_lock))				 != 0)			{				logError("file: "__FILE__", line: %d, " \					"call pthread_mutex_unlock fail, " \					"errno: %d, error info: %s", \					__LINE__, result, strerror(result));			}			sync_old_done = true;		}		if (tracker_sync_notify(pTrackerServer) != 0)		{			fdfs_quit(pTrackerServer);			sleep(g_heart_beat_interval);			continue;		}		sync_time_chg_count = 0;		last_df_report_time = 0;		last_beat_time = 0;		last_sync_report_time = 0;		while (g_continue_flag)		{			current_time = time(NULL);			if (current_time - last_beat_time >= \					g_heart_beat_interval)			{				if (tracker_heart_beat(pTrackerServer, \					&stat_chg_sync_count) != 0)				{					break;				}				last_beat_time = current_time;			}			if (sync_time_chg_count != g_sync_change_count && \				current_time - last_sync_report_time >= \					g_heart_beat_interval)			{				if (tracker_report_sync_timestamp( \						pTrackerServer) != 0)				{					break;				}				sync_time_chg_count = g_sync_change_count;				last_sync_report_time = current_time;			}			if (current_time - last_df_report_time >= \					g_stat_report_interval)			{				if (tracker_report_df_stat(pTrackerServer) != 0)				{					break;				}				last_df_report_time = current_time;			}			sleep(1);		}		if ((!g_continue_flag) && fdfs_quit(pTrackerServer) != 0)		{		}		close(pTrackerServer->sock);		pTrackerServer->sock = -1;		if (g_continue_flag)		{			sleep(1);		}	}	if (nContinuousFail > 0)	{		logError("file: "__FILE__", line: %d, " \			"connect to tracker server %s:%d fail, try count: %d" \			", errno: %d, error info: %s", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, nContinuousFail, \			result, strerror(result));	}	if ((result=pthread_mutex_lock(&reporter_thread_lock)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"call pthread_mutex_lock fail, " \			"errno: %d, error info: %s", \			__LINE__, result, strerror(result));	}	g_tracker_reporter_count--;	if ((result=pthread_mutex_unlock(&reporter_thread_lock)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"call pthread_mutex_unlock fail, " \			"errno: %d, error info: %s", \			__LINE__, result, strerror(result));	}	return NULL;}static bool tracker_insert_into_sorted_servers( \		FDFSStorageServer *pInsertedServer){	FDFSStorageServer **ppServer;	FDFSStorageServer **ppEnd;	int nCompare;	ppEnd = g_sorted_storages + g_storage_count;	for (ppServer=ppEnd; ppServer > g_sorted_storages; ppServer--)	{		nCompare = strcmp(pInsertedServer->server.ip_addr, \			   	(*(ppServer-1))->server.ip_addr);		if (nCompare > 0)		{			*ppServer = pInsertedServer;			return true;		}		else if (nCompare < 0)		{			*ppServer = *(ppServer-1);		}		else  //nCompare == 0		{			for (; ppServer < ppEnd; ppServer++) //restore			{				*ppServer = *(ppServer+1);			}			return false;		}	}	*ppServer = pInsertedServer;	return true;}int tracker_sync_diff_servers(TrackerServerInfo *pTrackerServer, \		FDFSStorageBrief *briefServers, const int server_count){	TrackerHeader resp;	int out_len;	int result;	if (server_count == 0)	{		return 0;	}	memset(&resp, 0, sizeof(resp));	resp.cmd = TRACKER_PROTO_CMD_STORAGE_REPLICA_CHG;	out_len = sizeof(FDFSStorageBrief) * server_count;	long2buff(out_len, resp.pkg_len);	if ((result=tcpsenddata(pTrackerServer->sock, &resp, sizeof(resp), \			g_network_timeout)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"trackert server %s:%d, send data fail, " \			"errno: %d, error info: %s", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, result, strerror(result));		return result;	}	if ((result=tcpsenddata(pTrackerServer->sock, \		briefServers, out_len, g_network_timeout)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"trackert server %s:%d, send data fail, " \			"errno: %d, error info: %s", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, result, strerror(result));		return result;	}	if ((result=tcprecvdata(pTrackerServer->sock, &resp, \			sizeof(resp), g_network_timeout)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"tracker server %s:%d, recv data fail, " \			"errno: %d, error info: %s.", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, result, strerror(result));		return result;	}	if (memcmp(resp.pkg_len, "\0\0\0\0\0\0\0\0", \		FDFS_PROTO_PKG_LEN_SIZE) != 0)	{		logError("file: "__FILE__", line: %d, " \			"tracker server %s:%d, " \			"expect pkg len 0, but recv pkg len != 0", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port);		return EINVAL;	}	return resp.status;}static int tracker_merge_servers(TrackerServerInfo *pTrackerServer, \		FDFSStorageBrief *briefServers, const int server_count){	FDFSStorageBrief *pServer;	FDFSStorageBrief *pEnd;	FDFSStorageServer *pInsertedServer;	FDFSStorageServer **ppFound;	FDFSStorageServer *pGlobalServer;	FDFSStorageServer *pGlobalEnd;	FDFSStorageServer targetServer;	FDFSStorageServer *pTargetServer;	FDFSStorageBrief diffServers[FDFS_MAX_SERVERS_EACH_GROUP];	FDFSStorageBrief *pDiffServer;	int res;	int result;	int nDeletedCount;	memset(&targetServer, 0, sizeof(targetServer));	pTargetServer = &targetServer;	nDeletedCount = 0;	pDiffServer = diffServers;	pEnd = briefServers + server_count;	for (pServer=briefServers; pServer<pEnd; pServer++)	{		memcpy(&(targetServer.server),pServer,sizeof(FDFSStorageBrief));		ppFound = (FDFSStorageServer **)bsearch(&pTargetServer, \			g_sorted_storages, g_storage_count, \			sizeof(FDFSStorageServer *), storage_cmp_by_ip_addr);		if (ppFound != NULL)		{			if ((*ppFound)->server.status != pServer->status)			{				if ((((pServer->status == \					FDFS_STORAGE_STATUS_WAIT_SYNC) || \					(pServer->status == \					FDFS_STORAGE_STATUS_SYNCING)) && \					((*ppFound)->server.status > \						pServer->status)) \					 || ((*ppFound)->server.status == \						FDFS_STORAGE_STATUS_DELETED))				{					memcpy(pDiffServer++, \						&((*ppFound)->server), \						sizeof(FDFSStorageBrief));				}				else if ((*ppFound)->server.status == \					FDFS_STORAGE_STATUS_NONE && \					pServer->status == \					FDFS_STORAGE_STATUS_DELETED) //ignore				{				}				else				{					if ((*ppFound)->server.status == \						FDFS_STORAGE_STATUS_NONE && \						pServer->status != \						FDFS_STORAGE_STATUS_DELETED)					{						(*ppFound)->server.status = \							pServer->status;					if ((result=storage_sync_thread_start( \						&((*ppFound)->server))) != 0)						{							return result;						}					}					else					{						(*ppFound)->server.status = \							pServer->status;					}				}			}		}		else if (pServer->status == FDFS_STORAGE_STATUS_DELETED)//ignore		{			nDeletedCount++;		}		else		{			if (g_storage_count < FDFS_MAX_SERVERS_EACH_GROUP)			{				if ((result=pthread_mutex_lock( \					&reporter_thread_lock)) != 0)				{					logError("file: "__FILE__", line: %d, "\						"call pthread_mutex_lock fail,"\						" errno: %d, error info: %s", \						__LINE__, \						result, strerror(result));				}				pInsertedServer = g_storage_servers + \						g_storage_count;				memcpy(&(pInsertedServer->server), \					pServer, sizeof(FDFSStorageBrief));				if (tracker_insert_into_sorted_servers( \						pInsertedServer))				{					g_storage_count++;				}				if ((result=pthread_mutex_unlock( \					&reporter_thread_lock)) != 0)				{					logError("file: "__FILE__", line: %d, "\					"call pthread_mutex_unlock fail, " \					"errno: %d, error info: %s", \					__LINE__, result, strerror(result));				}				if ((result=storage_sync_thread_start( \					&(pInsertedServer->server))) != 0)				{					return result;				}			}			else			{				logError("file: "__FILE__", line: %d, " \					"tracker server %s:%d, " \					"storage servers of group \"%s\" " \					"exceeds max: %d", \					__LINE__, pTrackerServer->ip_addr, \					pTrackerServer->port, \					pTrackerServer->group_name, \					FDFS_MAX_SERVERS_EACH_GROUP);			}		}	}	if (g_storage_count + nDeletedCount == server_count)	{		if (pDiffServer - diffServers > 0)		{			return tracker_sync_diff_servers(pTrackerServer, \

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -