📄 storage_sync.c
字号:
/*** Copyright (C) 2008 Happy Fish / YuQing** FastDFS may be copied only under the terms of the GNU General* Public License V3, which may be found in the FastDFS source kit.* Please visit the FastDFS Home Page http://www.csource.org/ for more detail.**///storage_sync.c#include <sys/types.h>#include <sys/stat.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <unistd.h>#include <fcntl.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include <errno.h>#include <time.h>#include <unistd.h>#include "fdfs_define.h"#include "logger.h"#include "fdfs_global.h"#include "sockopt.h"#include "shared_func.h"#include "ini_file_reader.h"#include "tracker_types.h"#include "tracker_proto.h"#include "storage_global.h"#include "storage_func.h"#include "storage_sync.h"#include "tracker_client_thread.h"#define SYNC_BINLOG_FILE_MAX_SIZE 1024 * 1024 * 1024#define SYNC_BINLOG_FILE_PREFIX "binlog"#define SYNC_BINLOG_INDEX_FILENAME SYNC_BINLOG_FILE_PREFIX".index"#define SYNC_MARK_FILE_EXT ".mark"#define SYNC_BINLOG_FILE_EXT_FMT ".%03d"#define SYNC_DIR_NAME "sync"#define MARK_ITEM_BINLOG_FILE_INDEX "binlog_index"#define MARK_ITEM_BINLOG_FILE_OFFSET "binlog_offset"#define MARK_ITEM_NEED_SYNC_OLD "need_sync_old"#define MARK_ITEM_SYNC_OLD_DONE "sync_old_done"#define MARK_ITEM_UNTIL_TIMESTAMP "until_timestamp"#define MARK_ITEM_SCAN_ROW_COUNT "scan_row_count"#define MARK_ITEM_SYNC_ROW_COUNT "sync_row_count"int g_binlog_fd = -1;int g_binlog_index = 0;static off_t binlog_file_size = 0;int g_storage_sync_thread_count = 0;static pthread_mutex_t sync_thread_lock;static char binlog_write_cache_buff[16 * 1024];static int binlog_write_cache_len = 0;/* save sync thread ids */static pthread_t *sync_tids = NULL;static int storage_write_to_mark_file(BinLogReader *pReader);static int storage_binlog_reader_skip(BinLogReader *pReader);static void storage_reader_destroy(BinLogReader *pReader);static int storage_binlog_fsync(const bool bNeedLock);/**8 bytes: filename bytes8 bytes: file size4 bytes: source op timestampFDFS_GROUP_NAME_MAX_LEN bytes: group_namefilename bytes : filenamefile size bytes: file content**/static int storage_sync_copy_file(TrackerServerInfo *pStorageServer, \ const BinLogRecord *pRecord, const char proto_cmd){ TrackerHeader header; int result; int64_t in_bytes; char *p; char *pBuff; char full_filename[MAX_PATH_SIZE]; char out_buff[sizeof(TrackerHeader)+FDFS_GROUP_NAME_MAX_LEN+256]; char in_buff[1]; struct stat stat_buf; snprintf(full_filename, sizeof(full_filename), \ "%s/data/%s", pRecord->pBasePath, pRecord->true_filename); if (stat(full_filename, &stat_buf) != 0) { if (errno == ENOENT) { if(pRecord->op_type==STORAGE_OP_TYPE_SOURCE_CREATE_FILE) { logWarning("file: "__FILE__", line: %d, " \ "sync data file, file: %s not exists, "\ "maybe deleted later?", \ __LINE__, full_filename); } } else { logError("file: "__FILE__", line: %d, " \ "call stat fail, file: %s, "\ "error no: %d, error info: %s", \ __LINE__, full_filename, \ errno, strerror(errno)); } return 0; } //printf("sync create file: %s\n", pRecord->filename); while (1) { memset(&header, 0, sizeof(header)); long2buff(2 * FDFS_PROTO_PKG_LEN_SIZE + \ 4 + FDFS_GROUP_NAME_MAX_LEN + \ pRecord->filename_len + stat_buf.st_size,\ header.pkg_len); header.cmd = proto_cmd; memcpy(out_buff, &header, sizeof(TrackerHeader)); p = out_buff + sizeof(TrackerHeader); long2buff(pRecord->filename_len, p); p += FDFS_PROTO_PKG_LEN_SIZE; long2buff(stat_buf.st_size, p); p += FDFS_PROTO_PKG_LEN_SIZE; int2buff(pRecord->timestamp, p); p += 4; sprintf(p, "%s", pStorageServer->group_name); p += FDFS_GROUP_NAME_MAX_LEN; memcpy(p, pRecord->filename, pRecord->filename_len); p += pRecord->filename_len; if((result=tcpsenddata(pStorageServer->sock, out_buff, \ p - out_buff, g_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "sync data to storage server %s:%d fail, " \ "errno: %d, error info: %s", \ __LINE__, pStorageServer->ip_addr, \ pStorageServer->port, \ result, strerror(result)); break; } if((stat_buf.st_size > 0) && ((result=tcpsendfile( \ pStorageServer->sock, full_filename, \ stat_buf.st_size)) != 0)) { logError("file: "__FILE__", line: %d, " \ "sync data to storage server %s:%d fail, " \ "errno: %d, error info: %s", \ __LINE__, pStorageServer->ip_addr, \ pStorageServer->port, \ result, strerror(result)); break; } pBuff = in_buff; if ((result=fdfs_recv_response(pStorageServer, \ &pBuff, 0, &in_bytes)) != 0) { break; } break; } //printf("sync create file end!\n"); if (result == EEXIST) { if (pRecord->op_type == STORAGE_OP_TYPE_SOURCE_CREATE_FILE) { logWarning("file: "__FILE__", line: %d, " \ "storage server ip: %s:%d, data file: %s " \ "already exists, maybe some mistake?", \ __LINE__, pStorageServer->ip_addr, \ pStorageServer->port, pRecord->filename); } return 0; } else { return result; }}/**send pkg format:4 bytes: source delete timestampFDFS_GROUP_NAME_MAX_LEN bytes: group_nameremain bytes: filename**/static int storage_sync_delete_file(TrackerServerInfo *pStorageServer, \ const BinLogRecord *pRecord){ TrackerHeader header; int result; char full_filename[MAX_PATH_SIZE]; char out_buff[sizeof(TrackerHeader)+FDFS_GROUP_NAME_MAX_LEN+64]; char in_buff[1]; char *pBuff; int64_t in_bytes; snprintf(full_filename, sizeof(full_filename), \ "%s/data/%s", pRecord->pBasePath, pRecord->true_filename); if (fileExists(full_filename)) { if (pRecord->op_type == STORAGE_OP_TYPE_SOURCE_DELETE_FILE) { logWarning("file: "__FILE__", line: %d, " \ "sync data file, file: %s exists, " \ "maybe created later?", \ __LINE__, full_filename); } return 0; } while (1) { memset(out_buff, 0, sizeof(out_buff)); int2buff(pRecord->timestamp, out_buff + sizeof(TrackerHeader)); snprintf(out_buff + sizeof(TrackerHeader) + 4, sizeof(out_buff) - \ sizeof(TrackerHeader), "%s", g_group_name); memcpy(out_buff + sizeof(TrackerHeader) + 4 + FDFS_GROUP_NAME_MAX_LEN, \ pRecord->filename, pRecord->filename_len); memset(&header, 0, sizeof(header)); long2buff(4 + FDFS_GROUP_NAME_MAX_LEN + pRecord->filename_len, \ header.pkg_len); header.cmd = STORAGE_PROTO_CMD_SYNC_DELETE_FILE; memcpy(out_buff, &header, sizeof(TrackerHeader)); if ((result=tcpsenddata(pStorageServer->sock, out_buff, \ sizeof(TrackerHeader) + 4 + FDFS_GROUP_NAME_MAX_LEN + \ pRecord->filename_len, g_network_timeout)) != 0) { logError("FILE: "__FILE__", line: %d, " \ "send data to storage server %s:%d fail, " \ "errno: %d, error info: %s", \ __LINE__, pStorageServer->ip_addr, \ pStorageServer->port, \ result, strerror(result)); break; } pBuff = in_buff; result = fdfs_recv_response(pStorageServer, &pBuff, 0, &in_bytes); if (result == ENOENT) { result = 0; } break; } return result;}#define STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord) \ if ((!pReader->need_sync_old) || pReader->sync_old_done || \ (pRecord->timestamp > pReader->until_timestamp)) \ { \ return 0; \ } \static int storage_sync_data(BinLogReader *pReader, \ TrackerServerInfo *pStorageServer, \ const BinLogRecord *pRecord){ int result; switch(pRecord->op_type) { case STORAGE_OP_TYPE_SOURCE_CREATE_FILE: result = storage_sync_copy_file(pStorageServer, \ pRecord, STORAGE_PROTO_CMD_SYNC_CREATE_FILE); break; case STORAGE_OP_TYPE_SOURCE_DELETE_FILE: result = storage_sync_delete_file( \ pStorageServer, pRecord); break; case STORAGE_OP_TYPE_SOURCE_UPDATE_FILE: result = storage_sync_copy_file(pStorageServer, \ pRecord, STORAGE_PROTO_CMD_SYNC_UPDATE_FILE); break; case STORAGE_OP_TYPE_REPLICA_CREATE_FILE: STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord) result = storage_sync_copy_file(pStorageServer, \ pRecord, STORAGE_PROTO_CMD_SYNC_CREATE_FILE); break; case STORAGE_OP_TYPE_REPLICA_DELETE_FILE: STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord) result = storage_sync_delete_file( \ pStorageServer, pRecord); break; case STORAGE_OP_TYPE_REPLICA_UPDATE_FILE: STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord) result = storage_sync_copy_file(pStorageServer, \ pRecord, STORAGE_PROTO_CMD_SYNC_UPDATE_FILE); break; default: return EINVAL; } if (result == 0) { pReader->sync_row_count++; } return result;}static int write_to_binlog_index(){ char full_filename[MAX_PATH_SIZE]; char buff[16]; int fd; int len; snprintf(full_filename, sizeof(full_filename), \ "%s/data/"SYNC_DIR_NAME"/%s", g_base_path, \ SYNC_BINLOG_INDEX_FILENAME); if ((fd=open(full_filename, O_WRONLY | O_CREAT | O_TRUNC, 0644)) < 0) { logError("file: "__FILE__", line: %d, " \ "open file \"%s\" fail, " \ "errno: %d, error info: %s", \ __LINE__, full_filename, \ errno, strerror(errno)); return errno != 0 ? errno : ENOENT; } len = sprintf(buff, "%d", g_binlog_index); if (write(fd, buff, len) != len) { logError("file: "__FILE__", line: %d, " \ "write to file \"%s\" fail, " \ "errno: %d, error info: %s", \ __LINE__, full_filename, \ errno, strerror(errno)); close(fd); return errno != 0 ? errno : EIO; } close(fd); return 0;}static char *get_writable_binlog_filename(char *full_filename){ static char buff[MAX_PATH_SIZE]; if (full_filename == NULL) { full_filename = buff; } snprintf(full_filename, MAX_PATH_SIZE, \ "%s/data/"SYNC_DIR_NAME"/"SYNC_BINLOG_FILE_PREFIX"" \ SYNC_BINLOG_FILE_EXT_FMT, \ g_base_path, g_binlog_index); return full_filename;}static int open_next_writable_binlog(){ char full_filename[MAX_PATH_SIZE]; if (g_binlog_fd >= 0) { close(g_binlog_fd); g_binlog_fd = -1; } get_writable_binlog_filename(full_filename); if (fileExists(full_filename)) { if (unlink(full_filename) != 0) { logError("file: "__FILE__", line: %d, " \ "unlink file \"%s\" fail, " \ "errno: %d, error info: %s", \ __LINE__, full_filename, \ errno, strerror(errno)); return errno != 0 ? errno : ENOENT; } logError("file: "__FILE__", line: %d, " \ "binlog file \"%s\" already exists, truncate", \ __LINE__, full_filename); } g_binlog_fd = open(full_filename, O_WRONLY | O_CREAT | O_APPEND, 0644); if (g_binlog_fd < 0) { logError("file: "__FILE__", line: %d, " \ "open file \"%s\" fail, " \ "errno: %d, error info: %s", \ __LINE__, full_filename, \ errno, strerror(errno)); return errno != 0 ? errno : EACCES; } return 0;}int storage_sync_init(){ char data_path[MAX_PATH_SIZE]; char sync_path[MAX_PATH_SIZE]; char full_filename[MAX_PATH_SIZE]; char file_buff[64]; int bytes; int result; int fd; snprintf(data_path, sizeof(data_path), "%s/data", g_base_path); if (!fileExists(data_path)) { if (mkdir(data_path, 0755) != 0) { logError("file: "__FILE__", line: %d, " \ "mkdir \"%s\" fail, " \ "errno: %d, error info: %s", \ __LINE__, data_path, \ errno, strerror(errno)); return errno != 0 ? errno : ENOENT; } } snprintf(sync_path, sizeof(sync_path), \ "%s/"SYNC_DIR_NAME, data_path); if (!fileExists(sync_path)) { if (mkdir(sync_path, 0755) != 0) { logError("file: "__FILE__", line: %d, " \ "mkdir \"%s\" fail, " \ "errno: %d, error info: %s", \ __LINE__, sync_path, \ errno, strerror(errno)); return errno != 0 ? errno : ENOENT; } } snprintf(full_filename, sizeof(full_filename), \ "%s/%s", sync_path, SYNC_BINLOG_INDEX_FILENAME); if ((fd=open(full_filename, O_RDONLY)) >= 0) { bytes = read(fd, file_buff, sizeof(file_buff) - 1); close(fd); if (bytes <= 0) { logError("file: "__FILE__", line: %d, " \ "read file \"%s\" fail, bytes read: %d", \ __LINE__, full_filename, bytes); return errno != 0 ? errno : EIO; } file_buff[bytes] = '\0'; g_binlog_index = atoi(file_buff); if (g_binlog_index < 0) { logError("file: "__FILE__", line: %d, " \ "in file \"%s\", binlog_index: %d < 0", \ __LINE__, full_filename, g_binlog_index); return EINVAL; } } else { g_binlog_index = 0; if ((result=write_to_binlog_index()) != 0) { return result; } } get_writable_binlog_filename(full_filename); g_binlog_fd = open(full_filename, O_WRONLY | O_CREAT | O_APPEND, 0644); if (g_binlog_fd < 0) { logError("file: "__FILE__", line: %d, " \ "open file \"%s\" fail, " \ "errno: %d, error info: %s", \ __LINE__, full_filename, \ errno, strerror(errno)); return errno != 0 ? errno : EACCES; } binlog_file_size = lseek(g_binlog_fd, 0, SEEK_END); if (binlog_file_size < 0) { logError("file: "__FILE__", line: %d, " \ "ftell file \"%s\" fail, " \ "errno: %d, error info: %s", \ __LINE__, full_filename, \ errno, strerror(errno)); storage_sync_destroy(); return errno != 0 ? errno : EIO; } /* //printf("full_filename=%s, binlog_file_size=%d\n", \ full_filename, binlog_file_size); */ if ((result=init_pthread_lock(&sync_thread_lock)) != 0) { return result; } load_local_host_ip_addrs(); return 0;}int storage_sync_destroy(){ int result; if (g_binlog_fd >= 0) { storage_binlog_fsync(true); close(g_binlog_fd); g_binlog_fd = -1; } if ((result=pthread_mutex_destroy(&sync_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_destroy fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); return result; } return 0;}int kill_storage_sync_threads(){ int result; if (sync_tids != NULL) { result = kill_work_threads(sync_tids, \ g_storage_sync_thread_count); free(sync_tids); sync_tids = NULL; } else { result = 0; } return result;}static int storage_binlog_fsync(const bool bNeedLock){ int result; int write_ret; if (bNeedLock && (result=pthread_mutex_lock(&sync_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_lock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } if (binlog_write_cache_len == 0) //ignore { write_ret = 0; //skip } else if (write(g_binlog_fd, binlog_write_cache_buff, \ binlog_write_cache_len) != binlog_write_cache_len) { logError("file: "__FILE__", line: %d, " \
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -