📄 framework.c
字号:
list_del_init(&rpc->crpc_list); srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk, blklen, sfw_test_rpc_done, sfw_test_rpc_fini, tsu); } spin_unlock(&tsi->tsi_lock); if (rpc == NULL) rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk, blklen, sfw_test_rpc_done, sfw_test_rpc_fini, tsu); if (rpc == NULL) { CERROR ("Can't create rpc for test %d\n", tsi->tsi_service); return -ENOMEM; } *rpcpp = rpc; return 0;}intsfw_run_test (swi_workitem_t *wi){ sfw_test_unit_t *tsu = wi->wi_data; sfw_test_instance_t *tsi = tsu->tsu_instance; srpc_client_rpc_t *rpc = NULL; LASSERT (wi == &tsu->tsu_worker); if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc) != 0) { LASSERT (rpc == NULL); goto test_done; } LASSERT (rpc != NULL); spin_lock(&tsi->tsi_lock); if (tsi->tsi_stopping) { list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs); spin_unlock(&tsi->tsi_lock); goto test_done; } if (tsu->tsu_loop > 0) tsu->tsu_loop--; list_add_tail(&rpc->crpc_list, &tsi->tsi_active_rpcs); spin_unlock(&tsi->tsi_lock); rpc->crpc_timeout = SFW_TEST_RPC_TIMEOUT; spin_lock(&rpc->crpc_lock); srpc_post_rpc(rpc); spin_unlock(&rpc->crpc_lock); return 0;test_done: /* * No one can schedule me now since: * - previous RPC, if any, has done and * - no new RPC is initiated. * - my batch is still active; no one can run it again now. * Cancel pending schedules and prevent future schedule attempts: */ swi_kill_workitem(wi); sfw_test_unit_done(tsu); return 1;}intsfw_run_batch (sfw_batch_t *tsb){ swi_workitem_t *wi; sfw_test_unit_t *tsu; sfw_test_instance_t *tsi; if (sfw_batch_active(tsb)) { CDEBUG (D_NET, "Can't start active batch: "LPU64" (%d)\n", tsb->bat_id.bat_id, atomic_read(&tsb->bat_nactive)); return -EPERM; } list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) { if (!tsi->tsi_is_client) /* skip server instances */ continue; LASSERT (!tsi->tsi_stopping); LASSERT (!sfw_test_active(tsi)); atomic_inc(&tsb->bat_nactive); list_for_each_entry (tsu, &tsi->tsi_units, tsu_list) { atomic_inc(&tsi->tsi_nactive); tsu->tsu_loop = tsi->tsi_loop; wi = &tsu->tsu_worker; swi_init_workitem(wi, tsu, sfw_run_test); swi_schedule_workitem(wi); } } return 0;}intsfw_stop_batch (sfw_batch_t *tsb, int force){ sfw_test_instance_t *tsi; srpc_client_rpc_t *rpc; if (!sfw_batch_active(tsb)) return -EPERM; list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) { spin_lock(&tsi->tsi_lock); if (!tsi->tsi_is_client || !sfw_test_active(tsi) || tsi->tsi_stopping) { spin_unlock(&tsi->tsi_lock); continue; } tsi->tsi_stopping = 1; if (!force) { spin_unlock(&tsi->tsi_lock); continue; } /* abort launched rpcs in the test */ list_for_each_entry (rpc, &tsi->tsi_active_rpcs, crpc_list) { spin_lock(&rpc->crpc_lock); srpc_abort_rpc(rpc, -EINTR); spin_unlock(&rpc->crpc_lock); } spin_unlock(&tsi->tsi_lock); } return 0;}intsfw_query_batch (sfw_batch_t *tsb, int testidx, srpc_batch_reply_t *reply){ sfw_test_instance_t *tsi; if (testidx < 0) return -EINVAL; if (testidx == 0) { reply->bar_active = atomic_read(&tsb->bat_nactive); return 0; } list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) { if (testidx-- > 1) continue; reply->bar_active = atomic_read(&tsi->tsi_nactive); return 0; } return -ENOENT;}voidsfw_free_pages (srpc_server_rpc_t *rpc){ srpc_free_bulk(rpc->srpc_bulk); rpc->srpc_bulk = NULL;}intsfw_alloc_pages (srpc_server_rpc_t *rpc, int npages, int sink){ LASSERT (rpc->srpc_bulk == NULL); LASSERT (npages > 0 && npages <= LNET_MAX_IOV); rpc->srpc_bulk = srpc_alloc_bulk(npages, sink); if (rpc->srpc_bulk == NULL) return -ENOMEM; return 0;}intsfw_add_test (srpc_server_rpc_t *rpc){ sfw_session_t *sn = sfw_data.fw_session; srpc_test_reply_t *reply = &rpc->srpc_replymsg.msg_body.tes_reply; srpc_test_reqst_t *request; int rc; sfw_batch_t *bat; request = &rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst; reply->tsr_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id; if (request->tsr_loop == 0 || request->tsr_concur == 0 || request->tsr_sid.ses_nid == LNET_NID_ANY || request->tsr_ndest > SFW_MAX_NDESTS || (request->tsr_is_client && request->tsr_ndest == 0) || request->tsr_concur > SFW_MAX_CONCUR || request->tsr_service > SRPC_SERVICE_MAX_ID || request->tsr_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID) { reply->tsr_status = EINVAL; return 0; } if (sn == NULL || !sfw_sid_equal(request->tsr_sid, sn->sn_id) || sfw_find_test_case(request->tsr_service) == NULL) { reply->tsr_status = ENOENT; return 0; } bat = sfw_bid2batch(request->tsr_bid); if (bat == NULL) { CERROR ("Dropping RPC (%s) from %s under memory pressure.\n", rpc->srpc_service->sv_name, libcfs_id2str(rpc->srpc_peer)); return -ENOMEM; } if (sfw_batch_active(bat)) { reply->tsr_status = EBUSY; return 0; } if (request->tsr_is_client && rpc->srpc_bulk == NULL) { /* rpc will be resumed later in sfw_bulk_ready */ return sfw_alloc_pages(rpc, sfw_id_pages(request->tsr_ndest), 1); } rc = sfw_add_test_instance(bat, rpc); CDEBUG (rc == 0 ? D_NET : D_WARNING, "%s test: sv %d %s, loop %d, concur %d, ndest %d\n", rc == 0 ? "Added" : "Failed to add", request->tsr_service, request->tsr_is_client ? "client" : "server", request->tsr_loop, request->tsr_concur, request->tsr_ndest); reply->tsr_status = (rc < 0) ? -rc : rc; return 0;}intsfw_control_batch (srpc_batch_reqst_t *request, srpc_batch_reply_t *reply){ sfw_session_t *sn = sfw_data.fw_session; int rc = 0; sfw_batch_t *bat; reply->bar_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id; if (sn == NULL || !sfw_sid_equal(request->bar_sid, sn->sn_id)) { reply->bar_status = ESRCH; return 0; } bat = sfw_find_batch(request->bar_bid); if (bat == NULL) { reply->bar_status = ENOENT; return 0; } switch (request->bar_opc) { case SRPC_BATCH_OPC_RUN: rc = sfw_run_batch(bat); break; case SRPC_BATCH_OPC_STOP: rc = sfw_stop_batch(bat, request->bar_arg); break; case SRPC_BATCH_OPC_QUERY: rc = sfw_query_batch(bat, request->bar_testidx, reply); break; default: return -EINVAL; /* drop it */ } reply->bar_status = (rc < 0) ? -rc : rc; return 0;}intsfw_handle_server_rpc (srpc_server_rpc_t *rpc){ srpc_service_t *sv = rpc->srpc_service; srpc_msg_t *reply = &rpc->srpc_replymsg; srpc_msg_t *request = &rpc->srpc_reqstbuf->buf_msg; int rc = 0; LASSERT (sfw_data.fw_active_srpc == NULL); LASSERT (sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID); spin_lock(&sfw_data.fw_lock); if (sfw_data.fw_shuttingdown) { spin_unlock(&sfw_data.fw_lock); return -ESHUTDOWN; } /* Remove timer to avoid racing with it or expiring active session */ if (sfw_del_session_timer() != 0) { CERROR ("Dropping RPC (%s) from %s: racing with expiry timer.", sv->sv_name, libcfs_id2str(rpc->srpc_peer)); spin_unlock(&sfw_data.fw_lock); return -EAGAIN; } sfw_data.fw_active_srpc = rpc; spin_unlock(&sfw_data.fw_lock); sfw_unpack_message(request); LASSERT (request->msg_type == srpc_service2request(sv->sv_id)); switch(sv->sv_id) { default: LBUG (); case SRPC_SERVICE_TEST: rc = sfw_add_test(rpc); break; case SRPC_SERVICE_BATCH: rc = sfw_control_batch(&request->msg_body.bat_reqst, &reply->msg_body.bat_reply); break; case SRPC_SERVICE_QUERY_STAT: rc = sfw_get_stats(&request->msg_body.stat_reqst, &reply->msg_body.stat_reply); break; case SRPC_SERVICE_DEBUG: rc = sfw_debug_session(&request->msg_body.dbg_reqst, &reply->msg_body.dbg_reply); break; case SRPC_SERVICE_MAKE_SESSION: rc = sfw_make_session(&request->msg_body.mksn_reqst, &reply->msg_body.mksn_reply); break; case SRPC_SERVICE_REMOVE_SESSION: rc = sfw_remove_session(&request->msg_body.rmsn_reqst, &reply->msg_body.rmsn_reply); break; } rpc->srpc_done = sfw_server_rpc_done; spin_lock(&sfw_data.fw_lock);#ifdef __KERNEL__ if (!sfw_data.fw_shuttingdown) sfw_add_session_timer();#else LASSERT (!sfw_data.fw_shuttingdown); sfw_add_session_timer();#endif sfw_data.fw_active_srpc = NULL; spin_unlock(&sfw_data.fw_lock); return rc;}intsfw_bulk_ready (srpc_server_rpc_t *rpc, int status){ srpc_service_t *sv = rpc->srpc_service; int rc; LASSERT (rpc->srpc_bulk != NULL); LASSERT (sv->sv_id == SRPC_SERVICE_TEST); LASSERT (sfw_data.fw_active_srpc == NULL); LASSERT (rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst.tsr_is_client); spin_lock(&sfw_data.fw_lock); if (status != 0) { CERROR ("Bulk transfer failed for RPC: " "service %s, peer %s, status %d\n", sv->sv_name, libcfs_id2str(rpc->srpc_peer), status); spin_unlock(&sfw_data.fw_lock); return -EIO; } if (sfw_data.fw_shuttingdown) { spin_unlock(&sfw_data.fw_lock); return -ESHUTDOWN; } if (sfw_del_session_timer() != 0) { CERROR ("Dropping RPC (%s) from %s: racing with expiry timer", sv->sv_name, libcfs_id2str(rpc->srpc_peer)); spin_unlock(&sfw_data.fw_lock); return -EAGAIN; } sfw_data.fw_active_srpc = rpc; spin_unlock(&sfw_data.fw_lock); rc = sfw_add_test(rpc); spin_lock(&sfw_data.fw_lock);#ifdef __KERNEL__ if (!sfw_data.fw_shuttingdown) sfw_add_session_timer();#else LASSERT (!sfw_data.fw_shuttingdown);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -