service.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,749 行 · 第 1/5 页

C
1,749
字号
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * *  Copyright (C) 2002 Cluster File Systems, Inc. * *   This file is part of the Lustre file system, http://www.lustre.org *   Lustre is a trademark of Cluster File Systems, Inc. * *   You may have signed or agreed to another license before downloading *   this software.  If so, you are bound by the terms and conditions *   of that agreement, and the following does not apply to you.  See the *   LICENSE file included with this distribution for more information. * *   If you did not agree to a different license, then this copy of Lustre *   is open source software; you can redistribute it and/or modify it *   under the terms of version 2 of the GNU General Public License as *   published by the Free Software Foundation. * *   In either case, Lustre is distributed in the hope that it will be *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the *   license text for more details. 
 * */

#define DEBUG_SUBSYSTEM S_RPC

#ifndef __KERNEL__
#include <liblustre.h>
#include <libcfs/kp30.h>
#endif
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_net.h>
#include <lnet/types.h>
#include "ptlrpc_internal.h"

/* The following are visible and mutable through /sys/module/ptlrpc */
int test_req_buffer_pressure = 0;
CFS_MODULE_PARM(test_req_buffer_pressure, "i", int, 0444,
                "set non-zero to put pressure on request buffer pools");

/* Adaptive-timeout (AT) tunables; all times are in seconds. */
unsigned int at_min = 0;
CFS_MODULE_PARM(at_min, "i", int, 0644,
                "Adaptive timeout minimum (sec)");
/* at_max defaults to 600s only when AT support is compiled in;
 * otherwise 0 (adaptive timeouts effectively disabled). */
#ifdef HAVE_AT_SUPPORT
unsigned int at_max = 600;
#else
unsigned int at_max = 0;
#endif
EXPORT_SYMBOL(at_max);
CFS_MODULE_PARM(at_max, "i", int, 0644,
                "Adaptive timeout maximum (sec)");
unsigned int at_history = 600;
CFS_MODULE_PARM(at_history, "i", int, 0644,
                "Adaptive timeouts remember the slowest event that took place "
                "within this period (sec)");
static int at_early_margin = 5;
CFS_MODULE_PARM(at_early_margin, "i", int, 0644,
                "How soon before an RPC deadline to send an early reply");
static int at_extra = 30;
CFS_MODULE_PARM(at_extra, "i", int, 0644,
                "How much extra time to give with each early reply");

/* forward ref */
static int ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc);

static CFS_LIST_HEAD (ptlrpc_all_services);
spinlock_t ptlrpc_all_services_lock;

/* Allocate a request buffer of @size bytes.  Buffers larger than
 * SVC_BUF_VMALLOC_THRESHOLD come from vmalloc (OBD_VMALLOC) to avoid
 * high-order page allocations; smaller ones from the regular allocator.
 * Returns NULL on allocation failure. */
static char *
ptlrpc_alloc_request_buffer (int size)
{
        char *ptr;

        if (size > SVC_BUF_VMALLOC_THRESHOLD)
                OBD_VMALLOC(ptr, size);
        else
                OBD_ALLOC(ptr, size);

        return (ptr);
}

/* Free a buffer obtained from ptlrpc_alloc_request_buffer(); @size must
 * match the original allocation so the correct free path is chosen. */
static void
ptlrpc_free_request_buffer (char *ptr, int size)
{
        if (size > SVC_BUF_VMALLOC_THRESHOLD)
                OBD_VFREE(ptr, size);
        else
                OBD_FREE(ptr, size);
}

/* Allocate one request buffer descriptor (rqbd) plus its incoming-request
 * buffer for @svc, and add it to the service's idle list.
 * (Definition continues on the following source line.) */
struct ptlrpc_request_buffer_desc *
ptlrpc_alloc_rqbd (struct ptlrpc_service *svc)
{
        struct ptlrpc_request_buffer_desc *rqbd;

        OBD_ALLOC(rqbd, sizeof (*rqbd));
        if (rqbd == NULL)
                return (NULL);

        rqbd->rqbd_service = svc;
        rqbd->rqbd_refcount = 0;
        /* LNet will invoke request_in_callback(rqbd) when a request lands
         * in this buffer. */
        rqbd->rqbd_cbid.cbid_fn = request_in_callback;
        rqbd->rqbd_cbid.cbid_arg = rqbd;
        CFS_INIT_LIST_HEAD(&rqbd->rqbd_reqs);
        rqbd->rqbd_buffer = ptlrpc_alloc_request_buffer(svc->srv_buf_size);

        if (rqbd->rqbd_buffer == NULL) {
                OBD_FREE(rqbd, sizeof (*rqbd));
                return (NULL);
        }

        /* Publish on the idle list under srv_lock; srv_nbufs counts all
         * rqbds owned by the service. */
        spin_lock(&svc->srv_lock);
        list_add(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
        svc->srv_nbufs++;
        spin_unlock(&svc->srv_lock);

        return (rqbd);
}

/* Tear down an rqbd created by ptlrpc_alloc_rqbd().  The descriptor must
 * be unreferenced and have no requests still attached. */
void
ptlrpc_free_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
{
        struct ptlrpc_service *svc = rqbd->rqbd_service;

        LASSERT (rqbd->rqbd_refcount == 0);
        LASSERT (list_empty(&rqbd->rqbd_reqs));

        spin_lock(&svc->srv_lock);
        list_del(&rqbd->rqbd_list);
        svc->srv_nbufs--;
        spin_unlock(&svc->srv_lock);

        ptlrpc_free_request_buffer (rqbd->rqbd_buffer, svc->srv_buf_size);
        OBD_FREE (rqbd, sizeof (*rqbd));
}

/* Allocate one group of request buffers (srv_nbuf_per_group rqbds) for
 * @svc and post each batch to LNet as it grows.
 * Returns 0 on success, -ENOMEM if a descriptor cannot be allocated, or
 * -EAGAIN if posting the idle rqbds fails. */
int
ptlrpc_grow_req_bufs(struct ptlrpc_service *svc)
{
        struct ptlrpc_request_buffer_desc *rqbd;
        int                                i;

        CDEBUG(D_RPCTRACE, "%s: allocate %d new %d-byte reqbufs (%d/%d left)\n",
               svc->srv_name, svc->srv_nbuf_per_group, svc->srv_buf_size,
               svc->srv_nrqbd_receiving, svc->srv_nbufs);
        for (i = 0; i < svc->srv_nbuf_per_group; i++) {
                rqbd = ptlrpc_alloc_rqbd(svc);

                if (rqbd == NULL) {
                        CERROR ("%s: Can't allocate request buffer\n",
                                svc->srv_name);
                        return (-ENOMEM);
                }

                if (ptlrpc_server_post_idle_rqbds(svc) < 0)
                        return (-EAGAIN);
        }

        return (0);
}

/* Stash a lock handle (and its mode) in the request's reply state so the
 * lock is released only after the reply is known to have reached the
 * client.  (Body continues on the following source line.) */
void
ptlrpc_save_lock (struct ptlrpc_request *req,
                  struct lustre_handle *lock, int mode)
{
        struct ptlrpc_reply_state *rs = req->rq_reply_state;
        int                        idx;

        LASSERT(rs != NULL);
        LASSERT(rs->rs_nlocks < RS_MAX_LOCKS);

        idx = rs->rs_nlocks++;
        rs->rs_locks[idx] = *lock;
        rs->rs_modes[idx] = mode;
        /* Holding locks makes this a "difficult" reply: it needs explicit
         * acknowledgement handling before the locks can be dropped. */
        rs->rs_difficult = 1;
}

/* Queue a difficult reply on its service's reply queue and wake a service
 * thread to deal with it.  Caller MUST hold svc->srv_lock (asserted below
 * when spin_is_locked is meaningful, i.e. on SMP). */
void
ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
{
        struct ptlrpc_service *svc = rs->rs_service;

#ifdef CONFIG_SMP
        LASSERT (spin_is_locked (&svc->srv_lock));
#endif
        LASSERT (rs->rs_difficult);
        rs->rs_scheduled_ever = 1;              /* flag any notification attempt */

        if (rs->rs_scheduled)                   /* being set up or already notified */
                return;

        rs->rs_scheduled = 1;
        /* Move (not just add) onto the reply queue: the rs may currently
         * sit on another list. */
        list_del (&rs->rs_list);
        list_add (&rs->rs_list, &svc->srv_reply_queue);
        cfs_waitq_signal (&svc->srv_waitq);
}

/* Walk @obd's uncommitted replies and schedule every reply whose transno
 * is now covered by obd_last_committed, so its service threads can finish
 * them off.  (Body continues on the following source line.) */
void
ptlrpc_commit_replies (struct obd_device *obd)
{
        struct list_head   *tmp;
        struct list_head   *nxt;

        /* Find any replies that have been committed and get their service
         * to attend to complete them. */

        /* CAVEAT EMPTOR: spinlock ordering!!!
         * obd_uncommitted_replies_lock is taken before srv_lock here. */
        spin_lock(&obd->obd_uncommitted_replies_lock);

        /* _safe variant: entries are unlinked from the obd list inside
         * the loop. */
        list_for_each_safe (tmp, nxt, &obd->obd_uncommitted_replies) {
                struct ptlrpc_reply_state *rs =
                        list_entry(tmp, struct ptlrpc_reply_state, rs_obd_list);

                LASSERT (rs->rs_difficult);

                if (rs->rs_transno <= obd->obd_last_committed) {
                        struct ptlrpc_service *svc = rs->rs_service;

                        spin_lock (&svc->srv_lock);
                        list_del_init (&rs->rs_obd_list);
                        ptlrpc_schedule_difficult_reply (rs);
                        spin_unlock (&svc->srv_lock);
                }
        }

        spin_unlock(&obd->obd_uncommitted_replies_lock);
}

/* Post every idle rqbd of @svc to LNet so it can receive requests.
 * Returns 1 if at least one buffer was posted, 0 if none were idle, or
 * -1 if ptlrpc_register_rqbd() failed (the failed rqbd is returned to the
 * idle list). */
static int
ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc)
{
        struct ptlrpc_request_buffer_desc *rqbd;
        int                                rc;
        int                                posted = 0;

        for (;;) {
                spin_lock(&svc->srv_lock);

                if (list_empty (&svc->srv_idle_rqbds)) {
                        spin_unlock(&svc->srv_lock);
                        return (posted);
                }

                rqbd = list_entry(svc->srv_idle_rqbds.next,
                                  struct ptlrpc_request_buffer_desc,
                                  rqbd_list);
                list_del (&rqbd->rqbd_list);

                /* assume we will post successfully */
                svc->srv_nrqbd_receiving++;
                list_add (&rqbd->rqbd_list, &svc->srv_active_rqbds);

                spin_unlock(&svc->srv_lock);

                /* Register with LNet outside srv_lock. */
                rc = ptlrpc_register_rqbd(rqbd);
                if (rc != 0)
                        break;

                posted = 1;
        }

        /* Posting failed: undo the optimistic accounting and put the rqbd
         * back on the idle list. */
        spin_lock(&svc->srv_lock);

        svc->srv_nrqbd_receiving--;
        list_del(&rqbd->rqbd_list);
        list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);

        /* Don't complain if no request buffers are posted right now; LNET
         * won't
         * drop requests because we set the portal lazy! */
        spin_unlock(&svc->srv_lock);

        return (-1);
}

/* Adaptive-timeout timer callback.  @castmeharder is really the
 * struct ptlrpc_service *; flag the service for an AT check and wake a
 * service thread to perform it. */
static void ptlrpc_at_timer(unsigned long castmeharder)
{
        struct ptlrpc_service *svc = (struct ptlrpc_service *)castmeharder;

        CDEBUG(D_INFO, "at timer %s hit at %ld%s\n",
               svc->srv_name, cfs_time_current_sec(),
               list_empty(&svc->srv_at_list) ? ", empty" : "");
        svc->srv_at_check = 1;
        cfs_waitq_signal(&svc->srv_waitq);
}

/* @threadname should be 11 characters or less - 3 will be added on */
/* Allocate and initialise a ptlrpc service: request/reply portals, buffer
 * sizing, thread limits, and all internal queues and timers.  Returns the
 * new service, or NULL on allocation failure.
 * NOTE(review): this definition continues beyond the visible excerpt
 * (page 1 of 5); only the initialisation shown here is documented. */
struct ptlrpc_service *
ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
                int req_portal, int rep_portal, int watchdog_factor,
                svc_handler_t handler, char *name,
                cfs_proc_dir_entry_t *proc_entry,
                svcreq_printfn_t svcreq_printfn,
                int min_threads, int max_threads, char *threadname)
{
        int                    rc;
        struct ptlrpc_service *service;
        ENTRY;

        LASSERT (nbufs > 0);
        LASSERT (bufsize >= max_req_size);

        OBD_ALLOC(service, sizeof(*service));
        if (service == NULL)
                RETURN(NULL);

        /* First initialise enough for early teardown */

        service->srv_name = name;
        spin_lock_init(&service->srv_lock);
        CFS_INIT_LIST_HEAD(&service->srv_threads);
        cfs_waitq_init(&service->srv_waitq);

        /* Under test_req_buffer_pressure, use a single buffer per group to
         * deliberately starve the request buffer pool. */
        service->srv_nbuf_per_group = test_req_buffer_pressure ? 1 : nbufs;
        service->srv_max_req_size = max_req_size;
        service->srv_buf_size = bufsize;
        service->srv_rep_portal = rep_portal;
        service->srv_req_portal = req_portal;
        service->srv_watchdog_factor = watchdog_factor;
        service->srv_handler = handler;
        service->srv_request_history_print_fn = svcreq_printfn;
        service->srv_request_seq = 1;           /* valid seq #s start at 1 */
        service->srv_request_max_cull_seq = 0;
        service->srv_threads_min = min_threads;
        service->srv_threads_max = max_threads;
        service->srv_thread_name = threadname;

        /* Lazy portal: LNet queues incoming requests even when no buffer
         * is currently posted (see ptlrpc_server_post_idle_rqbds). */
        rc = LNetSetLazyPortal(service->srv_req_portal);
        LASSERT (rc == 0);

        CFS_INIT_LIST_HEAD(&service->srv_request_queue);
        CFS_INIT_LIST_HEAD(&service->srv_idle_rqbds);
        CFS_INIT_LIST_HEAD(&service->srv_active_rqbds);
        CFS_INIT_LIST_HEAD(&service->srv_history_rqbds);
        CFS_INIT_LIST_HEAD(&service->srv_request_history);
        CFS_INIT_LIST_HEAD(&service->srv_active_replies);
        CFS_INIT_LIST_HEAD(&service->srv_reply_queue);
        CFS_INIT_LIST_HEAD(&service->srv_free_rs_list);
        cfs_waitq_init(&service->srv_free_rs_waitq);

        spin_lock_init(&service->srv_at_lock);
        CFS_INIT_LIST_HEAD(&service->srv_req_in_queue);
        CFS_INIT_LIST_HEAD(&service->srv_at_list);
        cfs_timer_init(&service->srv_at_timer, ptlrpc_at_timer, service);
        /* At SOW, service time should be quick; 10s seems generous. If client
           timeout is less than this, we'll be sending an early reply. */
        at_init(&service->srv_at_estimate, 10, 0);

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?