📄 ptllnd_cb.c
字号:
libcfs_id2str(srcid)); ptllnd_peer_decref(plp); return; } /* Check peer only sends when I've sent her credits */ if (plp->plp_sent_credits == 0) { CERROR("%s[%d/%d+%d(%d)]: unexpected message\n", libcfs_id2str(plp->plp_id), plp->plp_credits, plp->plp_outstanding_credits, plp->plp_sent_credits, plni->plni_peer_credits + plp->plp_lazy_credits); return; } plp->plp_sent_credits--; /* No check for credit overflow - the peer may post new buffers after * the startup handshake. */ if (msg->ptlm_credits > 0) { plp->plp_credits += msg->ptlm_credits; ptllnd_check_sends(plp); } /* All OK so far; assume the message is good... */ rx.rx_peer = plp; rx.rx_msg = msg; rx.rx_nob = nob; plni->plni_nrxs++; switch (msg->ptlm_type) { default: /* message types have been checked already */ ptllnd_rx_done(&rx); break; case PTLLND_MSG_TYPE_PUT: case PTLLND_MSG_TYPE_GET: rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr, msg->ptlm_srcnid, &rx, 1); if (rc < 0) ptllnd_rx_done(&rx); break; case PTLLND_MSG_TYPE_IMMEDIATE: rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr, msg->ptlm_srcnid, &rx, 0); if (rc < 0) ptllnd_rx_done(&rx); break; } ptllnd_peer_decref(plp);}voidptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event){ ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr); ptllnd_ni_t *plni = ni->ni_data; char *msg = &buf->plb_buffer[event->offset]; int repost; int unlinked = event->type == PTL_EVENT_UNLINK; LASSERT (buf->plb_ni == ni); LASSERT (event->type == PTL_EVENT_PUT_END || event->type == PTL_EVENT_UNLINK); if (event->ni_fail_type != PTL_NI_OK) { CERROR("event type %s(%d), status %s(%d) from %s\n", ptllnd_evtype2str(event->type), event->type, ptllnd_errtype2str(event->ni_fail_type), event->ni_fail_type, ptllnd_ptlid2str(event->initiator)); } else if (event->type == PTL_EVENT_PUT_END) {#if (PTL_MD_LOCAL_ALIGN8 == 0) /* Portals can't force message alignment - someone sending an * odd-length message could misalign subsequent messages */ if ((event->mlength & 7) != 0) { CERROR("Message from %s has odd length %llu: " "probable version incompatibility\n", ptllnd_ptlid2str(event->initiator), event->mlength); LBUG(); }#endif LASSERT ((event->offset & 7) == 0); ptllnd_parse_request(ni, event->initiator, (kptl_msg_t *)msg, event->mlength); }#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS /* UNLINK event only on explicit unlink */ repost = (event->unlinked && event->type != PTL_EVENT_UNLINK); if (event->unlinked) unlinked = 1;#else /* UNLINK event only on implicit unlink */ repost = (event->type == PTL_EVENT_UNLINK);#endif if (unlinked) { LASSERT(buf->plb_posted); buf->plb_posted = 0; plni->plni_nposted_buffers--; } if (repost) (void) ptllnd_post_buffer(buf);}voidptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event){ ptllnd_ni_t *plni = ni->ni_data; ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr); int error = (event->ni_fail_type != PTL_NI_OK); int isreq; int isbulk;#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS int unlinked = event->unlinked;#else int unlinked = (event->type == PTL_EVENT_UNLINK);#endif if (error) CERROR("Error %s(%d) event %s(%d) unlinked %d, %s(%d) for %s\n", ptllnd_errtype2str(event->ni_fail_type), event->ni_fail_type, ptllnd_evtype2str(event->type), event->type, unlinked, ptllnd_msgtype2str(tx->tx_type), tx->tx_type, libcfs_id2str(tx->tx_peer->plp_id)); LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE)); isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh); if (isreq) { LASSERT (event->md.start == (void *)&tx->tx_msg); if (unlinked) { tx->tx_reqmdh = PTL_INVALID_HANDLE; gettimeofday(&tx->tx_req_done, NULL); } } isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh); if ( isbulk && unlinked ) { tx->tx_bulkmdh = PTL_INVALID_HANDLE; gettimeofday(&tx->tx_bulk_done, NULL); } LASSERT (!isreq != !isbulk); /* always one and only 1 match */ PTLLND_HISTORY("%s[%d/%d+%d(%d)]: TX done %p %s%s", libcfs_id2str(tx->tx_peer->plp_id), tx->tx_peer->plp_credits, tx->tx_peer->plp_outstanding_credits, tx->tx_peer->plp_sent_credits, plni->plni_peer_credits + tx->tx_peer->plp_lazy_credits, tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : ""); LASSERT (!isreq != !isbulk); /* always one and only 1 match */ switch (tx->tx_type) { default: LBUG(); case PTLLND_MSG_TYPE_NOOP: case PTLLND_MSG_TYPE_HELLO: case PTLLND_MSG_TYPE_IMMEDIATE: LASSERT (event->type == PTL_EVENT_UNLINK || event->type == PTL_EVENT_SEND_END); LASSERT (isreq); break; case PTLLND_MSG_TYPE_GET: LASSERT (event->type == PTL_EVENT_UNLINK || (isreq && event->type == PTL_EVENT_SEND_END) || (isbulk && event->type == PTL_EVENT_PUT_END)); if (isbulk && !error && event->type == PTL_EVENT_PUT_END) { /* Check GET matched */ if (event->hdr_data == PTLLND_RDMA_OK) { lnet_set_reply_msg_len(ni, tx->tx_lnetreplymsg, event->mlength); } else { CERROR ("Unmatched GET with %s\n", libcfs_id2str(tx->tx_peer->plp_id)); tx->tx_status = -EIO; } } break; case PTLLND_MSG_TYPE_PUT: LASSERT (event->type == PTL_EVENT_UNLINK || (isreq && event->type == PTL_EVENT_SEND_END) || (isbulk && event->type == PTL_EVENT_GET_END)); break; case PTLLND_RDMA_READ: LASSERT (event->type == PTL_EVENT_UNLINK || event->type == PTL_EVENT_SEND_END || event->type == PTL_EVENT_REPLY_END); LASSERT (isbulk); break; case PTLLND_RDMA_WRITE: LASSERT (event->type == PTL_EVENT_UNLINK || event->type == PTL_EVENT_SEND_END); LASSERT (isbulk); } /* Schedule ptllnd_tx_done() on error or last completion event */ if (error || (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) && PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) { if (error) tx->tx_status = -EIO; list_del(&tx->tx_list); list_add_tail(&tx->tx_list, &plni->plni_zombie_txs); }}ptllnd_tx_t *ptllnd_find_timed_out_tx(ptllnd_peer_t *peer){ time_t now = cfs_time_current_sec(); struct list_head *tmp; list_for_each(tmp, &peer->plp_txq) { ptllnd_tx_t *tx = list_entry(tmp, ptllnd_tx_t, tx_list); if (tx->tx_deadline < now) return tx; } list_for_each(tmp, &peer->plp_activeq) { ptllnd_tx_t *tx = list_entry(tmp, ptllnd_tx_t, tx_list); if (tx->tx_deadline < now) return tx; } return NULL;}voidptllnd_check_peer(ptllnd_peer_t *peer){ ptllnd_tx_t *tx = ptllnd_find_timed_out_tx(peer); if (tx == NULL) return; CERROR("%s: timed out\n", libcfs_id2str(peer->plp_id)); ptllnd_close_peer(peer, -ETIMEDOUT);}voidptllnd_watchdog (lnet_ni_t *ni, time_t now){ ptllnd_ni_t *plni = ni->ni_data; const int n = 4; int p = plni->plni_watchdog_interval; int chunk = plni->plni_peer_hash_size; int interval = now - (plni->plni_watchdog_nextt - p); int i; struct list_head *hashlist; struct list_head *tmp; struct list_head *nxt; /* Time to check for RDMA timeouts on a few more peers: * I try to do checks every 'p' seconds on a proportion of the peer * table and I need to check every connection 'n' times within a * timeout interval, to ensure I detect a timeout on any connection * within (n+1)/n times the timeout interval. */ LASSERT (now >= plni->plni_watchdog_nextt); if (plni->plni_timeout > n * interval) { /* Scan less than the whole table? */ chunk = (chunk * n * interval) / plni->plni_timeout; if (chunk == 0) chunk = 1; } for (i = 0; i < chunk; i++) { hashlist = &plni->plni_peer_hash[plni->plni_watchdog_peeridx]; list_for_each_safe(tmp, nxt, hashlist) { ptllnd_check_peer(list_entry(tmp, ptllnd_peer_t, plp_list)); } plni->plni_watchdog_peeridx = (plni->plni_watchdog_peeridx + 1) % plni->plni_peer_hash_size; } plni->plni_watchdog_nextt = now + p;}voidptllnd_wait (lnet_ni_t *ni, int milliseconds){ static struct timeval prevt; static int prevt_count; static int call_count; struct timeval start; struct timeval then; struct timeval now; struct timeval deadline; ptllnd_ni_t *plni = ni->ni_data; ptllnd_tx_t *tx; ptl_event_t event; int which; int rc; int found = 0; int timeout = 0; /* Handle any currently queued events, returning immediately if any. * Otherwise block for the timeout and handle all events queued * then. */ gettimeofday(&start, NULL); call_count++; if (milliseconds <= 0) { deadline = start; } else { deadline.tv_sec = start.tv_sec + milliseconds/1000; deadline.tv_usec = start.tv_usec + (milliseconds % 1000)*1000; if (deadline.tv_usec >= 1000000) { start.tv_usec -= 1000000; start.tv_sec++; } } for (;;) { gettimeofday(&then, NULL); rc = PtlEQPoll(&plni->plni_eqh, 1, timeout, &event, &which); gettimeofday(&now, NULL); if ((now.tv_sec*1000 + now.tv_usec/1000) - (then.tv_sec*1000 + then.tv_usec/1000) > timeout + 1000) { /* 1000 mS grace...........................^ */ CERROR("SLOW PtlEQPoll(%d): %dmS elapsed\n", timeout, (int)(now.tv_sec*1000 + now.tv_usec/1000) - (int)(then.tv_sec*1000 + then.tv_usec/1000)); } if (rc == PTL_EQ_EMPTY) { if (found) /* handled some events */ break; if (now.tv_sec >= plni->plni_watchdog_nextt) { /* check timeouts? */ ptllnd_watchdog(ni, now.tv_sec); LASSERT (now.tv_sec < plni->plni_watchdog_nextt); } if (now.tv_sec > deadline.tv_sec || /* timeout expired */ (now.tv_sec == deadline.tv_sec && now.tv_usec >= deadline.tv_usec))
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -