service.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,749 行 · 第 1/5 页
C
1,749 行
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (C) 2002 Cluster File Systems, Inc. * * This file is part of the Lustre file system, http://www.lustre.org * Lustre is a trademark of Cluster File Systems, Inc. * * You may have signed or agreed to another license before downloading * this software. If so, you are bound by the terms and conditions * of that agreement, and the following does not apply to you. See the * LICENSE file included with this distribution for more information. * * If you did not agree to a different license, then this copy of Lustre * is open source software; you can redistribute it and/or modify it * under the terms of version 2 of the GNU General Public License as * published by the Free Software Foundation. * * In either case, Lustre is distributed in the hope that it will be * useful, but WITHOUT ANY WARRANTY; without even the implied warranty * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * license text for more details. * */#define DEBUG_SUBSYSTEM S_RPC#ifndef __KERNEL__#include <liblustre.h>#include <libcfs/kp30.h>#endif#include <obd_support.h>#include <obd_class.h>#include <lustre_net.h>#include <lnet/types.h>#include "ptlrpc_internal.h"/* The following are visible and mutable through /sys/module/ptlrpc */int test_req_buffer_pressure = 0;CFS_MODULE_PARM(test_req_buffer_pressure, "i", int, 0444, "set non-zero to put pressure on request buffer pools");unsigned int at_min = 0;CFS_MODULE_PARM(at_min, "i", int, 0644, "Adaptive timeout minimum (sec)");#ifdef HAVE_AT_SUPPORTunsigned int at_max = 600;#elseunsigned int at_max = 0;#endifEXPORT_SYMBOL(at_max);CFS_MODULE_PARM(at_max, "i", int, 0644, "Adaptive timeout maximum (sec)");unsigned int at_history = 600;CFS_MODULE_PARM(at_history, "i", int, 0644, "Adaptive timeouts remember the slowest event that took place " "within this period (sec)");static int at_early_margin = 5;CFS_MODULE_PARM(at_early_margin, "i", int, 0644, "How soon before an RPC deadline to send an early reply");static int at_extra = 30;CFS_MODULE_PARM(at_extra, "i", int, 0644, "How much extra time to give with each early reply");/* forward ref */static int ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc);static CFS_LIST_HEAD (ptlrpc_all_services);spinlock_t ptlrpc_all_services_lock;static char *ptlrpc_alloc_request_buffer (int size){ char *ptr; if (size > SVC_BUF_VMALLOC_THRESHOLD) OBD_VMALLOC(ptr, size); else OBD_ALLOC(ptr, size); return (ptr);}static voidptlrpc_free_request_buffer (char *ptr, int size){ if (size > SVC_BUF_VMALLOC_THRESHOLD) OBD_VFREE(ptr, size); else OBD_FREE(ptr, size);}struct ptlrpc_request_buffer_desc *ptlrpc_alloc_rqbd (struct ptlrpc_service *svc){ struct ptlrpc_request_buffer_desc *rqbd; OBD_ALLOC(rqbd, sizeof (*rqbd)); if (rqbd == NULL) return (NULL); rqbd->rqbd_service = svc; rqbd->rqbd_refcount = 0; rqbd->rqbd_cbid.cbid_fn = request_in_callback; rqbd->rqbd_cbid.cbid_arg = rqbd; CFS_INIT_LIST_HEAD(&rqbd->rqbd_reqs); rqbd->rqbd_buffer = ptlrpc_alloc_request_buffer(svc->srv_buf_size); if (rqbd->rqbd_buffer == NULL) { OBD_FREE(rqbd, sizeof (*rqbd)); return (NULL); } spin_lock(&svc->srv_lock); list_add(&rqbd->rqbd_list, &svc->srv_idle_rqbds); svc->srv_nbufs++; spin_unlock(&svc->srv_lock); return (rqbd);}voidptlrpc_free_rqbd (struct ptlrpc_request_buffer_desc *rqbd){ struct ptlrpc_service *svc = rqbd->rqbd_service; LASSERT (rqbd->rqbd_refcount == 0); LASSERT (list_empty(&rqbd->rqbd_reqs)); spin_lock(&svc->srv_lock); list_del(&rqbd->rqbd_list); svc->srv_nbufs--; spin_unlock(&svc->srv_lock); ptlrpc_free_request_buffer (rqbd->rqbd_buffer, svc->srv_buf_size); OBD_FREE (rqbd, sizeof (*rqbd));}intptlrpc_grow_req_bufs(struct ptlrpc_service *svc){ struct ptlrpc_request_buffer_desc *rqbd; int i; CDEBUG(D_RPCTRACE, "%s: allocate %d new %d-byte reqbufs (%d/%d left)\n", svc->srv_name, svc->srv_nbuf_per_group, svc->srv_buf_size, svc->srv_nrqbd_receiving, svc->srv_nbufs); for (i = 0; i < svc->srv_nbuf_per_group; i++) { rqbd = ptlrpc_alloc_rqbd(svc); if (rqbd == NULL) { CERROR ("%s: Can't allocate request buffer\n", svc->srv_name); return (-ENOMEM); } if (ptlrpc_server_post_idle_rqbds(svc) < 0) return (-EAGAIN); } return (0);}voidptlrpc_save_lock (struct ptlrpc_request *req, struct lustre_handle *lock, int mode){ struct ptlrpc_reply_state *rs = req->rq_reply_state; int idx; LASSERT(rs != NULL); LASSERT(rs->rs_nlocks < RS_MAX_LOCKS); idx = rs->rs_nlocks++; rs->rs_locks[idx] = *lock; rs->rs_modes[idx] = mode; rs->rs_difficult = 1;}voidptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs){ struct ptlrpc_service *svc = rs->rs_service;#ifdef CONFIG_SMP LASSERT (spin_is_locked (&svc->srv_lock));#endif LASSERT (rs->rs_difficult); rs->rs_scheduled_ever = 1; /* flag any notification attempt */ if (rs->rs_scheduled) /* being set up or already notified */ return; rs->rs_scheduled = 1; list_del (&rs->rs_list); list_add (&rs->rs_list, &svc->srv_reply_queue); cfs_waitq_signal (&svc->srv_waitq);}voidptlrpc_commit_replies (struct obd_device *obd){ struct list_head *tmp; struct list_head *nxt; /* Find any replies that have been committed and get their service * to attend to complete them. */ /* CAVEAT EMPTOR: spinlock ordering!!! */ spin_lock(&obd->obd_uncommitted_replies_lock); list_for_each_safe (tmp, nxt, &obd->obd_uncommitted_replies) { struct ptlrpc_reply_state *rs = list_entry(tmp, struct ptlrpc_reply_state, rs_obd_list); LASSERT (rs->rs_difficult); if (rs->rs_transno <= obd->obd_last_committed) { struct ptlrpc_service *svc = rs->rs_service; spin_lock (&svc->srv_lock); list_del_init (&rs->rs_obd_list); ptlrpc_schedule_difficult_reply (rs); spin_unlock (&svc->srv_lock); } } spin_unlock(&obd->obd_uncommitted_replies_lock);}static intptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc){ struct ptlrpc_request_buffer_desc *rqbd; int rc; int posted = 0; for (;;) { spin_lock(&svc->srv_lock); if (list_empty (&svc->srv_idle_rqbds)) { spin_unlock(&svc->srv_lock); return (posted); } rqbd = list_entry(svc->srv_idle_rqbds.next, struct ptlrpc_request_buffer_desc, rqbd_list); list_del (&rqbd->rqbd_list); /* assume we will post successfully */ svc->srv_nrqbd_receiving++; list_add (&rqbd->rqbd_list, &svc->srv_active_rqbds); spin_unlock(&svc->srv_lock); rc = ptlrpc_register_rqbd(rqbd); if (rc != 0) break; posted = 1; } spin_lock(&svc->srv_lock); svc->srv_nrqbd_receiving--; list_del(&rqbd->rqbd_list); list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds); /* Don't complain if no request buffers are posted right now; LNET * won't drop requests because we set the portal lazy! */ spin_unlock(&svc->srv_lock); return (-1);}static void ptlrpc_at_timer(unsigned long castmeharder){ struct ptlrpc_service *svc = (struct ptlrpc_service *)castmeharder; CDEBUG(D_INFO, "at timer %s hit at %ld%s\n", svc->srv_name, cfs_time_current_sec(), list_empty(&svc->srv_at_list) ? ", empty" : ""); svc->srv_at_check = 1; cfs_waitq_signal(&svc->srv_waitq);}/* @threadname should be 11 characters or less - 3 will be added on */struct ptlrpc_service *ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size, int req_portal, int rep_portal, int watchdog_factor, svc_handler_t handler, char *name, cfs_proc_dir_entry_t *proc_entry, svcreq_printfn_t svcreq_printfn, int min_threads, int max_threads, char *threadname){ int rc; struct ptlrpc_service *service; ENTRY; LASSERT (nbufs > 0); LASSERT (bufsize >= max_req_size); OBD_ALLOC(service, sizeof(*service)); if (service == NULL) RETURN(NULL); /* First initialise enough for early teardown */ service->srv_name = name; spin_lock_init(&service->srv_lock); CFS_INIT_LIST_HEAD(&service->srv_threads); cfs_waitq_init(&service->srv_waitq); service->srv_nbuf_per_group = test_req_buffer_pressure ? 1 : nbufs; service->srv_max_req_size = max_req_size; service->srv_buf_size = bufsize; service->srv_rep_portal = rep_portal; service->srv_req_portal = req_portal; service->srv_watchdog_factor = watchdog_factor; service->srv_handler = handler; service->srv_request_history_print_fn = svcreq_printfn; service->srv_request_seq = 1; /* valid seq #s start at 1 */ service->srv_request_max_cull_seq = 0; service->srv_threads_min = min_threads; service->srv_threads_max = max_threads; service->srv_thread_name = threadname; rc = LNetSetLazyPortal(service->srv_req_portal); LASSERT (rc == 0); CFS_INIT_LIST_HEAD(&service->srv_request_queue); CFS_INIT_LIST_HEAD(&service->srv_idle_rqbds); CFS_INIT_LIST_HEAD(&service->srv_active_rqbds); CFS_INIT_LIST_HEAD(&service->srv_history_rqbds); CFS_INIT_LIST_HEAD(&service->srv_request_history); CFS_INIT_LIST_HEAD(&service->srv_active_replies); CFS_INIT_LIST_HEAD(&service->srv_reply_queue); CFS_INIT_LIST_HEAD(&service->srv_free_rs_list); cfs_waitq_init(&service->srv_free_rs_waitq); spin_lock_init(&service->srv_at_lock); CFS_INIT_LIST_HEAD(&service->srv_req_in_queue); CFS_INIT_LIST_HEAD(&service->srv_at_list); cfs_timer_init(&service->srv_at_timer, ptlrpc_at_timer, service); /* At SOW, service time should be quick; 10s seems generous. If client timeout is less than this, we'll be sending an early reply. */ at_init(&service->srv_at_estimate, 10, 0);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?