⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 storage_service.c

📁 文件系统源代码!!!!! 文件系统源代码
💻 C
📖 第 1 页 / 共 4 页
字号:
				resp.status = errno != 0 ? errno : EACCES;				break;			}		}		resp.status = storage_binlog_write(*timestamp, \				STORAGE_OP_TYPE_REPLICA_DELETE_FILE, filename);		break;	}	resp.cmd = STORAGE_PROTO_CMD_RESP;	if ((result=tcpsenddata(pClientInfo->sock, \		&resp, sizeof(resp), g_network_timeout)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"client ip: %s, send data fail, " \			"errno: %d, error info: %s", \			__LINE__, pClientInfo->ip_addr, \			result, strerror(result));		return result;	}	return resp.status;}/**pkg format:HeaderFDFS_GROUP_NAME_MAX_LEN bytes: group_namefilename**/static int storage_delete_file(StorageClientInfo *pClientInfo, \				const int64_t nInPackLen){	TrackerHeader resp;	char in_buff[FDFS_GROUP_NAME_MAX_LEN + 64];	char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];	char full_filename[MAX_PATH_SIZE+sizeof(in_buff)];	char meta_filename[MAX_PATH_SIZE+sizeof(in_buff)];	char true_filename[64];	char *pBasePath;	char *filename;	int filename_len;	int result;	memset(&resp, 0, sizeof(resp));	while (1)	{		if (nInPackLen <= FDFS_GROUP_NAME_MAX_LEN)		{			logError("file: "__FILE__", line: %d, " \				"cmd=%d, client ip: %s, package size "INT64_PRINTF_FORMAT" " \				"is not correct, " \				"expect length <= %d", \				__LINE__, \				STORAGE_PROTO_CMD_UPLOAD_FILE, \				pClientInfo->ip_addr,  \				nInPackLen, FDFS_GROUP_NAME_MAX_LEN);			resp.status = EINVAL;			break;		}		if (nInPackLen >= sizeof(in_buff))		{			logError("file: "__FILE__", line: %d, " \				"cmd=%d, client ip: %s, package size "INT64_PRINTF_FORMAT" " \				"is too large, " \				"expect length should < %d", \				__LINE__, \				STORAGE_PROTO_CMD_UPLOAD_FILE, \				pClientInfo->ip_addr,  \				nInPackLen, sizeof(in_buff));			resp.status = EINVAL;			break;		}		if ((resp.status=tcprecvdata(pClientInfo->sock, in_buff, \			nInPackLen, g_network_timeout)) != 0)		{			logError("file: "__FILE__", line: %d, " \				"client ip:%s, recv data fail, " \				"errno: %d, error info: %s.", \				__LINE__, pClientInfo->ip_addr, \				resp.status, strerror(resp.status));			break;		}		memcpy(group_name, in_buff, FDFS_GROUP_NAME_MAX_LEN);		group_name[FDFS_GROUP_NAME_MAX_LEN] = '\0';		if (strcmp(group_name, g_group_name) != 0)		{			logError("file: "__FILE__", line: %d, " \				"client ip:%s, group_name: %s " \				"not correct, should be: %s", \				__LINE__, pClientInfo->ip_addr, \				group_name, g_group_name);			resp.status = EINVAL;			break;		}		*(in_buff + nInPackLen) = '\0';		filename = in_buff + FDFS_GROUP_NAME_MAX_LEN;		filename_len = nInPackLen - FDFS_GROUP_NAME_MAX_LEN;		if ((resp.status=storage_split_filename(filename, \			&filename_len, true_filename, &pBasePath)) != 0)		{			break;		}		if ((resp.status=fdfs_check_data_filename(true_filename, \			filename_len)) != 0)		{			break;		}		sprintf(full_filename, "%s/data/%s", pBasePath, true_filename);		if (unlink(full_filename) != 0)		{			logError("file: "__FILE__", line: %d, " \				"client ip: %s, delete file %s fail," \				"errno: %d, error info: %s", \				__LINE__, pClientInfo->ip_addr, full_filename, \				errno, strerror(errno));			resp.status = errno != 0 ? errno : EACCES;			break;		}		resp.status = storage_binlog_write(time(NULL), \				STORAGE_OP_TYPE_SOURCE_DELETE_FILE, filename);		if (resp.status != 0)		{			break;		}		sprintf(meta_filename, "%s"STORAGE_META_FILE_EXT, \				full_filename);		if (unlink(meta_filename) != 0)		{			if (errno != ENOENT)			{				logError("file: "__FILE__", line: %d, " \					"client ip: %s, delete file %s fail," \					"errno: %d, error info: %s", \					__LINE__, \					pClientInfo->ip_addr, meta_filename, \					errno, strerror(errno));				resp.status = errno != 0 ? errno : EACCES;				break;			}			break;  //meta file do not exist, do not log to binlog		}		sprintf(meta_filename, "%s"STORAGE_META_FILE_EXT, filename);		resp.status = storage_binlog_write(time(NULL), \			STORAGE_OP_TYPE_SOURCE_DELETE_FILE, meta_filename);		break;	}	resp.cmd = STORAGE_PROTO_CMD_RESP;	if ((result=tcpsenddata(pClientInfo->sock, \		&resp, sizeof(resp), g_network_timeout)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"client ip: %s, send data fail, " \			"errno: %d, error info: %s", \			__LINE__, pClientInfo->ip_addr, \			result, strerror(result));		return result;	}	return resp.status;}static FDFSStorageServer *get_storage_server(const char *ip_addr){	FDFSStorageServer targetServer;	FDFSStorageServer *pTargetServer;	FDFSStorageServer **ppFound;	memset(&targetServer, 0, sizeof(targetServer));	strcpy(targetServer.server.ip_addr, ip_addr);	pTargetServer = &targetServer;	ppFound = (FDFSStorageServer **)bsearch(&pTargetServer, \		g_sorted_storages, g_storage_count, \		sizeof(FDFSStorageServer *), storage_cmp_by_ip_addr);	if (ppFound == NULL)	{		return NULL;	}	else	{		return *ppFound;	}}#define CHECK_AND_WRITE_TO_STAT_FILE1  \		pthread_mutex_lock(&stat_count_thread_lock); \\		if (pSrcStorage == NULL) \		{ \			pSrcStorage = get_storage_server(client_info.ip_addr); \		} \		if (pSrcStorage != NULL) \		{ \			pSrcStorage->last_sync_src_timestamp = \					src_sync_timestamp; \			g_sync_change_count++; \		} \\		g_storage_stat.last_sync_update = time(NULL); \		if (++g_stat_change_count % STORAGE_SYNC_STAT_FILE_FREQ == 0) \		{ \			if (storage_write_to_stat_file() != 0) \			{ \				pthread_mutex_unlock(&stat_count_thread_lock); \				break; \			} \		} \		pthread_mutex_unlock(&stat_count_thread_lock);#define CHECK_AND_WRITE_TO_STAT_FILE2(total_count, success_count)  \		pthread_mutex_lock(&stat_count_thread_lock); \		total_count++; \		success_count++; \		if (++g_stat_change_count % STORAGE_SYNC_STAT_FILE_FREQ == 0) \		{ \			if (storage_write_to_stat_file() != 0) \			{ \				pthread_mutex_unlock(&stat_count_thread_lock); \				break; \			} \		} \		pthread_mutex_unlock(&stat_count_thread_lock);#define CHECK_AND_WRITE_TO_STAT_FILE3(total_count, success_count, timestamp)  \		pthread_mutex_lock(&stat_count_thread_lock); \		total_count++; \		success_count++; \		timestamp = time(NULL); \		if (++g_stat_change_count % STORAGE_SYNC_STAT_FILE_FREQ == 0) \		{ \			if (storage_write_to_stat_file() != 0) \			{ \				pthread_mutex_unlock(&stat_count_thread_lock); \				break; \			} \		} \		pthread_mutex_unlock(&stat_count_thread_lock);void* storage_thread_entrance(void* arg){/*package format:8 bytes length (hex string)1 bytes cmd (char)1 bytes status(char)data buff (struct)*/	StorageClientInfo client_info;	TrackerHeader header;	int result;	int64_t nInPackLen;	int count;	int recv_bytes;	int log_level;	in_addr_t client_ip;	int src_sync_timestamp;	FDFSStorageServer *pSrcStorage;	int server_sock;		server_sock = (int)arg;	while (g_continue_flag)	{	if ((result=pthread_mutex_lock(&g_storage_thread_lock)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"call pthread_mutex_lock fail, " \			"errno: %d, error info: %s", \			__LINE__, result, strerror(result));	}	if (!g_continue_flag)	{		pthread_mutex_unlock(&g_storage_thread_lock);		break;	}	memset(&client_info, 0, sizeof(client_info));	client_info.sock = nbaccept(server_sock, 1 * 60, &result);	if (pthread_mutex_unlock(&g_storage_thread_lock) != 0)	{		logError("file: "__FILE__", line: %d, " \			"call pthread_mutex_unlock fail", \			__LINE__);	}	if(client_info.sock < 0) //error	{		if (result == ETIMEDOUT || result == EINTR || \			result == EAGAIN)		{			continue;		}					if(result == EBADF)		{			logError("file: "__FILE__", line: %d, " \				"accept failed, " \				"errno: %d, error info: %s", \				__LINE__, result, strerror(result));			break;		}					logError("file: "__FILE__", line: %d, " \			"accept failed, errno: %d, error info: %s", \			__LINE__, result, strerror(result));		continue;	}		client_ip = getPeerIpaddr(client_info.sock, \				client_info.ip_addr, IP_ADDRESS_SIZE);	if (g_allow_ip_count >= 0)	{		if (bsearch(&client_ip, g_allow_ip_addrs, g_allow_ip_count, \			sizeof(in_addr_t), cmp_by_ip_addr_t) == NULL)		{			logError("file: "__FILE__", line: %d, " \				"ip addr %s is not allowed to access", \				__LINE__, client_info.ip_addr);			close(client_info.sock);			continue;		}	}	pSrcStorage = NULL;	count = 0;	while (g_continue_flag)	{		result = tcprecvdata_ex(client_info.sock, &header, \			sizeof(header), g_network_timeout, &recv_bytes);		if (result == ETIMEDOUT)		{			continue;		}		if (result != 0)		{			/*			unsigned char *p;			unsigned char *pEnd = (unsigned char *)&header + recv_bytes);			for (p=(unsigned char *)&header; p<pEnd; p++)			{				fprintf(stderr, "%02X ", *p);			}			fprintf(stderr, "\n");			*/			if (result == ENOTCONN && recv_bytes == 0)			{				log_level = LOG_WARNING;			}			else			{				log_level = LOG_ERR;			}			log_it(log_level, "file: "__FILE__", line: %d, " \				"client ip: %s, recv data fail, " \				"errno: %d, error info: %s", \				__LINE__, client_info.ip_addr, \				result, strerror(result));			break;		}		nInPackLen = buff2long(header.pkg_len);		if (header.cmd == STORAGE_PROTO_CMD_DOWNLOAD_FILE)		{			if (storage_download_file(&client_info, \				nInPackLen) != 0)			{				pthread_mutex_lock(&stat_count_thread_lock);				g_storage_stat.total_download_count++;				pthread_mutex_unlock(&stat_count_thread_lock);				break;			}			CHECK_AND_WRITE_TO_STAT_FILE2( \				g_storage_stat.total_download_count, \				g_storage_stat.success_download_count)		}		else if (header.cmd == STORAGE_PROTO_CMD_GET_METADATA)		{			if (storage_get_metadata(&client_info, \				nInPackLen) != 0)			{				pthread_mutex_lock(&stat_count_thread_lock);				g_storage_stat.total_get_meta_count++;				pthread_mutex_unlock(&stat_count_thread_lock);				break;			}			CHECK_AND_WRITE_TO_STAT_FILE2( \				g_storage_stat.total_get_meta_count, \				g_storage_stat.success_get_meta_count)		}		else if (header.cmd == STORAGE_PROTO_CMD_UPLOAD_FILE)		{			if (storage_upload_file(&client_info, \				nInPackLen) != 0)			{				pthread_mutex_lock(&stat_count_thread_lock);				g_storage_stat.total_upload_count++;				pthread_mutex_unlock(&stat_count_thread_lock);				break;			}			CHECK_AND_WRITE_TO_STAT_FILE3( \				g_storage_stat.total_upload_count, \				g_storage_stat.success_upload_count, \				g_storage_stat.last_source_update)		}		else if (header.cmd == STORAGE_PROTO_CMD_DELETE_FILE)		{			if (storage_delete_file(&client_info, \				nInPackLen) != 0)			{				pthread_mutex_lock(&stat_count_thread_lock);				g_storage_stat.total_delete_count++;				pthread_mutex_unlock(&stat_count_thread_lock);				break;			}			CHECK_AND_WRITE_TO_STAT_FILE3( \				g_storage_stat.total_delete_count, \				g_storage_stat.success_delete_count, \				g_storage_stat.last_source_update)		}		else if (header.cmd == STORAGE_PROTO_CMD_SYNC_CREATE_FILE)		{			if (storage_sync_copy_file(&client_info, \				nInPackLen,header.cmd,&src_sync_timestamp) != 0)			{				break;			}			CHECK_AND_WRITE_TO_STAT_FILE1		}		else if (header.cmd == STORAGE_PROTO_CMD_SYNC_DELETE_FILE)		{			if (storage_sync_delete_file(&client_info, \				nInPackLen, &src_sync_timestamp) != 0)			{				break;			}			CHECK_AND_WRITE_TO_STAT_FILE1		}		else if (header.cmd == STORAGE_PROTO_CMD_SYNC_UPDATE_FILE)		{			if (storage_sync_copy_file(&client_info, \				nInPackLen,header.cmd,&src_sync_timestamp)!=0)			{				break;			}			CHECK_AND_WRITE_TO_STAT_FILE1		}		else if (header.cmd == STORAGE_PROTO_CMD_SET_METADATA)		{			if (storage_set_metadata(&client_info, \				nInPackLen) != 0)			{				pthread_mutex_lock(&stat_count_thread_lock);				g_storage_stat.total_set_meta_count++;				pthread_mutex_unlock(&stat_count_thread_lock);				break;			}			CHECK_AND_WRITE_TO_STAT_FILE3( \				g_storage_stat.total_set_meta_count, \				g_storage_stat.success_set_meta_count, \				g_storage_stat.last_source_update)		}		else if (header.cmd == FDFS_PROTO_CMD_QUIT)		{			break;		}		else		{			logError("file: "__FILE__", line: %d, "   \				"client ip: %s, unkown cmd: %d", \				__LINE__, client_info.ip_addr, header.cmd);			break;		}		count++;	}	close(client_info.sock);	}	if ((result=pthread_mutex_lock(&g_storage_thread_lock)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"call pthread_mutex_lock fail, " \			"errno: %d, error info: %s", \			__LINE__, result, strerror(result));	}	g_storage_thread_count--;	if ((result=pthread_mutex_unlock(&g_storage_thread_lock)) != 0)	{		logError("file: "__FILE__", line: %d, " \			"call pthread_mutex_unlock fail, " \			"errno: %d, error info: %s", \			__LINE__, result, strerror(result));	}	return NULL;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -