📄 serialcomm.cpp
字号:
else {
// fifo empty -- need to get some bytes
rawFifo.tail = 0;
rawFifo.head = readFD(serialReadFD, rawFifo.queue, rawReadBytes, maxMTU-1, &err);
if(rawFifo.head < 0) {
close(serialReadFD);
close(serialWriteFD);
serialReadFD = -1;
serialWriteFD = -1;
errno = err;
}
reportError("SerialComm::nextRaw: readFD(serialReadFD, rawFifo.queue, rawReadBytes, maxMTU-1)",
rawFifo.head);
nextByte = rawFifo.queue[rawFifo.tail++];
}
return nextByte;
}
/**
 * Reads raw bytes via nextRaw() until one complete frame with a valid CRC and
 * a known packet type has been decoded, then stores it in pPacket.
 *
 * Framing is handled by a three-state machine:
 *   - WAIT_FOR_SYNC: bytes are discarded until a SYNC_BYTE is seen.
 *   - IN_SYNC:       bytes are collected into the frame buffer; a SYNC_BYTE
 *                    terminates the frame, an ESCAPE_BYTE switches to ESCAPED.
 *   - ESCAPED:       the next byte is stored XOR 0x20 and the machine returns
 *                    to IN_SYNC.
 *
 * Frames that are too short, too long, fail the CRC check or contain a
 * SYNC_BYTE directly after an ESCAPE_BYTE are dropped and counted in
 * badPacketCount. Frames with a valid CRC but an unknown type are dropped
 * without being counted.
 *
 * @param pPacket  receives type, sequence number and (for data packets) payload
 * @return always true; the loop is only left once a valid frame was decoded
 */
bool SerialComm::readPacket(SFPacket &pPacket)
{
    uint8_t buffer[maxMTU + 10];
    int count = 0;
    rx_states_t state = WAIT_FOR_SYNC;
    for(;;) {
        uint8_t nextByte = nextRaw();
        if(state == WAIT_FOR_SYNC) {
            // skip everything up to the first frame delimiter
            if(nextByte == SYNC_BYTE) {
                count = 0;
                state = IN_SYNC;
            }
        }
        else if(state == IN_SYNC) {
            if(nextByte == SYNC_BYTE) {
                // frame delimiter: the bytes collected so far form one frame
                if(count < minMTU) {
                    DEBUG("SerialComm::readPacket : frame too short - size = " << count << " : resynchronising ");
                    badPacketCount++;
                    count = 0;
                }
                else {
                    bool dobreak = true;
                    DEBUG("SerialComm::readPacket : frame size = " << count);
                    if(checkCrc(buffer, count)) {
                        pPacket.setType(buffer[typeOffset]);
                        pPacket.setSeqno(buffer[seqnoOffset]);
                        switch (buffer[typeOffset]) {
                            case SF_ACK:
                                // acknowledgements carry no payload
                                break;
                            case SF_PACKET_NO_ACK:
                                // payload starts one byte earlier (and is one byte longer)
                                // than for SF_PACKET_ACK frames
                                pPacket.setPayload((char *)(&buffer[payloadOffset]-1), count+1+1 - serialHeaderBytes);
                                break;
                            case SF_PACKET_ACK:
                                pPacket.setPayload((char *)(&buffer[payloadOffset]), count+1 - serialHeaderBytes);
                                break;
                            default:
                                dobreak = false;
                                DEBUG("SerialComm::readPacket : unknown packet type = "
                                      << static_cast<uint16_t>(buffer[typeOffset] & 0xff));
                                // The frame is discarded, so the buffer has to be emptied.
                                // Otherwise the bytes of the following frame would be appended
                                // to this one and that frame would be lost to a CRC failure.
                                count = 0;
                                break;
                        }
                        if(dobreak) break; // leave loop
                    }
                    else {
                        DEBUG("SerialComm::readPacket : bad crc");
                        count = 0;
                        badPacketCount++;
                    }
                }
            }
            else if(nextByte == ESCAPE_BYTE) {
                state = ESCAPED;
            }
            else {
                buffer[count++] = nextByte;
                if(count >= maxMTU) {
                    DEBUG("SerialComm::readPacket : packet too long, resynchronizing");
                    count = 0;
                    badPacketCount++;
                    state = WAIT_FOR_SYNC;
                }
            }
        }
        else if(state == ESCAPED) {
            if(nextByte == SYNC_BYTE) {
                // a delimiter must never follow an escape byte: drop the frame,
                // but treat the delimiter as the start of a new one
                DEBUG("SerialComm::readPacket : state ESCAPED, packet got sync byte, resynchronizing");
                count = 0;
                badPacketCount++;
                state = IN_SYNC;
            }
            else {
                // undo the escaping
                buffer[count++] = nextByte ^ 0x20;
                if(count >= maxMTU) {
                    DEBUG("SerialComm::readPacket : state ESCAPED, packet too long, resynchronizing");
                    count = 0;
                    badPacketCount++;
                    state = WAIT_FOR_SYNC;
                }
                else {
                    state = IN_SYNC;
                }
            }
        }
    }
    return true;
}
/**
 * Builds the on-wire frame for pPacket and writes it to serialWriteFD.
 *
 * Frame layout: SYNC_BYTE, type, seqno, payload (data packets only), CRC low
 * byte, CRC high byte, SYNC_BYTE. Everything between the two delimiters is
 * passed through hdlcEncode(); the CRC is computed over the unencoded type,
 * seqno and payload bytes.
 *
 * @param pPacket  packet to send
 * @return false if the packet type is unknown, the write failed (other than
 *         with EINTR) or was incomplete; true otherwise. A write interrupted
 *         by EINTR is reported as success.
 */
bool SerialComm::writePacket(SFPacket &pPacket)
{
    // room for every byte being expanded to two bytes plus framing overhead
    char frame[2*pPacket.getLength() + 20];
    int pos = 0;
    uint16_t checksum = 0;

    // opening frame delimiter
    frame[pos++] = SYNC_BYTE;

    // packet type
    const char kind = pPacket.getType();
    char scratch = kind;
    checksum = byteCRC(scratch, checksum);
    pos += hdlcEncode(1, &scratch, frame + pos);

    // sequence number
    scratch = pPacket.getSeqno();
    checksum = byteCRC(scratch, checksum);
    pos += hdlcEncode(1, &scratch, frame + pos);

    if (kind == SF_PACKET_NO_ACK || kind == SF_PACKET_ACK) {
        // data packets: checksum the payload, then append it encoded
        int i = 0;
        while (i < pPacket.getLength()) {
            checksum = byteCRC(pPacket.getPayload()[i], checksum);
            i++;
        }
        pos += hdlcEncode(pPacket.getLength(), pPacket.getPayload(), frame + pos);
    }
    else if (kind != SF_ACK) {
        // only acks and data packets can be framed
        return false;
    }

    // checksum, low byte first
    scratch = checksum & 0xff;
    pos += hdlcEncode(1, &scratch, frame + pos);
    scratch = (checksum >> 8) & 0xff;
    pos += hdlcEncode(1, &scratch, frame + pos);

    // closing frame delimiter
    frame[pos++] = SYNC_BYTE;

    int err = 0;
    const int written = writeFD(serialWriteFD, frame, pos, &err);

    if (written >= 0) {
        if (written < pos) {
            DEBUG("SerialComm::writePacket failed");
            return false;
        }
        return true;
    }

    if (err == EINTR) {
        // an interrupted write is not treated as a failure
        return true;
    }

    // hard error: give up on the serial connection
    close(serialReadFD);
    serialReadFD = -1;
    close(serialWriteFD);
    serialWriteFD = -1;
    errno = err;
    reportError("SerialComm::writePacket failed",-1);
    return false;
}
/** Returns a copy of the stored device name. */
string SerialComm::getDevice() const
{
    return this->device;
}
int SerialComm::getBaudRate() const
{
return baudrate;
}
/* helper function to start serial reader pthread */
void* readSerialThread(void* ob)
{
static_cast<SerialComm*>(ob)->readSerial();
return NULL;
}
/* reads from connected clients */
void SerialComm::readSerial()
{
while (true)
{
SFPacket packet;
readPacket(packet);
switch (packet.getType())
{
case SF_ACK:
// successful delivery
// FIXME: seqnos are not implemented on the node !
pthread_cond_signal(&ack.received);
break;
case SF_PACKET_ACK:
{
// put ack in front of queue
SFPacket ack(SF_ACK, packet.getSeqno());
writeBuffer.enqueueFront(ack);
}
case SF_PACKET_NO_ACK:
// do nothing - fall through
default:
if (!readBuffer.isFull())
{
++readPacketCount;
// put silently into buffer...
readBuffer.enqueueBack(packet);
}
else
{
while(readBuffer.isFull()) {
readBuffer.dequeue();
++droppedReadPacketCount;
}
readBuffer.enqueueBack(packet);
// DEBUG("SerialComm::readSerial : dropped packet")
}
}
}
}
/* helper function to start serial writer pthread */
void* writeSerialThread(void* ob)
{
static_cast<SerialComm*>(ob)->writeSerial();
return NULL;
}
/**
 * Writer loop: takes packets from writeBuffer and sends them over the serial
 * line forever.
 *
 *  - SF_ACK packets are written once, without waiting for a reply.
 *  - All other packets are sent as SF_PACKET_ACK; afterwards the thread waits
 *    on ack.received. If the wait times out and fewer than maxRetries retries
 *    have been made, the same packet is sent again; otherwise the next packet
 *    is dequeued. The wait time grows linearly with the retry number
 *    (ackTimeout * (retryCount + 1), handled as nanoseconds).
 *
 * A failing writePacket() is passed to reportError().
 */
void SerialComm::writeSerial()
{
    SFPacket packet;
    bool retry = false;
    int retryCount = 0;
    long long timeout;
    while (true)
    {
        if (!retry)
        {
            cerr << " serial deqeue packet, empty: " << writeBuffer.isEmpty() << endl;
            packet = writeBuffer.dequeue();
        }
        switch (packet.getType())
        {
            case SF_ACK:
                // successful delivery
                if (!writePacket(packet))
                {
                    DEBUG("SerialComm::writeSerial : writePacket failed (SF_ACK)")
                    reportError("SerialComm::writeSerial : writePacket(SF_ACK)", -1);
                }
                break;
            case SF_PACKET_ACK:
                // do nothing - fall through
            case SF_PACKET_NO_ACK:
                // do nothing - fall through
            default:
            {
                if (!retry)
                    ++writtenPacketCount;
                // FIXME: this is the only currently supported type by the mote
                packet.setType(SF_PACKET_ACK);
                if (!writePacket(packet))
                {
                    DEBUG("SerialComm::writeSerial : writePacket failed (SF_PACKET)")
                    reportError("SerialComm::writeSerial : writeFD(SF_PACKET)", -1);
                }
                // wait for ack...
                const long nsecPerSec = 1000L*1000L*1000L;
                struct timeval currentTime;
                struct timespec ackTime;
                timeout = (long long)ackTimeout * (retryCount + 1);
                pthread_testcancel();
                pthread_mutex_lock(&ack.lock);
                gettimeofday(&currentTime, NULL);
                // absolute deadline = now + timeout
                ackTime.tv_sec = currentTime.tv_sec + timeout / nsecPerSec;
                ackTime.tv_nsec = currentTime.tv_usec * 1000 + timeout % nsecPerSec;
                // Carry into the seconds field: a tv_nsec of one second or more is
                // an invalid timespec and makes pthread_cond_timedwait fail with
                // EINVAL, which would be mistaken for a received ack below.
                if (ackTime.tv_nsec >= nsecPerSec)
                {
                    ackTime.tv_sec += 1;
                    ackTime.tv_nsec -= nsecPerSec;
                }
                // make sure the mutex is released even if the thread is cancelled while waiting
                pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &ack.lock);
                int retval = pthread_cond_timedwait(&ack.received, &ack.lock, &ackTime);
                if (!((retryCount < maxRetries) && (retval == ETIMEDOUT)))
                {
                    // either the wait ended without a timeout or the retries are used up
                    if (retryCount >= maxRetries) ++droppedWritePacketCount;
                    retry = false;
                    retryCount = 0;
                }
                else
                {
                    ++retryCount;
                    retry = true;
                    DEBUG("SerialComm::writeSerial : packet retryCount = " << retryCount);
                    ++sumRetries;
                }
                // removes the cleanup handler and executes it (unlock mutex)
                pthread_cleanup_pop(1);
            }
        }
    }
}
/**
 * Stops the reader and writer threads.
 *
 * When called from the reader or the writer thread itself, that thread
 * detaches itself, cancels and joins the other one (if running), signals
 * control.cancel and terminates via pthread_exit(); the call does not return.
 * When called from any other thread, both worker threads are cancelled and
 * joined (if running), control.cancel is signalled and the call returns.
 */
void SerialComm::cancel()
{
    const pthread_t self = pthread_self();
    const bool calledByReader = readerThreadRunning && pthread_equal(self, readerThread);
    const bool calledByWriter = !calledByReader
                                && writerThreadRunning && pthread_equal(self, writerThread);

    // a worker thread cannot join itself, so it detaches instead
    if (calledByReader)
    {
        DEBUG("SerialComm::cancel : by readerThread")
        pthread_detach(readerThread);
    }
    else if (calledByWriter)
    {
        DEBUG("SerialComm::cancel : by writerThread")
        pthread_detach(writerThread);
    }
    else
    {
        DEBUG("SerialComm::cancel : by other thread")
    }

    // shut down every running worker thread other than the caller
    if (!calledByReader && readerThreadRunning)
    {
        pthread_cancel(readerThread);
        DEBUG("SerialComm::cancel : readerThread canceled, joining")
        pthread_join(readerThread, NULL);
        readerThreadRunning = false;
    }
    if (!calledByWriter && writerThreadRunning)
    {
        pthread_cancel(writerThread);
        DEBUG("SerialComm::cancel : writerThread canceled, joining")
        pthread_join(writerThread, NULL);
        writerThreadRunning = false;
    }

    // the calling worker thread is about to end as well
    if (calledByReader)
    {
        readerThreadRunning = false;
    }
    if (calledByWriter)
    {
        writerThreadRunning = false;
    }

    pthread_cond_signal(&control.cancel);

    if (calledByReader || calledByWriter)
    {
        pthread_exit(NULL);
    }
}
/* reports error */
int SerialComm::reportError(const char *msg, int result)
{
if ((result < 0) && (!errorReported))
{
errorMsg << "error : SF-Server ( SerialComm on device = " << device << " ) : "
<< msg << " ( result = " << result << " )" << endl
<< "error-description : " << strerror(errno) << endl;
cerr << errorMsg.str();
errorReported = true;
cancel();
}
return result;
}
/**
 * Writes a one-line summary of the device, baud rate and packet counters.
 *
 * @param os  stream the status line is written to
 */
void SerialComm::reportStatus(ostream& os)
{
    // connection
    os << "SF-Server ( SerialComm on device " << device << " ) : ";
    os << "baudrate = " << baudrate;

    // receive side
    os << " , packets read = " << readPacketCount;
    os << " ( dropped = " << droppedReadPacketCount;
    os << ", bad = " << badPacketCount << " )";

    // transmit side
    os << " , packets written = " << writtenPacketCount;
    os << " ( dropped = " << droppedWritePacketCount;
    os << ", total retries: " << sumRetries << " )";

    os << endl;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -