📄 tracker_service.c
字号:
memset(stats, 0, sizeof(stats)); ppEnd = pGroup->sorted_servers + pGroup->count; for (ppServer=pGroup->sorted_servers; ppServer<ppEnd; \ ppServer++) { pStatBuff = &(pDest->stat_buff); pStorageStat = &((*ppServer)->stat); pDest->status = (*ppServer)->status; memcpy(pDest->ip_addr, (*ppServer)->ip_addr, \ IP_ADDRESS_SIZE); long2buff((*ppServer)->total_mb, pDest->sz_total_mb); long2buff((*ppServer)->free_mb, pDest->sz_free_mb); long2buff(pStorageStat->total_upload_count, \ pStatBuff->sz_total_upload_count); long2buff(pStorageStat->success_upload_count, \ pStatBuff->sz_success_upload_count); long2buff(pStorageStat->total_set_meta_count, \ pStatBuff->sz_total_set_meta_count); long2buff(pStorageStat->success_set_meta_count, \ pStatBuff->sz_success_set_meta_count); long2buff(pStorageStat->total_delete_count, \ pStatBuff->sz_total_delete_count); long2buff(pStorageStat->success_delete_count, \ pStatBuff->sz_success_delete_count); long2buff(pStorageStat->total_download_count, \ pStatBuff->sz_total_download_count); long2buff(pStorageStat->success_download_count, \ pStatBuff->sz_success_download_count); long2buff(pStorageStat->total_get_meta_count, \ pStatBuff->sz_total_get_meta_count); long2buff(pStorageStat->success_get_meta_count, \ pStatBuff->sz_success_get_meta_count); long2buff(pStorageStat->last_source_update, \ pStatBuff->sz_last_source_update); long2buff(pStorageStat->last_sync_update, \ pStatBuff->sz_last_sync_update); long2buff(pStorageStat->last_synced_timestamp, \ pStatBuff->sz_last_synced_timestamp); pDest++; } resp.status = 0; break; } out_len = (pDest - stats) * sizeof(TrackerStorageStat); long2buff(out_len, resp.pkg_len); resp.cmd = TRACKER_PROTO_CMD_SERVER_RESP; if ((result=tcpsenddata(pClientInfo->sock, \ &resp, sizeof(resp), g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, send data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ result, strerror(result)); return result; } if (out_len == 0) { return resp.status; } if ((result=tcpsenddata(pClientInfo->sock, \ stats, out_len, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, send data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ result, strerror(result)); return result; } return resp.status;}/**pkg format:HeaderFDFS_GROUP_NAME_MAX_LEN bytes: group_nameremain bytes: filename**/static int tracker_deal_service_query_fetch_update( \ TrackerClientInfo *pClientInfo, \ const byte cmd, const int64_t nInPackLen){ TrackerHeader resp; char in_buff[FDFS_GROUP_NAME_MAX_LEN + 64]; char group_name[FDFS_GROUP_NAME_MAX_LEN + 1]; char name_buff[64]; char *filename; int filename_len; int file_timestamp; int base64_len; int decoded_len; int out_len; int storage_ip; struct in_addr ip_addr; FDFSGroupInfo *pGroup; FDFSStorageDetail *pStorageServer; FDFSStorageDetail *pStoreSrcServer; char out_buff[sizeof(TrackerHeader) + \ TRACKER_QUERY_STORAGE_FETCH_BODY_LEN]; char szIpAddr[IP_ADDRESS_SIZE]; int result; memset(&resp, 0, sizeof(resp)); pGroup = NULL; pStorageServer = NULL; while (1) { if (nInPackLen < FDFS_GROUP_NAME_MAX_LEN + 22) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip: %s, package size " \ INT64_PRINTF_FORMAT" is not correct, " \ "expect length > %d", \ __LINE__, \ TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH, \ pClientInfo->ip_addr, \ nInPackLen, FDFS_GROUP_NAME_MAX_LEN+22); resp.status = EINVAL; break; } if (nInPackLen >= sizeof(in_buff)) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip: %s, package size " \ INT64_PRINTF_FORMAT" is too large, " \ "expect length should < %d", \ __LINE__, \ TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH, \ pClientInfo->ip_addr, nInPackLen, \ sizeof(in_buff)); resp.status = EINVAL; break; } if ((resp.status=tcprecvdata(pClientInfo->sock, in_buff, \ nInPackLen, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, recv data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ resp.status, strerror(resp.status)); break; } in_buff[nInPackLen] = '\0'; memcpy(group_name, in_buff, FDFS_GROUP_NAME_MAX_LEN); group_name[FDFS_GROUP_NAME_MAX_LEN] = '\0'; filename = in_buff + FDFS_GROUP_NAME_MAX_LEN; pGroup = tracker_mem_get_group(group_name); if (pGroup == NULL) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, invalid group_name: %s", \ __LINE__, pClientInfo->ip_addr, \ pClientInfo->group_name); resp.status = ENOENT; break; } if (pGroup->active_count == 0) { resp.status = ENOENT; break; } filename_len = nInPackLen - FDFS_GROUP_NAME_MAX_LEN; //file generated by version < v1.12 if (filename_len < 32 + (FDFS_FILE_EXT_NAME_MAX_LEN + 1)) { storage_ip = INADDR_NONE; file_timestamp = 0; } else //file generated by version >= v1.12 { base64_len = filename_len - FDFS_FILE_PATH_LEN - \ (FDFS_FILE_EXT_NAME_MAX_LEN + 1); base64_decode_auto(filename + FDFS_FILE_PATH_LEN, \ base64_len, name_buff, &decoded_len); storage_ip = buff2int(name_buff); file_timestamp = buff2int(name_buff+sizeof(int)); } /* //printf("storage_ip=%d, file_timestamp=%d\n", \ storage_ip, file_timestamp); */ memset(szIpAddr, 0, sizeof(szIpAddr)); resp.status = 0; if (cmd == TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH) { if (g_groups.download_server == \ FDFS_DOWNLOAD_SERVER_SOURCE_FIRST) { memset(&ip_addr, 0, sizeof(ip_addr)); ip_addr.s_addr = storage_ip; pStoreSrcServer=tracker_mem_get_active_storage(\ pGroup, inet_ntop(AF_INET, &ip_addr, \ szIpAddr, sizeof(szIpAddr))); if (pStoreSrcServer != NULL) { pStorageServer = pStoreSrcServer; break; } } //round robin pStorageServer = *(pGroup->active_servers + \ pGroup->current_read_server); /* //printf("filename=%s, pStorageServer ip=%s, " \ "file_timestamp=%d, " \ "last_synced_timestamp=%d\n", filename, \ pStorageServer->ip_addr, file_timestamp, \ (int)pStorageServer->stat.last_synced_timestamp); */ while (1) { if ((pStorageServer->stat.last_synced_timestamp > \ file_timestamp) || \ (pStorageServer->stat.last_synced_timestamp + 1 >= \ file_timestamp&&time(NULL)-file_timestamp>60)\ || (storage_ip == INADDR_NONE \ && g_groups.store_server != \ FDFS_STORE_SERVER_FIRST)) { break; } if (storage_ip == INADDR_NONE && g_groups.store_server\ == FDFS_STORE_SERVER_FIRST) { pStorageServer = *(pGroup->active_servers); break; } if (g_groups.download_server == \ FDFS_DOWNLOAD_SERVER_ROUND_ROBIN) { //avoid search again memset(&ip_addr, 0, sizeof(ip_addr)); ip_addr.s_addr = storage_ip; pStoreSrcServer = tracker_mem_get_active_storage( \ pGroup, inet_ntop(AF_INET, &ip_addr, \ szIpAddr, sizeof(szIpAddr))); if (pStoreSrcServer != NULL) { pStorageServer = pStoreSrcServer; break; } } if (g_groups.store_server == FDFS_STORE_SERVER_FIRST) { pStorageServer = *(pGroup->active_servers); break; } break; } pGroup->current_read_server++; if (pGroup->current_read_server >= \ pGroup->active_count) { pGroup->current_read_server = 0; } } else //TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE { if (storage_ip != INADDR_NONE) { memset(&ip_addr, 0, sizeof(ip_addr)); ip_addr.s_addr = storage_ip; pStoreSrcServer=tracker_mem_get_active_storage(\ pGroup, inet_ntop(AF_INET, &ip_addr, \ szIpAddr, sizeof(szIpAddr))); if (pStoreSrcServer != NULL) { pStorageServer = pStoreSrcServer; break; } } pStorageServer = tracker_get_writable_storage(pGroup); } break; } resp.cmd = TRACKER_PROTO_CMD_SERVICE_RESP; if (resp.status == 0) { out_len = TRACKER_QUERY_STORAGE_FETCH_BODY_LEN; long2buff(out_len, resp.pkg_len); memcpy(out_buff, &resp, sizeof(resp)); memcpy(out_buff + sizeof(resp), pGroup->group_name, \ FDFS_GROUP_NAME_MAX_LEN); memcpy(out_buff + sizeof(resp) + FDFS_GROUP_NAME_MAX_LEN, \ pStorageServer->ip_addr, IP_ADDRESS_SIZE-1); long2buff(pGroup->storage_port, out_buff + sizeof(resp) + \ FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE - 1); } else { out_len = 0; long2buff(out_len, resp.pkg_len); memcpy(out_buff, &resp, sizeof(resp)); } if ((result=tcpsenddata(pClientInfo->sock, \ out_buff, sizeof(resp) + out_len, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, send data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ result, strerror(result)); return result; } return resp.status;}static FDFSStorageDetail *tracker_get_writable_storage( \ FDFSGroupInfo *pStoreGroup){ if (g_groups.store_server == FDFS_STORE_SERVER_ROUND_ROBIN) { if (pStoreGroup->current_write_server >= \ pStoreGroup->active_count) { pStoreGroup->current_write_server = 0; } return *(pStoreGroup->active_servers + \ pStoreGroup->current_write_server++); } else //use the first server { return *(pStoreGroup->active_servers); }}static int tracker_deal_service_query_storage(TrackerClientInfo *pClientInfo, \ const int64_t nInPackLen){ TrackerHeader resp; int out_len; FDFSGroupInfo *pStoreGroup; FDFSGroupInfo **ppFoundGroup; FDFSGroupInfo **ppGroup; FDFSStorageDetail *pStorageServer; char out_buff[sizeof(TrackerHeader) + \ TRACKER_QUERY_STORAGE_STORE_BODY_LEN]; bool bHaveActiveServer; int result; memset(&resp, 0, sizeof(resp)); pStoreGroup = NULL; pStorageServer = NULL; while (1) { if (nInPackLen != 0) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip: %s, package size "INT64_PRINTF_FORMAT" " \ "is not correct, " \ "expect length: 0", \ __LINE__, \ TRACKER_PROTO_CMD_SERVICE_QUERY_STORE, \ pClientInfo->ip_addr, \ nInPackLen); resp.status = EINVAL; break; } if (g_groups.count == 0) { resp.status = ENOENT; break; } if (g_groups.store_lookup == FDFS_STORE_LOOKUP_ROUND_ROBIN || \ g_groups.store_lookup == FDFS_STORE_LOOKUP_LOAD_BALANCE) { bHaveActiveServer = false; ppFoundGroup = g_groups.sorted_groups + \ g_groups.current_write_group; if ((*ppFoundGroup)->active_count > 0) { bHaveActiveServer = true; if ((*ppFoundGroup)->free_mb > \ g_storage_reserved_mb) { pStoreGroup = *ppFoundGroup; } } if (pStoreGroup == NULL) { FDFSGroupInfo **ppGroupEnd; ppGroupEnd = g_groups.sorted_groups + \ g_groups.count; for (ppGroup=ppFoundGroup+1; \ ppGroup<ppGroupEnd; ppGroup++) { if ((*ppGroup)->active_count == 0) { continue; } bHaveActiveServer = true; if ((*ppGroup)->free_mb > \ g_storage_reserved_mb) { pStoreGroup = *ppGroup; if (g_groups.store_lookup == \ FDFS_STORE_LOOKUP_LOAD_BALANCE) { g_groups.current_write_group = \ ppGroup-g_groups.sorted_groups; } break; } } if (pStoreGroup == NULL) { for (ppGroup=g_groups.sorted_groups; \ ppGroup<ppFoundGroup; ppGroup++) { if ((*ppGroup)->active_count == 0) { continue; } bHaveActiveServer = true; if ((*ppGroup)->free_mb > \ g_storage_reserved_mb) { pStoreGroup = *ppGroup; if (g_groups.store_lookup == \ FDFS_STORE_LOOKUP_LOAD_BALANCE) { g_groups.current_write_group = \ ppGroup-g_groups.sorted_groups; } break; } } } if (pStoreGroup == NULL) { if (bHaveActiveServer) { resp.status = ENOSPC; } else { resp.status = ENOENT; } break; } } if (g_groups.store_lookup == FDFS_STORE_LOOKUP_ROUND_ROBIN) { g_groups.current_write_group++; if (g_groups.current_write_group >= g_groups.count) { g_groups.current_write_group = 0; } } } else if (g_groups.store_lookup == FDFS_STORE_LOOKUP_SPEC_GROUP) { if (g_groups.pStoreGroup == NULL || \ g_groups.pStoreGroup->active_count == 0) { resp.status = ENOENT; break; } if (g_groups.pStoreGroup->free_mb <= \ g_storage_reserved_mb) { resp.status = ENOSPC; break; } pStoreGroup = g_groups.pStoreGroup; } else { resp.status = EINVAL; break; } if (pStoreGroup->store_path_count <= 0) { resp.status = ENOENT; break; } pStorageServer = tracker_get_writable_storage(pStoreGroup); if (pStorageServer->path_free_mbs[pStorageServer-> \ current_write_path] <= g_storage_reserved_mb) { int i; for (i=0; i<pStoreGroup->store_path_count; i++) { if (pStorageServer->path_free_mbs[i] > g_storage_reserved_mb) { pStorageServer->current_write_path = i; break; } } if (i == pStoreGroup->store_path_count) { resp.status = ENOSPC; break; } } if (g_groups.store_path == FDFS_STORE_PATH_ROUND_ROBIN) { pStorageServer->current_write_path++; if (pStorageServer->current_write_path >= \ pStoreGroup->store_path_count) { pStorageServer->current_write_path = 0; } } /* //printf("pStoreGroup->current_write_server: %d, " \ "pStoreGroup->active_count=%d\n", \ pStoreGroup->current_write_server, \
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -