📄 protocol.c
字号:
pkt->dataLen = (UInt8)pktBuf[2]; if (pkt->dataLen > 0) { memcpy(pkt->data, pktBuf + 3, (int)pkt->dataLen); } } else { // invalid header: ERROR_PACKET_HEADER ClientPacketType_t hdrType = CLIENT_HEADER_TYPE(pktBuf[0],pktBuf[1]); _protocolQueueError(pv,"%2x%2x", (UInt32)ERROR_PACKET_HEADER, (UInt32)hdrType); return (Packet_t*)0; } /* return packet */ return pkt;}// ----------------------------------------------------------------------------/* read packet from server */static int _protocolReadServerPacket(ProtocolVars_t *pv, Packet_t *pkt){ UInt8 buf[PACKET_MAX_ENCODED_LENGTH]; int bufLen = pv->xFtns->xportReadPacket(buf, sizeof(buf)); /* parse and validate packet */ if (bufLen < 0) { // read error (transport not open?) return -1; // Fix: MDF 2006/07/28 } else if (bufLen == 0) { // timeoout (or no data) return 0; } else { // this count won't be accurate if an error occurred during a packet read pv->totalReadBytes += bufLen; pv->sessionReadBytes += bufLen; // parse packet _protocolParseServerPacket(pv, pkt, buf); // , bufLen); return 1; } }// ----------------------------------------------------------------------------/* flush input buffer (from server) */static void _protocolFlushInput(ProtocolVars_t *pv){ // This typically only has meaning in speakFreely mode (eq. serial transport) // This prevents reading old/obsolete messages that may have been queue by // the server some time ago that we weren't able to pick up quickly enough. if (pv->isSerial) { pv->xFtns->xportReadFlush(); }}/* return true if we have pending data to send */static utBool _protocolHasDataToSend(ProtocolVars_t *pv){ if (pv->sendIdentification != SEND_ID_NONE) { // identification has been requested return utTrue; } else if (pqueHasPackets(&(pv->pendingQueue))) { // has pending important packets return utTrue; } else if (pqueHasPackets(&(pv->volatileQueue))) { // has miscellaneous volatile packets return utTrue; } else if (pqueHasPackets(_protocolGetEventQueue(pv))) { // has event packets return utTrue; } return utFalse;}/* send identification packets */static utBool _protocolSendIdentification(ProtocolVars_t *pv){ if (pv->sendIdentification != SEND_ID_NONE) { Packet_t idPkt; /* account/device */ const char *acctId = propGetAccountID(); // propGetString(PROP_STATE_ACCOUNT_ID,""); const char *devId = propGetDeviceID(pv->protoNdx); // propGetString(PROP_STATE_DEVICE_ID,""); /* first try our UniqueID */ utBool okUniqueId = (pv->sendIdentification == SEND_ID_UNIQUE)? utTrue : utFalse; if (okUniqueId) { UInt8 id[MAX_ID_SIZE]; int maxLen = sizeof(id); PropertyError_t err = propGetValue(PROP_STATE_UNIQUE_ID, id, maxLen); // ok as-is int b, len = PROP_ERROR_OK_LENGTH(err); if (len >= MIN_UNIQUE_SIZE) { // length must at least MIN_UNIQUE_SIZE for (b = 0; (b < len) && (id[b] == 0); b++); if (b < len) { // at least one field must be non-zero pktInit(&idPkt, PKT_CLIENT_UNIQUE_ID, "%*b", len, id); if (_protocolWritePacket(pv,&idPkt) < 0) { return utFalse; // write error } pv->sendIdentification = SEND_ID_NONE; return utTrue; // no write-error } } } // AccountID utBool sentAcct = utFalse; if (acctId && *acctId) { // Account ID is sent iff it is defined pktInit(&idPkt, PKT_CLIENT_ACCOUNT_ID, "%*s", MAX_ID_SIZE, acctId); if (_protocolWritePacket(pv,&idPkt) < 0) { return utFalse; // write error } sentAcct = utTrue; } // DeviceID utBool sentDev = utFalse; if (devId && *devId) { // Device ID is sent iff it is defined //logDEBUG(LOGSRC,"Device: %s", devId); pktInit(&idPkt, PKT_CLIENT_DEVICE_ID, "%*s", MAX_ID_SIZE, devId); if (_protocolWritePacket(pv,&idPkt) < 0) { return utFalse; // write error } sentDev = utTrue; } /* ID successfully sent */ pv->sendIdentification = SEND_ID_NONE; return utTrue; // no write-error } return utTrue; // no write-error}/* send contents of specified queue */static utBool _protocolSendQueue(ProtocolVars_t *pv, PacketQueue_t *pq, PacketPriority_t maxPri, int maxEvents, utBool *hasMorePackets){ int rtnWriteLen = 0; // rtnVal /* adjust arguments */ if (maxPri < PRIORITY_LOW) { maxPri = PRIORITY_LOW; } // at least low priority packets if (maxEvents == 0) { maxEvents = 1; } // at least 1 packet // a 'maxEvent' < 0 means there is no maximum number of events to send /* iterate through queue */ // This loop stops as soon as one of the following has occured: // - We've sent the specified 'maxEvents'. // - All events in the queue have been sent. // - We've run into a packet that exceeds our maximum allowable priority. Packet_t *quePkt; PacketQueueIterator_t queIter; pqueGetIterator(pq, &queIter); for (; (maxEvents != 0) && (quePkt=pqueGetNextPacket((Packet_t*)0,&queIter)) && (quePkt->priority <= maxPri) ;) { /* write packet */ rtnWriteLen = _protocolWritePacket(pv,quePkt); if (rtnWriteLen < 0) { break; } /* mark this packet as sent */ quePkt->sent = utTrue; // mark it as sent /* decrement counter */ if (maxEvents > 0) { maxEvents--; } /* unknown sequence? */ if ((quePkt->seqLen > 0) && (quePkt->sequence == SEQUENCE_ALL)) { // stop if we find a 'sequence' anomoly break; } } /* still have more packets? */ if (hasMorePackets) { *hasMorePackets = pqueHasNextPacket(&queIter); } /* check for errors */ if (rtnWriteLen < 0) { return utFalse; // write error: close socket } else { return utTrue; } }/* end End-Of-Block */static utBool _protocolSendEOB(ProtocolVars_t *pv, utBool hasMoreEvents){ // Duplex Transport if (!pv->speakFreely) { UInt8 buf[PACKET_MAX_ENCODED_LENGTH]; Buffer_t bb, *dest = binBuffer(&bb, buf, sizeof(buf), BUFFER_DESTINATION); Packet_t eob; ClientPacketType_t eobType = hasMoreEvents? PKT_CLIENT_EOB_MORE : PKT_CLIENT_EOB_DONE; /* Add Fletcher checksum if encoding is binary */ if (ENCODING_VALUE(pv->sessionFirstEncoding) == ENCODING_BINARY) { /* encode packet with a placeholder for the checksum */ // Fixed checksum length check, was "sizeof(ChecksumFletcher_t)" pktInit(&eob, eobType, "%*z", FLETCHER_CHECKSUM_LENGTH); // zero-fill 2 bytes pktEncodePacket(dest, &eob, ENCODING_BINARY); // we ignore any internal errors cksumCalcFletcher(BUFFER_PTR(dest), BUFFER_DATA_LENGTH(dest)); // length should be 5 /* calculate the checksum and insert it into the packet */ ChecksumFletcher_t fcs; cksumGetFletcherChecksum(&fcs); // encode binPrintf(BUFFER_PTR(dest)+3, FLETCHER_CHECKSUM_LENGTH, "%*b", FLETCHER_CHECKSUM_LENGTH, fcs.C); } else { /* encode packet without checksum */ pktInit(&eob, eobType, (char*)0); pktEncodePacket(dest, &eob, pv->sessionFirstEncoding); // we ignore any internal errors } /* write EOB packet */ int rtnWriteLen = _protocolWrite(pv, BUFFER_PTR(dest), BUFFER_DATA_LENGTH(dest), utFalse); if (rtnWriteLen < 0) { return utFalse; // write error: close socket } pv->speakFreely = utFalse; // relinquish any granted "speak freely" permission on EOB pv->speakFreelyMaxEvents = -1; /* next encoding */ pv->sessionFirstEncoding = pv->sessionEncoding; } return utTrue;}/* send packets to server */static utBool _protocolSendAllPackets(ProtocolVars_t *pv, TransportType_t xportType, utBool brief, int dftMaxEvents){ /* reset checksum before we start transmitting */ cksumResetFletcher(); /* transmit identification packets */ if (!_protocolSendIdentification(pv)) { return utFalse; // write error } /* 'brief' means send only the identification and EOB packets */ // If the ID packet aren't sent (don't need to be sent), and 'speakFreekly' is true, // then its possible that nothing will be sent. utBool hasMoreEvents = utFalse; if (brief) { hasMoreEvents = _protocolHasDataToSend(pv); } else { /* transmit pending packets (sent first) */ if (!_protocolSendQueue(pv, &(pv->pendingQueue), PRIORITY_HIGH, -1, (utBool*)0)) { return utFalse; // write error: close socket } /* transmit volatile packets (sent second) */ if (!_protocolSendQueue(pv, &(pv->volatileQueue), PRIORITY_HIGH, -1, (utBool*)0)) { return utFalse; // write error: close socket } /* reset queues */ // wait until all queues have successfully been sent before clearing pqueResetQueue(&(pv->volatileQueue)); pqueResetQueue(&(pv->pendingQueue)); /* send events flag */ // default to sending events if the specified default maximum is not explicitly '0' PacketQueue_t *eventQueue = _protocolGetEventQueue(pv); utBool sendEvents = (eventQueue && (dftMaxEvents != 0))? utTrue : utFalse; // other criteria may also set this to false below /* determine if we should force-relinquish speak-freely */ if (pv->speakFreely) { // Always relinquish speak-freely after sending a block of events if (sendEvents && pqueHasPackets(eventQueue)) { // If we have any events at all, relinquish speak-freely // This will allow the server to acknowledge these events and let the client // know that the server is listening. pv->speakFreely = utFalse; pv->speakFreelyMaxEvents = -1; } } /* send events */ if (sendEvents) { // && (dftMaxEvents != 0) /* max events to send during this block */ int maxEvents = 8; switch ((int)xportType) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -