📄 task.cc
字号:
// $Header: /cvsroot/sourcenav/src/snavigator/demo/c++_demo/glish/Task.cc,v 1.1.1.1 2002/04/18 23:35:25 mdejong Exp $#include <stream.h>#include <stdio.h>#include <stdlib.h>#include <unistd.h>#include <osfcn.h>#include <string.h>#include <errno.h>#include "system.h"#include "Channel.h"#include "Select.h"#include "LocalExec.h"#include "RemoteExec.h"#include "Task.h"#include "Sequencer.h"#include "Reporter.h"Task::Task( TaskAttr* task_attrs, Sequencer* s ) : Agent(s) { attrs = task_attrs; pending_events = 0; channel = 0; local_channel = 0; selector = 0; task_error = 1; no_such_program = 1; executable = 0; name = 0; read_pipe_str = write_pipe_str = 0; pipes_used = 0; active = 0; // not true till we get a .established event protocol = 0; // not set until Client establishes itself id = sequencer->RegisterTask( this ); if ( attrs->task_var_ID ) agent_ID = attrs->task_var_ID; else agent_ID = id; }Task::~Task() { sequencer->DeleteTask( this ); CloseChannel(); delete pending_events; delete attrs; delete name; delete executable; delete read_pipe_str; delete write_pipe_str; }Value* Task::SendEvent( const char* event_name, parameter_list* args, int is_request, int log ) { if ( task_error ) return is_request ? error_value() : 0; Value* event_val = BuildEventValue( args, 1 ); Value* result = 0; if ( is_request && ! channel ) { // We need to synchronize; wait for the task to connect. if ( local_channel ) { // Okay, already have a channel, just connect. (void) sequencer->NewConnection( local_channel ); channel = local_channel; } else channel = sequencer->WaitForTaskConnection( this ); if ( ! channel ) // Connection problem, bail out. return error_value(); } if ( ! channel ) { if ( ! pending_events ) pending_events = new glish_event_list; GlishEvent* e = new GlishEvent( strdup( event_name ), copy_value( event_val ) ); if ( is_request ) e->SetIsRequest(); pending_events->append( e ); } else { int fd = channel->WriteFD(); if ( log ) sequencer->LogEvent( id, name, event_name, event_val, 0 ); if ( is_request ) { const char* fmt = "*%s-reply*"; char* reply_name = new char[strlen( event_name ) + strlen( fmt ) + 1]; sprintf( reply_name, fmt, event_name ); Value* new_val = create_record(); new_val->SetField( "*request*", event_val ); new_val->SetField( "*reply*", reply_name ); Unref( event_val ); event_val = new_val; GlishEvent e( event_name, event_val ); e.SetIsRequest(); send_event( fd, &e ); result = sequencer->AwaitReply( this, event_name, reply_name ); delete reply_name; } else { GlishEvent e( event_name, (const Value*) event_val ); send_event( fd, &e ); } } Unref( event_val ); return result; }void Task::SetChannel( Channel* c, Selector* s ) { channel = c; selector = s; if ( pending_events ) { loop_over_list( *pending_events, i ) { GlishEvent* e = (*pending_events)[i]; sequencer->LogEvent( id, name, e, 0 ); send_event( channel->WriteFD(), e ); Unref( e ); } pending_events = 0; } }void Task::CloseChannel() { if ( channel ) { if ( channel->ChannelState() == CHAN_IN_USE ) // don't delete the channel now; just mark it for // later deletion channel->ChannelState() = CHAN_INVALID; else { selector->DeleteSelectee( channel->ReadFD() ); delete channel; } channel = 0; } }Task* Task::AgentTask() { return this; }void Task::DescribeSelf( ostream& s ) const { s << "task " << Name(); }const char** Task::CreateArgs( const char* prog, int num_args, int& argc ) { argc = num_args; int use_socket = attrs->daemon_channel || attrs->async_flag; // Leave room for the executable's name, the -id flag, the id, // the -interpreter flag, and the interpreter's tag. argc += 5; if ( use_socket ) // Leave room for -host <host> -port <port> argc += 4; else // Local exec, leave room for -pipes <input> <output> argc += 3; if ( attrs->suspend_flag ) ++argc; // room for -suspend flag if ( attrs->ping_flag ) ++argc; // room for -ping flag const char** argv = new string[argc + 1]; // + 1 for final nil int argp = 0; argv[argp++] = prog; argv[argp++] = "-id"; argv[argp++] = id; if ( use_socket ) { pipes_used = 0; argv[argp++] = "-host"; argv[argp++] = sequencer->ConnectionHost(); argv[argp++] = "-port"; argv[argp++] = sequencer->ConnectionPort(); } else { if ( pipe( read_pipe ) < 0 || pipe( write_pipe ) < 0 ) perror( "glish: problem creating pipe" ); pipes_used = 1; mark_close_on_exec( read_pipe[0] ); mark_close_on_exec( write_pipe[1] ); argv[argp++] = "-pipes"; char buf[64]; sprintf( buf, "%d", write_pipe[0] ); argv[argp++] = write_pipe_str = strdup( buf ); sprintf( buf, "%d", read_pipe[1] ); argv[argp++] = read_pipe_str = strdup( buf ); } argv[argp++] = "-interpreter"; argv[argp++] = sequencer->InterpreterTag(); if ( attrs->suspend_flag ) argv[argp++] = "-suspend"; if ( attrs->ping_flag ) argv[argp++] = "-ping"; if ( argp != argc - num_args ) fatal->Report( "inconsistent argv in Task::CreateArgs" ); argv[argc] = 0; return argv; }void Task::Exec( const char** argv ) { char* exec_name = 0; if ( attrs->daemon_channel ) executable = new RemoteExec( attrs->daemon_channel, argv[0], argv ); else { exec_name = which_executable( argv[0] ); if ( ! exec_name ) return; executable = new LocalExec( exec_name, argv ); close( read_pipe[1] ); close( write_pipe[0] ); local_channel = sequencer->AddLocalClient( read_pipe[0], write_pipe[1] ); } no_such_program = 0; task_error = executable->ExecError(); if ( ! task_error ) // This is a little buggy - we'd like the sequencer to know // about this client only after the client makes its // initial rendezvous with the Sequencer. But the sequencer // will only accept such a rendezvous in its event loop // *after* it has determined that there are still some // active clients out there and thus it's worth waiting // for external events to arrive. Thus if the sequencer // only counted the client as active after its rendezvous // there would be a race: if no clients were active but // a new one had just been started, the sequencer wouldn't // wait for its rendezvous. So instead we tell the sequencer // about the client as soon as we've created it. If the // client fails to make its rendezvous then the sequencer's // count of active clients will always be too high; this // can be fixed if&when we add timeouts to the sequencer so // it can detect clients that have never managed to // rendezvous. sequencer->NewClientStarted(); delete exec_name; }void Task::SetActivity( int is_active ) { active = is_active; CreateEvent( "active", new Value( is_active ) ); if ( ! is_active ) CloseChannel(); }ShellTask::ShellTask( const_args_list* args, TaskAttr* task_attrs, Sequencer* s ) : Task( task_attrs, s ) { char* arg_string = paste( args ); name = strdup( "shell_client" ); // Turn off async attribute; we don't want the shell_client // program to run asynchronously. attrs->async_flag = 0; int argc; // Need three arguments: sh, -c, arg-string const char** argv = CreateArgs( name, 3, argc ); int argp = argc - 3; argv[argp++] = "sh"; argv[argp++] = "-c"; argv[argp++] = arg_string; argv[argp] = 0; Exec( argv ); delete argv; delete arg_string; }ClientTask::ClientTask( const_args_list* args, TaskAttr* task_attrs, Sequencer* s ) : Task( task_attrs, s ) { // Get the program name. const Value* arg = (*args)[0]->Deref(); if ( arg->Type() == TYPE_STRING ) name = strdup( arg->StringPtr()[0] ); else name = arg->StringVal(); // See if based on its name we should suspend this client. if ( s->ShouldSuspend( name ) ) task_attrs->suspend_flag = 1; // Count up how many arguments there are. int num_args = 0; loop_over_list( *args, i ) { arg = (*args)[i]->Deref(); if ( arg->Type() == TYPE_STRING ) num_args += arg->Length(); else ++num_args; } int argc; const char** argv = CreateArgs( name, num_args, argc ); int argp = argc - num_args; int first_arg_pos = argp; int saw_name = 0; loop_over_list( *args, j ) { arg = (*args)[j]->Deref(); if ( arg->Type() == TYPE_STRING ) { charptr* words = arg->StringPtr();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -