📄 sequencer.cc
字号:
}Frame* Sequencer::CurrentFrame() { int top_frame = local_frames.length() - 1; if ( top_frame < 0 ) return 0; return local_frames[top_frame]; }Value* Sequencer::FrameElement( scope_type scope, int frame_offset ) { if ( scope == LOCAL_SCOPE ) { int top_frame = local_frames.length() - 1; if ( top_frame < 0 ) fatal->Report( "local frame requested but none exist in Sequencer::FrameElement" ); return local_frames[top_frame]->FrameElement( frame_offset ); } else { if ( frame_offset < 0 || frame_offset >= global_frame.length() ) fatal->Report( "bad global frame offset in Sequencer::FrameElement" ); return global_frame[frame_offset]; } }void Sequencer::SetFrameElement( scope_type scope, int frame_offset, Value* value ) { Value* prev_value; if ( scope == LOCAL_SCOPE ) { int top_frame = local_frames.length() - 1; if ( top_frame < 0 ) fatal->Report( "local frame requested but none exist in Sequencer::SetFrameElement" ); Value*& frame_value = local_frames[top_frame]->FrameElement( frame_offset ); prev_value = frame_value; frame_value = value; } else { if ( frame_offset < 0 || frame_offset >= global_frame.length() ) fatal->Report( "bad global frame offset in Sequencer::SetFrameElement" ); prev_value = global_frame.replace( frame_offset, value ); } Unref( prev_value ); }char* Sequencer::RegisterTask( Task* new_task ) { char buf[128]; sprintf( buf, "task%d", ++last_task_id ); char* new_ID = strdup( buf ); ids_to_tasks.Insert( new_ID, new_task ); return new_ID; }void Sequencer::DeleteTask( Task* task ) { (void) ids_to_tasks.Remove( task->TaskID() ); }void Sequencer::AddStmt( Stmt* addl_stmt ) { stmts = merge_stmts( stmts, addl_stmt ); }int Sequencer::RegisterStmt( Stmt* stmt ) { registered_stmts.append( stmt ); return registered_stmts.length(); }Stmt* Sequencer::LookupStmt( int index ) { if ( index <= 0 || index > registered_stmts.length() ) return 0; return registered_stmts[index - 1]; }Channel* Sequencer::GetHostDaemon( const char* host ) { if ( ! host || ! strcmp( host, ConnectionHost() ) || ! strcmp( host, "localhost" ) ) // request is for local host return 0; RemoteDaemon* d = daemons[host]; if ( ! d ) d = CreateDaemon( host ); return d->DaemonChannel(); }void Sequencer::Exec() { if ( interactive ) return; if ( error->Count() > 0 ) { message->Report( "execution aborted" ); return; } stmt_flow_type flow; Unref( stmts->Exec( 0, flow ) ); EventLoop(); }void Sequencer::Await( Stmt* arg_await_stmt, int only_flag, Stmt* arg_except_stmt ) { Stmt* hold_await_stmt = await_stmt; int hold_only_flag = await_only_flag; Stmt* hold_except_stmt = except_stmt; await_stmt = arg_await_stmt; await_only_flag = only_flag; except_stmt = arg_except_stmt; EventLoop(); await_stmt = hold_await_stmt; await_only_flag = only_flag; except_stmt = hold_except_stmt; }Value* Sequencer::AwaitReply( Task* task, const char* event_name, const char* reply_name ) { GlishEvent* reply = recv_event( task->GetChannel()->ReadFD() ); Value* result = 0; if ( ! reply ) { warn->Report( task, " terminated without replying to ", event_name, " request" ); result = error_value(); } else if ( ! strcmp( reply->name, reply_name ) ) { result = reply->value; Ref( result ); } else { warn->Report( "expected reply from ", task, " to ", event_name, " request, instead got \"", reply->name, "\"" ); Ref( reply ); // So NewEvent doesn't throw it away. NewEvent( task, reply ); result = reply->value; Ref( result ); // So following Unref doesn't discard value. } Unref( reply ); return result; }Channel* Sequencer::AddLocalClient( int read_fd, int write_fd ) { Channel* c = new Channel( read_fd, write_fd ); Selectee* s = new LocalClientSelectee( this, c ); selector->AddSelectee( s ); return c; }Channel* Sequencer::WaitForTaskConnection( Task* task ) { Task* t; Channel* chan; // Need to loop because perhaps we'll receive connections // from tasks other than the one we're waiting for. do { int new_conn = accept_connection( connection_socket->FD() ); mark_close_on_exec( new_conn ); chan = new Channel( new_conn, new_conn ); t = NewConnection( chan ); } while ( t && t != task ); if ( t ) return chan; else return 0; }Task* Sequencer::NewConnection( Channel* connection_channel ) { GlishEvent* establish_event = recv_event( connection_channel->ReadFD() ); // It's possible there's already a Selectee for this channel, // due to using a LocalClientSelectee. If so, remove it, so // it doesn't trigger additional activity. RemoveSelectee( connection_channel ); if ( ! establish_event ) { error->Report( "new connection immediately broken" ); return 0; } Value* v = establish_event->value; char* task_id; int protocol; if ( v->Type() == TYPE_STRING ) { task_id = v->StringVal(); protocol = 1; } else if ( ! v->FieldVal( "name", task_id ) || ! v->FieldVal( "protocol", protocol ) ) { error->Report( "bad connection establishment" ); return 0; } // ### Should check for protocol compatibility here. Task* task = ids_to_tasks[task_id]; if ( ! task ) { error->Report( "connection received from non-existent task ", task_id ); Unref( establish_event ); return 0; } else { task->SetProtocol( protocol ); AssociateTaskWithChannel( task, connection_channel ); NewEvent( task, establish_event ); } delete task_id; return task; }void Sequencer::AssociateTaskWithChannel( Task* task, Channel* chan ) { task->SetChannel( chan, selector ); task->SetActive(); selector->AddSelectee( new ClientSelectee( this, task ) ); // empty out buffer so subsequent select()'s will work if ( chan->DataInBuffer() ) EmptyTaskChannel( task ); }void Sequencer::RemoveSelectee( Channel* chan ) { if ( selector->FindSelectee( chan->ReadFD() ) ) selector->DeleteSelectee( chan->ReadFD() ); }int Sequencer::NewEvent( Task* task, GlishEvent* event ) { if ( ! event ) { // task termination task->CloseChannel(); if ( ! task->Active() ) return 0; // Abnormal termination - no "done" message first. event = new GlishEvent( (const char*) "fail", new Value( task->AgentID() ) ); } const char* event_name = event->name; Value* value = event->value; if ( verbose > 0 ) message->Report( name, ": received event ", task->Name(), ".", event_name, " ", value ); if ( monitor_task && task != monitor_task ) LogEvent( task->TaskID(), task->Name(), event_name, value, 1 ); // If true, generate message if no interest in event. int complain_if_no_interest = 0; if ( ! strcmp( event_name, "established" ) ) { // We already did the SetActive() when the channel // was established. } else if ( ! strcmp( event_name, "done" ) ) task->SetDone(); else if ( ! strcmp( event_name, "fail" ) ) { task->SetDone(); complain_if_no_interest = 1; } else if ( ! strcmp( event_name, "*rendezvous*" ) ) Rendezvous( event_name, value ); else if ( ! strcmp( event_name, "*forward*" ) ) ForwardEvent( event_name, value ); else complain_if_no_interest = 1; int ignore_event = 0; int await_finished = 0; if ( await_stmt ) { await_finished = task->HasRegisteredInterest( await_stmt, event_name ); if ( ! await_finished && await_only_flag && ! task->HasRegisteredInterest( except_stmt, event_name ) ) ignore_event = 1; } if ( ignore_event ) warn->Report( "event ", task->Name(), ".", event_name, " ignored due to \"await\"" ); else { // We're going to want to keep the event value as a field // in the task's AgentRecord. Ref( value ); int was_interest = task->CreateEvent( event_name, value ); if ( ! was_interest && complain_if_no_interest ) warn->Report( "event ", task->Name(), ".", event_name, " (", value, ") dropped" ); RunQueue(); // process effects of CreateEvent() } Unref( event ); if ( await_finished ) { pending_task = task; // Make sure the pending task isn't delete'd before // we can exhaust its pending input. Ref( pending_task ); return 1; } else return 0; }void Sequencer::NewClientStarted() { ++num_active_processes; }int Sequencer::ShouldSuspend( const char* task_var_ID ) { if ( task_var_ID ) return suspend_list[task_var_ID]; else // This is an anonymous client - don't suspend. return 0; }int Sequencer::EmptyTaskChannel( Task* task, int force_read ) { int status = 0; if ( task->Active() ) { Channel* c = task->GetChannel(); ChanState old_state = c->ChannelState(); c->ChannelState() = CHAN_IN_USE; if ( force_read ) status = NewEvent( task, recv_event( c->ReadFD() ) ); while ( status == 0 && c->ChannelState() == CHAN_IN_USE && c->DataInBuffer() ) { status = NewEvent( task, recv_event( c->ReadFD() ) ); } if ( c->ChannelState() == CHAN_INVALID ) { // This happens iff the given task has exited selector->DeleteSelectee( c->ReadFD() ); delete c; while ( reap_terminated_process() ) ; --num_active_processes; } else c->ChannelState() = old_state; } return status; }void Sequencer::MakeEnvGlobal() { Value* env_value = create_record(); extern char** environ; for ( char** env_ptr = environ; *env_ptr; ++env_ptr ) { char* delim = strchr( *env_ptr, '=' ); if ( delim ) { *delim = '\0'; env_value->AssignRecordElement( *env_ptr, new Value( delim + 1 ) ); *delim = '='; } else env_value->AssignRecordElement( *env_ptr, new Value( glish_false ) ); } Expr* env_expr = LookupID( strdup( "environ" ), GLOBAL_SCOPE ); env_expr->Assign( env_value ); }void Sequencer::MakeArgvGlobal( char** argv, int argc ) { // If there's an initial "--" argument, remove it, it's a vestige // from when "--" was needed to separate script files from their // arguments. if ( argc > 0 && ! strcmp( argv[0], "--" ) ) ++argv, --argc; Value* argv_value = new Value( (charptr*) argv, argc, COPY_ARRAY ); Expr* argv_expr = LookupID( strdup( "argv" ), GLOBAL_SCOPE ); argv_expr->Assign( argv_value ); }void Sequencer::BuildSuspendList() { char* suspend_env_list = getenv( "suspend" ); if ( ! suspend_env_list ) return; char* suspendee = strtok( suspend_env_list, " " ); while ( suspendee ) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -