⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ptllnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 2 页
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
 *   Author: PJ Kirner <pjkirner@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   This file is confidential source code owned by Cluster File Systems.
 *   No viewing, modification, compilation, redistribution, or any other
 *   form of use is permitted except through a signed license agreement.
 *
 *   If you have not signed such an agreement, then you have no rights to
 *   this file.  Please destroy it immediately and contact CFS.
 *
 */

#include "ptllnd.h"

#ifndef _USING_LUSTRE_PORTALS_
int
kptllnd_extract_iov (int dst_niov, ptl_md_iovec_t *dst,
                     int src_niov, struct iovec *src,
                     unsigned int offset, unsigned int len)
{
        /* Fill 'dst' with the sub-range of 'src' that begins at 'offset'
         * and covers exactly 'len' bytes, and return the number of 'dst'
         * entries used.  'src' itself is not modified. */
        unsigned int    nob;
        unsigned int    nfrags;

        if (len == 0)                           /* nothing to map => */
                return (0);                     /* no fragments */

        LASSERT (src_niov > 0);

        /* Step over whole leading fragments until 'offset' lands inside
         * the current source fragment. */
        while (offset >= src->iov_len) {
                offset -= src->iov_len;
                src++;
                src_niov--;
                LASSERT (src_niov > 0);
        }

        for (nfrags = 1;; nfrags++) {
                LASSERT (src_niov > 0);
                LASSERT (nfrags <= dst_niov);   /* caller must size 'dst' */

                nob = src->iov_len - offset;
                dst->iov_base = ((char *)src->iov_base) + offset;

                if (len <= nob) {               /* final fragment */
                        dst->iov_len = len;
                        return (nfrags);
                }

                dst->iov_len = nob;
                len -= nob;
                dst++;
                src++;
                src_niov--;
                offset = 0;
        }
}

int
kptllnd_extract_phys (int dst_niov, ptl_md_iovec_t *dst,
                      int src_niov, lnet_kiov_t *src,
                      unsigned int offset, unsigned int len)
{
        /* Like kptllnd_extract_iov(), but 'src' is a page vector: fill
         * 'dst' with the *physical* addresses of the sub-range of 'src'
         * starting at 'offset' for exactly 'len' bytes and return the
         * number of entries used.  'src' is not modified. */
        unsigned int    nob;
        unsigned int    nfrags;
        __u64           phys_page;
        __u64           phys;

        if (len == 0)                           /* nothing to map => */
                return (0);                     /* no fragments */

        LASSERT (src_niov > 0);

        /* Step over whole leading fragments. */
        while (offset >= src->kiov_len) {
                offset -= src->kiov_len;
                src++;
                src_niov--;
                LASSERT (src_niov > 0);
        }

        for (nfrags = 1;; nfrags++) {
                LASSERT (src_niov > 0);
                LASSERT (nfrags <= dst_niov);   /* caller must size 'dst' */

                nob = min(src->kiov_len - offset, len);

                phys_page = lnet_page2phys(src->kiov_page);
                phys = phys_page + src->kiov_offset + offset;

                /* On a 32-bit build the physical address is squeezed into
                 * a void pointer, so the whole fragment must sit below 4G. */
                LASSERT (sizeof(void *) > 4 ||
                         (phys <= 0xffffffffULL &&
                          phys + (nob - 1) <= 0xffffffffULL));

                dst->iov_base = (void *)((unsigned long)phys);
                dst->iov_len = nob;

                if (nob == len)                 /* final fragment */
                        return nfrags;

                len -= nob;
                dst++;
                src++;
                src_niov--;
                offset = 0;
        }
}
#endif

void
kptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov,
                     struct iovec *iov, lnet_kiov_t *kiov,
                     unsigned int offset, unsigned int nob)
{
        /* Describe the RDMA buffer for 'tx' in tx->tx_rdma_md.  Exactly
         * one of 'iov'/'kiov' names the payload ('nob' bytes of it,
         * beginning at 'offset'). */
        LASSERT (iov == NULL || kiov == NULL);

        memset(&tx->tx_rdma_md, 0, sizeof(tx->tx_rdma_md));

        tx->tx_rdma_md.start     = tx->tx_frags;
        tx->tx_rdma_md.user_ptr  = &tx->tx_rdma_eventarg;
        tx->tx_rdma_md.eq_handle = kptllnd_data.kptl_eqh;
        tx->tx_rdma_md.options   = PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                                   PTL_MD_EVENT_START_DISABLE;

        /* The event threshold and the permitted remote operation both
         * depend on which side of which RDMA this tx represents. */
        switch (tx->tx_type) {
        default:
                LBUG();

        case TX_TYPE_PUT_REQUEST:               /* passive: peer gets */
                tx->tx_rdma_md.threshold = 1;   /* GET event */
                tx->tx_rdma_md.options |= PTL_MD_OP_GET;
                break;

        case TX_TYPE_GET_REQUEST:               /* passive: peer puts */
                tx->tx_rdma_md.threshold = 1;   /* PUT event */
                tx->tx_rdma_md.options |= PTL_MD_OP_PUT;
                break;

        case TX_TYPE_PUT_RESPONSE:              /* active: I get */
                tx->tx_rdma_md.threshold = 2;   /* SEND + REPLY */
                break;

        case TX_TYPE_GET_RESPONSE:              /* active: I put */
                tx->tx_rdma_md.threshold = tx->tx_acked ? 2 : 1;   /* SEND + ACK? */
                break;
        }

        if (nob == 0) {
                tx->tx_rdma_md.length = 0;
                return;
        }

#ifdef _USING_LUSTRE_PORTALS_
        if (iov != NULL) {
                tx->tx_rdma_md.options |= PTL_MD_IOVEC;
                tx->tx_rdma_md.length =
                        lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
                                         niov, iov, offset, nob);
                return;
        }

        /* Cheating OK since ptl_kiov_t == lnet_kiov_t */
        CLASSERT(sizeof(ptl_kiov_t) == sizeof(lnet_kiov_t));
        CLASSERT(offsetof(ptl_kiov_t, kiov_offset) ==
                 offsetof(lnet_kiov_t, kiov_offset));
        CLASSERT(offsetof(ptl_kiov_t, kiov_page) ==
                 offsetof(lnet_kiov_t, kiov_page));
        CLASSERT(offsetof(ptl_kiov_t, kiov_len) ==
                 offsetof(lnet_kiov_t, kiov_len));

        tx->tx_rdma_md.options |= PTL_MD_KIOV;
        tx->tx_rdma_md.length =
                lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_frags->kiov,
                                  niov, kiov, offset, nob);
#else
        if (iov != NULL) {
                tx->tx_rdma_md.options |= PTL_MD_IOVEC;
                tx->tx_rdma_md.length =
                        kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
                                            niov, iov, offset, nob);
                return;
        }

        /* This portals flavour has no kiov MD: describe the pages by
         * physical address instead. */
        tx->tx_rdma_md.options |= PTL_MD_IOVEC | PTL_MD_PHYS;
        tx->tx_rdma_md.length =
                kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_frags->iov,
                                     niov, kiov, offset, nob);
#endif
}

int
kptllnd_active_rdma(kptl_rx_t *rx, lnet_msg_t *lntmsg, int type,
                    unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
                    unsigned int offset, int nob)
{
        /* Start the active side of an RDMA in response to the peer's
         * request held in 'rx': PtlGet for a PUT_RESPONSE, PtlPut for a
         * GET_RESPONSE.  Returns 0 once the tx has been queued on the
         * peer (failures after that point are completed via the peer). */
        kptl_tx_t       *tx;
        ptl_err_t        ptlrc;
        kptl_msg_t      *rxmsg = rx->rx_msg;
        kptl_peer_t     *peer = rx->rx_peer;
        unsigned long    flags;
        ptl_handle_md_t  mdh;

        LASSERT (type == TX_TYPE_PUT_RESPONSE ||
                 type == TX_TYPE_GET_RESPONSE);

        tx = kptllnd_get_idle_tx(type);
        if (tx == NULL) {
                CERROR ("Can't do %s rdma to %s: can't allocate descriptor\n",
                        type == TX_TYPE_PUT_RESPONSE ? "GET" : "PUT",
                        libcfs_id2str(peer->peer_id));
                return -ENOMEM;
        }

        kptllnd_set_tx_peer(tx, peer);
        kptllnd_init_rdma_md(tx, niov, iov, kiov, offset, nob);

        ptlrc = PtlMDBind(kptllnd_data.kptl_nih, tx->tx_rdma_md,
                          PTL_UNLINK, &mdh);
        if (ptlrc != PTL_OK) {
                CERROR("PtlMDBind(%s) failed: %s(%d)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_errtype2str(ptlrc), ptlrc);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
                return -EIO;
        }

        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_lnet_msg = lntmsg;
        /* lnet_finalize() will be called when tx is torn down, so I must
         * return success from here on... */

        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
        tx->tx_rdma_mdh = mdh;
        tx->tx_active = 1;
        list_add_tail(&tx->tx_list, &peer->peer_activeq);

        /* peer has now got my ref on 'tx' */

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        tx->tx_tposted = jiffies;

        if (type == TX_TYPE_GET_RESPONSE)
                ptlrc = PtlPut(mdh,
                               tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
                               rx->rx_initiator,
                               *kptllnd_tunables.kptl_portal,
                               0,                     /* acl cookie */
                               rxmsg->ptlm_u.rdma.kptlrm_matchbits,
                               0,                     /* offset */
                               (lntmsg != NULL) ?     /* header data */
                               PTLLND_RDMA_OK :
                               PTLLND_RDMA_FAIL);
        else
                ptlrc = PtlGet(mdh,
                               rx->rx_initiator,
                               *kptllnd_tunables.kptl_portal,
                               0,                     /* acl cookie */
                               rxmsg->ptlm_u.rdma.kptlrm_matchbits,
                               0);                    /* offset */

        if (ptlrc != PTL_OK) {
                CERROR("Ptl%s failed: %s(%d)\n",
                        (type == TX_TYPE_GET_RESPONSE) ?
"Put" : "Get",                       kptllnd_errtype2str(ptlrc), ptlrc);                                kptllnd_peer_close(peer, -EIO);                /* Everything (including this RDMA) queued on the peer will                 * be completed with failure */        }        return 0;}intkptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg){        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;        int               type = lntmsg->msg_type;        lnet_process_id_t target = lntmsg->msg_target;        int               target_is_router = lntmsg->msg_target_is_router;        int               routing = lntmsg->msg_routing;        unsigned int      payload_niov = lntmsg->msg_niov;        struct iovec     *payload_iov = lntmsg->msg_iov;        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;        unsigned int      payload_offset = lntmsg->msg_offset;        unsigned int      payload_nob = lntmsg->msg_len;        kptl_peer_t      *peer;        kptl_tx_t        *tx;        int               nob;        int               nfrag;        int               rc;        LASSERT (payload_nob == 0 || payload_niov > 0);        LASSERT (payload_niov <= LNET_MAX_IOV);        LASSERT (payload_niov <= PTL_MD_MAX_IOV); /* !!! */        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));        LASSERT (!in_interrupt());        rc = kptllnd_find_target(&peer, target);        if (rc != 0)                return rc;                switch (type) {        default:                LBUG();                return -EINVAL;        case LNET_MSG_REPLY:        case LNET_MSG_PUT:                /* Should the payload avoid RDMA? 
*/                nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]);                if (payload_kiov == NULL &&                     nob <= peer->peer_max_msg_size)                        break;                tx = kptllnd_get_idle_tx(TX_TYPE_PUT_REQUEST);                if (tx == NULL) {                        CERROR("Can't send %s to %s: can't allocate descriptor\n",                               lnet_msgtyp2str(type),                               libcfs_id2str(target));                        rc = -ENOMEM;                        goto out;                }                kptllnd_init_rdma_md(tx, payload_niov,                                      payload_iov, payload_kiov,                                     payload_offset, payload_nob);                tx->tx_lnet_msg = lntmsg;                tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;                kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT,                                  sizeof(kptl_rdma_msg_t));                CDEBUG(D_NETTRACE, "%s: passive PUT p %d %p\n",                       libcfs_id2str(target),                       le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);                kptllnd_tx_launch(peer, tx, 0);                goto out;        case LNET_MSG_GET:                /* routed gets don't RDMA */                if (target_is_router || routing)                        break;                /* Is the payload small enough not to need RDMA? 
*/                nob = lntmsg->msg_md->md_length;                nob = offsetof(kptl_msg_t,                                ptlm_u.immediate.kptlim_payload[nob]);                if (nob <= peer->peer_max_msg_size)                        break;                tx = kptllnd_get_idle_tx(TX_TYPE_GET_REQUEST);                if (tx == NULL) {                        CERROR("Can't send GET to %s: can't allocate descriptor\n",                               libcfs_id2str(target));                        rc = -ENOMEM;                        goto out;                }                tx->tx_lnet_replymsg =                        lnet_create_reply_msg(kptllnd_data.kptl_ni, lntmsg);                if (tx->tx_lnet_replymsg == NULL) {                        CERROR("Failed to allocate LNET reply for %s\n",                               libcfs_id2str(target));                        kptllnd_tx_decref(tx);                        rc = -ENOMEM;                        goto out;                }                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)                        kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,                                             lntmsg->msg_md->md_iov.iov, NULL,                                             0, lntmsg->msg_md->md_length);                else                        kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,                                             NULL, lntmsg->msg_md->md_iov.kiov,                                             0, lntmsg->msg_md->md_length);                                tx->tx_lnet_msg = lntmsg;                tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;                kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET,                                  sizeof(kptl_rdma_msg_t));                CDEBUG(D_NETTRACE, "%s: passive GET p %d %p\n",                       libcfs_id2str(target),                       le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);                kptllnd_tx_launch(peer, tx, 0);  
              goto out;        case LNET_MSG_ACK:                CDEBUG(D_NET, "LNET_MSG_ACK\n");                LASSERT (payload_nob == 0);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -