rpc.c | lustre 1.6.5 source code | C | Page 1 of 4

/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
 *   Author: Isaac Huang <isaac@clusterfs.com>
 *
 */

#define DEBUG_SUBSYSTEM S_LNET

#include "selftest.h"

typedef enum {
        SRPC_STATE_NONE,
        SRPC_STATE_NI_INIT,
        SRPC_STATE_EQ_INIT,
        SRPC_STATE_WI_INIT,
        SRPC_STATE_RUNNING,
        SRPC_STATE_STOPPING,
} srpc_state_t;

#define SRPC_PEER_HASH_SIZE       101  /* # peer lists */
#define SRPC_PEER_CREDITS         16   /* >= most LND's default peer credit */

struct smoketest_rpc {
        spinlock_t        rpc_glock;     /* global lock */
        srpc_service_t   *rpc_services[SRPC_SERVICE_MAX_ID + 1];
        struct list_head *rpc_peers;     /* hash table of known peers */
        lnet_handle_eq_t  rpc_lnet_eq;   /* _the_ LNet event queue */
        srpc_state_t      rpc_state;
        srpc_counters_t   rpc_counters;
        __u64             rpc_matchbits; /* matchbits counter */
} srpc_data;

/* forward ref's */
int srpc_handle_rpc (swi_workitem_t *wi);

void srpc_get_counters (srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        *cnt = srpc_data.rpc_counters;
        spin_unlock(&srpc_data.rpc_glock);
}

void srpc_set_counters (const srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters = *cnt;
        spin_unlock(&srpc_data.rpc_glock);
}

void
srpc_add_bulk_page (srpc_bulk_t *bk, cfs_page_t *pg, int i)
{
        LASSERT (i >= 0 && i < bk->bk_niov);

#ifdef __KERNEL__
        bk->bk_iovs[i].kiov_offset = 0;
        bk->bk_iovs[i].kiov_page   = pg;
        bk->bk_iovs[i].kiov_len    = CFS_PAGE_SIZE;
#else
        LASSERT (bk->bk_pages != NULL);

        bk->bk_pages[i] = pg;
        bk->bk_iovs[i].iov_len  = CFS_PAGE_SIZE;
        bk->bk_iovs[i].iov_base = cfs_page_address(pg);
#endif
        return;
}

void
srpc_free_bulk (srpc_bulk_t *bk)
{
        int         i;
        cfs_page_t *pg;

        LASSERT (bk != NULL);
#ifndef __KERNEL__
        LASSERT (bk->bk_pages != NULL);
#endif

        for (i = 0; i < bk->bk_niov; i++) {
#ifdef __KERNEL__
                pg = bk->bk_iovs[i].kiov_page;
#else
                pg = bk->bk_pages[i];
#endif
                if (pg == NULL) break;

                cfs_free_page(pg);
        }

#ifndef __KERNEL__
        LIBCFS_FREE(bk->bk_pages, sizeof(cfs_page_t *) * bk->bk_niov);
#endif
        LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[bk->bk_niov]));
        return;
}

srpc_bulk_t *
srpc_alloc_bulk (int npages, int sink)
{
        srpc_bulk_t  *bk;
        cfs_page_t  **pages;
        int           i;

        LASSERT (npages > 0 && npages <= LNET_MAX_IOV);

        LIBCFS_ALLOC(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
        if (bk == NULL) {
                CERROR ("Can't allocate descriptor for %d pages\n", npages);
                return NULL;
        }

        memset(bk, 0, offsetof(srpc_bulk_t, bk_iovs[npages]));
        bk->bk_sink = sink;
        bk->bk_niov = npages;
        bk->bk_len  = npages * CFS_PAGE_SIZE;
#ifndef __KERNEL__
        LIBCFS_ALLOC(pages, sizeof(cfs_page_t *) * npages);
        if (pages == NULL) {
                LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
                CERROR ("Can't allocate page array for %d pages\n", npages);
                return NULL;
        }

        memset(pages, 0, sizeof(cfs_page_t *) * npages);
        bk->bk_pages = pages;
#else
        UNUSED (pages);
#endif

        for (i = 0; i < npages; i++) {
                cfs_page_t *pg = cfs_alloc_page(CFS_ALLOC_STD);

                if (pg == NULL) {
                        CERROR ("Can't allocate page %d of %d\n", i, npages);
                        srpc_free_bulk(bk);
                        return NULL;
                }

                srpc_add_bulk_page(bk, pg, i);
        }

        return bk;
}
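/*
 * Usage sketch (annotation added to this listing, not original source;
 * the real callers live elsewhere in the selftest module): the descriptor
 * and its iov array are one allocation sized by
 * offsetof(srpc_bulk_t, bk_iovs[npages]), so allocation and free must use
 * the same expression. A caller would pair the two roughly like this:
 *
 *      srpc_bulk_t *bk = srpc_alloc_bulk(16, 1);  // 16 pages, sink side
 *      if (bk == NULL)
 *              return -ENOMEM;
 *      // ... post bk->bk_iovs as the bulk buffer of an RPC ...
 *      srpc_free_bulk(bk);
 */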
static inline struct list_head *
srpc_nid2peerlist (lnet_nid_t nid)
{
        unsigned int hash = ((unsigned int)nid) % SRPC_PEER_HASH_SIZE;

        return &srpc_data.rpc_peers[hash];
}

static inline srpc_peer_t *
srpc_create_peer (lnet_nid_t nid)
{
        srpc_peer_t *peer;

        LASSERT (nid != LNET_NID_ANY);

        LIBCFS_ALLOC(peer, sizeof(srpc_peer_t));
        if (peer == NULL) {
                CERROR ("Failed to allocate peer structure for %s\n",
                        libcfs_nid2str(nid));
                return NULL;
        }

        memset(peer, 0, sizeof(srpc_peer_t));
        peer->stp_nid     = nid;
        peer->stp_credits = SRPC_PEER_CREDITS;

        spin_lock_init(&peer->stp_lock);
        CFS_INIT_LIST_HEAD(&peer->stp_rpcq);
        CFS_INIT_LIST_HEAD(&peer->stp_ctl_rpcq);
        return peer;
}

srpc_peer_t *
srpc_find_peer_locked (lnet_nid_t nid)
{
        struct list_head *peer_list = srpc_nid2peerlist(nid);
        srpc_peer_t      *peer;

        LASSERT (nid != LNET_NID_ANY);

        list_for_each_entry (peer, peer_list, stp_list) {
                if (peer->stp_nid == nid)
                        return peer;
        }

        return NULL;
}

static srpc_peer_t *
srpc_nid2peer (lnet_nid_t nid)
{
        srpc_peer_t *peer;
        srpc_peer_t *new_peer;

        spin_lock(&srpc_data.rpc_glock);
        peer = srpc_find_peer_locked(nid);
        spin_unlock(&srpc_data.rpc_glock);

        if (peer != NULL)
                return peer;

        new_peer = srpc_create_peer(nid);

        spin_lock(&srpc_data.rpc_glock);

        peer = srpc_find_peer_locked(nid);
        if (peer != NULL) {
                spin_unlock(&srpc_data.rpc_glock);
                if (new_peer != NULL)
                        LIBCFS_FREE(new_peer, sizeof(srpc_peer_t));

                return peer;
        }

        if (new_peer == NULL) {
                spin_unlock(&srpc_data.rpc_glock);
                return NULL;
        }

        list_add_tail(&new_peer->stp_list, srpc_nid2peerlist(nid));
        spin_unlock(&srpc_data.rpc_glock);
        return new_peer;
}

static inline __u64
srpc_next_id (void)
{
        __u64 id;

        spin_lock(&srpc_data.rpc_glock);
        id = srpc_data.rpc_matchbits++;
        spin_unlock(&srpc_data.rpc_glock);
        return id;
}

void
srpc_init_server_rpc (srpc_server_rpc_t *rpc,
                      srpc_service_t *sv, srpc_buffer_t *buffer)
{
        memset(rpc, 0, sizeof(*rpc));
        swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc);

        rpc->srpc_ev.ev_fired = 1; /* no event expected now */

        rpc->srpc_service  = sv;
        rpc->srpc_reqstbuf = buffer;
        rpc->srpc_peer     = buffer->buf_peer;
        rpc->srpc_self     = buffer->buf_self;
        rpc->srpc_replymdh = LNET_INVALID_HANDLE;
}
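/*
 * Annotation (not original source commentary): srpc_nid2peer() above uses
 * an optimistic allocate-then-recheck pattern. The candidate peer is
 * created outside rpc_glock, the hash chain is re-checked under the lock,
 * and the loser of any race frees its candidate. This keeps LIBCFS_ALLOC()
 * out of the spinlock's critical section. srpc_next_id() hands out unique
 * RPC matchbits by post-incrementing a global counter under the same lock.
 */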
int
srpc_add_service (srpc_service_t *sv)
{
        int                id = sv->sv_id;
        int                i;
        srpc_server_rpc_t *rpc;

        LASSERT (sv->sv_concur > 0);
        LASSERT (0 <= id && id <= SRPC_SERVICE_MAX_ID);

        spin_lock(&srpc_data.rpc_glock);

        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);

        if (srpc_data.rpc_services[id] != NULL) {
                spin_unlock(&srpc_data.rpc_glock);
                return -EBUSY;
        }

        srpc_data.rpc_services[id] = sv;
        spin_unlock(&srpc_data.rpc_glock);

        sv->sv_nprune       = 0;
        sv->sv_nposted_msg  = 0;
        sv->sv_shuttingdown = 0;
        spin_lock_init(&sv->sv_lock);
        CFS_INIT_LIST_HEAD(&sv->sv_free_rpcq);
        CFS_INIT_LIST_HEAD(&sv->sv_active_rpcq);
        CFS_INIT_LIST_HEAD(&sv->sv_posted_msgq);
        CFS_INIT_LIST_HEAD(&sv->sv_blocked_msgq);

        sv->sv_ev.ev_data = sv;
        sv->sv_ev.ev_type = SRPC_REQUEST_RCVD;

        for (i = 0; i < sv->sv_concur; i++) {
                LIBCFS_ALLOC(rpc, sizeof(*rpc));
                if (rpc == NULL) goto enomem;

                list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
        }

        CDEBUG (D_NET, "Adding service: id %d, name %s, concurrency %d\n",
                id, sv->sv_name, sv->sv_concur);
        return 0;

enomem:
        while (!list_empty(&sv->sv_free_rpcq)) {
                rpc = list_entry(sv->sv_free_rpcq.next,
                                 srpc_server_rpc_t, srpc_list);
                list_del(&rpc->srpc_list);
                LIBCFS_FREE(rpc, sizeof(*rpc));
        }

        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_services[id] = NULL;
        spin_unlock(&srpc_data.rpc_glock);
        return -ENOMEM;
}

int
srpc_remove_service (srpc_service_t *sv)
{
        int id = sv->sv_id;

        spin_lock(&srpc_data.rpc_glock);

        if (srpc_data.rpc_services[id] != sv) {
                spin_unlock(&srpc_data.rpc_glock);
                return -ENOENT;
        }

        srpc_data.rpc_services[id] = NULL;
        spin_unlock(&srpc_data.rpc_glock);
        return 0;
}

int
srpc_post_passive_rdma(int portal, __u64 matchbits, void *buf,
                       int len, int options, lnet_process_id_t peer,
                       lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int              rc;
        lnet_md_t        md;
        lnet_handle_me_t meh;

        rc = LNetMEAttach(portal, peer, matchbits, 0,
                          LNET_UNLINK, LNET_INS_AFTER, &meh);
        if (rc != 0) {
                CERROR ("LNetMEAttach failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);
                return -ENOMEM;
        }

        md.threshold = 1;
        md.user_ptr  = ev;
        md.start     = buf;
        md.length    = len;
        md.options   = options;
        md.eq_handle = srpc_data.rpc_lnet_eq;

        rc = LNetMDAttach(meh, md, LNET_UNLINK, mdh);
        if (rc != 0) {
                CERROR ("LNetMDAttach failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);

                rc = LNetMEUnlink(meh);
                LASSERT (rc == 0);
                return -ENOMEM;
        }

        CDEBUG (D_NET,
                "Posted passive RDMA: peer %s, portal %d, matchbits "LPX64"\n",
                libcfs_id2str(peer), portal, matchbits);
        return 0;
}
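/*
 * Annotation (not original source commentary): srpc_post_passive_rdma()
 * above attaches an ME/MD pair so that a remote peer's LNetPut()/LNetGet()
 * can match this buffer; srpc_post_active_rdma() below binds a free-floating
 * MD and initiates the transfer itself. Both route completion events to the
 * single event queue in srpc_data.rpc_lnet_eq, tagging each MD with its
 * srpc_event_t via md.user_ptr.
 */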
int
srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
                      int options, lnet_process_id_t peer, lnet_nid_t self,
                      lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int       rc;
        lnet_md_t md;

        md.user_ptr  = ev;
        md.start     = buf;
        md.length    = len;
        md.eq_handle = srpc_data.rpc_lnet_eq;
        md.threshold = ((options & LNET_MD_OP_GET) != 0) ? 2 : 1;
        md.options   = options & ~(LNET_MD_OP_PUT | LNET_MD_OP_GET);

        rc = LNetMDBind(md, LNET_UNLINK, mdh);
        if (rc != 0) {
                CERROR ("LNetMDBind failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);
                return -ENOMEM;
        }

        /* This is kind of an abuse of the LNET_MD_OP_{PUT,GET} options:
         * they're only meaningful for MDs attached to an ME (i.e. passive
         * buffers)... */
        if ((options & LNET_MD_OP_PUT) != 0) {
                rc = LNetPut(self, *mdh, LNET_NOACK_REQ, peer,
                             portal, matchbits, 0, 0);
        } else {
                LASSERT ((options & LNET_MD_OP_GET) != 0);

                rc = LNetGet(self, *mdh, peer, portal, matchbits, 0);
        }

        if (rc != 0) {
                CERROR ("LNet%s(%s, %d, "LPD64") failed: %d\n",
                        ((options & LNET_MD_OP_PUT) != 0) ? "Put" : "Get",
                        libcfs_id2str(peer), portal, matchbits, rc);

                /* The forthcoming unlink event will complete this operation
                 * with failure, so fall through and return success here. */
                rc = LNetMDUnlink(*mdh);
                LASSERT (rc == 0);
        } else {
                CDEBUG (D_NET,
                        "Posted active RDMA: peer %s, portal %u, matchbits "LPX64"\n",
                        libcfs_id2str(peer), portal, matchbits);
        }
        return 0;
}
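/*
 * Annotation (not original source commentary): md.threshold is the number
 * of LNet events the MD may generate before it is unlinked. An active GET
 * expects two events (SEND plus REPLY), hence threshold 2; a PUT with
 * LNET_NOACK_REQ expects only a SEND, hence threshold 1. When the
 * LNetPut()/LNetGet() call itself fails, the MD is unlinked explicitly and
 * the resulting UNLINK event delivers the failure to the caller, which is
 * why this function still returns 0 on that path.
 */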
