service.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,749 行 · 第 1/5 页
C
1,749 行
/* NOTE(review): this chunk was recovered from a web viewer that collapsed all
 * line breaks; it has been re-wrapped below WITHOUT changing any code tokens.
 * The first span is the TAIL of a service-registration function whose opening
 * lines are outside this chunk, and the last function
 * (ptlrpc_at_send_early_reply) is truncated mid-body at the end of the chunk.
 * Neither partial function should be modified based on this view alone. */

/* --- tail of service registration (function start not visible here) --- */

        /* Publish the new service on the global ptlrpc service list. */
        spin_lock (&ptlrpc_all_services_lock);
        list_add (&service->srv_list, &ptlrpc_all_services);
        spin_unlock (&ptlrpc_all_services_lock);

        /* Now allocate the request buffers */
        rc = ptlrpc_grow_req_bufs(service);
        /* We shouldn't be under memory pressure at startup, so
         * fail if we can't post all our buffers at this time. */
        if (rc != 0)
                GOTO(failed, NULL);

        /* Now allocate pool of reply buffers */
        /* Increase max reply size to next power of two */
        service->srv_max_reply_size = 1;
        while (service->srv_max_reply_size < max_reply_size)
                service->srv_max_reply_size <<= 1;

        if (proc_entry != NULL)
                ptlrpc_lprocfs_register_service(proc_entry, service);

        CDEBUG(D_NET, "%s: Started, listening on portal %d\n",
               service->srv_name, service->srv_req_portal);

        RETURN(service);
failed:
        /* undo everything done so far, including the global-list insertion */
        ptlrpc_unregister_service(service);
        return NULL;
}

/* Drop one reference on @req.  On the last reference, either free the
 * request (if it was separately allocated because the incoming request
 * unlinked its MD) or, for the request embedded in the buffer descriptor,
 * put the descriptor back on the service's idle-rqbd list for re-use. */
static void ptlrpc_server_req_decref(struct ptlrpc_request *req)
{
        struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;

        if (!atomic_dec_and_test(&req->rq_refcount))
                return;

        /* caller must already have unhooked us from the AT timed list */
        LASSERT(list_empty(&req->rq_timed_list));
        if (req != &rqbd->rqbd_req) {
                /* NB request buffers use an embedded
                 * req if the incoming req unlinked the
                 * MD; this isn't one of them! */
                OBD_FREE(req, sizeof(*req));
        } else {
                struct ptlrpc_service *svc = rqbd->rqbd_service;
                /* schedule request buffer for re-use.
                 * NB I can only do this after I've disposed of their
                 * reqs; particularly the embedded req */
                spin_lock(&svc->srv_lock);
                list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
                spin_unlock(&svc->srv_lock);
        }
}

/* Unlink @req from its rqbd list, drop its reply state, and release the
 * caller's reference.  NOTE(review): callers appear to hold svc->srv_lock
 * here (see the history-cull loop below, which drops the lock first) —
 * confirm against the lock annotations in the full file. */
static void __ptlrpc_server_free_request(struct ptlrpc_request *req)
{
        list_del(&req->rq_list);
        ptlrpc_req_drop_rs(req);
        ptlrpc_server_req_decref(req);
}

/* Retire a request the server has finished with: remove it from the AT
 * (adaptive timeout) timed list, move it onto its buffer descriptor's
 * request list, and when the descriptor's refcount hits zero, move the
 * descriptor into the request history — culling old history buffers
 * (and freeing their requests) if the history has grown past
 * srv_max_history_rqbds. */
static void ptlrpc_server_free_request(struct ptlrpc_request *req)
{
        struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
        struct ptlrpc_service             *svc = rqbd->rqbd_service;
        int                                refcount;
        struct list_head                  *tmp;
        struct list_head                  *nxt;

        if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
                DEBUG_REQ(D_INFO, req, "free req");

        /* mark final and unhook from the AT early-reply list under
         * srv_at_lock so ptlrpc_at_add_timed() can't re-add us */
        spin_lock(&svc->srv_at_lock);
        req->rq_sent_final = 1;
        list_del_init(&req->rq_timed_list);
        spin_unlock(&svc->srv_at_lock);

        spin_lock(&svc->srv_lock);
        svc->srv_n_active_reqs--;
        list_add(&req->rq_list, &rqbd->rqbd_reqs);
        refcount = --(rqbd->rqbd_refcount);
        if (refcount == 0) {
                /* request buffer is now idle: add to history */
                list_del(&rqbd->rqbd_list);
                list_add_tail(&rqbd->rqbd_list, &svc->srv_history_rqbds);
                svc->srv_n_history_rqbds++;

                /* cull some history?
                 * I expect only about 1 or 2 rqbds need to be recycled here */
                while (svc->srv_n_history_rqbds > svc->srv_max_history_rqbds) {
                        rqbd = list_entry(svc->srv_history_rqbds.next,
                                          struct ptlrpc_request_buffer_desc,
                                          rqbd_list);
                        list_del(&rqbd->rqbd_list);
                        svc->srv_n_history_rqbds--;

                        /* remove rqbd's reqs from svc's req history while
                         * I've got the service lock */
                        list_for_each(tmp, &rqbd->rqbd_reqs) {
                                req = list_entry(tmp, struct ptlrpc_request,
                                                 rq_list);
                                /* Track the highest culled req seq */
                                if (req->rq_history_seq >
                                    svc->srv_request_max_cull_seq)
                                        svc->srv_request_max_cull_seq =
                                                req->rq_history_seq;
                                list_del(&req->rq_history_list);
                        }

                        /* srv_lock is dropped while freeing the culled
                         * requests (freeing may sleep / take other locks),
                         * then re-taken before rechecking the while
                         * condition */
                        spin_unlock(&svc->srv_lock);

                        /* NOTE(review): the loop body takes rqbd_reqs.next
                         * each pass rather than using @tmp; this works
                         * because __ptlrpc_server_free_request() unlinks the
                         * head each time, but it makes the list_for_each_safe
                         * cursor redundant — verify against upstream before
                         * touching. */
                        list_for_each_safe(tmp, nxt, &rqbd->rqbd_reqs) {
                                req = list_entry(rqbd->rqbd_reqs.next,
                                                 struct ptlrpc_request,
                                                 rq_list);
                                __ptlrpc_server_free_request(req);
                        }
                        spin_lock(&svc->srv_lock);
                }
        } else if (req->rq_reply_state && req->rq_reply_state->rs_prealloc) {
                /* If we are low on memory, we are not interested in history */
                list_del(&req->rq_history_list);
                __ptlrpc_server_free_request(req);
        }
        spin_unlock(&svc->srv_lock);
}

/* This function makes sure dead exports are evicted in a timely manner.
   This function is only called when some export receives a message (i.e.,
   the network is up.) */
static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
{
        struct obd_export *oldest_exp;
        time_t oldest_time;

        ENTRY;

        LASSERT(exp);

        /* Compensate for slow machines, etc, by faking our request time
           into the future.  Although this can break the strict time-ordering
           of the list, we can be really lazy here - we don't have to evict
           at the exact right moment.  Eventually, all silent exports
           will make it to the top of the list. */
        exp->exp_last_request_time = max(exp->exp_last_request_time,
                                         cfs_time_current_sec() + extra_delay);

        CDEBUG(D_INFO, "updating export %s at %ld\n",
               exp->exp_client_uuid.uuid,
               exp->exp_last_request_time);

        /* exports may get disconnected from the chain even though the
           export has references, so we must keep the spin lock while
           manipulating the lists */
        spin_lock(&exp->exp_obd->obd_dev_lock);

        if (list_empty(&exp->exp_obd_chain_timed)) {
                /* this one is not timed */
                spin_unlock(&exp->exp_obd->obd_dev_lock);
                EXIT;
                return;
        }

        /* freshest export goes to the tail; oldest (most silent) is at the
         * head of obd_exports_timed */
        list_move_tail(&exp->exp_obd_chain_timed,
                       &exp->exp_obd->obd_exports_timed);

        oldest_exp = list_entry(exp->exp_obd->obd_exports_timed.next,
                                struct obd_export, exp_obd_chain_timed);
        oldest_time = oldest_exp->exp_last_request_time;
        spin_unlock(&exp->exp_obd->obd_dev_lock);

        if (exp->exp_obd->obd_recovering) {
                /* be nice to everyone during recovery */
                EXIT;
                return;
        }

        /* Note - racing to start/reset the obd_eviction timer is safe */
        if (exp->exp_obd->obd_eviction_timer == 0) {
                /* Check if the oldest entry is expired. */
                if (cfs_time_current_sec() > (oldest_time +
                                              PING_EVICT_TIMEOUT +
                                              extra_delay)) {
                        /* We need a second timer, in case the net was down and
                         * it just came back. Since the pinger may skip every
                         * other PING_INTERVAL (see note in ptlrpc_pinger_main),
                         * we better wait for 3. */
                        exp->exp_obd->obd_eviction_timer =
                                cfs_time_current_sec() + 3 * PING_INTERVAL;
                        CDEBUG(D_HA, "%s: Think about evicting %s from %ld\n",
                               exp->exp_obd->obd_name, obd_export_nid2str(exp),
                               oldest_time);
                }
        } else {
                if (cfs_time_current_sec() >
                    (exp->exp_obd->obd_eviction_timer + extra_delay)) {
                        /* The evictor won't evict anyone who we've heard from
                         * recently, so we don't have to check before we start
                         * it. */
                        if (!ping_evictor_wake(exp))
                                exp->exp_obd->obd_eviction_timer = 0;
                }
        }

        EXIT;
}

/* Sanity-check an incoming request against its export: reject requests
 * from a stale connection (-EEXIST) and bounce requests with an error
 * reply when the target obd is failing over (-ENODEV); returns 0 when the
 * request may be handled. */
static int ptlrpc_check_req(struct ptlrpc_request *req)
{
        if (lustre_msg_get_conn_cnt(req->rq_reqmsg) <
            req->rq_export->exp_conn_cnt) {
                DEBUG_REQ(D_ERROR, req,
                          "DROPPING req from old connection %d < %d",
                          lustre_msg_get_conn_cnt(req->rq_reqmsg),
                          req->rq_export->exp_conn_cnt);
                return -EEXIST;
        }
        if (req->rq_export->exp_obd && req->rq_export->exp_obd->obd_fail) {
                /* Failing over, don't handle any more reqs, send
                   error response instead. */
                CDEBUG(D_RPCTRACE, "Dropping req %p for failed obd %s\n",
                       req, req->rq_export->exp_obd->obd_name);
                req->rq_status = -ENODEV;
                ptlrpc_error(req);
                return -ENODEV;
        }
        return 0;
}

/* (Re)arm the service's adaptive-timeout timer for the earliest deadline on
 * srv_at_list, or disarm it when the list is empty.  If the earliest
 * deadline (less at_early_margin) has already passed, fire the timer
 * handler directly. */
static void ptlrpc_at_set_timer(struct ptlrpc_service *svc)
{
        struct ptlrpc_request *rq;
        time_t next;

        spin_lock(&svc->srv_at_lock);
        if (list_empty(&svc->srv_at_list)) {
                cfs_timer_disarm(&svc->srv_at_timer);
                spin_unlock(&svc->srv_at_lock);
                return;
        }

        /* Set timer for closest deadline */
        rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
                        rq_timed_list);
        next = rq->rq_deadline - cfs_time_current_sec() - at_early_margin;
        if (next <= 0)
                /* already (nearly) due: run the handler now rather than
                 * arming a timer in the past */
                ptlrpc_at_timer((unsigned long)svc);
        else
                cfs_timer_arm(&svc->srv_at_timer, cfs_time_shift(next));
        spin_unlock(&svc->srv_at_lock);
        CDEBUG(D_INFO, "armed %s at %+lds\n", svc->srv_name, next);
}

/* Add rpc to early reply check list.  The list is kept sorted by
 * rq_deadline; if this request becomes the new head, the service timer is
 * re-armed for it.  Returns 0 (also when AT is off or the request was
 * already finalized), or -ENOSYS if the client lacks AT support. */
static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
{
        struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
        struct ptlrpc_request *rq;
        int found = 0;

        if (AT_OFF)
                return(0);

        if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0)
                return(-ENOSYS);

        DEBUG_REQ(D_ADAPTTO, req, "add timed %lds",
                  req->rq_deadline - cfs_time_current_sec());

        spin_lock(&svc->srv_at_lock);

        if (unlikely(req->rq_sent_final)) {
                /* raced with ptlrpc_server_free_request(); don't resurrect */
                spin_unlock(&svc->srv_at_lock);
                return 0;
        }

        LASSERT(list_empty(&req->rq_timed_list));
        /* Add to sorted list.  Presumably latest rpcs will have the latest
           deadlines, so search backward. */
        list_for_each_entry_reverse(rq, &svc->srv_at_list, rq_timed_list) {
                if (req->rq_deadline > rq->rq_deadline) {
                        list_add(&req->rq_timed_list, &rq->rq_timed_list);
                        found++;
                        break;
                }
        }
        if (!found)
                /* Add to front if shortest deadline or list empty */
                list_add(&req->rq_timed_list, &svc->srv_at_list);

        /* Check if we're the head of the list */
        found = (svc->srv_at_list.next == &req->rq_timed_list);

        spin_unlock(&svc->srv_at_lock);

        if (found)
                ptlrpc_at_set_timer(svc);

        return 0;
}

/* NOTE(review): function below is TRUNCATED at the end of this chunk —
 * the remainder of its body (and its closing brace) is on the next page. */
static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
                                      int extra_time)
{
        struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
        struct ptlrpc_request *reqcopy;
        struct lustre_msg *reqmsg;
        long olddl = req->rq_deadline - cfs_time_current_sec();
        time_t newdl;
        int rc;

        ENTRY;

        /* deadline is when the client expects us to reply, margin is the
           difference between clients' and servers' expectations */
        DEBUG_REQ(D_ADAPTTO, req,
                  "%ssending early reply (deadline %+lds, margin %+lds) for "
                  "%d+%d", AT_OFF ? "AT off - not " : "",
                  olddl, olddl - at_get(&svc->srv_at_estimate),
                  at_get(&svc->srv_at_estimate), extra_time);

        if (AT_OFF)
                RETURN(0);

        if (olddl < 0) {
                CDEBUG(D_WARNING, "x"LPU64": Already past deadline (%+lds), not"
                       " sending early reply. Increase at_early_margin (%d)?\n",
                       req->rq_xid, olddl, at_early_margin);
                /* Return an error so we're not re-added to the timed list. */
                RETURN(-ETIMEDOUT);
        }

        if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0){
                CDEBUG(D_INFO, "Wanted to ask client for more time, but no AT "
                       "support\n");
                RETURN(-ENOSYS);
        }

        if (extra_time) {
                /* Fake our processing time into the future to ask the
                   clients for some extra amount of time */
                extra_time += cfs_time_current_sec() -
                        req->rq_arrival_time.tv_sec;
                at_add(&svc->srv_at_estimate, extra_time);
        }

        newdl = req->rq_arrival_time.tv_sec + at_get(&svc->srv_at_estimate);
        if (req->rq_deadline >= newdl) {
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?