📄 stageio.cc
字号:
int CStageIO::WriteEntity( int fd, stage_entity_t* ent ){ assert( ent ); printf( "Downloading entity %d.%d.%s\n", ent->id, ent->parent, ent->token ); int res = WritePacket( fd, (char*)ent, sizeof(stage_entity_t) ); return res;}int CStageIO::WriteEntities( int fd ){ stage_entity_t sent; int num_obs = GetEntityCount(); // announce that a load of entities are on their way WriteHeader( fd, EntityPackets, num_obs ); for( int c=0; c<this->entity_count; c++ ) { CEntity* ent = this->entities[c]; strncpy( sent.token, ent->lib_entry->token, STAGE_TOKEN_MAX ); sent.id = ent->stage_id; if( ent->m_parent_entity ) sent.parent = ent->m_parent_entity->stage_id; // valid parent else sent.parent = -1; // no parent - it's root WriteEntity( fd, &sent ); } return 0;}int CStageIO::ReadHeader( int fd, stage_header_t* hdr ){ return ReadPacket( fd, (char*)hdr, sizeof(stage_header_t) );}// int CStageIO::WriteSubscriptions( int fd )// #{// # for( int i=0; i < GetEntityCount(); i++ )// if( GetEntity(i)->Subscribed() )// WriteHeader( fd, Subscribed, i ); // return 0;// } ///////////////////////////////////////////////////////////// // check for new and cancelled subscriptions // and inform clients of themvoid CStageIO::Write( void ){ //PRINT_DEBUG( "STAGIO WRITE" ); ///////////////////////////////////////////////////////// // write out any properties that are dirty int i, p; char data[MAX_PROPERTY_DATA_LEN]; // for all the connections for( int t=0; t< m_pose_connection_count; t++ ) if( m_dirty_subscribe[t] ) // only send data to those who subscribed { int connfd = m_pose_connections[t].fd; assert( connfd > 0 ); // must be good int send_count = CountDirtyOnConnection( t ); if( send_count > 0 ) { // announce the number of packets to follow on this connection //printf( "property packets header: " // " %d packets, connection %d, fd %d\n", // send_count, t, connfd ); WriteHeader( connfd, PropertyPackets, send_count ); CEntity* ent; // loop through the entities again, this time sending the properties for( i=0; i < GetEntityCount(); i++ ) for( p=ENTITY_FIRST_PROPERTY; p < ENTITY_LAST_PROPERTY; p++ ) { assert( ent = this->GetEntity(i) ); //if( RTTI_ISPLAYERP(ent) ) //printf( "object %p is a player\n", ent ); //printf( "Inspecting entity %d property %d connection %d\n", // i, p, t ); // is the entity marked dirty for this connection & prop? if( ent->m_dirty[t][p] ) { //printf( "PROPERTY DIRTY dev: %d prop: %d\n", t, p); int datalen = ent->GetProperty((EntityProperty)p, (void*)data ); if( datalen == 0 ) { PRINT_DEBUG1( "READ EMPTY PROPERTY %d\n", p ); } else { stage_property_t prop; prop.id = i; prop.property = (EntityProperty)p; prop.len = datalen; WriteProperty( connfd, &prop, data, datalen ); } // mark it clean on this connection // it won't get re-sent here until this flag is set again ent->SetDirty( t, (EntityProperty)p, 0 ); } } } // that's all the state sent - send a continue message WriteHeader( connfd, Continue, 0 ); }}int CStageIO::CountDirtyOnConnection( int con ){ char dummydata[MAX_PROPERTY_DATA_LEN]; int count = 0; //puts( "Counting dirty properties" ); // count the number of dirty properties on this connection for( int c=0; c<this->entity_count; c++ ) { CEntity* ent = this->entities[c]; for( int p=ENTITY_FIRST_PROPERTY; p < ENTITY_LAST_PROPERTY; p++ ) { // is the entity marked dirty for this connection & property? if( ent->m_dirty[con][p] ) { //if( RTTI_ISPLAYERP(ent) ) //printf( "(counting dirty) object %p is a player\n", ent ); // if this property has any data if( ent->GetProperty( (EntityProperty)p, (void*)dummydata ) > 0 ) count++; // we count it as dirty } } } return count;}void CStageIO::DestroyConnection( int con ){#ifdef VERBOSE printf( "\nStage: Closing connection %d\n", con );#endif close( m_pose_connections[con].fd ); // if this was a sync connection, reduce the number of syncs we wait for if( m_conn_type[con] == STAGE_SYNC ) m_sync_counter--; m_pose_connection_count--; // shift the rest of the array 1 place left for( int p=con; p<m_pose_connection_count; p++ ) { // the pollfd array memcpy( &(m_pose_connections[p]), &(m_pose_connections[p+1]), sizeof( struct pollfd ) ); // the connection type array memcpy( &(m_conn_type[p]), &(m_conn_type[p+1]), sizeof( char ) ); // the subscription type array memcpy( &(m_dirty_subscribe[p]), &(m_dirty_subscribe[p+1]), sizeof( char ) ); } if( m_sync_counter < 1 && m_external_sync_required ) m_enable = false;#ifdef VERBOSE printf( "Stage: remaining connections %d\n", m_pose_connection_count );#endif}void CStageIO::HandleCommand( int con, cmd_t cmd ){ PRINT_DEBUG( "received command" ); switch( cmd ) { // TODO: //case LOADc: Load( m_filename ); break; case PAUSEc: // toggle simulation pause m_enable = !m_enable; break; case SUBSCRIBEc: PRINT_DEBUG1( "Received dirty: subscription on connection %d\n", con ); m_dirty_subscribe[con] = 1; // this connection wants to receive deltas // tell the client which entities are subscribed by player clients //WriteSubscriptions( m_pose_connections[con].fd ); break; case DOWNLOADc: // someone has requested a download of the world state PRINT_DEBUG( "DOWNLOADc" ); WriteMatrix( m_pose_connections[con].fd ); //WriteBackground( m_pose_connections[con].fd ); WriteEntities( m_pose_connections[con].fd ); WriteHeader( m_pose_connections[con].fd, DownloadComplete, 0 ); break; case SAVEc: // someone has asked us to save the world file PRINT_DEBUG( "SAVEc" ); Save(); break; default: printf( "Stage Warning: " "Received unknown command (%d); ignoring.\n", cmd ); }}// read stuff until we get a continue message on each channelint CStageIO::Read( void ){ //PRINT_DEBUG( "StageIO::Read()" ); // if we have no connections //if( m_pose_connection_count == 0 ) //return 0; // do nothing here //if( g_timer_expired < 1 ) sleep( 1 ); // otherwise, check the connections for incoming stuff // if we have nothing to set the time, just increment it //if( m_sync_counter == 0 ) m_step_num++; // in real time-mode, poll blocks until it is interrupted by // a timer signal, so we give it a time-out of -1. Otherwise, // we give it a zero time-out so it returns quickly. int timeout; m_real_timestep > 0 ? timeout = -1 : timeout = 0; // int timeout = 0; // always return quickly - experimental int readable = 0; int syncs = 0; // we loop on this poll until have the syncs. in realtime mode we // ALSO wait for the timer to run out while( m_pose_connection_count > 0 ) // as long as we have a connection { //printf( "polling with timeout: %d\n", timeout ); // use poll to see which pose connections have data if((readable = poll( m_pose_connections, m_pose_connection_count, timeout )) == -1) { if( errno == EINTR ) // interrupted by the real-time timer { printf( "EINTR: syncs %d / %d\n", syncs, m_sync_counter ); // if we have all our syncs, we;re done if( syncs >= m_sync_counter ) { puts( "HAVE SYNCS - done reading" ); return 0; } } else { PRINT_ERR( "poll(2) returned error)"); exit( -1 ); } } if( readable > 0 ) // if something was available for( int t=0; t<m_pose_connection_count; t++ )// all the connections { short revents = m_pose_connections[t].revents; if( revents & POLLIN )// data available { //printf( "poll() says data available (POLLIN)\n" ); int hdrbytes; stage_header_t hdr; hdrbytes = ReadHeader( m_pose_connections[t].fd, &hdr); if( hdrbytes < (int)sizeof(hdr) ) { printf( "Failed to read header on connection %d " "(%d/%d bytes).\n" "Connection closed", t, hdrbytes, sizeof(hdr) ); DestroyConnection( t ); // zap this connection } else { switch( hdr.type ) { case PropertyPackets: // some poses are coming in PRINT_DEBUG2( "INCOMING PROPERTIES (%d) ON %d\n", hdr.data, m_pose_connections[t].fd ); ReadProperties( t, m_pose_connections[t].fd, hdr.data ); break; case StageCommand: HandleCommand( t, (cmd_t)hdr.data ); break; case DownloadComplete: PRINT_DEBUG2( "DOWNLOAD COMPLETE (%d) on %d\n", hdr.data, m_pose_connections[t].fd ); m_downloading = false; m_enable = true; // we can start the sim now! return 0; break; case EntityPackets: PRINT_DEBUG2( "INCOMING ENTITIES (%d) on %d\n", hdr.data, m_pose_connections[t].fd ); ReadEntities( m_pose_connections[t].fd, hdr.data ); break; case MatrixPacket: PRINT_DEBUG1( "MATRIX ON %d\n", m_pose_connections[t].fd ); ReadMatrix( m_pose_connections[t].fd ); break; case BackgroundPacket: PRINT_DEBUG1( "BACKGROUND ON %d\n", m_pose_connections[t].fd ); ReadBackground( m_pose_connections[t].fd ); break; case Continue: // this marks the end of the data PRINT_DEBUG1( "CONTINUE ON %d\n", m_pose_connections[t].fd ); syncs++; m_step_num=hdr.data; // set the step number //printf( "syncs: %d/%d timer: %d\n", // syncs, m_sync_counter, g_timer_expired ); // if that's all the syncs and the timer is up, //we're done if( syncs >= m_sync_counter ) //if( syncs >= m_sync_counter && g_timer_expired > 0 ) return 0; break; default: printf( "Stage warning: unknown mesg type %d\n", hdr.type); } } } // if poll reported some badness on this fd else if( !revents & EINTR ) // { printf( "Stage: connection %d seems bad\n", t ); DestroyConnection( t ); // zap this connection } } // if we're not realtime we can bail now... if( m_real_timestep == 0 ) break; //printf( "end\n" ); } return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -