📄 readlog.cc
字号:
/** @todo Fix support for reading gzipped files */ if (this->file == NULL) { PLAYER_ERROR2("unable to open [%s]: %s\n", this->filename, strerror(errno)); return -1; } // Rewind not requested by default this->rewind_requested = false; // Make some space for parsing data from the file. This size is not // an exact upper bound; it's just my best guess. this->line_size = PLAYER_MAX_MESSAGE_SIZE; this->line = (char*) malloc(this->line_size); assert(this->line); // Start device thread this->StartThread(); return 0;}////////////////////////////////////////////////////////////////////////////// Finalize the driverint ReadLog::Shutdown(){ // Stop the device thread this->StopThread(); // Free allocated mem free(this->line); // Close the file#if HAVE_ZLIB_H if (this->gzfile) { gzclose(this->gzfile); this->gzfile = NULL; }#endif if(this->file) { fclose(this->file); this->file = NULL; } return 0;}////////////////////////////////////////////////////////////////////////////// Driver threadvoid ReadLog::Main(){ int ret; int i, len, linenum; bool use_stored_tokens; int token_count=0; char *tokens[4096]; player_devaddr_t header_id, provide_id; struct timeval tv; double last_wall_time, curr_wall_time; double curr_log_time, last_log_time; unsigned short type, subtype; bool reading_configs; linenum = 0; last_wall_time = -1.0; last_log_time = -1.0; // First thing, we'll read all the configs from the front of the file reading_configs = true; use_stored_tokens = false; while (true) { pthread_testcancel(); // Process requests if(!reading_configs) ProcessMessages(); // If we're not supposed to playback data, sleep and loop if(!this->enable && !reading_configs) { usleep(10000); continue; } // If a client has requested that we rewind, then do so if(!reading_configs && this->rewind_requested) { // back up to the beginning of the file#if HAVE_ZLIB_H if (this->gzfile) ret = gzseek(this->file,0,SEEK_SET); else ret = fseek(this->file,0,SEEK_SET);#else ret = fseek(this->file,0,SEEK_SET);#endif if(ret < 0) { // oh well, warn the user and keep going PLAYER_WARN1("while rewinding logfile, gzseek()/fseek() failed: %s", strerror(errno)); } else { linenum = 0; // reset the time ReadLogTime_time.tv_sec = 0; ReadLogTime_time.tv_usec = 0; ReadLogTime_timeDouble = 0.0;#if 0 // reset time-of-last-write in all clients // // FIXME: It's not really thread-safe to call this here, because it // writes to a bunch of fields that are also being read and/or // written in the server thread. But I'll be damned if I'm // going to add a mutex just for this. clientmanager->ResetClientTimestamps();#endif // reset the flag this->rewind_requested = false; PLAYER_MSG0(2, "logfile rewound"); continue; } } if(!use_stored_tokens) { // Read a line from the file; note that gzgets is really slow // compared to fgets (on uncompressed files), so use the latter.#if HAVE_ZLIB_H if (this->gzfile) ret = (gzgets(this->file, this->line, this->line_size) == NULL); else ret = (fgets(this->line, this->line_size, (FILE*) this->file) == NULL);#else ret = (fgets(this->line, this->line_size, (FILE*) this->file) == NULL);#endif if (ret != 0) { PLAYER_MSG1(1, "reached end of log file %s", this->filename); // File is done, so just loop forever, unless we're on auto-rewind, // or until a client requests rewind. reading_configs = false; while(!this->autorewind && !this->rewind_requested) { usleep(100000); pthread_testcancel(); // Process requests this->ProcessMessages(); ReadLogTime_timeDouble += 0.1; ReadLogTime_time.tv_sec = (time_t)floor(ReadLogTime_timeDouble); ReadLogTime_time.tv_sec = (time_t)fmod(ReadLogTime_timeDouble,1.0); } // request a rewind and start again this->rewind_requested = true; continue; } // Possible buffer overflow, so bail assert(strlen(this->line) < this->line_size); linenum += 1; //printf("line %d\n", linenum); //continue; // Tokenize the line using whitespace separators token_count = 0; len = strlen(line); for (i = 0; i < len; i++) { if (isspace(line[i])) line[i] = 0; else if (i == 0) { assert(token_count < (int) (sizeof(tokens) / sizeof(tokens[i]))); tokens[token_count++] = line + i; } else if (line[i - 1] == 0) { assert(token_count < (int) (sizeof(tokens) / sizeof(tokens[i]))); tokens[token_count++] = line + i; } } if (token_count >= 1) { // Discard comments if (strcmp(tokens[0], "#") == 0) continue; // Parse meta-data if (strcmp(tokens[0], "##") == 0) { if (token_count == 4) { free(this->format); this->format = strdup(tokens[3]); } continue; } } } else use_stored_tokens = false; // Parse out the header info if (this->ParseHeader(linenum, token_count, tokens, &header_id, &curr_log_time, &type, &subtype) != 0) continue; if(reading_configs) { if(type != PLAYER_MSGTYPE_RESP_ACK) { // not a config reading_configs = false; // we'll reuse this tokenized string next time through, instead of // reading a fresh line from the file use_stored_tokens = true; continue; } } // Set the global timestamp ::ReadLogTime_timeDouble = curr_log_time; ::ReadLogTime_time.tv_sec = (time_t)floor(curr_log_time); ::ReadLogTime_time.tv_usec = (time_t)fmod(curr_log_time,1.0); gettimeofday(&tv,NULL); curr_wall_time = tv.tv_sec + tv.tv_usec/1e6; if(!reading_configs) { // Have we published at least one message from this log? if(last_wall_time >= 0) { // Wait until it's time to publish this message while((curr_wall_time - last_wall_time) < ((curr_log_time - last_log_time) / this->speed)) { gettimeofday(&tv,NULL); curr_wall_time = tv.tv_sec + tv.tv_usec/1e6; this->ProcessMessages(); usleep(1000); } } last_wall_time = curr_wall_time; last_log_time = curr_log_time; } // Look for a matching read interface; data will be output on // the corresponding provides interface. for (i = 0; i < this->provide_count; i++) { provide_id = this->provide_ids[i]; if(Device::MatchDeviceAddress(header_id, provide_id)) { this->ParseData(provide_id, type, subtype, linenum, token_count, tokens, curr_log_time); break; } } if(i >= this->provide_count) { PLAYER_MSG6(2, "unhandled message from %d:%d:%d:%d %d:%d\n", header_id.host, header_id.robot, header_id.interf, header_id.index, type, subtype); } } return;}////////////////////////////////////////////////////////////////////////////// Process configuration requestsint ReadLog::ProcessLogConfig(MessageQueue * resp_queue, player_msghdr_t * hdr, void * data){ player_log_set_read_state_t* sreq; player_log_get_state_t greq; switch(hdr->subtype) { case PLAYER_LOG_REQ_SET_READ_STATE: if(hdr->size != sizeof(player_log_set_read_state_t)) { PLAYER_WARN2("request wrong size (%d != %d)", hdr->size, sizeof(player_log_set_read_state_t)); return(-1); } sreq = (player_log_set_read_state_t*)data; if(sreq->state) { puts("ReadLog: start playback"); this->enable = true; } else { puts("ReadLog: stop playback"); this->enable = false; } this->Publish(this->log_id, resp_queue, PLAYER_MSGTYPE_RESP_ACK, PLAYER_LOG_REQ_SET_READ_STATE); return(0); case PLAYER_LOG_REQ_GET_STATE: greq.type = PLAYER_LOG_TYPE_READ; if(this->enable) greq.state = 1; else greq.state = 0; this->Publish(this->log_id, resp_queue, PLAYER_MSGTYPE_RESP_ACK, PLAYER_LOG_REQ_GET_STATE, (void*)&greq, sizeof(greq), NULL); return(0); case PLAYER_LOG_REQ_SET_READ_REWIND: // set the appropriate flag in the manager this->rewind_requested = true; this->Publish(this->log_id, resp_queue, PLAYER_MSGTYPE_RESP_ACK, PLAYER_LOG_REQ_SET_READ_REWIND); return(0); default: return(-1); }}intReadLog::ProcessPositionConfig(MessageQueue * resp_queue, player_msghdr_t * hdr, void * data){ switch(hdr->subtype) { case PLAYER_POSITION2D_REQ_GET_GEOM: { // Find the right place from which to retrieve it int j; for(j=0;j<this->provide_count;j++) { if(Device::MatchDeviceAddress(this->provide_ids[j], hdr->addr)) break; } if(j>=this->provide_count) return(-1); if(!this->provide_metadata[j]) return(-1); this->Publish(this->provide_ids[j], resp_queue, PLAYER_MSGTYPE_RESP_ACK, hdr->subtype, this->provide_metadata[j], sizeof(player_position2d_geom_t), NULL); return(0); } default: return(-1); }}intReadLog::ProcessLaserConfig(MessageQueue * resp_queue, player_msghdr_t * hdr, void * data){ switch(hdr->subtype) { case PLAYER_LASER_REQ_GET_GEOM: { // Find the right place from which to retrieve it int j; for(j=0;j<this->provide_count;j++) { if(Device::MatchDeviceAddress(this->provide_ids[j], hdr->addr)) break; } if(j>=this->provide_count) { puts("no matching device"); return(-1); } if(!this->provide_metadata[j]) { puts("no metadata"); return(-1); } this->Publish(this->provide_ids[j], resp_queue, PLAYER_MSGTYPE_RESP_ACK, hdr->subtype, this->provide_metadata[j], sizeof(player_laser_geom_t), NULL); return(0); } default: return(-1); }}intReadLog::ProcessSonarConfig(MessageQueue * resp_queue, player_msghdr_t * hdr, void * data){ switch(hdr->subtype) { case PLAYER_SONAR_REQ_GET_GEOM: { // Find the right place from which to retrieve it int j; for(j=0;j<this->provide_count;j++) { if(Device::MatchDeviceAddress(this->provide_ids[j], hdr->addr)) break; } if(j>=this->provide_count) return(-1); if(!this->provide_metadata[j]) return(-1); this->Publish(this->provide_ids[j], resp_queue, PLAYER_MSGTYPE_RESP_ACK, hdr->subtype, this->provide_metadata[j], sizeof(player_sonar_geom_t), NULL); return(0); } default: return(-1); }}intReadLog::ProcessWSNConfig(MessageQueue * resp_queue, player_msghdr_t * hdr, void * data){ switch(hdr->subtype) { case PLAYER_WSN_REQ_DATATYPE: { // Find the right place from which to retrieve it
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -