📄 tracker_service.c
字号:
/*** Copyright (C) 2008 Happy Fish / YuQing** FastDFS may be copied only under the terms of the GNU General* Public License V3, which may be found in the FastDFS source kit.* Please visit the FastDFS Home Page http://www.csource.org/ for more detail.**///tracker_service.c#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include <errno.h>#include <time.h>#include "fdfs_define.h"#include "fdfs_base64.h"#include "logger.h"#include "fdfs_global.h"#include "sockopt.h"#include "shared_func.h"#include "tracker_types.h"#include "tracker_global.h"#include "tracker_mem.h"#include "tracker_proto.h"#include "tracker_service.h"pthread_mutex_t g_tracker_thread_lock;int g_tracker_thread_count = 0;static FDFSStorageDetail *tracker_get_writable_storage( \ FDFSGroupInfo *pStoreGroup);static int tracker_check_and_sync(TrackerClientInfo *pClientInfo, \ const int status){ TrackerHeader resp; FDFSStorageDetail **ppServer; FDFSStorageDetail **ppEnd; FDFSStorageBrief briefServers[FDFS_MAX_SERVERS_EACH_GROUP]; FDFSStorageBrief *pDestServer; int out_len; int result; memset(&resp, 0, sizeof(resp)); resp.cmd = TRACKER_PROTO_CMD_STORAGE_RESP; resp.status = status; if (status != 0 || pClientInfo->pGroup == NULL || pClientInfo->pGroup->version == pClientInfo->pStorage->version) { if ((result=tcpsenddata(pClientInfo->sock, \ &resp, sizeof(resp), g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, send data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ result, strerror(result)); return result; } return status; } //printf("sync %d servers\n", pClientInfo->pGroup->count); pDestServer = briefServers; ppEnd = pClientInfo->pGroup->sorted_servers + \ pClientInfo->pGroup->count; for (ppServer=pClientInfo->pGroup->sorted_servers; \ ppServer<ppEnd; ppServer++) { pDestServer->status = (*ppServer)->status; memcpy(pDestServer->ip_addr, (*ppServer)->ip_addr, \ IP_ADDRESS_SIZE); pDestServer++; } out_len = sizeof(FDFSStorageBrief) * pClientInfo->pGroup->count; long2buff(out_len, resp.pkg_len); if ((result=tcpsenddata(pClientInfo->sock, \ &resp, sizeof(resp), g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, send data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ result, strerror(result)); return result; } if ((result=tcpsenddata(pClientInfo->sock, \ briefServers, out_len, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, send data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ result, strerror(result)); return result; } pClientInfo->pStorage->version = pClientInfo->pGroup->version; return status;}static void tracker_check_dirty(TrackerClientInfo *pClientInfo){ bool bInserted; if (pClientInfo->pGroup != NULL && pClientInfo->pGroup->dirty) { tracker_mem_pthread_lock(); if (--(*pClientInfo->pGroup->ref_count) == 0) { free(pClientInfo->pGroup->ref_count); free(pClientInfo->pAllocedGroups); } tracker_mem_pthread_unlock(); tracker_mem_add_group(pClientInfo, true, &bInserted); } if (pClientInfo->pStorage != NULL && pClientInfo->pStorage->dirty) { tracker_mem_pthread_lock(); if (--(*pClientInfo->pStorage->ref_count) == 0) { free(pClientInfo->pStorage->ref_count); free(pClientInfo->pAllocedStorages); } tracker_mem_pthread_unlock(); tracker_mem_add_storage(pClientInfo, true, &bInserted); }}static int tracker_deal_storage_replica_chg(TrackerClientInfo *pClientInfo, \ const int64_t nInPackLen){ TrackerHeader resp; int server_count; FDFSStorageBrief briefServers[FDFS_MAX_SERVERS_EACH_GROUP]; int result; memset(&resp, 0, sizeof(resp)); while (1) { if ((nInPackLen <= 0) || \ (nInPackLen % sizeof(FDFSStorageBrief) != 0)) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip addr: %s, " \ "package size "INT64_PRINTF_FORMAT" " \ "is not correct", \ __LINE__, \ TRACKER_PROTO_CMD_STORAGE_REPLICA_CHG, \ pClientInfo->ip_addr, nInPackLen); resp.status = EINVAL; break; } server_count = nInPackLen / sizeof(FDFSStorageBrief); if (server_count > FDFS_MAX_SERVERS_EACH_GROUP) { logError("file: "__FILE__", line: %d, " \ "client ip addr: %s, return storage count: %d" \ " exceed max: %d", __LINE__, \ pClientInfo->ip_addr, server_count, \ FDFS_MAX_SERVERS_EACH_GROUP); resp.status = EINVAL; break; } if ((resp.status=tcprecvdata(pClientInfo->sock, briefServers, \ nInPackLen, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip addr: %s, recv data fail, " \ "errno: %d, error info: %s.", \ __LINE__, pClientInfo->ip_addr, \ resp.status, strerror(resp.status)); break; } resp.status = tracker_mem_sync_storages(pClientInfo, \ briefServers, server_count); break; } resp.cmd = TRACKER_PROTO_CMD_STORAGE_RESP; if ((result=tcpsenddata(pClientInfo->sock, \ &resp, sizeof(resp), g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, send data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ result, strerror(result)); return result; } return resp.status;}static int tracker_deal_storage_join(TrackerClientInfo *pClientInfo, \ const int64_t nInPackLen){ TrackerStorageJoinBody body; int store_path_count; int subdir_count_per_path; int status; while (1) { if (nInPackLen != sizeof(body)) { logError("file: "__FILE__", line: %d, " \ "cmd: %d, client ip: %s, package size "INT64_PRINTF_FORMAT" " \ "is not correct, expect length: %d.", \ __LINE__, TRACKER_PROTO_CMD_STORAGE_JOIN, \ pClientInfo->ip_addr, nInPackLen, sizeof(body)); status = EINVAL; break; } if ((status=tcprecvdata(pClientInfo->sock, &body, \ nInPackLen, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, recv data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ status, strerror(status)); break; } memcpy(pClientInfo->group_name, body.group_name, FDFS_GROUP_NAME_MAX_LEN); pClientInfo->group_name[FDFS_GROUP_NAME_MAX_LEN] = '\0'; if ((status=fdfs_validate_group_name(pClientInfo->group_name)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, invalid group_name: %s", \ __LINE__, pClientInfo->ip_addr, \ pClientInfo->group_name); break; } pClientInfo->storage_port = (int)buff2long(body.storage_port); if (pClientInfo->storage_port <= 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, invalid port: %d", \ __LINE__, pClientInfo->ip_addr, \ pClientInfo->storage_port); status = EINVAL; break; } store_path_count = (int)buff2long(body.store_path_count); if (store_path_count <= 0 || store_path_count > 256) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, invalid store_path_count: %d", \ __LINE__, pClientInfo->ip_addr, store_path_count); status = EINVAL; break; } subdir_count_per_path = (int)buff2long(body.subdir_count_per_path); if (subdir_count_per_path <= 0 || subdir_count_per_path > 256) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, invalid subdir_count_per_path: %d", \ __LINE__, pClientInfo->ip_addr, subdir_count_per_path); status = EINVAL; break; } status = tracker_mem_add_group_and_storage(pClientInfo, \ store_path_count, subdir_count_per_path, true); break; } return tracker_check_and_sync(pClientInfo, status);}static int tracker_deal_server_delete_storage(TrackerClientInfo *pClientInfo, \ const int64_t nInPackLen){ TrackerHeader resp; char in_buff[FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE]; char group_name[FDFS_GROUP_NAME_MAX_LEN + 1]; char *pIpAddr; FDFSGroupInfo *pGroup; int result; memset(&resp, 0, sizeof(resp)); while (1) { if (nInPackLen <= FDFS_GROUP_NAME_MAX_LEN) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip: %s, package size "INT64_PRINTF_FORMAT" " \ "is not correct, " \ "expect length > %d", \ __LINE__, \ TRACKER_PROTO_CMD_SERVER_DELETE_STORAGE, \ pClientInfo->ip_addr, \ nInPackLen, FDFS_GROUP_NAME_MAX_LEN); resp.status = EINVAL; break; } if (nInPackLen >= sizeof(in_buff)) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip: %s, package size "INT64_PRINTF_FORMAT" " \ "is too large, " \ "expect length should < %d", \ __LINE__, \ TRACKER_PROTO_CMD_SERVER_DELETE_STORAGE, \ pClientInfo->ip_addr, nInPackLen, \ sizeof(in_buff)); resp.status = EINVAL; break; } if ((resp.status=tcprecvdata(pClientInfo->sock, in_buff, \ nInPackLen, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, recv data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ resp.status, strerror(resp.status)); break; } in_buff[nInPackLen] = '\0'; memcpy(group_name, in_buff, FDFS_GROUP_NAME_MAX_LEN); group_name[FDFS_GROUP_NAME_MAX_LEN] = '\0'; pIpAddr = in_buff + FDFS_GROUP_NAME_MAX_LEN; pGroup = tracker_mem_get_group(group_name); if (pGroup == NULL) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, invalid group_name: %s", \ __LINE__, pClientInfo->ip_addr, \ pClientInfo->group_name); resp.status = ENOENT; break; } resp.status = tracker_mem_delete_storage(pGroup, pIpAddr); break; } resp.cmd = TRACKER_PROTO_CMD_SERVER_RESP; if ((result=tcpsenddata(pClientInfo->sock, \ &resp, sizeof(resp), g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, send data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ result, strerror(result)); return result; } return resp.status;}static int tracker_deal_storage_sync_notify(TrackerClientInfo *pClientInfo, \ const int64_t nInPackLen){ TrackerStorageSyncReqBody body; int status; char sync_src_ip_addr[IP_ADDRESS_SIZE]; bool bSaveStorages; while (1) { if (nInPackLen != sizeof(body)) { logError("file: "__FILE__", line: %d, " \ "cmd: %d, client ip: %s, package size "INT64_PRINTF_FORMAT" " \ "is not correct, expect length: %d.", \ __LINE__, TRACKER_PROTO_CMD_STORAGE_SYNC_NOTIFY, \ pClientInfo->ip_addr, nInPackLen, sizeof(body)); status = EINVAL; break; } if ((status=tcprecvdata(pClientInfo->sock, &body, \ nInPackLen, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, recv data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ status, strerror(status)); break; } if (*(body.src_ip_addr) == '\0') { if (pClientInfo->pStorage->status == FDFS_STORAGE_STATUS_INIT || \ pClientInfo->pStorage->status == FDFS_STORAGE_STATUS_WAIT_SYNC || \ pClientInfo->pStorage->status == FDFS_STORAGE_STATUS_SYNCING) { pClientInfo->pStorage->status = FDFS_STORAGE_STATUS_ONLINE; pClientInfo->pGroup->version++; tracker_save_storages(); } status = 0; break; } bSaveStorages = false; if (pClientInfo->pStorage->status == FDFS_STORAGE_STATUS_INIT) { pClientInfo->pStorage->status = FDFS_STORAGE_STATUS_WAIT_SYNC; pClientInfo->pGroup->version++; bSaveStorages = true; } if (pClientInfo->pStorage->psync_src_server == NULL) { memcpy(sync_src_ip_addr, body.src_ip_addr, IP_ADDRESS_SIZE); sync_src_ip_addr[IP_ADDRESS_SIZE-1] = '\0'; pClientInfo->pStorage->psync_src_server = \ tracker_mem_get_storage(pClientInfo->pGroup, \ sync_src_ip_addr); if (pClientInfo->pStorage->psync_src_server == NULL) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, " \ "sync src server: %s not exists", \ __LINE__, pClientInfo->ip_addr, \ sync_src_ip_addr); status = ENOENT; break; } pClientInfo->pStorage->sync_until_timestamp = \ (int)buff2long(body.until_timestamp); bSaveStorages = true; } if (bSaveStorages) { tracker_save_storages(); } status = 0; break; } return tracker_check_and_sync(pClientInfo, status);}static int tracker_check_logined(TrackerClientInfo *pClientInfo){ TrackerHeader resp; int result; if (pClientInfo->pGroup != NULL && pClientInfo->pStorage != NULL) { return 0; } memset(&resp, 0, sizeof(resp)); resp.cmd = TRACKER_PROTO_CMD_STORAGE_RESP; resp.status = EACCES; if ((result=tcpsenddata(pClientInfo->sock, &resp, sizeof(resp), \ g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, send data fail, " \ "errno: %d, error info: %s.", \ __LINE__, pClientInfo->ip_addr, \ result, strerror(result)); return result; } return resp.status;}/**pkg format:HeaderFDFS_GROUP_NAME_MAX_LEN bytes: group_name**/static int tracker_deal_server_list_group_storages( \ TrackerClientInfo *pClientInfo, const int64_t nInPackLen){ TrackerHeader resp; char group_name[FDFS_GROUP_NAME_MAX_LEN + 1]; FDFSGroupInfo *pGroup; FDFSStorageDetail **ppServer; FDFSStorageDetail **ppEnd; FDFSStorageStat *pStorageStat; TrackerStorageStat stats[FDFS_MAX_SERVERS_EACH_GROUP]; TrackerStorageStat *pDest; FDFSStorageStatBuff *pStatBuff; int out_len; int result; memset(&resp, 0, sizeof(resp)); pDest = stats; while (1) { if (nInPackLen != FDFS_GROUP_NAME_MAX_LEN) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip: %s, package size "INT64_PRINTF_FORMAT" " \ "is not correct, " \ "expect length: %d", \ __LINE__, \ TRACKER_PROTO_CMD_SERVER_LIST_STORAGE, \ pClientInfo->ip_addr, \ nInPackLen, FDFS_GROUP_NAME_MAX_LEN); resp.status = EINVAL; break; } if ((resp.status=tcprecvdata(pClientInfo->sock, group_name, \ nInPackLen, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, recv data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ resp.status, strerror(resp.status)); break; } group_name[FDFS_GROUP_NAME_MAX_LEN] = '\0'; pGroup = tracker_mem_get_group(group_name); if (pGroup == NULL) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, invalid group_name: %s", \ __LINE__, pClientInfo->ip_addr, \ pClientInfo->group_name); resp.status = EINVAL; break; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -