⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ptllnd_tx.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 2 页
字号:
                  tx->tx_replymsg == NULL));        spin_unlock_irqrestore(&peer->peer_lock, flags);        if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE)) {                prc = PtlMDUnlink(msg_mdh);                if (prc == PTL_OK)                        msg_mdh = PTL_INVALID_HANDLE;        }        if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {                prc = PtlMDUnlink(rdma_mdh);                if (prc == PTL_OK)                        rdma_mdh = PTL_INVALID_HANDLE;        }        spin_lock_irqsave(&peer->peer_lock, flags);        /* update tx_???_mdh if callback hasn't fired */        if (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE))                msg_mdh = PTL_INVALID_HANDLE;        else                tx->tx_msg_mdh = msg_mdh;                if (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE))                rdma_mdh = PTL_INVALID_HANDLE;        else                tx->tx_rdma_mdh = rdma_mdh;        if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&            PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {                spin_unlock_irqrestore(&peer->peer_lock, flags);                return 0;        }        /* stash the tx on its peer until it completes */        atomic_set(&tx->tx_refcount, 1);        tx->tx_active = 1;        list_add_tail(&tx->tx_list, &peer->peer_activeq);        kptllnd_peer_addref(peer);              /* extra ref for me... */        spin_unlock_irqrestore(&peer->peer_lock, flags);        /* This will get the watchdog thread to try aborting all the peer's         * comms again.  NB, this deems it fair that 1 failing tx which can't         * be aborted immediately (i.e. its MDs are still busy) is valid cause         * to nuke everything to the same peer! */        kptllnd_peer_close(peer, tx->tx_status);        kptllnd_peer_decref(peer);        return -EAGAIN;}#endifvoidkptllnd_tx_fini (kptl_tx_t *tx){        lnet_msg_t     *replymsg = tx->tx_lnet_replymsg;        lnet_msg_t     *msg      = tx->tx_lnet_msg;        kptl_peer_t    *peer     = tx->tx_peer;        int             status   = tx->tx_status;        int             rc;        LASSERT (!in_interrupt());        LASSERT (atomic_read(&tx->tx_refcount) == 0);        LASSERT (!tx->tx_idle);        LASSERT (!tx->tx_active);        /* TX has completed or failed */        if (peer != NULL) {                rc = kptllnd_tx_abort_netio(tx);                if (rc != 0)                        return;        }        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));        tx->tx_lnet_msg = tx->tx_lnet_replymsg = NULL;        tx->tx_peer = NULL;        tx->tx_idle = 1;        spin_lock(&kptllnd_data.kptl_tx_lock);        list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);        spin_unlock(&kptllnd_data.kptl_tx_lock);        /* Must finalize AFTER freeing 'tx' */        if (msg != NULL)                lnet_finalize(kptllnd_data.kptl_ni, msg,                              (replymsg == NULL) ? status : 0);        if (replymsg != NULL)                lnet_finalize(kptllnd_data.kptl_ni, replymsg, status);        if (peer != NULL)                kptllnd_peer_decref(peer);}const char *kptllnd_tx_typestr(int type){        switch (type) {        default:                return "<TYPE UNKNOWN>";                        case TX_TYPE_SMALL_MESSAGE:                return "msg";        case TX_TYPE_PUT_REQUEST:                return "put_req";        case TX_TYPE_GET_REQUEST:                return "get_req";                break;        case TX_TYPE_PUT_RESPONSE:                return "put_rsp";                break;        case TX_TYPE_GET_RESPONSE:                return "get_rsp";        }}voidkptllnd_tx_callback(ptl_event_t *ev){        kptl_eventarg_t *eva = ev->md.user_ptr;        int              ismsg = (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG);        kptl_tx_t       *tx = kptllnd_eventarg2obj(eva);        kptl_peer_t     *peer = tx->tx_peer;        int              ok = (ev->ni_fail_type == PTL_OK);        int              unlinked;        unsigned long    flags;        LASSERT (peer != NULL);        LASSERT (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG ||                 eva->eva_type == PTLLND_EVENTARG_TYPE_RDMA);        LASSERT (!ismsg || !PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));        LASSERT (ismsg || !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS        unlinked = ev->unlinked;#else        unlinked = (ev->type == PTL_EVENT_UNLINK);#endif        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",               libcfs_id2str(peer->peer_id), peer->peer_credits,               peer->peer_outstanding_credits, peer->peer_sent_credits,               kptllnd_evtype2str(ev->type), ev->type,                tx, kptllnd_errtype2str(ev->ni_fail_type),               ev->ni_fail_type, unlinked);        switch (tx->tx_type) {        default:                LBUG();                        case TX_TYPE_SMALL_MESSAGE:                LASSERT (ismsg);                LASSERT (ev->type == PTL_EVENT_UNLINK ||                         ev->type == PTL_EVENT_SEND_END ||                         (ev->type == PTL_EVENT_ACK && tx->tx_acked));                break;        case TX_TYPE_PUT_REQUEST:                LASSERT (ev->type == PTL_EVENT_UNLINK ||                         (ismsg && ev->type == PTL_EVENT_SEND_END) ||                         (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||                         (!ismsg && ev->type == PTL_EVENT_GET_END));                break;        case TX_TYPE_GET_REQUEST:                LASSERT (ev->type == PTL_EVENT_UNLINK ||                         (ismsg && ev->type == PTL_EVENT_SEND_END) ||                         (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||                         (!ismsg && ev->type == PTL_EVENT_PUT_END));                if (!ismsg && ok && ev->type == PTL_EVENT_PUT_END) {                        if (ev->hdr_data == PTLLND_RDMA_OK) {                                lnet_set_reply_msg_len(                                        kptllnd_data.kptl_ni,                                        tx->tx_lnet_replymsg,                                        ev->mlength);                        } else {                                /* no match at peer */                                tx->tx_status = -EIO;                        }                }                break;        case TX_TYPE_PUT_RESPONSE:                LASSERT (!ismsg);                LASSERT (ev->type == PTL_EVENT_UNLINK ||                         ev->type == PTL_EVENT_SEND_END ||                         ev->type == PTL_EVENT_REPLY_END);                break;        case TX_TYPE_GET_RESPONSE:                LASSERT (!ismsg);                LASSERT (ev->type == PTL_EVENT_UNLINK ||                         ev->type == PTL_EVENT_SEND_END ||                         (ev->type == PTL_EVENT_ACK && tx->tx_acked));                break;        }        if (ok) {                kptllnd_peer_alive(peer);        } else {                CERROR("Portals error to %s: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",                       libcfs_id2str(peer->peer_id),                       kptllnd_evtype2str(ev->type), ev->type,                        tx, kptllnd_errtype2str(ev->ni_fail_type),                       ev->ni_fail_type, unlinked);                tx->tx_status = -EIO;                 kptllnd_peer_close(peer, -EIO);        }        if (!unlinked)                return;        spin_lock_irqsave(&peer->peer_lock, flags);        if (ismsg)                tx->tx_msg_mdh = PTL_INVALID_HANDLE;        else                tx->tx_rdma_mdh = PTL_INVALID_HANDLE;        if (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ||            !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ||            !tx->tx_active) {                spin_unlock_irqrestore(&peer->peer_lock, flags);                return;        }        list_del(&tx->tx_list);        tx->tx_active = 0;        spin_unlock_irqrestore(&peer->peer_lock, flags);        /* drop peer's ref, but if it was the last one... */        if (atomic_dec_and_test(&tx->tx_refcount)) {                /* ...finalize it in thread context! */                spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);                list_add_tail(&tx->tx_list, &kptllnd_data.kptl_sched_txq);                wake_up(&kptllnd_data.kptl_sched_waitq);                spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);        }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -