📄 tracker_service.c
字号:
pStoreGroup->active_count); */ resp.status = 0; break; } resp.cmd = TRACKER_PROTO_CMD_SERVICE_RESP; if (resp.status == 0) { out_len = TRACKER_QUERY_STORAGE_STORE_BODY_LEN; long2buff(out_len, resp.pkg_len); memcpy(out_buff, &resp, sizeof(resp)); memcpy(out_buff + sizeof(resp), pStoreGroup->group_name, \ FDFS_GROUP_NAME_MAX_LEN); memcpy(out_buff + sizeof(resp) + FDFS_GROUP_NAME_MAX_LEN, \ pStorageServer->ip_addr, IP_ADDRESS_SIZE-1); long2buff(pStoreGroup->storage_port, out_buff + sizeof(resp) + \ FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE-1); *(out_buff + sizeof(resp) + FDFS_GROUP_NAME_MAX_LEN + \ IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE) = \ (char)pStorageServer->current_write_path; } else { out_len = 0; long2buff(out_len, resp.pkg_len); memcpy(out_buff, &resp, sizeof(resp)); } if ((result=tcpsenddata(pClientInfo->sock, \ out_buff, sizeof(resp) + out_len, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, send data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ result, strerror(result)); return result; } return resp.status;}static int tracker_deal_server_list_groups(TrackerClientInfo *pClientInfo, \ const int64_t nInPackLen){ TrackerHeader resp; FDFSGroupInfo **ppGroup; FDFSGroupInfo **ppEnd; TrackerGroupStat groupStats[FDFS_MAX_GROUPS]; TrackerGroupStat *pDest; int out_len; int result; memset(&resp, 0, sizeof(resp)); pDest = groupStats; while (1) { if (nInPackLen != 0) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip: %s, package size " \ INT64_PRINTF_FORMAT" is not correct, " \ "expect length: 0", __LINE__, \ TRACKER_PROTO_CMD_SERVER_LIST_GROUP, \ pClientInfo->ip_addr, nInPackLen); resp.status = EINVAL; break; } ppEnd = g_groups.sorted_groups + g_groups.count; for (ppGroup=g_groups.sorted_groups; ppGroup<ppEnd; ppGroup++) { memcpy(pDest->group_name, (*ppGroup)->group_name, \ FDFS_GROUP_NAME_MAX_LEN + 1); long2buff((*ppGroup)->free_mb, pDest->sz_free_mb); long2buff((*ppGroup)->count, pDest->sz_count); long2buff((*ppGroup)->storage_port, \ pDest->sz_storage_port); long2buff((*ppGroup)->active_count, \ pDest->sz_active_count); long2buff((*ppGroup)->current_write_server, \ pDest->sz_current_write_server); long2buff((*ppGroup)->store_path_count, \ pDest->sz_store_path_count); long2buff((*ppGroup)->subdir_count_per_path, \ pDest->sz_subdir_count_per_path); pDest++; } resp.status = 0; break; } out_len = (pDest - groupStats) * sizeof(TrackerGroupStat); long2buff(out_len, resp.pkg_len); resp.cmd = TRACKER_PROTO_CMD_SERVER_RESP; if ((result=tcpsenddata(pClientInfo->sock, \ &resp, sizeof(resp), g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, send data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ result, strerror(result)); return result; } if (out_len == 0) { return resp.status; } if ((result=tcpsenddata(pClientInfo->sock, \ groupStats, out_len, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, send data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ result, strerror(result)); return result; } return resp.status;}static int tracker_deal_storage_sync_src_req(TrackerClientInfo *pClientInfo, \ const int64_t nInPackLen){ char out_buff[sizeof(TrackerHeader)+sizeof(TrackerStorageSyncReqBody)]; char dest_ip_addr[IP_ADDRESS_SIZE]; TrackerHeader *pResp; TrackerStorageSyncReqBody *pBody; FDFSStorageDetail *pDestStorage; int out_len; int result; memset(out_buff, 0, sizeof(out_buff)); pResp = (TrackerHeader *)out_buff; pBody = (TrackerStorageSyncReqBody *)(out_buff + sizeof(TrackerHeader)); out_len = sizeof(TrackerHeader); while (1) { if (nInPackLen != IP_ADDRESS_SIZE) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip: %s, package size "INT64_PRINTF_FORMAT" " \ "is not correct, " \ "expect length: %d", \ __LINE__, \ TRACKER_PROTO_CMD_STORAGE_SYNC_SRC_REQ, \ pClientInfo->ip_addr, nInPackLen, \ IP_ADDRESS_SIZE); pResp->status = EINVAL; break; } if ((pResp->status=tcprecvdata(pClientInfo->sock, \ dest_ip_addr, nInPackLen, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip addr: %s, recv data fail, " \ "errno: %d, error info: %s.", \ __LINE__, \ TRACKER_PROTO_CMD_STORAGE_SYNC_SRC_REQ, \ pClientInfo->ip_addr, \ pResp->status, strerror(pResp->status)); break; } dest_ip_addr[IP_ADDRESS_SIZE-1] = '\0'; pDestStorage = tracker_mem_get_storage(pClientInfo->pGroup, \ dest_ip_addr); if (pDestStorage == NULL) { pResp->status = ENOENT; break; } if (pDestStorage->status == FDFS_STORAGE_STATUS_INIT) { pResp->status = ENOENT; break; } if (pDestStorage->psync_src_server != NULL) { strcpy(pBody->src_ip_addr, \ pDestStorage->psync_src_server->ip_addr); long2buff(pDestStorage->sync_until_timestamp, \ pBody->until_timestamp); out_len += sizeof(TrackerStorageSyncReqBody); } pResp->status = 0; break; } //printf("deal sync request, status=%d\n", pResp->status); long2buff(out_len - (int)sizeof(TrackerHeader), pResp->pkg_len); pResp->cmd = TRACKER_PROTO_CMD_SERVER_RESP; if ((result=tcpsenddata(pClientInfo->sock, \ out_buff, out_len, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, send data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ result, strerror(result)); return result; } return 0;}static int tracker_deal_storage_sync_dest_req(TrackerClientInfo *pClientInfo, \ const int64_t nInPackLen){ char out_buff[sizeof(TrackerHeader)+sizeof(TrackerStorageSyncReqBody)]; TrackerHeader *pResp; TrackerStorageSyncReqBody *pBody; int out_len; int sync_until_timestamp; FDFSStorageDetail *pSrcStorage; int result; sync_until_timestamp = 0; memset(out_buff, 0, sizeof(out_buff)); pResp = (TrackerHeader *)out_buff; pBody = (TrackerStorageSyncReqBody *)(out_buff + sizeof(TrackerHeader)); out_len = sizeof(TrackerHeader); pSrcStorage = NULL; while (1) { if (nInPackLen != 0) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip: %s, package size " \ INT64_PRINTF_FORMAT" is not correct, " \ "expect length: 0", \ __LINE__, \ TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_REQ, \ pClientInfo->ip_addr, nInPackLen); pResp->status = EINVAL; break; } if (pClientInfo->pGroup->count <= 1 || \ tracker_get_group_success_upload_count( \ pClientInfo->pGroup) <= 0) { pResp->status = 0; break; } pSrcStorage = tracker_get_group_sync_src_server( \ pClientInfo->pGroup, pClientInfo->pStorage); if (pSrcStorage == NULL) { pResp->status = ENOENT; break; } sync_until_timestamp = (int)time(NULL); strcpy(pBody->src_ip_addr, pSrcStorage->ip_addr); long2buff(sync_until_timestamp, pBody->until_timestamp); out_len += sizeof(TrackerStorageSyncReqBody); pResp->status = 0; break; } //printf("deal sync request, status=%d\n", pResp->status); long2buff(out_len - (int)sizeof(TrackerHeader), pResp->pkg_len); pResp->cmd = TRACKER_PROTO_CMD_SERVER_RESP; if ((result=tcpsenddata(pClientInfo->sock, \ out_buff, out_len, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, send data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ result, strerror(result)); return result; } if (pSrcStorage == NULL) { if (pResp->status == 0) { pClientInfo->pStorage->status = \ FDFS_STORAGE_STATUS_ONLINE; pClientInfo->pGroup->version++; tracker_save_storages(); } return pResp->status; } if ((result=tcprecvdata(pClientInfo->sock, pResp, \ sizeof(TrackerHeader), g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip addr: %s, recv data fail, " \ "errno: %d, error info: %s.", \ __LINE__, \ TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_REQ, \ pClientInfo->ip_addr, \ result, strerror(result)); return result; } if (pResp->cmd != TRACKER_PROTO_CMD_STORAGE_RESP) { logError("file: "__FILE__", line: %d, " \ "client ip addr: %s, " \ "recv storage confirm fail, resp cmd: %d, " \ "expect cmd: %d", \ __LINE__, pClientInfo->ip_addr, \ pResp->cmd, TRACKER_PROTO_CMD_STORAGE_RESP); return EINVAL; } if (pResp->status != 0) { logError("file: "__FILE__", line: %d, " \ "client ip addr: %s, " \ "recv storage confirm fail, resp status: %d, " \ "expect status: 0", \ __LINE__, pClientInfo->ip_addr, pResp->status); return pResp->status; } pClientInfo->pStorage->psync_src_server = pSrcStorage; pClientInfo->pStorage->sync_until_timestamp = sync_until_timestamp; pClientInfo->pStorage->status = FDFS_STORAGE_STATUS_WAIT_SYNC; pClientInfo->pGroup->version++; tracker_save_storages(); return 0;}static void tracker_find_max_free_space_group(){ FDFSGroupInfo **ppGroup; FDFSGroupInfo **ppGroupEnd; FDFSGroupInfo **ppMaxGroup; ppMaxGroup = NULL; ppGroupEnd = g_groups.sorted_groups + g_groups.count; for (ppGroup=g_groups.sorted_groups; \ ppGroup<ppGroupEnd; ppGroup++) { if ((*ppGroup)->active_count > 0) { if (ppMaxGroup == NULL) { ppMaxGroup = ppGroup; } else if ((*ppGroup)->free_mb > (*ppMaxGroup)->free_mb) { ppMaxGroup = ppGroup; } } } if (ppMaxGroup == NULL) { return; } g_groups.current_write_group = ppMaxGroup - g_groups.sorted_groups;}static void tracker_find_min_free_space(FDFSGroupInfo *pGroup){ FDFSStorageDetail **ppServerEnd; FDFSStorageDetail **ppServer; if (pGroup->active_count == 0) { return; } pGroup->free_mb = (*(pGroup->active_servers))->free_mb; ppServerEnd = pGroup->active_servers + pGroup->active_count; for (ppServer=pGroup->active_servers+1; \ ppServer<ppServerEnd; ppServer++) { if ((*ppServer)->free_mb < pGroup->free_mb) { pGroup->free_mb = (*ppServer)->free_mb; } }}static int tracker_deal_storage_sync_report(TrackerClientInfo *pClientInfo, \ const int64_t nInPackLen){ int status; char in_buff[512]; char *p; char *pEnd; char *src_ip_addr; int sync_timestamp; int src_index; int dest_index; FDFSStorageDetail *pSrcStorage; while (1) { if (nInPackLen <= 0 || nInPackLen > sizeof(in_buff) \ || nInPackLen % (IP_ADDRESS_SIZE + 4) != 0) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip: %s, package size " \ INT64_PRINTF_FORMAT" is not correct", \ __LINE__, \ TRACKER_PROTO_CMD_STORAGE_SYNC_REPORT, \ pClientInfo->ip_addr, nInPackLen); status = EINVAL; break; } if ((status=tcprecvdata(pClientInfo->sock, in_buff, \ nInPackLen, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip addr: %s, recv data fail, " \ "errno: %d, error info: %s.", \ __LINE__, \ TRACKER_PROTO_CMD_STORAGE_SYNC_REPORT, \ pClientInfo->ip_addr, \ status, strerror(status)); break; } tracker_check_dirty(pClientInfo); dest_index = pClientInfo->pStorage - \ pClientInfo->pGroup->all_servers; if (g_groups.store_server == FDFS_STORE_SERVER_FIRST) { int max_synced_timestamp; max_synced_timestamp = pClientInfo->pStorage->stat.\ last_synced_timestamp; pEnd = in_buff + nInPackLen; for (p=in_buff; p<pEnd; p += (IP_ADDRESS_SIZE + 4)) { sync_timestamp = buff2int(p + IP_ADDRESS_SIZE); if (sync_timestamp <= 0) { continue; } src_ip_addr = p; *(src_ip_addr + (IP_ADDRESS_SIZE - 1)) = '\0'; pSrcStorage = tracker_mem_get_storage( \ pClientInfo->pGroup, src_ip_addr); if (pSrcStorage == NULL) { continue; } if (pSrcStorage->status != FDFS_STORAGE_STATUS_ACTIVE) { continue; } src_index = pSrcStorage - pClientInfo->pGroup->all_servers; if (src_index == dest_index) { continue; } pClientInfo->pGroup->last_sync_timestamps \ [src_index][dest_index] = sync_timestamp; if (sync_timestamp > max_synced_timestamp) { max_synced_timestamp = sync_timestamp; } } pClientInfo->pStorage->stat.last_synced_timestamp = \ max_synced_timestamp; } else //round robin { int min_synced_timestamp; min_synced_timestamp = 0; pEnd = in_buff + nInPackLen; for (p=in_buff; p<pEnd; p += (IP_ADDRESS_SIZE + 4)) { sync_timestamp = buff2int(p + IP_ADDRESS_SIZE); if (sync_timestamp <= 0) { continue; } src_ip_addr = p; *(src_ip_addr + (IP_ADDRESS_SIZE - 1)) = '\0'; pSrcStorage = tracker_mem_get_storage( \ pClientInfo->pGroup, src_ip_addr); if (pSrcStorage == NULL) { continue; } if (pSrcStorage->status != FDFS_STORAGE_STATUS_ACTIVE) { continue; } src_index = pSrcStorage - pClientInfo->pGroup->all_servers; if (src_index == dest_index) { continue; } pClientInfo->pGroup->last_sync_timestamps \ [src_index][dest_index] = sync_timestamp; if (min_synced_timestamp == 0) { min_synced_timestamp = sync_timestamp; } else if (sync_timestamp < min_synced_timestamp) { min_synced_timestamp = sync_timestamp; } } if (min_synced_timestamp > 0) { pClientInfo->pStorage->stat.last_synced_timestamp = \ min_synced_timestamp; } } if (++g_storage_sync_time_chg_count % \ TRACKER_SYNC_TO_FILE_FREQ == 0) { status = tracker_save_sync_timestamps(); } else
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -