📄 ptllnd_rx_buf.c
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved. * Author: PJ Kirner <pjkirner@clusterfs.com> * * This file is part of the Lustre file system, http://www.lustre.org * Lustre is a trademark of Cluster File Systems, Inc. * * This file is confidential source code owned by Cluster File Systems. * No viewing, modification, compilation, redistribution, or any other * form of use is permitted except through a signed license agreement. * * If you have not signed such an agreement, then you have no rights to * this file. Please destroy it immediately and contact CFS. * */ #include "ptllnd.h"voidkptllnd_rx_buffer_pool_init(kptl_rx_buffer_pool_t *rxbp){ memset(rxbp, 0, sizeof(*rxbp)); spin_lock_init(&rxbp->rxbp_lock); INIT_LIST_HEAD(&rxbp->rxbp_list);}voidkptllnd_rx_buffer_destroy(kptl_rx_buffer_t *rxb){ kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool; LASSERT(rxb->rxb_refcount == 0); LASSERT(PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE)); LASSERT(!rxb->rxb_posted); LASSERT(rxb->rxb_idle); list_del(&rxb->rxb_list); rxbp->rxbp_count--; LIBCFS_FREE(rxb->rxb_buffer, kptllnd_rx_buffer_size()); LIBCFS_FREE(rxb, sizeof(*rxb));}intkptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count){ int bufsize; int msgs_per_buffer; int rc; kptl_rx_buffer_t *rxb; char *buffer; unsigned long flags; bufsize = kptllnd_rx_buffer_size(); msgs_per_buffer = bufsize / (*kptllnd_tunables.kptl_max_msg_size); CDEBUG(D_NET, "kptllnd_rx_buffer_pool_reserve(%d)\n", count); spin_lock_irqsave(&rxbp->rxbp_lock, flags); for (;;) { if (rxbp->rxbp_shutdown) { rc = -ESHUTDOWN; break; } if (rxbp->rxbp_reserved + count <= rxbp->rxbp_count * msgs_per_buffer) { rc = 0; break; } spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); LIBCFS_ALLOC(rxb, sizeof(*rxb)); LIBCFS_ALLOC(buffer, bufsize); if (rxb == NULL || buffer == NULL) { CERROR("Failed to allocate rx buffer\n"); if (rxb != NULL) LIBCFS_FREE(rxb, sizeof(*rxb)); if (buffer != NULL) LIBCFS_FREE(buffer, bufsize); spin_lock_irqsave(&rxbp->rxbp_lock, flags); rc = -ENOMEM; break; } memset(rxb, 0, sizeof(*rxb)); rxb->rxb_eventarg.eva_type = PTLLND_EVENTARG_TYPE_BUF; rxb->rxb_refcount = 0; rxb->rxb_pool = rxbp; rxb->rxb_idle = 0; rxb->rxb_posted = 0; rxb->rxb_buffer = buffer; rxb->rxb_mdh = PTL_INVALID_HANDLE; spin_lock_irqsave(&rxbp->rxbp_lock, flags); if (rxbp->rxbp_shutdown) { spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); LIBCFS_FREE(rxb, sizeof(*rxb)); LIBCFS_FREE(buffer, bufsize); spin_lock_irqsave(&rxbp->rxbp_lock, flags); rc = -ESHUTDOWN; break; } list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list); rxbp->rxbp_count++; spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); kptllnd_rx_buffer_post(rxb); spin_lock_irqsave(&rxbp->rxbp_lock, flags); } if (rc == 0) rxbp->rxbp_reserved += count; spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); return rc;}voidkptllnd_rx_buffer_pool_unreserve(kptl_rx_buffer_pool_t *rxbp, int count){ unsigned long flags; spin_lock_irqsave(&rxbp->rxbp_lock, flags); CDEBUG(D_NET, "kptllnd_rx_buffer_pool_unreserve(%d)\n", count); rxbp->rxbp_reserved -= count; spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);}voidkptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp){ kptl_rx_buffer_t *rxb; int rc; int i; unsigned long flags; struct list_head *tmp; struct list_head *nxt; ptl_handle_md_t mdh; /* CAVEAT EMPTOR: I'm racing with everything here!!! * * Buffers can still be posted after I set rxbp_shutdown because I * can't hold rxbp_lock while I'm posting them. * * Calling PtlMDUnlink() here races with auto-unlinks; i.e. a buffer's * MD handle could become invalid under me. I am vulnerable to portals * re-using handles (i.e. make the same handle valid again, but for a * different MD) from when the MD is actually unlinked, to when the * event callback tells me it has been unlinked. */ spin_lock_irqsave(&rxbp->rxbp_lock, flags); rxbp->rxbp_shutdown = 1; for (i = 9;; i++) { list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) { rxb = list_entry (tmp, kptl_rx_buffer_t, rxb_list); if (rxb->rxb_idle) { spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); kptllnd_rx_buffer_destroy(rxb); spin_lock_irqsave(&rxbp->rxbp_lock, flags); continue; } mdh = rxb->rxb_mdh; if (PtlHandleIsEqual(mdh, PTL_INVALID_HANDLE)) continue; spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); rc = PtlMDUnlink(mdh); spin_lock_irqsave(&rxbp->rxbp_lock, flags); #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS /* callback clears rxb_mdh and drops net's ref * (which causes repost, but since I set * shutdown, it will just set the buffer * idle) */#else if (rc == PTL_OK) { rxb->rxb_posted = 0; rxb->rxb_mdh = PTL_INVALID_HANDLE; kptllnd_rx_buffer_decref_locked(rxb); }#endif } if (list_empty(&rxbp->rxbp_list)) break; spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); /* Wait a bit for references to be dropped */ CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */ "Waiting for %d Busy RX Buffers\n", rxbp->rxbp_count); cfs_pause(cfs_time_seconds(1)); spin_lock_irqsave(&rxbp->rxbp_lock, flags); } spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);}voidkptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb){ int rc; ptl_md_t md; ptl_handle_me_t meh; ptl_handle_md_t mdh; ptl_process_id_t any; kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool; unsigned long flags; LASSERT (!in_interrupt()); LASSERT (rxb->rxb_refcount == 0); LASSERT (!rxb->rxb_idle); LASSERT (!rxb->rxb_posted); LASSERT (PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE)); any.nid = PTL_NID_ANY; any.pid = PTL_PID_ANY; spin_lock_irqsave(&rxbp->rxbp_lock, flags); if (rxbp->rxbp_shutdown) { rxb->rxb_idle = 1; spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); return; } rxb->rxb_refcount = 1; /* net's ref */ rxb->rxb_posted = 1; /* I'm posting */ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); rc = PtlMEAttach(kptllnd_data.kptl_nih, *kptllnd_tunables.kptl_portal, any, LNET_MSG_MATCHBITS, 0, /* all matchbits are valid - ignore none */ PTL_UNLINK, PTL_INS_AFTER, &meh); if (rc != PTL_OK) { CERROR("PtlMeAttach rxb failed %s(%d)\n", kptllnd_errtype2str(rc), rc); goto failed; } /* * Setup MD */ md.start = rxb->rxb_buffer; md.length = PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages; md.threshold = PTL_MD_THRESH_INF; md.options = PTL_MD_OP_PUT | PTL_MD_LUSTRE_COMPLETION_SEMANTICS | PTL_MD_EVENT_START_DISABLE | PTL_MD_MAX_SIZE | PTL_MD_LOCAL_ALIGN8; md.user_ptr = &rxb->rxb_eventarg; md.max_size = *kptllnd_tunables.kptl_max_msg_size; md.eq_handle = kptllnd_data.kptl_eqh; rc = PtlMDAttach(meh, md, PTL_UNLINK, &mdh); if (rc == PTL_OK) { spin_lock_irqsave(&rxbp->rxbp_lock, flags); if (rxb->rxb_posted) /* Not auto-unlinked yet!!! */ rxb->rxb_mdh = mdh; spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); return; } CERROR("PtlMDAttach rxb failed %s(%d)\n", kptllnd_errtype2str(rc), rc); rc = PtlMEUnlink(meh); LASSERT(rc == PTL_OK); failed: spin_lock_irqsave(&rxbp->rxbp_lock, flags); rxb->rxb_posted = 0; /* XXX this will just try again immediately */ kptllnd_rx_buffer_decref_locked(rxb); spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);}kptl_rx_t *kptllnd_rx_alloc(void){ kptl_rx_t* rx; if (IS_SIMULATION_ENABLED(FAIL_RX_ALLOC)) { CERROR ("FAIL_RX_ALLOC SIMULATION triggered\n"); return NULL; } rx = cfs_mem_cache_alloc(kptllnd_data.kptl_rx_cache, CFS_ALLOC_ATOMIC); if (rx == NULL) { CERROR("Failed to allocate rx\n"); return NULL; } memset(rx, 0, sizeof(*rx)); return rx;}voidkptllnd_rx_done(kptl_rx_t *rx){ kptl_rx_buffer_t *rxb = rx->rx_rxb; kptl_peer_t *peer = rx->rx_peer; unsigned long flags; CDEBUG(D_NET, "rx=%p rxb %p peer %p\n", rx, rxb, peer); if (rxb != NULL) kptllnd_rx_buffer_decref(rxb); if (peer != NULL) { /* Update credits (after I've decref-ed the buffer) */ spin_lock_irqsave(&peer->peer_lock, flags); peer->peer_outstanding_credits++; LASSERT (peer->peer_outstanding_credits + peer->peer_sent_credits <= *kptllnd_tunables.kptl_peercredits); CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n", libcfs_id2str(peer->peer_id), peer->peer_credits, peer->peer_outstanding_credits, peer->peer_sent_credits, rx); spin_unlock_irqrestore(&peer->peer_lock, flags); /* I might have to send back credits */ kptllnd_peer_check_sends(peer); kptllnd_peer_decref(peer); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -