📄 pipe.c
字号:
/*
 * Copyright (C) 1998, 1999, Jonathan S. Shapiro.
 *
 * This file is part of the EROS Operating System.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2,
 * or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 */

/* Pipe -- limited size buffering object for unidirection streams.

   This version assumes single writer/single reader.  Multi
   reader/writer variants will come later.  */

#include <eros/target.h>
#include <eros/Invoke.h>
#include <eros/ReturnerKey.h>
#include <eros/ProcessKey.h>
#include <eros/NodeKey.h>
#include <domain/Runtime.h>
#if 0
#include <memory.h>
#endif
#include <domain/domdbg.h>
#include <domain/PipeKey.h>
#include <domain/ProtoSpace.h>

#define dbg_init    0x01u   /* initialization */
#define dbg_req     0x02u   /* requests */
#define dbg_ack     0x04u   /* acknowledgements */
#define dbg_sleep   0x08u   /* sleep/wakeup */
#define dbg_eof     0x10u   /* end-of-file delivery */

/* Following should be an OR of some of the above */
#define dbg_flags   ( 0u )

#define CND_DEBUG(x) (dbg_##x & dbg_flags)
/* NOTE: expands to a bare `if', so the statement that follows DEBUG(x)
   is the conditional body.  Do not follow it with an `else'. */
#define DEBUG(x) if (CND_DEBUG(x))

#include "constituents.h"

/* When ALIGNED is not defined the pipe state (including the whole
   buffer) is a local of main(), so the stack must be large enough to
   hold it. */
#ifdef ALIGNED
const uint32_t __rt_stack_pages = 1;
#else
const uint32_t __rt_stack_pages = (PIPE_BUF_SZ/EROS_PAGE_SIZE)+1;
#endif

struct pipe_state {
  char buf[PIPE_BUF_SZ];    /* buffered bytes live in buf[start..end) */
  uint32_t start;           /* index of first unread byte */
  uint32_t end;             /* index one past the last written byte */
  uint32_t sleepSlot;       /* key register invoked to wake the sleeper */
  uint32_t sleep;           /* one of SL_NONE / SL_WRITER / SL_READER */
  uint32_t wrReqLen;        /* length recorded when the writer was put to sleep */
  uint32_t rdReqLen;        /* length the sleeping reader asked for */
  uint32_t wClosed;         /* non-zero once the writer has closed */
  uint32_t nWakeWriter;     /* statistics: printed at destroy time */
  uint32_t nWakeReader;     /* statistics: printed at destroy time */
};
typedef struct pipe_state pipe_state;

/* Size of pipe buffer is chosen to guarantee that only one of the
   following can be true. */
#define SL_NONE     0       /* no one is sleeping */
#define SL_WRITER   1       /* writer is sleeping */
#define SL_READER   2       /* reader is sleeping */

#define KR_DOMCRE   3
#define KR_CLIENT0  6       /* Client can be either reader or */
#define KR_CLIENT1  7       /* writer; it ping pongs. */

#if EROS_NODE_SIZE != 32
#error "wrong node size!"
#endif

#define KR_OSTREAM  29
#define KR_RETURNER 30
#define KR_RESUME   31

/* Key info values carried by the start keys fabricated in main(). */
#define KI_READER   1
#define KI_WRITER   2

/* Destroy this pipe.  `caller' is the key register that names the
   client to be returned to; it is copied into KR_RESUME before the
   protospace is asked to tear the process down.  The original author
   marks the end of this function NOTREACHED. */
volatile void
teardown(uint32_t caller)
{
  copy_key_reg(KR_RETURNER, caller, KR_RESUME);

  /* Wake up both clients as needed: */

  /* get the protospace */
  node_copy(KR_CONSTIT, KC_PROTOSPC, KR_CLIENT0);

  /* destroy as small space. */
  protospace_destroy(KR_RETURNER, KR_CLIENT0, KR_SELF,
                     KR_DOMCRE, KR_BANK, 1);
  /* NOTREACHED */
}

/* Reset `ps' to the empty, open, nobody-sleeping state and return it.
   The buffer contents are not touched. */
pipe_state *
InitPipe(pipe_state *ps)
{
  ps->start = 0;
  ps->end = 0;
  ps->sleep = SL_NONE;
  ps->sleepSlot = 0;
  /* The state may live on the stack, so give the request lengths a
     defined value rather than leaving them indeterminate. */
  ps->wrReqLen = 0;
  ps->rdReqLen = 0;
  ps->wClosed = 0;
  ps->nWakeReader = 0;
  ps->nWakeWriter = 0;

  return ps;
}

/* Called for a read on an empty pipe while the writer is asleep:
   record the reader as the new sleeper and redirect the pending reply
   in `msg' to the previously sleeping writer.  Always returns RC_OK. */
uint32_t
return_to_writer(Message *msg, pipe_state *ps)
{
  /* Place reader to sleep and return to the writer. */
  uint32_t sleeper = ps->sleepSlot;

  ps->sleep = SL_READER;
  ps->sleepSlot = msg->snd_invKey;
  ps->rdReqLen = msg->rcv_w2;
  ps->nWakeWriter++;

  msg->snd_invKey = sleeper;
  msg->snd_len = 0;
  msg->snd_w1 = ps->wrReqLen;

  return RC_OK;
}

/* Called for a write that filled the pipe while the reader is asleep:
   record the writer as the new sleeper and redirect the pending reply
   in `msg' to the previously sleeping reader, handing it up to
   rdReqLen buffered bytes.  Returns RC_EOF if the writer has closed
   and this reply drains the buffer, RC_OK otherwise. */
uint32_t
return_to_reader(Message *msg, pipe_state *ps)
{
  /* Place writer to sleep and return to the reader. */
  uint32_t sleeper = ps->sleepSlot;
  uint32_t result = RC_OK;
  uint32_t len = ps->end - ps->start;

  ps->sleep = SL_WRITER;
  ps->sleepSlot = msg->snd_invKey;
  ps->wrReqLen = msg->rcv_w2;

  if (len > ps->rdReqLen)
    len = ps->rdReqLen;
  if (len > PIPE_BUF_SZ)
    len = PIPE_BUF_SZ;

  msg->snd_invKey = sleeper;
  msg->snd_data = &ps->buf[ps->start];
  msg->snd_len = len;
  msg->snd_w1 = ps->rdReqLen;   /* ?? */

  ps->start += len;
  ps->nWakeReader++;

  if (ps->wClosed && (ps->start == ps->end))
    result = RC_EOF;

  /* Buffer drained: rewind so the next write starts at the front. */
  if (ps->start == ps->end)
    ps->start = ps->end = 0;

  return result;
}

/* SEND a zero-length RC_OK message to the sleeping writer (via
   ps->sleepSlot) and mark the pipe as having no sleeper. */
void
wake_writer(pipe_state *ps)
{
  Message wakeMsg;

  DEBUG(sleep)
    kprintf(KR_OSTREAM, "pipe wakes writer saying %d\n", ps->wrReqLen);

  /* Tell writer that we accepted no data so they will resend */
  wakeMsg.snd_invKey = ps->sleepSlot;   /* if sleeping, always this KR */
  wakeMsg.snd_key0 = KR_VOID;
  wakeMsg.snd_key1 = KR_VOID;
  wakeMsg.snd_key2 = KR_VOID;
  wakeMsg.snd_key3 = KR_VOID;
  wakeMsg.snd_data = 0;
  wakeMsg.snd_len = 0;
  wakeMsg.snd_code = RC_OK;
  wakeMsg.snd_w1 = ps->wrReqLen;
  wakeMsg.snd_w2 = 0;
  wakeMsg.snd_w3 = 0;

  ps->nWakeWriter++;
  SEND(&wakeMsg);

  ps->sleep = SL_NONE;
  ps->sleepSlot = KR_VOID;
}

/* SEND up to rdReqLen buffered bytes to the sleeping reader (via
   ps->sleepSlot), with RC_EOF if the writer has closed and this
   drains the buffer, then mark the pipe as having no sleeper. */
void
wake_reader(pipe_state *ps)
{
  Message wakeMsg;
  uint32_t len = ps->end - ps->start;

  if (len > ps->rdReqLen)
    len = ps->rdReqLen;
  if (len > PIPE_BUF_SZ)
    len = PIPE_BUF_SZ;

  wakeMsg.snd_invKey = ps->sleepSlot;   /* if sleeping, always this KR */
  wakeMsg.snd_key0 = KR_VOID;
  wakeMsg.snd_key1 = KR_VOID;
  wakeMsg.snd_key2 = KR_VOID;
  wakeMsg.snd_key3 = KR_VOID;
  wakeMsg.snd_data = &ps->buf[ps->start];
  wakeMsg.snd_len = len;
  wakeMsg.snd_code = RC_OK;
  wakeMsg.snd_w1 = 0;
  wakeMsg.snd_w2 = 0;
  wakeMsg.snd_w3 = 0;

  ps->start += len;

  if (ps->wClosed && (ps->start == ps->end)) {
    wakeMsg.snd_code = RC_EOF;
    DEBUG(eof)
      kprintf(KR_OSTREAM, "Send EOF to reader when waking\n");
  }

  DEBUG(sleep)
    kprintf(KR_OSTREAM, "pipe wakes reader sending %d\n", len);

  ps->nWakeReader++;
  SEND(&wakeMsg);

  ps->sleep = SL_NONE;
  ps->sleepSlot = KR_VOID;

  /* Buffer drained: rewind so the next write starts at the front. */
  if (ps->start == ps->end)
    ps->start = ps->end = 0;
}

/* Handle the request just received in `msg' and fill in the reply
   fields of `msg' (snd_code, snd_len, snd_data, snd_w1.., and possibly
   snd_invKey when the reply is redirected or suppressed).

   Returns 1 after preparing a reply.  The OC_Destroy / reader-close
   path calls teardown() and is not expected to return; the `return 0'
   after it is marked CAN'T HAPPEN by the original author. */
int
ProcessRequest(Message *msg, pipe_state *ps)
{
  uint32_t result = RC_OK;
  uint32_t code = msg->rcv_code;

  /* `msg' is reused for every request, so snd_len/snd_data still hold
     whatever the previous reply sent.  Default to an empty data string;
     the paths that return data set snd_len explicitly below.  Without
     this, write acknowledgements and error replies would resend the
     data string of an earlier read reply. */
  msg->snd_len = 0;
  msg->snd_w2 = 0;
  msg->snd_w3 = 0;

  switch(code) {
  case OC_Pipe_Read:
    if (msg->rcv_keyInfo != KI_READER) {
      result = RC_UnknownRequest;
      break;
    }

    DEBUG(req)
      kprintf(KR_OSTREAM, "pipe accepts read of length %d\n", msg->rcv_w2);

    if (ps->start != ps->end) {
      uint32_t xmit;

      if (ps->sleep == SL_WRITER && ps->start == 0)
        wake_writer(ps);

      xmit = ps->end - ps->start;
      if (xmit > msg->rcv_w2)
        xmit = msg->rcv_w2;
      if (xmit > PIPE_BUF_SZ)
        xmit = PIPE_BUF_SZ;

      msg->snd_len = xmit;
      msg->snd_data = ps->buf + ps->start;

      ps->start += xmit;

      if (ps->wClosed && (ps->start == ps->end)) {
        result = RC_EOF;
        DEBUG(eof)
          kprintf(KR_OSTREAM, "Send EOF to reader\n");
      }
    }
    else if (ps->wClosed) {
      if (ps->sleep == SL_WRITER && ps->start == 0)
        wake_writer(ps);

      result = RC_EOF;
      DEBUG(eof)
        kprintf(KR_OSTREAM, "Send EOF to reader -- buffer empty\n");
      msg->snd_len = 0;
    }
    else {
      /* buffer is empty -- go to sleep. */
#if 0
      if (ps->sleep == SL_WRITER && ps->start == 0)
        wake_writer(ps);

      DEBUG(sleep)
        kprintf(KR_OSTREAM, "pipe blocks reader\n");

      ps->rdReqLen = msg->rcv_w2;
      ps->sleep = SL_READER;
      ps->sleepSlot = msg->snd_invKey;
      msg->snd_invKey = KR_VOID;
      msg->snd_len = 0;
#else
      if (ps->sleep == SL_WRITER && ps->start == 0)
        result = return_to_writer(msg, ps);
      else {
        DEBUG(sleep)
          kprintf(KR_OSTREAM, "pipe blocks reader\n");

        ps->rdReqLen = msg->rcv_w2;
        ps->sleep = SL_READER;
        ps->sleepSlot = msg->snd_invKey;
        /* Reply to nobody: the reader stays blocked until woken. */
        msg->snd_invKey = KR_VOID;
        msg->snd_len = 0;
      }
#endif
    }

    /* Buffer drained: rewind so the next write starts at the front. */
    if (ps->start == ps->end)
      ps->start = ps->end = 0;

    break;

  case OC_Pipe_Write:
    DEBUG(req)
      kprintf(KR_OSTREAM, "pipe accepts write of length %d resid %d\n",
              msg->rcv_len, msg->rcv_w2);

    if (msg->rcv_keyInfo != KI_WRITER) {
      result = RC_UnknownRequest;
      break;
    }

    /* main() pointed rcv_data at &buf[end], so the payload is already
       in place; just account for it. */
    ps->end += msg->rcv_len;

    if (ps->end == PIPE_BUF_SZ) {
#if 1
      /* If reader is sleeping, wake them up: */
      if (ps->sleep == SL_READER &&
          (ps->wClosed || ps->end == PIPE_BUF_SZ ||
           (ps->end - ps->start) >= ps->rdReqLen)) {
        result = return_to_reader(msg, ps);
      }
      else {
        DEBUG(sleep)
          kprintf(KR_OSTREAM, "pipe blocks writer\n");

        /* This I/O cause a full pipe buffer.  Make note that we have
           it and put the writer to sleep. */
        ps->sleep = SL_WRITER;
        ps->sleepSlot = msg->snd_invKey;
        msg->snd_invKey = KR_VOID;
        msg->snd_len = 0;
        ps->wrReqLen = msg->rcv_len;
      }
#else
      /* If reader is sleeping, wake them up: */
      if (ps->sleep == SL_READER &&
          (ps->wClosed || ps->end == PIPE_BUF_SZ ||
           (ps->end - ps->start) >= ps->rdReqLen)) {
        wake_reader(ps);
      }

      DEBUG(sleep)
        kprintf(KR_OSTREAM, "pipe blocks writer\n");

      /* This I/O cause a full pipe buffer.  Make note that we have it
         and put the writer to sleep. */
      ps->sleep = SL_WRITER;
      ps->sleepSlot = msg->snd_invKey;
      msg->snd_invKey = KR_VOID;
      msg->snd_len = 0;
      ps->wrReqLen = msg->rcv_len;
#endif
    }
    else {
      /* If reader is sleeping, wake them up: */
      if (ps->sleep == SL_READER &&
          (ps->wClosed || ps->end == PIPE_BUF_SZ ||
           (ps->end - ps->start) >= ps->rdReqLen)) {
        wake_reader(ps);
      }

      DEBUG(ack)
        kprintf(KR_OSTREAM, "pipe acks %d to writer\n", msg->rcv_len);
      msg->snd_w1 = msg->rcv_len;
    }
    break;

  case OC_Pipe_Close:
    if (msg->rcv_keyInfo == KI_WRITER) {
      /* DEBUG(req) */
        kprintf(KR_OSTREAM, "pipe: writer closes\n");

      ps->wClosed = 1;
      msg->snd_len = 0;

      /* If reader is sleeping, wake them up -- EVEN if there is no
         more to read. */
      if (ps->sleep == SL_READER) {
        wake_reader(ps);
        ps->sleep = SL_NONE;
        ps->sleepSlot = KR_VOID;
      }

      /* MUST break here to prevent fall-through */
      break;
    }

    if (ps->sleep == SL_WRITER) {
      wake_writer(ps);
      ps->sleep = SL_NONE;
      ps->sleepSlot = KR_VOID;
    }

    DEBUG(req)
      kprintf(KR_OSTREAM, "pipe: reader closes\n");

    /* fall through is deliberate -- reader close means destroy! */
  case OC_Destroy:
    if (msg->rcv_keyInfo != KI_READER) {
      result = RC_UnknownRequest;
      break;
    }

    kprintf(KR_OSTREAM, "nWakeWriter: %u nWakeReader %u\n",
            ps->nWakeWriter, ps->nWakeReader);

    teardown(msg->snd_invKey);
    return 0;   /* CAN'T HAPPEN */

  default:
    result = RC_UnknownRequest;
    break;
  }

  msg->snd_code = result;
  return 1;
}

/* Page-aligned pipe state.  main() uses this object when ALIGNED is
   defined; otherwise a local of the same name in main() shadows it. */
pipe_state ps __attribute__ ((aligned (EROS_PAGE_SIZE)));

/* Pipe server entry point: initialise the state, fabricate the writer
   and reader start keys, hand them to the creator, then loop forever
   servicing requests. */
int
main(void)
{
  uint32_t result;
#ifndef ALIGNED
  pipe_state ps;            /* stack copy; shadows the file-scope `ps' */
#endif
  /* Declared unconditionally: it is used below in both configurations,
     so hiding it behind #ifndef ALIGNED broke the ALIGNED build. */
  pipe_state *pps = &ps;
  Message msg;

  msg.snd_invKey = KR_VOID;
  msg.snd_key0 = KR_VOID;
  msg.snd_key1 = KR_VOID;
  msg.snd_key2 = KR_VOID;
  msg.snd_key3 = KR_VOID;
  msg.snd_data = 0;
  msg.snd_len = 0;
  msg.snd_code = 0;
  msg.snd_w1 = 0;
  msg.snd_w2 = 0;
  msg.snd_w3 = 0;

  msg.rcv_key0 = KR_VOID;
  msg.rcv_key1 = KR_VOID;
  msg.rcv_key2 = KR_VOID;
  msg.rcv_key3 = KR_RESUME;
  msg.rcv_len = 0;
  msg.rcv_data = 0;
  msg.rcv_code = 0;
  msg.rcv_w1 = 0;
  msg.rcv_w2 = 0;
  msg.rcv_w3 = 0;

  node_copy(KR_CONSTIT, KC_RETURNER, KR_RETURNER);
  node_copy(KR_CONSTIT, KC_OSTREAM, KR_OSTREAM);

  /* Initialization is not permitted to fail -- this would constitute
     an unrunnable system! */
  pps = InitPipe(&ps);

  /* Fabricate the reader and writer keys: */
  result = process_make_start_key(KR_SELF, KI_WRITER, KR_CLIENT0);
  if (result != RC_OK)
    kdprintf(KR_OSTREAM, "Result from pipe cre strt key: 0x%x\n", result);

  result = process_make_start_key(KR_SELF, KI_READER, KR_CLIENT1);
  if (result != RC_OK)
    kdprintf(KR_OSTREAM, "Result from pipe cre strt key: 0x%x\n", result);

  msg.snd_key0 = KR_CLIENT0;
  msg.snd_key1 = KR_CLIENT1;
  msg.snd_invKey = KR_RESUME;
  msg.snd_len = 0;

  /* Incoming data strings land directly in the pipe buffer. */
  msg.rcv_data = pps->buf;
  msg.rcv_len = PIPE_BUF_SZ;
  msg.rcv_key3 = KR_CLIENT0;

  DEBUG(init)
    kprintf(KR_OSTREAM, "init pipe: accept rsm key to %d\n", msg.rcv_key3);

  RETURN(&msg);

  msg.snd_key0 = KR_VOID;
  msg.snd_key1 = KR_VOID;
  msg.snd_key2 = KR_VOID;
  msg.snd_key3 = KR_VOID;

  for(;;) {
    /* Until somebody decides otherwise, we plan to return to the
       client who calls us: */
#ifdef USE_RETURNER
    msg.snd_invKey = KR_RETURNER;
    msg.snd_key3 = msg.rcv_key3;
#else
    msg.snd_invKey = msg.rcv_key3;
#endif

    (void) ProcessRequest(&msg, pps);

    /* If, however, we have a sleeping client, we set things up to
       accept the other side elsewhere: */
    msg.rcv_key3 = (pps->sleepSlot == KR_CLIENT0) ? KR_CLIENT1 : KR_CLIENT0;

    /* Next write payload is appended at the current end of the buffer
       and may not exceed the remaining space. */
    msg.rcv_data = &pps->buf[pps->end];
    msg.rcv_len = PIPE_BUF_SZ - pps->end;

    RETURN(&msg);
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -