📄 peer.c
字号:
flags |= O_NONBLOCK; if ( fcntl( sock, F_SETFL, flags)) { bts_perror(errno, "fcntl F_SETFL"); return -1; }#endif ctx_setevents( ctx, sock, POLLIN); return 0;}/* * peer_process_queue() * * returns * <0 Error in process_queue * -1 Error writing message length * -2 Error writing message params * -3 Error writing bulk data * =0 Request done, no bytes still waiting in output buffer * 1 Output buffer is still in progress */int peer_process_queue( btFileSet *fs, btPeer *p) { if ( kStream_oqlen(&p->ios) <= 8096 ) { /* output buffer has drained, grab next request from queue */ return process_queue( fs, p); } else {#if 0 printf("%d: kStream_oqlen(&p->ios) = %d\n", p->ios.fd, kStream_oqlen(&p->ios));#endif return 0; }}void peer_shutdown( btContext *ctx, btPeer *peer, char *error) { int fd = peer->ios.fd;#if 0 printf("%d: peer_shutdown()\n", fd);#endif if(peer->currentPiece) { int rst; bs_clr( &ctx->downloads[peer->download]->requested, peer->currentPiece->piecenumber); rst = bs_firstClr( &peer->currentPiece->filled); if (rst >= 0) { peer->currentPiece->nextbyteReq = rst; } } if (peer->ios.fd != -1 && ctx->statmap[peer->ios.fd] != -1) { ctx_delstatus( ctx, peer->ios.fd); } ctx->sockpeer[fd]=NULL; peer->state = PEER_ERROR; peer->local.unreachable = 1; peer->error = error; kStream_finit( &peer->ios); kBitSet_finit( &peer->blocks);} /* * Returns 1 if there is another message waiting * Returns 0 on success * Returns -1 on error * returns -2 on invalid message received * returns -3 on invalid state */int peer_recv_message( btContext *ctx, btPeer *p) { int res=0;#if 0 printf("%d: before message state %d\n", p->ios.fd, p->state);#endif if (p->state == PEER_INIT) { /* socket isn't yet completely initialized */ /* int stat = ctx->statmap[p->ios.fd]; */ printf("Waiting for %d to complete connection.\n", p->ios.fd);#if 0 printf ("statblock %d, events %d, revents %d, fd %d\n", stat, ctx->status[ stat].events, ctx->status[ stat].revents, ctx->status[ stat].fd);#endif return 0; } else if (p->state == PEER_OUTGOING || p->state == PEER_INCOMING) { res = recv_handshake( ctx, p); if (res==0) { res = 1; /* assume there is another message */ p->state = PEER_GOOD; } } else if (p->state == PEER_GOOD) { res = recv_peermsg( ctx, p); } else { printf("%d: Peer in unexpected state %d\n", p->ios.fd, p->state); res = -3; }#if 0 printf("%d: after message state %d\n", p->ios.fd, p->state);#endif return res;}intupdate_interested( btContext *ctx, btPeer *p) { int interest; btDownload *dl=ctx->downloads[p->download]; DIE_UNLESS (p->download<ctx->downloadcount); interest = bs_hasInteresting( &dl->fileset.completed, &p->blocks, &dl->interested); if (interest != p->local.interested) { if (send_interested( p, interest)) return -1; if (interest == 0 && !p->remote.choked) { /* stop the rate counter */ stop_rate_timer( &p->remote, time(NULL)); } } return interest;}int countpeers( btPeerset *peers, int piece) { int count = 0; int i; for (i=0; i < peers->len; i++) { if (peers->peer[i]->currentPiece->piecenumber == piece) { count ++; } } return count;}btPartialPiece * peer_assign_block( struct btContext *ctx, btPeer *p) { btPartialPiece *piece; btDownload *dl=ctx->downloads[p->download]; DIE_UNLESS (p->download<ctx->downloadcount); piece=dl->fileset.partial; if (dl->peerset.interestedpeers > 4 || dl->peerset.incomplete <= 4 || dl->peerset.len - dl->peerset.incomplete > 10) { /* assign a partial block if no one else is working on it */ while(piece && (bs_isSet(&dl->requested, piece->piecenumber) || !bs_isSet( &p->blocks, piece->piecenumber))) { piece=piece->next; } if (piece) printf("%d: assigning partial block %d\n", p->ios.fd, piece->piecenumber); } else { /* use overlapping requests to get more interested peers (but not more than three on a block) */ while(piece && !bs_isSet( &p->blocks, piece->piecenumber) && countpeers(&dl->peerset, piece->piecenumber)<4) { piece=piece->next; } if (piece) printf("%d: assigning overlapping block %d\n", p->ios.fd, piece->piecenumber); } if(!piece) { int blk = -1; blk = bs_pickblock( &dl->requested, &p->blocks, &dl->interested); if (blk < 0) { blk = bs_pickblock( &dl->fileset.completed, &p->blocks, &dl->interested); } if (blk < 0) return NULL; piece = seg_getPiece(&dl->fileset, blk); } p->currentPiece = piece; bs_set( &dl->requested, piece->piecenumber); return piece;}#define INTERESTED_BONUS 2.0#define CHOKED_PENALTY 0.75#define NEWPEER_LEVEL 4000#define OLDPEER_LEVEL 0static int in_rate( btPeer *a, time_t now) { float atime, arate; int newpeer; atime = (float)rate_timer( &a->remote, now); arate = a->ios.read_count / atime; newpeer = rate_timer( &a->local, now); if (newpeer < 30) { /* new peers start at 4k/s assumed rate for first 30s */ arate = (float)NEWPEER_LEVEL; } else if (atime < 30) { /* if peer doesn't send reciprocate, then clamp */ arate = (float)OLDPEER_LEVEL; } if (a->local.interested) { /* interested in this peer, double effective rate */ arate *= (float)INTERESTED_BONUS; } if (a->remote.choked || !a->local.interested) { /* remote has us choked, or unchoked but we aren't interested */ arate *= (float)CHOKED_PENALTY; } return (int)arate;}static int out_rate( btPeer *a, time_t now) { int atime, arate; atime = rate_timer( &a->local, now); arate = a->ios.write_count / atime; return arate;}intpeer_send_request( btContext *ctx, btPeer *p) { btPartialPiece *piece; int start; int len; int res=0; int blocklen; time_t now = time(NULL); int arate = in_rate(p, now); int qlen; btDownload *dl=ctx->downloads[p->download]; DIE_UNLESS(p->download<ctx->downloadcount); /* Queue up to REQMAX outstanding requests */ if ( p->currentPiece == NULL ) { /* no assigned block, or block complete */ piece = peer_assign_block( ctx, p); if (!piece) { update_interested( ctx, p); return 0; }#if 0 printf("new block assigned %d\n", p->currentRequest);#endif } else { piece = p->currentPiece; } qlen = (15 * (arate / 1024)) / 16; /* try to get 15s of requests queued */ if (qlen > REQMAX) qlen = REQMAX; if (qlen < 2) qlen = 2; blocklen = seg_piecelen( &dl->fileset, piece->piecenumber); while (queue_len( &p->inqueue) < qlen) { start = piece->nextbyteReq; len = blocklen - start; if (len == 0) { /* reached end of block */ if (piece->isdone == 1) { /* bad block hash: restart the block */ piece->nextbyteReq = 0; } else { /* restart from unreceived */ int rst; if (queue_len( &p->inqueue) > 0) return 0; rst = bs_firstClr( &piece->filled); if (rst < 0) return 0; piece->nextbyteReq = rst; } start = piece->nextbyteReq; len = blocklen - start; if (len == 0) { return 0; } } if (len > REQUEST_SIZE) { /* limit length per request */ len = REQUEST_SIZE; } piece->nextbyteReq += len; res = send_request(p, piece->piecenumber, start, len); queue_request( &p->inqueue, piece->piecenumber, start, len); } return res;}int peer_send_bitfield( btContext *ctx, btPeer *peer) {#if 0 printf("peer_send_bitfield\n");#endif return send_bitfield( peer, &ctx->downloads[peer->download]->fileset.completed);}static int compare_rate( btContext *ctx, btPeer *a, btPeer *b, time_t now) { int arate, brate; if (ctx->downloads[a->download]->fileset.left == 0) { arate = out_rate( a, now); brate = out_rate( b, now); } else { arate = in_rate( a, now); brate = in_rate( b, now); } if (arate > brate) return -1; if (arate < brate) return 1; return 0;}static void prioritize( btContext *ctx, btPeer *p[DOWNLOADS], btPeer *new, time_t now) { int i,j; for (i=0; i<DOWNLOADS; i++) { if (!new->local.snubbed && new->remote.interested) { if (p[i] == NULL || compare_rate( ctx, new, p[i], now) < 0) { for (j=DOWNLOADS-1; j>i; j--) { p[j]=p[j-1]; } p[i]=new; return; } } }}static int isfavorite( btPeer *p[DOWNLOADS], btPeer *check) { int i; for (i=0; i<DOWNLOADS; i++) { if (p[i] == check) return 1; } return 0;}void peer_favorites( btContext *ctx, btPeerset *pset) { int i=0; time_t now; btPeer *p[DOWNLOADS] = { NULL }; time(&now); /* select the peers we will let download */ for (i=0; i<pset->len; i++) { btPeer *peer = pset->peer[i]; if (peer->state == PEER_GOOD) { prioritize( ctx, p, peer, now); } } /* notify all peers if there is a change in their status */ for (i=0; i<pset->len; i++) { btPeer *peer = pset->peer[i]; if (isfavorite( p, peer)) { if (peer->local.choked) { /* need to unchoke this peer */ if (send_choke( peer, 0)) { peer->state = PEER_ERROR; } start_rate_timer( &peer->local, now); } } else { if (!peer->local.choked && peer->state == PEER_GOOD) { /* need to choke this peer */ if (send_choke( peer, 1)) { peer->state = PEER_ERROR; } stop_rate_timer( &peer->local, now); } } }}voidpeer_status_dump( btPeerStatus *ps, int bytes) { int total, rate; total = rate_timer( ps, time(NULL)); if (total == 0) total = 1; rate = bytes/total; printf("%c%c%c%c%5ds", ps->choked ? 'C':'c', ps->interested ? 'I':'i', ps->snubbed ? 'B':'b', ps->unreachable ? 'R':'r', total); if (rate >= 1000000) { printf("(%3dMbs)", rate / 1000000); } else if (rate >= 1000) { printf("(%3dkbs)", rate / 1000); } else { printf("(%3dbps)", rate); }}voidpeer_dump( btPeerset *pset) { int i; for (i=0; i<pset->len; i++) { btPeer *p = pset->peer[i]; if (!p || p->state == PEER_ERROR) continue; printf("%2d %15.15s:%-5d", p->ios.fd, inet_ntoa(p->ip), p->port); if (p->state == PEER_INIT) { printf("(INI)"); } else if (p->state == PEER_OUTGOING) { printf("(OUT)"); } else if (p->state == PEER_INCOMING) { printf("(INC)"); } else if (p->state == PEER_ERROR) { printf("(ERR)"); } else { int gotbits = bs_countBits( &p->blocks); if (p->blocks.nbits == gotbits) { printf("(ALL)"); } else { printf("(%2d%%)", (gotbits * 100) / p->blocks.nbits); } } printf("["); peer_status_dump( &p->remote, p->ios.read_count); printf("^%04d+%d][", p->currentPiece?p->currentPiece->piecenumber:-1, queue_len( &p->inqueue)); peer_status_dump( &p->local, p->ios.write_count); printf("_%d]\n", queue_len(&p->queue)); }}void peer_summary( btPeerset *pset) { int i; int npeers = 0; float rtime, ttime; float rbytes, tbytes; float rrate = 0, trate = 0; for (i=0; i<pset->len; i++) { btPeer *p = pset->peer[i]; if (p == NULL) continue; if (p->state == PEER_GOOD) npeers++; if (!p->remote.choked && p->local.interested) { rtime = (float)rate_timer( &p->remote, time(NULL)); rbytes = (float)p->ios.read_count; rrate += rbytes / rtime; } if (!p->local.choked) { ttime = (float)rate_timer( &p->local, time(NULL)); tbytes = (float)p->ios.write_count; trate += tbytes / ttime; } } printf("%d Peers, Download ", npeers); if (rrate >= 1000000) { printf("%.0fMbs", rrate / 1000000); } else if (rrate >= 1000) { printf("%.0fkbs", rrate / 1000); } else { printf("%.0fbps", rrate); } printf(" Upload "); if (trate >= 1000000) { printf("%.0fMbs", trate / 1000000); } else if (trate >= 1000) { printf("%.0fkbs", trate / 1000); } else { printf("%.0fbps", trate); } printf("\r"); fflush(stdout);}/* * returns 1 if all peers are seeds. else 0. */intpeer_allcomplete( btPeerset *pset) { int i; for (i=0; i<pset->len; i++) { btPeer *p = pset->peer[i]; if (p->state == PEER_ERROR) continue; if (p->state != PEER_GOOD) { /* Something in progress */ return 0; } if (!bs_isFull (&p->blocks)) { return 0; } } return 1;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -