⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 invoker.cc

📁 编译工具
💻 CC
字号:
// -*- Mode: C++; -*-//                            Package   : omniORB// invoker.h                  Created on: 11 Apr 2001//                            Author    : Sai Lai Lo (sll)////    Copyright (C) 2001 AT&T Laboratories Cambridge////    This file is part of the omniORB library////    The omniORB library is free software; you can redistribute it and/or//    modify it under the terms of the GNU Library General Public//    License as published by the Free Software Foundation; either//    version 2 of the License, or (at your option) any later version.////    This library is distributed in the hope that it will be useful,//    but WITHOUT ANY WARRANTY; without even the implied warranty of//    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU//    Library General Public License for more details.////    You should have received a copy of the GNU Library General Public//    License along with this library; if not, write to the Free//    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA//    02111-1307, USA////// Description://	*** PROPRIETORY INTERFACE ***///*  $Log: invoker.cc,v $  Revision 1.1.2.8  2005/07/25 16:29:20  dgrisby  Async worker could still add itself to the idle list more than once.  Revision 1.1.2.7  2005/07/22 11:22:06  dgrisby  Async worker could add itself to the idle list more than once.  Thanks Hartmut Raschick  Revision 1.1.2.6  2002/09/11 20:40:15  dgrisby  Call thread interceptors from etherealiser queue.  Revision 1.1.2.5  2002/09/10 23:17:11  dgrisby  Thread interceptors.  Revision 1.1.2.4  2002/02/25 11:18:02  dpg1  Avoid thread churn in invoker.  Revision 1.1.2.3  2002/02/13 17:40:52  dpg1  Tweak to avoid destruction race in invoker.  Revision 1.1.2.2  2002/02/13 16:02:40  dpg1  Stability fixes thanks to Bastiaan Bakker, plus threading  optimisations inspired by investigating Bastiaan's bug reports.  Revision 1.1.2.1  2002/01/09 11:35:23  dpg1  Remove separate omniAsyncInvoker library to save library overhead.  Revision 1.1.2.7  2001/11/13 14:14:03  dpg1  AsyncInvoker properly waits for threads to finish.  Revision 1.1.2.6  2001/08/16 09:53:18  sll  Added stdlib.h to give abort a prototype.  Revision 1.1.2.5  2001/08/01 10:03:40  dpg1  AyncInvoker no longer maintains its own dedicated thread queue.  Derived classes must provide the implementation.  Revision 1.1.2.4  2001/07/31 15:56:48  sll  Make sure pd_nthreads is kept in sync with the actual no. of threads  serving the Anytime tasks.  Revision 1.1.2.3  2001/06/13 20:08:13  sll  Minor update to make the ORB compiles with MSVC++.  Revision 1.1.2.2  2001/05/01 16:03:16  sll  Silly typo in a switch statement causes random failure due to corrupted  link list.  Revision 1.1.2.1  2001/04/19 09:47:54  sll  New library omniAsyncInvoker.*/#include <omniORB4/CORBA.h>#include <omniORB4/omniInterceptors.h>#include <omniORB4/omniAsyncInvoker.h>#include <interceptors.h>#include <stdlib.h>OMNI_USING_NAMESPACE(omni)unsigned int omniAsyncInvoker::idle_timeout = 10;class omniAsyncWorker;class omniAsyncWorkerInfo  : public omniInterceptors::createThread_T::info_T {public:  omniAsyncWorkerInfo(omniAsyncWorker* worker) :    pd_worker(worker), pd_elmt(omniInterceptorP::createThread) {}  void run();private:  omniAsyncWorker*        pd_worker;  omniInterceptorP::elmT* pd_elmt;};class omniAsyncWorker : public omni_thread {public:  omniAsyncWorker(omniAsyncInvoker* pool, omniTask* task) :    pd_pool(pool), pd_task(task), pd_next(0), pd_id(id()), pd_in_idle_queue(0)  {    pd_cond = new omni_tracedcondition(pool->pd_lock);    start();  }  ~omniAsyncWorker() {    if (omniORB::trace(10)) {      omniORB::logger l;      l << "AsyncInvoker: thread id = " << pd_id	<< " has exited. Total threads = " << pd_pool->pd_totalthreads	<< "\n";    }    delete pd_cond;    pd_pool->pd_lock->lock();    if (--pd_pool->pd_totalthreads == 0)      pd_pool->pd_cond->signal();    pd_pool->pd_lock->unlock();  }  void run(void*) {    omniAsyncWorkerInfo info(this);    info.run();  }  void real_run() {    if (omniORB::trace(10)) {      omni_tracedmutex_lock sync(*pd_pool->pd_lock);      omniORB::logger l;      l << "AsyncInvoker: thread id = " << pd_id	<< " has started. Total threads = " << pd_pool->pd_totalthreads	<< "\n";    }    pd_pool->pd_lock->lock();    while (pd_task || pd_pool->pd_keep_working) {      if (!pd_task) {	if ( !omniTaskLink::is_empty(pd_pool->pd_anytime_tq) ) {	  pd_task = (omniTask*)pd_pool->pd_anytime_tq.next;	  pd_task->deq();	}	else {	  // Add to the idle queue	  OMNIORB_ASSERT(!pd_in_idle_queue);	  pd_next = pd_pool->pd_idle_threads;	  pd_pool->pd_idle_threads = this;	  pd_in_idle_queue = 1;	  unsigned long abs_sec,abs_nanosec;	  omni_thread::get_time(&abs_sec,&abs_nanosec,				omniAsyncInvoker::idle_timeout);	  int signalled = pd_cond->timedwait(abs_sec,abs_nanosec);	  if (pd_in_idle_queue) {	    // Remove from the idle queue	    omniAsyncWorker** pp = &pd_pool->pd_idle_threads;	    while (*pp && *pp != this) {	      pp = &((*pp)->pd_next);	    }	    if (*pp) {	      *pp = pd_next;	    }	    else {	      if (omniORB::trace(1)) {		omniORB::logger l;		l << "AsyncInvoker: Warning: thread " << pd_id		  << " thought it was in the idle queue but it was not.\n";	      }	    }	    pd_next = 0;	    pd_in_idle_queue = 0;	  }	  if (!signalled && !pd_task) {	    // We have timed out and have not been assigned a task.	    // Break out from the while loop and exit.	    break;	  }	  // If signalled, we have been dequeued by the	  // omniAsyncInvoker, and will have a task to process next	  // time around the while loop.	  continue;	}      }      unsigned int immediate = (pd_task->category() ==				omniTask::ImmediateDispatch);      pd_pool->pd_lock->unlock();      try {	pd_task->execute();      }      catch(...) {	omniORB::logs(1, "AsyncInvoker: Warning: unexpected exception "		      "caught while executing a task.");      }      pd_task = 0;      pd_pool->pd_lock->lock();      if (immediate) {	pd_pool->pd_nthreads++;      }      if (pd_pool->pd_nthreads > pd_pool->pd_maxthreads) {	// No need to keep this thread	break;      }    }    pd_pool->pd_nthreads--;    pd_pool->pd_lock->unlock();  }  friend class omniAsyncInvoker;private:  omniAsyncInvoker*     pd_pool;  omniTask*             pd_task;  omni_tracedcondition* pd_cond;  omniAsyncWorker*      pd_next;  int                   pd_id;  CORBA::Boolean        pd_in_idle_queue;  omniAsyncWorker();  omniAsyncWorker(const omniAsyncWorker&);  omniAsyncWorker& operator=(const omniAsyncWorker&);};voidomniAsyncWorkerInfo::run(){  if (pd_elmt) {    omniInterceptors::createThread_T::interceptFunc f =      (omniInterceptors::createThread_T::interceptFunc)pd_elmt->func;    pd_elmt = pd_elmt->next;    f(*this);  }  else    pd_worker->real_run();}///////////////////////////////////////////////////////////////////////////omniAsyncInvoker::omniAsyncInvoker(unsigned int max) {  pd_keep_working = 1;  pd_lock  = new omni_tracedmutex();  pd_cond  = new omni_tracedcondition(pd_lock);  pd_idle_threads = 0;  pd_nthreads = 0;  pd_maxthreads = max;  pd_totalthreads = 0;}///////////////////////////////////////////////////////////////////////////omniAsyncInvoker::~omniAsyncInvoker() {  pd_lock->lock();  pd_keep_working = 0;  while (pd_idle_threads) {    omniAsyncWorker* t = pd_idle_threads;    pd_idle_threads = t->pd_next;    t->pd_next = 0;    t->pd_in_idle_queue = 0;    t->pd_cond->signal();  }  // Wait for threads to exit  while (pd_totalthreads) {    pd_cond->wait();  }  pd_lock->unlock();  delete pd_cond;  delete pd_lock;  omniORB::logs(10, "AsyncInvoker: deleted.");}///////////////////////////////////////////////////////////////////////////intomniAsyncInvoker::insert(omniTask* t) {  switch (t->category()) {  case omniTask::AnyTime:    {      omni_tracedmutex_lock sync(*pd_lock);      if (pd_idle_threads) {	omniAsyncWorker* w = pd_idle_threads;	pd_idle_threads = w->pd_next;	w->pd_next = 0;	OMNIORB_ASSERT(w->pd_task == 0);	w->pd_task = t;	w->pd_in_idle_queue = 0;	w->pd_cond->signal();      }      else {	if (pd_nthreads < pd_maxthreads) {	  try {	    pd_nthreads++;	    pd_totalthreads++;	    omniAsyncWorker* w = new omniAsyncWorker(this,t);	    OMNIORB_ASSERT(w);	  }	  catch (...) {	    // Cannot start a new thread.	    pd_nthreads--;	    pd_totalthreads--;	    omniORB::logs(5, "Exception trying to start new thread.");	    t->enq(pd_anytime_tq);	  }	}	else {	  t->enq(pd_anytime_tq);	}      }      break;    }  case omniTask::ImmediateDispatch:    {      omni_tracedmutex_lock sync(*pd_lock);      if (pd_idle_threads) {	omniAsyncWorker* w = pd_idle_threads;	pd_idle_threads = w->pd_next;	w->pd_next = 0;	OMNIORB_ASSERT(w->pd_task == 0);	w->pd_task = t;	w->pd_in_idle_queue = 0;	w->pd_cond->signal();	pd_nthreads--;      }      else {	try {	  pd_totalthreads++;	  omniAsyncWorker* w = new omniAsyncWorker(this,t);	}	catch(...) {	  // Cannot start a new thread.	  pd_totalthreads--;	  omniORB::logs(5, "Exception trying to start new thread.");	  return 0;	}      }      break;    }  case omniTask::DedicatedThread:    {      return insert_dedicated(t);    }  }  return 1;}///////////////////////////////////////////////////////////////////////////intomniAsyncInvoker::cancel(omniTask* t) {  if (t->category() == omniTask::AnyTime) {    omni_tracedmutex_lock sync(*pd_lock);    omniTaskLink* l;    for (l = pd_anytime_tq.next; l != &pd_anytime_tq; l =l->next) {      if ((omniTask*)l == t) {	l->deq();	return 1;      }    }  }  else if (t->category() == omniTask::DedicatedThread) {    return cancel_dedicated(t);  }  return 0;}/////////////////////////////////////////////////////////////////////////////// Default do-nothing implementations of dedicated thread functionsintomniAsyncInvoker::work_pending(){  return 0;}voidomniAsyncInvoker::perform(unsigned long secs, unsigned long nanosecs){  omniORB::logs(1, "omniAsyncInvoker::perform() not implemented. aborting...\n");  abort();}intomniAsyncInvoker::insert_dedicated(omniTask*){  return 0;}intomniAsyncInvoker::cancel_dedicated(omniTask*){  return 0;}///////////////////////////////////////////////////////////////////////////voidomniTaskLink::enq(omniTaskLink& head) {  next = head.prev->next;  head.prev->next = this;  prev = head.prev;  head.prev = this;}///////////////////////////////////////////////////////////////////////////voidomniTaskLink::deq() {  prev->next = next;  next->prev = prev;}///////////////////////////////////////////////////////////////////////////unsigned intomniTaskLink::is_empty(omniTaskLink& head) {  return (head.next == &head);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -