⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ptllnd_rx_buf.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 2 页
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
 *   Author: PJ Kirner <pjkirner@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   This file is confidential source code owned by Cluster File Systems.
 *   No viewing, modification, compilation, redistribution, or any other
 *   form of use is permitted except through a signed license agreement.
 *
 *   If you have not signed such an agreement, then you have no rights to
 *   this file.  Please destroy it immediately and contact CFS.
 *
 */

#include "ptllnd.h"

/* Zero a receive-buffer pool and initialise its spinlock and buffer list. */
void
kptllnd_rx_buffer_pool_init(kptl_rx_buffer_pool_t *rxbp)
{
        memset(rxbp, 0, sizeof(*rxbp));
        spin_lock_init(&rxbp->rxbp_lock);
        INIT_LIST_HEAD(&rxbp->rxbp_list);
}

/* Unlink one rx buffer from its pool and free both the data buffer and the
 * descriptor.  The buffer must be idle: no refs, no MD, not posted. */
void
kptllnd_rx_buffer_destroy(kptl_rx_buffer_t *rxb)
{
        kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;

        LASSERT(rxb->rxb_refcount == 0);
        LASSERT(PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));
        LASSERT(!rxb->rxb_posted);
        LASSERT(rxb->rxb_idle);

        list_del(&rxb->rxb_list);
        rxbp->rxbp_count--;

        LIBCFS_FREE(rxb->rxb_buffer, kptllnd_rx_buffer_size());
        LIBCFS_FREE(rxb, sizeof(*rxb));
}

/* Reserve room in the pool for 'count' more incoming messages, allocating
 * and posting new rx buffers until capacity (rxbp_count * msgs_per_buffer)
 * covers all reservations.  The lock is dropped around allocation and
 * posting, so shutdown is re-checked each time it is re-taken.
 * Returns 0 on success, -ENOMEM on allocation failure, -ESHUTDOWN if the
 * pool is (or becomes) shut down. */
int
kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count)
{
        int               bufsize;
        int               msgs_per_buffer;
        int               rc;
        kptl_rx_buffer_t *rxb;
        char             *buffer;
        unsigned long     flags;

        bufsize = kptllnd_rx_buffer_size();
        msgs_per_buffer = bufsize / (*kptllnd_tunables.kptl_max_msg_size);

        CDEBUG(D_NET, "kptllnd_rx_buffer_pool_reserve(%d)\n", count);

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        for (;;) {
                if (rxbp->rxbp_shutdown) {
                        rc = -ESHUTDOWN;
                        break;
                }

                if (rxbp->rxbp_reserved + count <=
                    rxbp->rxbp_count * msgs_per_buffer) {
                        /* Enough buffers already exist to cover the new
                         * reservations */
                        rc = 0;
                        break;
                }

                /* Can't allocate or post while holding the lock */
                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                LIBCFS_ALLOC(rxb, sizeof(*rxb));
                LIBCFS_ALLOC(buffer, bufsize);

                if (rxb == NULL || buffer == NULL) {
                        CERROR("Failed to allocate rx buffer\n");

                        if (rxb != NULL)
                                LIBCFS_FREE(rxb, sizeof(*rxb));
                        if (buffer != NULL)
                                LIBCFS_FREE(buffer, bufsize);

                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                        rc = -ENOMEM;
                        break;
                }

                memset(rxb, 0, sizeof(*rxb));

                rxb->rxb_eventarg.eva_type = PTLLND_EVENTARG_TYPE_BUF;
                rxb->rxb_refcount = 0;
                rxb->rxb_pool = rxbp;
                rxb->rxb_idle = 0;
                rxb->rxb_posted = 0;
                rxb->rxb_buffer = buffer;
                rxb->rxb_mdh = PTL_INVALID_HANDLE;

                spin_lock_irqsave(&rxbp->rxbp_lock, flags);

                /* Pool may have been shut down while the lock was dropped */
                if (rxbp->rxbp_shutdown) {
                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                        LIBCFS_FREE(rxb, sizeof(*rxb));
                        LIBCFS_FREE(buffer, bufsize);

                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                        rc = -ESHUTDOWN;
                        break;
                }

                list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list);
                rxbp->rxbp_count++;

                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                kptllnd_rx_buffer_post(rxb);

                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        }

        if (rc == 0)
                rxbp->rxbp_reserved += count;

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

        return rc;
}

/* Give back 'count' message reservations taken by
 * kptllnd_rx_buffer_pool_reserve(). */
void
kptllnd_rx_buffer_pool_unreserve(kptl_rx_buffer_pool_t *rxbp,
                                 int count)
{
        unsigned long flags;

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        CDEBUG(D_NET, "kptllnd_rx_buffer_pool_unreserve(%d)\n", count);
        rxbp->rxbp_reserved -= count;
        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}

/* Shut the pool down: flag rxbp_shutdown, then repeatedly sweep the buffer
 * list destroying idle buffers and unlinking posted MDs until the list is
 * empty, sleeping one second between sweeps. */
void
kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp)
{
        kptl_rx_buffer_t       *rxb;
        int                     rc;
        int                     i;
        unsigned long           flags;
        struct list_head       *tmp;
        struct list_head       *nxt;
        ptl_handle_md_t         mdh;

        /* CAVEAT EMPTOR: I'm racing with everything here!!!
         *
         * Buffers can still be posted after I set rxbp_shutdown because I
         * can't hold rxbp_lock while I'm posting them.
         *
         * Calling PtlMDUnlink() here races with auto-unlinks; i.e. a buffer's
         * MD handle could become invalid under me.  I am vulnerable to portals
         * re-using handles (i.e. make the same handle valid again, but for a
         * different MD) from when the MD is actually unlinked, to when the
         * event callback tells me it has been unlinked. */

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        rxbp->rxbp_shutdown = 1;

        /* FIX(review): the scraped source read "i = 9"; the power-of-2
         * warning test below only makes sense counting from 0, so the
         * first waits would otherwise never log at D_WARNING. */
        for (i = 0;; i++) {
                list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) {
                        rxb = list_entry (tmp, kptl_rx_buffer_t, rxb_list);

                        if (rxb->rxb_idle) {
                                /* destroy takes no lock; drop and re-take */
                                spin_unlock_irqrestore(&rxbp->rxbp_lock,
                                                       flags);
                                kptllnd_rx_buffer_destroy(rxb);
                                spin_lock_irqsave(&rxbp->rxbp_lock,
                                                  flags);
                                continue;
                        }

                        mdh = rxb->rxb_mdh;
                        if (PtlHandleIsEqual(mdh, PTL_INVALID_HANDLE))
                                continue;

                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                        rc = PtlMDUnlink(mdh);

                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
                        /* callback clears rxb_mdh and drops net's ref
                         * (which causes repost, but since I set
                         * shutdown, it will just set the buffer
                         * idle) */
#else
                        if (rc == PTL_OK) {
                                rxb->rxb_posted = 0;
                                rxb->rxb_mdh = PTL_INVALID_HANDLE;
                                kptllnd_rx_buffer_decref_locked(rxb);
                        }
#endif
                }

                if (list_empty(&rxbp->rxbp_list))
                        break;

                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                /* Wait a bit for references to be dropped */
                CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                       "Waiting for %d Busy RX Buffers\n",
                       rxbp->rxbp_count);

                cfs_pause(cfs_time_seconds(1));

                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        }

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}

/* Post one rx buffer for receive: attach a wildcard ME and an MD covering
 * the buffer.  On any Portals failure the net's ref is dropped, which
 * retries the post (or idles the buffer if the pool is shutting down). */
void
kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
{
        int                     rc;
        ptl_md_t                md;
        ptl_handle_me_t         meh;
        ptl_handle_md_t         mdh;
        ptl_process_id_t        any;
        kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
        unsigned long           flags;

        LASSERT (!in_interrupt());
        LASSERT (rxb->rxb_refcount == 0);
        LASSERT (!rxb->rxb_idle);
        LASSERT (!rxb->rxb_posted);
        LASSERT (PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));

        /* Accept messages from any NID/PID */
        any.nid = PTL_NID_ANY;
        any.pid = PTL_PID_ANY;

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        if (rxbp->rxbp_shutdown) {
                rxb->rxb_idle = 1;
                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                return;
        }

        rxb->rxb_refcount = 1;                  /* net's ref */
        rxb->rxb_posted = 1;                    /* I'm posting */

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

        rc = PtlMEAttach(kptllnd_data.kptl_nih,
                         *kptllnd_tunables.kptl_portal,
                         any,
                         LNET_MSG_MATCHBITS,
                         0, /* all matchbits are valid - ignore none */
                         PTL_UNLINK,
                         PTL_INS_AFTER,
                         &meh);
        if (rc != PTL_OK) {
                CERROR("PtlMeAttach rxb failed %s(%d)\n",
                       kptllnd_errtype2str(rc), rc);
                goto failed;
        }

        /*
         * Setup MD
         */
        md.start = rxb->rxb_buffer;
        md.length = PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages;
        md.threshold = PTL_MD_THRESH_INF;
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE |
                     PTL_MD_MAX_SIZE |
                     PTL_MD_LOCAL_ALIGN8;
        md.user_ptr = &rxb->rxb_eventarg;
        md.max_size = *kptllnd_tunables.kptl_max_msg_size;
        md.eq_handle = kptllnd_data.kptl_eqh;

        rc = PtlMDAttach(meh, md, PTL_UNLINK, &mdh);
        if (rc == PTL_OK) {
                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                if (rxb->rxb_posted)            /* Not auto-unlinked yet!!! */
                        rxb->rxb_mdh = mdh;
                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                return;
        }

        CERROR("PtlMDAttach rxb failed %s(%d)\n",
               kptllnd_errtype2str(rc), rc);

        /* MD attach failed: tear the ME back down */
        rc = PtlMEUnlink(meh);
        LASSERT(rc == PTL_OK);

 failed:
        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        rxb->rxb_posted = 0;
        /* XXX this will just try again immediately */
        kptllnd_rx_buffer_decref_locked(rxb);
        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}

/* Allocate a zeroed rx descriptor from the rx slab cache (atomic context
 * allowed).  Returns NULL on allocation failure or when the FAIL_RX_ALLOC
 * failure simulation is enabled. */
kptl_rx_t *
kptllnd_rx_alloc(void)
{
        kptl_rx_t* rx;

        if (IS_SIMULATION_ENABLED(FAIL_RX_ALLOC)) {
                CERROR ("FAIL_RX_ALLOC SIMULATION triggered\n");
                return NULL;
        }

        rx = cfs_mem_cache_alloc(kptllnd_data.kptl_rx_cache, CFS_ALLOC_ATOMIC);
        if (rx == NULL) {
                CERROR("Failed to allocate rx\n");
                return NULL;
        }

        memset(rx, 0, sizeof(*rx));

        return rx;
}

/* Finish an rx: release the buffer ref and, if the rx carried a peer,
 * return a credit to it (possibly triggering a credit-return send) and
 * drop the peer ref.
 *
 * NOTE(review): the scraped source is truncated below — the function's
 * closing brace and any statements after the if-block (presumably the
 * descriptor free) are missing from this listing; recover from the
 * original file. */
void
kptllnd_rx_done(kptl_rx_t *rx)
{
        kptl_rx_buffer_t *rxb = rx->rx_rxb;
        kptl_peer_t      *peer = rx->rx_peer;
        unsigned long     flags;

        CDEBUG(D_NET, "rx=%p rxb %p peer %p\n", rx, rxb, peer);

        if (rxb != NULL)
                kptllnd_rx_buffer_decref(rxb);

        if (peer != NULL) {
                /* Update credits (after I've decref-ed the buffer) */
                spin_lock_irqsave(&peer->peer_lock, flags);

                peer->peer_outstanding_credits++;
                LASSERT (peer->peer_outstanding_credits +
                         peer->peer_sent_credits <=
                         *kptllnd_tunables.kptl_peercredits);

                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       rx);

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                /* I might have to send back credits */
                kptllnd_peer_check_sends(peer);
                kptllnd_peer_decref(peer);
        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -