📄 storage_service.c
字号:
resp.status = errno != 0 ? errno : EACCES; break; } } resp.status = storage_binlog_write(*timestamp, \ STORAGE_OP_TYPE_REPLICA_DELETE_FILE, filename); break; } resp.cmd = STORAGE_PROTO_CMD_RESP; if ((result=tcpsenddata(pClientInfo->sock, \ &resp, sizeof(resp), g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, send data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ result, strerror(result)); return result; } return resp.status;}/**pkg format:HeaderFDFS_GROUP_NAME_MAX_LEN bytes: group_namefilename**/static int storage_delete_file(StorageClientInfo *pClientInfo, \ const int64_t nInPackLen){ TrackerHeader resp; char in_buff[FDFS_GROUP_NAME_MAX_LEN + 64]; char group_name[FDFS_GROUP_NAME_MAX_LEN + 1]; char full_filename[MAX_PATH_SIZE+sizeof(in_buff)]; char meta_filename[MAX_PATH_SIZE+sizeof(in_buff)]; char true_filename[64]; char *pBasePath; char *filename; int filename_len; int result; memset(&resp, 0, sizeof(resp)); while (1) { if (nInPackLen <= FDFS_GROUP_NAME_MAX_LEN) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip: %s, package size "INT64_PRINTF_FORMAT" " \ "is not correct, " \ "expect length <= %d", \ __LINE__, \ STORAGE_PROTO_CMD_UPLOAD_FILE, \ pClientInfo->ip_addr, \ nInPackLen, FDFS_GROUP_NAME_MAX_LEN); resp.status = EINVAL; break; } if (nInPackLen >= sizeof(in_buff)) { logError("file: "__FILE__", line: %d, " \ "cmd=%d, client ip: %s, package size "INT64_PRINTF_FORMAT" " \ "is too large, " \ "expect length should < %d", \ __LINE__, \ STORAGE_PROTO_CMD_UPLOAD_FILE, \ pClientInfo->ip_addr, \ nInPackLen, sizeof(in_buff)); resp.status = EINVAL; break; } if ((resp.status=tcprecvdata(pClientInfo->sock, in_buff, \ nInPackLen, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip:%s, recv data fail, " \ "errno: %d, error info: %s.", \ __LINE__, pClientInfo->ip_addr, \ resp.status, strerror(resp.status)); break; } memcpy(group_name, in_buff, FDFS_GROUP_NAME_MAX_LEN); group_name[FDFS_GROUP_NAME_MAX_LEN] = '\0'; if (strcmp(group_name, g_group_name) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip:%s, group_name: %s " \ "not correct, should be: %s", \ __LINE__, pClientInfo->ip_addr, \ group_name, g_group_name); resp.status = EINVAL; break; } *(in_buff + nInPackLen) = '\0'; filename = in_buff + FDFS_GROUP_NAME_MAX_LEN; filename_len = nInPackLen - FDFS_GROUP_NAME_MAX_LEN; if ((resp.status=storage_split_filename(filename, \ &filename_len, true_filename, &pBasePath)) != 0) { break; } if ((resp.status=fdfs_check_data_filename(true_filename, \ filename_len)) != 0) { break; } sprintf(full_filename, "%s/data/%s", pBasePath, true_filename); if (unlink(full_filename) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, delete file %s fail," \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, full_filename, \ errno, strerror(errno)); resp.status = errno != 0 ? errno : EACCES; break; } resp.status = storage_binlog_write(time(NULL), \ STORAGE_OP_TYPE_SOURCE_DELETE_FILE, filename); if (resp.status != 0) { break; } sprintf(meta_filename, "%s"STORAGE_META_FILE_EXT, \ full_filename); if (unlink(meta_filename) != 0) { if (errno != ENOENT) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, delete file %s fail," \ "errno: %d, error info: %s", \ __LINE__, \ pClientInfo->ip_addr, meta_filename, \ errno, strerror(errno)); resp.status = errno != 0 ? errno : EACCES; break; } break; //meta file do not exist, do not log to binlog } sprintf(meta_filename, "%s"STORAGE_META_FILE_EXT, filename); resp.status = storage_binlog_write(time(NULL), \ STORAGE_OP_TYPE_SOURCE_DELETE_FILE, meta_filename); break; } resp.cmd = STORAGE_PROTO_CMD_RESP; if ((result=tcpsenddata(pClientInfo->sock, \ &resp, sizeof(resp), g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "client ip: %s, send data fail, " \ "errno: %d, error info: %s", \ __LINE__, pClientInfo->ip_addr, \ result, strerror(result)); return result; } return resp.status;}static FDFSStorageServer *get_storage_server(const char *ip_addr){ FDFSStorageServer targetServer; FDFSStorageServer *pTargetServer; FDFSStorageServer **ppFound; memset(&targetServer, 0, sizeof(targetServer)); strcpy(targetServer.server.ip_addr, ip_addr); pTargetServer = &targetServer; ppFound = (FDFSStorageServer **)bsearch(&pTargetServer, \ g_sorted_storages, g_storage_count, \ sizeof(FDFSStorageServer *), storage_cmp_by_ip_addr); if (ppFound == NULL) { return NULL; } else { return *ppFound; }}#define CHECK_AND_WRITE_TO_STAT_FILE1 \ pthread_mutex_lock(&stat_count_thread_lock); \\ if (pSrcStorage == NULL) \ { \ pSrcStorage = get_storage_server(client_info.ip_addr); \ } \ if (pSrcStorage != NULL) \ { \ pSrcStorage->last_sync_src_timestamp = \ src_sync_timestamp; \ g_sync_change_count++; \ } \\ g_storage_stat.last_sync_update = time(NULL); \ if (++g_stat_change_count % STORAGE_SYNC_STAT_FILE_FREQ == 0) \ { \ if (storage_write_to_stat_file() != 0) \ { \ pthread_mutex_unlock(&stat_count_thread_lock); \ break; \ } \ } \ pthread_mutex_unlock(&stat_count_thread_lock);#define CHECK_AND_WRITE_TO_STAT_FILE2(total_count, success_count) \ pthread_mutex_lock(&stat_count_thread_lock); \ total_count++; \ success_count++; \ if (++g_stat_change_count % STORAGE_SYNC_STAT_FILE_FREQ == 0) \ { \ if (storage_write_to_stat_file() != 0) \ { \ pthread_mutex_unlock(&stat_count_thread_lock); \ break; \ } \ } \ pthread_mutex_unlock(&stat_count_thread_lock);#define CHECK_AND_WRITE_TO_STAT_FILE3(total_count, success_count, timestamp) \ pthread_mutex_lock(&stat_count_thread_lock); \ total_count++; \ success_count++; \ timestamp = time(NULL); \ if (++g_stat_change_count % STORAGE_SYNC_STAT_FILE_FREQ == 0) \ { \ if (storage_write_to_stat_file() != 0) \ { \ pthread_mutex_unlock(&stat_count_thread_lock); \ break; \ } \ } \ pthread_mutex_unlock(&stat_count_thread_lock);void* storage_thread_entrance(void* arg){/*package format:8 bytes length (hex string)1 bytes cmd (char)1 bytes status(char)data buff (struct)*/ StorageClientInfo client_info; TrackerHeader header; int result; int64_t nInPackLen; int count; int recv_bytes; int log_level; in_addr_t client_ip; int src_sync_timestamp; FDFSStorageServer *pSrcStorage; int server_sock; server_sock = (int)arg; while (g_continue_flag) { if ((result=pthread_mutex_lock(&g_storage_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_lock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } if (!g_continue_flag) { pthread_mutex_unlock(&g_storage_thread_lock); break; } memset(&client_info, 0, sizeof(client_info)); client_info.sock = nbaccept(server_sock, 1 * 60, &result); if (pthread_mutex_unlock(&g_storage_thread_lock) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_unlock fail", \ __LINE__); } if(client_info.sock < 0) //error { if (result == ETIMEDOUT || result == EINTR || \ result == EAGAIN) { continue; } if(result == EBADF) { logError("file: "__FILE__", line: %d, " \ "accept failed, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); break; } logError("file: "__FILE__", line: %d, " \ "accept failed, errno: %d, error info: %s", \ __LINE__, result, strerror(result)); continue; } client_ip = getPeerIpaddr(client_info.sock, \ client_info.ip_addr, IP_ADDRESS_SIZE); if (g_allow_ip_count >= 0) { if (bsearch(&client_ip, g_allow_ip_addrs, g_allow_ip_count, \ sizeof(in_addr_t), cmp_by_ip_addr_t) == NULL) { logError("file: "__FILE__", line: %d, " \ "ip addr %s is not allowed to access", \ __LINE__, client_info.ip_addr); close(client_info.sock); continue; } } pSrcStorage = NULL; count = 0; while (g_continue_flag) { result = tcprecvdata_ex(client_info.sock, &header, \ sizeof(header), g_network_timeout, &recv_bytes); if (result == ETIMEDOUT) { continue; } if (result != 0) { /* unsigned char *p; unsigned char *pEnd = (unsigned char *)&header + recv_bytes); for (p=(unsigned char *)&header; p<pEnd; p++) { fprintf(stderr, "%02X ", *p); } fprintf(stderr, "\n"); */ if (result == ENOTCONN && recv_bytes == 0) { log_level = LOG_WARNING; } else { log_level = LOG_ERR; } log_it(log_level, "file: "__FILE__", line: %d, " \ "client ip: %s, recv data fail, " \ "errno: %d, error info: %s", \ __LINE__, client_info.ip_addr, \ result, strerror(result)); break; } nInPackLen = buff2long(header.pkg_len); if (header.cmd == STORAGE_PROTO_CMD_DOWNLOAD_FILE) { if (storage_download_file(&client_info, \ nInPackLen) != 0) { pthread_mutex_lock(&stat_count_thread_lock); g_storage_stat.total_download_count++; pthread_mutex_unlock(&stat_count_thread_lock); break; } CHECK_AND_WRITE_TO_STAT_FILE2( \ g_storage_stat.total_download_count, \ g_storage_stat.success_download_count) } else if (header.cmd == STORAGE_PROTO_CMD_GET_METADATA) { if (storage_get_metadata(&client_info, \ nInPackLen) != 0) { pthread_mutex_lock(&stat_count_thread_lock); g_storage_stat.total_get_meta_count++; pthread_mutex_unlock(&stat_count_thread_lock); break; } CHECK_AND_WRITE_TO_STAT_FILE2( \ g_storage_stat.total_get_meta_count, \ g_storage_stat.success_get_meta_count) } else if (header.cmd == STORAGE_PROTO_CMD_UPLOAD_FILE) { if (storage_upload_file(&client_info, \ nInPackLen) != 0) { pthread_mutex_lock(&stat_count_thread_lock); g_storage_stat.total_upload_count++; pthread_mutex_unlock(&stat_count_thread_lock); break; } CHECK_AND_WRITE_TO_STAT_FILE3( \ g_storage_stat.total_upload_count, \ g_storage_stat.success_upload_count, \ g_storage_stat.last_source_update) } else if (header.cmd == STORAGE_PROTO_CMD_DELETE_FILE) { if (storage_delete_file(&client_info, \ nInPackLen) != 0) { pthread_mutex_lock(&stat_count_thread_lock); g_storage_stat.total_delete_count++; pthread_mutex_unlock(&stat_count_thread_lock); break; } CHECK_AND_WRITE_TO_STAT_FILE3( \ g_storage_stat.total_delete_count, \ g_storage_stat.success_delete_count, \ g_storage_stat.last_source_update) } else if (header.cmd == STORAGE_PROTO_CMD_SYNC_CREATE_FILE) { if (storage_sync_copy_file(&client_info, \ nInPackLen,header.cmd,&src_sync_timestamp) != 0) { break; } CHECK_AND_WRITE_TO_STAT_FILE1 } else if (header.cmd == STORAGE_PROTO_CMD_SYNC_DELETE_FILE) { if (storage_sync_delete_file(&client_info, \ nInPackLen, &src_sync_timestamp) != 0) { break; } CHECK_AND_WRITE_TO_STAT_FILE1 } else if (header.cmd == STORAGE_PROTO_CMD_SYNC_UPDATE_FILE) { if (storage_sync_copy_file(&client_info, \ nInPackLen,header.cmd,&src_sync_timestamp)!=0) { break; } CHECK_AND_WRITE_TO_STAT_FILE1 } else if (header.cmd == STORAGE_PROTO_CMD_SET_METADATA) { if (storage_set_metadata(&client_info, \ nInPackLen) != 0) { pthread_mutex_lock(&stat_count_thread_lock); g_storage_stat.total_set_meta_count++; pthread_mutex_unlock(&stat_count_thread_lock); break; } CHECK_AND_WRITE_TO_STAT_FILE3( \ g_storage_stat.total_set_meta_count, \ g_storage_stat.success_set_meta_count, \ g_storage_stat.last_source_update) } else if (header.cmd == FDFS_PROTO_CMD_QUIT) { break; } else { logError("file: "__FILE__", line: %d, " \ "client ip: %s, unkown cmd: %d", \ __LINE__, client_info.ip_addr, header.cmd); break; } count++; } close(client_info.sock); } if ((result=pthread_mutex_lock(&g_storage_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_lock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } g_storage_thread_count--; if ((result=pthread_mutex_unlock(&g_storage_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_unlock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } return NULL;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -