⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tracker_client_thread.c

📁 文件系统源代码!!!!! 文件系统源代码
💻 C
📖 第 1 页 / 共 2 页
字号:
				diffServers, pDiffServer - diffServers);		}		return 0;	}	pGlobalServer = g_storage_servers;	pGlobalEnd = g_storage_servers + g_storage_count;	pServer = briefServers;	while (pServer < pEnd && pGlobalServer < pGlobalEnd)	{		if (pGlobalServer->server.status == FDFS_STORAGE_STATUS_NONE)		{			pGlobalServer++;			continue;		}		res = strcmp(pServer->ip_addr, pGlobalServer->server.ip_addr);		if (res < 0)		{			if (pServer->status != FDFS_STORAGE_STATUS_DELETED)			{			logError("file: "__FILE__", line: %d, " \				"tracker server %s:%d, " \				"group \"%s\", " \				"enter impossible statement branch", \				__LINE__, pTrackerServer->ip_addr, \				pTrackerServer->port, \				pTrackerServer->group_name			);			}			pServer++;		}		else if (res == 0)		{			pServer++;			pGlobalServer++;		}		else		{			memcpy(pDiffServer++, &(pGlobalServer->server), \				sizeof(FDFSStorageBrief));			pGlobalServer++;		}	}	while (pGlobalServer < pGlobalEnd)	{		if (pGlobalServer->server.status == FDFS_STORAGE_STATUS_NONE)		{			pGlobalServer++;			continue;		}		memcpy(pDiffServer++, &(pGlobalServer->server), \			sizeof(FDFSStorageBrief));		pGlobalServer++;	}	return tracker_sync_diff_servers(pTrackerServer, \			diffServers, pDiffServer - diffServers);}static int tracker_check_response(TrackerServerInfo *pTrackerServer){	int64_t nInPackLen;	TrackerHeader resp;	int server_count;	int result;	FDFSStorageBrief briefServers[FDFS_MAX_SERVERS_EACH_GROUP];	if ((result=tcprecvdata(pTrackerServer->sock, &resp, \			sizeof(resp), g_network_timeout)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"tracker server %s:%d, recv data fail, " \			"errno: %d, error info: %s.", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port,    \			result, strerror(result));		return result;	}	//printf("resp status=%d\n", resp.status);	if (resp.status != 0)	{		return resp.status;	}	nInPackLen = buff2long(resp.pkg_len);	if ((nInPackLen < 0) || (nInPackLen % sizeof(FDFSStorageBrief) != 0))	{		logError("file: "__FILE__", line: %d, " \			"tracker server %s:%d, " \			"package size "INT64_PRINTF_FORMAT" is not correct", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, nInPackLen);		return EINVAL;	}	if (nInPackLen == 0)	{		return resp.status;	}	server_count = nInPackLen / sizeof(FDFSStorageBrief);	if (server_count > FDFS_MAX_SERVERS_EACH_GROUP)	{		logError("file: "__FILE__", line: %d, " \			"tracker server %s:%d, return storage count: %d" \			" exceed max: %d", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, \			server_count, FDFS_MAX_SERVERS_EACH_GROUP);		return EINVAL;	}	if ((result=tcprecvdata(pTrackerServer->sock, briefServers, \			nInPackLen, g_network_timeout)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"tracker server %s:%d, recv data fail, " \			"errno: %d, error info: %s.", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, \			result, strerror(result));		return result;	}	/*	//printf("resp server count=%d\n", server_count);	{		int i;		for (i=0; i<server_count; i++)		{				//printf("%d. %d:%s\n", i+1, briefServers[i].status, \				briefServers[i].ip_addr);		}	}	*/	return tracker_merge_servers(pTrackerServer, \                briefServers, server_count);}int tracker_sync_src_req(TrackerServerInfo *pTrackerServer, \			BinLogReader *pReader){	char out_buff[sizeof(TrackerHeader) + IP_ADDRESS_SIZE];	char sync_src_ip_addr[IP_ADDRESS_SIZE];	TrackerHeader *pHeader;	TrackerStorageSyncReqBody syncReqbody;	char *pBuff;	int64_t in_bytes;	int result;	memset(out_buff, 0, sizeof(out_buff));	pHeader = (TrackerHeader *)out_buff;	long2buff(IP_ADDRESS_SIZE, pHeader->pkg_len);	pHeader->cmd = TRACKER_PROTO_CMD_STORAGE_SYNC_SRC_REQ;	strcpy(out_buff + sizeof(TrackerHeader), pReader->ip_addr);	if ((result=tcpsenddata(pTrackerServer->sock, out_buff, \			sizeof(out_buff), g_network_timeout)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"tracker server %s:%d, send data fail, " \			"errno: %d, error info: %s.", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, \			result, strerror(result));		return result;	}	pBuff = (char *)&syncReqbody;	if ((result=fdfs_recv_response(pTrackerServer, \                &pBuff, sizeof(syncReqbody), &in_bytes)) != 0)	{		return result;	}	if (in_bytes == 0)	{		pReader->need_sync_old = false;        	pReader->until_timestamp = 0;		return 0;	}	if (in_bytes != sizeof(syncReqbody))	{		logError("file: "__FILE__", line: %d, " \			"tracker server %s:%d, " \			"recv body length: "INT64_PRINTF_FORMAT" is invalid, " \			"expect body length: %d", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, in_bytes, \			sizeof(syncReqbody));		return EINVAL;	}	memcpy(sync_src_ip_addr, syncReqbody.src_ip_addr, IP_ADDRESS_SIZE);	sync_src_ip_addr[IP_ADDRESS_SIZE-1] = '\0';	pReader->need_sync_old = is_local_host_ip(sync_src_ip_addr);       	pReader->until_timestamp = (time_t)buff2long( \					syncReqbody.until_timestamp);	return 0;}static int tracker_sync_dest_req(TrackerServerInfo *pTrackerServer){	TrackerHeader header;	TrackerStorageSyncReqBody syncReqbody;	char *pBuff;	int64_t in_bytes;	int result;	memset(&header, 0, sizeof(header));	header.cmd = TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_REQ;	if ((result=tcpsenddata(pTrackerServer->sock, &header, \			sizeof(header), g_network_timeout)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"tracker server %s:%d, send data fail, " \			"errno: %d, error info: %s.", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, \			result, strerror(result));		return result;	}	pBuff = (char *)&syncReqbody;	if ((result=fdfs_recv_response(pTrackerServer, \                &pBuff, sizeof(syncReqbody), &in_bytes)) != 0)	{		return result;	}	if (in_bytes == 0)	{		return result;	}	if (in_bytes != sizeof(syncReqbody))	{		logError("file: "__FILE__", line: %d, " \			"tracker server %s:%d, " \			"recv body length: "INT64_PRINTF_FORMAT" is invalid, " \			"expect body length: %d", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, in_bytes, \			sizeof(syncReqbody));		return EINVAL;	}	memcpy(g_sync_src_ip_addr, syncReqbody.src_ip_addr, IP_ADDRESS_SIZE);	g_sync_src_ip_addr[IP_ADDRESS_SIZE-1] = '\0';	g_sync_until_timestamp = (time_t)buff2long(syncReqbody.until_timestamp);	memset(&header, 0, sizeof(header));	header.cmd = TRACKER_PROTO_CMD_STORAGE_RESP;	if ((result=tcpsenddata(pTrackerServer->sock, &header, sizeof(header), \				g_network_timeout)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"tracker server %s:%d, send data fail, " \			"errno: %d, error info: %s.", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, \			result, strerror(result));		return result;	}	return 0;}static int tracker_sync_notify(TrackerServerInfo *pTrackerServer){	char out_buff[sizeof(TrackerHeader)+sizeof(TrackerStorageSyncReqBody)];	TrackerHeader *pHeader;	TrackerStorageSyncReqBody *pReqBody;	int result;	pHeader = (TrackerHeader *)out_buff;	pReqBody = (TrackerStorageSyncReqBody*)(out_buff+sizeof(TrackerHeader));	memset(out_buff, 0, sizeof(out_buff));	long2buff((int)sizeof(TrackerStorageSyncReqBody), pHeader->pkg_len);	pHeader->cmd = TRACKER_PROTO_CMD_STORAGE_SYNC_NOTIFY;	strcpy(pReqBody->src_ip_addr, g_sync_src_ip_addr);	long2buff(g_sync_until_timestamp, pReqBody->until_timestamp);	if ((result=tcpsenddata(pTrackerServer->sock, out_buff, \			sizeof(out_buff), g_network_timeout)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"tracker server %s:%d, send data fail, " \			"errno: %d, error info: %s.", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, \			result, strerror(result));		return result;	}	return tracker_check_response(pTrackerServer);}int tracker_report_join(TrackerServerInfo *pTrackerServer){	char out_buff[sizeof(TrackerHeader)+sizeof(TrackerStorageJoinBody)];	TrackerHeader *pHeader;	TrackerStorageJoinBody *pReqBody;	int result;	pHeader = (TrackerHeader *)out_buff;	pReqBody = (TrackerStorageJoinBody *)(out_buff+sizeof(TrackerHeader));	memset(out_buff, 0, sizeof(out_buff));	long2buff((int)sizeof(TrackerStorageJoinBody), pHeader->pkg_len);	pHeader->cmd = TRACKER_PROTO_CMD_STORAGE_JOIN;	strcpy(pReqBody->group_name, g_group_name);	long2buff(g_server_port, pReqBody->storage_port);	long2buff(g_path_count, pReqBody->store_path_count);	long2buff(g_subdir_count_per_path, pReqBody->subdir_count_per_path);	if ((result=tcpsenddata(pTrackerServer->sock, out_buff, \			sizeof(out_buff), g_network_timeout)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"tracker server %s:%d, send data fail, " \			"errno: %d, error info: %s.", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, \			result, strerror(result));		return result;	}	return tracker_check_response(pTrackerServer);}static int tracker_report_sync_timestamp(TrackerServerInfo *pTrackerServer){	char out_buff[sizeof(TrackerHeader) + (IP_ADDRESS_SIZE + 4) * \			FDFS_MAX_SERVERS_EACH_GROUP];	char *p;	TrackerHeader *pHeader;	FDFSStorageServer *pServer;	FDFSStorageServer *pEnd;	int result;	int body_len;	if (g_storage_count == 0)	{		return 0;	}	memset(out_buff, 0, sizeof(out_buff));	pHeader = (TrackerHeader *)out_buff;	p = out_buff + sizeof(TrackerHeader);	body_len = (IP_ADDRESS_SIZE + 4) * g_storage_count;	pHeader->cmd = TRACKER_PROTO_CMD_STORAGE_SYNC_REPORT;	long2buff(body_len, pHeader->pkg_len);	pEnd = g_storage_servers + g_storage_count;	for (pServer=g_storage_servers; pServer<pEnd; pServer++)	{		memcpy(p, pServer->server.ip_addr, IP_ADDRESS_SIZE);		p += IP_ADDRESS_SIZE;		int2buff(pServer->last_sync_src_timestamp, p);		p += 4;	}	if((result=tcpsenddata(pTrackerServer->sock, out_buff, \		sizeof(TrackerHeader) + body_len, g_network_timeout)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"tracker server %s:%d, send data fail, " \			"errno: %d, error info: %s.", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, \			result, strerror(result));		return result;	}	return tracker_check_response(pTrackerServer);}static int tracker_report_df_stat(TrackerServerInfo *pTrackerServer){	char out_buff[sizeof(TrackerHeader) + \			sizeof(TrackerStatReportReqBody) * 16];	char *pBuff;	TrackerHeader *pHeader;	TrackerStatReportReqBody *pStatBuff;	struct statfs sbuf;	int body_len;	int total_len;	int i;	int result;	body_len = (int)sizeof(TrackerStatReportReqBody) * g_path_count;	total_len = (int)sizeof(TrackerHeader) + body_len;	if (total_len <= sizeof(out_buff))	{		pBuff = out_buff;	}	else	{		pBuff = (char *)malloc(total_len);		if (pBuff == NULL)		{			logError("file: "__FILE__", line: %d, " \				"malloc %d bytes fail, " \				"errno: %d, error info: %s", \				__LINE__, total_len, \				errno, strerror(errno));			return errno != 0 ? errno : ENOMEM;		}	}	pHeader = (TrackerHeader *)pBuff;	pStatBuff = (TrackerStatReportReqBody*) \			(pBuff + sizeof(TrackerHeader));	long2buff(body_len, pHeader->pkg_len);	pHeader->cmd = TRACKER_PROTO_CMD_STORAGE_REPORT;	pHeader->status = 0;	for (i=0; i<g_path_count; i++)	{		if (statfs(g_store_paths[i], &sbuf) != 0)		{			logError("file: "__FILE__", line: %d, " \				"call statfs fail, errno: %d, error info: %s.",\				__LINE__, errno, strerror(errno));			if (pBuff != out_buff)			{				free(pBuff);			}			return errno != 0 ? errno : EACCES;		}		long2buff((((int64_t)(sbuf.f_blocks) * sbuf.f_bsize) / FDFS_ONE_MB),\			pStatBuff->sz_total_mb);		long2buff((((int64_t)(sbuf.f_bavail) * sbuf.f_bsize) / FDFS_ONE_MB),\			pStatBuff->sz_free_mb);		pStatBuff++;	}	result = tcpsenddata(pTrackerServer->sock, pBuff, \			total_len, g_network_timeout);	if (pBuff != out_buff)	{		free(pBuff);	}	if(result != 0)	{		logError("file: "__FILE__", line: %d, " \			"tracker server %s:%d, send data fail, " \			"errno: %d, error info: %s.", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, \			result, strerror(result));		return result;	}	return tracker_check_response(pTrackerServer);}static int tracker_heart_beat(TrackerServerInfo *pTrackerServer, \			int *pstat_chg_sync_count){	char out_buff[sizeof(TrackerHeader) + sizeof(FDFSStorageStatBuff)];	TrackerHeader *pHeader;	FDFSStorageStatBuff *pStatBuff;	int body_len;	int result;	pHeader = (TrackerHeader *)out_buff;	if (*pstat_chg_sync_count != g_stat_change_count)	{		pStatBuff = (FDFSStorageStatBuff *)( \				out_buff + sizeof(TrackerHeader));		long2buff(g_storage_stat.total_upload_count, \			pStatBuff->sz_total_upload_count);		long2buff(g_storage_stat.success_upload_count, \			pStatBuff->sz_success_upload_count);		long2buff(g_storage_stat.total_download_count, \			pStatBuff->sz_total_download_count);		long2buff(g_storage_stat.success_download_count, \			pStatBuff->sz_success_download_count);		long2buff(g_storage_stat.total_set_meta_count, \			pStatBuff->sz_total_set_meta_count);		long2buff(g_storage_stat.success_set_meta_count, \			pStatBuff->sz_success_set_meta_count);		long2buff(g_storage_stat.total_delete_count, \			pStatBuff->sz_total_delete_count);		long2buff(g_storage_stat.success_delete_count, \			pStatBuff->sz_success_delete_count);		long2buff(g_storage_stat.total_get_meta_count, \			pStatBuff->sz_total_get_meta_count);		long2buff(g_storage_stat.success_get_meta_count, \		 	pStatBuff->sz_success_get_meta_count);		long2buff(g_storage_stat.last_source_update, \			pStatBuff->sz_last_source_update);		long2buff(g_storage_stat.last_sync_update, \			pStatBuff->sz_last_sync_update);		*pstat_chg_sync_count = g_stat_change_count;		body_len = sizeof(FDFSStorageStatBuff);	}	else	{		body_len = 0;	}	long2buff(body_len, pHeader->pkg_len);	pHeader->cmd = TRACKER_PROTO_CMD_STORAGE_BEAT;	pHeader->status = 0;	if((result=tcpsenddata(pTrackerServer->sock, out_buff, \		sizeof(TrackerHeader) + body_len, g_network_timeout)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"tracker server %s:%d, send data fail, " \			"errno: %d, error info: %s.", \			__LINE__, pTrackerServer->ip_addr, \			pTrackerServer->port, \			result, strerror(result));		return result;	}	return tracker_check_response(pTrackerServer);}int tracker_report_thread_start(){	TrackerServerInfo *pTrackerServer;	TrackerServerInfo *pServerEnd;	pthread_attr_t pattr;	pthread_t tid;	int result;	if ((result=init_pthread_attr(&pattr)) != 0)	{		return result;	}	report_tids = (pthread_t *)malloc(sizeof(pthread_t) * \					g_tracker_server_count);	if (report_tids == NULL)	{		logError("file: "__FILE__", line: %d, " \			"malloc %d bytes fail, " \			"errno: %d, error info: %s", \			__LINE__, sizeof(pthread_t) * \			g_tracker_server_count, \			errno, strerror(errno));		return errno != 0 ? errno : ENOMEM;	}	memset(report_tids, 0, sizeof(pthread_t) * g_tracker_server_count);	g_tracker_reporter_count = 0;	pServerEnd = g_tracker_servers + g_tracker_server_count;	for (pTrackerServer=g_tracker_servers; pTrackerServer<pServerEnd; \		pTrackerServer++)	{		if((result=pthread_create(&tid, &pattr, \			tracker_report_thread_entrance, pTrackerServer)) != 0)		{			logError("file: "__FILE__", line: %d, " \				"create thread failed, errno: %d, " \				"error info: %s.", \				__LINE__, result, strerror(result));			return result;		}		if ((result=pthread_mutex_lock(&reporter_thread_lock)) != 0)		{			logError("file: "__FILE__", line: %d, " \				"call pthread_mutex_lock fail, " \				"errno: %d, error info: %s", \				__LINE__, result, strerror(result));		}		report_tids[g_tracker_reporter_count] = tid;		g_tracker_reporter_count++;		if ((result=pthread_mutex_unlock(&reporter_thread_lock)) != 0)		{			logError("file: "__FILE__", line: %d, " \				"call pthread_mutex_unlock fail, " \				"errno: %d, error info: %s", \				__LINE__, result, strerror(result));		}	}	pthread_attr_destroy(&pattr);	return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -