📄 storage_sync.c
字号:
"write to binlog file \"%s\" fail, " \ "errno: %d, error info: %s", \ __LINE__, get_writable_binlog_filename(NULL), \ errno, strerror(errno)); write_ret = errno != 0 ? errno : EIO; } else if (fsync(g_binlog_fd) != 0) { logError("file: "__FILE__", line: %d, " \ "sync to binlog file \"%s\" fail, " \ "errno: %d, error info: %s", \ __LINE__, get_writable_binlog_filename(NULL), \ errno, strerror(errno)); write_ret = errno != 0 ? errno : EIO; } else { binlog_file_size += binlog_write_cache_len; if (binlog_file_size >= SYNC_BINLOG_FILE_MAX_SIZE) { g_binlog_index++; if ((write_ret=write_to_binlog_index()) == 0) { write_ret = open_next_writable_binlog(); } binlog_file_size = 0; if (write_ret != 0) { g_continue_flag = false; logCrit("file: "__FILE__", line: %d, " \ "open binlog file \"%s\" fail, " \ "program exit!", \ __LINE__, \ get_writable_binlog_filename(NULL)); } } else { write_ret = 0; } } binlog_write_cache_len = 0; //reset cache buff if (bNeedLock && (result=pthread_mutex_unlock(&sync_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_unlock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } return write_ret;}int storage_binlog_write(const int timestamp, const char op_type, \ const char *filename){ int result; int write_ret; if ((result=pthread_mutex_lock(&sync_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_lock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } binlog_write_cache_len += sprintf(binlog_write_cache_buff + \ binlog_write_cache_len, "%d %c %s\n", \ timestamp, op_type, filename); //check if buff full if (sizeof(binlog_write_cache_buff) - binlog_write_cache_len < 128) { write_ret = storage_binlog_fsync(false); //sync to disk } else { write_ret = 0; } if ((result=pthread_mutex_unlock(&sync_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_unlock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } return write_ret;}static char *get_binlog_readable_filename(BinLogReader *pReader, \ char *full_filename){ static char buff[MAX_PATH_SIZE]; if (full_filename == NULL) { full_filename = buff; } snprintf(full_filename, MAX_PATH_SIZE, \ "%s/data/"SYNC_DIR_NAME"/"SYNC_BINLOG_FILE_PREFIX"" \ SYNC_BINLOG_FILE_EXT_FMT, \ g_base_path, pReader->binlog_index); return full_filename;}static int storage_open_readable_binlog(BinLogReader *pReader){ char full_filename[MAX_PATH_SIZE]; if (pReader->binlog_fd >= 0) { close(pReader->binlog_fd); } get_binlog_readable_filename(pReader, full_filename); pReader->binlog_fd = open(full_filename, O_RDONLY); if (pReader->binlog_fd < 0) { logError("file: "__FILE__", line: %d, " \ "open binlog file \"%s\" fail, " \ "errno: %d, error info: %s", \ __LINE__, full_filename, \ errno, strerror(errno)); return errno != 0 ? errno : ENOENT; } if (pReader->binlog_offset > 0 && \ lseek(pReader->binlog_fd, pReader->binlog_offset, SEEK_SET) < 0) { logError("file: "__FILE__", line: %d, " \ "seek binlog file \"%s\" fail, file offset="INT64_PRINTF_FORMAT", " \ "errno: %d, error info: %s", \ __LINE__, full_filename, pReader->binlog_offset, \ errno, strerror(errno)); close(pReader->binlog_fd); pReader->binlog_fd = -1; return errno != 0 ? errno : ESPIPE; } return 0;}static char *get_mark_filename(const void *pArg, \ char *full_filename){ const BinLogReader *pReader; static char buff[MAX_PATH_SIZE]; pReader = (const BinLogReader *)pArg; if (full_filename == NULL) { full_filename = buff; } snprintf(full_filename, MAX_PATH_SIZE, \ "%s/data/"SYNC_DIR_NAME"/%s_%d%s", g_base_path, \ pReader->ip_addr, g_server_port, SYNC_MARK_FILE_EXT); return full_filename;}static int storage_report_storage_status(const char *ip_addr, \ const char status){ FDFSStorageBrief briefServers[1]; TrackerServerInfo trackerServer; TrackerServerInfo *pGlobalServer; TrackerServerInfo *pTServer; TrackerServerInfo *pTServerEnd; int result; int i; strcpy(briefServers[0].ip_addr, ip_addr); briefServers[0].status = status; result = 0; pTServer = &trackerServer; pTServerEnd = g_tracker_servers + g_tracker_server_count; for (pGlobalServer=g_tracker_servers; pGlobalServer < pTServerEnd; \ pGlobalServer++) { memcpy(pTServer, pGlobalServer, sizeof(TrackerServerInfo)); for (i=0; i < 3; i++) { pTServer->sock = socket(AF_INET, SOCK_STREAM, 0); if(pTServer->sock < 0) { result = errno != 0 ? errno : EPERM; logError("file: "__FILE__", line: %d, " \ "socket create failed, errno: %d, " \ "error info: %s.", \ __LINE__, result, strerror(result)); break; } if ((result=connectserverbyip(pTServer->sock, \ pTServer->ip_addr, pTServer->port)) == 0) { break; } close(pTServer->sock); pTServer->sock = -1; sleep(5); } if (pTServer->sock < 0) { logError("file: "__FILE__", line: %d, " \ "connect to tracker server %s:%d fail, " \ "errno: %d, error info: %s", \ __LINE__, pTServer->ip_addr, pTServer->port, \ result, strerror(result)); continue; } if (tracker_report_join(pTServer) != 0) { close(pTServer->sock); continue; } if ((result=tracker_sync_diff_servers(pTServer, \ briefServers, 1)) != 0) { } fdfs_quit(pTServer); close(pTServer->sock); } return 0;}static int storage_reader_sync_init_req(BinLogReader *pReader){ TrackerServerInfo *pTrackerServers; TrackerServerInfo *pTServer; TrackerServerInfo *pTServerEnd; char tracker_client_ip[IP_ADDRESS_SIZE]; int result; int conn_ret; pTrackerServers = (TrackerServerInfo *)malloc( \ sizeof(TrackerServerInfo) * g_tracker_server_count); if (pTrackerServers == NULL) { logError("file: "__FILE__", line: %d, " \ "malloc %d bytes fail", __LINE__, \ sizeof(TrackerServerInfo) * g_tracker_server_count); return errno != 0 ? errno : ENOMEM; } memcpy(pTrackerServers, g_tracker_servers, \ sizeof(TrackerServerInfo) * g_tracker_server_count); pTServerEnd = pTrackerServers + g_tracker_server_count; for (pTServer=pTrackerServers; pTServer<pTServerEnd; pTServer++) { pTServer->sock = -1; } result = EINTR; pTServer = pTrackerServers; while (1) { while (g_continue_flag) { pTServer->sock = socket(AF_INET, SOCK_STREAM, 0); if(pTServer->sock < 0) { logCrit("file: "__FILE__", line: %d, " \ "socket create failed, errno: %d, " \ "error info: %s. program exit!", \ __LINE__, errno, strerror(errno)); g_continue_flag = false; result = errno != 0 ? errno : EPERM; break; } if ((conn_ret=connectserverbyip(pTServer->sock, \ pTServer->ip_addr, pTServer->port)) == 0) { break; } logError("file: "__FILE__", line: %d, " \ "connect to tracker server %s:%d fail, " \ "errno: %d, error info: %s", \ __LINE__, pTServer->ip_addr, pTServer->port, \ conn_ret, strerror(conn_ret)); close(pTServer->sock); pTServer++; if (pTServer >= pTServerEnd) { pTServer = pTrackerServers; } sleep(g_heart_beat_interval); } if (!g_continue_flag) { break; } getSockIpaddr(pTServer->sock, \ tracker_client_ip, IP_ADDRESS_SIZE); insert_into_local_host_ip(tracker_client_ip); /* //printf("file: "__FILE__", line: %d, " \ "tracker_client_ip: %s\n", \ __LINE__, tracker_client_ip); //print_local_host_ip_addrs(); */ if (tracker_report_join(pTServer) != 0) { close(pTServer->sock); sleep(g_heart_beat_interval); continue; } if ((result=tracker_sync_src_req(pTServer, pReader)) != 0) { fdfs_quit(pTServer); close(pTServer->sock); sleep(g_heart_beat_interval); continue; } fdfs_quit(pTServer); close(pTServer->sock); break; } /* //printf("need_sync_old=%d, until_timestamp=%d\n", \ pReader->need_sync_old, pReader->until_timestamp); */ return result;}static int storage_reader_init(FDFSStorageBrief *pStorage, \ BinLogReader *pReader){ char full_filename[MAX_PATH_SIZE]; IniItemInfo *items; int nItemCount; int result; bool bFileExist; memset(pReader, 0, sizeof(BinLogReader)); pReader->mark_fd = -1; pReader->binlog_fd = -1; strcpy(pReader->ip_addr, pStorage->ip_addr); get_mark_filename(pReader, full_filename); bFileExist = fileExists(full_filename); if (bFileExist) { if ((result=iniLoadItems(full_filename, &items, &nItemCount)) \ != 0) { logError("file: "__FILE__", line: %d, " \ "load from mark file \"%s\" fail, " \ "error code: %d", \ __LINE__, full_filename, result); return result; } if (nItemCount < 7) { iniFreeItems(items); logError("file: "__FILE__", line: %d, " \ "in mark file \"%s\", item count: %d < 7", \ __LINE__, full_filename, nItemCount); return ENOENT; } pReader->binlog_index = iniGetIntValue( \ MARK_ITEM_BINLOG_FILE_INDEX, \ items, nItemCount, -1); pReader->binlog_offset = iniGetInt64Value( \ MARK_ITEM_BINLOG_FILE_OFFSET, \ items, nItemCount, -1); pReader->need_sync_old = iniGetBoolValue( \ MARK_ITEM_NEED_SYNC_OLD, \ items, nItemCount); pReader->sync_old_done = iniGetBoolValue( \ MARK_ITEM_SYNC_OLD_DONE, \ items, nItemCount); pReader->until_timestamp = iniGetIntValue( \ MARK_ITEM_UNTIL_TIMESTAMP, \ items, nItemCount, -1); pReader->scan_row_count = iniGetInt64Value( \ MARK_ITEM_SCAN_ROW_COUNT, \ items, nItemCount, 0); pReader->sync_row_count = iniGetInt64Value( \ MARK_ITEM_SYNC_ROW_COUNT, \ items, nItemCount, 0); if (pReader->binlog_index < 0) { iniFreeItems(items); logError("file: "__FILE__", line: %d, " \ "in mark file \"%s\", " \ "binlog_index: %d < 0", \ __LINE__, full_filename, \ pReader->binlog_index); return EINVAL; } if (pReader->binlog_offset < 0) { iniFreeItems(items); logError("file: "__FILE__", line: %d, " \ "in mark file \"%s\", " \ "binlog_offset: "INT64_PRINTF_FORMAT" < 0", \ __LINE__, full_filename, \ pReader->binlog_offset); return EINVAL; } iniFreeItems(items); } else { if ((result=storage_reader_sync_init_req(pReader)) != 0) { return result; } } pReader->last_write_row_count = pReader->scan_row_count; pReader->mark_fd = open(full_filename, O_WRONLY | O_CREAT, 0644); if (pReader->mark_fd < 0) { logError("file: "__FILE__", line: %d, " \ "open mark file \"%s\" fail, " \ "error no: %d, error info: %s", \ __LINE__, full_filename, \ errno, strerror(errno)); return errno != 0 ? errno : ENOENT; } if ((result=storage_open_readable_binlog(pReader)) != 0) { close(pReader->mark_fd); pReader->mark_fd = -1; return result; } if (!bFileExist) { if (!pReader->need_sync_old && pReader->until_timestamp > 0) { if ((result=storage_binlog_reader_skip(pReader)) != 0) { storage_reader_destroy(pReader); return result; } } if ((result=storage_write_to_mark_file(pReader)) != 0) { storage_reader_destroy(pReader); return result; } } return 0;}static void storage_reader_destroy(BinLogReader *pReader){ if (pReader->mark_fd >= 0) { close(pReader->mark_fd); pReader->mark_fd = -1; } if (pReader->binlog_fd >= 0) { close(pReader->binlog_fd); pReader->binlog_fd = -1; }}static int storage_write_to_mark_file(BinLogReader *pReader){ char buff[256]; int len; int result; len = sprintf(buff, "%s=%d\n" \ "%s="INT64_PRINTF_FORMAT"\n" \ "%s=%d\n" \ "%s=%d\n" \ "%s=%d\n" \ "%s="INT64_PRINTF_FORMAT"\n" \ "%s="INT64_PRINTF_FORMAT"\n", \ MARK_ITEM_BINLOG_FILE_INDEX, pReader->binlog_index, \ MARK_ITEM_BINLOG_FILE_OFFSET, pReader->binlog_offset, \ MARK_ITEM_NEED_SYNC_OLD, pReader->need_sync_old, \ MARK_ITEM_SYNC_OLD_DONE, pReader->sync_old_done, \ MARK_ITEM_UNTIL_TIMESTAMP, (int)pReader->until_timestamp, \ MARK_ITEM_SCAN_ROW_COUNT, pReader->scan_row_count, \ MARK_ITEM_SYNC_ROW_COUNT, pReader->sync_row_count); if ((result=storage_write_to_fd(pReader->mark_fd, get_mark_filename, \ pReader, buff, len)) == 0) { pReader->last_write_row_count = pReader->scan_row_count; } return result;}static int rewind_to_prev_rec_end(BinLogReader *pReader, \ const int record_length){ if (lseek(pReader->binlog_fd, -1 * record_length, \ SEEK_CUR) < 0) { logError("file: "__FILE__", line: %d, " \ "seek binlog file \"%s\"fail, " \ "file offset: "INT64_PRINTF_FORMAT", " \ "errno: %d, error info: %s", \ __LINE__, get_binlog_readable_filename(pReader, NULL), \ pReader->binlog_offset, \ errno, strerror(errno)); return errno != 0 ? errno : ENOENT; } return 0;}static int storage_binlog_read(BinLogReader *pReader, \ BinLogRecord *pRecord, int *record_length){ char line[256]; char *cols[3]; int result; while (1) { if ((*record_length=fd_gets(pReader->binlog_fd, line, \ sizeof(line), 49 + FDFS_FILE_EXT_NAME_MAX_LEN)) < 0) { logError("file: "__FILE__", line: %d, " \ "read a line from binlog file \"%s\" fail, " \ "file offset: "INT64_PRINTF_FORMAT", " \ "error no: %d, error info: %s", \ __LINE__, \ get_binlog_readable_filename(pReader, NULL), \ pReader->binlog_offset, \ errno, strerror(errno)); return errno != 0 ? errno : ENOENT; } if (*record_length == 0) { if (pReader->binlog_index < g_binlog_index) //rotate { pReader->binlog_index++; pReader->binlog_offset = 0; if ((result=storage_open_readable_binlog( \ pReader)) != 0) { return result; } if ((result=storage_write_to_mark_file( \ pReader)) != 0) { return result; } continue; //read next binlog }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -