events.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 735 行 · 第 1/2 页

C
735
字号
                 * before the SENT event (oh yes we can), we know we                 * read/wrote the peer buffer and how much... */                desc->bd_success = 1;                desc->bd_nob_transferred = ev->mlength;                desc->bd_sender = ev->sender;        }        if (ev->unlinked) {                /* This is the last callback no matter what... */                desc->bd_network_rw = 0;                cfs_waitq_signal(&desc->bd_waitq);        }        spin_unlock(&desc->bd_lock);        EXIT;}static void ptlrpc_master_callback(lnet_event_t *ev){        struct ptlrpc_cb_id *cbid = ev->md.user_ptr;        void (*callback)(lnet_event_t *ev) = cbid->cbid_fn;        /* Honestly, it's best to find out early. */        LASSERT (cbid->cbid_arg != LP_POISON);        LASSERT (callback == request_out_callback ||                 callback == reply_in_callback ||                 callback == client_bulk_callback ||                 callback == request_in_callback ||                 callback == reply_out_callback ||                 callback == server_bulk_callback);                callback (ev);}int ptlrpc_uuid_to_peer (struct obd_uuid *uuid,                          lnet_process_id_t *peer, lnet_nid_t *self){        int               best_dist = 0;        __u32             best_order = 0;        int               count = 0;        int               rc = -ENOENT;        int               portals_compatibility;        int               dist;        __u32             order;        lnet_nid_t        dst_nid;        lnet_nid_t        src_nid;        portals_compatibility = LNetCtl(IOC_LIBCFS_PORTALS_COMPATIBILITY, NULL);        peer->pid = LUSTRE_SRV_LNET_PID;        /* Choose the matching UUID that's closest */        while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) {                dist = LNetDist(dst_nid, &src_nid, &order);                if (dist < 0)                        continue;                if (dist == 0) {                /* local! use loopback LND */                        peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0);                        rc = 0;                        break;                }                                if (rc < 0 ||                    dist < best_dist ||                    (dist == best_dist && order < best_order)) {                        best_dist = dist;                        best_order = order;                        if (portals_compatibility > 1) {                                /* Strong portals compatibility: Zero the nid's                                 * NET, so if I'm reading new config logs, or                                 * getting configured by (new) lconf I can                                 * still talk to old servers. */                                dst_nid = LNET_MKNID(0, LNET_NIDADDR(dst_nid));                                src_nid = LNET_MKNID(0, LNET_NIDADDR(src_nid));                        }                        peer->nid = dst_nid;                        *self = src_nid;                        rc = 0;                }        }        CDEBUG(D_NET,"%s->%s\n", uuid->uuid, libcfs_id2str(*peer));        if (rc != 0)                 CERROR("No NID found for %s\n", uuid->uuid);        return rc;}void ptlrpc_ni_fini(void){        cfs_waitq_t         waitq;        struct l_wait_info  lwi;        int                 rc;        int                 retries;                /* Wait for the event queue to become idle since there may still be         * messages in flight with pending events (i.e. the fire-and-forget         * messages == client requests and "non-difficult" server         * replies */        for (retries = 0;; retries++) {                rc = LNetEQFree(ptlrpc_eq_h);                switch (rc) {                default:                        LBUG();                case 0:                        LNetNIFini();                        return;                                        case -EBUSY:                        if (retries != 0)                                CWARN("Event queue still busy\n");                                                /* Wait for a bit */                        cfs_waitq_init(&waitq);                        lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, NULL);                        l_wait_event(waitq, 0, &lwi);                        break;                }        }        /* notreached */}lnet_pid_t ptl_get_pid(void){        lnet_pid_t        pid;#ifndef  __KERNEL__        pid = getpid();#else        pid = LUSTRE_SRV_LNET_PID;#endif        return pid;}        int ptlrpc_ni_init(void){        int              rc;        lnet_pid_t       pid;        pid = ptl_get_pid();        CDEBUG(D_NET, "My pid is: %x\n", pid);        /* We're not passing any limits yet... */        rc = LNetNIInit(pid);        if (rc < 0) {                CDEBUG (D_NET, "Can't init network interface: %d\n", rc);                return (-ENOENT);        }        /* CAVEAT EMPTOR: how we process portals events is _radically_         * different depending on... */#ifdef __KERNEL__        /* kernel portals calls our master callback when events are added to         * the event queue.  In fact lustre never pulls events off this queue,         * so it's only sized for some debug history. */        rc = LNetEQAlloc(1024, ptlrpc_master_callback, &ptlrpc_eq_h);#else        /* liblustre calls the master callback when it removes events from the         * event queue.  The event queue has to be big enough not to drop         * anything */        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &ptlrpc_eq_h);#endif        if (rc == 0)                return 0;        CERROR ("Failed to allocate event queue: %d\n", rc);        LNetNIFini();        return (-ENOMEM);}#ifndef __KERNEL__CFS_LIST_HEAD(liblustre_wait_callbacks);CFS_LIST_HEAD(liblustre_idle_callbacks);void *liblustre_services_callback;void *liblustre_register_waitidle_callback (struct list_head *callback_list,                                      const char *name,                                      int (*fn)(void *arg), void *arg){        struct liblustre_wait_callback *llwc;                OBD_ALLOC(llwc, sizeof(*llwc));        LASSERT (llwc != NULL);                llwc->llwc_name = name;        llwc->llwc_fn = fn;        llwc->llwc_arg = arg;        list_add_tail(&llwc->llwc_list, callback_list);                return (llwc);}voidliblustre_deregister_waitidle_callback (void *opaque){        struct liblustre_wait_callback *llwc = opaque;                list_del(&llwc->llwc_list);        OBD_FREE(llwc, sizeof(*llwc));}void *liblustre_register_wait_callback (const char *name,                                  int (*fn)(void *arg), void *arg){        return liblustre_register_waitidle_callback(&liblustre_wait_callbacks,                                                    name, fn, arg);}voidliblustre_deregister_wait_callback (void *opaque){        liblustre_deregister_waitidle_callback(opaque);}void *liblustre_register_idle_callback (const char *name,                                  int (*fn)(void *arg), void *arg){        return liblustre_register_waitidle_callback(&liblustre_idle_callbacks,                                                    name, fn, arg);}voidliblustre_deregister_idle_callback (void *opaque){        liblustre_deregister_waitidle_callback(opaque);}intliblustre_check_events (int timeout){        lnet_event_t ev;        int         rc;        int         i;        ENTRY;        rc = LNetEQPoll(&ptlrpc_eq_h, 1, timeout * 1000, &ev, &i);        if (rc == 0)                RETURN(0);                LASSERT (rc == -EOVERFLOW || rc == 1);                /* liblustre: no asynch callback so we can't affort to miss any         * events... */        if (rc == -EOVERFLOW) {                CERROR ("Dropped an event!!!\n");                abort();        }                ptlrpc_master_callback (&ev);        RETURN(1);}int liblustre_waiting = 0;intliblustre_wait_event (int timeout){        struct list_head               *tmp;        struct liblustre_wait_callback *llwc;        int                             found_something = 0;        /* single threaded recursion check... */        liblustre_waiting = 1;        for (;;) {                /* Deal with all pending events */                while (liblustre_check_events(0))                        found_something = 1;                /* Give all registered callbacks a bite at the cherry */                list_for_each(tmp, &liblustre_wait_callbacks) {                        llwc = list_entry(tmp, struct liblustre_wait_callback,                                           llwc_list);                                        if (llwc->llwc_fn(llwc->llwc_arg))                                found_something = 1;                }                if (found_something || timeout == 0)                        break;                /* Nothing so far, but I'm allowed to block... */                found_something = liblustre_check_events(timeout);                if (!found_something)           /* still nothing */                        break;                  /* I timed out */        }        liblustre_waiting = 0;        return found_something;}voidliblustre_wait_idle(void){        static int recursed = 0;                struct list_head               *tmp;        struct liblustre_wait_callback *llwc;        int                             idle = 0;        LASSERT(!recursed);        recursed = 1;                do {                liblustre_wait_event(0);                idle = 1;                list_for_each(tmp, &liblustre_idle_callbacks) {                        llwc = list_entry(tmp, struct liblustre_wait_callback,                                          llwc_list);                                                if (!llwc->llwc_fn(llwc->llwc_arg)) {                                idle = 0;                                break;                        }                }                                } while (!idle);        recursed = 0;}#endif /* __KERNEL__ */int ptlrpc_init_portals(void){        int   rc = ptlrpc_ni_init();        if (rc != 0) {                CERROR("network initialisation failed\n");                return -EIO;        }#ifndef __KERNEL__        liblustre_services_callback =                 liblustre_register_wait_callback("liblustre_check_services",                                                 &liblustre_check_services, NULL);#endif        rc = ptlrpcd_addref();        if (rc == 0)                return 0;                CERROR("rpcd initialisation failed\n");#ifndef __KERNEL__        liblustre_deregister_wait_callback(liblustre_services_callback);#endif        ptlrpc_ni_fini();        return rc;}void ptlrpc_exit_portals(void){#ifndef __KERNEL__        liblustre_deregister_wait_callback(liblustre_services_callback);#endif        ptlrpcd_decref();        ptlrpc_ni_fini();}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?