📄 storage_sync.c
字号:
return ENOENT; } break; } if (line[*record_length-1] != '\n') { if ((result=rewind_to_prev_rec_end(pReader, \ *record_length)) != 0) { return result; } logError("file: "__FILE__", line: %d, " \ "get a line from binlog file \"%s\" fail, " \ "file offset: "INT64_PRINTF_FORMAT", " \ "no new line char, line length: %d", \ __LINE__, get_binlog_readable_filename(pReader, NULL), \ pReader->binlog_offset, *record_length); return ENOENT; } if ((result=splitEx(line, ' ', cols, 3)) < 3) { logError("file: "__FILE__", line: %d, " \ "read data from binlog file \"%s\" fail, " \ "file offset: "INT64_PRINTF_FORMAT", " \ "read item count: %d < 3", \ __LINE__, get_binlog_readable_filename(pReader, NULL), \ pReader->binlog_offset, result); return ENOENT; } pRecord->timestamp = atoi(cols[0]); pRecord->op_type = *(cols[1]); pRecord->filename_len = strlen(cols[2]) - 1; //need trim new line \n if (pRecord->filename_len > sizeof(pRecord->filename)-1) { logError("file: "__FILE__", line: %d, " \ "item \"filename\" in binlog " \ "file \"%s\" is invalid, file offset: " \ INT64_PRINTF_FORMAT", filename length: %d > %d", \ __LINE__, get_binlog_readable_filename(pReader, NULL), \ pReader->binlog_offset, \ pRecord->filename_len, sizeof(pRecord->filename)-1); return EINVAL; } memcpy(pRecord->filename, cols[2], pRecord->filename_len); *(pRecord->filename + pRecord->filename_len) = '\0'; pRecord->true_filename_len = pRecord->filename_len; if ((result=storage_split_filename(pRecord->filename, \ &pRecord->true_filename_len, pRecord->true_filename, \ &pRecord->pBasePath)) != 0) { return result; } /* //printf("timestamp=%d, op_type=%c, filename=%s(%d), line length=%d, " \ "offset=%d\n", \ pRecord->timestamp, pRecord->op_type, \ pRecord->filename, strlen(pRecord->filename), \ *record_length, pReader->binlog_offset); */ return 0;}static int storage_binlog_reader_skip(BinLogReader *pReader){ BinLogRecord record; int result; int record_len; while (1) { result = storage_binlog_read(pReader, \ &record, &record_len); if (result != 0) { if (result == ENOENT) { return 0; } return result; } if (record.timestamp >= pReader->until_timestamp) { result = rewind_to_prev_rec_end( \ pReader, record_len); break; } pReader->binlog_offset += record_len; } return result;}static int storage_unlink_mark_file(BinLogReader *pReader){ char old_filename[MAX_PATH_SIZE]; char new_filename[MAX_PATH_SIZE]; time_t t; struct tm tm; t = time(NULL); localtime_r(&t, &tm); get_mark_filename(pReader, old_filename); snprintf(new_filename, sizeof(new_filename), \ "%s.%04d%02d%02d%02d%02d%02d", old_filename, \ tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, \ tm.tm_hour, tm.tm_min, tm.tm_sec); if (rename(old_filename, new_filename) != 0) { logError("file: "__FILE__", line: %d, " \ "rename file %s to %s fail" \ ", errno: %d, error info: %s", \ __LINE__, old_filename, new_filename, \ errno, strerror(errno)); return errno != 0 ? errno : EACCES; } return 0;}static void storage_sync_get_start_end_times(time_t current_time, \ const FDFSTimeInfo *pStartTime, const FDFSTimeInfo *pEndTime, \ time_t *start_time, time_t *end_time){ struct tm tm_time; //char buff[32]; localtime_r(¤t_time, &tm_time); tm_time.tm_sec = 0; /* strftime(buff, sizeof(buff), "%Y-%m-%d %H:%M:%S", &tm_time); //printf("current time: %s\n", buff); */ tm_time.tm_hour = pStartTime->hour; tm_time.tm_min = pStartTime->minute; *start_time = mktime(&tm_time); //end time < start time if (pEndTime->hour < pStartTime->hour || (pEndTime->hour == \ pStartTime->hour && pEndTime->minute < pStartTime->minute)) { current_time += 24 * 3600; localtime_r(¤t_time, &tm_time); tm_time.tm_sec = 0; } tm_time.tm_hour = pEndTime->hour; tm_time.tm_min = pEndTime->minute; *end_time = mktime(&tm_time);}static void* storage_sync_thread_entrance(void* arg){ FDFSStorageBrief *pStorage; BinLogReader reader; BinLogRecord record; TrackerServerInfo storage_server; char local_ip_addr[IP_ADDRESS_SIZE]; int read_result; int sync_result; int conn_result; int result; int record_len; int previousCode; int nContinuousFail; time_t current_time; time_t start_time; time_t end_time; time_t last_check_sync_cache_time; memset(local_ip_addr, 0, sizeof(local_ip_addr)); memset(&reader, 0, sizeof(reader)); current_time = time(NULL); start_time = 0; end_time = 0; pStorage = (FDFSStorageBrief *)arg; last_check_sync_cache_time = time(NULL); strcpy(storage_server.ip_addr, pStorage->ip_addr); strcpy(storage_server.group_name, g_group_name); storage_server.port = g_server_port; storage_server.sock = -1; while (g_continue_flag && \ pStorage->status != FDFS_STORAGE_STATUS_DELETED && pStorage->status != FDFS_STORAGE_STATUS_NONE) { if (storage_reader_init(pStorage, &reader) != 0) { logCrit("file: "__FILE__", line: %d, " \ "storage_reader_init fail, program exit!", \ __LINE__); g_continue_flag = false; break; } while (g_continue_flag && \ (pStorage->status != FDFS_STORAGE_STATUS_ACTIVE && \ pStorage->status != FDFS_STORAGE_STATUS_WAIT_SYNC && \ pStorage->status != FDFS_STORAGE_STATUS_SYNCING && \ pStorage->status != FDFS_STORAGE_STATUS_DELETED && \ pStorage->status != FDFS_STORAGE_STATUS_NONE)) { sleep(1); } if (g_sync_part_time) { current_time = time(NULL); storage_sync_get_start_end_times(current_time, \ &g_sync_end_time, &g_sync_start_time, \ &start_time, &end_time); start_time += 60; end_time -= 60; while (g_continue_flag && (current_time >= start_time \ && current_time <= end_time)) { current_time = time(NULL); sleep(1); } } previousCode = 0; nContinuousFail = 0; conn_result = 0; while (g_continue_flag && \ pStorage->status != FDFS_STORAGE_STATUS_DELETED && \ pStorage->status != FDFS_STORAGE_STATUS_NONE) { storage_server.sock = \ socket(AF_INET, SOCK_STREAM, 0); if(storage_server.sock < 0) { logCrit("file: "__FILE__", line: %d," \ " socket create fail, " \ "errno: %d, error info: %s. " \ "program exit!", __LINE__, \ errno, strerror(errno)); g_continue_flag = false; break; } if ((conn_result=connectserverbyip(storage_server.sock,\ storage_server.ip_addr, g_server_port)) == 0) { char szFailPrompt[36]; if (nContinuousFail == 0) { *szFailPrompt = '\0'; } else { sprintf(szFailPrompt, \ ", continuous fail count: %d", \ nContinuousFail); } logInfo("file: "__FILE__", line: %d, " \ "successfully connect to " \ "storage server %s:%d%s", __LINE__, \ storage_server.ip_addr, \ g_server_port, szFailPrompt); nContinuousFail = 0; break; } if (previousCode != conn_result) { logError("file: "__FILE__", line: %d, " \ "connect to storage server %s:%d fail" \ ", errno: %d, error info: %s", \ __LINE__, \ storage_server.ip_addr, g_server_port, \ conn_result, strerror(conn_result)); previousCode = conn_result; } nContinuousFail++; close(storage_server.sock); storage_server.sock = -1; sleep(1); } if (nContinuousFail > 0) { logError("file: "__FILE__", line: %d, " \ "connect to storage server %s:%d fail, " \ "try count: %d, errno: %d, error info: %s", \ __LINE__, storage_server.ip_addr, \ g_server_port, nContinuousFail, \ conn_result, strerror(conn_result)); } if (!g_continue_flag) { break; } getSockIpaddr(storage_server.sock, \ local_ip_addr, IP_ADDRESS_SIZE); /* //printf("file: "__FILE__", line: %d, " \ "storage_server.ip_addr=%s, " \ "local_ip_addr: %s\n", \ __LINE__, storage_server.ip_addr, local_ip_addr); */ if (strcmp(local_ip_addr, storage_server.ip_addr) == 0) { logError("file: "__FILE__", line: %d, " \ "ip_addr %s belong to the local host," \ " sync thread exit.", \ __LINE__, storage_server.ip_addr); fdfs_quit(&storage_server); break; } if (pStorage->status == FDFS_STORAGE_STATUS_WAIT_SYNC) { pStorage->status = FDFS_STORAGE_STATUS_SYNCING; storage_report_storage_status(pStorage->ip_addr, \ pStorage->status); } if (pStorage->status == FDFS_STORAGE_STATUS_SYNCING) { if (reader.need_sync_old && reader.sync_old_done) { pStorage->status = FDFS_STORAGE_STATUS_ONLINE; storage_report_storage_status( \ pStorage->ip_addr, \ pStorage->status); } } if (g_sync_part_time) { current_time = time(NULL); storage_sync_get_start_end_times(current_time, \ &g_sync_start_time, &g_sync_end_time, \ &start_time, &end_time); } sync_result = 0; while (g_continue_flag && (!g_sync_part_time || \ (current_time >= start_time && \ current_time <= end_time)) && \ pStorage->status != FDFS_STORAGE_STATUS_DELETED && \ pStorage->status != FDFS_STORAGE_STATUS_NONE) { if (g_sync_part_time) { current_time = time(NULL); } read_result = storage_binlog_read(&reader, \ &record, &record_len); if (read_result == ENOENT) { if (reader.need_sync_old && \ !reader.sync_old_done) { reader.sync_old_done = true; if (storage_write_to_mark_file(&reader) != 0) { logCrit("file: "__FILE__", line: %d, " \ "storage_write_to_mark_file " \ "fail, program exit!", \ __LINE__); g_continue_flag = false; break; } if (pStorage->status == \ FDFS_STORAGE_STATUS_SYNCING) { pStorage->status = \ FDFS_STORAGE_STATUS_ONLINE; storage_report_storage_status( \ pStorage->ip_addr, \ pStorage->status); } } if (binlog_write_cache_len > 0 && \ time(NULL)-last_check_sync_cache_time >= 60) { last_check_sync_cache_time = time(NULL); storage_binlog_fsync(true); } usleep(g_sync_wait_usec); continue; } else if (read_result != 0) { sleep(5); continue; } if ((sync_result=storage_sync_data(&reader, \ &storage_server, &record)) != 0) { if (rewind_to_prev_rec_end( \ &reader, record_len) != 0) { logCrit("file: "__FILE__", line: %d, " \ "rewind_to_prev_rec_end fail, "\ "program exit!", __LINE__); g_continue_flag = false; } break; } reader.binlog_offset += record_len; reader.scan_row_count++; if (reader.sync_row_count % 1000 == 0) { if (storage_write_to_mark_file(&reader) != 0) { logCrit("file: "__FILE__", line: %d, " \ "storage_write_to_mark_file " \ "fail, program exit!", \ __LINE__); g_continue_flag = false; break; } } if (g_sync_interval > 0) { usleep(g_sync_interval); } } if (reader.last_write_row_count != reader.scan_row_count) { if (storage_write_to_mark_file(&reader) != 0) { logCrit("file: "__FILE__", line: %d, " \ "storage_write_to_mark_file fail, " \ "program exit!", __LINE__); g_continue_flag = false; break; } } close(storage_server.sock); storage_server.sock = -1; storage_reader_destroy(&reader); if (!g_continue_flag) { break; } if (!(sync_result == ENOTCONN || sync_result == EIO)) { sleep(1); } } if (storage_server.sock >= 0) { close(storage_server.sock); } storage_reader_destroy(&reader); if (pStorage->status == FDFS_STORAGE_STATUS_DELETED) { storage_unlink_mark_file(&reader); if (strcmp(g_sync_src_ip_addr, pStorage->ip_addr) == 0) { g_sync_src_ip_addr[0] = '\0'; storage_write_to_sync_ini_file(); } sleep(2 * g_heart_beat_interval + 1); pStorage->status = FDFS_STORAGE_STATUS_NONE; } if ((result=pthread_mutex_lock(&sync_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_lock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } g_storage_sync_thread_count--; if ((result=pthread_mutex_unlock(&sync_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_unlock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } return NULL;}int storage_sync_thread_start(const FDFSStorageBrief *pStorage){ int result; pthread_attr_t pattr; pthread_t tid; if (pStorage->status == FDFS_STORAGE_STATUS_DELETED || \ pStorage->status == FDFS_STORAGE_STATUS_NONE) { return 0; } if (is_local_host_ip(pStorage->ip_addr)) //can't self sync to self { return 0; } if ((result=init_pthread_attr(&pattr)) != 0) { return result; } /* //printf("start storage ip_addr: %s, g_storage_sync_thread_count=%d\n", pStorage->ip_addr, g_storage_sync_thread_count); */ if ((result=pthread_create(&tid, &pattr, storage_sync_thread_entrance, \ (void *)pStorage)) != 0) { logError("file: "__FILE__", line: %d, " \ "create thread failed, errno: %d, " \ "error info: %s", \ __LINE__, result, strerror(result)); pthread_attr_destroy(&pattr); return result; } if ((result=pthread_mutex_lock(&sync_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_lock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } g_storage_sync_thread_count++; sync_tids = (pthread_t *)realloc(sync_tids, sizeof(pthread_t) * \ g_storage_sync_thread_count); if (sync_tids == NULL) { logError("file: "__FILE__", line: %d, " \ "malloc %d bytes fail, " \ "errno: %d, error info: %s", \ __LINE__, sizeof(pthread_t) * \ g_storage_sync_thread_count, \ errno, strerror(errno)); } else { sync_tids[g_storage_sync_thread_count - 1] = tid; } if ((result=pthread_mutex_unlock(&sync_thread_lock)) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_unlock fail, " \ "errno: %d, error info: %s", \ __LINE__, result, strerror(result)); } pthread_attr_destroy(&pattr); return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -