📄 afr.c
字号:
call_frame_t *prev_frame = cookie; AFR_DEBUG(this); if (op_ret != 0 && op_errno != ENOTCONN) { local->op_errno = op_errno; } if (op_ret == 0) { local->op_ret = op_ret; } else { GF_ERROR (this, "(path=%s child=%s) op_ret=%d op_errno=%d", local->loc->path, prev_frame->this->name, op_ret, op_errno); } LOCK (&frame->lock); { callcnt = --local->call_count; } UNLOCK (&frame->lock); if (callcnt == 0) { afr_loc_free (local->loc); STACK_UNWIND (frame, local->op_ret, local->op_errno); } return 0;}int32_tafr_removexattr (call_frame_t *frame, xlator_t *this, loc_t *loc, const char *name){ char *afr_errno = NULL; char *child_errno = NULL; afr_local_t *local = (void *) calloc (1, sizeof (afr_local_t)); afr_private_t *pvt = this->private; xlator_t **children = pvt->children; int32_t child_count = pvt->child_count, i; AFR_DEBUG(this); afr_errno = data_to_ptr (dict_get (loc->inode->ctx, this->name)); AFR_ERRNO_DUP(child_errno, afr_errno, child_count); frame->local = local; local->op_ret = -1; local->op_errno = ENOTCONN; local->loc = afr_loc_dup (loc); for (i = 0; i < child_count; i++) { if (child_errno[i] == 0) ++local->call_count; } if (local->call_count == 0) { GF_ERROR (this, "child_errno[] is not 0, returning ENOTCONN"); STACK_UNWIND (frame, -1, ENOTCONN); return 0; } for (i = 0; i < child_count; i++) { if (child_errno[i] == 0) STACK_WIND (frame, afr_removexattr_cbk, children[i], children[i]->fops->removexattr, loc, name); } return 0;}int32_tafr_selfheal_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno){ afr_local_t *local = frame->local; afr_selfheal_t *ash, *ashtemp; call_frame_t *prev_frame = cookie; struct list_head *list = local->list; if (op_ret == -1) { GF_ERROR (this, "(path=%s child=%s) op_ret=%d op_errno=%d", local->loc->path, prev_frame->this->name, op_ret, op_errno); call_frame_t *open_frame = local->orig_frame; afr_local_t *open_local = open_frame->local; open_local->sh_return_error = 1; } AFR_DEBUG_FMT (this, "call_resume()"); call_resume (local->stub); /* clean up after resume */ freee (local->loc->path); freee (local->loc); if (local->fd) { afrfd_t *afrfdp = data_to_ptr (dict_get(local->fd->ctx, this->name)); freee (afrfdp->fdstate); /* afrfdp->path is not allocated */ freee (afrfdp); dict_destroy (local->fd->ctx); freee (local->fd); } list_for_each_entry_safe (ash, ashtemp, list, clist) { list_del (&ash->clist); if (ash->dict) dict_unref (ash->dict); freee (ash); } freee (list); STACK_DESTROY (frame->root); return 0;}int32_tafr_selfheal_nosync_close_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno);/* we call afr_error_during_sync if there was any error during read/write during syncing. */int32_tafr_error_during_sync (call_frame_t *frame){ int32_t cnt; afr_local_t *local = frame->local; afr_private_t *pvt = frame->this->private; xlator_t **children = pvt->children; int32_t child_count = pvt->child_count, i; afrfd_t *afrfdp = NULL; call_frame_t *open_frame = local->orig_frame; afr_local_t *open_local = open_frame->local; GF_ERROR (frame->this, "error during self-heal"); afrfdp = data_to_ptr(dict_get(local->fd->ctx, frame->this->name)); open_local->sh_return_error = 1; local->call_count = 0; for (i = 0; i < child_count; i++) { if(afrfdp->fdstate[i]) local->call_count++; } /* local->call_count cant be 0 because we'll have atleast source's fd in dict */ GF_BUG_ON (!local->call_count); cnt = local->call_count; for (i = 0; i < child_count; i++) { if(afrfdp->fdstate[i]){ STACK_WIND (frame, afr_selfheal_nosync_close_cbk, children[i], children[i]->fops->close, local->fd); /* in case this is the last WIND, list will be free'd before next iteration * causing segfault. so we use a cnt counter */ if (--cnt == 0) break; } } return 0;}int32_tafr_selfheal_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno){ int32_t callcnt = 0; afr_local_t *local = frame->local; call_frame_t *prev_frame = cookie; AFR_DEBUG_FMT (this, "op_ret = %d from client %s", op_ret, prev_frame->this->name); if (op_ret == -1) { GF_ERROR (this, "(path=%s child=%s) op_ret=%d op_errno=%d", local->loc->path, prev_frame->this->name, op_ret, op_errno); /* since we would have already called close, we wont use afr_error_during_sync */ call_frame_t *open_frame = local->orig_frame; afr_local_t *open_local = open_frame->local; open_local->sh_return_error = 1; } LOCK (&frame->lock); { callcnt = --local->call_count; } UNLOCK (&frame->lock); if (callcnt == 0) { char *lock_path = NULL; asprintf (&lock_path, "/%s%s", local->lock_node->name, local->loc->path); STACK_WIND (frame, afr_selfheal_unlock_cbk, local->lock_node, local->lock_node->mops->unlock, lock_path); freee (lock_path); } return 0;}int32_tafr_selfheal_utimens_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct stat *stat){ int32_t callcnt = 0; afr_local_t *local = frame->local; call_frame_t *prev_frame = cookie; if (op_ret == -1) { GF_ERROR (this, "(path=%s child=%s) op_ret=%d op_errno=%d", local->loc->path, prev_frame->this->name, op_ret, op_errno); /* since we would have already called close, we wont use afr_error_during_sync */ call_frame_t *open_frame = local->orig_frame; afr_local_t *open_local = open_frame->local; open_local->sh_return_error = 1; } LOCK (&frame->lock); { callcnt = --local->call_count; } UNLOCK (&frame->lock); if (callcnt == 0) { char *lock_path = NULL; asprintf (&lock_path, "/%s%s", local->lock_node->name, local->loc->path); STACK_WIND (frame, afr_selfheal_unlock_cbk, local->lock_node, local->lock_node->mops->unlock, lock_path); freee (lock_path); } return 0;}/* FIXME handle the situation when one of the close fails */int32_tafr_selfheal_close_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno){ int32_t cnt; int32_t callcnt; struct list_head *list; afr_selfheal_t *ash; afr_local_t *local = frame->local; AFR_DEBUG (this); LOCK (&frame->lock); { callcnt = --local->call_count; } UNLOCK (&frame->lock); if (callcnt == 0) { if (local->source->ctime == 0) { /* if source didnt have any ctime xattr */ struct timeval tv; /* if it was copied in the backend */ int32_t ctime; char dict_ctime[100]; gettimeofday (&tv, NULL); ctime = tv.tv_sec; sprintf (dict_ctime, "%u", ctime); dict_set (local->source->dict, GLUSTERFS_CREATETIME, bin_to_data (dict_ctime, strlen (dict_ctime))); } list = local->list; list_for_each_entry (ash, list, clist) { if (ash->inode && (ash->repair || ash->version == 1)) { /* version 1 means there are no attrs, possibly it was copied * direcly in the backend */ local->call_count++; /* for setxattr */ local->call_count++; /* for utimens */ } } cnt = local->call_count; list_for_each_entry (ash, list, clist) { struct timespec ts[2]; ts[0].tv_sec = local->source->stat.st_atime; ts[0].tv_nsec = 0; ts[1].tv_sec = local->source->stat.st_mtime; ts[1].tv_nsec = 0; if (ash->inode && (ash->repair || ash->version == 1)) { AFR_DEBUG_FMT (this, "setxattr() on %s version %u ctime %u", ash->xl->name, local->source->version, local->source->ctime); STACK_WIND (frame, afr_selfheal_setxattr_cbk, ash->xl, ash->xl->fops->setxattr, local->loc, local->source->dict, 0); STACK_WIND (frame, afr_selfheal_utimens_cbk, ash->xl, ash->xl->fops->utimens, local->loc, ts); --cnt; /* for setxattr */ if (--cnt == 0) /* for utimens */ break; } } } return 0;}int32_tafr_selfheal_sync_file (call_frame_t *frame, xlator_t *this);int32_tafr_selfheal_sync_file_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct stat *stat){ int32_t callcnt = 0; afr_local_t *local = frame->local; call_frame_t *prev_frame = cookie; AFR_DEBUG_FMT (this, "op_ret = %d", op_ret); if (op_ret == -1 && op_errno != ENOTCONN) local->op_errno = op_errno; if (op_ret >= 0) local->op_ret = op_ret; if (op_ret == -1) { /* even if one write fails, we will return open with error, need to see if * we can improve on this behaviour. * We should wait for all cbks before calling afr_error_during_sync() */ GF_ERROR (this, "(path=%s child=%s) op_ret=%d op_errno=%d", local->loc->path, prev_frame->this->name, op_ret, op_errno); local->sh_return_error = 1; } LOCK(&frame->lock); { callcnt = --local->call_count; } UNLOCK(&frame->lock); if (callcnt == 0) { if (local->sh_return_error) { /* if there was an error during one of the writes */ afr_error_during_sync (frame); } else { local->offset = local->offset + op_ret; afr_selfheal_sync_file (frame, this); } } return 0;}int32_tafr_selfheal_sync_file_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iovec *vector, int32_t count, struct stat *stat){ int32_t cnt; afr_local_t *local = frame->local; call_frame_t *prev_frame = cookie; afr_private_t *pvt = this->private; xlator_t **children = pvt->children; int32_t child_count = pvt->child_count, i; afrfd_t *afrfdp = NULL; AFR_DEBUG_FMT (this, "op_ret = %d", op_ret); afrfdp = data_to_ptr (dict_get (local->fd->ctx, this->name)); for (i = 0; i < child_count; i++){ if (afrfdp->fdstate[i]) local->call_count++; } if (op_ret == 0) { /* EOF reached */ AFR_DEBUG_FMT (this, "EOF reached"); cnt = local->call_count; for (i = 0; i < child_count; i++){ if (afrfdp->fdstate[i]) { STACK_WIND (frame, afr_selfheal_close_cbk, children[i], children[i]->fops->close, local->fd); if (--cnt == 0) break; } } } else if (op_ret > 0) { local->call_count--; /* we dont write on source */ local->op_ret = -1; local->op_errno = ENOTCONN; cnt = local->call_count; for (i = 0; i < child_count; i++) { if (children[i] == local->source->xl) continue; if (afrfdp->fdstate[i]) { AFR_DEBUG_FMT (this, "write call on %s", children[i]->name); STACK_WIND (frame, afr_selfheal_sync_file_writev_cbk, children[i], children[i]->fops->writev, local->fd, vector, count, local->offset); if (--cnt == 0) break; } } } else { /* error during read */ GF_ERROR (this, "(path=%s child=%s) op_ret=%d op_errno=%d", local->loc->path, prev_frame->this->name, op_ret, op_errno); afr_error_during_sync(frame); } return 0;}int32_tafr_selfheal_sync_file (call_frame_t *frame, xlator_t *this){ afr_local_t *local = frame->local; size_t readbytes = 128*1024; AFR_DEBUG_FMT (this, "reading from offset %u", local->offset); STACK_WIND (frame, afr_selfheal_sync_file_readv_cbk, local->source->xl, local->source->xl->fops->readv, local->fd, readbytes, local->offset); return 0;}int32_tafr_selfheal_nosync_close_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno){ int32_t callcnt = 0; afr_local_t *local = frame->local; AFR_DEBUG(this); LOCK (&frame->lock); { callcnt = --local->call_count; } UNLOCK (&frame->lock); if (callcnt == 0) { char *lock_path = NULL; AFR_DEBUG_FMT(this, "calling unlock on local->loc->path %s", local->loc->path); asprintf (&lock_path, "/%s%s", local->lock_node->name, local->loc->path); STACK_WIND (frame, afr_selfheal_unlock_cbk, local->lock_node, local->lock_node->mops->unlock, lock_path); freee (lock_path); } return 0;}int32_tafr_selfheal_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, fd_t *fd){ int32_t callcnt = 0; afrfd_t *afrfdp = NULL; afr_local_t *local = frame->local; call_frame_t *prev_frame = cookie; afr_private_t *pvt = this->private; xlator_t **children = pvt->children; int32_t child_count = pvt->child_count, i; AFR_DEBUG_FMT (this, "op_ret = %d from %s", op_ret, prev_frame->this->name);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -