📄 wait_queue.c
字号:
#include "thread.h"
#include "exception.h"
#include "thread_lists.h"
#include "wait_queue.h"
#include "shared.h"
#include <cerrno>

// Wrap a non-negative ring index into [0, __THREAD_QUEUE_MAX).
// Do NOT pass (index - 1): in C++ the remainder takes the sign of the
// dividend, so (-1) % N is -1, not N-1. Use OFFS_PREV for that.
#define OFFS(n) ((n)%__THREAD_QUEUE_MAX)

// Index of the ring slot that precedes slot n (n must already be in range).
// Adding the ring size first keeps the dividend non-negative.
#define OFFS_PREV(n) (((n)+__THREAD_QUEUE_MAX-1)%__THREAD_QUEUE_MAX)

// Marker written into a shared storage block once its indices have been
// initialised, so that a later attach to the same block does not reset it.
#define MAGIC_WQUEUE_NO 0x212611

namespace cpp_threads {

  /// Attach to (or create) the wait queue stored under the given id.
  /// An id of -1 yields a process-private queue (see init()).
  WaitQueue::WaitQueue(int id_p)
  {
    _id = id_p;
    init(attributes::process_shared_e);
  }

  /// Create a queue with the requested scope, using an id obtained from
  /// SharedMemory::share.createProj(0).
  WaitQueue::WaitQueue(scope_t scope_p)
  {
    _id = SharedMemory::share.createProj( 0 );
    init(scope_p);
  }

  /// Create a process-private queue.
  WaitQueue::WaitQueue()
  {
    // init() inspects _id; give it the "private" sentinel rather than
    // letting it read a value this constructor never assigned.
    _id = -1;
    init(attributes::process_private_e);
  }

  /// Wake every thread still queued, then release the storage using the
  /// deallocator that matches how init() obtained it.
  WaitQueue::~WaitQueue()
  {
    Pthread::debug("WaitQueue::~WaitQueue");
    if ( _wq ) {
      while( !empty() )
        wakeUp();
      if ( _scope == attributes::process_private_e )
        delete _wq;
      else
        SharedMemory::share.deAlloc( _wq );
      _wq = 0;
    }
  }

  /// Allocate the queue storage.
  ///
  /// @param scope_p requested scope; ignored (forced to process-private)
  ///                when _id is -1.
  ///
  /// Shared storage is obtained from SharedMemory::share.alloc(); its indices
  /// are reset only if the block does not already carry MAGIC_WQUEUE_NO.
  /// Private storage is heap allocated and always starts empty.
  void WaitQueue::init(scope_t scope_p)
  {
    if ( _id == -1 )
      _scope = attributes::process_private_e;
    else
      _scope = scope_p;

    if ( _scope == attributes::process_shared_e ) {
      _wq = (storage *)SharedMemory::share.alloc( sizeof(storage),_id );
      Pthread::debug("Shared queue @ %p",_wq);
      if( _wq == (storage *)-1 )
        exception::fatal( ENOMEM );
      if ( _wq->w_magic != MAGIC_WQUEUE_NO ) {
        Pthread::debug(" - Empty waitqueue");
        _wq->w_top    = 0;
        _wq->w_bottom = 0;
        _wq->w_magic  = MAGIC_WQUEUE_NO;
      }
    } else {
      _wq = new storage;
      _wq->w_top    = 0;
      _wq->w_bottom = 0;
    }
  }

  /// @return true when the ring holds no entries (bottom index == top index).
  /// Note: reads the indices without taking w_sync.
  bool WaitQueue::empty()
  {
    return ( _wq->w_bottom == _wq->w_top );
  }

  /// Remove the first entry whose pid equals id_p, if any.
  ///
  /// The entries between w_bottom and the match are shifted one slot towards
  /// the top (preserving their order) and w_bottom is advanced by one.
  void WaitQueue::remove(pid_t id_p)
  {
    int pos;

    _wq->w_sync.acquire();
    if ( !empty() ) {
      for( pos = _wq->w_bottom;pos != OFFS(_wq->w_top);pos = OFFS(pos+1) )
        if ( _wq->w_pid[pos].q_pid == id_p ) {
          // Walk backwards from the match to w_bottom. OFFS_PREV is required
          // here: when the live region wraps past slot 0, OFFS(pos-1) would
          // evaluate to -1 and index outside w_pid.
          for( ;_wq->w_bottom != pos;pos = OFFS_PREV(pos) )
            _wq->w_pid[pos] = _wq->w_pid[OFFS_PREV(pos)];
          _wq->w_bottom = OFFS(_wq->w_bottom+1);
          break;
        }
    }
    _wq->w_sync.release();
  }

  /// Insert a thread id into the queue at the given priority.
  ///
  /// Entries are kept in descending priority order from w_bottom to w_top.
  /// A new entry goes behind existing entries of the same priority. The ring
  /// keeps one slot free; a full ring is reported through exception::fatal.
  void WaitQueue::insert(pid_t id_p, int pri_p)
  {
    short          place;
    priority_queue stor, hold;
    bool           move = false;

    Pthread::debug(" - inserting %d, at priority %d",id_p,pri_p);
    stor.q_pid = id_p;
    stor.q_pri = pri_p;
    _wq->w_sync.acquire();
    if ( _wq->w_bottom == OFFS(_wq->w_top+1) )
      exception::fatal( "wait queue: internal error" );
    for( place=_wq->w_bottom;place!=_wq->w_top;place=OFFS(place+1) )
      // Once the new entry has been dropped into place, 'stor' carries the
      // displaced entry, which is pushed one slot further on each step
      // ('move' lets it displace entries of equal priority as well).
      if( (stor.q_pri > _wq->w_pid[place].q_pri) ||
          ((stor.q_pri == _wq->w_pid[place].q_pri) && move) ) {
        hold               = _wq->w_pid[place];
        _wq->w_pid[place]  = stor;
        stor               = hold;
        move               = true;
      }
    // Whatever is still in hand (the new entry, or the last displaced one)
    // becomes the new tail.
    _wq->w_pid[_wq->w_top] = stor;
    _wq->w_top = OFFS(_wq->w_top+1);
    _wq->w_sync.release();
    Pthread::debug(" Queue %p(%d,%d)",_wq,_wq->w_bottom,_wq->w_top);
  }

  /// Queue the calling thread at priority pri_p and suspend it.
  /// Does nothing if ThreadList::__threads.self() returns no thread.
  void WaitQueue::suspendMe(int pri_p)
  {
    Pthread *th = ThreadList::__threads.self();

    if ( th == 0 )
      return;
    insert( th->id(),pri_p );
    th->suspend_with_cancelation();
    _wq->w_sync.acquire(); // we need to synchronize with the wakeup below. Maybe __sched.yield()?
    _wq->w_sync.release();
  }

  /// Restart the thread at the bottom of the queue (if any) and drop its entry.
  void WaitQueue::wakeUp()
  {
    _wq->w_sync.acquire();
    Pthread::debug("WakeUp on %p",_wq);
    if ( !empty() ) {
      ThreadList::__threads.restart(_wq->w_pid[_wq->w_bottom].q_pid);
      _wq->w_bottom = OFFS(_wq->w_bottom+1);
    }
    _wq->w_sync.release();
  }

} // namespace cpp_threads
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -