📄 tracker_client_thread.c
字号:
/*** Copyright (C) 2008 Happy Fish / YuQing** FastDFS may be copied only under the terms of the GNU General* Public License V3, which may be found in the FastDFS source kit.* Please visit the FastDFS Home Page http://www.csource.org/ for more detail.**/#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include <errno.h>#include <time.h>#ifdef OS_LINUX#include <sys/vfs.h>#include <sys/statfs.h>#endif#include <sys/param.h>#include <sys/mount.h>#include "fdfs_define.h"#include "logger.h"#include "fdfs_global.h"#include "sockopt.h"#include "shared_func.h"#include "tracker_types.h"#include "tracker_proto.h"#include "tracker_client_thread.h"#include "storage_global.h"#include "storage_sync.h"#include "storage_func.h"static pthread_mutex_t reporter_thread_lock;/* save report thread ids */static pthread_t *report_tids = NULL;static int tracker_heart_beat(TrackerServerInfo *pTrackerServer, \ int *pstat_chg_sync_count);static int tracker_report_df_stat(TrackerServerInfo *pTrackerServer);static int tracker_sync_dest_req(TrackerServerInfo *pTrackerServer);static int tracker_sync_notify(TrackerServerInfo *pTrackerServer);static int tracker_report_sync_timestamp(TrackerServerInfo *pTrackerServer);int tracker_report_init(){ int result; memset(g_storage_servers, 0, sizeof(g_storage_servers)); memset(g_sorted_storages, 0, sizeof(g_sorted_storages)); if ((result=init_pthread_lock(&reporter_thread_lock)) != 0) { return result; } return 0;} int tracker_report_destroy(){ int result; if ((result=pthread_mutex_destroy(&reporter_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_destroy fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); return result; } return 0;}int kill_tracker_report_threads(){ int result; if (report_tids != NULL) { result = kill_work_threads(report_tids, \ g_tracker_server_count); free(report_tids); report_tids = NULL; } else { result = 0; } return result;}static void* tracker_report_thread_entrance(void* arg){ TrackerServerInfo *pTrackerServer; char tracker_client_ip[IP_ADDRESS_SIZE]; char szFailPrompt[36]; bool sync_old_done; int stat_chg_sync_count; int sync_time_chg_count; int sleep_secs; time_t current_time; time_t last_df_report_time; time_t last_sync_report_time; time_t last_beat_time; int result; int previousCode; int nContinuousFail; stat_chg_sync_count = 0; pTrackerServer = (TrackerServerInfo *)arg; pTrackerServer->sock = -1; sync_old_done = g_sync_old_done; while (g_continue_flag && \ g_tracker_reporter_count < g_tracker_server_count) { sleep(1); //waiting for all thread started } /* //printf("tracker_report_thread %s:%d start.\n", \ pTrackerServer->ip_addr, pTrackerServer->port); */ result = 0; previousCode = 0; nContinuousFail = 0; sleep_secs = g_heart_beat_interval < g_stat_report_interval ? \ g_heart_beat_interval : g_stat_report_interval; while (g_continue_flag) { if (pTrackerServer->sock >= 0) { close(pTrackerServer->sock); } pTrackerServer->sock = socket(AF_INET, SOCK_STREAM, 0); if(pTrackerServer->sock < 0) { logCrit("file: "__FILE__", line: %d, " \ "socket create failed, errno: %d, " \ "error info: %s. program exit!", \ __LINE__, errno, strerror(errno)); g_continue_flag = false; break; } if ((result=connectserverbyip(pTrackerServer->sock, \ pTrackerServer->ip_addr, \ pTrackerServer->port)) != 0) { if (previousCode != result) { logError("file: "__FILE__", line: %d, " \ "connect to tracker server %s:%d fail" \ ", errno: %d, error info: %s", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, \ result, strerror(result)); previousCode = result; } nContinuousFail++; if (g_continue_flag) { sleep(g_heart_beat_interval); continue; } else { break; } } if (nContinuousFail == 0) { *szFailPrompt = '\0'; } else { sprintf(szFailPrompt, ", continuous fail count: %d", \ nContinuousFail); } logInfo("file: "__FILE__", line: %d, " \ "successfully connect to tracker server %s:%d%s", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, szFailPrompt); previousCode = 0; nContinuousFail = 0; getSockIpaddr(pTrackerServer->sock, \ tracker_client_ip, IP_ADDRESS_SIZE); insert_into_local_host_ip(tracker_client_ip); /* //printf("file: "__FILE__", line: %d, " \ "tracker_client_ip: %s\n", \ __LINE__, tracker_client_ip); //print_local_host_ip_addrs(); */ if (tracker_report_join(pTrackerServer) != 0) { sleep(g_heart_beat_interval); continue; } if (!sync_old_done) { if ((result=pthread_mutex_lock(&reporter_thread_lock)) \ != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_lock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); fdfs_quit(pTrackerServer); sleep(g_heart_beat_interval); continue; } if (!g_sync_old_done) { if (tracker_sync_dest_req(pTrackerServer) == 0) { g_sync_old_done = true; if (storage_write_to_sync_ini_file() \ != 0) { logCrit("file: "__FILE__", line: %d, " \ "storage_write_to_sync_ini_file"\ " fail, program exit!", \ __LINE__); g_continue_flag = false; pthread_mutex_unlock( \ &reporter_thread_lock); break; } } else //request failed or need to try again { pthread_mutex_unlock( \ &reporter_thread_lock); fdfs_quit(pTrackerServer); sleep(g_heart_beat_interval); continue; } } if ((result=pthread_mutex_unlock(&reporter_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_unlock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } sync_old_done = true; } if (tracker_sync_notify(pTrackerServer) != 0) { fdfs_quit(pTrackerServer); sleep(g_heart_beat_interval); continue; } sync_time_chg_count = 0; last_df_report_time = 0; last_beat_time = 0; last_sync_report_time = 0; while (g_continue_flag) { current_time = time(NULL); if (current_time - last_beat_time >= \ g_heart_beat_interval) { if (tracker_heart_beat(pTrackerServer, \ &stat_chg_sync_count) != 0) { break; } last_beat_time = current_time; } if (sync_time_chg_count != g_sync_change_count && \ current_time - last_sync_report_time >= \ g_heart_beat_interval) { if (tracker_report_sync_timestamp( \ pTrackerServer) != 0) { break; } sync_time_chg_count = g_sync_change_count; last_sync_report_time = current_time; } if (current_time - last_df_report_time >= \ g_stat_report_interval) { if (tracker_report_df_stat(pTrackerServer) != 0) { break; } last_df_report_time = current_time; } sleep(1); } if ((!g_continue_flag) && fdfs_quit(pTrackerServer) != 0) { } close(pTrackerServer->sock); pTrackerServer->sock = -1; if (g_continue_flag) { sleep(1); } } if (nContinuousFail > 0) { logError("file: "__FILE__", line: %d, " \ "connect to tracker server %s:%d fail, try count: %d" \ ", errno: %d, error info: %s", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, nContinuousFail, \ result, strerror(result)); } if ((result=pthread_mutex_lock(&reporter_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_lock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } g_tracker_reporter_count--; if ((result=pthread_mutex_unlock(&reporter_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_unlock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } return NULL;}static bool tracker_insert_into_sorted_servers( \ FDFSStorageServer *pInsertedServer){ FDFSStorageServer **ppServer; FDFSStorageServer **ppEnd; int nCompare; ppEnd = g_sorted_storages + g_storage_count; for (ppServer=ppEnd; ppServer > g_sorted_storages; ppServer--) { nCompare = strcmp(pInsertedServer->server.ip_addr, \ (*(ppServer-1))->server.ip_addr); if (nCompare > 0) { *ppServer = pInsertedServer; return true; } else if (nCompare < 0) { *ppServer = *(ppServer-1); } else //nCompare == 0 { for (; ppServer < ppEnd; ppServer++) //restore { *ppServer = *(ppServer+1); } return false; } } *ppServer = pInsertedServer; return true;}int tracker_sync_diff_servers(TrackerServerInfo *pTrackerServer, \ FDFSStorageBrief *briefServers, const int server_count){ TrackerHeader resp; int out_len; int result; if (server_count == 0) { return 0; } memset(&resp, 0, sizeof(resp)); resp.cmd = TRACKER_PROTO_CMD_STORAGE_REPLICA_CHG; out_len = sizeof(FDFSStorageBrief) * server_count; long2buff(out_len, resp.pkg_len); if ((result=tcpsenddata(pTrackerServer->sock, &resp, sizeof(resp), \ g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "trackert server %s:%d, send data fail, " \ "errno: %d, error info: %s", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, result, strerror(result)); return result; } if ((result=tcpsenddata(pTrackerServer->sock, \ briefServers, out_len, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "trackert server %s:%d, send data fail, " \ "errno: %d, error info: %s", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, result, strerror(result)); return result; } if ((result=tcprecvdata(pTrackerServer->sock, &resp, \ sizeof(resp), g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, recv data fail, " \ "errno: %d, error info: %s.", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, result, strerror(result)); return result; } if (memcmp(resp.pkg_len, "\0\0\0\0\0\0\0\0", \ FDFS_PROTO_PKG_LEN_SIZE) != 0) { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, " \ "expect pkg len 0, but recv pkg len != 0", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port); return EINVAL; } return resp.status;}static int tracker_merge_servers(TrackerServerInfo *pTrackerServer, \ FDFSStorageBrief *briefServers, const int server_count){ FDFSStorageBrief *pServer; FDFSStorageBrief *pEnd; FDFSStorageServer *pInsertedServer; FDFSStorageServer **ppFound; FDFSStorageServer *pGlobalServer; FDFSStorageServer *pGlobalEnd; FDFSStorageServer targetServer; FDFSStorageServer *pTargetServer; FDFSStorageBrief diffServers[FDFS_MAX_SERVERS_EACH_GROUP]; FDFSStorageBrief *pDiffServer; int res; int result; int nDeletedCount; memset(&targetServer, 0, sizeof(targetServer)); pTargetServer = &targetServer; nDeletedCount = 0; pDiffServer = diffServers; pEnd = briefServers + server_count; for (pServer=briefServers; pServer<pEnd; pServer++) { memcpy(&(targetServer.server),pServer,sizeof(FDFSStorageBrief)); ppFound = (FDFSStorageServer **)bsearch(&pTargetServer, \ g_sorted_storages, g_storage_count, \ sizeof(FDFSStorageServer *), storage_cmp_by_ip_addr); if (ppFound != NULL) { if ((*ppFound)->server.status != pServer->status) { if ((((pServer->status == \ FDFS_STORAGE_STATUS_WAIT_SYNC) || \ (pServer->status == \ FDFS_STORAGE_STATUS_SYNCING)) && \ ((*ppFound)->server.status > \ pServer->status)) \ || ((*ppFound)->server.status == \ FDFS_STORAGE_STATUS_DELETED)) { memcpy(pDiffServer++, \ &((*ppFound)->server), \ sizeof(FDFSStorageBrief)); } else if ((*ppFound)->server.status == \ FDFS_STORAGE_STATUS_NONE && \ pServer->status == \ FDFS_STORAGE_STATUS_DELETED) //ignore { } else { if ((*ppFound)->server.status == \ FDFS_STORAGE_STATUS_NONE && \ pServer->status != \ FDFS_STORAGE_STATUS_DELETED) { (*ppFound)->server.status = \ pServer->status; if ((result=storage_sync_thread_start( \ &((*ppFound)->server))) != 0) { return result; } } else { (*ppFound)->server.status = \ pServer->status; } } } } else if (pServer->status == FDFS_STORAGE_STATUS_DELETED)//ignore { nDeletedCount++; } else { if (g_storage_count < FDFS_MAX_SERVERS_EACH_GROUP) { if ((result=pthread_mutex_lock( \ &reporter_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, "\ "call pthread_mutex_lock fail,"\ " errno: %d, error info: %s", \ __LINE__, \ result, strerror(result)); } pInsertedServer = g_storage_servers + \ g_storage_count; memcpy(&(pInsertedServer->server), \ pServer, sizeof(FDFSStorageBrief)); if (tracker_insert_into_sorted_servers( \ pInsertedServer)) { g_storage_count++; } if ((result=pthread_mutex_unlock( \ &reporter_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, "\ "call pthread_mutex_unlock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } if ((result=storage_sync_thread_start( \ &(pInsertedServer->server))) != 0) { return result; } } else { logError("file: "__FILE__", line: %d, " \ "tracker server %s:%d, " \ "storage servers of group \"%s\" " \ "exceeds max: %d", \ __LINE__, pTrackerServer->ip_addr, \ pTrackerServer->port, \ pTrackerServer->group_name, \ FDFS_MAX_SERVERS_EACH_GROUP); } } } if (g_storage_count + nDeletedCount == server_count) { if (pDiffServer - diffServers > 0) { return tracker_sync_diff_servers(pTrackerServer, \
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -