⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 io-threads.c

📁 分布式文件系统
💻 C
📖 第 1 页 / 共 2 页
字号:
/*   Copyright (c) 2006, 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com>   This file is part of GlusterFS.   GlusterFS is free software; you can redistribute it and/or modify   it under the terms of the GNU General Public License as published   by the Free Software Foundation; either version 3 of the License,   or (at your option) any later version.   GlusterFS is distributed in the hope that it will be useful, but   WITHOUT ANY WARRANTY; without even the implied warranty of   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU   General Public License for more details.   You should have received a copy of the GNU General Public License   along with this program.  If not, see   <http://www.gnu.org/licenses/>.*/#ifndef _CONFIG_H#define _CONFIG_H#include "config.h"#endif#include "call-stub.h"#include "glusterfs.h"#include "logging.h"#include "dict.h"#include "xlator.h"#include "io-threads.h"static voidiot_queue (iot_worker_t *worker,           call_stub_t *stub);static call_stub_t *iot_dequeue (iot_worker_t *worker);static iot_worker_t * iot_schedule (iot_conf_t *conf,              iot_file_t *file,              ino_t ino){  int32_t cnt = (ino % conf->thread_count);  iot_worker_t *trav = conf->workers.next;  for (; cnt; cnt--)    trav = trav->next;    if (file)    file->worker = trav;  trav->fd_count++;  return trav;}int32_tiot_open_cbk (call_frame_t *frame,              void *cookie,              xlator_t *this,              int32_t op_ret,              int32_t op_errno,              fd_t *fd){  iot_conf_t *conf = this->private;  if (op_ret >= 0) {    iot_file_t *file = calloc (1, sizeof (*file));    iot_schedule (conf, file, fd->inode->ino);    file->fd = fd;    dict_set (fd->ctx,              this->name,              data_from_static_ptr (file));    pthread_mutex_lock (&conf->files_lock);    file->next = &conf->files;    file->prev = file->next->prev;    file->next->prev = file;    file->prev->next = file;    pthread_mutex_unlock (&conf->files_lock);  }  STACK_UNWIND (frame, op_ret, op_errno, fd);  return 0;}int32_tiot_open (call_frame_t *frame,          xlator_t *this,          loc_t *loc,          int32_t flags,	  fd_t *fd){  STACK_WIND (frame,              iot_open_cbk,              FIRST_CHILD(this),              FIRST_CHILD(this)->fops->open,              loc,              flags,	      fd);  return 0;}int32_tiot_create_cbk (call_frame_t *frame,		void *cookie,		xlator_t *this,		int32_t op_ret,		int32_t op_errno,		fd_t *fd,		inode_t *inode,		struct stat *stbuf){  iot_conf_t *conf = this->private;  if (op_ret >= 0) {    iot_file_t *file = calloc (1, sizeof (*file));    iot_schedule (conf, file, fd->inode->ino);    file->fd = fd;    dict_set (fd->ctx,              this->name,              data_from_static_ptr (file));    pthread_mutex_lock (&conf->files_lock);    file->next = &conf->files;    file->prev = file->next->prev;    file->next->prev = file;    file->prev->next = file;    pthread_mutex_unlock (&conf->files_lock);  }  STACK_UNWIND (frame, op_ret, op_errno, fd, inode, stbuf);  return 0;}int32_tiot_create (call_frame_t *frame,            xlator_t *this,	    loc_t *loc,            int32_t flags,            mode_t mode,	    fd_t *fd){  STACK_WIND (frame,              iot_create_cbk,              FIRST_CHILD(this),              FIRST_CHILD(this)->fops->create,	      loc,              flags,              mode,	      fd);  return 0;}int32_tiot_close_cbk (call_frame_t *frame,               void *cookie,               xlator_t *this,               int32_t op_ret,               int32_t op_errno){  iot_conf_t *conf = this->private;  iot_local_t *local = frame->local;  iot_file_t *file = local->file;  pthread_mutex_lock (&conf->files_lock);  {    file->prev->next = file->next;    file->next->prev = file->prev;  }  pthread_mutex_unlock (&conf->files_lock);  file->worker->fd_count--;  file->worker = NULL;  freee (file);    STACK_UNWIND (frame, op_ret, op_errno);  return 0;}static int32_tiot_close_wrapper (call_frame_t *frame,                   xlator_t *this,                   fd_t *fd){  STACK_WIND (frame,              iot_close_cbk,              FIRST_CHILD(this),              FIRST_CHILD(this)->fops->close,              fd);  return 0;}int32_tiot_close (call_frame_t *frame,           xlator_t *this,           fd_t *fd){  call_stub_t *stub;  iot_local_t *local = NULL;  iot_file_t *file = NULL;  iot_worker_t *worker = NULL;  if (!dict_get (fd->ctx, this->name)) {    gf_log (this->name, GF_LOG_ERROR, "fd context is NULL, returning EBADFD");    STACK_UNWIND (frame, -1, EBADFD);    return 0;  }  file = data_to_ptr (dict_get (fd->ctx, this->name));  worker = file->worker;  local = calloc (1, sizeof (*local));  local->file = file;  frame->local = local;    stub = fop_close_stub (frame,                         iot_close_wrapper,                         fd);  if (!stub) {    gf_log (this->name, GF_LOG_ERROR, "cannot get close call stub");    STACK_UNWIND (frame, -1, ENOMEM);    return 0;  }  iot_queue (worker, stub);  return 0;}int32_tiot_readv_cbk (call_frame_t *frame,               void *cookie,               xlator_t *this,               int32_t op_ret,               int32_t op_errno,               struct iovec *vector,               int32_t count,	       struct stat *stbuf){  iot_local_t *local = frame->local;  local->frame_size = 0; //iov_length (vector, count);  STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf);  return 0;}static int32_tiot_readv_wrapper (call_frame_t *frame,                   xlator_t *this,                   fd_t *fd,                   size_t size,                   off_t offset){  STACK_WIND (frame,              iot_readv_cbk,              FIRST_CHILD(this),              FIRST_CHILD(this)->fops->readv,              fd,              size,              offset);  return 0;}int32_tiot_readv (call_frame_t *frame,           xlator_t *this,           fd_t *fd,           size_t size,           off_t offset){  call_stub_t *stub;  iot_local_t *local = NULL;  iot_file_t *file = NULL;  iot_worker_t *worker = NULL;  if (!dict_get (fd->ctx, this->name)) {    gf_log (this->name, GF_LOG_ERROR, "fd context is NULL, returning EBADFD");    STACK_UNWIND (frame, -1, EBADFD);    return 0;  }  file = data_to_ptr (dict_get (fd->ctx, this->name));  worker = file->worker;  local = calloc (1, sizeof (*local));  frame->local = local;    stub = fop_readv_stub (frame,                          iot_readv_wrapper,                         fd,                         size,                         offset);  if (!stub) {    gf_log (this->name, GF_LOG_ERROR, "cannot get readv call stub");    STACK_UNWIND (frame, -1, ENOMEM, NULL, 0);    return 0;  }  iot_queue (worker, stub);  return 0;}int32_tiot_flush_cbk (call_frame_t *frame,               void *cookie,               xlator_t *this,               int32_t op_ret,               int32_t op_errno){  STACK_UNWIND (frame, op_ret, op_errno);  return 0;}static int32_tiot_flush_wrapper (call_frame_t *frame,                   xlator_t *this,                   fd_t *fd){  STACK_WIND (frame,              iot_flush_cbk,              FIRST_CHILD(this),              FIRST_CHILD(this)->fops->flush,              fd);  return 0;}int32_tiot_flush (call_frame_t *frame,           xlator_t *this,           fd_t *fd){  call_stub_t *stub;  iot_local_t *local = NULL;  iot_file_t *file = NULL;  iot_worker_t *worker = NULL;  if (!dict_get (fd->ctx, this->name)) {    gf_log (this->name, GF_LOG_ERROR, "fd context is NULL, returning EBADFD");    STACK_UNWIND (frame, -1, EBADFD);    return 0;  }  file = data_to_ptr (dict_get (fd->ctx, this->name));  worker = file->worker;  local = calloc (1, sizeof (*local));  frame->local = local;    stub = fop_flush_stub (frame,                         iot_flush_wrapper,                         fd);  if (!stub) {    gf_log (this->name, GF_LOG_ERROR, "cannot get flush_cbk call stub");    STACK_UNWIND (frame, -1, ENOMEM);    return 0;  }  iot_queue (worker, stub);  return 0;}int32_tiot_fsync_cbk (call_frame_t *frame,               void *cookie,               xlator_t *this,               int32_t op_ret,               int32_t op_errno){  STACK_UNWIND (frame, op_ret, op_errno);  return 0;}static int32_tiot_fsync_wrapper (call_frame_t *frame,                   xlator_t *this,                   fd_t *fd,                   int32_t datasync){  STACK_WIND (frame,              iot_fsync_cbk,              FIRST_CHILD (this),              FIRST_CHILD (this)->fops->fsync,              fd,              datasync);  return 0;}int32_tiot_fsync (call_frame_t *frame,           xlator_t *this,           fd_t *fd,           int32_t datasync){  call_stub_t *stub;  iot_local_t *local = NULL;  iot_file_t *file = NULL;  iot_worker_t *worker = NULL;  if (!dict_get (fd->ctx, this->name)) {    gf_log (this->name, GF_LOG_ERROR, "fd context is NULL, returning EBADFD");    STACK_UNWIND (frame, -1, EBADFD);    return 0;  }  file = data_to_ptr (dict_get (fd->ctx, this->name));  worker = file->worker;  local = calloc (1, sizeof (*local));  frame->local = local;    stub = fop_fsync_stub (frame,                         iot_fsync_wrapper,                         fd,                         datasync);  if (!stub) {    gf_log (this->name, GF_LOG_ERROR, "cannot get fsync_cbk call stub");    STACK_UNWIND (frame, -1, ENOMEM);    return 0;  }  iot_queue (worker, stub);  return 0;}int32_tiot_writev_cbk (call_frame_t *frame,                void *cookie,                xlator_t *this,                int32_t op_ret,                int32_t op_errno,		struct stat *stbuf){  iot_local_t *local = frame->local;  local->frame_size = 0; /* hehe, caught me! */  STACK_UNWIND (frame, op_ret, op_errno, stbuf);  return 0;}static int32_tiot_writev_wrapper (call_frame_t *frame,                    xlator_t *this,                    fd_t *fd,                    struct iovec *vector,                    int32_t count,                    off_t offset){  STACK_WIND (frame,              iot_writev_cbk,              FIRST_CHILD(this),              FIRST_CHILD(this)->fops->writev,              fd,              vector,              count,              offset);  return 0;}int32_tiot_writev (call_frame_t *frame,            xlator_t *this,            fd_t *fd,            struct iovec *vector,            int32_t count,            off_t offset){  call_stub_t *stub;  iot_local_t *local = NULL;  iot_file_t *file = NULL;  iot_worker_t *worker = NULL;  if (!dict_get (fd->ctx, this->name)) {    gf_log (this->name, GF_LOG_ERROR, "fd context is NULL, returning EBADFD");    STACK_UNWIND (frame, -1, EBADFD);    return 0;  }  file = data_to_ptr (dict_get (fd->ctx, this->name));  worker = file->worker;  local = calloc (1, sizeof (*local));  if (frame->root->req_refs)    local->frame_size = dict_serialized_length (frame->root->req_refs);  else    local->frame_size = iov_length (vector, count);  frame->local = local;    stub = fop_writev_stub (frame, iot_writev_wrapper,                          fd, vector, count, offset);  if (!stub) {    gf_log (this->name, GF_LOG_ERROR, "cannot get writev call stub");    STACK_UNWIND (frame, -1, ENOMEM);    return 0;  }  iot_queue (worker, stub);  return 0;}int32_tiot_lk_cbk (call_frame_t *frame,            void *cookie,            xlator_t *this,            int32_t op_ret,            int32_t op_errno,            struct flock *flock){  STACK_UNWIND (frame, op_ret, op_errno, flock);  return 0;}static int32_tiot_lk_wrapper (call_frame_t *frame,                xlator_t *this,                fd_t *fd,                int32_t cmd,                struct flock *flock){  STACK_WIND (frame,              iot_lk_cbk,              FIRST_CHILD(this),              FIRST_CHILD(this)->fops->lk,              fd,              cmd,              flock);  return 0;}int32_tiot_lk (call_frame_t *frame,        xlator_t *this,        fd_t *fd,        int32_t cmd,        struct flock *flock){  call_stub_t *stub;  iot_local_t *local = NULL;  iot_file_t *file = NULL;  iot_worker_t *worker = NULL;  if (!dict_get (fd->ctx, this->name)) {    gf_log (this->name, GF_LOG_ERROR, "fd context is NULL, returning EBADFD");    STACK_UNWIND (frame, -1, EBADFD);    return 0;  }  file = data_to_ptr (dict_get (fd->ctx, this->name));  worker = file->worker;  local = calloc (1, sizeof (*local));  frame->local = local;  stub = fop_lk_stub (frame,                      iot_lk_wrapper,                      fd,                      cmd,                      flock);  if (!stub) {    gf_log (this->name, GF_LOG_ERROR, "cannot get fop_lk call stub");    STACK_UNWIND (frame, -1, ENOMEM, NULL);    return 0;  }      iot_queue (worker, stub);  return 0;}int32_t iot_stat_cbk (call_frame_t *frame,              void *cookie,              xlator_t *this,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -