📄 afr.c
字号:
afrfdp = data_to_ptr (dict_get (local->fd->ctx, this->name)); if (op_ret >= 0) { GF_BUG_ON (!local->fd); for (i = 0; i < child_count; i++) if (prev_frame->this == children[i]) break; afrfdp->fdstate[i] = 1; } else { GF_ERROR (this, "(path=%s child=%s) op_ret=%d op_errno=%d ", local->loc->path, prev_frame->this->name, op_ret, op_errno); } LOCK (&frame->lock); { callcnt = --local->call_count; } UNLOCK (&frame->lock); if (callcnt == 0) { int32_t src_open = 0, sync_file_cnt = 0; for (i = 0; i < child_count; i++) { if (afrfdp->fdstate[i]) { sync_file_cnt++; if (children[i] == local->source->xl) src_open = 1; } } if (src_open && (sync_file_cnt >= 2)) { /* source open success + atleast a file to sync */ afr_selfheal_sync_file (frame, this); } else { local->call_count = sync_file_cnt; for (i = 0; i < child_count; i++) { if (afrfdp->fdstate[i]) STACK_WIND (frame, afr_selfheal_nosync_close_cbk, children[i], children[i]->fops->close, local->fd); } } } return 0;}int32_tafr_selfheal_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct stat *stat){ int32_t cnt = 0; afr_local_t *local = frame->local; struct list_head *list = local->list; afr_selfheal_t *ash, *source = local->source; if (op_ret == 0) { local->source->stat = *stat; } else { char *lock_path = NULL; call_frame_t *open_frame = local->orig_frame; afr_local_t *open_local = open_frame->local; open_local->sh_return_error = 1; asprintf (&lock_path, "/%s%s", local->lock_node->name, local->loc->path); GF_ERROR (this, "stat() on latest file failed (errno=%d), calling unlock on %s", op_errno, lock_path); STACK_WIND (frame, afr_selfheal_unlock_cbk, local->lock_node, local->lock_node->mops->unlock, lock_path); freee (lock_path); return 0; } cnt = local->call_count; list_for_each_entry (ash, list, clist) { int32_t flags; if (ash == source || ash->repair) { if (ash == source) flags = O_RDONLY; else flags = O_RDWR | O_TRUNC; AFR_DEBUG_FMT (this, "open() on %s", ash->xl->name); STACK_WIND (frame, afr_selfheal_open_cbk, ash->xl, ash->xl->fops->open, local->loc, flags, local->fd); if (--cnt == 0) break; continue; } } return 0;}/* TODO: crappy code, clean this function */int32_tafr_selfheal_getxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *dict){ int32_t callcnt = 0; afrfd_t *afrfdp = NULL; afr_selfheal_t *ash = NULL; afr_local_t *local = frame->local; struct list_head *list = local->list; call_frame_t *prev_frame = cookie; afr_private_t *pvt = this->private; int32_t child_count = pvt->child_count; list_for_each_entry (ash, list, clist) { if (prev_frame->this == ash->xl) break; } if (op_ret >= 0) { if (dict){ ash->dict = dict_ref (dict); data_t *version_data = dict_get (dict, GLUSTERFS_VERSION); if (version_data) { /* version_data->data is NULL terminated bin data*/ ash->version = data_to_uint32 (version_data); } else { AFR_DEBUG_FMT (this, "version attribute was not found on %s, defaulting to 1", prev_frame->this->name) ash->version = 1; dict_set(ash->dict, GLUSTERFS_VERSION, bin_to_data("1", 1)); } data_t *ctime_data = dict_get (dict, GLUSTERFS_CREATETIME); if (ctime_data) { /* ctime_data->data is NULL terminated bin data */ ash->ctime = data_to_uint32 (ctime_data); } else { ash->ctime = 0; } AFR_DEBUG_FMT (this, "op_ret = %d version = %u ctime = %u from %s", op_ret, ash->version, ash->ctime, prev_frame->this->name); ash->op_errno = 0; } } else { AFR_DEBUG_FMT (this, "op_ret = %d from %s", op_ret, prev_frame->this->name); if (op_errno != ENODATA) { GF_ERROR (this, "(path=%s child=%s) op_ret=%d op_errno=%d", local->loc->path, prev_frame->this->name, op_ret, op_errno); } ash->op_errno = op_errno; if (op_errno == ENODATA) { ash->dict = dict_ref (dict); ash->version = 1; dict_set(ash->dict, GLUSTERFS_VERSION, bin_to_data("1", 1)); ash->ctime = 0; } } LOCK(&frame->lock); { callcnt = --local->call_count; } UNLOCK (&frame->lock); if (callcnt == 0) { uint32_t latest = 0; int32_t cnt = 0; afr_selfheal_t *source = NULL; int32_t ctime_repair = 0; list_for_each_entry (ash, list, clist) { if (ash->inode == NULL) continue; if (source == NULL) { source = ash; continue; } if (source->ctime != ash->ctime) { ctime_repair = 1; if (source->ctime < ash->ctime) source = ash; continue; } if (source->ctime == ash->ctime && source->version < ash->version) { source = ash; } } if (ctime_repair) { AFR_DEBUG_FMT(this, "create time difference! latest is %s", source->xl->name); cnt = 0; list_for_each_entry (ash, list, clist) { cnt++; if (ash == source) { AFR_DEBUG_FMT (this, "%s is the source", ash->xl->name); local->call_count++; continue; } if (ash->op_errno != 0) { continue; } if (source->ctime > ash->ctime) { AFR_DEBUG_FMT (this, "%s ctime is outdated", ash->xl->name); ash->repair = 1; local->call_count++; continue; } if (source->ctime == ash->ctime && source->version > ash->version) { AFR_DEBUG_FMT (this, "%s version is outdated", ash->xl->name); ash->repair = 1; local->call_count++; } AFR_DEBUG_FMT (this, "%s does not need repair", ash->xl->name); } } else { source = NULL; list_for_each_entry (ash, list, clist) { AFR_DEBUG_FMT (this, "latest %d ash->version %d", latest, ash->version); if (ash->inode) { if (latest == 0) { latest = ash->version; continue; } if (ash->version > latest) latest = ash->version; } } if (latest == 0) { char *lock_path = NULL; asprintf (&lock_path, "/%s%s", local->lock_node->name, local->loc->path); AFR_DEBUG_FMT (this, "latest version is 0? or the file does not have verion attribute?"); STACK_WIND (frame, afr_selfheal_unlock_cbk, local->lock_node, local->lock_node->mops->unlock, lock_path); freee (lock_path); return 0; } AFR_DEBUG_FMT (this, "latest version is %u", latest); cnt = 0; list_for_each_entry (ash, list, clist) { cnt++; if (latest == ash->version && source == NULL) { local->call_count++; AFR_DEBUG_FMT (this, "%s is latest, %d", ash->xl->name, local->call_count); source = ash; continue; } if (ash->op_errno != 0) continue; if (latest > ash->version) { ash->repair = 1; local->call_count++; AFR_DEBUG_FMT (this, "%s version %d outdated, latest=%d, %d", ash->xl->name, ash->version, latest, local->call_count); } } if (local->call_count == 1) { char *lock_path = NULL; asprintf (&lock_path, "/%s%s", local->lock_node->name, local->loc->path); /* call_count would have got incremented for source */ AFR_DEBUG_FMT (this, "self heal NOT needed"); STACK_WIND (frame, afr_selfheal_unlock_cbk, local->lock_node, local->lock_node->mops->unlock, lock_path); freee (lock_path); return 0; } } GF_DEBUG (this, "self-heal needed (path=%s source=%s)", local->loc->path, source->xl->name); local->source = source; local->fd = calloc (1, sizeof(fd_t)); local->fd->ctx = get_new_dict(); afrfdp = calloc (1, sizeof (*afrfdp)); afrfdp->fdstate = calloc (child_count, sizeof (char)); dict_set (local->fd->ctx, this->name, data_from_static_ptr (afrfdp)); local->fd->inode = local->loc->inode; cnt = local->call_count; STACK_WIND (frame, afr_selfheal_stat_cbk, source->xl, source->xl->fops->stat, local->loc); } return 0;}int32_tafr_selfheal_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno){ int32_t totcnt = 0; afr_selfheal_t *ash, *ashtemp; afr_local_t *local = frame->local; struct list_head *list = local->list; call_frame_t *prev_frame = cookie; AFR_DEBUG_FMT(this, "op_ret = %d", op_ret, op_errno); if (op_ret == -1) { AFR_DEBUG_FMT (this, "locking failed!"); GF_ERROR (this, "(path=%s child=%s) op_ret=%d op_errno=%d", local->loc->path, prev_frame->this->name, op_ret, op_errno); call_frame_t *open_frame = local->orig_frame; afr_local_t *open_local = open_frame->local; open_local->sh_return_error = 1; call_resume(local->stub); freee (local->loc->path); freee (local->loc); if (local->fd) { afrfd_t *afrfdp; afrfdp = data_to_ptr (dict_get(local->fd->ctx, this->name)); freee(afrfdp->fdstate); /* afrfdp is freed in dict_destroy */ dict_destroy (local->fd->ctx); freee (local->fd); } list_for_each_entry_safe (ash, ashtemp, list, clist) { list_del (&ash->clist); if (ash->dict) dict_unref (ash->dict); freee (ash); } freee (list); STACK_DESTROY (frame->root); return 0; } list_for_each_entry (ash, list, clist) { if(ash->inode) local->call_count++; } totcnt = local->call_count; list_for_each_entry (ash, list, clist) { if (ash->inode) { AFR_DEBUG_FMT (this, "calling getxattr on %s", ash->xl->name); STACK_WIND (frame, afr_selfheal_getxattr_cbk, ash->xl, ash->xl->fops->getxattr, local->loc); if (--totcnt == 0) break; } } return 0;}int32_tafr_selfheal (call_frame_t *frame, xlator_t *this, call_stub_t *stub, loc_t *loc){ int32_t i, lock_node = 0; char *child_errno = NULL; char *lock_path = NULL; afr_selfheal_t *ash; call_frame_t *shframe = copy_frame (frame); afr_local_t *shlocal = calloc (1, sizeof (afr_local_t)); struct list_head *list = calloc (1, sizeof (*list)); afr_private_t *pvt = this->private; xlator_t **children = pvt->children; int32_t child_count = pvt->child_count; AFR_DEBUG(this); child_errno = data_to_ptr (dict_get (loc->inode->ctx, this->name)); for (i = 0; i < child_count; i++) { if (pvt->state[i]) break; } if (i == child_count) { GF_ERROR (this, "none of the children are up for locking, returning EIO"); freee (list); freee (shlocal); STACK_DESTROY (shframe->root); /* copy_frame() allocates some memory, free it */ STACK_UNWIND (frame, -1, EIO, NULL); return 0; } lock_node = i; INIT_LIST_HEAD (list); shframe->local = shlocal; shlocal->list = list; shlocal->loc = calloc (1, sizeof (loc_t)); shlocal->loc->path = strdup (loc->path); shlocal->loc->inode = loc->inode; shlocal->orig_frame = frame; shlocal->stub = stub; ((afr_local_t*)frame->local)->shcalled = 1; shframe->root->uid = 0; shframe->root->gid = 0; for (i = 0; i < child_count; i++) { ash = calloc (1, sizeof (*ash)); ash->xl = children[i]; if (child_errno[i] == 0) ash->inode = (void*)1; ash->op_errno = child_errno[i]; list_add_tail (&ash->clist, list); } AFR_DEBUG_FMT (this, "locking the node %s", children[lock_node]->name); shlocal->lock_node = children[lock_node]; asprintf (&lock_path, "/%s%s", children[lock_node]->name, loc->path); STACK_WIND (shframe, afr_selfheal_lock_cbk, children[lock_node], children[lock_node]->mops->lock, lock_path); freee (lock_path); return 0;}int32_tafr_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, fd_t *fd){ int32_t callcnt, i; afr_local_t *local = frame->local; call_frame_t *prev_frame = cookie; afr_private_t *pvt = this->private; xlator_t **children = pvt->children; int32_t child_count = pvt->child_count; AFR_DEBUG(this); if (op_ret == -1 && op_errno != ENOTCONN) { local->op_errno = op_errno; } if (op_ret >= 0 && local->op_ret == -1) { local->op_ret = op_ret; } if (op_ret == -1) { GF_ERROR (this, "(path=%s child=%s) op_ret=%d op_errno=%d", local->loc->path, prev_frame->this->name, op_ret, op_errno); } LOCK (&frame->lock); { if (op_ret >= 0) { GF_BUG_ON (!fd); afrfd_t *afrfdp; data_t *afrfdp_data; afrfdp_data = dict_get (fd->ctx, this->name); if (afrfdp_data == NULL) { /* first successful open_cbk */ afrfdp = calloc (1, sizeof (afrfd_t)); afrfdp->fdstate = calloc (child_count, sizeof (char)); afrfdp->fdsuccess = calloc (child_count, sizeof (char)); /* path will be used during close to increment version */ afrfdp->path = strdup (local->loc->path); dict_set (fd->ctx, this->name, data_from_static_ptr (afrfdp)); /* we use the path here just for debugging */ if (local->flags & O_TRUNC) afrfdp->write = 1; } else { afrfdp = data_to_ptr (afrfdp_data); } for (i = 0; i < child_count; i++) { if (children[i] == prev_frame->this) break; } /* 1 indicates open success, 0 indicates failure */ afrfdp->fdstate[i] = 1; afrfdp->fdsuccess[i] = 1; } callcnt = --local->call_count; } UNLOCK (&frame->lock); if (callcnt == 0) { afrfd_t *afrfdp = data_to_ptr (dict_get(local->fd->ctx, this->name)); if (local->op_ret != -1) { if (pvt->read_node == -1 || afrfdp->fdstate[pvt->read_node] == 0) { int32_t rchild = 0, alive_children = 0;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -