📄 mftp_snd.cc
字号:
if(*nak_bitmap) { // "OR in" NAK byte into newly constructed NAK bitmap *nak_array |= *nak_bitmap; } nak_array++; nak_bitmap++; } } nakCount_++; // increment total number of received nak-packets}void MFTPSndAgent::init_user_file(unsigned long readAheadBufsize){ read_ahead_bufsize = readAheadBufsize; fseek_offset = 0; // initialize codeword pattern iterator.setSourceWordLen(dtus_per_group); CwPat = iterator.getNextCwPat(); // free arrays from a possible previous transmission (no effect on NULL pointers) delete [] naks; delete [] retx; // allocate naks bitmap init'd to all nak'd: naks = new unsigned char[(nb_groups + 7) / 8]; assert(naks != NULL); // or else we ran out of memory SET_ALL_BITS(naks, nb_groups); // allocate retransmission bitmap init'd to none retransmitted: retx = new unsigned char[(nb_groups + 7) / 8]; assert(retx != NULL); // or else we ran out of memory RESET_ALL_BITS(retx, nb_groups); CurrentPass = CurrentGroup = MinGroupNbInBuf = NbGroupsInBuf = 0;}// reads as many groups into the read-ahead-buffer as there is space, starting with// group CurrentGroup. The groups that were not not NACK'd by anyone will be// skipped (and the corresponding areas in the read-ahead-buffer will be skipped// as well).void MFTPSndAgent::fill_read_ahead_buf(){ unsigned int dtu_pos; // loops over [0..dtus_per_group) unsigned long seek_offset; // where to position the head for disk seeks unsigned long buf_pos = 0; // position where data is written (into main memory) when // read from disk, relative to the start of read_ahead_buf CW_PATTERN_t cw_pat_tmp = CwPat; unsigned long i; unsigned long len; // switch to next group that must be read: MinGroupNbInBuf = CurrentGroup; NbGroupsInBuf = min(read_ahead_bufsize / (bitcount(CwPat) * dtu_size), nb_groups - MinGroupNbInBuf); while(cw_pat_tmp != 0) { dtu_pos = minbit(cw_pat_tmp); assert(0 <= dtu_pos && dtu_pos < dtus_per_group); assert(MinGroupNbInBuf + NbGroupsInBuf <= nb_groups); cw_pat_tmp &= ~((CW_PATTERN_t) 1 << dtu_pos); // clear bit at position "dtu_pos" for(i = MinGroupNbInBuf; i < MinGroupNbInBuf + NbGroupsInBuf; ++i) { // continue with for-loop if group i was not NACKed by anyone if(IS_BIT_CLEARED(naks, i)) { buf_pos += dtu_size; continue; } // Note: there is never data accessed "outside" the file as the while-loop // is left as soon as the last (possibly partial) DTU has been read. seek_offset = (dtu_pos * nb_groups + i) * dtu_size; if(seek_offset >= FileSize) { // we can get there if the last group(s) have fewer than // dtus_per_group packets. If we get here, we are ready. return; // OK } if (fseek_offset != seek_offset) { // do the fseek here (omitted) seekCount_++; fseek_offset = seek_offset; } // determine number of bytes to read len = min(dtu_size, FileSize - fseek_offset); // read len bytes from file here (omitted) fseek_offset += len; buf_pos += len; if(len < dtu_size) { // we get here if the last dtu is smaller than dtu_size and if // we have just read that last dtu assert(fseek_offset == FileSize); // we must be at EOF // clear rest of read-ahead-buffer here (omitted) buf_pos = bitcount(CwPat) * NbGroupsInBuf * dtu_size; return; // that's it, no more packets to process } assert(len == dtu_size); assert(buf_pos <= bitcount(CwPat) * NbGroupsInBuf * dtu_size); } // for } // while // we get here only if no group was read with less than dtus_per_group packets and // the if not the last packet was read (in case it is too short) assert(buf_pos == bitcount(CwPat) * NbGroupsInBuf * dtu_size);}// send_data() sends next data packet.// In tcl's result buffer, return// 0, if current pass not yet finished// -1, if reached the end of the current pass int MFTPSndAgent::send_data(){ Packet* p = Agent::allocpkt(); hdr_mftp* hdr = hdr_mftp::access(p); CW_PATTERN_t mask; Tcl& tcl = Tcl::instance(); assert(0 <= CurrentGroup && CurrentGroup < nb_groups); assert(NbGroupsInBuf >= 0); assert(0 <= MinGroupNbInBuf && MinGroupNbInBuf + NbGroupsInBuf <= nb_groups); // now comes NACK processing: loop until end of file or until // a nak bit is detected: while(CurrentGroup < nb_groups && IS_BIT_CLEARED(naks, CurrentGroup)) { CurrentGroup++; // proceed to next bit of the nak bitmap } // do not transmit packet if // (1) CurrentGroup has reached the total number of groups ("end of pass") or // (2) CwPat has only bits set that refer to some packets that are cut off in // the current group (for example, if CurrentGroup has only 5 packets // with nb_groups=8 and if CwPat=64+32) if(CurrentGroup != nb_groups && ((mask = (~(CW_PATTERN_t) 0) >> (8 * sizeof(CW_PATTERN_t) - get_dtus_per_group(CurrentGroup))) & CwPat) != 0) { assert(CurrentGroup < nb_groups); // see if the read-ahead-buffer is exhausted so that we must load new data // from file. Only groups with a corresponding NAK-bit are read, that is, // those that were requested for retransmission assert(MinGroupNbInBuf <= CurrentGroup); if(CurrentGroup >= MinGroupNbInBuf + NbGroupsInBuf) { // exhausted? fill_read_ahead_buf(); // load new data from file } assert(MinGroupNbInBuf <= CurrentGroup && CurrentGroup < MinGroupNbInBuf + NbGroupsInBuf); // produce an encoded packet here (omitted) // generate the header hdr->type = hdr_mftp::PDU_DATA_TRANSFER; hdr->spec.data.pass_nb = CurrentPass; hdr->spec.data.group_nb = CurrentGroup; hdr->spec.data.cw_pat = CwPat & mask; char buf[8 * sizeof(CW_PATTERN_t) + 1]; (CwPat & mask).print(buf); tcl.evalf("%s send notify %lu %lu %s", name_, (unsigned long) CurrentPass, (unsigned long) CurrentGroup, (char*) buf); hdr_cmn* ch = hdr_cmn::access(p); ch->size() = sizeof(hdr_mftp) + dtu_size; // transmit packet target_->recv(p); RESET_BIT(naks, CurrentGroup); // reset the dtu status bit in the nak bitmap SET_BIT(retx, CurrentGroup); // set the dtus status bit in the retransmission bitmap CurrentGroup++; } // if // if last group of the file: if(CurrentGroup == nb_groups || !(CwPat & mask)) { // end of pass? do { CwPat = iterator.getNextCwPat(); // get next codeword for new pass } while(!(CwPat & ((~(CW_PATTERN_t) 0) >> (8 * sizeof(CW_PATTERN_t) - get_dtus_per_group(0))))); // } while(!(CwPat & ((((CW_PATTERN_t) 1) << get_dtus_per_group(0)) - 1))); // prepare a new pas: MinGroupNbInBuf = 0; NbGroupsInBuf = 0; CurrentGroup = 0; // reset retransmission bitmap for dealing with of latent naks RESET_ALL_BITS(retx, nb_groups); // the first dtus_per_group passes must be transmitted "in full" if(CurrentPass < dtus_per_group - 1) { SET_ALL_BITS(naks, nb_groups); } tcl.evalf("%s pass-finished %lu %lu", name_, (unsigned long) CurrentPass, (unsigned long) nb_blocks()); CurrentPass++; tcl.result("-1"); // return end-of-pass to the caller return TCL_OK; } tcl.result("0"); // end-of-pass not yet reached return TCL_OK;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -