📄 ctpforwardingenginep.nc
字号:
return m; } memset(newMsg, 0, sizeof(message_t)); memset(m->metadata, 0, sizeof(message_metadata_t)); qe->msg = m; qe->client = 0xff; qe->retries = MAX_RETRIES; if (call SendQueue.enqueue(qe) == SUCCESS) { dbg("Forwarder,Route", "%s forwarding packet %p with queue size %hhu\n", __FUNCTION__, m, call SendQueue.size()); // Loop-detection code: if (call CtpInfo.getEtx(&gradient) == SUCCESS) { // We only check for loops if we know our own metric if (call CtpPacket.getEtx(m) <= gradient) { // If our etx metric is less than or equal to the etx value // on the packet (etx of the previous hop node), then we believe // we are in a loop. // Trigger a route update and backoff. call CtpInfo.triggerImmediateRouteUpdate(); startRetxmitTimer(LOOPY_WINDOW, LOOPY_OFFSET); call CollectionDebug.logEventMsg(NET_C_FE_LOOP_DETECTED, call CollectionPacket.getSequenceNumber(m), call CollectionPacket.getOrigin(m), call AMPacket.destination(m)); } } if (!call RetxmitTimer.isRunning()) { // sendTask is only immediately posted if we don't detect a // loop. post sendTask(); } // Successful function exit point: return newMsg; } else { // There was a problem enqueuing to the send queue. if (call MessagePool.put(newMsg) != SUCCESS) call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); if (call QEntryPool.put(qe) != SUCCESS) call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); } } // NB: at this point, we have a resource acquistion problem. // Log the event, and drop the // packet on the floor. call CollectionDebug.logEvent(NET_C_FE_SEND_QUEUE_FULL); return m; } /* * Received a message to forward. Check whether it is a duplicate by * checking the packets currently in the queue as well as the * send history cache (in case we recently forwarded this packet). * The cache is important as nodes immediately forward packets * but wait a period before retransmitting after an ack failure. * If this node is a root, signal receive. 
*/
  /*
   * Parameters and return value of SubReceive.receive:
   *   msg     - the received packet; its THL header field is incremented
   *             in place before any other processing.
   *   payload - not used here; the payload handed upward is re-derived
   *             through Packet.getPayload().
   *   len     - received length; only compared against
   *             SubSend.maxPayloadLength().
   * Returns msg itself whenever the packet is dropped (too long, found
   * in the sent cache, found in the send queue, or Intercept.forward
   * returned FALSE). Otherwise returns whatever Receive.receive (when
   * this node is a root) or forward() hands back.
   */
  event message_t*
  SubReceive.receive(message_t* msg, void* payload, uint8_t len) {
    collection_id_t collectid;
    bool duplicate = FALSE;
    fe_queue_entry_t* qe;
    uint8_t i, thl;

    collectid = call CtpPacket.getType(msg);

    // Update the THL here, since it has lived another hop, and so
    // that the root sees the correct THL.
    thl = call CtpPacket.getThl(msg);
    thl++;
    call CtpPacket.setThl(msg, thl);

    // Log the packet together with its updated THL.
    call CollectionDebug.logEventMsg(NET_C_FE_RCV_MSG,
                                     call CollectionPacket.getSequenceNumber(msg),
                                     call CollectionPacket.getOrigin(msg),
                                     thl);

    if (len > call SubSend.maxPayloadLength()) {
      return msg;
    }

    // See if we remember having seen this packet.
    // We look in the sent cache ...
    if (call SentCache.lookup(msg)) {
      call CollectionDebug.logEvent(NET_C_FE_DUPLICATE_CACHE);
      return msg;
    }

    // ... and in the queue for duplicates. The scan runs from the last
    // entry down to and including index 0: i counts remaining entries and
    // element(i - 1) is inspected, so the entry at the head of the queue is
    // checked as well. (A loop condition of "--i" would stop before index 0
    // and would check nothing at all when exactly one packet is queued.)
    for (i = call SendQueue.size(); i > 0; i--) {
      qe = call SendQueue.element(i - 1);
      if (call CtpPacket.matchInstance(qe->msg, msg)) {
        duplicate = TRUE;
        break;
      }
    }

    if (duplicate) {
      call CollectionDebug.logEvent(NET_C_FE_DUPLICATE_QUEUE);
      return msg;
    }
    // If I'm the root, signal receive.
    else if (call RootControl.isRoot())
      return signal Receive.receive[collectid](msg,
                        call Packet.getPayload(msg, call Packet.payloadLength(msg)),
                        call Packet.payloadLength(msg));
    // I'm on the routing path and Intercept indicates that I
    // should not forward the packet.
    else if (!signal Intercept.forward[collectid](msg,
                        call Packet.getPayload(msg, call Packet.payloadLength(msg)),
                        call Packet.payloadLength(msg)))
      return msg;
    else {
      dbg("Route", "Forwarding packet from %hu.\n", getHeader(msg)->origin);
      return forward(msg);
    }
  }

  /*
   * Handles a packet delivered through SubSnoop. Reacts to the pull (P)
   * and ECN option bits of the CTP header, then passes the packet on to
   * Snoop.receive for the packet's collection type with the CTP data
   * header stripped from payload and len.
   * Returns the value returned by the Snoop.receive signal.
   *
   * NOTE(review): "payload + sizeof(...)" is arithmetic on a void*, which
   * relies on a GCC extension, and "len - sizeof(...)" assumes len is at
   * least sizeof(ctp_data_header_t) -- confirm the lower layer guarantees it.
   */
  event message_t*
  SubSnoop.receive(message_t* msg, void *payload, uint8_t len) {
    //am_addr_t parent = call UnicastNameFreeRouting.nextHop();
    am_addr_t proximalSrc = call AMPacket.source(msg);

    // Check for the pull bit (P) [TEP123] and act accordingly. This
    // check is made for all packets, not just ones addressed to us.
    if (call CtpPacket.option(msg, CTP_OPT_PULL)) {
      call CtpInfo.triggerRouteUpdate();
    }

    // Record whether the sender of this packet advertised congestion (ECN).
    call CtpInfo.setNeighborCongested(proximalSrc,
                                      call CtpPacket.option(msg, CTP_OPT_ECN));

    return signal Snoop.receive[call CtpPacket.getType(msg)]
      (msg, payload + sizeof(ctp_data_header_t),
       len - sizeof(ctp_data_header_t));
  }

  /* Retransmit timer expired: clear the sending flag and post sendTask. */
  event void RetxmitTimer.fired() {
    sending = FALSE;
    post sendTask();
  }

  /* Congestion timer expired: post sendTask. */
  event void CongestionTimer.fired() {
    //parentCongested = FALSE;
    //call CollectionDebug.logEventSimple(NET_C_FE_CONGESTION_END, 0);
    post sendTask();
  }

  /*
   * A simple predicate for now to determine the congestion state of this
   * node: TRUE when the send queue holds more than congestionThreshold
   * entries or when clientCongested is set.
   */
  command bool CtpCongestion.isCongested() {
    bool queueCongested = (call SendQueue.size() > congestionThreshold);
    return (queueCongested || clientCongested);
  }

  /*
   * Stores the client-supplied congestion flag and triggers a route
   * update when the node's overall congestion state changes because of it:
   * an immediate update when the node becomes congested, a regular update
   * when it stops being congested.
   */
  command void CtpCongestion.setClientCongested(bool congested) {
    bool wasCongested = call CtpCongestion.isCongested();
    clientCongested = congested;
    if (!wasCongested && congested) {
      call CtpInfo.triggerImmediateRouteUpdate();
    } else if (wasCongested && !(call CtpCongestion.isCongested())) {
      call CtpInfo.triggerRouteUpdate();
    }
  }

  /*
   * Packet interface: thin wrappers around SubPacket that account for the
   * ctp_data_header_t placed in front of the payload.
   */
  command void Packet.clear(message_t* msg) {
    call SubPacket.clear(msg);
  }

  /* Payload length as reported by SubPacket, minus the CTP data header. */
  command uint8_t Packet.payloadLength(message_t* msg) {
    return call SubPacket.payloadLength(msg) - sizeof(ctp_data_header_t);
  }

  /* Sets the SubPacket payload length to len plus the CTP data header. */
  command void Packet.setPayloadLength(message_t* msg, uint8_t len) {
    call SubPacket.setPayloadLength(msg, len + sizeof(ctp_data_header_t));
  }

  /* Maximum SubPacket payload length, minus the CTP data header. */
  command uint8_t Packet.maxPayloadLength() {
    return call SubPacket.maxPayloadLength() - sizeof(ctp_data_header_t);
  }

  /*
   * Returns a pointer just past the CTP data header inside the SubPacket
   * payload, or NULL when SubPacket.getPayload returns NULL.
   */
  command void* Packet.getPayload(message_t* msg, uint8_t len) {
    uint8_t* payload = call SubPacket.getPayload(msg, len + sizeof(ctp_data_header_t));
    if (payload != NULL) {
      payload += sizeof(ctp_data_header_t);
    }
    return payload;
  }

  /* CollectionPacket interface: direct accessors for CTP header fields. */
  command am_addr_t       CollectionPacket.getOrigin(message_t* msg) {return getHeader(msg)->origin;}
  command collection_id_t CollectionPacket.getType(message_t* msg) {return getHeader(msg)->type;}
  command uint8_t         CollectionPacket.getSequenceNumber(message_t* msg) {return getHeader(msg)->originSeqNo;}
  command void CollectionPacket.setOrigin(message_t* msg, am_addr_t addr) {getHeader(msg)->origin = addr;}
  command void CollectionPacket.setType(message_t* msg, collection_id_t id) {getHeader(msg)->type = id;}
  command void CollectionPacket.setSequenceNumber(message_t* msg, uint8_t _seqno) {getHeader(msg)->originSeqNo = _seqno;}

  /* CtpPacket interface: direct accessors for CTP header fields. */
  //command ctp_options_t CtpPacket.getOptions(message_t* msg) {return getHeader(msg)->options;}
  command uint8_t   CtpPacket.getType(message_t* msg) {return getHeader(msg)->type;}
  command am_addr_t CtpPacket.getOrigin(message_t* msg) {return getHeader(msg)->origin;}
  command uint16_t  CtpPacket.getEtx(message_t* msg) {return getHeader(msg)->etx;}
  command uint8_t   CtpPacket.getSequenceNumber(message_t* msg) {return getHeader(msg)->originSeqNo;}
  command uint8_t   CtpPacket.getThl(message_t* msg) {return getHeader(msg)->thl;}
  command void CtpPacket.setThl(message_t* msg, uint8_t thl) {getHeader(msg)->thl = thl;}
// ---- CtpPacket interface: header field setters and option-bit helpers ----

  /* Writes addr into the origin field of the CTP header. */
  command void CtpPacket.setOrigin(message_t* msg, am_addr_t addr) {
    getHeader(msg)->origin = addr;
  }

  /* Writes id into the type field of the CTP header. */
  command void CtpPacket.setType(message_t* msg, uint8_t id) {
    getHeader(msg)->type = id;
  }

  /* TRUE only when every bit of opt is set in the header's options field. */
  command bool CtpPacket.option(message_t* msg, ctp_options_t opt) {
    if ((getHeader(msg)->options & opt) == opt) {
      return TRUE;
    }
    return FALSE;
  }

  /* Sets the bits of opt in the header's options field. */
  command void CtpPacket.setOption(message_t* msg, ctp_options_t opt) {
    getHeader(msg)->options |= opt;
  }

  /* Clears the bits of opt in the header's options field. */
  command void CtpPacket.clearOption(message_t* msg, ctp_options_t opt) {
    getHeader(msg)->options &= ~opt;
  }

  /* Writes e into the etx field of the CTP header. */
  command void CtpPacket.setEtx(message_t* msg, uint16_t e) {
    getHeader(msg)->etx = e;
  }

  /* Writes _seqno into the originSeqNo field of the CTP header. */
  command void CtpPacket.setSequenceNumber(message_t* msg, uint8_t _seqno) {
    getHeader(msg)->originSeqNo = _seqno;
  }

  /*
   * Two packets are the same *instance* when origin, origin sequence
   * number, THL and type all agree. Including the THL is what implements
   * the duplicate suppression described in TEP 123.
   */
  command bool CtpPacket.matchInstance(message_t* m1, message_t* m2) {
    if (call CtpPacket.getOrigin(m1) != call CtpPacket.getOrigin(m2)) {
      return FALSE;
    }
    if (call CtpPacket.getSequenceNumber(m1) != call CtpPacket.getSequenceNumber(m2)) {
      return FALSE;
    }
    if (call CtpPacket.getThl(m1) != call CtpPacket.getThl(m2)) {
      return FALSE;
    }
    if (call CtpPacket.getType(m1) != call CtpPacket.getType(m2)) {
      return FALSE;
    }
    return TRUE;
  }

  /*
   * Two packets are the same *packet* when origin, origin sequence number
   * and type agree; the THL is not compared.
   */
  command bool CtpPacket.matchPacket(message_t* m1, message_t* m2) {
    if (call CtpPacket.getOrigin(m1) != call CtpPacket.getOrigin(m2)) {
      return FALSE;
    }
    if (call CtpPacket.getSequenceNumber(m1) != call CtpPacket.getSequenceNumber(m2)) {
      return FALSE;
    }
    if (call CtpPacket.getType(m1) != call CtpPacket.getType(m2)) {
      return FALSE;
    }
    return TRUE;
  }

  /*
   * Default handlers for the parameterized interfaces, used when nothing
   * is wired for a given client / collection id.
   */

  /* Nothing to do on send completion. */
  default event void
  Send.sendDone[uint8_t client](message_t *msg, error_t error) {
  }

  /* Always answers TRUE. */
  default event bool
  Intercept.forward[collection_id_t collectid](message_t* msg,
                                               void* payload,
                                               uint8_t len) {
    return TRUE;
  }

  /* Hands the buffer straight back. */
  default event message_t *
  Receive.receive[collection_id_t collectid](message_t *msg,
                                             void *payload,
                                             uint8_t len) {
    return msg;
  }

  /* Hands the buffer straight back. */
  default event message_t *
  Snoop.receive[collection_id_t collectid](message_t *msg,
                                           void *payload,
                                           uint8_t len) {
    return msg;
  }

  /* Collection id 0 for clients without a wired CollectionId. */
  default command collection_id_t
  CollectionId.fetch[uint8_t client]() {
    return 0;
  }

  /*
   * Starts RetxmitTimer as a one-shot with a randomized delay:
   * (Random.rand16() & mask) + offset, truncated to 16 bits.
   */
  static void startRetxmitTimer(uint16_t mask, uint16_t offset) {
    uint16_t delay = call Random.rand16();
    delay = (delay & mask) + offset;
    call RetxmitTimer.startOneShot(delay);
    dbg("Forwarder", "Rexmit timer will fire in %hu ms\n", delay);
  }

  /*
   * Starts CongestionTimer as a one-shot with a randomized delay:
   * (Random.rand16() & mask) + offset, truncated to 16 bits.
   */
  static void startCongestionTimer(uint16_t mask, uint16_t offset) {
    uint16_t delay = call Random.rand16();
    delay = (delay & mask) + offset;
    call CongestionTimer.startOneShot(delay);
    dbg("Forwarder", "Congestion timer will fire in %hu ms\n", delay);
  }

  /* Signalled when this neighbor is evicted from the neighbor table. */
  event void LinkEstimator.evicted(am_addr_t neighbor) {
  }

  /*
   * Default implementations for CollectionDebug calls.
   * These allow CollectionDebug not to be wired to anything if debugging
   * is not desired. Each one simply reports SUCCESS.
   */
  default command error_t
  CollectionDebug.logEvent(uint8_t type) {
    return SUCCESS;
  }

  default command error_t
  CollectionDebug.logEventSimple(uint8_t type, uint16_t arg) {
    return SUCCESS;
  }

  default command error_t
  CollectionDebug.logEventDbg(uint8_t type, uint16_t arg1, uint16_t arg2, uint16_t arg3) {
    return SUCCESS;
  }

  default command error_t
  CollectionDebug.logEventMsg(uint8_t type, uint16_t msg, am_addr_t origin, am_addr_t node) {
    return SUCCESS;
  }

  default command error_t
  CollectionDebug.logEventRoute(uint8_t type, am_addr_t parent, uint8_t hopcount, uint16_t metric) {
    return SUCCESS;
  }
}

/* Rodrigo. This is an alternative
  event void CtpInfo.ParentCongested(bool congested) {
    if (congested) {
      // We've overheard our parent's ECN bit set.
      startCongestionTimer(CONGESTED_WAIT_WINDOW, CONGESTED_WAIT_OFFSET);
      parentCongested = TRUE;
      call CollectionDebug.logEvent(NET_C_FE_CONGESTION_BEGIN);
    } else {
      // We've overheard our parent's ECN bit cleared.
      call CongestionTimer.stop();
      parentCongested = FALSE;
      call CollectionDebug.logEventSimple(NET_C_FE_CONGESTION_END, 1);
      post sendTask();
    }
  }
*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -