📄 tracker_client_thread.c
字号:
diffServers, pDiffServer - diffServers); } return 0; } pGlobalServer = g_storage_servers; pGlobalEnd = g_storage_servers + g_storage_count; pServer = briefServers; while (pServer < pEnd && pGlobalServer < pGlobalEnd) { if (pGlobalServer->server.status == FDFS_STORAGE_STATUS_NONE) { pGlobalServer++; continue; } res = strcmp(pServer->ip_addr, pGlobalServer->server.ip_addr); if (res < 0) { if (pServer->status != FDFS_STORAGE_STATUS_DELETED) { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, " \ "group \"%s\", " \ "enter impossible statement branch", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, \ pTrackerServer->group_name ); } pServer++; } else if (res == 0) { pServer++; pGlobalServer++; } else { memcpy(pDiffServer++, &(pGlobalServer->server), \ sizeof(FDFSStorageBrief)); pGlobalServer++; } } while (pGlobalServer < pGlobalEnd) { if (pGlobalServer->server.status == FDFS_STORAGE_STATUS_NONE) { pGlobalServer++; continue; } memcpy(pDiffServer++, &(pGlobalServer->server), \ sizeof(FDFSStorageBrief)); pGlobalServer++; } return tracker_sync_diff_servers(pTrackerServer, \ diffServers, pDiffServer - diffServers);}static int tracker_check_response(TrackerServerInfo *pTrackerServer){ int64_t nInPackLen; TrackerHeader resp; int server_count; int result; FDFSStorageBrief briefServers[FDFS_MAX_SERVERS_EACH_GROUP]; if ((result=tcprecvdata(pTrackerServer->sock, &resp, \ sizeof(resp), g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, recv data fail, " \ "errno: %d, error info: %s.", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, \ result, strerror(result)); return result; } //printf("resp status=%d\n", resp.status); if (resp.status != 0) { return resp.status; } nInPackLen = buff2long(resp.pkg_len); if ((nInPackLen < 0) || (nInPackLen % sizeof(FDFSStorageBrief) != 0)) { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, " \ "package size "INT64_PRINTF_FORMAT" is not correct", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, nInPackLen); return EINVAL; } if (nInPackLen == 0) { return resp.status; } server_count = nInPackLen / sizeof(FDFSStorageBrief); if (server_count > FDFS_MAX_SERVERS_EACH_GROUP) { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, return storage count: %d" \ " exceed max: %d", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, \ server_count, FDFS_MAX_SERVERS_EACH_GROUP); return EINVAL; } if ((result=tcprecvdata(pTrackerServer->sock, briefServers, \ nInPackLen, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, recv data fail, " \ "errno: %d, error info: %s.", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, \ result, strerror(result)); return result; } /* //printf("resp server count=%d\n", server_count); { int i; for (i=0; i<server_count; i++) { //printf("%d. %d:%s\n", i+1, briefServers[i].status, \ briefServers[i].ip_addr); } } */ return tracker_merge_servers(pTrackerServer, \ briefServers, server_count);}int tracker_sync_src_req(TrackerServerInfo *pTrackerServer, \ BinLogReader *pReader){ char out_buff[sizeof(TrackerHeader) + IP_ADDRESS_SIZE]; char sync_src_ip_addr[IP_ADDRESS_SIZE]; TrackerHeader *pHeader; TrackerStorageSyncReqBody syncReqbody; char *pBuff; int64_t in_bytes; int result; memset(out_buff, 0, sizeof(out_buff)); pHeader = (TrackerHeader *)out_buff; long2buff(IP_ADDRESS_SIZE, pHeader->pkg_len); pHeader->cmd = TRACKER_PROTO_CMD_STORAGE_SYNC_SRC_REQ; strcpy(out_buff + sizeof(TrackerHeader), pReader->ip_addr); if ((result=tcpsenddata(pTrackerServer->sock, out_buff, \ sizeof(out_buff), g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, send data fail, " \ "errno: %d, error info: %s.", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, \ result, strerror(result)); return result; } pBuff = (char *)&syncReqbody; if ((result=fdfs_recv_response(pTrackerServer, \ &pBuff, sizeof(syncReqbody), &in_bytes)) != 0) { return result; } if (in_bytes == 0) { pReader->need_sync_old = false; pReader->until_timestamp = 0; return 0; } if (in_bytes != sizeof(syncReqbody)) { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, " \ "recv body length: "INT64_PRINTF_FORMAT" is invalid, " \ "expect body length: %d", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, in_bytes, \ sizeof(syncReqbody)); return EINVAL; } memcpy(sync_src_ip_addr, syncReqbody.src_ip_addr, IP_ADDRESS_SIZE); sync_src_ip_addr[IP_ADDRESS_SIZE-1] = '\0'; pReader->need_sync_old = is_local_host_ip(sync_src_ip_addr); pReader->until_timestamp = (time_t)buff2long( \ syncReqbody.until_timestamp); return 0;}static int tracker_sync_dest_req(TrackerServerInfo *pTrackerServer){ TrackerHeader header; TrackerStorageSyncReqBody syncReqbody; char *pBuff; int64_t in_bytes; int result; memset(&header, 0, sizeof(header)); header.cmd = TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_REQ; if ((result=tcpsenddata(pTrackerServer->sock, &header, \ sizeof(header), g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, send data fail, " \ "errno: %d, error info: %s.", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, \ result, strerror(result)); return result; } pBuff = (char *)&syncReqbody; if ((result=fdfs_recv_response(pTrackerServer, \ &pBuff, sizeof(syncReqbody), &in_bytes)) != 0) { return result; } if (in_bytes == 0) { return result; } if (in_bytes != sizeof(syncReqbody)) { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, " \ "recv body length: "INT64_PRINTF_FORMAT" is invalid, " \ "expect body length: %d", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, in_bytes, \ sizeof(syncReqbody)); return EINVAL; } memcpy(g_sync_src_ip_addr, syncReqbody.src_ip_addr, IP_ADDRESS_SIZE); g_sync_src_ip_addr[IP_ADDRESS_SIZE-1] = '\0'; g_sync_until_timestamp = (time_t)buff2long(syncReqbody.until_timestamp); memset(&header, 0, sizeof(header)); header.cmd = TRACKER_PROTO_CMD_STORAGE_RESP; if ((result=tcpsenddata(pTrackerServer->sock, &header, sizeof(header), \ g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, send data fail, " \ "errno: %d, error info: %s.", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, \ result, strerror(result)); return result; } return 0;}static int tracker_sync_notify(TrackerServerInfo *pTrackerServer){ char out_buff[sizeof(TrackerHeader)+sizeof(TrackerStorageSyncReqBody)]; TrackerHeader *pHeader; TrackerStorageSyncReqBody *pReqBody; int result; pHeader = (TrackerHeader *)out_buff; pReqBody = (TrackerStorageSyncReqBody*)(out_buff+sizeof(TrackerHeader)); memset(out_buff, 0, sizeof(out_buff)); long2buff((int)sizeof(TrackerStorageSyncReqBody), pHeader->pkg_len); pHeader->cmd = TRACKER_PROTO_CMD_STORAGE_SYNC_NOTIFY; strcpy(pReqBody->src_ip_addr, g_sync_src_ip_addr); long2buff(g_sync_until_timestamp, pReqBody->until_timestamp); if ((result=tcpsenddata(pTrackerServer->sock, out_buff, \ sizeof(out_buff), g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, send data fail, " \ "errno: %d, error info: %s.", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, \ result, strerror(result)); return result; } return tracker_check_response(pTrackerServer);}int tracker_report_join(TrackerServerInfo *pTrackerServer){ char out_buff[sizeof(TrackerHeader)+sizeof(TrackerStorageJoinBody)]; TrackerHeader *pHeader; TrackerStorageJoinBody *pReqBody; int result; pHeader = (TrackerHeader *)out_buff; pReqBody = (TrackerStorageJoinBody *)(out_buff+sizeof(TrackerHeader)); memset(out_buff, 0, sizeof(out_buff)); long2buff((int)sizeof(TrackerStorageJoinBody), pHeader->pkg_len); pHeader->cmd = TRACKER_PROTO_CMD_STORAGE_JOIN; strcpy(pReqBody->group_name, g_group_name); long2buff(g_server_port, pReqBody->storage_port); long2buff(g_path_count, pReqBody->store_path_count); long2buff(g_subdir_count_per_path, pReqBody->subdir_count_per_path); if ((result=tcpsenddata(pTrackerServer->sock, out_buff, \ sizeof(out_buff), g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, send data fail, " \ "errno: %d, error info: %s.", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, \ result, strerror(result)); return result; } return tracker_check_response(pTrackerServer);}static int tracker_report_sync_timestamp(TrackerServerInfo *pTrackerServer){ char out_buff[sizeof(TrackerHeader) + (IP_ADDRESS_SIZE + 4) * \ FDFS_MAX_SERVERS_EACH_GROUP]; char *p; TrackerHeader *pHeader; FDFSStorageServer *pServer; FDFSStorageServer *pEnd; int result; int body_len; if (g_storage_count == 0) { return 0; } memset(out_buff, 0, sizeof(out_buff)); pHeader = (TrackerHeader *)out_buff; p = out_buff + sizeof(TrackerHeader); body_len = (IP_ADDRESS_SIZE + 4) * g_storage_count; pHeader->cmd = TRACKER_PROTO_CMD_STORAGE_SYNC_REPORT; long2buff(body_len, pHeader->pkg_len); pEnd = g_storage_servers + g_storage_count; for (pServer=g_storage_servers; pServer<pEnd; pServer++) { memcpy(p, pServer->server.ip_addr, IP_ADDRESS_SIZE); p += IP_ADDRESS_SIZE; int2buff(pServer->last_sync_src_timestamp, p); p += 4; } if((result=tcpsenddata(pTrackerServer->sock, out_buff, \ sizeof(TrackerHeader) + body_len, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, send data fail, " \ "errno: %d, error info: %s.", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, \ result, strerror(result)); return result; } return tracker_check_response(pTrackerServer);}static int tracker_report_df_stat(TrackerServerInfo *pTrackerServer){ char out_buff[sizeof(TrackerHeader) + \ sizeof(TrackerStatReportReqBody) * 16]; char *pBuff; TrackerHeader *pHeader; TrackerStatReportReqBody *pStatBuff; struct statfs sbuf; int body_len; int total_len; int i; int result; body_len = (int)sizeof(TrackerStatReportReqBody) * g_path_count; total_len = (int)sizeof(TrackerHeader) + body_len; if (total_len <= sizeof(out_buff)) { pBuff = out_buff; } else { pBuff = (char *)malloc(total_len); if (pBuff == NULL) { logError("file: "__FILE__", line: %d, " \ "malloc %d bytes fail, " \ "errno: %d, error info: %s", \ __LINE__, total_len, \ errno, strerror(errno)); return errno != 0 ? errno : ENOMEM; } } pHeader = (TrackerHeader *)pBuff; pStatBuff = (TrackerStatReportReqBody*) \ (pBuff + sizeof(TrackerHeader)); long2buff(body_len, pHeader->pkg_len); pHeader->cmd = TRACKER_PROTO_CMD_STORAGE_REPORT; pHeader->status = 0; for (i=0; i<g_path_count; i++) { if (statfs(g_store_paths[i], &sbuf) != 0) { logError("file: "__FILE__", line: %d, " \ "call statfs fail, errno: %d, error info: %s.",\ __LINE__, errno, strerror(errno)); if (pBuff != out_buff) { free(pBuff); } return errno != 0 ? errno : EACCES; } long2buff((((int64_t)(sbuf.f_blocks) * sbuf.f_bsize) / FDFS_ONE_MB),\ pStatBuff->sz_total_mb); long2buff((((int64_t)(sbuf.f_bavail) * sbuf.f_bsize) / FDFS_ONE_MB),\ pStatBuff->sz_free_mb); pStatBuff++; } result = tcpsenddata(pTrackerServer->sock, pBuff, \ total_len, g_network_timeout); if (pBuff != out_buff) { free(pBuff); } if(result != 0) { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, send data fail, " \ "errno: %d, error info: %s.", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, \ result, strerror(result)); return result; } return tracker_check_response(pTrackerServer);}static int tracker_heart_beat(TrackerServerInfo *pTrackerServer, \ int *pstat_chg_sync_count){ char out_buff[sizeof(TrackerHeader) + sizeof(FDFSStorageStatBuff)]; TrackerHeader *pHeader; FDFSStorageStatBuff *pStatBuff; int body_len; int result; pHeader = (TrackerHeader *)out_buff; if (*pstat_chg_sync_count != g_stat_change_count) { pStatBuff = (FDFSStorageStatBuff *)( \ out_buff + sizeof(TrackerHeader)); long2buff(g_storage_stat.total_upload_count, \ pStatBuff->sz_total_upload_count); long2buff(g_storage_stat.success_upload_count, \ pStatBuff->sz_success_upload_count); long2buff(g_storage_stat.total_download_count, \ pStatBuff->sz_total_download_count); long2buff(g_storage_stat.success_download_count, \ pStatBuff->sz_success_download_count); long2buff(g_storage_stat.total_set_meta_count, \ pStatBuff->sz_total_set_meta_count); long2buff(g_storage_stat.success_set_meta_count, \ pStatBuff->sz_success_set_meta_count); long2buff(g_storage_stat.total_delete_count, \ pStatBuff->sz_total_delete_count); long2buff(g_storage_stat.success_delete_count, \ pStatBuff->sz_success_delete_count); long2buff(g_storage_stat.total_get_meta_count, \ pStatBuff->sz_total_get_meta_count); long2buff(g_storage_stat.success_get_meta_count, \ pStatBuff->sz_success_get_meta_count); long2buff(g_storage_stat.last_source_update, \ pStatBuff->sz_last_source_update); long2buff(g_storage_stat.last_sync_update, \ pStatBuff->sz_last_sync_update); *pstat_chg_sync_count = g_stat_change_count; body_len = sizeof(FDFSStorageStatBuff); } else { body_len = 0; } long2buff(body_len, pHeader->pkg_len); pHeader->cmd = TRACKER_PROTO_CMD_STORAGE_BEAT; pHeader->status = 0; if((result=tcpsenddata(pTrackerServer->sock, out_buff, \ sizeof(TrackerHeader) + body_len, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, send data fail, " \ "errno: %d, error info: %s.", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, \ result, strerror(result)); return result; } return tracker_check_response(pTrackerServer);}int tracker_report_thread_start(){ TrackerServerInfo *pTrackerServer; TrackerServerInfo *pServerEnd; pthread_attr_t pattr; pthread_t tid; int result; if ((result=init_pthread_attr(&pattr)) != 0) { return result; } report_tids = (pthread_t *)malloc(sizeof(pthread_t) * \ g_tracker_server_count); if (report_tids == NULL) { logError("file: "__FILE__", line: %d, " \ "malloc %d bytes fail, " \ "errno: %d, error info: %s", \ __LINE__, sizeof(pthread_t) * \ g_tracker_server_count, \ errno, strerror(errno)); return errno != 0 ? errno : ENOMEM; } memset(report_tids, 0, sizeof(pthread_t) * g_tracker_server_count); g_tracker_reporter_count = 0; pServerEnd = g_tracker_servers + g_tracker_server_count; for (pTrackerServer=g_tracker_servers; pTrackerServer<pServerEnd; \ pTrackerServer++) { if((result=pthread_create(&tid, &pattr, \ tracker_report_thread_entrance, pTrackerServer)) != 0) { logError("file: "__FILE__", line: %d, " \ "create thread failed, errno: %d, " \ "error info: %s.", \ __LINE__, result, strerror(result)); return result; } if ((result=pthread_mutex_lock(&reporter_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_lock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } report_tids[g_tracker_reporter_count] = tid; g_tracker_reporter_count++; if ((result=pthread_mutex_unlock(&reporter_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_unlock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } } pthread_attr_destroy(&pattr); return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -