xenbus.c
来自「xen虚拟机源代码安装包」· C语言 代码 · 共 762 行 · 第 1/2 页
C
762 行
/*
 ****************************************************************************
 * (C) 2006 - Cambridge University
 ****************************************************************************
 *
 *        File: xenbus.c
 *      Author: Steven Smith (sos22@cam.ac.uk)
 *     Changes: Grzegorz Milos (gm281@cam.ac.uk)
 *     Changes: John D. Ramsdell
 *
 *        Date: Jun 2006, changes Aug 2005
 *
 * Environment: Xen Minimal OS
 * Description: Minimal implementation of xenbus
 *
 ****************************************************************************
 **/
#include <os.h>
#include <mm.h>
#include <traps.h>
#include <lib.h>
#include <xenbus.h>
#include <events.h>
#include <errno.h>
#include <sched.h>
#include <wait.h>
#include <xen/io/xs_wire.h>
#include <spinlock.h>
#include <xmalloc.h>

/* Type-generic minimum that evaluates each argument exactly once
   (GNU statement-expression / typeof extension). */
#define min(x,y) ({                       \
        typeof(x) tmpx = (x);                 \
        typeof(y) tmpy = (y);                 \
        tmpx < tmpy ? tmpx : tmpy;            \
        })

#ifdef XENBUS_DEBUG
#define DEBUG(_f, _a...) \
    printk("MINI_OS(file=xenbus.c, line=%d) " _f , __LINE__, ## _a)
#else
#define DEBUG(_f, _a...)    ((void)0)
#endif

/* Shared xenstore ring page; set up by init_xenbus(). */
static struct xenstore_domain_interface *xenstore_buf;
/* Woken by the event-channel handler whenever xenstore signals us. */
static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
/* Woken whenever a watch event has been queued on some event list. */
DECLARE_WAIT_QUEUE_HEAD(xenbus_watch_queue);

/* Default event queue, used when a caller passes a NULL queue. */
xenbus_event_queue xenbus_events;

/* Registered watches: maps a watch token to the queue its events go to. */
static struct watch {
    char *token;
    xenbus_event_queue *events;
    struct watch *next;
} *watches;

/* Book-keeping for one outstanding request, indexed by request id. */
struct xenbus_req_info
{
    /* Unsigned so that the one-bit field can actually represent the
       value 1 that allocate_xenbus_id() stores into it. */
    unsigned int in_use:1;
    struct wait_queue_head waitq;
    void *reply;
};

#define NR_REQS 32
static struct xenbus_req_info req_info[NR_REQS];

/* Copy len bytes out of a ring of XENSTORE_RING_SIZE bytes, starting at
   offset off and wrapping round to the start of the ring when the span
   runs past its end. */
static void memcpy_from_ring(const void *Ring,
        void *Dest,
        int off,
        int len)
{
    int c1, c2;
    const char *ring = Ring;
    char *dest = Dest;

    c1 = min(len, XENSTORE_RING_SIZE - off);   /* bytes up to ring end */
    c2 = len - c1;                             /* bytes after wrapping */
    memcpy(dest, ring + off, c1);
    memcpy(dest + c1, ring, c2);
}

/* Block until an event is available on queue (the default queue when
   queue is NULL), unlink the event at the head of the list and return
   the address of its path member. */
char **xenbus_wait_for_watch_return(xenbus_event_queue *queue)
{
    struct xenbus_event *event;
    DEFINE_WAIT(w);

    if (!queue)
        queue = &xenbus_events;
    while (!(event = *queue)) {
        add_waiter(w, xenbus_watch_queue);
        schedule();
    }
    remove_waiter(w);
    *queue = event->next;
    return &event->path;
}

/* Block until an event arrives on queue (the default queue when queue is
   NULL) and discard it. */
void xenbus_wait_for_watch(xenbus_event_queue *queue)
{
    char **ret;

    if (!queue)
        queue = &xenbus_events;
    ret = xenbus_wait_for_watch_return(queue);
    /* NOTE(review): this frees the address of event->path; that is the
       start of the allocation only if path is the first member of
       struct xenbus_event -- confirm against xenbus.h. */
    free(ret);
}

/* Repeatedly read path until its contents equal value, waiting for a
   watch event on queue between attempts.  Returns NULL once the value
   matches, or the error message returned by xenbus_read() if a read
   fails. */
char* xenbus_wait_for_value(const char* path, const char* value, xenbus_event_queue *queue)
{
    if (!queue)
        queue = &xenbus_events;
    for(;;)
    {
        char *res, *msg;
        int r;

        msg = xenbus_read(XBT_NIL, path, &res);
        if(msg) return msg;

        r = strcmp(value,res);
        free(res);

        if(r==0) break;
        else xenbus_wait_for_watch(queue);
    }
    return NULL;
}

/* Body of the "xenstore" thread: drains the response ring, queueing watch
   events on the matching watch's event list and handing every other
   message to the request slot named by its req_id. */
static void xenbus_thread_func(void *ign)
{
    struct xsd_sockmsg msg;
    unsigned prod = xenstore_buf->rsp_prod;

    for (;;)
    {
        wait_event(xb_waitq, prod != xenstore_buf->rsp_prod);
        while (1)
        {
            prod = xenstore_buf->rsp_prod;
            DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons,
                    xenstore_buf->rsp_prod);
            /* Not even a complete header yet: go back to sleep. */
            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons < sizeof(msg))
                break;
            rmb();
            memcpy_from_ring(xenstore_buf->rsp,
                    &msg,
                    MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
                    sizeof(msg));
            DEBUG("Msg len %d, %d avail, id %d.\n",
                    msg.len + sizeof(msg),
                    xenstore_buf->rsp_prod - xenstore_buf->rsp_cons,
                    msg.req_id);
            /* Header present but payload still incomplete. */
            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons <
                    sizeof(msg) + msg.len)
                break;

            DEBUG("Message is good.\n");

            if(msg.type == XS_WATCH_EVENT)
            {
                /* The event header and its payload share one allocation. */
                struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
                xenbus_event_queue *events = NULL;
                char *data;
                struct watch *watch;

                if (!event)
                {
                    /* Cannot store the event: consume it from the ring so
                       later messages are still processed. */
                    printk("out of memory, dropping watch event\n");
                    xenstore_buf->rsp_cons += msg.len + sizeof(msg);
                    continue;
                }
                data = (char*)event + sizeof(*event);

                memcpy_from_ring(xenstore_buf->rsp,
                        data,
                        MASK_XENSTORE_IDX(xenstore_buf->rsp_cons + sizeof(msg)),
                        msg.len);

                /* Payload is "path\0token\0". */
                event->path = data;
                event->token = event->path + strlen(event->path) + 1;

                xenstore_buf->rsp_cons += msg.len + sizeof(msg);

                for (watch = watches; watch; watch = watch->next)
                    if (!strcmp(watch->token, event->token)) {
                        events = watch->events;
                        break;
                    }

                if (events) {
                    /* Push onto the head of the watch's event list. */
                    event->next = *events;
                    *events = event;
                    wake_up(&xenbus_watch_queue);
                } else {
                    printk("unexpected watch token %s\n", event->token);
                    free(event);
                }
            }
            else
            {
                /* req_id comes straight off the shared ring: never use it
                   as an index without validating it, and never wake a
                   slot nobody is waiting on. */
                if (msg.req_id >= NR_REQS || !req_info[msg.req_id].in_use)
                {
                    printk("unexpected reply with req_id %u, discarding\n",
                            (unsigned)msg.req_id);
                    xenstore_buf->rsp_cons += msg.len + sizeof(msg);
                    continue;
                }

                req_info[msg.req_id].reply = malloc(sizeof(msg) + msg.len);
                /* A requester is blocked on this reply and will dereference
                   it; fail loudly rather than copy through NULL. */
                BUG_ON(!req_info[msg.req_id].reply);
                memcpy_from_ring(xenstore_buf->rsp,
                        req_info[msg.req_id].reply,
                        MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
                        msg.len + sizeof(msg));
                xenstore_buf->rsp_cons += msg.len + sizeof(msg);
                wake_up(&req_info[msg.req_id].waitq);
            }
        }
    }
}

/* Event-channel callback: wakes everything sleeping on xb_waitq. */
static void xenbus_evtchn_handler(evtchn_port_t port, struct pt_regs *regs,
        void *ign)
{
    wake_up(&xb_waitq);
}

static int nr_live_reqs;
static spinlock_t req_lock = SPIN_LOCK_UNLOCKED;
static DECLARE_WAIT_QUEUE_HEAD(req_wq);

/* Release a xenbus identifier */
static void release_xenbus_id(int id)
{
    BUG_ON(!req_info[id].in_use);
    spin_lock(&req_lock);
    req_info[id].in_use = 0;
    nr_live_reqs--;
    /* We just went from "all slots busy" to "one free": wake allocators
       blocked in allocate_xenbus_id(). */
    if (nr_live_reqs == NR_REQS - 1)
        wake_up(&req_wq);
    spin_unlock(&req_lock);
}

/* Allocate an identifier for a xenbus request.  Blocks if none are
   available. */
static int allocate_xenbus_id(void)
{
    static int probe;   /* where the next search for a free slot starts */
    int o_probe;

    /* Leave this loop holding req_lock with at least one slot free. */
    while (1)
    {
        spin_lock(&req_lock);
        if (nr_live_reqs < NR_REQS)
            break;
        spin_unlock(&req_lock);
        wait_event(req_wq, (nr_live_reqs < NR_REQS));
    }

    o_probe = probe;
    for (;;)
    {
        if (!req_info[o_probe].in_use)
            break;
        o_probe = (o_probe + 1) % NR_REQS;
        /* Wrapping all the way round would contradict nr_live_reqs. */
        BUG_ON(o_probe == probe);
    }
    nr_live_reqs++;
    req_info[o_probe].in_use = 1;
    probe = (o_probe + 1) % NR_REQS;
    spin_unlock(&req_lock);
    init_waitqueue_head(&req_info[o_probe].waitq);

    return o_probe;
}

/* Initialise xenbus. */
void init_xenbus(void)
{
    int err;
    printk("Initialising xenbus\n");
    DEBUG("init_xenbus called.\n");
    xenstore_buf = mfn_to_virt(start_info.store_mfn);
    create_thread("xenstore", xenbus_thread_func, NULL);
    DEBUG("buf at %p.\n", xenstore_buf);
    err = bind_evtchn(start_info.store_evtchn,
            xenbus_evtchn_handler,
            NULL);
    unmask_evtchn(start_info.store_evtchn);
    DEBUG("xenbus on irq %d\n", err);
}

/* Counterpart of init_xenbus(); currently does nothing. */
void fini_xenbus(void)
{
}

/* Send data to xenbus.  This can block.  All of the requests are seen
   by xenbus as if sent atomically.  The header is added
   automatically, using type %type, req_id %req_id, and trans_id
   %trans_id. */
static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,
        const struct write_req *req, int nr_reqs)
{
    XENSTORE_RING_IDX prod;
    int r;
    int len = 0;
    const struct write_req *cur_req;
    int req_off;
    int total_off;
    int this_chunk;
    struct xsd_sockmsg m = {.type = type, .req_id = req_id,
        .tx_id = trans_id };
    struct write_req header_req = { &m, sizeof(m) };

    /* Payload length goes in the header; len then becomes the total
       number of bytes (header + payload) to put on the ring. */
    for (r = 0; r < nr_reqs; r++)
        len += req[r].len;
    m.len = len;
    len += sizeof(m);

    cur_req = &header_req;

    BUG_ON(len > XENSTORE_RING_SIZE);
    /* Wait for the ring to drain to the point where we can send the
       message. */
    prod = xenstore_buf->req_prod;
    if (prod + len - xenstore_buf->req_cons > XENSTORE_RING_SIZE)
    {
        /* Wait for there to be space on the ring */
        DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n",
                prod, len, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
        wait_event(xb_waitq,
                xenstore_buf->req_prod + len - xenstore_buf->req_cons <=
                XENSTORE_RING_SIZE);
        DEBUG("Back from wait.\n");
        prod = xenstore_buf->req_prod;
    }

    /* We're now guaranteed to be able to send the message without
       overflowing the ring.  Do so. */
    total_off = 0;
    req_off = 0;
    while (total_off < len)
    {
        /* Copy as much of the current segment as fits before the ring
           wraps. */
        this_chunk = min(cur_req->len - req_off,
                XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
        memcpy((char *)xenstore_buf->req + MASK_XENSTORE_IDX(prod),
                (char *)cur_req->data + req_off, this_chunk);
        prod += this_chunk;
        req_off += this_chunk;
        total_off += this_chunk;
        if (req_off == cur_req->len)
        {
            /* Segment finished: the header is followed by req[0], and
               each req[i] by req[i + 1]. */
            req_off = 0;
            if (cur_req == &header_req)
                cur_req = req;
            else
                cur_req++;
        }
    }

    DEBUG("Complete main loop of xb_write.\n");
    BUG_ON(req_off != 0);
    BUG_ON(total_off != len);
    BUG_ON(prod > xenstore_buf->req_cons + XENSTORE_RING_SIZE);

    /* Remote must see entire message before updating indexes */
    wmb();

    xenstore_buf->req_prod += len;

    /* Send evtchn to notify remote */
    notify_remote_via_evtchn(start_info.store_evtchn);
}

/* Send a message to xenbus, in the same fashion as xb_write, and
   block waiting for a reply.  The reply is malloced and should be
   freed by the caller. */
struct xsd_sockmsg *
xenbus_msg_reply(int type,
        xenbus_transaction_t trans,
        struct write_req *io,
        int nr_reqs)
{
    int id;
    DEFINE_WAIT(w);
    struct xsd_sockmsg *rep;

    id = allocate_xenbus_id();
    /* Queue ourselves on the slot before sending, so the reply cannot
       arrive before we are registered as a waiter. */
    add_waiter(w, req_info[id].waitq);

    xb_write(type, id, trans, io, nr_reqs);

    schedule();
    remove_waiter(w);
    wake(current);

    rep = req_info[id].reply;
    BUG_ON(rep->req_id != id);
    release_xenbus_id(id);
    return rep;
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?