⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ptllnd_peer.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 4 页
字号:
                       tx->tx_msg->ptlm_credits);                list_add_tail(&tx->tx_list, &peer->peer_activeq);                kptllnd_tx_addref(tx);          /* 1 ref for me... */                spin_unlock_irqrestore(&peer->peer_lock, flags);                if (tx->tx_type == TX_TYPE_PUT_REQUEST ||                    tx->tx_type == TX_TYPE_GET_REQUEST) {                        /* Post bulk now we have safe matchbits */                        rc = PtlMEAttach(kptllnd_data.kptl_nih,                                         *kptllnd_tunables.kptl_portal,                                         peer->peer_ptlid,                                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,                                         0,             /* ignore bits */                                         PTL_UNLINK,                                         PTL_INS_BEFORE,                                         &meh);                        if (rc != PTL_OK) {                                CERROR("PtlMEAttach(%s) failed: %s(%d)\n",                                       libcfs_id2str(peer->peer_id),                                       kptllnd_errtype2str(rc), rc);                                goto failed;                        }                        rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,                                         &tx->tx_rdma_mdh);                        if (rc != PTL_OK) {                                CERROR("PtlMDAttach(%s) failed: %s(%d)\n",                                       libcfs_id2str(tx->tx_peer->peer_id),                                       kptllnd_errtype2str(rc), rc);                                rc = PtlMEUnlink(meh);                                LASSERT(rc == PTL_OK);                                tx->tx_rdma_mdh = PTL_INVALID_HANDLE;                                goto failed;                        }                        /* I'm not racing with the event callback here.  It's a                         * bug if there's an event on the MD I just attached                         * before I actually send the RDMA request message -                         * probably matchbits re-used in error. */                }                tx->tx_tposted = jiffies;       /* going on the wire */                rc = PtlPut (tx->tx_msg_mdh,                             tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,                             peer->peer_ptlid,                             *kptllnd_tunables.kptl_portal,                             0,                 /* acl cookie */                             LNET_MSG_MATCHBITS,                             0,                 /* offset */                             0);                /* header data */                if (rc != PTL_OK) {                        CERROR("PtlPut %s error %s(%d)\n",                               libcfs_id2str(peer->peer_id),                               kptllnd_errtype2str(rc), rc);                        goto failed;                }                kptllnd_tx_decref(tx);          /* drop my ref */                spin_lock_irqsave(&peer->peer_lock, flags);        }        spin_unlock_irqrestore(&peer->peer_lock, flags);        return; failed:        /* Nuke everything (including tx we were trying) */        kptllnd_peer_close(peer, -EIO);        kptllnd_tx_decref(tx);}kptl_tx_t *kptllnd_find_timed_out_tx(kptl_peer_t *peer){        kptl_tx_t         *tx;        struct list_head  *tmp;        list_for_each(tmp, &peer->peer_sendq) {                tx = list_entry(peer->peer_sendq.next, kptl_tx_t, tx_list);                if (time_after_eq(jiffies, tx->tx_deadline)) {                        kptllnd_tx_addref(tx);                        return tx;                }        }        list_for_each(tmp, &peer->peer_activeq) {                tx = list_entry(peer->peer_activeq.next, kptl_tx_t, tx_list);                if (time_after_eq(jiffies, tx->tx_deadline)) {                        kptllnd_tx_addref(tx);                        return tx;                }        }        return NULL;}voidkptllnd_peer_check_bucket (int idx, int stamp){        struct list_head  *peers = &kptllnd_data.kptl_peers[idx];        struct list_head  *ptmp;        kptl_peer_t       *peer;        kptl_tx_t         *tx;        unsigned long      flags;        int                nsend;        int                nactive;        int                check_sends;        CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp); again:        /* NB. Shared lock while I just look */        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);        list_for_each (ptmp, peers) {                peer = list_entry (ptmp, kptl_peer_t, peer_list);                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",                       libcfs_id2str(peer->peer_id), peer->peer_credits,                        peer->peer_outstanding_credits, peer->peer_sent_credits);                spin_lock(&peer->peer_lock);                if (peer->peer_check_stamp == stamp) {                        /* checked already this pass */                        spin_unlock(&peer->peer_lock);                        continue;                }                peer->peer_check_stamp = stamp;                tx = kptllnd_find_timed_out_tx(peer);                check_sends = peer->peer_retry_noop;                                spin_unlock(&peer->peer_lock);                                if (tx == NULL && !check_sends)                        continue;                kptllnd_peer_addref(peer); /* 1 ref for me... */                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);                if (tx == NULL) { /* nothing timed out */                        kptllnd_peer_check_sends(peer);                        kptllnd_peer_decref(peer); /* ...until here or... */                        /* rescan after dropping the lock */                        goto again;                }                spin_lock_irqsave(&peer->peer_lock, flags);                nsend = kptllnd_count_queue(&peer->peer_sendq);                nactive = kptllnd_count_queue(&peer->peer_activeq);                spin_unlock_irqrestore(&peer->peer_lock, flags);                LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",                                   libcfs_id2str(peer->peer_id),                                   (tx->tx_tposted == 0) ?                                    "no free peer buffers" :                                    "please check Portals");                CERROR("%s timed out: cred %d outstanding %d, sent %d, "                       "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "                       "%sposted %lu T/O %ds\n",                       libcfs_id2str(peer->peer_id), peer->peer_credits,                       peer->peer_outstanding_credits, peer->peer_sent_credits,                       nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),                       tx->tx_active ? "A" : "",                       PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?                       "" : "M",                       PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?                       "" : "D",                       tx->tx_status,                       (tx->tx_tposted == 0) ? "not " : "",                       (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),                       *kptllnd_tunables.kptl_timeout);                kptllnd_dump_ptltrace();                kptllnd_tx_decref(tx);                kptllnd_peer_close(peer, -ETIMEDOUT);                kptllnd_peer_decref(peer); /* ...until here */                /* start again now I've dropped the lock */                goto again;        }        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);}kptl_peer_t *kptllnd_id2peer_locked (lnet_process_id_t id){        struct list_head *peers = kptllnd_nid2peerlist(id.nid);        struct list_head *tmp;        kptl_peer_t      *peer;        list_for_each (tmp, peers) {                peer = list_entry (tmp, kptl_peer_t, peer_list);                LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||                        peer->peer_state == PEER_STATE_ACTIVE);                                if (peer->peer_id.nid != id.nid ||                    peer->peer_id.pid != id.pid)                        continue;                kptllnd_peer_addref(peer);                CDEBUG(D_NET, "%s -> %s (%d)\n",                       libcfs_id2str(id),                        kptllnd_ptlid2str(peer->peer_ptlid),                       atomic_read (&peer->peer_refcount));                return peer;        }        return NULL;}voidkptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id){        LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "                           "messages may be dropped\n",                           str, libcfs_id2str(id),                           kptllnd_data.kptl_n_active_peers);        LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "                           "'max_nodes' or 'max_procs_per_node'\n");}__u64kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid){        kptl_peer_t            *peer;        struct list_head       *tmp;        /* Find the last matchbits I saw this new peer using.  Note..           A. This peer cannot be in the peer table - she's new!           B. If I can't find the peer in the closing/zombie peers, all              matchbits are safe because all refs to the (old) peer have gone              so all txs have completed so there's no risk of matchbit              collision!         */        LASSERT(kptllnd_id2peer_locked(lpid) == NULL);        /* peer's last matchbits can't change after it comes out of the peer         * table, so first match is fine */        list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {                peer = list_entry (tmp, kptl_peer_t, peer_list);                if (peer->peer_id.nid == lpid.nid &&                    peer->peer_id.pid == lpid.pid)                        return peer->peer_last_matchbits_seen;        }                list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {                peer = list_entry (tmp, kptl_peer_t, peer_list);                if (peer->peer_id.nid == lpid.nid &&                    peer->peer_id.pid == lpid.pid)                        return peer->peer_last_matchbits_seen;        }                return PTL_RESERVED_MATCHBITS;}kptl_peer_t *kptllnd_peer_handle_hello (ptl_process_id_t  initiator,                           kptl_msg_t       *msg){        rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;        kptl_peer_t        *peer;        kptl_peer_t        *new_peer;        lnet_process_id_t   lpid;        unsigned long       flags;        kptl_tx_t          *hello_tx;        int                 rc;        __u64               safe_matchbits;        __u64               last_matchbits_seen;        lpid.nid = msg->ptlm_srcnid;        lpid.pid = msg->ptlm_srcpid;        CDEBUG(D_NET, "hello from %s(%s)\n",               libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));        if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&            (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {                /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be                 * userspace.  Refuse the connection if she hasn't set the                 * correct flag in her PID... */                CERROR("Userflag not set in hello from %s (%s)\n",                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));                return NULL;        }                /* kptlhm_matchbits are the highest matchbits my peer may have used to         * RDMA to me.  I ensure I never register buffers for RDMA that could         * match any she used */        safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;        if (safe_matchbits < PTL_RESERVED_MATCHBITS) {                CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",		       safe_matchbits, libcfs_id2str(lpid));		return NULL;	}	        if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {                CERROR("%s: max message size %d < MIN %d",                       libcfs_id2str(lpid),                       msg->ptlm_u.hello.kptlhm_max_msg_size,                       PTLLND_MIN_BUFFER_SIZE);                return NULL;        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -