⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ralnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
        kranal_unmap_buffer(tx);        lnetmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;        lnetmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;        tx->tx_buftype = RANAL_BUF_NONE;        tx->tx_msg.ram_type = RANAL_MSG_NONE;        tx->tx_conn = NULL;        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);        list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);        /* finalize AFTER freeing lnet msgs */        for (i = 0; i < 2; i++) {                if (lnetmsg[i] == NULL)                        continue;                lnet_finalize(kranal_data.kra_ni, lnetmsg[i], completion);        }}kra_conn_t *kranal_find_conn_locked (kra_peer_t *peer){        struct list_head *tmp;        /* just return the first connection */        list_for_each (tmp, &peer->rap_conns) {                return list_entry(tmp, kra_conn_t, rac_list);        }        return NULL;}voidkranal_post_fma (kra_conn_t *conn, kra_tx_t *tx){        unsigned long    flags;        tx->tx_conn = conn;        spin_lock_irqsave(&conn->rac_lock, flags);        list_add_tail(&tx->tx_list, &conn->rac_fmaq);        tx->tx_qtime = jiffies;        spin_unlock_irqrestore(&conn->rac_lock, flags);        kranal_schedule_conn(conn);}voidkranal_launch_tx (kra_tx_t *tx, lnet_nid_t nid){        unsigned long    flags;        kra_peer_t      *peer;        kra_conn_t      *conn;        int              rc;        int              retry;        rwlock_t        *g_lock = &kranal_data.kra_global_lock;        /* If I get here, I've committed to send, so I complete the tx with         * failure on any problems */        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */        for (retry = 0; ; retry = 1) {                read_lock(g_lock);                peer = kranal_find_peer_locked(nid);                if (peer != NULL) {                        conn = kranal_find_conn_locked(peer);                        if (conn != NULL) {                                kranal_post_fma(conn, tx);                                read_unlock(g_lock);                                return;                        }                }                                /* Making connections; I'll need a write lock... */                read_unlock(g_lock);                write_lock_irqsave(g_lock, flags);                peer = kranal_find_peer_locked(nid);                if (peer != NULL)                        break;                                write_unlock_irqrestore(g_lock, flags);                                if (retry) {                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));                        kranal_tx_done(tx, -EHOSTUNREACH);                        return;                }                rc = kranal_add_persistent_peer(nid, LNET_NIDADDR(nid),                                                lnet_acceptor_port());                if (rc != 0) {                        CERROR("Can't add peer %s: %d\n",                               libcfs_nid2str(nid), rc);                        kranal_tx_done(tx, rc);                        return;                }        }                conn = kranal_find_conn_locked(peer);        if (conn != NULL) {                /* Connection exists; queue message on it */                kranal_post_fma(conn, tx);                write_unlock_irqrestore(g_lock, flags);                return;        }                                LASSERT (peer->rap_persistence > 0);        if (!peer->rap_connecting) {                LASSERT (list_empty(&peer->rap_tx_queue));                if (!(peer->rap_reconnect_interval == 0 || /* first attempt */                      time_after_eq(jiffies, peer->rap_reconnect_time))) {                        write_unlock_irqrestore(g_lock, flags);                        kranal_tx_done(tx, -EHOSTUNREACH);                        return;                }                peer->rap_connecting = 1;                kranal_peer_addref(peer); /* extra ref for connd */                spin_lock(&kranal_data.kra_connd_lock);                list_add_tail(&peer->rap_connd_list,                              &kranal_data.kra_connd_peers);                wake_up(&kranal_data.kra_connd_waitq);                spin_unlock(&kranal_data.kra_connd_lock);        }        /* A connection is being established; queue the message... */        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);        write_unlock_irqrestore(g_lock, flags);}voidkranal_rdma(kra_tx_t *tx, int type,            kra_rdma_desc_t *sink, int nob, __u64 cookie){        kra_conn_t   *conn = tx->tx_conn;        RAP_RETURN    rrc;        unsigned long flags;        LASSERT (kranal_tx_mapped(tx));        LASSERT (nob <= sink->rard_nob);        LASSERT (nob <= tx->tx_nob);        /* No actual race with scheduler sending CLOSE (I'm she!) */        LASSERT (current == conn->rac_device->rad_scheduler);        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;        tx->tx_rdma_desc.DstPtr = sink->rard_addr;        tx->tx_rdma_desc.DstKey = sink->rard_key;        tx->tx_rdma_desc.Length = nob;        tx->tx_rdma_desc.AppPtr = tx;        /* prep final completion message */        kranal_init_msg(&tx->tx_msg, type);        tx->tx_msg.ram_u.completion.racm_cookie = cookie;        if (nob == 0) { /* Immediate completion */                kranal_post_fma(conn, tx);                return;        }        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);        LASSERT (rrc == RAP_SUCCESS);        spin_lock_irqsave(&conn->rac_lock, flags);        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);        tx->tx_qtime = jiffies;        spin_unlock_irqrestore(&conn->rac_lock, flags);}intkranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob){        __u32      nob_received = nob;        RAP_RETURN rrc;        LASSERT (conn->rac_rxmsg != NULL);        CDEBUG(D_NET, "Consuming %p\n", conn);        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,                             &nob_received, sizeof(kra_msg_t));        LASSERT (rrc == RAP_SUCCESS);        conn->rac_rxmsg = NULL;        if (nob_received < nob) {                CWARN("Incomplete immediate msg from %s: expected %d, got %d\n",                      libcfs_nid2str(conn->rac_peer->rap_nid),                       nob, nob_received);                return -EPROTO;        }        return 0;}intkranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg){        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;        int               type = lntmsg->msg_type;        lnet_process_id_t target = lntmsg->msg_target;        int               target_is_router = lntmsg->msg_target_is_router;        int               routing = lntmsg->msg_routing;        unsigned int      niov = lntmsg->msg_niov;        struct iovec     *iov = lntmsg->msg_iov;        lnet_kiov_t      *kiov = lntmsg->msg_kiov;        unsigned int      offset = lntmsg->msg_offset;        unsigned int      nob = lntmsg->msg_len;        kra_tx_t         *tx;        int               rc;        /* NB 'private' is different depending on what we're sending.... */        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",               nob, niov, libcfs_id2str(target));        LASSERT (nob == 0 || niov > 0);        LASSERT (niov <= LNET_MAX_IOV);        LASSERT (!in_interrupt());        /* payload is either all vaddrs or all pages */        LASSERT (!(kiov != NULL && iov != NULL));        if (routing) {                CERROR ("Can't route\n");                return -EIO;        }        switch(type) {        default:                LBUG();        case LNET_MSG_ACK:                LASSERT (nob == 0);                break;        case LNET_MSG_GET:                LASSERT (niov == 0);                LASSERT (nob == 0);                /* We have to consider the eventual sink buffer rather than any                 * payload passed here (there isn't any, and strictly, looking                 * inside lntmsg is a layering violation).  We send a simple                 * IMMEDIATE GET if the sink buffer is mapped already and small                 * enough for FMA */                if (routing || target_is_router)                        break;                  /* send IMMEDIATE */                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0 &&                    lntmsg->msg_md->md_length <= RANAL_FMA_MAX_DATA &&                    lntmsg->msg_md->md_length <= *kranal_tunables.kra_max_immediate)                        break;                  /* send IMMEDIATE */                tx = kranal_new_tx_msg(RANAL_MSG_GET_REQ);                if (tx == NULL)                        return -ENOMEM;                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)                        rc = kranal_setup_virt_buffer(tx, lntmsg->msg_md->md_niov,                                                      lntmsg->msg_md->md_iov.iov,                                                      0, lntmsg->msg_md->md_length);                else                        rc = kranal_setup_phys_buffer(tx, lntmsg->msg_md->md_niov,                                                      lntmsg->msg_md->md_iov.kiov,                                                      0, lntmsg->msg_md->md_length);                if (rc != 0) {                        kranal_tx_done(tx, rc);                        return -EIO;                }                tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);                if (tx->tx_lntmsg[1] == NULL) {                        CERROR("Can't create reply for GET to %s\n",                                libcfs_nid2str(target.nid));                        kranal_tx_done(tx, rc);                        return -EIO;                }                tx->tx_lntmsg[0] = lntmsg;                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;                /* rest of tx_msg is setup just before it is sent */                kranal_launch_tx(tx, target.nid);                return 0;        case LNET_MSG_REPLY:        case LNET_MSG_PUT:                if (kiov == NULL &&             /* not paged */                    nob <= RANAL_FMA_MAX_DATA && /* small enough */                    nob <= *kranal_tunables.kra_max_immediate)                        break;                  /* send IMMEDIATE */                tx = kranal_new_tx_msg(RANAL_MSG_PUT_REQ);                if (tx == NULL)                        return -ENOMEM;                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);                if (rc != 0) {                        kranal_tx_done(tx, rc);                        return -EIO;                }                tx->tx_lntmsg[0] = lntmsg;                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;                /* rest of tx_msg is setup just before it is sent */                kranal_launch_tx(tx, target.nid);                return 0;        }        /* send IMMEDIATE */        LASSERT (kiov == NULL);        LASSERT (nob <= RANAL_FMA_MAX_DATA);        tx = kranal_new_tx_msg(RANAL_MSG_IMMEDIATE);        if (tx == NULL)                return -ENOMEM;        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);        if (rc != 0) {                kranal_tx_done(tx, rc);                return -EIO;        }        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;        tx->tx_lntmsg[0] = lntmsg;        kranal_launch_tx(tx, target.nid);        return 0;}voidkranal_reply(lnet_ni_t *ni, kra_conn_t *conn, lnet_msg_t *lntmsg){        kra_msg_t     *rxmsg = conn->rac_rxmsg;        unsigned int   niov = lntmsg->msg_niov;        struct iovec  *iov = lntmsg->msg_iov;        lnet_kiov_t   *kiov = lntmsg->msg_kiov;        unsigned int   offset = lntmsg->msg_offset;        unsigned int   nob = lntmsg->msg_len;        kra_tx_t      *tx;        int            rc;        tx = kranal_get_idle_tx();        if (tx == NULL)                goto failed_0;        rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);        if (rc != 0)                goto failed_1;        tx->tx_conn = conn;        rc = kranal_map_buffer(tx);        if (rc != 0)                goto failed_1;        tx->tx_lntmsg[0] = lntmsg;        kranal_rdma(tx, RANAL_MSG_GET_DONE,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -