📄 pipeline_tasks.cpp
字号:
// Pipeline_Tasks.cpp,v 1.2 2005/01/21 02:43:25 ossama Exp#include "JAWS/JAWS.h"#include "JAWS/IO.h"#include "JAWS/Pipeline_Tasks.h"#include "JAWS/Pipeline_Handler_T.h"#include "JAWS/Data_Block.h"#include "JAWS/IO_Handler.h"#include "JAWS/Policy.h"ACE_RCSID(JAWS, Pipeline_Tasks, "Pipeline_Tasks.cpp,v 1.2 2005/01/21 02:43:25 ossama Exp")JAWS_Pipeline_Handler::JAWS_Pipeline_Handler (void) : policy_ (0){}JAWS_Pipeline_Handler::~JAWS_Pipeline_Handler (void){}intJAWS_Pipeline_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *tv){ JAWS_Data_Block *db = dynamic_cast<JAWS_Data_Block *> (mb); JAWS_IO_Handler *ioh = db->io_handler (); // guarantee the handler remains for the duration of this call ioh->acquire (); int status = this->handle_put (db, tv); if (status != -1 && status != 2) { JAWS_Pipeline_Handler *task = ioh->task (); JAWS_Pipeline_Handler *next = dynamic_cast<JAWS_Pipeline_Handler *> (task->next ()); ioh->task (next); } ioh->release (); return status;}JAWS_Dispatch_Policy *JAWS_Pipeline_Handler::policy (void){ return this->policy_;}voidJAWS_Pipeline_Handler::policy (JAWS_Dispatch_Policy *policy){ this->policy_ = policy;}intJAWS_Pipeline_Accept_Task::put (ACE_Message_Block *mb, ACE_Time_Value *tv){ JAWS_Data_Block *db = dynamic_cast<JAWS_Data_Block *> (mb); JAWS_Pipeline_Handler *task = db->task (); JAWS_Pipeline_Handler *next = dynamic_cast<JAWS_Pipeline_Handler *> (task->next ()); JAWS_IO_Handler *ioh = this->new_handler (db); if (ioh == 0) { ACE_ERROR ((LM_ERROR, "%p\n", "JAWS_Pipeline_Accept_Task::put")); return -1; } ioh->acquire (); ioh->task (next); db->io_handler (ioh); int result = this->handle_put (ioh->message_block (), tv); ioh->release (); return result;}intJAWS_Pipeline_Accept_Task::handle_put (JAWS_Data_Block *data, ACE_Time_Value *){ int result = -1; // JAWS_Data_Block should contain an INET_Addr and an IO JAWS_IO_Handler *handler = data->io_handler (); JAWS_Dispatch_Policy *policy = this->policy (); if (policy == 0) policy = data->policy (); // data->policy ()->update (handler); JAWS_IO *io = policy->io (); io->accept (handler); // When accept returns, the resulting handle should be stored into // the JAWS_DATA_BLOCK somewhere. // Check the handler for status of the io call switch (handler->status ()) { case JAWS_IO_Handler::ACCEPT_OK: { ACE_DEBUG ((LM_DEBUG, "(%t) ACCEPT_OK\n")); result = 0; JAWS_TRACE ("JAWS_Pipeline_Accept_Task::handle_put ACCEPT_OK"); // Move on to next stage in pipeline break; } case JAWS_IO_Handler::ACCEPT_ERROR: { ACE_DEBUG ((LM_DEBUG, "(%t) ACCEPT_ERROR\n")); result = -1; JAWS_TRACE ("JAWS_Pipeline_Accept_Task::handle_put ACCEPT_ERROR"); // Should recycle the thread break; } default: { result = 1; JAWS_TRACE ("JAWS_Pipeline_Accept_Task::handle_put ACCEPT_IDLE"); // Should mean that the IO is asynchronous, and the word isn't out // yet. break; } } // In asynchronous and synchronous models, we can -- // have the io_handler set the new task in the data_block // In asynchronous model, we can -- // insert a wait task into the task queue ACE_DEBUG ((LM_DEBUG, "(%t) Returning %d\n", result)); return result;}JAWS_IO_Handler *JAWS_Pipeline_Accept_Task::new_handler (JAWS_Data_Block *data){ // Create a new handler and message block JAWS_Data_Block *ndb = new JAWS_Data_Block (*data); if (ndb == 0) { JAWS_TRACE ("JAWS_Pipeline_Accept_Task::new_handler, failed DB"); return 0; } JAWS_Dispatch_Policy *policy = (this->policy () == 0) ? data->policy () : this->policy (); JAWS_IO_Handler_Factory *ioh_factory = policy->ioh_factory (); JAWS_IO_Handler *nioh = ioh_factory->create_io_handler (); if (nioh == 0) { delete ndb; return 0; } ndb->io_handler (nioh); nioh->task (data->task ()); nioh->message_block (ndb); return nioh;}intJAWS_Pipeline_Done_Task::put (ACE_Message_Block *mb, ACE_Time_Value *){ JAWS_TRACE ("JAWS_Pipeline_Done_Task::put"); JAWS_Data_Block *data = dynamic_cast<JAWS_Data_Block *> (mb); JAWS_IO_Handler *handler = data->io_handler (); JAWS_Dispatch_Policy *policy = this->policy (); if (policy == 0) policy = data->policy (); // JAWS_IO *io = policy->io (); data->task (0); data->io_handler (0); if (handler) handler->done (); // hack, let Concurrency know we are done. return -2;}intJAWS_Pipeline_Done_Task::handle_put (JAWS_Data_Block *, ACE_Time_Value *){ return 0;}#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)template class JAWS_Pipeline_Abstract_Handler<JAWS_Data_Block>;template class ACE_Singleton<JAWS_Pipeline_Accept_Task, ACE_SYNCH_MUTEX>;template class ACE_Singleton<JAWS_Pipeline_Done_Task, ACE_SYNCH_NULL_MUTEX>;#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)#pragma instantiate JAWS_Pipeline_Abstract_Handler<JAWS_Data_Block>#pragma instantiate ACE_Singleton<JAWS_Pipeline_Accept_Task, ACE_SYNCH_MUTEX>#pragma instantiate ACE_Singleton<JAWS_Pipeline_Done_Task, ACE_SYNCH_NULL_MUTEX>#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -