📄 ralnd_cb.c
字号:
/*
 * RapidArray LND (Lustre LNet driver) — receive-path callbacks and
 * service threads (connection daemon, timeout checking).
 *
 * NOTE(review): this chunk was recovered from a collapsed source listing;
 * it begins and ends mid-function (see markers below).
 */

/* NOTE(review): tail of a function whose start lies before this chunk —
 * from the cleanup labels it is a send/GET path that finalizes the LNet
 * message with -EIO when tx setup fails. */
                       &rxmsg->ram_u.get.ragm_desc, nob,
                       rxmsg->ram_u.get.ragm_cookie);
        return;

 failed_1:
        kranal_tx_done(tx, -EIO);
 failed_0:
        lnet_finalize(ni, lntmsg, -EIO);
}

/* LNet eager-receive callback.  This LND does not buffer messages it
 * cannot process immediately: log the drop and fail with -EDEADLK so the
 * peer's message is discarded. */
int
kranal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                   void **new_private)
{
        kra_conn_t *conn = (kra_conn_t *)private;

        LCONSOLE_ERROR_MSG(0x12b, "Dropping message from %s: no buffers free.\n",
                           libcfs_nid2str(conn->rac_peer->rap_nid));
        return -EDEADLK;
}

/* LNet receive callback: deliver the message currently held in the
 * connection's rx buffer (conn->rac_rxmsg) into the caller's buffer
 * described by (niov, iov|kiov, offset, mlen).  Dispatches on the wire
 * message type; in every path the rxmsg is consumed (via
 * kranal_consume_rxmsg) before returning, releasing the rx buffer. */
int
kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
             int delayed, unsigned int niov,
             struct iovec *iov, lnet_kiov_t *kiov,
             unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kra_conn_t *conn = private;
        kra_msg_t  *rxmsg = conn->rac_rxmsg;
        kra_tx_t   *tx;
        void       *buffer;
        int         rc;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        CDEBUG(D_NET, "conn %p, rxmsg %p, lntmsg %p\n", conn, rxmsg, lntmsg);

        switch(rxmsg->ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                /* Inline payload: must land in a contiguous virtual
                 * address range, so paged (kiov) buffers are rejected and
                 * the payload may not span iov fragments. */
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return -EIO;
                } else {
                        LASSERT (niov > 0);
                        /* skip wholly-consumed leading fragments */
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return -EIO;
                        }
                        buffer = ((char *)iov->iov_base) + offset;
                }
                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lnet_finalize(ni, lntmsg, (rc == 0) ? 0 : -EIO);
                return 0;

        case RANAL_MSG_PUT_REQ:
                /* Peer wants to RDMA data to us: map a local sink buffer
                 * and send back a PUT_ACK carrying its RDMA descriptor.
                 * On any setup failure the rxmsg is still consumed. */
                tx = kranal_new_tx_msg(RANAL_MSG_PUT_ACK);
                if (tx == NULL) {
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -ENOMEM;
                }

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -EIO;
                }

                tx->tx_conn = conn;

                rc = kranal_map_buffer(tx);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -EIO;
                }

                /* Fill in the PUT_ACK: echo the peer's cookie, supply our
                 * own, and describe the mapped sink buffer. */
                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);
                kranal_consume_rxmsg(conn, NULL, 0);
                return 0;

        case RANAL_MSG_GET_REQ:
                if (lntmsg != NULL) {
                        /* Matched! */
                        kranal_reply(ni, conn, lntmsg);
                } else {
                        /* No match: NAK the GET so the peer can fail it
                         * (best-effort; tx allocation failure is ignored). */
                        tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);
                        if (tx != NULL) {
                                tx->tx_msg.ram_u.completion.racm_cookie =
                                        rxmsg->ram_u.get.ragm_cookie;
                                kranal_post_fma(conn, tx);
                        }
                }
                kranal_consume_rxmsg(conn, NULL, 0);
                return 0;
        }
}

/* Spawn a kernel thread running fn(arg) and count it in kra_nthreads.
 * Returns 0 on success or the negative error from kernel_thread(). */
int
kranal_thread_start (int(*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread(fn, arg, 0);

        if (pid < 0)
                return(int)pid;

        atomic_inc(&kranal_data.kra_nthreads);
        return 0;
}

/* Per-thread exit bookkeeping: drop the global thread count. */
void
kranal_thread_fini (void)
{
        atomic_dec(&kranal_data.kra_nthreads);
}

/* Check one connection for timeouts.  Returns 0 if healthy, -ETIMEDOUT
 * if nothing has been received within the conn's timeout or if any
 * queued tx has been stuck longer than the timeout.  Also schedules the
 * conn for a keepalive if nothing has been sent recently. */
int
kranal_check_conn_timeouts (kra_conn_t *conn)
{
        kra_tx_t          *tx;
        struct list_head  *ttmp;
        unsigned long      flags;
        long               timeout;
        unsigned long      now = jiffies;

        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
                 conn->rac_state == RANAL_CONN_CLOSING);

        if (!conn->rac_close_sent &&
            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                /* not sent in a while; schedule conn so scheduler sends a keepalive */
                CDEBUG(D_NET, "Scheduling keepalive %p->%s\n",
                       conn, libcfs_nid2str(conn->rac_peer->rap_nid));
                kranal_schedule_conn(conn);
        }

        timeout = conn->rac_timeout * HZ;

        if (!conn->rac_close_recvd &&
            time_after_eq(now, conn->rac_last_rx + timeout)) {
                CERROR("%s received from %s within %lu seconds\n",
                       (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
                       "Nothing" : "CLOSE not",
                       libcfs_nid2str(conn->rac_peer->rap_nid),
                       (now - conn->rac_last_rx)/HZ);
                return -ETIMEDOUT;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                return 0;

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */
        spin_lock_irqsave(&conn->rac_lock, flags);

        /* NB the lock is dropped before each error report; the conn is
         * about to be torn down so the lists won't be walked further. */
        list_for_each (ttmp, &conn->rac_fmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        return 0;
}

/* Scan one conn-hash chain (kra_conns[idx]) for timed-out connections,
 * also folding each conn's timeout/keepalive into *min_timeoutp so the
 * caller (presumably the reaper) can size its next sleep.  On finding a
 * timeout: take a ref, drop the read lock, close/terminate the conn under
 * the write lock, then restart the scan from the top. */
void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
        struct list_head  *conns = &kranal_data.kra_conns[idx];
        struct list_head  *ctmp;
        kra_conn_t        *conn;
        unsigned long      flags;
        int                rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        list_for_each (ctmp, conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp )
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp )
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn_timeouts(conn);
                if (rc == 0)
                        continue;

                /* ref keeps conn alive across the lock drop below */
                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Conn to %s, cqid %d timed out\n",
                       libcfs_nid2str(conn->rac_peer->rap_nid),
                       conn->rac_cqid);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                switch (conn->rac_state) {
                default:
                        LBUG();

                case RANAL_CONN_ESTABLISHED:
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                        break;

                case RANAL_CONN_CLOSING:
                        kranal_terminate_conn_locked(conn);
                        break;
                }

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}

/* Connection daemon: services the accept queue (incoming handshakes) and
 * the peer connect queue, dropping kra_connd_lock around the blocking
 * work and sleeping on kra_connd_waitq when both queues are empty. */
int
kranal_connd (void *arg)
{
        long               id = (long)arg;
        char               name[16];
        wait_queue_t       wait;
        unsigned long      flags;
        kra_peer_t        *peer;
        kra_acceptsock_t  *ras;
        int                did_something;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                did_something = 0;

                if (!list_empty(&kranal_data.kra_connd_acceptq)) {
                        /* passive connect: handshake an accepted socket */
                        ras = list_entry(kranal_data.kra_connd_acceptq.next,
                                         kra_acceptsock_t, ras_list);
                        list_del(&ras->ras_list);

                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        CDEBUG(D_NET,"About to handshake someone\n");

                        kranal_conn_handshake(ras->ras_sock, NULL);
                        kranal_free_acceptsock(ras);

                        CDEBUG(D_NET,"Finished handshaking someone\n");

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (!list_empty(&kranal_data.kra_connd_peers)) {
                        /* active connect: the queue holds a peer ref,
                         * dropped after kranal_connect() */
                        peer = list_entry(kranal_data.kra_connd_peers.next,
                                          kra_peer_t, rap_connd_list);
                        list_del_init(&peer->rap_connd_list);

                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (did_something)
                        continue;

                /* both queues empty: sleep until woken */
                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                schedule ();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}

/* Shrink the reaper's pending minimum timeout (in seconds) if the new
 * value is smaller; the reaper picks this up on its next pass. */
void
kranal_update_reaper_timeout(long timeout)
{
        unsigned long   flags;

        LASSERT (timeout > 0);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}

/* NOTE(review): chunk truncated here, just inside kranal_reaper(). */
int
kranal_reaper (void *arg)
{
        wait_queue_t       wait;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -