ptllnd_cb.c
                break;
        }

        /* I don't have to handle kiovs */
        LASSERT (payload_nob == 0 || payload_iov != NULL);

        tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (tx == NULL) {
                CERROR("Can't send %s to %s: can't allocate descriptor\n",
                       lnet_msgtyp2str(type), libcfs_id2str(target));
                rc = -ENOMEM;
                goto out;
        }

        tx->tx_lnet_msg = lntmsg;
        tx->tx_msg->ptlm_u.immediate.kptlim_hdr = *hdr;

        if (payload_nob == 0) {
                nfrag = 0;
        } else {
                tx->tx_frags->iov[0].iov_base = tx->tx_msg;
                tx->tx_frags->iov[0].iov_len =
                        offsetof(kptl_msg_t,
                                 ptlm_u.immediate.kptlim_payload);

                /* NB relying on lustre not asking for PTL_MD_MAX_IOV
                 * fragments!! */
#ifdef _USING_LUSTRE_PORTALS_
                nfrag = 1 + lnet_extract_iov(PTL_MD_MAX_IOV - 1,
                                             &tx->tx_frags->iov[1],
                                             payload_niov, payload_iov,
                                             payload_offset, payload_nob);
#else
                nfrag = 1 + kptllnd_extract_iov(PTL_MD_MAX_IOV - 1,
                                                &tx->tx_frags->iov[1],
                                                payload_niov, payload_iov,
                                                payload_offset, payload_nob);
#endif
        }

        nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]);
        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, nob);

        CDEBUG(D_NETTRACE, "%s: immediate %s p %d %p\n",
               libcfs_id2str(target),
               lnet_msgtyp2str(lntmsg->msg_type),
               (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_PUT) ?
               le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index) :
               (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_GET) ?
               le32_to_cpu(lntmsg->msg_hdr.msg.get.ptl_index) : -1,
               tx);

        kptllnd_tx_launch(peer, tx, nfrag);

 out:
        kptllnd_peer_decref(peer);
        return rc;
}
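/* Added note (not from the original source; numbers illustrative): the
 * 'nob' computation above uses the C idiom
 *         offsetof(type, flexible_array[n])
 * which evaluates to the size of everything preceding the flexible payload
 * array plus n payload bytes.  E.g. if the header fields before
 * kptlim_payload occupy, say, 72 bytes and payload_nob == 1024, then
 * nob == 72 + 1024 == 1096.  kptllnd_recv() below applies the same idiom
 * to bounds-check incoming immediate messages against rx->rx_nob. */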
int
kptllnd_eager_recv(struct lnet_ni *ni, void *private,
                   lnet_msg_t *msg, void **new_privatep)
{
        kptl_rx_t *rx = private;

        CDEBUG(D_NET, "Eager RX=%p RXB=%p\n", rx, rx->rx_rxb);

        /* I have to release my ref on rxb (if I have one) to ensure I'm an
         * eager receiver, so I copy the incoming request from the buffer it
         * landed in, into space reserved in the descriptor... */

#if (PTL_MD_LOCAL_ALIGN8 == 0)
        if (rx->rx_rxb == NULL)                 /* already copied */
                return 0;                       /* to fix alignment */
#else
        LASSERT(rx->rx_rxb != NULL);
#endif
        LASSERT(rx->rx_nob <= *kptllnd_tunables.kptl_max_msg_size);

        memcpy(rx->rx_space, rx->rx_msg, rx->rx_nob);
        rx->rx_msg = (kptl_msg_t *)rx->rx_space;

        kptllnd_rx_buffer_decref(rx->rx_rxb);
        rx->rx_rxb = NULL;

        return 0;
}

int
kptllnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
              unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kptl_rx_t  *rx = private;
        kptl_msg_t *rxmsg = rx->rx_msg;
        int         nob;
        int         rc;

        CDEBUG(D_NET, "%s niov=%d offset=%d mlen=%d rlen=%d\n",
               kptllnd_msgtype2str(rxmsg->ptlm_type),
               niov, offset, mlen, rlen);

        LASSERT (mlen <= rlen);
        LASSERT (mlen >= 0);
        LASSERT (!in_interrupt());
        LASSERT (!(kiov != NULL && iov != NULL)); /* never both */
        LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */

#ifdef CRAY_XT3
        if (lntmsg != NULL &&
            rx->rx_uid != 0) {
                /* Set the UID if the sender's uid isn't 0; i.e. non-root
                 * running in userspace (e.g. a catamount node; linux kernel
                 * senders, including routers have uid 0).  If this is a
                 * lustre RPC request, this tells lustre not to trust the
                 * creds in the RPC message body. */
                lnet_set_msg_uid(ni, lntmsg, rx->rx_uid);
        }
#endif
        switch(rxmsg->ptlm_type) {
        default:
                LBUG();
                rc = -EINVAL;
                break;

        case PTLLND_MSG_TYPE_IMMEDIATE:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE %d,%d\n",
                       mlen, rlen);

                nob = offsetof(kptl_msg_t,
                               ptlm_u.immediate.kptlim_payload[rlen]);
                if (nob > rx->rx_nob) {
                        CERROR ("Immediate message from %s too big: %d(%d)\n",
                                libcfs_id2str(rx->rx_peer->peer_id), nob,
                                rx->rx_nob);
                        rc = -EINVAL;
                        break;
                }

                if (kiov != NULL)
                        lnet_copy_flat2kiov(
                                niov, kiov, offset,
                                *kptllnd_tunables.kptl_max_msg_size,
                                rxmsg->ptlm_u.immediate.kptlim_payload,
                                0, mlen);
                else
                        lnet_copy_flat2iov(
                                niov, iov, offset,
                                *kptllnd_tunables.kptl_max_msg_size,
                                rxmsg->ptlm_u.immediate.kptlim_payload,
                                0, mlen);

                lnet_finalize (ni, lntmsg, 0);
                rc = 0;
                break;

        case PTLLND_MSG_TYPE_GET:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_GET %d,%d\n", mlen, rlen);

                /* NB always send RDMA so the peer can complete.  I send
                 * success/failure in the portals 'hdr_data' */

                if (lntmsg == NULL)
                        rc = kptllnd_active_rdma(rx, NULL,
                                                 TX_TYPE_GET_RESPONSE,
                                                 0, NULL, NULL, 0, 0);
                else
                        rc = kptllnd_active_rdma(rx, lntmsg,
                                                 TX_TYPE_GET_RESPONSE,
                                                 lntmsg->msg_niov,
                                                 lntmsg->msg_iov,
                                                 lntmsg->msg_kiov,
                                                 lntmsg->msg_offset,
                                                 lntmsg->msg_len);
                break;

        case PTLLND_MSG_TYPE_PUT:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_PUT %d,%d\n", mlen, rlen);

                /* NB always send RDMA so the peer can complete; it'll be 0
                 * bytes if there was no match (lntmsg == NULL). I have no
                 * way to let my peer know this, but she's only interested
                 * in when the net has stopped accessing her buffer in any
                 * case. */

                rc = kptllnd_active_rdma(rx, lntmsg, TX_TYPE_PUT_RESPONSE,
                                         niov, iov, kiov, offset, mlen);
                break;
        }

        /*
         * We're done with the RX
         */
        kptllnd_rx_done(rx);
        return rc;
}

void
kptllnd_eq_callback(ptl_event_t *ev)
{
        kptl_eventarg_t *eva = ev->md.user_ptr;

        switch (eva->eva_type) {
        default:
                LBUG();

        case PTLLND_EVENTARG_TYPE_MSG:
        case PTLLND_EVENTARG_TYPE_RDMA:
                kptllnd_tx_callback(ev);
                break;

        case PTLLND_EVENTARG_TYPE_BUF:
                kptllnd_rx_buffer_callback(ev);
                break;
        }
}

void
kptllnd_thread_fini (void)
{
        atomic_dec(&kptllnd_data.kptl_nthreads);
}

int
kptllnd_thread_start (int (*fn)(void *arg), void *arg)
{
        long pid;

        atomic_inc(&kptllnd_data.kptl_nthreads);

        pid = kernel_thread (fn, arg, 0);
        if (pid >= 0)
                return 0;

        CERROR("Failed to start kernel_thread: error %d\n", (int)pid);
        kptllnd_thread_fini();
        return (int)pid;
}

int
kptllnd_watchdog(void *arg)
{
        int             id = (long)arg;
        char            name[16];
        wait_queue_t    waitlink;
        int             stamp = 0;
        int             peer_index = 0;
        unsigned long   deadline = jiffies;
        int             timeout;
        int             i;

        snprintf(name, sizeof(name), "kptllnd_wd_%02d", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        init_waitqueue_entry(&waitlink, current);

        /* threads shut down in phase 2 after all peers have been destroyed */
        while (kptllnd_data.kptl_shutdown < 2) {

                timeout = (int)(deadline - jiffies);
                if (timeout <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = kptllnd_data.kptl_peer_hash_size;

                        /* Time to check for RDMA timeouts on a few more
                         * peers: I do checks every 'p' seconds on a
                         * proportion of the peer table and I need to check
                         * every connection 'n' times within a timeout
                         * interval, to ensure I detect a timeout on any
                         * connection within (n+1)/n times the timeout
                         * interval. */
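                        /* Added worked example (illustrative numbers, not
                         * from the original source): with n == 4, p == 1,
                         * a 1000-bucket peer hash and *kptl_timeout == 50s,
                         * chunk = (1000 * 4 * 1) / 50 = 80 buckets per
                         * one-second wakeup; 80 buckets/s * 50s = 4000
                         * bucket checks per timeout interval, i.e. the
                         * whole table is scanned n == 4 times, as the
                         * comment above requires. */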
                        if ((*kptllnd_tunables.kptl_timeout) > n * p)
                                chunk = (chunk * n * p) /
                                        (*kptllnd_tunables.kptl_timeout);
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                kptllnd_peer_check_bucket(peer_index, stamp);
                                peer_index = (peer_index + 1) %
                                             kptllnd_data.kptl_peer_hash_size;
                        }

                        deadline += p * HZ;
                        stamp++;
                        continue;
                }

                kptllnd_handle_closing_peers();

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kptllnd_data.kptl_watchdog_waitq,
                                         &waitlink);

                schedule_timeout(timeout);

                set_current_state (TASK_RUNNING);
                remove_wait_queue(&kptllnd_data.kptl_watchdog_waitq,
                                  &waitlink);
        }

        kptllnd_thread_fini();
        CDEBUG(D_NET, "<<<\n");
        return (0);
}

int
kptllnd_scheduler (void *arg)
{
        int                 id = (long)arg;
        char                name[16];
        wait_queue_t        waitlink;
        unsigned long       flags;
        int                 did_something;
        int                 counter = 0;
        kptl_rx_t          *rx;
        kptl_rx_buffer_t   *rxb;
        kptl_tx_t          *tx;

        snprintf(name, sizeof(name), "kptllnd_sd_%02d", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        init_waitqueue_entry(&waitlink, current);

        spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);

        /* threads shut down in phase 2 after all peers have been destroyed */
        while (kptllnd_data.kptl_shutdown < 2) {

                did_something = 0;

                if (!list_empty(&kptllnd_data.kptl_sched_rxq)) {
                        rx = list_entry (kptllnd_data.kptl_sched_rxq.next,
                                         kptl_rx_t, rx_list);
                        list_del(&rx->rx_list);

                        spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
                                               flags);

                        kptllnd_rx_parse(rx);
                        did_something = 1;

                        spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
                                          flags);
                }

                if (!list_empty(&kptllnd_data.kptl_sched_rxbq)) {
                        rxb = list_entry (kptllnd_data.kptl_sched_rxbq.next,
                                          kptl_rx_buffer_t, rxb_repost_list);
                        list_del(&rxb->rxb_repost_list);

                        spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
                                               flags);

                        kptllnd_rx_buffer_post(rxb);
                        did_something = 1;

                        spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
                                          flags);
                }

                if (!list_empty(&kptllnd_data.kptl_sched_txq)) {
                        tx = list_entry (kptllnd_data.kptl_sched_txq.next,
                                         kptl_tx_t, tx_list);
                        list_del_init(&tx->tx_list);

                        spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
                                               flags);

                        kptllnd_tx_fini(tx);
                        did_something = 1;

                        spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
                                          flags);
                }

                if (did_something) {
                        if (++counter !=
                            *kptllnd_tunables.kptl_reschedule_loops)
                                continue;
                }

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kptllnd_data.kptl_sched_waitq,
                                         &waitlink);
                spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);

                if (!did_something)
                        schedule();
                else
                        cond_resched();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kptllnd_data.kptl_sched_waitq, &waitlink);

                spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);

                counter = 0;
        }

        spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);

        kptllnd_thread_fini();
        return 0;
}
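/* Added design note (summarising the code above, not from the original
 * source): kptllnd_scheduler() drains the rx, rx-buffer-repost and tx
 * queues one item at a time, dropping kptl_sched_lock around each handler
 * so the callbacks never run with the spinlock held.  After
 * *kptl_reschedule_loops consecutive items it falls through to the
 * wait-queue logic and calls cond_resched() (schedule() is reserved for
 * the idle case), so a busy scheduler thread periodically yields the CPU
 * instead of starving other kernel threads. */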