📄 ralnd_cb.c
字号:
unsigned long flags; long timeout; int i; int conn_entries = kranal_data.kra_conn_hash_size; int conn_index = 0; int base_index = conn_entries - 1; unsigned long next_check_time = jiffies; long next_min_timeout = MAX_SCHEDULE_TIMEOUT; long current_min_timeout = 1; cfs_daemonize("kranal_reaper"); cfs_block_allsigs(); init_waitqueue_entry(&wait, current); spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); while (!kranal_data.kra_shutdown) { /* I wake up every 'p' seconds to check for timeouts on some * more peers. I try to check every connection 'n' times * within the global minimum of all keepalive and timeout * intervals, to ensure I attend to every connection within * (n+1)/n times its timeout intervals. */ const int p = 1; const int n = 3; unsigned long min_timeout; int chunk; /* careful with the jiffy wrap... */ timeout = (long)(next_check_time - jiffies); if (timeout > 0) { set_current_state(TASK_INTERRUPTIBLE); add_wait_queue(&kranal_data.kra_reaper_waitq, &wait); spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags); schedule_timeout(timeout); spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); set_current_state(TASK_RUNNING); remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait); continue; } if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) { /* new min timeout set: restart min timeout scan */ next_min_timeout = MAX_SCHEDULE_TIMEOUT; base_index = conn_index - 1; if (base_index < 0) base_index = conn_entries - 1; if (kranal_data.kra_new_min_timeout < current_min_timeout) { current_min_timeout = kranal_data.kra_new_min_timeout; CDEBUG(D_NET, "Set new min timeout %ld\n", current_min_timeout); } kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT; } min_timeout = current_min_timeout; spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags); LASSERT (min_timeout > 0); /* Compute how many table entries to check now so I get round * the whole table fast enough given that I do this at fixed * intervals of 'p' seconds) */ chunk = conn_entries; if (min_timeout > n * p) chunk = (chunk * n * p) / min_timeout; if (chunk == 0) chunk = 1; for (i = 0; i < chunk; i++) { kranal_reaper_check(conn_index, &next_min_timeout); conn_index = (conn_index + 1) % conn_entries; } next_check_time += p * HZ; spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); if (((conn_index - chunk <= base_index && base_index < conn_index) || (conn_index - conn_entries - chunk <= base_index && base_index < conn_index - conn_entries))) { /* Scanned all conns: set current_min_timeout... */ if (current_min_timeout != next_min_timeout) { current_min_timeout = next_min_timeout; CDEBUG(D_NET, "Set new min timeout %ld\n", current_min_timeout); } /* ...and restart min timeout scan */ next_min_timeout = MAX_SCHEDULE_TIMEOUT; base_index = conn_index - 1; if (base_index < 0) base_index = conn_entries - 1; } } kranal_thread_fini(); return 0;}voidkranal_check_rdma_cq (kra_device_t *dev){ kra_conn_t *conn; kra_tx_t *tx; RAP_RETURN rrc; unsigned long flags; RAP_RDMA_DESCRIPTOR *desc; __u32 cqid; __u32 event_type; for (;;) { rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type); if (rrc == RAP_NOT_DONE) { CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id); return; } LASSERT (rrc == RAP_SUCCESS); LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0); read_lock(&kranal_data.kra_global_lock); conn = kranal_cqid2conn_locked(cqid); if (conn == NULL) { /* Conn was destroyed? */ CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid); read_unlock(&kranal_data.kra_global_lock); continue; } rrc = RapkRdmaDone(conn->rac_rihandle, &desc); LASSERT (rrc == RAP_SUCCESS); CDEBUG(D_NET, "Completed %p\n", list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list)); spin_lock_irqsave(&conn->rac_lock, flags); LASSERT (!list_empty(&conn->rac_rdmaq)); tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list); list_del(&tx->tx_list); LASSERT(desc->AppPtr == (void *)tx); LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE || tx->tx_msg.ram_type == RANAL_MSG_GET_DONE); list_add_tail(&tx->tx_list, &conn->rac_fmaq); tx->tx_qtime = jiffies; spin_unlock_irqrestore(&conn->rac_lock, flags); /* Get conn's fmaq processed, now I've just put something * there */ kranal_schedule_conn(conn); read_unlock(&kranal_data.kra_global_lock); }}voidkranal_check_fma_cq (kra_device_t *dev){ kra_conn_t *conn; RAP_RETURN rrc; __u32 cqid; __u32 event_type; struct list_head *conns; struct list_head *tmp; int i; for (;;) { rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type); if (rrc == RAP_NOT_DONE) { CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id); return; } LASSERT (rrc == RAP_SUCCESS); if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) { read_lock(&kranal_data.kra_global_lock); conn = kranal_cqid2conn_locked(cqid); if (conn == NULL) { CDEBUG(D_NET, "FMA CQID lookup %d failed\n", cqid); } else { CDEBUG(D_NET, "FMA completed: %p CQID %d\n", conn, cqid); kranal_schedule_conn(conn); } read_unlock(&kranal_data.kra_global_lock); continue; } /* FMA CQ has overflowed: check ALL conns */ CWARN("FMA CQ overflow: scheduling ALL conns on device %d\n", dev->rad_id); for (i = 0; i < kranal_data.kra_conn_hash_size; i++) { read_lock(&kranal_data.kra_global_lock); conns = &kranal_data.kra_conns[i]; list_for_each (tmp, conns) { conn = list_entry(tmp, kra_conn_t, rac_hashlist); if (conn->rac_device == dev) kranal_schedule_conn(conn); } /* don't block write lockers for too long... */ read_unlock(&kranal_data.kra_global_lock); } }}intkranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg, void *immediate, int immediatenob){ int sync = (msg->ram_type & RANAL_MSG_FENCE) != 0; RAP_RETURN rrc; CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n", conn, msg, msg->ram_type, sync ? "(sync)" : "", immediate, immediatenob); LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX); LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ? immediatenob <= RANAL_FMA_MAX_DATA : immediatenob == 0); msg->ram_connstamp = conn->rac_my_connstamp; msg->ram_seq = conn->rac_tx_seq; if (sync) rrc = RapkFmaSyncSend(conn->rac_rihandle, immediate, immediatenob, msg, sizeof(*msg)); else rrc = RapkFmaSend(conn->rac_rihandle, immediate, immediatenob, msg, sizeof(*msg)); switch (rrc) { default: LBUG(); case RAP_SUCCESS: conn->rac_last_tx = jiffies; conn->rac_tx_seq++; return 0; case RAP_NOT_DONE: if (time_after_eq(jiffies, conn->rac_last_tx + conn->rac_keepalive*HZ)) CWARN("EAGAIN sending %02x (idle %lu secs)\n", msg->ram_type, (jiffies - conn->rac_last_tx)/HZ); return -EAGAIN; }}voidkranal_process_fmaq (kra_conn_t *conn){ unsigned long flags; int more_to_do; kra_tx_t *tx; int rc; int expect_reply; /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now. * However I will be rescheduled by an FMA completion event * when I eventually get some. * NB 2. Sampling rac_state here races with setting it elsewhere. * But it doesn't matter if I try to send a "real" message just * as I start closing because I'll get scheduled to send the * close anyway. */ /* Not racing with incoming message processing! */ LASSERT (current == conn->rac_device->rad_scheduler); if (conn->rac_state != RANAL_CONN_ESTABLISHED) { if (!list_empty(&conn->rac_rdmaq)) { /* RDMAs in progress */ LASSERT (!conn->rac_close_sent); if (time_after_eq(jiffies, conn->rac_last_tx + conn->rac_keepalive * HZ)) { CDEBUG(D_NET, "sending NOOP (rdma in progress)\n"); kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP); kranal_sendmsg(conn, &conn->rac_msg, NULL, 0); } return; } if (conn->rac_close_sent) return; CWARN("sending CLOSE to %s\n", libcfs_nid2str(conn->rac_peer->rap_nid)); kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE); rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0); if (rc != 0) return; conn->rac_close_sent = 1; if (!conn->rac_close_recvd) return; write_lock_irqsave(&kranal_data.kra_global_lock, flags); if (conn->rac_state == RANAL_CONN_CLOSING) kranal_terminate_conn_locked(conn); write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); return; } spin_lock_irqsave(&conn->rac_lock, flags); if (list_empty(&conn->rac_fmaq)) { spin_unlock_irqrestore(&conn->rac_lock, flags); if (time_after_eq(jiffies, conn->rac_last_tx + conn->rac_keepalive * HZ)) { CDEBUG(D_NET, "sending NOOP -> %s (%p idle %lu(%ld))\n", libcfs_nid2str(conn->rac_peer->rap_nid), conn, (jiffies - conn->rac_last_tx)/HZ, conn->rac_keepalive); kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP); kranal_sendmsg(conn, &conn->rac_msg, NULL, 0); } return; } tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list); list_del(&tx->tx_list); more_to_do = !list_empty(&conn->rac_fmaq); spin_unlock_irqrestore(&conn->rac_lock, flags); expect_reply = 0; CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n", tx, tx->tx_msg.ram_type, tx->tx_cookie); switch (tx->tx_msg.ram_type) { default: LBUG(); case RANAL_MSG_IMMEDIATE: rc = kranal_sendmsg(conn, &tx->tx_msg, tx->tx_buffer, tx->tx_nob); break; case RANAL_MSG_PUT_NAK: case RANAL_MSG_PUT_DONE: case RANAL_MSG_GET_NAK: case RANAL_MSG_GET_DONE: rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0); break;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -