📄 serialcomm.cpp
字号:
while(!completePacket) { buffer[count] = nextRaw(); if(sync && (count == 1) && (buffer[count] == SYNC_BYTE)) { DEBUG("SerialComm::readPacket double sync byte"); sync = false; escape = false; count = 1; crc = 0; buffer[0] = SYNC_BYTE; } if (!sync) { // wait for sync if (buffer[0] == SYNC_BYTE) { sync = true; escape = false; count = 1; crc = 0; } } else if (count >= maxMTU) { DEBUG("SerialComm::readPacket : frame too long - size = " << count << " : resynchronising") sync = false; escape = false; count = crc = 0; badPacketCount++; } else if (escape) { if (buffer[count] == SYNC_BYTE) { DEBUG("SerialComm::readPacket : resynchronising") sync = false; escape = false; count = crc = 0; badPacketCount++; } else { buffer[count] ^= 0x20; if (count > 3) { crc = SerialComm::byteCRC(buffer[count-3], crc); } ++count; escape = false; } } else if (buffer[count] == ESCAPE_BYTE) { // next byte is escaped escape = true; } else if (buffer[count] == SYNC_BYTE) { // calculate last crc byte if (count > 3) { crc = SerialComm::byteCRC(buffer[count-3], crc); } uint16_t packetCRC = (buffer[count - 2] & 0xff) | ((buffer[count - 1] << 8) & 0xff00); if (count < minMTU) { DEBUG("SerialComm::readPacket : frame too short - size = " << count << " : resynchronising ") sync = false; escape = false; count = crc = 0; badPacketCount++; } else if (crc != packetCRC) { DEBUG("SerialComm::readPacket : bad crc - calculated crc = " << crc << " packet crc = " << packetCRC << " : resynchronising " ) sync = false; escape = false; count = crc = 0; badPacketCount++; } else { pPacket.setType(buffer[typeOffset]); pPacket.setSeqno(buffer[seqnoOffset]); switch (buffer[typeOffset]) { case SF_ACK: break; case SF_PACKET_NO_ACK: case SF_PACKET_ACK: // buffer / payload // FIXME: strange packet format!? because seqno is not really defined - missing :( pPacket.setPayload(&buffer[payloadOffset]-1, count+1+1 - serialHeaderBytes); break; default: DEBUG("SerialComm::readPacket : unknown packet type = " << static_cast<uint16_t>(buffer[typeOffset] & 0xff)) ; } completePacket = true;#ifdef DEBUG_RAW_SERIALCOMM DEBUG("SerialComm::readPacket : raw data >>") for (int j=0; j <= count; j++) { cout << std::hex << static_cast<uint16_t>(buffer[j] & 0xff) << " " << std::dec; } cout << endl; cout << "as payload >> " << endl; const char* ptr = pPacket.getPayload(); for (int j=0; j < pPacket.getLength(); j++) { cout << std::hex << static_cast<uint16_t>(ptr[j] & 0xff) << " " << std::dec; } cout << endl;#endif } } else { if (count > 3) { crc = SerialComm::byteCRC(buffer[count-3], crc); } ++count; } } return true;}/* writes packet */bool SerialComm::writePacket(SFPacket &pPacket){ char type, byte = 0; uint16_t crc = 0; char buffer[2*pPacket.getLength() + 20]; int offset = 0; int err = 0; int written = 0; // put SFD into buffer buffer[offset++] = SYNC_BYTE; // packet type byte = type = pPacket.getType(); crc = byteCRC(byte, crc); offset += hdlcEncode(1, &byte, buffer + offset); // seqno byte = pPacket.getSeqno(); crc = byteCRC(byte, crc); offset += hdlcEncode(1, &byte, buffer + offset); switch (type) { case SF_ACK: break; case SF_PACKET_NO_ACK: case SF_PACKET_ACK: // compute crc for(int i = 0; i < pPacket.getLength(); i++) { crc = byteCRC(pPacket.getPayload()[i], crc); } offset += hdlcEncode(pPacket.getLength(), pPacket.getPayload(), buffer + offset); break; default: return false; } // crc two bytes byte = crc & 0xff; offset += hdlcEncode(1, &byte, buffer + offset); byte = (crc >> 8) & 0xff; offset += hdlcEncode(1, &byte, buffer + offset); // put SFD into buffer buffer[offset++] = SYNC_BYTE; written = writeFD(serialWriteFD, buffer, offset, &err); if(written < 0) { if(err != EINTR) { close(serialReadFD); serialReadFD = -1; close(serialWriteFD); serialWriteFD = -1; errno = err; reportError("SerialComm::writePacket failed",-1); return false; } } else if(written < offset) { DEBUG("SerialComm::writePacket failed"); return false; } return true;}string SerialComm::getDevice() const{ return device;}int SerialComm::getBaudRate() const{ return baudrate;}/* helper function to start serial reader pthread */void* readSerialThread(void* ob){ static_cast<SerialComm*>(ob)->readSerial(); return NULL;}/* reads from connected clients */void SerialComm::readSerial(){ while (true) { SFPacket packet; readPacket(packet); switch (packet.getType()) { case SF_ACK: // successful delivery // FIXME: seqnos are not implemented on the node ! pthread_cond_signal(&ack.received); break; case SF_PACKET_ACK: { // put ack in front of queue SFPacket ack(SF_ACK, packet.getSeqno()); writeBuffer.enqueueFront(ack); } case SF_PACKET_NO_ACK: // do nothing - fall through default: if (!readBuffer.isFull()) { ++readPacketCount; // put silently into buffer... readBuffer.enqueueBack(packet); } else { while(readBuffer.isFull()) { readBuffer.dequeue(); ++droppedReadPacketCount; } readBuffer.enqueueBack(packet); // DEBUG("SerialComm::readSerial : dropped packet") } } }}/* helper function to start serial writer pthread */void* writeSerialThread(void* ob){ static_cast<SerialComm*>(ob)->writeSerial(); return NULL;}/* writes to serial/node */void SerialComm::writeSerial(){ SFPacket packet; bool retry = false; int retryCount = 0; long long timeout; while (true) { if (!retry) { packet = writeBuffer.dequeue(); } switch (packet.getType()) { case SF_ACK: // successful delivery if (!writePacket(packet)) { DEBUG("SerialComm::writeSerial : writePacket failed (SF_ACK)") reportError("SerialComm::writeSerial : writePacket(SF_ACK)", -1); } break; case SF_PACKET_ACK: // do nothing - fall through case SF_PACKET_NO_ACK: // do nothing - fall through default: if (!retry) ++writtenPacketCount; // FIXME: this is the only currently supported type by the mote packet.setType(SF_PACKET_ACK); if (!writePacket(packet)) { DEBUG("SerialComm::writeSerial : writePacket failed (SF_PACKET)") reportError("SerialComm::writeSerial : writeFD(SF_PACKET)", -1); } // wait for ack... struct timeval currentTime; struct timespec ackTime; timeout = (long long)ackTimeout * (retryCount + 1); pthread_testcancel(); pthread_mutex_lock(&ack.lock); gettimeofday(¤tTime, NULL); ackTime.tv_sec = currentTime.tv_sec; ackTime.tv_nsec = currentTime.tv_usec * 1000; ackTime.tv_sec += timeout / (1000*1000*1000); ackTime.tv_nsec += timeout % (1000*1000*1000); pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &ack.lock); int retval = pthread_cond_timedwait(&ack.received, &ack.lock, &ackTime); if (!((retryCount < maxRetries) && (retval == ETIMEDOUT))) { if (retryCount >= maxRetries) ++droppedWritePacketCount; retry = false; retryCount = 0; } else { ++retryCount; retry = true; DEBUG("SerialComm::writeSerial : packet retryCount = " << retryCount); ++sumRetries; } // removes the cleanup handler and executes it (unlock mutex) pthread_cleanup_pop(1); } }}/* cancels all running threads */void SerialComm::cancel(){ pthread_t callingThread = pthread_self(); if(readerThreadRunning && pthread_equal(callingThread, readerThread)) { DEBUG("SerialComm::cancel : by readerThread") pthread_detach(readerThread); if (writerThreadRunning) { pthread_cancel(writerThread); DEBUG("SerialComm::cancel : writerThread canceled, joining") pthread_join(writerThread, NULL); writerThreadRunning = false; } readerThreadRunning = false; pthread_cond_signal(&control.cancel); pthread_exit(NULL); } else if(writerThreadRunning && pthread_equal(callingThread, writerThread)) { DEBUG("SerialComm::cancel : by writerThread") pthread_detach(writerThread); if (readerThreadRunning) { pthread_cancel(readerThread); DEBUG("SerialComm::cancel : readerThread canceled, joining") pthread_join(readerThread, NULL); readerThreadRunning = false; } writerThreadRunning = false; pthread_cond_signal(&control.cancel); pthread_exit(NULL); } else { DEBUG("SerialComm::cancel : by other thread") if (readerThreadRunning) { pthread_cancel(readerThread); DEBUG("SerialComm::cancel : readerThread canceled, joining") pthread_join(readerThread, NULL); readerThreadRunning = false; } if (writerThreadRunning) { pthread_cancel(writerThread); DEBUG("SerialComm::cancel : writerThread canceled, joining") pthread_join(writerThread, NULL); writerThreadRunning = false; } pthread_cond_signal(&control.cancel); }}/* reports error */int SerialComm::reportError(const char *msg, int result){ if ((result < 0) && (!errorReported)) { errorMsg << "error : SF-Server ( SerialComm on device = " << device << " ) : " << msg << " ( result = " << result << " )" << endl << "error-description : " << strerror(errno) << endl; cerr << errorMsg.str(); errorReported = true; cancel(); } return result;}/* prints out status */void SerialComm::reportStatus(ostream& os){ os << "SF-Server ( SerialComm on device " << device << " ) : " << "baudrate = " << baudrate << " , packets read = " << readPacketCount << " ( dropped = " << droppedReadPacketCount << ", bad = " << badPacketCount << " )" << " , packets written = " << writtenPacketCount << " ( dropped = " << droppedWritePacketCount << ", total retries: " << sumRetries << " )" << endl;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -