events.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 735 行 · 第 1/2 页
C
735 行
* before the SENT event (oh yes we can), we know we * read/wrote the peer buffer and how much... */ desc->bd_success = 1; desc->bd_nob_transferred = ev->mlength; desc->bd_sender = ev->sender; } if (ev->unlinked) { /* This is the last callback no matter what... */ desc->bd_network_rw = 0; cfs_waitq_signal(&desc->bd_waitq); } spin_unlock(&desc->bd_lock); EXIT;}static void ptlrpc_master_callback(lnet_event_t *ev){ struct ptlrpc_cb_id *cbid = ev->md.user_ptr; void (*callback)(lnet_event_t *ev) = cbid->cbid_fn; /* Honestly, it's best to find out early. */ LASSERT (cbid->cbid_arg != LP_POISON); LASSERT (callback == request_out_callback || callback == reply_in_callback || callback == client_bulk_callback || callback == request_in_callback || callback == reply_out_callback || callback == server_bulk_callback); callback (ev);}int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, lnet_process_id_t *peer, lnet_nid_t *self){ int best_dist = 0; __u32 best_order = 0; int count = 0; int rc = -ENOENT; int portals_compatibility; int dist; __u32 order; lnet_nid_t dst_nid; lnet_nid_t src_nid; portals_compatibility = LNetCtl(IOC_LIBCFS_PORTALS_COMPATIBILITY, NULL); peer->pid = LUSTRE_SRV_LNET_PID; /* Choose the matching UUID that's closest */ while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) { dist = LNetDist(dst_nid, &src_nid, &order); if (dist < 0) continue; if (dist == 0) { /* local! use loopback LND */ peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0); rc = 0; break; } if (rc < 0 || dist < best_dist || (dist == best_dist && order < best_order)) { best_dist = dist; best_order = order; if (portals_compatibility > 1) { /* Strong portals compatibility: Zero the nid's * NET, so if I'm reading new config logs, or * getting configured by (new) lconf I can * still talk to old servers. */ dst_nid = LNET_MKNID(0, LNET_NIDADDR(dst_nid)); src_nid = LNET_MKNID(0, LNET_NIDADDR(src_nid)); } peer->nid = dst_nid; *self = src_nid; rc = 0; } } CDEBUG(D_NET,"%s->%s\n", uuid->uuid, libcfs_id2str(*peer)); if (rc != 0) CERROR("No NID found for %s\n", uuid->uuid); return rc;}void ptlrpc_ni_fini(void){ cfs_waitq_t waitq; struct l_wait_info lwi; int rc; int retries; /* Wait for the event queue to become idle since there may still be * messages in flight with pending events (i.e. the fire-and-forget * messages == client requests and "non-difficult" server * replies */ for (retries = 0;; retries++) { rc = LNetEQFree(ptlrpc_eq_h); switch (rc) { default: LBUG(); case 0: LNetNIFini(); return; case -EBUSY: if (retries != 0) CWARN("Event queue still busy\n"); /* Wait for a bit */ cfs_waitq_init(&waitq); lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, NULL); l_wait_event(waitq, 0, &lwi); break; } } /* notreached */}lnet_pid_t ptl_get_pid(void){ lnet_pid_t pid;#ifndef __KERNEL__ pid = getpid();#else pid = LUSTRE_SRV_LNET_PID;#endif return pid;} int ptlrpc_ni_init(void){ int rc; lnet_pid_t pid; pid = ptl_get_pid(); CDEBUG(D_NET, "My pid is: %x\n", pid); /* We're not passing any limits yet... */ rc = LNetNIInit(pid); if (rc < 0) { CDEBUG (D_NET, "Can't init network interface: %d\n", rc); return (-ENOENT); } /* CAVEAT EMPTOR: how we process portals events is _radically_ * different depending on... */#ifdef __KERNEL__ /* kernel portals calls our master callback when events are added to * the event queue. In fact lustre never pulls events off this queue, * so it's only sized for some debug history. */ rc = LNetEQAlloc(1024, ptlrpc_master_callback, &ptlrpc_eq_h);#else /* liblustre calls the master callback when it removes events from the * event queue. The event queue has to be big enough not to drop * anything */ rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &ptlrpc_eq_h);#endif if (rc == 0) return 0; CERROR ("Failed to allocate event queue: %d\n", rc); LNetNIFini(); return (-ENOMEM);}#ifndef __KERNEL__CFS_LIST_HEAD(liblustre_wait_callbacks);CFS_LIST_HEAD(liblustre_idle_callbacks);void *liblustre_services_callback;void *liblustre_register_waitidle_callback (struct list_head *callback_list, const char *name, int (*fn)(void *arg), void *arg){ struct liblustre_wait_callback *llwc; OBD_ALLOC(llwc, sizeof(*llwc)); LASSERT (llwc != NULL); llwc->llwc_name = name; llwc->llwc_fn = fn; llwc->llwc_arg = arg; list_add_tail(&llwc->llwc_list, callback_list); return (llwc);}voidliblustre_deregister_waitidle_callback (void *opaque){ struct liblustre_wait_callback *llwc = opaque; list_del(&llwc->llwc_list); OBD_FREE(llwc, sizeof(*llwc));}void *liblustre_register_wait_callback (const char *name, int (*fn)(void *arg), void *arg){ return liblustre_register_waitidle_callback(&liblustre_wait_callbacks, name, fn, arg);}voidliblustre_deregister_wait_callback (void *opaque){ liblustre_deregister_waitidle_callback(opaque);}void *liblustre_register_idle_callback (const char *name, int (*fn)(void *arg), void *arg){ return liblustre_register_waitidle_callback(&liblustre_idle_callbacks, name, fn, arg);}voidliblustre_deregister_idle_callback (void *opaque){ liblustre_deregister_waitidle_callback(opaque);}intliblustre_check_events (int timeout){ lnet_event_t ev; int rc; int i; ENTRY; rc = LNetEQPoll(&ptlrpc_eq_h, 1, timeout * 1000, &ev, &i); if (rc == 0) RETURN(0); LASSERT (rc == -EOVERFLOW || rc == 1); /* liblustre: no asynch callback so we can't affort to miss any * events... */ if (rc == -EOVERFLOW) { CERROR ("Dropped an event!!!\n"); abort(); } ptlrpc_master_callback (&ev); RETURN(1);}int liblustre_waiting = 0;intliblustre_wait_event (int timeout){ struct list_head *tmp; struct liblustre_wait_callback *llwc; int found_something = 0; /* single threaded recursion check... */ liblustre_waiting = 1; for (;;) { /* Deal with all pending events */ while (liblustre_check_events(0)) found_something = 1; /* Give all registered callbacks a bite at the cherry */ list_for_each(tmp, &liblustre_wait_callbacks) { llwc = list_entry(tmp, struct liblustre_wait_callback, llwc_list); if (llwc->llwc_fn(llwc->llwc_arg)) found_something = 1; } if (found_something || timeout == 0) break; /* Nothing so far, but I'm allowed to block... */ found_something = liblustre_check_events(timeout); if (!found_something) /* still nothing */ break; /* I timed out */ } liblustre_waiting = 0; return found_something;}voidliblustre_wait_idle(void){ static int recursed = 0; struct list_head *tmp; struct liblustre_wait_callback *llwc; int idle = 0; LASSERT(!recursed); recursed = 1; do { liblustre_wait_event(0); idle = 1; list_for_each(tmp, &liblustre_idle_callbacks) { llwc = list_entry(tmp, struct liblustre_wait_callback, llwc_list); if (!llwc->llwc_fn(llwc->llwc_arg)) { idle = 0; break; } } } while (!idle); recursed = 0;}#endif /* __KERNEL__ */int ptlrpc_init_portals(void){ int rc = ptlrpc_ni_init(); if (rc != 0) { CERROR("network initialisation failed\n"); return -EIO; }#ifndef __KERNEL__ liblustre_services_callback = liblustre_register_wait_callback("liblustre_check_services", &liblustre_check_services, NULL);#endif rc = ptlrpcd_addref(); if (rc == 0) return 0; CERROR("rpcd initialisation failed\n");#ifndef __KERNEL__ liblustre_deregister_wait_callback(liblustre_services_callback);#endif ptlrpc_ni_fini(); return rc;}void ptlrpc_exit_portals(void){#ifndef __KERNEL__ liblustre_deregister_wait_callback(liblustre_services_callback);#endif ptlrpcd_decref(); ptlrpc_ni_fini();}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?