⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 alu.c

📁 分布式文件系统
💻 C
📖 第 1 页 / 共 2 页
字号:
    }    alu_sched->child_count = index;    sched_array = calloc (index, sizeof (struct alu_sched_struct));    trav_xl = xl->children;    index = 0;    while (trav_xl) {      sched_array[index].xl = trav_xl->xlator;      sched_array[index].eligible = 1;      index++;      trav_xl = trav_xl->next;    }    alu_sched->array = sched_array;    data = dict_get (xl->options, "alu.read-only-subvolumes");    if (data) {      char *child = NULL;      char *tmp;      char *childs_data = strdup (data->data);            child = strtok_r (childs_data, ",", &tmp);      while (child) {	for (index = 1; index < alu_sched->child_count; index++) {	  if (strcmp (alu_sched->array[index -1].xl->name, child) == 0) {	    memcpy (&(alu_sched->array[index -1]), 		    &(alu_sched->array[alu_sched->child_count -1]), 		    sizeof (struct alu_sched_struct));	    alu_sched->child_count--;	    break;	  }	}	child = strtok_r (NULL, ",", &tmp);      }    }  }  *((long *)xl->private) = (long)alu_sched;  /* Initialize all the alu_sched structure's elements */  {    alu_sched->sched_nodes_pending = 0;    alu_sched->min_limit.free_disk = 0x00FFFFFF;    alu_sched->min_limit.disk_usage = 0xFFFFFFFF;    alu_sched->min_limit.total_disk_size = 0xFFFFFFFF;    alu_sched->min_limit.disk_speed = 0xFFFFFFFF;    alu_sched->min_limit.write_usage = 0xFFFFFFFF;    alu_sched->min_limit.read_usage = 0xFFFFFFFF;    alu_sched->min_limit.nr_files = 0xFFFFFFFF;    alu_sched->min_limit.nr_clients = 0xFFFFFFFF;  }  pthread_mutex_init (&alu_sched->alu_mutex, NULL);  return 0;}static voidalu_fini (xlator_t *xl){  if (!xl)    return;  struct alu_sched *alu_sched = (struct alu_sched *)*((long *)xl->private);  struct alu_limits *limit = alu_sched->limits_fn;  struct alu_threshold *threshold = alu_sched->threshold_fn;  void *tmp = NULL;  pthread_mutex_destroy (&alu_sched->alu_mutex);  free (alu_sched->array);  while (limit) {    tmp = limit;    limit = limit->next;    free (tmp);  }  while (threshold) {    tmp = threshold;    threshold = threshold->next;    free (tmp);  }  free (alu_sched);}static int32_t update_stat_array_cbk (call_frame_t *frame,		       void *cookie,		       xlator_t *xl,		       int32_t op_ret,		       int32_t op_errno,		       struct xlator_stats *trav_stats){  struct alu_sched *alu_sched = (struct alu_sched *)*((long *)xl->private);  struct alu_limits *limits_fn = alu_sched->limits_fn;  int32_t idx = 0;    pthread_mutex_lock (&alu_sched->alu_mutex);  for (idx = 0; idx < alu_sched->child_count; idx++) {    if (alu_sched->array[idx].xl == (xlator_t *)cookie)      break;  }  pthread_mutex_unlock (&alu_sched->alu_mutex);  if (op_ret == -1) {    alu_sched->array[idx].eligible = 0;  } else {    memcpy (&(alu_sched->array[idx].stats), trav_stats, sizeof (struct xlator_stats));        /* Get stats from all the child node */    /* Here check the limits specified by the user to        consider the nodes to be used by scheduler */    alu_sched->array[idx].eligible = 1;    limits_fn = alu_sched->limits_fn;    while (limits_fn){      if (limits_fn->max_value && 	  (limits_fn->cur_value (trav_stats) > 	   limits_fn->max_value (&(alu_sched->spec_limit)))) {	alu_sched->array[idx].eligible = 0;      }      if (limits_fn->min_value && 	  (limits_fn->cur_value (trav_stats) < 	   limits_fn->min_value (&(alu_sched->spec_limit)))) {	alu_sched->array[idx].eligible = 0;      }      limits_fn = limits_fn->next;    }    /* Select minimum and maximum disk_usage */    if (trav_stats->disk_usage > alu_sched->max_limit.disk_usage) {      alu_sched->max_limit.disk_usage = trav_stats->disk_usage;    }    if (trav_stats->disk_usage < alu_sched->min_limit.disk_usage) {      alu_sched->min_limit.disk_usage = trav_stats->disk_usage;    }    /* Select minimum and maximum disk_speed */    if (trav_stats->disk_speed > alu_sched->max_limit.disk_speed) {      alu_sched->max_limit.disk_speed = trav_stats->disk_speed;    }    if (trav_stats->disk_speed < alu_sched->min_limit.disk_speed) {      alu_sched->min_limit.disk_speed = trav_stats->disk_speed;    }    /* Select minimum and maximum number of open files */    if (trav_stats->nr_files > alu_sched->max_limit.nr_files) {      alu_sched->max_limit.nr_files = trav_stats->nr_files;    }    if (trav_stats->nr_files < alu_sched->min_limit.nr_files) {      alu_sched->min_limit.nr_files = trav_stats->nr_files;    }    /* Select minimum and maximum write-usage */    if (trav_stats->write_usage > alu_sched->max_limit.write_usage) {      alu_sched->max_limit.write_usage = trav_stats->write_usage;    }    if (trav_stats->write_usage < alu_sched->min_limit.write_usage) {      alu_sched->min_limit.write_usage = trav_stats->write_usage;    }    /* Select minimum and maximum read-usage */    if (trav_stats->read_usage > alu_sched->max_limit.read_usage) {      alu_sched->max_limit.read_usage = trav_stats->read_usage;    }    if (trav_stats->read_usage < alu_sched->min_limit.read_usage) {      alu_sched->min_limit.read_usage = trav_stats->read_usage;    }    /* Select minimum and maximum free-disk */    if (trav_stats->free_disk > alu_sched->max_limit.free_disk) {      alu_sched->max_limit.free_disk = trav_stats->free_disk;    }    if (trav_stats->free_disk < alu_sched->min_limit.free_disk) {      alu_sched->min_limit.free_disk = trav_stats->free_disk;    }  }  STACK_DESTROY (frame->root);  return 0;}static void update_stat_array (xlator_t *xl){  /* This function schedules the file in one of the child nodes */  struct alu_sched *alu_sched = (struct alu_sched *)*((long *)xl->private);  int32_t idx = 0;  call_ctx_t *cctx;  for (idx = 0 ; idx < alu_sched->child_count; idx++) {    call_pool_t *pool = xl->ctx->pool;    cctx = calloc (1, sizeof (*cctx));    cctx->frames.root  = cctx;    cctx->frames.this  = xl;        cctx->pool = pool;    LOCK (&pool->lock);    {      list_add (&cctx->all_frames, &pool->all_frames);    }    UNLOCK (&pool->lock);    STACK_WIND_COOKIE ((&cctx->frames), 		 update_stat_array_cbk, 		 alu_sched->array[idx].xl, //cookie		 alu_sched->array[idx].xl, 		 (alu_sched->array[idx].xl)->mops->stats,		 0); //flag  }  return;}static void alu_update (xlator_t *xl){  struct timeval tv;  struct alu_sched *alu_sched = (struct alu_sched *)*((long *)xl->private);  gettimeofday (&tv, NULL);  if (tv.tv_sec > (alu_sched->refresh_interval + alu_sched->last_stat_fetch.tv_sec)) {    /* Update the stats from all the server */    update_stat_array (xl);    alu_sched->last_stat_fetch.tv_sec = tv.tv_sec;  }}static xlator_t *alu_scheduler (xlator_t *xl, void *path){  /* This function schedules the file in one of the child nodes */  struct alu_sched *alu_sched = (struct alu_sched *)*((long *)xl->private);  int32_t sched_index = 0;  int32_t sched_index_orig = 0;  int32_t idx = 0;  alu_update (xl);  /* Now check each threshold one by one if some nodes are classified */  {    struct alu_threshold *trav_threshold = alu_sched->threshold_fn;    struct alu_threshold *tmp_threshold = alu_sched->sched_method;    struct alu_sched_node *tmp_sched_node;       /* This pointer 'trav_threshold' contains function pointers according to spec file       give by user, */    while (trav_threshold) {      /* This check is needed for seeing if already there are nodes in this criteria          to be scheduled */      if (!alu_sched->sched_nodes_pending) {	for (idx = 0; idx < alu_sched->child_count; idx++) {	  if (!alu_sched->array[idx].eligible) {	    continue;	  }	  if (trav_threshold->entry_value) {	    if (trav_threshold->diff_value (&(alu_sched->max_limit),					    &(alu_sched->array[idx].stats)) <		trav_threshold->entry_value (&(alu_sched->entry_limit))) {	      continue;	    }	  }	  tmp_sched_node = calloc (1, sizeof (struct alu_sched_node));	  tmp_sched_node->index = idx;	  if (!alu_sched->sched_node) {	    alu_sched->sched_node = tmp_sched_node;	  } else {	    pthread_mutex_lock (&alu_sched->alu_mutex);	    tmp_sched_node->next = alu_sched->sched_node;	    alu_sched->sched_node = tmp_sched_node;	    pthread_mutex_unlock (&alu_sched->alu_mutex);	  }	  alu_sched->sched_nodes_pending++;	}      } /* end of if (sched_nodes_pending) */      /* This loop is required to check the eligible nodes */      struct alu_sched_node *trav_sched_node;      while (alu_sched->sched_nodes_pending) {	trav_sched_node = alu_sched->sched_node;	sched_index = trav_sched_node->index;	if (alu_sched->array[sched_index].eligible)	  break;	alu_sched->sched_node = trav_sched_node->next;	free (trav_sched_node);	alu_sched->sched_nodes_pending--;      }      if (alu_sched->sched_nodes_pending) {	/* There are some node in this criteria to be scheduled, no need 	 * to sort and check other methods 	 */	if (tmp_threshold && tmp_threshold->exit_value) {	  /* verify the exit value && whether node is eligible or not */	  if (tmp_threshold->diff_value (&(alu_sched->max_limit),					  &(alu_sched->array[sched_index].stats)) >	       tmp_threshold->exit_value (&(alu_sched->exit_limit))) {	    /* Free the allocated info for the node :) */	    pthread_mutex_lock (&alu_sched->alu_mutex);	    alu_sched->sched_node = trav_sched_node->next;	    free (trav_sched_node);	    trav_sched_node = alu_sched->sched_node;	    alu_sched->sched_nodes_pending--;	    pthread_mutex_unlock (&alu_sched->alu_mutex);	  }	} else {	  /* if there is no exit value, then exit after scheduling once */	  pthread_mutex_lock (&alu_sched->alu_mutex);	  alu_sched->sched_node = trav_sched_node->next;	  free (trav_sched_node);	  trav_sched_node = alu_sched->sched_node;	  alu_sched->sched_nodes_pending--;	  pthread_mutex_unlock (&alu_sched->alu_mutex);	}		alu_sched->sched_method = tmp_threshold; /* this is the method used for selecting */	/* */	if (trav_sched_node) {	  tmp_sched_node = trav_sched_node;	  while (trav_sched_node->next) {	    trav_sched_node = trav_sched_node->next;	  }	  if (tmp_sched_node->next) {	    pthread_mutex_lock (&alu_sched->alu_mutex);	    alu_sched->sched_node = tmp_sched_node->next;	    tmp_sched_node->next = NULL;	    trav_sched_node->next = tmp_sched_node;	    pthread_mutex_unlock (&alu_sched->alu_mutex);	  }	}	/* return the scheduled node */	return alu_sched->array[sched_index].xl;      } /* end of if (pending_nodes) */            tmp_threshold = trav_threshold;      trav_threshold = trav_threshold->next;    }  }    /* This is used only when there is everything seems ok, or no eligible nodes */  sched_index_orig = alu_sched->sched_index;  alu_sched->sched_method = NULL;  while (1) {    //lock    pthread_mutex_lock (&alu_sched->alu_mutex);    sched_index = alu_sched->sched_index++;    alu_sched->sched_index = alu_sched->sched_index % alu_sched->child_count;    pthread_mutex_unlock (&alu_sched->alu_mutex);    //unlock    if (alu_sched->array[sched_index].eligible)      break;    if (sched_index_orig == (sched_index + 1) % alu_sched->child_count) {      gf_log ("alu", GF_LOG_WARNING, "No node is eligible to schedule");      //lock      pthread_mutex_lock (&alu_sched->alu_mutex);      alu_sched->sched_index++;      alu_sched->sched_index = alu_sched->sched_index % alu_sched->child_count;      pthread_mutex_unlock (&alu_sched->alu_mutex);      //unlock      break;    }  }  return alu_sched->array[sched_index].xl;}/** * notify */voidalu_notify (xlator_t *xl, int32_t event, void *data){  struct alu_sched *alu_sched = (struct alu_sched *)*((long *)xl->private);  int32_t idx = 0;    if (!alu_sched)    return;  for (idx = 0; idx < alu_sched->child_count; idx++) {    if (alu_sched->array[idx].xl == (xlator_t *)data)      break;  }  switch (event)    {    case GF_EVENT_CHILD_UP:      {	//alu_sched->array[idx].eligible = 1;      }      break;    case GF_EVENT_CHILD_DOWN:      {	alu_sched->array[idx].eligible = 0;      }      break;    default:      {	;      }      break;    }}struct sched_ops sched = {  .init     = alu_init,  .fini     = alu_fini,  .update   = alu_update,  .schedule = alu_scheduler,  .notify   = alu_notify};

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -