xenbus.c

来自「xen虚拟机源代码安装包」· C语言 代码 · 共 762 行 · 第 1/2 页

C
762
字号
/*  **************************************************************************** * (C) 2006 - Cambridge University **************************************************************************** * *        File: xenbus.c *      Author: Steven Smith (sos22@cam.ac.uk)  *     Changes: Grzegorz Milos (gm281@cam.ac.uk) *     Changes: John D. Ramsdell *               *        Date: Jun 2006, chages Aug 2005 *  * Environment: Xen Minimal OS * Description: Minimal implementation of xenbus * **************************************************************************** **/#include <os.h>#include <mm.h>#include <traps.h>#include <lib.h>#include <xenbus.h>#include <events.h>#include <errno.h>#include <sched.h>#include <wait.h>#include <xen/io/xs_wire.h>#include <spinlock.h>#include <xmalloc.h>#define min(x,y) ({                       \        typeof(x) tmpx = (x);                 \        typeof(y) tmpy = (y);                 \        tmpx < tmpy ? tmpx : tmpy;            \        })#ifdef XENBUS_DEBUG#define DEBUG(_f, _a...) \    printk("MINI_OS(file=xenbus.c, line=%d) " _f , __LINE__, ## _a)#else#define DEBUG(_f, _a...)    ((void)0)#endifstatic struct xenstore_domain_interface *xenstore_buf;static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);DECLARE_WAIT_QUEUE_HEAD(xenbus_watch_queue);xenbus_event_queue xenbus_events;static struct watch {    char *token;    xenbus_event_queue *events;    struct watch *next;} *watches;struct xenbus_req_info {    int in_use:1;    struct wait_queue_head waitq;    void *reply;};#define NR_REQS 32static struct xenbus_req_info req_info[NR_REQS];static void memcpy_from_ring(const void *Ring,        void *Dest,        int off,        int len){    int c1, c2;    const char *ring = Ring;    char *dest = Dest;    c1 = min(len, XENSTORE_RING_SIZE - off);    c2 = len - c1;    memcpy(dest, ring + off, c1);    memcpy(dest + c1, ring, c2);}char **xenbus_wait_for_watch_return(xenbus_event_queue *queue){    struct xenbus_event *event;    DEFINE_WAIT(w);    if (!queue)        queue = &xenbus_events;    while (!(event = *queue)) {        add_waiter(w, xenbus_watch_queue);        schedule();    }    remove_waiter(w);    *queue = event->next;    return &event->path;}void xenbus_wait_for_watch(xenbus_event_queue *queue){    char **ret;    if (!queue)        queue = &xenbus_events;    ret = xenbus_wait_for_watch_return(queue);    free(ret);}char* xenbus_wait_for_value(const char* path, const char* value, xenbus_event_queue *queue){    if (!queue)        queue = &xenbus_events;    for(;;)    {        char *res, *msg;        int r;        msg = xenbus_read(XBT_NIL, path, &res);        if(msg) return msg;        r = strcmp(value,res);        free(res);        if(r==0) break;        else xenbus_wait_for_watch(queue);    }    return NULL;}static void xenbus_thread_func(void *ign){    struct xsd_sockmsg msg;    unsigned prod = xenstore_buf->rsp_prod;    for (;;)     {        wait_event(xb_waitq, prod != xenstore_buf->rsp_prod);        while (1)         {            prod = xenstore_buf->rsp_prod;            DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons,                    xenstore_buf->rsp_prod);            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons < sizeof(msg))                break;            rmb();            memcpy_from_ring(xenstore_buf->rsp,                    &msg,                    MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),                    sizeof(msg));            DEBUG("Msg len %d, %d avail, id %d.\n",                    msg.len + sizeof(msg),                    xenstore_buf->rsp_prod - xenstore_buf->rsp_cons,                    msg.req_id);            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons <                    sizeof(msg) + msg.len)                break;            DEBUG("Message is good.\n");            if(msg.type == XS_WATCH_EVENT)            {		struct xenbus_event *event = malloc(sizeof(*event) + msg.len);                xenbus_event_queue *events = NULL;		char *data = (char*)event + sizeof(*event);                struct watch *watch;                memcpy_from_ring(xenstore_buf->rsp,		    data,                    MASK_XENSTORE_IDX(xenstore_buf->rsp_cons + sizeof(msg)),                    msg.len);		event->path = data;		event->token = event->path + strlen(event->path) + 1;                xenstore_buf->rsp_cons += msg.len + sizeof(msg);                for (watch = watches; watch; watch = watch->next)                    if (!strcmp(watch->token, event->token)) {                        events = watch->events;                        break;                    }                if (events) {                    event->next = *events;                    *events = event;                    wake_up(&xenbus_watch_queue);                } else {                    printk("unexpected watch token %s\n", event->token);                    free(event);                }            }            else            {                req_info[msg.req_id].reply = malloc(sizeof(msg) + msg.len);                memcpy_from_ring(xenstore_buf->rsp,                    req_info[msg.req_id].reply,                    MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),                    msg.len + sizeof(msg));                xenstore_buf->rsp_cons += msg.len + sizeof(msg);                wake_up(&req_info[msg.req_id].waitq);            }        }    }}static void xenbus_evtchn_handler(evtchn_port_t port, struct pt_regs *regs,				  void *ign){    wake_up(&xb_waitq);}static int nr_live_reqs;static spinlock_t req_lock = SPIN_LOCK_UNLOCKED;static DECLARE_WAIT_QUEUE_HEAD(req_wq);/* Release a xenbus identifier */static void release_xenbus_id(int id){    BUG_ON(!req_info[id].in_use);    spin_lock(&req_lock);    req_info[id].in_use = 0;    nr_live_reqs--;    req_info[id].in_use = 0;    if (nr_live_reqs == NR_REQS - 1)        wake_up(&req_wq);    spin_unlock(&req_lock);}/* Allocate an identifier for a xenbus request.  Blocks if none are   available. */static int allocate_xenbus_id(void){    static int probe;    int o_probe;    while (1)     {        spin_lock(&req_lock);        if (nr_live_reqs < NR_REQS)            break;        spin_unlock(&req_lock);        wait_event(req_wq, (nr_live_reqs < NR_REQS));    }    o_probe = probe;    for (;;)     {        if (!req_info[o_probe].in_use)            break;        o_probe = (o_probe + 1) % NR_REQS;        BUG_ON(o_probe == probe);    }    nr_live_reqs++;    req_info[o_probe].in_use = 1;    probe = (o_probe + 1) % NR_REQS;    spin_unlock(&req_lock);    init_waitqueue_head(&req_info[o_probe].waitq);    return o_probe;}/* Initialise xenbus. */void init_xenbus(void){    int err;    printk("Initialising xenbus\n");    DEBUG("init_xenbus called.\n");    xenstore_buf = mfn_to_virt(start_info.store_mfn);    create_thread("xenstore", xenbus_thread_func, NULL);    DEBUG("buf at %p.\n", xenstore_buf);    err = bind_evtchn(start_info.store_evtchn,		      xenbus_evtchn_handler,              NULL);    unmask_evtchn(start_info.store_evtchn);    DEBUG("xenbus on irq %d\n", err);}void fini_xenbus(void){}/* Send data to xenbus.  This can block.  All of the requests are seen   by xenbus as if sent atomically.  The header is added   automatically, using type %type, req_id %req_id, and trans_id   %trans_id. */static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,		     const struct write_req *req, int nr_reqs){    XENSTORE_RING_IDX prod;    int r;    int len = 0;    const struct write_req *cur_req;    int req_off;    int total_off;    int this_chunk;    struct xsd_sockmsg m = {.type = type, .req_id = req_id,        .tx_id = trans_id };    struct write_req header_req = { &m, sizeof(m) };    for (r = 0; r < nr_reqs; r++)        len += req[r].len;    m.len = len;    len += sizeof(m);    cur_req = &header_req;    BUG_ON(len > XENSTORE_RING_SIZE);    /* Wait for the ring to drain to the point where we can send the       message. */    prod = xenstore_buf->req_prod;    if (prod + len - xenstore_buf->req_cons > XENSTORE_RING_SIZE)     {        /* Wait for there to be space on the ring */        DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n",                prod, len, xenstore_buf->req_cons, XENSTORE_RING_SIZE);        wait_event(xb_waitq,                xenstore_buf->req_prod + len - xenstore_buf->req_cons <=                XENSTORE_RING_SIZE);        DEBUG("Back from wait.\n");        prod = xenstore_buf->req_prod;    }    /* We're now guaranteed to be able to send the message without       overflowing the ring.  Do so. */    total_off = 0;    req_off = 0;    while (total_off < len)     {        this_chunk = min(cur_req->len - req_off,                XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));        memcpy((char *)xenstore_buf->req + MASK_XENSTORE_IDX(prod),                (char *)cur_req->data + req_off, this_chunk);        prod += this_chunk;        req_off += this_chunk;        total_off += this_chunk;        if (req_off == cur_req->len)         {            req_off = 0;            if (cur_req == &header_req)                cur_req = req;            else                cur_req++;        }    }    DEBUG("Complete main loop of xb_write.\n");    BUG_ON(req_off != 0);    BUG_ON(total_off != len);    BUG_ON(prod > xenstore_buf->req_cons + XENSTORE_RING_SIZE);    /* Remote must see entire message before updating indexes */    wmb();    xenstore_buf->req_prod += len;    /* Send evtchn to notify remote */    notify_remote_via_evtchn(start_info.store_evtchn);}/* Send a mesasge to xenbus, in the same fashion as xb_write, and   block waiting for a reply.  The reply is malloced and should be   freed by the caller. */struct xsd_sockmsg *xenbus_msg_reply(int type,		 xenbus_transaction_t trans,		 struct write_req *io,		 int nr_reqs){    int id;    DEFINE_WAIT(w);    struct xsd_sockmsg *rep;    id = allocate_xenbus_id();    add_waiter(w, req_info[id].waitq);    xb_write(type, id, trans, io, nr_reqs);    schedule();    remove_waiter(w);    wake(current);    rep = req_info[id].reply;    BUG_ON(rep->req_id != id);    release_xenbus_id(id);    return rep;}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?