⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ptllnd_peer.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 4 页
字号:
         * the end any time I drop the lock. */

        list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_CLOSING);

                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_zombie_peers);
                peer->peer_state = PEER_STATE_ZOMBIE;

                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                kptllnd_peer_notify(peer);
                kptllnd_peer_cancel_txs(peer, &txs);
                kptllnd_peer_decref(peer);

                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        }

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Drop peer's ref on all cancelled txs.  This will get
         * kptllnd_tx_fini() to abort outstanding comms if necessary. */

        list_for_each_safe (tmp, nxt, &txs) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                kptllnd_tx_decref(tx);
        }
}

/*
 * Start closing 'peer'.
 *
 * A peer in PEER_STATE_WAITING_HELLO or PEER_STATE_ACTIVE is unlinked from
 * its current list, marked PEER_STATE_CLOSING, appended to
 * kptllnd_data.kptl_closing_peers, and kptl_watchdog_waitq is woken.
 * A peer already in PEER_STATE_CLOSING or PEER_STATE_ZOMBIE is left
 * untouched, so 'why' is recorded in peer->peer_error only by the first
 * close.  Any other state reaches LBUG().
 *
 * This function takes no lock itself; kptllnd_peer_close() calls it with
 * kptllnd_data.kptl_peer_rw_lock write-held.
 * NOTE(review): other callers presumably must hold that lock too - confirm.
 */
void
kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
{
        switch (peer->peer_state) {
        default:
                /* NOTE(review): presumably LBUG() does not return; if it
                 * did, control would fall into the close path below. */
                LBUG();

        case PEER_STATE_WAITING_HELLO:
        case PEER_STATE_ACTIVE:
                /* Ensure new peers see a new incarnation of me */
                LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
                if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
                        kptllnd_data.kptl_incarnation++;

                /* Removing from peer table */
                kptllnd_data.kptl_n_active_peers--;
                LASSERT (kptllnd_data.kptl_n_active_peers >= 0);

                list_del(&peer->peer_list);
                kptllnd_peer_unreserve_buffers();

                peer->peer_error = why; /* stash 'why' only on first close */
                peer->peer_state = PEER_STATE_CLOSING;

                /* Schedule for immediate attention, taking peer table's ref */
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_closing_peers);
                wake_up(&kptllnd_data.kptl_watchdog_waitq);
                break;

        case PEER_STATE_ZOMBIE:
        case PEER_STATE_CLOSING:
                /* Already being closed: nothing to do */
                break;
        }
}

/*
 * Locking wrapper: write-lock kptllnd_data.kptl_peer_rw_lock (saving IRQ
 * flags), call kptllnd_peer_close_locked(peer, why), then unlock.
 */
void
kptllnd_peer_close(kptl_peer_t *peer, int why)
{
        unsigned long      flags;

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        kptllnd_peer_close_locked(peer, why);
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}

/*
 * Close every peer in the peer table that matches 'id'.
 *
 * Matching: id.nid == LNET_NID_ANY matches every peer; otherwise the peer's
 * nid must equal id.nid and its pid must equal id.pid unless id.pid is
 * LNET_PID_ANY.
 *
 * Returns 0 if at least one peer matched and was closed, -ENOENT if none
 * matched, and -EINVAL if id.nid is LNET_NID_ANY but id.pid is not
 * LNET_PID_ANY.
 */
int
kptllnd_peer_del(lnet_process_id_t id)
{
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kptl_peer_t       *peer;
        int                lo;
        int                hi;
        int                i;
        unsigned long      flags;
        int                rc = -ENOENT;

        /*
         * Find the single bucket we are supposed to look at or if nid is a
         * wildcard (LNET_NID_ANY) then look at all of the buckets
         */
        if (id.nid != LNET_NID_ANY) {
                struct list_head *l = kptllnd_nid2peerlist(id.nid);

                /* Bucket index = offset of 'l' within the kptl_peers array */
                lo = hi =  l - kptllnd_data.kptl_peers;
        } else {
                if (id.pid != LNET_PID_ANY)
                        return -EINVAL;

                lo = 0;
                hi = kptllnd_data.kptl_peer_hash_size - 1;
        }

again:
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry (ptmp, kptl_peer_t, peer_list);

                        /* Skip peers that don't match 'id' */
                        if (!(id.nid == LNET_NID_ANY ||
                              (peer->peer_id.nid == id.nid &&
                               (id.pid == LNET_PID_ANY ||
                                peer->peer_id.pid == id.pid))))
                                continue;

                        kptllnd_peer_addref(peer); /* 1 ref for me... */

                        /* kptllnd_peer_close() takes the same lock for
                         * writing, so the read lock is dropped first */
                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                               flags);

                        kptllnd_peer_close(peer, 0);
                        kptllnd_peer_decref(peer); /* ...until here */

                        rc = 0;         /* matched something */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return (rc);
}

/*
 * Bind a memory descriptor for tx's message and queue 'tx' on
 * peer->peer_sendq.  Nothing is transmitted here.
 * NOTE(review): queued txs are presumably sent from
 * kptllnd_peer_check_sends(), which drains peer_sendq - confirm.
 *
 * 'tx' must be non-idle, inactive, have invalid msg/rdma MD handles and be
 * of type SMALL_MESSAGE, PUT_REQUEST or GET_REQUEST (all LASSERTed).
 *
 * 'nfrag' selects the MD layout:
 *   0   - the MD covers tx->tx_msg, length tx->tx_msg->ptlm_nob;
 *   > 1 - the MD is an iovec (PTL_MD_IOVEC) over tx->tx_frags with 'nfrag'
 *         entries, the first of which must point at tx->tx_msg.
 *
 * If PtlMDBind() fails, the error is logged, tx->tx_status is set to -EIO
 * and the tx reference is dropped; no status is returned to the caller.
 */
void
kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        ptl_handle_md_t  msg_mdh;
        ptl_md_t         md;
        ptl_err_t        prc;
        unsigned long    flags;

        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                 tx->tx_type == TX_TYPE_PUT_REQUEST ||
                 tx->tx_type == TX_TYPE_GET_REQUEST);

        kptllnd_set_tx_peer(tx, peer);

        memset(&md, 0, sizeof(md));

        md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE;
        md.user_ptr = &tx->tx_msg_eventarg;
        md.eq_handle = kptllnd_data.kptl_eqh;

        if (nfrag == 0) {
                /* Contiguous message */
                md.start = tx->tx_msg;
                md.length = tx->tx_msg->ptlm_nob;
        } else {
                LASSERT (nfrag > 1);
                LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);

                /* Fragment list; md.length is the number of entries */
                md.start = tx->tx_frags;
                md.length = nfrag;
                md.options |= PTL_MD_IOVEC;
        }

        prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
        if (prc != PTL_OK) {
                CERROR("PtlMDBind(%s) failed: %s(%d)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_errtype2str(prc), prc);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
                return;
        }

        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
        tx->tx_active = 1;
        tx->tx_msg_mdh = msg_mdh;

        /* Ensure HELLO is sent first */
        if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
                list_add(&tx->tx_list, &peer->peer_sendq);
        else
                list_add_tail(&tx->tx_list, &peer->peer_sendq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
}

void
kptllnd_peer_check_sends (kptl_peer_t *peer)
{
        ptl_handle_me_t  meh;
        kptl_tx_t       *tx;
        int              rc;
        unsigned long    flags;

        LASSERT(!in_interrupt());

        spin_lock_irqsave(&peer->peer_lock, flags);

        peer->peer_retry_noop = 0;

        if (list_empty(&peer->peer_sendq) &&
            peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER &&
            peer->peer_credits != 0) {

                /* post a NOOP to return credits */
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
                if (tx == NULL) {
                        CERROR("Can't return credits to %s: can't allocate descriptor\n",
                               libcfs_id2str(peer->peer_id));
                } else {
                        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
                        kptllnd_post_tx(peer, tx, 0);
                }

                spin_lock_irqsave(&peer->peer_lock, flags);
                peer->peer_retry_noop = (tx == NULL);
        }

        while (!list_empty(&peer->peer_sendq)) {
                tx = list_entry (peer->peer_sendq.next, kptl_tx_t, tx_list);

                LASSERT (tx->tx_active);
                LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
                LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

                LASSERT (peer->peer_outstanding_credits >= 0);
                LASSERT (peer->peer_sent_credits >= 0);
                LASSERT (peer->peer_sent_credits +
                         peer->peer_outstanding_credits <=
                         *kptllnd_tunables.kptl_peercredits);
                LASSERT (peer->peer_credits >= 0);

                /* Ensure HELLO is sent first */
                if (!peer->peer_sent_hello) {
                        if (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_HELLO)
                                break;
                        peer->peer_sent_hello = 1;
                }

                if (peer->peer_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %p\n",
                               libcfs_id2str(peer->peer_id),
                               peer->peer_credits,
                               peer->peer_outstanding_credits,
                               peer->peer_sent_credits, tx);
                        break;
                }

                /* Don't use the last credit unless I've got credits to
                 * return */
                if (peer->peer_credits == 1 &&
                    peer->peer_outstanding_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
                               "not using last credit for %p\n",
                               libcfs_id2str(peer->peer_id),
                               peer->peer_credits,
                               peer->peer_outstanding_credits,
                               peer->peer_sent_credits, tx);
                        break;
                }

                list_del(&tx->tx_list);

                /* Discard any NOOP I queued if I'm not at the high-water mark
                 * any more or more messages have been queued */
                if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
                    (!list_empty(&peer->peer_sendq) ||
                     peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {

                        tx->tx_active = 0;

                        spin_unlock_irqrestore(&peer->peer_lock, flags);

                        CDEBUG(D_NET, "%s: redundant noop\n",
                               libcfs_id2str(peer->peer_id));
                        kptllnd_tx_decref(tx);

                        spin_lock_irqsave(&peer->peer_lock, flags);
                        continue;
                }

                /* fill last-minute msg fields */
                kptllnd_msg_pack(tx->tx_msg, peer);

                if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
                    tx->tx_type == TX_TYPE_GET_REQUEST) {
                        /* peer_next_matchbits must be known good */
                        LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
                        /* Assume 64-bit matchbits can't wrap */
                        LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
                        tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
                                peer->peer_next_matchbits++;
                }

                peer->peer_sent_credits += peer->peer_outstanding_credits;
                peer->peer_outstanding_credits = 0;
                peer->peer_credits--;

                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
                       tx, tx->tx_msg->ptlm_nob,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -