📄 sequencer.cc
字号:
suspend_list.Insert( suspendee, 1 ); suspendee = strtok( 0, " " ); } }void Sequencer::Parse( FILE* file, const char* filename ) { restart_yylex( file ); yyin = file; current_sequencer = this; line_num = 1; input_file_name = filename ? strdup( filename ) : 0; if ( yyin && isatty( fileno( yyin ) ) ) { message->Report( "Glish version ", GLISH_VERSION, "." ); // We're about to enter the "interactive" loop, so // first execute any statements we've seen so far due // to .glishrc files. Exec(); // And add a special Selectee for detecting user input. selector->AddSelectee( new UserInputSelectee( fileno( yyin ) ) ); interactive = 1; } else interactive = 0; if ( yyparse() ) error->Report( "syntax errors parsing input" ); // Don't need to delete input_file_name, yylex() already did // that on <<EOF>>. input_file_name = 0; line_num = 0; }void Sequencer::Parse( const char file[] ) { FILE* f = fopen( file, "r" ); if ( ! f ) error->Report( "can't open file \"", file, "\"" ); else Parse( f, file ); }void Sequencer::Parse( const char* strings[] ) { scan_strings( strings ); Parse( 0, "glish internal initialization" ); }RemoteDaemon* Sequencer::CreateDaemon( const char* host ) { RemoteDaemon* rd = OpenDaemonConnection( host ); if ( rd ) // We're all done, the daemon was already running. return rd; // Have to start up the daemon. message->Report( "activating Glish daemon on ", host ); char daemon_cmd[1024]; sprintf( daemon_cmd, "%s %s -n glishd &", RSH, host ); system( daemon_cmd ); rd = OpenDaemonConnection( host ); while ( ! rd ) { message->Report( "waiting for daemon ..." ); sleep( 1 ); rd = OpenDaemonConnection( host ); } return rd; }RemoteDaemon* Sequencer::OpenDaemonConnection( const char* host ) { int daemon_socket = get_tcp_socket(); if ( remote_connection( daemon_socket, host, DAEMON_PORT ) ) { // Connected. mark_close_on_exec( daemon_socket ); Channel* daemon_channel = new Channel( daemon_socket, daemon_socket ); RemoteDaemon* r = new RemoteDaemon( host, daemon_channel ); daemons.Insert( strdup( host ), r ); // Read and discard daemon's "establish" event. GlishEvent* e = recv_event( daemon_channel->ReadFD() ); Unref( e ); // Tell the daemon which directory we want to work out of. char work_dir[MAXPATHLEN]; if ( ! getcwd( work_dir, sizeof( work_dir ) ) ) fatal->Report( "problems getting cwd:", work_dir ); Value work_dir_value( work_dir ); send_event( daemon_channel->WriteFD(), "setwd", &work_dir_value ); selector->AddSelectee( new DaemonSelectee( r, selector, this ) ); return r; } else { close( daemon_socket ); return 0; } }void Sequencer::ActivateMonitor( char* monitor_client_name ) { TaskAttr* monitor_attrs = new TaskAttr( "*monitor*", "localhost", 0, 0, 0, 0 ); const_args_list monitor_args; monitor_args.append( new Value( monitor_client_name ) ); monitor_task = new ClientTask( &monitor_args, monitor_attrs, this ); if ( monitor_task->TaskError() ) { Unref( monitor_task ); monitor_task = 0; } }void Sequencer::LogEvent( const char* gid, const char* id, const char* event_name, const Value* event_value, int is_inbound ) { if ( ! monitor_task ) return; Value gid_value( gid ); Value id_value( id ); Value name_value( event_name ); parameter_list args; ConstExpr gid_expr( &gid_value ); ConstExpr id_expr( &id_value ); ConstExpr name_expr( &name_value ); ConstExpr value_expr( event_value ); Parameter gid_param( "glish_id", VAL_VAL, &gid_expr, 0 ); Parameter id_param( "id", VAL_VAL, &id_expr, 0 ); Parameter name_param( "name", VAL_VAL, &name_expr, 0 ); Parameter value_param( "value", VAL_VAL, &value_expr, 0 ); args.insert( &name_param ); args.insert( &id_param ); args.insert( &gid_param ); args.insert( &value_param ); const char* monitor_event_name = is_inbound ? "event_in" : "event_out"; monitor_task->SendEvent( monitor_event_name, &args, 0, 0 ); }void Sequencer::LogEvent( const char* gid, const char* id, const GlishEvent* e, int is_inbound ) { LogEvent( gid, id, e->name, e->value, is_inbound ); }void Sequencer::SystemEvent( const char* name, const Value* val ) { system_agent->SendSingleValueEvent( name, val, 1 ); }void Sequencer::Rendezvous( const char* event_name, Value* value ) { char* source_id; char* sink_id; if ( ! value->FieldVal( "source_id", source_id ) || ! value->FieldVal( "sink_id", sink_id ) ) fatal->Report( "bad internal", event_name, "event" ); Task* src = ids_to_tasks[source_id]; Task* snk = ids_to_tasks[sink_id]; if ( ! src || ! snk ) fatal->Report( "no such source or sink ID in internal", event_name, "event:", source_id, sink_id ); // By sending out these two events immediately, before any other // *rendezvous* events can arise, we impose a serial ordering on // all rendezvous. This avoids deadlock. // // Actually, now that we always use sockets (even locally), the // following isn't necessary, since connecting to a socket won't // block (unless the "listen" queue for the remote socket is // full). We could just pass along the *rendezvous-resp* event // to the sink and be done with it. But we retain the protocol // because it was a lot of work getting it right and we don't want // to have to figure it out again if for some reason we don't // always use sockets. src->SendSingleValueEvent( "*rendezvous-orig*", value, 1 ); snk->SendSingleValueEvent( "*rendezvous-resp*", value, 1 ); delete source_id; delete sink_id; }void Sequencer::ForwardEvent( const char* event_name, Value* value ) { char* receipient_id; char* new_event_name; if ( ! value->FieldVal( "receipient", receipient_id ) || ! value->FieldVal( "event", new_event_name ) ) fatal->Report( "bad internal event \"", event_name, "\"" ); Task* task = ids_to_tasks[receipient_id]; if ( ! task ) fatal->Report( "no such receipient ID in ", event_name, "internal event:", receipient_id ); task->SendSingleValueEvent( new_event_name, value, 1 ); delete receipient_id; delete new_event_name; }void Sequencer::EventLoop() { RunQueue(); if ( pending_task ) { EmptyTaskChannel( pending_task ); // We Ref()'d the pending_task when assigning it, to make // sure it didn't go away due to the effects of RunQueue(). Unref( pending_task ); pending_task = 0; } while ( ActiveClients() && ! selector->DoSelection() ) RunQueue(); }void Sequencer::RunQueue() { Notification* n; while ( (n = notification_queue.DeQueue()) ) { if ( verbose > 1 ) message->Report( "doing", n ); if ( n->notifiee->frame ) PushFrame( n->notifiee->frame ); Value* notifier_val = n->notifier->AgentRecord(); if ( notifier_val->Type() == TYPE_RECORD && notifier_val->HasRecordElement( n->field ) != n->value ) // Need to assign the event value. notifier_val->AssignRecordElement( n->field, n->value ); // There are a bunch of Ref's and Unref's here because the // Notify() call below can lead to a recursive call to this // routine (due to an "await" statement), so 'n' might // otherwise be deleted underneath our feet. Unref( last_notification ); last_notification = n; Ref( n ); n->notifiee->stmt->Notify( n->notifier ); if ( n->notifiee->frame ) (void) PopFrame(); Unref( n ); } }ClientSelectee::ClientSelectee( Sequencer* s, Task* t ) : Selectee( t->GetChannel()->ReadFD() ) { sequencer = s; task = t; }int ClientSelectee::NotifyOfSelection() { return sequencer->EmptyTaskChannel( task, 1 ); }LocalClientSelectee::LocalClientSelectee( Sequencer* s, Channel* c ) : Selectee( c->ReadFD() ) { sequencer = s; chan = c; }int LocalClientSelectee::NotifyOfSelection() { (void) sequencer->NewConnection( chan ); return 0; }AcceptSelectee::AcceptSelectee( Sequencer* s, Socket* conn_socket ) : Selectee( conn_socket->FD() ) { sequencer = s; connection_socket = conn_socket; }int AcceptSelectee::NotifyOfSelection() { int new_conn; if ( connection_socket->IsLocal() ) new_conn = accept_local_connection( connection_socket->FD() ); else new_conn = accept_connection( connection_socket->FD() ); mark_close_on_exec( new_conn ); (void) sequencer->NewConnection( new Channel( new_conn, new_conn ) ); return 0; }ScriptSelectee::ScriptSelectee( Client* client, Agent* agent, int conn_socket ) : Selectee( conn_socket ) { script_client = client; script_agent = agent; connection_socket = conn_socket; }int ScriptSelectee::NotifyOfSelection() { fd_set fd_mask; FD_ZERO( &fd_mask ); FD_SET( connection_socket, &fd_mask ); GlishEvent* e = script_client->NextEvent( &fd_mask ); if ( ! e ) { delete script_client; exit( 0 ); } // Ref() the value, since CreateEvent is going to Unref() it, and the // script_client is also going to Unref() it via Unref()'ing the // whole GlishEvent. Ref( e->value ); script_agent->CreateEvent( e->name, e->value ); return 0; }DaemonSelectee::DaemonSelectee( RemoteDaemon* arg_daemon, Selector* sel, Sequencer* s ): Selectee( arg_daemon->DaemonChannel()->ReadFD() ) { daemon = arg_daemon; selector = sel; sequencer = s; }int DaemonSelectee::NotifyOfSelection() { int fd = daemon->DaemonChannel()->ReadFD(); GlishEvent* e = recv_event( fd ); const char* message_name = 0; if ( e ) { if ( ! strcmp( e->name, "probe-reply" ) ) { if ( daemon->State() == DAEMON_LOST ) { message->Report( "connectivity to daemon @ ", daemon->Host(), " restored" ); message_name = "connection_restored"; } daemon->SetState( DAEMON_OK ); } else { error->Report( "received unsolicited message from daemon @ ", daemon->Host() ); } Unref( e ); } else { error->Report( "Glish daemon @ ", daemon->Host(), " terminated" ); selector->DeleteSelectee( fd ); message_name = "daemon_terminated"; } if ( message_name ) { Value message_val( daemon->Host() ); sequencer->SystemEvent( message_name, &message_val ); } return 0; }ProbeTimer::ProbeTimer( PDict(RemoteDaemon)* arg_daemons, Sequencer* s ): SelectTimer( PROBE_DELAY, 0, PROBE_INTERVAL, 0 ) { daemons = arg_daemons; sequencer = s; }int ProbeTimer::DoExpiration() { IterCookie* c = daemons->InitForIteration(); RemoteDaemon* r; const char* key; while ( (r = daemons->NextEntry( key, c )) ) { if ( r->State() == DAEMON_REPLY_PENDING ) { // Oops. Haven't gotten a reply from our last probe. warn->Report( "connection to Glish daemon @ ", key, " lost" ); r->SetState( DAEMON_LOST ); Value message_val( r->Host() ); sequencer->SystemEvent( "connection_lost", &message_val ); } // Probe the daemon, regardless of its state. send_event( r->DaemonChannel()->WriteFD(), "probe", false_value ); if ( r->State() == DAEMON_OK ) r->SetState( DAEMON_REPLY_PENDING ); } return 1; }ScriptClient::ScriptClient( int& argc, char** argv ) : Client( argc, argv ) { selector = 0; agent = 0; }void ScriptClient::SetInterface( Selector* s, Agent* a ) { selector = s; agent = a; selector->AddSelectee( new ScriptSelectee( this, agent, read_fd ) ); }void ScriptClient::FD_Change( int fd, int add_flag ) { if ( ! agent ) return; if ( add_flag ) selector->AddSelectee( new ScriptSelectee( this, agent, fd ) ); else selector->DeleteSelectee( fd ); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -