📄 tracker_service.c
字号:
{ status = 0; } //printf("storage_sync_time_chg_count=%d\n", g_storage_sync_time_chg_count); break; } //printf("deal storage report, status=%d\n", status); return tracker_check_and_sync(pClientInfo, status);}static int tracker_deal_storage_df_report(TrackerClientInfo *pClientInfo, \ const int64_t nInPackLen){ int status; int result; int i; char in_buff[sizeof(TrackerStatReportReqBody) * 16]; char *pBuff; TrackerStatReportReqBody *pStatBuff; int64_t *path_total_mbs; int64_t *path_free_mbs; int64_t old_free_mb; pBuff = in_buff; while (1) { if (nInPackLen != sizeof(TrackerStatReportReqBody) * \ pClientInfo->pGroup->store_path_count) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip: %s, package size " \ INT64_PRINTF_FORMAT" is not correct, " \ "expect length: %d", __LINE__, \ TRACKER_PROTO_CMD_STORAGE_REPORT, \ pClientInfo->ip_addr, nInPackLen, \ sizeof(TrackerStatReportReqBody) * \ pClientInfo->pGroup->store_path_count); status = EINVAL; break; } if (nInPackLen > sizeof(in_buff)) { pBuff = (char *)malloc(nInPackLen); if (pBuff == NULL) { logError("file: "__FILE__", line: %d, " \ "malloc %d bytes fail, " \ "errno: %d, error info: %s", __LINE__, \ (int)nInPackLen, errno,strerror(errno)); status = errno != 0 ? errno : ENOMEM; break; } } if ((status=tcprecvdata(pClientInfo->sock, pBuff, \ nInPackLen, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip addr: %s, recv data fail, " \ "errno: %d, error info: %s.", \ __LINE__, \ TRACKER_PROTO_CMD_STORAGE_REPORT, \ pClientInfo->ip_addr, \ status, strerror(status)); break; } old_free_mb = pClientInfo->pStorage->free_mb; path_total_mbs = pClientInfo->pStorage->path_total_mbs; path_free_mbs = pClientInfo->pStorage->path_free_mbs; pClientInfo->pStorage->total_mb = 0; pClientInfo->pStorage->free_mb = 0; pStatBuff = (TrackerStatReportReqBody *)pBuff; for (i=0; i<pClientInfo->pGroup->store_path_count; i++) { path_total_mbs[i] = buff2long(pStatBuff->sz_total_mb); path_free_mbs[i] = buff2long(pStatBuff->sz_free_mb); pClientInfo->pStorage->total_mb += path_total_mbs[i]; pClientInfo->pStorage->free_mb += path_free_mbs[i]; if (g_groups.store_path == FDFS_STORE_PATH_LOAD_BALANCE && path_free_mbs[i] > path_free_mbs[ \ pClientInfo->pStorage->current_write_path]) { pClientInfo->pStorage->current_write_path = i; } pStatBuff++; } if ((pClientInfo->pGroup->free_mb == 0) || (pClientInfo->pStorage->free_mb < \ pClientInfo->pGroup->free_mb)) { pClientInfo->pGroup->free_mb = \ pClientInfo->pStorage->free_mb; } else if (pClientInfo->pStorage->free_mb > old_free_mb) { tracker_find_min_free_space(pClientInfo->pGroup); } if (g_groups.store_lookup == \ FDFS_STORE_LOOKUP_LOAD_BALANCE) { if ((result=pthread_mutex_lock( \ &g_tracker_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_lock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } tracker_find_max_free_space_group(); if ((result=pthread_mutex_unlock( \ &g_tracker_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_unlock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } } status = 0; /* //printf("storage: %s:%d, total_mb=%dMB, free_mb=%dMB\n", \ pClientInfo->pStorage->ip_addr, \ pClientInfo->pGroup->storage_port, \ pClientInfo->pStorage->total_mb, \ pClientInfo->pStorage->free_mb); */ break; } if (pBuff != in_buff) { free(pBuff); } if (status == 0) { tracker_check_dirty(pClientInfo); tracker_mem_active_store_server(pClientInfo->pGroup, \ pClientInfo->pStorage); } //printf("deal storage report, status=%d\n", status); return tracker_check_and_sync(pClientInfo, status);}static int tracker_deal_storage_beat(TrackerClientInfo *pClientInfo, \ const int64_t nInPackLen){ int status; FDFSStorageStatBuff statBuff; FDFSStorageStat *pStat; while (1) { if (nInPackLen == 0) { status = 0; break; } if (nInPackLen != sizeof(FDFSStorageStatBuff)) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip: %s, package size "INT64_PRINTF_FORMAT" " \ "is not correct, " \ "expect length: 0 or %d", \ __LINE__, \ TRACKER_PROTO_CMD_STORAGE_BEAT, \ pClientInfo->ip_addr, nInPackLen, \ sizeof(FDFSStorageStatBuff)); status = EINVAL; break; } if ((status=tcprecvdata(pClientInfo->sock, &statBuff, \ sizeof(FDFSStorageStatBuff), g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip addr: %s, recv data fail, " \ "errno: %d, error info: %s.", \ __LINE__, \ TRACKER_PROTO_CMD_STORAGE_BEAT, \ pClientInfo->ip_addr, \ status, strerror(status)); break; } pStat = &(pClientInfo->pStorage->stat); pStat->total_upload_count = \ buff2long(statBuff.sz_total_upload_count); pStat->success_upload_count = \ buff2long(statBuff.sz_success_upload_count); pStat->total_download_count = \ buff2long(statBuff.sz_total_download_count); pStat->success_download_count = \ buff2long(statBuff.sz_success_download_count); pStat->total_set_meta_count = \ buff2long(statBuff.sz_total_set_meta_count); pStat->success_set_meta_count = \ buff2long(statBuff.sz_success_set_meta_count); pStat->total_delete_count = \ buff2long(statBuff.sz_total_delete_count); pStat->success_delete_count = \ buff2long(statBuff.sz_success_delete_count); pStat->total_get_meta_count = \ buff2long(statBuff.sz_total_get_meta_count); pStat->success_get_meta_count = \ buff2long(statBuff.sz_success_get_meta_count); pStat->last_source_update = \ buff2long(statBuff.sz_last_source_update); pStat->last_sync_update = \ buff2long(statBuff.sz_last_sync_update); if (++g_storage_stat_chg_count % TRACKER_SYNC_TO_FILE_FREQ == 0) { status = tracker_save_storages(); } else { status = 0; } //printf("g_storage_stat_chg_count=%d\n", g_storage_stat_chg_count); break; } if (status == 0) { tracker_check_dirty(pClientInfo); tracker_mem_active_store_server(pClientInfo->pGroup, \ pClientInfo->pStorage); } //printf("deal heart beat, status=%d\n", status); return tracker_check_and_sync(pClientInfo, status);}void* tracker_thread_entrance(void* arg){/*package format:8 bytes length (hex string)1 bytes cmd (char)1 bytes status(char)data buff (struct)*/ TrackerClientInfo client_info; TrackerHeader header; int result; int64_t nInPackLen; int count; int recv_bytes; int log_level; in_addr_t client_ip; int server_sock; server_sock = (int)arg; while (g_continue_flag) { if ((result=pthread_mutex_lock(&g_tracker_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_lock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } if (!g_continue_flag) { pthread_mutex_unlock(&g_tracker_thread_lock); break; } memset(&client_info, 0, sizeof(client_info)); client_info.sock = nbaccept(server_sock, 1 * 60, &result); if (pthread_mutex_unlock(&g_tracker_thread_lock) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_unlock fail", \ __LINE__); } if(client_info.sock < 0) //error { if (result == ETIMEDOUT || result == EINTR || \ result == EAGAIN) { continue; } if(result == EBADF) { logError("file: "__FILE__", line: %d, " \ "accept failed, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); break; } logError("file: "__FILE__", line: %d, " \ "accept failed, errno: %d, error info: %s", \ __LINE__, result, strerror(result)); continue; } client_ip = getPeerIpaddr(client_info.sock, \ client_info.ip_addr, IP_ADDRESS_SIZE); if (g_allow_ip_count >= 0) { if (bsearch(&client_ip, g_allow_ip_addrs, g_allow_ip_count, \ sizeof(in_addr_t), cmp_by_ip_addr_t) == NULL) { logError("file: "__FILE__", line: %d, " \ "ip addr %s is not allowed to access", \ __LINE__, client_info.ip_addr); close(client_info.sock); continue; } } count = 0; while (g_continue_flag) { result = tcprecvdata_ex(client_info.sock, &header, \ sizeof(header), g_network_timeout, &recv_bytes); if (result == ETIMEDOUT && count > 0) { continue; } if (result != 0) { if (result == ENOTCONN && recv_bytes == 0) { log_level = LOG_WARNING; } else { log_level = LOG_ERR; } log_it(log_level, "file: "__FILE__", line: %d, " \ "client ip: %s, recv data fail, " \ "errno: %d, error info: %s", \ __LINE__, client_info.ip_addr, \ result, strerror(result)); break; } nInPackLen = buff2long(header.pkg_len); tracker_check_dirty(&client_info); if (header.cmd == TRACKER_PROTO_CMD_STORAGE_BEAT) { if (tracker_check_logined(&client_info) != 0) { break; } if (tracker_deal_storage_beat(&client_info, \ nInPackLen) != 0) { break; } } else if (header.cmd == TRACKER_PROTO_CMD_STORAGE_SYNC_REPORT) { if (tracker_check_logined(&client_info) != 0) { break; } if (tracker_deal_storage_sync_report(&client_info, \ nInPackLen) != 0) { break; } } else if (header.cmd == TRACKER_PROTO_CMD_STORAGE_REPORT) { if (tracker_check_logined(&client_info) != 0) { break; } if (tracker_deal_storage_df_report(&client_info, \ nInPackLen) != 0) { break; } } else if (header.cmd == TRACKER_PROTO_CMD_STORAGE_JOIN) { if (tracker_deal_storage_join(&client_info, \ nInPackLen) != 0) { break; } } else if (header.cmd == TRACKER_PROTO_CMD_STORAGE_REPLICA_CHG) { if (tracker_check_logined(&client_info) != 0) { break; } if (tracker_deal_storage_replica_chg(&client_info, \ nInPackLen) != 0) { break; } } else if (header.cmd == TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH) { if (tracker_deal_service_query_fetch_update( \ &client_info, header.cmd, nInPackLen) != 0) { break; } } else if (header.cmd == TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE) { if (tracker_deal_service_query_fetch_update( &client_info, header.cmd, nInPackLen) != 0) { break; } } else if (header.cmd == TRACKER_PROTO_CMD_SERVICE_QUERY_STORE) { if (tracker_deal_service_query_storage(&client_info, \ nInPackLen) != 0) { break; } } else if (header.cmd == TRACKER_PROTO_CMD_SERVER_LIST_GROUP) { if (tracker_deal_server_list_groups(&client_info, \ nInPackLen) != 0) { break; } } else if (header.cmd == TRACKER_PROTO_CMD_SERVER_LIST_STORAGE) { if (tracker_deal_server_list_group_storages( \ &client_info, nInPackLen) != 0) { break; } } else if (header.cmd == TRACKER_PROTO_CMD_STORAGE_SYNC_SRC_REQ) { if (tracker_deal_storage_sync_src_req( \ &client_info, nInPackLen) != 0) { break; } } else if (header.cmd == TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_REQ) { if (tracker_deal_storage_sync_dest_req( \ &client_info, nInPackLen) != 0) { break; } } else if (header.cmd == TRACKER_PROTO_CMD_STORAGE_SYNC_NOTIFY) { if (tracker_deal_storage_sync_notify( \ &client_info, nInPackLen) != 0) { break; } } else if (header.cmd == TRACKER_PROTO_CMD_SERVER_DELETE_STORAGE) { if (tracker_deal_server_delete_storage( \ &client_info, nInPackLen) != 0) { break; } } else if (header.cmd == FDFS_PROTO_CMD_QUIT) { break; } else { logError("file: "__FILE__", line: %d, " \ "client ip: %s, unkown cmd: %d", \ __LINE__, client_info.ip_addr, \ header.cmd); break; } count++; } if (g_continue_flag) { tracker_check_dirty(&client_info); tracker_mem_offline_store_server(&client_info); } if (client_info.pGroup != NULL) { --(*(client_info.pGroup->ref_count)); } if (client_info.pStorage != NULL) { --(*(client_info.pStorage->ref_count)); } close(client_info.sock); } if ((result=pthread_mutex_lock(&g_tracker_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_lock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } g_tracker_thread_count--; if ((result=pthread_mutex_unlock(&g_tracker_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_unlock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } return NULL;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -