📄 rpc.c
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (C) 2001, 2002 Cluster File Systems, Inc. * Author: Isaac Huang <isaac@clusterfs.com> * */#define DEBUG_SUBSYSTEM S_LNET#include "selftest.h"typedef enum { SRPC_STATE_NONE, SRPC_STATE_NI_INIT, SRPC_STATE_EQ_INIT, SRPC_STATE_WI_INIT, SRPC_STATE_RUNNING, SRPC_STATE_STOPPING,} srpc_state_t;#define SRPC_PEER_HASH_SIZE 101 /* # peer lists */#define SRPC_PEER_CREDITS 16 /* >= most LND's default peer credit */struct smoketest_rpc { spinlock_t rpc_glock; /* global lock */ srpc_service_t *rpc_services[SRPC_SERVICE_MAX_ID + 1]; struct list_head *rpc_peers; /* hash table of known peers */ lnet_handle_eq_t rpc_lnet_eq; /* _the_ LNet event queue */ srpc_state_t rpc_state; srpc_counters_t rpc_counters; __u64 rpc_matchbits; /* matchbits counter */} srpc_data;/* forward ref's */int srpc_handle_rpc (swi_workitem_t *wi);void srpc_get_counters (srpc_counters_t *cnt){ spin_lock(&srpc_data.rpc_glock); *cnt = srpc_data.rpc_counters; spin_unlock(&srpc_data.rpc_glock);}void srpc_set_counters (const srpc_counters_t *cnt){ spin_lock(&srpc_data.rpc_glock); srpc_data.rpc_counters = *cnt; spin_unlock(&srpc_data.rpc_glock);}voidsrpc_add_bulk_page (srpc_bulk_t *bk, cfs_page_t *pg, int i){ LASSERT (i >= 0 && i < bk->bk_niov);#ifdef __KERNEL__ bk->bk_iovs[i].kiov_offset = 0; bk->bk_iovs[i].kiov_page = pg; bk->bk_iovs[i].kiov_len = CFS_PAGE_SIZE;#else LASSERT (bk->bk_pages != NULL); bk->bk_pages[i] = pg; bk->bk_iovs[i].iov_len = CFS_PAGE_SIZE; bk->bk_iovs[i].iov_base = cfs_page_address(pg);#endif return;}voidsrpc_free_bulk (srpc_bulk_t *bk){ int i; cfs_page_t *pg; LASSERT (bk != NULL);#ifndef __KERNEL__ LASSERT (bk->bk_pages != NULL);#endif for (i = 0; i < bk->bk_niov; i++) {#ifdef __KERNEL__ pg = bk->bk_iovs[i].kiov_page;#else pg = bk->bk_pages[i];#endif if (pg == NULL) break; cfs_free_page(pg); }#ifndef __KERNEL__ LIBCFS_FREE(bk->bk_pages, sizeof(cfs_page_t *) * bk->bk_niov);#endif LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[bk->bk_niov])); return;}srpc_bulk_t *srpc_alloc_bulk (int npages, int sink){ srpc_bulk_t *bk; cfs_page_t **pages; int i; LASSERT (npages > 0 && npages <= LNET_MAX_IOV); LIBCFS_ALLOC(bk, offsetof(srpc_bulk_t, bk_iovs[npages])); if (bk == NULL) { CERROR ("Can't allocate descriptor for %d pages\n", npages); return NULL; } memset(bk, 0, offsetof(srpc_bulk_t, bk_iovs[npages])); bk->bk_sink = sink; bk->bk_niov = npages; bk->bk_len = npages * CFS_PAGE_SIZE;#ifndef __KERNEL__ LIBCFS_ALLOC(pages, sizeof(cfs_page_t *) * npages); if (pages == NULL) { LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[npages])); CERROR ("Can't allocate page array for %d pages\n", npages); return NULL; } memset(pages, 0, sizeof(cfs_page_t *) * npages); bk->bk_pages = pages;#else UNUSED (pages);#endif for (i = 0; i < npages; i++) { cfs_page_t *pg = cfs_alloc_page(CFS_ALLOC_STD); if (pg == NULL) { CERROR ("Can't allocate page %d of %d\n", i, npages); srpc_free_bulk(bk); return NULL; } srpc_add_bulk_page(bk, pg, i); } return bk;}static inline struct list_head *srpc_nid2peerlist (lnet_nid_t nid){ unsigned int hash = ((unsigned int)nid) % SRPC_PEER_HASH_SIZE; return &srpc_data.rpc_peers[hash];}static inline srpc_peer_t *srpc_create_peer (lnet_nid_t nid){ srpc_peer_t *peer; LASSERT (nid != LNET_NID_ANY); LIBCFS_ALLOC(peer, sizeof(srpc_peer_t)); if (peer == NULL) { CERROR ("Failed to allocate peer structure for %s\n", libcfs_nid2str(nid)); return NULL; } memset(peer, 0, sizeof(srpc_peer_t)); peer->stp_nid = nid; peer->stp_credits = SRPC_PEER_CREDITS; spin_lock_init(&peer->stp_lock); CFS_INIT_LIST_HEAD(&peer->stp_rpcq); CFS_INIT_LIST_HEAD(&peer->stp_ctl_rpcq); return peer;}srpc_peer_t *srpc_find_peer_locked (lnet_nid_t nid){ struct list_head *peer_list = srpc_nid2peerlist(nid); srpc_peer_t *peer; LASSERT (nid != LNET_NID_ANY); list_for_each_entry (peer, peer_list, stp_list) { if (peer->stp_nid == nid) return peer; } return NULL;}static srpc_peer_t *srpc_nid2peer (lnet_nid_t nid){ srpc_peer_t *peer; srpc_peer_t *new_peer; spin_lock(&srpc_data.rpc_glock); peer = srpc_find_peer_locked(nid); spin_unlock(&srpc_data.rpc_glock); if (peer != NULL) return peer; new_peer = srpc_create_peer(nid); spin_lock(&srpc_data.rpc_glock); peer = srpc_find_peer_locked(nid); if (peer != NULL) { spin_unlock(&srpc_data.rpc_glock); if (new_peer != NULL) LIBCFS_FREE(new_peer, sizeof(srpc_peer_t)); return peer; } if (new_peer == NULL) { spin_unlock(&srpc_data.rpc_glock); return NULL; } list_add_tail(&new_peer->stp_list, srpc_nid2peerlist(nid)); spin_unlock(&srpc_data.rpc_glock); return new_peer;}static inline __u64srpc_next_id (void){ __u64 id; spin_lock(&srpc_data.rpc_glock); id = srpc_data.rpc_matchbits++; spin_unlock(&srpc_data.rpc_glock); return id;}voidsrpc_init_server_rpc (srpc_server_rpc_t *rpc, srpc_service_t *sv, srpc_buffer_t *buffer){ memset(rpc, 0, sizeof(*rpc)); swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc); rpc->srpc_ev.ev_fired = 1; /* no event expected now */ rpc->srpc_service = sv; rpc->srpc_reqstbuf = buffer; rpc->srpc_peer = buffer->buf_peer; rpc->srpc_self = buffer->buf_self; rpc->srpc_replymdh = LNET_INVALID_HANDLE;}intsrpc_add_service (srpc_service_t *sv){ int id = sv->sv_id; int i; srpc_server_rpc_t *rpc; LASSERT (sv->sv_concur > 0); LASSERT (0 <= id && id <= SRPC_SERVICE_MAX_ID); spin_lock(&srpc_data.rpc_glock); LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING); if (srpc_data.rpc_services[id] != NULL) { spin_unlock(&srpc_data.rpc_glock); return -EBUSY; } srpc_data.rpc_services[id] = sv; spin_unlock(&srpc_data.rpc_glock); sv->sv_nprune = 0; sv->sv_nposted_msg = 0; sv->sv_shuttingdown = 0; spin_lock_init(&sv->sv_lock); CFS_INIT_LIST_HEAD(&sv->sv_free_rpcq); CFS_INIT_LIST_HEAD(&sv->sv_active_rpcq); CFS_INIT_LIST_HEAD(&sv->sv_posted_msgq); CFS_INIT_LIST_HEAD(&sv->sv_blocked_msgq); sv->sv_ev.ev_data = sv; sv->sv_ev.ev_type = SRPC_REQUEST_RCVD; for (i = 0; i < sv->sv_concur; i++) { LIBCFS_ALLOC(rpc, sizeof(*rpc)); if (rpc == NULL) goto enomem; list_add(&rpc->srpc_list, &sv->sv_free_rpcq); } CDEBUG (D_NET, "Adding service: id %d, name %s, concurrency %d\n", id, sv->sv_name, sv->sv_concur); return 0;enomem: while (!list_empty(&sv->sv_free_rpcq)) { rpc = list_entry(sv->sv_free_rpcq.next, srpc_server_rpc_t, srpc_list); list_del(&rpc->srpc_list); LIBCFS_FREE(rpc, sizeof(*rpc)); } spin_lock(&srpc_data.rpc_glock); srpc_data.rpc_services[id] = NULL; spin_unlock(&srpc_data.rpc_glock); return -ENOMEM;}intsrpc_remove_service (srpc_service_t *sv){ int id = sv->sv_id; spin_lock(&srpc_data.rpc_glock); if (srpc_data.rpc_services[id] != sv) { spin_unlock(&srpc_data.rpc_glock); return -ENOENT; } srpc_data.rpc_services[id] = NULL; spin_unlock(&srpc_data.rpc_glock); return 0;}intsrpc_post_passive_rdma(int portal, __u64 matchbits, void *buf, int len, int options, lnet_process_id_t peer, lnet_handle_md_t *mdh, srpc_event_t *ev){ int rc; lnet_md_t md; lnet_handle_me_t meh; rc = LNetMEAttach(portal, peer, matchbits, 0, LNET_UNLINK, LNET_INS_AFTER, &meh); if (rc != 0) { CERROR ("LNetMEAttach failed: %d\n", rc); LASSERT (rc == -ENOMEM); return -ENOMEM; } md.threshold = 1; md.user_ptr = ev; md.start = buf; md.length = len; md.options = options; md.eq_handle = srpc_data.rpc_lnet_eq; rc = LNetMDAttach(meh, md, LNET_UNLINK, mdh); if (rc != 0) { CERROR ("LNetMDAttach failed: %d\n", rc); LASSERT (rc == -ENOMEM); rc = LNetMEUnlink(meh); LASSERT (rc == 0); return -ENOMEM; } CDEBUG (D_NET, "Posted passive RDMA: peer %s, portal %d, matchbits "LPX64"\n", libcfs_id2str(peer), portal, matchbits); return 0;}intsrpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len, int options, lnet_process_id_t peer, lnet_nid_t self, lnet_handle_md_t *mdh, srpc_event_t *ev){ int rc; lnet_md_t md; md.user_ptr = ev; md.start = buf; md.length = len; md.eq_handle = srpc_data.rpc_lnet_eq; md.threshold = ((options & LNET_MD_OP_GET) != 0) ? 2 : 1; md.options = options & ~(LNET_MD_OP_PUT | LNET_MD_OP_GET); rc = LNetMDBind(md, LNET_UNLINK, mdh); if (rc != 0) { CERROR ("LNetMDBind failed: %d\n", rc); LASSERT (rc == -ENOMEM); return -ENOMEM; } /* this is kind of an abuse of the LNET_MD_OP_{PUT,GET} options. * they're only meaningful for MDs attached to an ME (i.e. passive * buffers... */ if ((options & LNET_MD_OP_PUT) != 0) { rc = LNetPut(self, *mdh, LNET_NOACK_REQ, peer, portal, matchbits, 0, 0); } else { LASSERT ((options & LNET_MD_OP_GET) != 0); rc = LNetGet(self, *mdh, peer, portal, matchbits, 0); } if (rc != 0) { CERROR ("LNet%s(%s, %d, "LPD64") failed: %d\n", ((options & LNET_MD_OP_PUT) != 0) ? "Put" : "Get", libcfs_id2str(peer), portal, matchbits, rc); /* The forthcoming unlink event will complete this operation * with failure, so fall through and return success here. */ rc = LNetMDUnlink(*mdh); LASSERT (rc == 0); } else { CDEBUG (D_NET, "Posted active RDMA: peer %s, portal %u, matchbits "LPX64"\n", libcfs_id2str(peer), portal, matchbits); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -