📄 alu.c
字号:
} alu_sched->child_count = index; sched_array = calloc (index, sizeof (struct alu_sched_struct)); trav_xl = xl->children; index = 0; while (trav_xl) { sched_array[index].xl = trav_xl->xlator; sched_array[index].eligible = 1; index++; trav_xl = trav_xl->next; } alu_sched->array = sched_array; data = dict_get (xl->options, "alu.read-only-subvolumes"); if (data) { char *child = NULL; char *tmp; char *childs_data = strdup (data->data); child = strtok_r (childs_data, ",", &tmp); while (child) { for (index = 1; index < alu_sched->child_count; index++) { if (strcmp (alu_sched->array[index -1].xl->name, child) == 0) { memcpy (&(alu_sched->array[index -1]), &(alu_sched->array[alu_sched->child_count -1]), sizeof (struct alu_sched_struct)); alu_sched->child_count--; break; } } child = strtok_r (NULL, ",", &tmp); } } } *((long *)xl->private) = (long)alu_sched; /* Initialize all the alu_sched structure's elements */ { alu_sched->sched_nodes_pending = 0; alu_sched->min_limit.free_disk = 0x00FFFFFF; alu_sched->min_limit.disk_usage = 0xFFFFFFFF; alu_sched->min_limit.total_disk_size = 0xFFFFFFFF; alu_sched->min_limit.disk_speed = 0xFFFFFFFF; alu_sched->min_limit.write_usage = 0xFFFFFFFF; alu_sched->min_limit.read_usage = 0xFFFFFFFF; alu_sched->min_limit.nr_files = 0xFFFFFFFF; alu_sched->min_limit.nr_clients = 0xFFFFFFFF; } pthread_mutex_init (&alu_sched->alu_mutex, NULL); return 0;}static voidalu_fini (xlator_t *xl){ if (!xl) return; struct alu_sched *alu_sched = (struct alu_sched *)*((long *)xl->private); struct alu_limits *limit = alu_sched->limits_fn; struct alu_threshold *threshold = alu_sched->threshold_fn; void *tmp = NULL; pthread_mutex_destroy (&alu_sched->alu_mutex); free (alu_sched->array); while (limit) { tmp = limit; limit = limit->next; free (tmp); } while (threshold) { tmp = threshold; threshold = threshold->next; free (tmp); } free (alu_sched);}static int32_t update_stat_array_cbk (call_frame_t *frame, void *cookie, xlator_t *xl, int32_t op_ret, int32_t op_errno, struct xlator_stats *trav_stats){ struct alu_sched *alu_sched = (struct alu_sched *)*((long *)xl->private); struct alu_limits *limits_fn = alu_sched->limits_fn; int32_t idx = 0; pthread_mutex_lock (&alu_sched->alu_mutex); for (idx = 0; idx < alu_sched->child_count; idx++) { if (alu_sched->array[idx].xl == (xlator_t *)cookie) break; } pthread_mutex_unlock (&alu_sched->alu_mutex); if (op_ret == -1) { alu_sched->array[idx].eligible = 0; } else { memcpy (&(alu_sched->array[idx].stats), trav_stats, sizeof (struct xlator_stats)); /* Get stats from all the child node */ /* Here check the limits specified by the user to consider the nodes to be used by scheduler */ alu_sched->array[idx].eligible = 1; limits_fn = alu_sched->limits_fn; while (limits_fn){ if (limits_fn->max_value && (limits_fn->cur_value (trav_stats) > limits_fn->max_value (&(alu_sched->spec_limit)))) { alu_sched->array[idx].eligible = 0; } if (limits_fn->min_value && (limits_fn->cur_value (trav_stats) < limits_fn->min_value (&(alu_sched->spec_limit)))) { alu_sched->array[idx].eligible = 0; } limits_fn = limits_fn->next; } /* Select minimum and maximum disk_usage */ if (trav_stats->disk_usage > alu_sched->max_limit.disk_usage) { alu_sched->max_limit.disk_usage = trav_stats->disk_usage; } if (trav_stats->disk_usage < alu_sched->min_limit.disk_usage) { alu_sched->min_limit.disk_usage = trav_stats->disk_usage; } /* Select minimum and maximum disk_speed */ if (trav_stats->disk_speed > alu_sched->max_limit.disk_speed) { alu_sched->max_limit.disk_speed = trav_stats->disk_speed; } if (trav_stats->disk_speed < alu_sched->min_limit.disk_speed) { alu_sched->min_limit.disk_speed = trav_stats->disk_speed; } /* Select minimum and maximum number of open files */ if (trav_stats->nr_files > alu_sched->max_limit.nr_files) { alu_sched->max_limit.nr_files = trav_stats->nr_files; } if (trav_stats->nr_files < alu_sched->min_limit.nr_files) { alu_sched->min_limit.nr_files = trav_stats->nr_files; } /* Select minimum and maximum write-usage */ if (trav_stats->write_usage > alu_sched->max_limit.write_usage) { alu_sched->max_limit.write_usage = trav_stats->write_usage; } if (trav_stats->write_usage < alu_sched->min_limit.write_usage) { alu_sched->min_limit.write_usage = trav_stats->write_usage; } /* Select minimum and maximum read-usage */ if (trav_stats->read_usage > alu_sched->max_limit.read_usage) { alu_sched->max_limit.read_usage = trav_stats->read_usage; } if (trav_stats->read_usage < alu_sched->min_limit.read_usage) { alu_sched->min_limit.read_usage = trav_stats->read_usage; } /* Select minimum and maximum free-disk */ if (trav_stats->free_disk > alu_sched->max_limit.free_disk) { alu_sched->max_limit.free_disk = trav_stats->free_disk; } if (trav_stats->free_disk < alu_sched->min_limit.free_disk) { alu_sched->min_limit.free_disk = trav_stats->free_disk; } } STACK_DESTROY (frame->root); return 0;}static void update_stat_array (xlator_t *xl){ /* This function schedules the file in one of the child nodes */ struct alu_sched *alu_sched = (struct alu_sched *)*((long *)xl->private); int32_t idx = 0; call_ctx_t *cctx; for (idx = 0 ; idx < alu_sched->child_count; idx++) { call_pool_t *pool = xl->ctx->pool; cctx = calloc (1, sizeof (*cctx)); cctx->frames.root = cctx; cctx->frames.this = xl; cctx->pool = pool; LOCK (&pool->lock); { list_add (&cctx->all_frames, &pool->all_frames); } UNLOCK (&pool->lock); STACK_WIND_COOKIE ((&cctx->frames), update_stat_array_cbk, alu_sched->array[idx].xl, //cookie alu_sched->array[idx].xl, (alu_sched->array[idx].xl)->mops->stats, 0); //flag } return;}static void alu_update (xlator_t *xl){ struct timeval tv; struct alu_sched *alu_sched = (struct alu_sched *)*((long *)xl->private); gettimeofday (&tv, NULL); if (tv.tv_sec > (alu_sched->refresh_interval + alu_sched->last_stat_fetch.tv_sec)) { /* Update the stats from all the server */ update_stat_array (xl); alu_sched->last_stat_fetch.tv_sec = tv.tv_sec; }}static xlator_t *alu_scheduler (xlator_t *xl, void *path){ /* This function schedules the file in one of the child nodes */ struct alu_sched *alu_sched = (struct alu_sched *)*((long *)xl->private); int32_t sched_index = 0; int32_t sched_index_orig = 0; int32_t idx = 0; alu_update (xl); /* Now check each threshold one by one if some nodes are classified */ { struct alu_threshold *trav_threshold = alu_sched->threshold_fn; struct alu_threshold *tmp_threshold = alu_sched->sched_method; struct alu_sched_node *tmp_sched_node; /* This pointer 'trav_threshold' contains function pointers according to spec file give by user, */ while (trav_threshold) { /* This check is needed for seeing if already there are nodes in this criteria to be scheduled */ if (!alu_sched->sched_nodes_pending) { for (idx = 0; idx < alu_sched->child_count; idx++) { if (!alu_sched->array[idx].eligible) { continue; } if (trav_threshold->entry_value) { if (trav_threshold->diff_value (&(alu_sched->max_limit), &(alu_sched->array[idx].stats)) < trav_threshold->entry_value (&(alu_sched->entry_limit))) { continue; } } tmp_sched_node = calloc (1, sizeof (struct alu_sched_node)); tmp_sched_node->index = idx; if (!alu_sched->sched_node) { alu_sched->sched_node = tmp_sched_node; } else { pthread_mutex_lock (&alu_sched->alu_mutex); tmp_sched_node->next = alu_sched->sched_node; alu_sched->sched_node = tmp_sched_node; pthread_mutex_unlock (&alu_sched->alu_mutex); } alu_sched->sched_nodes_pending++; } } /* end of if (sched_nodes_pending) */ /* This loop is required to check the eligible nodes */ struct alu_sched_node *trav_sched_node; while (alu_sched->sched_nodes_pending) { trav_sched_node = alu_sched->sched_node; sched_index = trav_sched_node->index; if (alu_sched->array[sched_index].eligible) break; alu_sched->sched_node = trav_sched_node->next; free (trav_sched_node); alu_sched->sched_nodes_pending--; } if (alu_sched->sched_nodes_pending) { /* There are some node in this criteria to be scheduled, no need * to sort and check other methods */ if (tmp_threshold && tmp_threshold->exit_value) { /* verify the exit value && whether node is eligible or not */ if (tmp_threshold->diff_value (&(alu_sched->max_limit), &(alu_sched->array[sched_index].stats)) > tmp_threshold->exit_value (&(alu_sched->exit_limit))) { /* Free the allocated info for the node :) */ pthread_mutex_lock (&alu_sched->alu_mutex); alu_sched->sched_node = trav_sched_node->next; free (trav_sched_node); trav_sched_node = alu_sched->sched_node; alu_sched->sched_nodes_pending--; pthread_mutex_unlock (&alu_sched->alu_mutex); } } else { /* if there is no exit value, then exit after scheduling once */ pthread_mutex_lock (&alu_sched->alu_mutex); alu_sched->sched_node = trav_sched_node->next; free (trav_sched_node); trav_sched_node = alu_sched->sched_node; alu_sched->sched_nodes_pending--; pthread_mutex_unlock (&alu_sched->alu_mutex); } alu_sched->sched_method = tmp_threshold; /* this is the method used for selecting */ /* */ if (trav_sched_node) { tmp_sched_node = trav_sched_node; while (trav_sched_node->next) { trav_sched_node = trav_sched_node->next; } if (tmp_sched_node->next) { pthread_mutex_lock (&alu_sched->alu_mutex); alu_sched->sched_node = tmp_sched_node->next; tmp_sched_node->next = NULL; trav_sched_node->next = tmp_sched_node; pthread_mutex_unlock (&alu_sched->alu_mutex); } } /* return the scheduled node */ return alu_sched->array[sched_index].xl; } /* end of if (pending_nodes) */ tmp_threshold = trav_threshold; trav_threshold = trav_threshold->next; } } /* This is used only when there is everything seems ok, or no eligible nodes */ sched_index_orig = alu_sched->sched_index; alu_sched->sched_method = NULL; while (1) { //lock pthread_mutex_lock (&alu_sched->alu_mutex); sched_index = alu_sched->sched_index++; alu_sched->sched_index = alu_sched->sched_index % alu_sched->child_count; pthread_mutex_unlock (&alu_sched->alu_mutex); //unlock if (alu_sched->array[sched_index].eligible) break; if (sched_index_orig == (sched_index + 1) % alu_sched->child_count) { gf_log ("alu", GF_LOG_WARNING, "No node is eligible to schedule"); //lock pthread_mutex_lock (&alu_sched->alu_mutex); alu_sched->sched_index++; alu_sched->sched_index = alu_sched->sched_index % alu_sched->child_count; pthread_mutex_unlock (&alu_sched->alu_mutex); //unlock break; } } return alu_sched->array[sched_index].xl;}/** * notify */voidalu_notify (xlator_t *xl, int32_t event, void *data){ struct alu_sched *alu_sched = (struct alu_sched *)*((long *)xl->private); int32_t idx = 0; if (!alu_sched) return; for (idx = 0; idx < alu_sched->child_count; idx++) { if (alu_sched->array[idx].xl == (xlator_t *)data) break; } switch (event) { case GF_EVENT_CHILD_UP: { //alu_sched->array[idx].eligible = 1; } break; case GF_EVENT_CHILD_DOWN: { alu_sched->array[idx].eligible = 0; } break; default: { ; } break; }}struct sched_ops sched = { .init = alu_init, .fini = alu_fini, .update = alu_update, .schedule = alu_scheduler, .notify = alu_notify};
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -