📄 ctpforwardingenginep.nc
      return SUCCESS;
    } else {
      dbg("Forwarder", "%s: send failed as packet could not be enqueued.\n",
          __FUNCTION__);

      // send a debug message to the uart
      call CollectionDebug.logEvent(NET_C_FE_SEND_QUEUE_FULL);

      // Return the pool entry, as it's not for me...
      return FAIL;
    }
  }

  command error_t Send.cancel[uint8_t client](message_t* msg) {
    // cancel not implemented. will require being able
    // to pull entries out of the queue.
    return FAIL;
  }

  command uint8_t Send.maxPayloadLength[uint8_t client]() {
    return call Packet.maxPayloadLength();
  }

  command void* Send.getPayload[uint8_t client](message_t* msg, uint8_t len) {
    return call Packet.getPayload(msg, len);
  }

  /*
   * This is where all of the send logic is. When the ForwardingEngine
   * wants to send a packet, it posts this task. The send logic is
   * independent of whether it is a forwarded packet or a packet from
   * a send client.
   *
   * The task first checks that there is a packet to send and that
   * there is a valid route. It then marshals the relevant arguments
   * and prepares the packet for sending. If the node is a collection
   * root, it signals Receive with the loopback message. Otherwise,
   * it sets the packet to be acknowledged and sends it. It does not
   * remove the packet from the send queue: while sending, the
   * packet being sent is at the head of the queue; a packet is dequeued
   * in the sendDone handler, either due to retransmission failure
   * or to a successful send.
   */
  task void sendTask() {
    dbg("Forwarder", "%s: Trying to send a packet. Queue size is %hhu.\n",
        __FUNCTION__, call SendQueue.size());
    if (sending) {
      dbg("Forwarder", "%s: busy, don't send\n", __FUNCTION__);
      call CollectionDebug.logEvent(NET_C_FE_SEND_BUSY);
      return;
    } else if (call SendQueue.empty()) {
      dbg("Forwarder", "%s: queue empty, don't send\n", __FUNCTION__);
      call CollectionDebug.logEvent(NET_C_FE_SENDQUEUE_EMPTY);
      return;
    } else if (!call RootControl.isRoot() &&
               !call UnicastNameFreeRouting.hasRoute()) {
      dbg("Forwarder", "%s: no route, don't send, start retry timer\n",
          __FUNCTION__);
      call RetxmitTimer.startOneShot(10000);

      // send a debug message to the uart
      call CollectionDebug.logEvent(NET_C_FE_NO_ROUTE);
      return;
    }
    /*
    else if (parentCongested) {
      // Do nothing; the congestion timer is necessarily set, which
      // will clear parentCongested and repost sendTask().
      dbg("Forwarder", "%s: sendTask deferring for congested parent\n",
          __FUNCTION__);
      call CollectionDebug.logEvent(NET_C_FE_CONGESTION_SENDWAIT);
    }
    */
    else {
      error_t subsendResult;
      fe_queue_entry_t* qe = call SendQueue.head();
      uint8_t payloadLen = call SubPacket.payloadLength(qe->msg);
      am_addr_t dest = call UnicastNameFreeRouting.nextHop();
      uint16_t gradient;

      if (call CtpInfo.isNeighborCongested(dest)) {
        // Our parent is congested. We should wait.
        // Don't repost the task; CongestionTimer will do the job.
        if (!parentCongested) {
          parentCongested = TRUE;
          call CollectionDebug.logEvent(NET_C_FE_CONGESTION_BEGIN);
        }
        if (!call CongestionTimer.isRunning()) {
          startCongestionTimer(CONGESTED_WAIT_WINDOW, CONGESTED_WAIT_OFFSET);
        }
        dbg("Forwarder", "%s: sendTask deferring for congested parent\n",
            __FUNCTION__);
        //call CollectionDebug.logEvent(NET_C_FE_CONGESTION_SENDWAIT);
        return;
      }
      if (parentCongested) {
        parentCongested = FALSE;
        call CollectionDebug.logEvent(NET_C_FE_CONGESTION_END);
      }

      // Once we are here, we have decided to send the packet.
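      // What follows: drop the packet if the SentCache already holds it
      // (duplicate suppression), reset the retry budget if the parent has
      // changed, loop the packet back up to Receive if this node is a root,
      // and otherwise stamp the ETX gradient and congestion bit, request an
      // ack, and hand the packet to SubSend.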
      if (call SentCache.lookup(qe->msg)) {
        call CollectionDebug.logEvent(NET_C_FE_DUPLICATE_CACHE_AT_SEND);
        call SendQueue.dequeue();
        if (call MessagePool.put(qe->msg) != SUCCESS)
          call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR);
        if (call QEntryPool.put(qe) != SUCCESS)
          call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR);
        post sendTask();
        return;
      }

      /* If our current parent is not the same as the last parent
         we sent to, then reset the count of unacked packets: don't
         penalize a new parent for the failures of a prior one. */
      if (dest != lastParent) {
        qe->retries = MAX_RETRIES;
        lastParent = dest;
      }

      dbg("Forwarder", "Sending queue entry %p\n", qe);

      if (call RootControl.isRoot()) {
        collection_id_t collectid = getHeader(qe->msg)->type;
        memcpy(loopbackMsgPtr, qe->msg, sizeof(message_t));
        ackPending = FALSE;

        dbg("Forwarder", "%s: I'm a root, so loopback and signal receive.\n",
            __FUNCTION__);
        loopbackMsgPtr = signal Receive.receive[collectid](loopbackMsgPtr,
            call Packet.getPayload(loopbackMsgPtr,
                                   call Packet.payloadLength(loopbackMsgPtr)),
            call Packet.payloadLength(loopbackMsgPtr));
        signal SubSend.sendDone(qe->msg, SUCCESS);
        return;
      }

      // Loop-detection functionality:
      if (call CtpInfo.getEtx(&gradient) != SUCCESS) {
        // If we have no metric, set our gradient conservatively so
        // that other nodes don't automatically drop our packets.
        gradient = 0;
      }
      call CtpPacket.setEtx(qe->msg, gradient);

      ackPending = (call PacketAcknowledgements.requestAck(qe->msg) == SUCCESS);

      // Set or clear the congestion bit on *outgoing* packets.
      if (call CtpCongestion.isCongested())
        call CtpPacket.setOption(qe->msg, CTP_OPT_ECN);
      else
        call CtpPacket.clearOption(qe->msg, CTP_OPT_ECN);

      subsendResult = call SubSend.send(dest, qe->msg, payloadLen);
      if (subsendResult == SUCCESS) {
        // Successfully submitted to the data-link layer.
        sending = TRUE;
        dbg("Forwarder", "%s: subsend succeeded with %p.\n",
            __FUNCTION__, qe->msg);
        if (qe->client < CLIENT_COUNT) {
          dbg("Forwarder", "%s: client packet.\n", __FUNCTION__);
        } else {
          dbg("Forwarder", "%s: forwarded packet.\n", __FUNCTION__);
        }
        return;
      } else if (subsendResult == EOFF) {
        // The radio has been turned off underneath us. Assume that
        // this is for the best. When the radio is turned back on, we'll
        // handle a startDone event and resume sending.
        radioOn = FALSE;
        dbg("Forwarder", "%s: subsend failed from EOFF.\n", __FUNCTION__);

        // send a debug message to the uart
        call CollectionDebug.logEvent(NET_C_FE_SUBSEND_OFF);
      } else if (subsendResult == EBUSY) {
        // This shouldn't happen, as we sit on top of a client and
        // control our own output; it means we're trying to
        // double-send (bug). This means we expect a sendDone, so just
        // wait for that: when the sendDone comes in, we'll try
        // sending this packet again.
        dbg("Forwarder", "%s: subsend failed from EBUSY.\n", __FUNCTION__);

        // send a debug message to the uart
        call CollectionDebug.logEvent(NET_C_FE_SUBSEND_BUSY);
      } else if (subsendResult == ESIZE) {
        dbg("Forwarder", "%s: subsend failed from ESIZE: truncate packet.\n",
            __FUNCTION__);
        call Packet.setPayloadLength(qe->msg, call Packet.maxPayloadLength());
        post sendTask();
        call CollectionDebug.logEvent(NET_C_FE_SUBSEND_SIZE);
      }
    }
  }

  void sendDoneBug() {
    // send a debug message to the uart
    call CollectionDebug.logEvent(NET_C_FE_BAD_SENDDONE);
  }
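  /*
   * Note on the randomized backoff used above and in sendDone() below:
   * startRetxmitTimer() and startCongestionTimer() are helpers defined
   * elsewhere in this module (not shown in this excerpt). A plausible
   * sketch, assuming a 16-bit random source (the real definitions may
   * differ), is to arm the corresponding one-shot timer with
   *
   *   interval = (Random.rand16() % window) + offset;
   *
   * so that retries are spread uniformly over [offset, offset + window).
   */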
  /*
   * The second phase of a send operation; based on whether the transmission
   * was successful, the ForwardingEngine either stops sending or starts the
   * RetxmitTimer with an interval based on what has occurred. If the send was
   * successful or the maximum number of retransmissions has been reached, then
   * the ForwardingEngine dequeues the current packet. If the packet is from a
   * client it signals Send.sendDone(); if it is a forwarded packet it returns
   * the packet and queue entry to their respective pools.
   */
  event void SubSend.sendDone(message_t* msg, error_t error) {
    fe_queue_entry_t *qe = call SendQueue.head();
    dbg("Forwarder", "%s to %hu and %hhu\n",
        __FUNCTION__, call AMPacket.destination(msg), error);

    if (qe == NULL || qe->msg != msg) {
      dbg("Forwarder", "%s: BUG: not our packet (%p != %p)!\n",
          __FUNCTION__, msg, qe->msg);
      sendDoneBug(); // Not our packet, something is very wrong...
      return;
    } else if (error != SUCCESS) {
      // Immediate retransmission is the worst thing to do.
      dbg("Forwarder", "%s: send failed\n", __FUNCTION__);
      call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL,
          call CollectionPacket.getSequenceNumber(msg),
          call CollectionPacket.getOrigin(msg),
          call AMPacket.destination(msg));
      startRetxmitTimer(SENDDONE_FAIL_WINDOW, SENDDONE_FAIL_OFFSET);
    } else if (ackPending && !call PacketAcknowledgements.wasAcked(msg)) {
      // ackPending covers the case when the data link layer cannot
      // support acks.
      call LinkEstimator.txNoAck(call AMPacket.destination(msg));
      call CtpInfo.recomputeRoutes();
      if (--qe->retries) {
        dbg("Forwarder", "%s: not acked\n", __FUNCTION__);
        call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_WAITACK,
            call CollectionPacket.getSequenceNumber(msg),
            call CollectionPacket.getOrigin(msg),
            call AMPacket.destination(msg));
        startRetxmitTimer(SENDDONE_NOACK_WINDOW, SENDDONE_NOACK_OFFSET);
      } else {
        // Max retries reached, dropping the packet.
        if (qe->client < CLIENT_COUNT) {
          clientPtrs[qe->client] = qe;
          signal Send.sendDone[qe->client](msg, FAIL);
          call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_SEND,
              call CollectionPacket.getSequenceNumber(msg),
              call CollectionPacket.getOrigin(msg),
              call AMPacket.destination(msg));
        } else {
          if (call MessagePool.put(qe->msg) != SUCCESS)
            call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR);
          if (call QEntryPool.put(qe) != SUCCESS)
            call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR);
          call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_FWD,
              call CollectionPacket.getSequenceNumber(msg),
              call CollectionPacket.getOrigin(msg),
              call AMPacket.destination(msg));
        }
        call SendQueue.dequeue();
        sending = FALSE;
        startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET);
      }
    } else if (qe->client < CLIENT_COUNT) {
      ctp_data_header_t* hdr;
      uint8_t client = qe->client;
      dbg("Forwarder", "%s: our packet for client %hhu, remove %p from queue\n",
          __FUNCTION__, client, qe);
      call CollectionDebug.logEventMsg(NET_C_FE_SENT_MSG,
          call CollectionPacket.getSequenceNumber(msg),
          call CollectionPacket.getOrigin(msg),
          call AMPacket.destination(msg));
      call LinkEstimator.txAck(call AMPacket.destination(msg));
      clientPtrs[client] = qe;
      hdr = getHeader(qe->msg);
      call SendQueue.dequeue();
      signal Send.sendDone[client](msg, SUCCESS);
      sending = FALSE;
      startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET);
    } else if (call MessagePool.size() < call MessagePool.maxSize()) {
      // A successfully forwarded packet.
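      // Remember the packet in the SentCache so that a retransmitted copy
      // from the child (e.g. if our link-layer ack was lost) is recognized
      // as a duplicate, then hand the buffer and queue entry back to their
      // pools.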
dbg("Forwarder,Route", "%s: successfully forwarded packet (client: %hhu), message pool is %hhu/%hhu.\n", __FUNCTION__, qe->client, call MessagePool.size(), call MessagePool.maxSize()); call CollectionDebug.logEventMsg(NET_C_FE_FWD_MSG, call CollectionPacket.getSequenceNumber(msg), call CollectionPacket.getOrigin(msg), call AMPacket.destination(msg)); call LinkEstimator.txAck(call AMPacket.destination(msg)); call SentCache.insert(qe->msg); call SendQueue.dequeue(); if (call MessagePool.put(qe->msg) != SUCCESS) call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); if (call QEntryPool.put(qe) != SUCCESS) call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); sending = FALSE; startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET); } else { dbg("Forwarder", "%s: BUG: we have a pool entry, but the pool is full, client is %hhu.\n", __FUNCTION__, qe->client); sendDoneBug(); // It's a forwarded packet, but there's no room the pool; // someone has double-stored a pointer somewhere and we have nowhere // to put this, so we have to leak it... } } /* * Function for preparing a packet for forwarding. Performs * a buffer swap from the message pool. If there are no free * message in the pool, it returns the passed message and does not * put it on the send queue. */ message_t* ONE forward(message_t* ONE m) { if (call MessagePool.empty()) { dbg("Route", "%s cannot forward, message pool empty.\n", __FUNCTION__); // send a debug message to the uart call CollectionDebug.logEvent(NET_C_FE_MSG_POOL_EMPTY); } else if (call QEntryPool.empty()) { dbg("Route", "%s cannot forward, queue entry pool empty.\n", __FUNCTION__); // send a debug message to the uart call CollectionDebug.logEvent(NET_C_FE_QENTRY_POOL_EMPTY); } else { message_t* newMsg; fe_queue_entry_t *qe; uint16_t gradient; qe = call QEntryPool.get(); if (qe == NULL) { call CollectionDebug.logEvent(NET_C_FE_GET_MSGPOOL_ERR); return m; } newMsg = call MessagePool.get(); if (newMsg == NULL) { call CollectionDebug.logEvent(NET_C_FE_GET_QEPOOL_ERR);