⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 peer.c

📁 libbt-1.04
💻 C
📖 第 1 页 / 共 3 页
字号:
        flags |= O_NONBLOCK;    if ( fcntl( sock, F_SETFL, flags)) {        bts_perror(errno, "fcntl F_SETFL");        return -1;    }#endif    ctx_setevents( ctx, sock, POLLIN);    return 0;}/*  * peer_process_queue() * * returns *    <0   Error in process_queue *    -1   Error writing message length *    -2   Error writing message params *    -3   Error writing bulk data *    =0   Request done, no bytes still waiting in output buffer *    1    Output buffer is still in progress */int peer_process_queue( btFileSet *fs, btPeer *p) {    if ( kStream_oqlen(&p->ios) <= 8096 ) {	/* output buffer has drained, grab next request from queue */	return process_queue( fs, p);    } else {#if 0	printf("%d: kStream_oqlen(&p->ios) = %d\n", p->ios.fd, kStream_oqlen(&p->ios));#endif	return 0;    }}void peer_shutdown( btContext *ctx, btPeer *peer, char *error) {    int fd = peer->ios.fd;#if 0    printf("%d: peer_shutdown()\n", fd);#endif        if(peer->currentPiece) {        int rst;	bs_clr( &ctx->downloads[peer->download]->requested, peer->currentPiece->piecenumber);	rst = bs_firstClr( &peer->currentPiece->filled);	if (rst >= 0) {	    peer->currentPiece->nextbyteReq = rst;	}    }    if (peer->ios.fd != -1 && ctx->statmap[peer->ios.fd] != -1) {	ctx_delstatus( ctx, peer->ios.fd);    }    ctx->sockpeer[fd]=NULL;    peer->state = PEER_ERROR;    peer->local.unreachable = 1;    peer->error = error;    kStream_finit( &peer->ios);    kBitSet_finit( &peer->blocks);}    /* * Returns 1 if there is another message waiting  * Returns 0 on success * Returns -1 on error * returns -2 on invalid message received * returns -3 on invalid state */int peer_recv_message( btContext *ctx, btPeer *p) {    int res=0;#if 0    printf("%d: before message state %d\n", p->ios.fd, p->state);#endif    if (p->state == PEER_INIT) {	/* socket isn't yet completely initialized */	/* int stat = ctx->statmap[p->ios.fd]; */	printf("Waiting for %d to complete connection.\n", p->ios.fd);#if 0	printf ("statblock %d, events %d, revents %d, fd %d\n", 		stat, ctx->status[ stat].events, ctx->status[ stat].revents,		ctx->status[ stat].fd);#endif	return 0;    } else if (p->state == PEER_OUTGOING || p->state == PEER_INCOMING) {	res = recv_handshake( ctx, p);	if (res==0) {	    res = 1;  /* assume there is another message */	    p->state = PEER_GOOD;	}    } else if (p->state == PEER_GOOD) {	res = recv_peermsg( ctx, p);    } else {	printf("%d: Peer in unexpected state %d\n", p->ios.fd, p->state);	res = -3;    }#if 0    printf("%d: after message state %d\n", p->ios.fd, p->state);#endif    return res;}intupdate_interested( btContext *ctx, btPeer *p) {    int interest;    btDownload *dl=ctx->downloads[p->download];    DIE_UNLESS (p->download<ctx->downloadcount);    interest = bs_hasInteresting( &dl->fileset.completed, &p->blocks, &dl->interested);    if (interest != p->local.interested) {	if (send_interested( p, interest)) return -1;	if (interest == 0 && !p->remote.choked) {	    /* stop the rate counter */	    stop_rate_timer( &p->remote, time(NULL));	}    }     return interest;}int countpeers( btPeerset *peers, int piece) {    int count = 0;    int i;    for (i=0; i < peers->len; i++) {	if (peers->peer[i]->currentPiece->piecenumber == piece) {	    count ++;	}    }    return count;}btPartialPiece * peer_assign_block( struct btContext *ctx, btPeer *p) {    btPartialPiece *piece;    btDownload *dl=ctx->downloads[p->download];    DIE_UNLESS (p->download<ctx->downloadcount);    piece=dl->fileset.partial;    if (dl->peerset.interestedpeers > 4 ||        dl->peerset.incomplete <= 4 ||        dl->peerset.len - dl->peerset.incomplete > 10)     {        /* assign a partial block if no one else is working on it */	while(piece && (bs_isSet(&dl->requested, piece->piecenumber) ||	      !bs_isSet( &p->blocks, piece->piecenumber))) {	    piece=piece->next;	}	if (piece) printf("%d: assigning partial block %d\n", p->ios.fd, piece->piecenumber);    } else {	/* use overlapping requests to get more interested peers (but not more than three on a block) */	while(piece && !bs_isSet( &p->blocks, piece->piecenumber) && countpeers(&dl->peerset, piece->piecenumber)<4) {	    piece=piece->next;	}	if (piece) printf("%d: assigning overlapping block %d\n", p->ios.fd, piece->piecenumber);    }    if(!piece) {	int blk = -1;	blk = bs_pickblock( &dl->requested, &p->blocks, &dl->interested);	if (blk < 0) {	    blk = bs_pickblock( &dl->fileset.completed, &p->blocks, &dl->interested);	}	if (blk < 0) return NULL;	piece = seg_getPiece(&dl->fileset, blk);    }    p->currentPiece = piece;    bs_set( &dl->requested, piece->piecenumber);    return piece;}#define INTERESTED_BONUS 2.0#define CHOKED_PENALTY 0.75#define NEWPEER_LEVEL 4000#define OLDPEER_LEVEL 0static int in_rate( btPeer *a, time_t now) {    float atime, arate;	int newpeer;    atime = (float)rate_timer( &a->remote, now);    arate = a->ios.read_count / atime;    newpeer = rate_timer( &a->local, now);    if (newpeer < 30) {	/* new peers start at 4k/s assumed rate for first 30s */	arate = (float)NEWPEER_LEVEL;    } else if (atime < 30) {	/* if peer doesn't send reciprocate, then clamp */	arate = (float)OLDPEER_LEVEL;    }    if (a->local.interested) {	/* interested in this peer, double effective rate */	arate *= (float)INTERESTED_BONUS;    }    if (a->remote.choked || !a->local.interested) {	/* remote has us choked, or unchoked but we aren't interested */	arate *= (float)CHOKED_PENALTY;    }    return (int)arate;}static int out_rate( btPeer *a, time_t now) {    int atime, arate;    atime = rate_timer( &a->local, now);    arate = a->ios.write_count / atime;    return arate;}intpeer_send_request( btContext *ctx, btPeer *p) {    btPartialPiece *piece;    int start;    int len;    int res=0;    int blocklen;    time_t now = time(NULL);    int arate = in_rate(p, now);    int qlen;    btDownload *dl=ctx->downloads[p->download];    DIE_UNLESS(p->download<ctx->downloadcount);    /* Queue up to REQMAX outstanding requests */    if ( p->currentPiece == NULL )    {	/* no assigned block, or block complete */    	piece = peer_assign_block( ctx, p);        if (!piece) {            update_interested( ctx, p);            return 0;        }#if 0        printf("new block assigned %d\n", p->currentRequest);#endif    } else {        piece = p->currentPiece;    }    qlen = (15 * (arate / 1024)) / 16;	/* try to get 15s of requests queued */    if (qlen > REQMAX) qlen = REQMAX;    if (qlen < 2) qlen = 2;    blocklen = seg_piecelen( &dl->fileset, piece->piecenumber);    while (queue_len( &p->inqueue) < qlen) {	start = piece->nextbyteReq;	len = blocklen - start;	if (len == 0) {	    /* reached end of block */            if (piece->isdone == 1) {		/* bad block hash: restart the block */		piece->nextbyteReq = 0;	    } else {                /* restart from unreceived */                int rst;		if (queue_len( &p->inqueue) > 0) return 0;                rst = bs_firstClr( &piece->filled);                if (rst < 0) return 0;                piece->nextbyteReq = rst;	    }	    start = piece->nextbyteReq;	    len = blocklen - start;	    if (len == 0) {	        return 0;	    }	}	if (len > REQUEST_SIZE) {	    /* limit length per request */	    len = REQUEST_SIZE;	}	piece->nextbyteReq += len;	res = send_request(p, piece->piecenumber, start, len);	queue_request( &p->inqueue, piece->piecenumber, start, len);    }    return res;}int peer_send_bitfield( btContext *ctx, btPeer *peer) {#if 0    printf("peer_send_bitfield\n");#endif    return send_bitfield( peer, &ctx->downloads[peer->download]->fileset.completed);}static int compare_rate( btContext *ctx, btPeer *a, btPeer *b, time_t now) {    int arate, brate;    if (ctx->downloads[a->download]->fileset.left == 0) {	arate = out_rate( a, now);	brate = out_rate( b, now);    } else {	arate = in_rate( a, now);	brate = in_rate( b, now);    }    if (arate > brate) return -1;    if (arate < brate) return 1;    return 0;}static void prioritize( btContext *ctx, btPeer *p[DOWNLOADS], btPeer *new, time_t now) {    int i,j;    for (i=0; i<DOWNLOADS; i++) {	if (!new->local.snubbed && new->remote.interested) 	{	    if (p[i] == NULL || compare_rate( ctx, new, p[i], now) < 0) {		for (j=DOWNLOADS-1; j>i; j--) {		    p[j]=p[j-1];		}		p[i]=new;		return;	    }	}    }}static int isfavorite( btPeer *p[DOWNLOADS], btPeer *check) {    int i;    for (i=0; i<DOWNLOADS; i++) {	if (p[i] == check) return 1;    }    return 0;}void peer_favorites( btContext *ctx, btPeerset *pset) {    int i=0;    time_t now;    btPeer *p[DOWNLOADS] = { NULL };    time(&now);    /* select the peers we will let download */    for (i=0; i<pset->len; i++) {	btPeer *peer = pset->peer[i];        if (peer->state == PEER_GOOD) {	    prioritize( ctx, p, peer, now);	}    }    /* notify all peers if there is a change in their status */    for (i=0; i<pset->len; i++) {	btPeer *peer = pset->peer[i];	if (isfavorite( p, peer)) {	    if (peer->local.choked) {		/* need to unchoke this peer */		if (send_choke( peer, 0)) {		    peer->state = PEER_ERROR;		}		start_rate_timer( &peer->local, now);	    } 	} else {	    if (!peer->local.choked && peer->state == PEER_GOOD) {		/* need to choke this peer */		if (send_choke( peer, 1)) {		    peer->state = PEER_ERROR;		}		stop_rate_timer( &peer->local, now);	    }	}    }}voidpeer_status_dump( btPeerStatus *ps, int bytes) {    int total, rate;    total = rate_timer( ps, time(NULL));        if (total == 0) total = 1;    rate = bytes/total;        printf("%c%c%c%c%5ds", 	    ps->choked ? 'C':'c',	    ps->interested ? 'I':'i',	    ps->snubbed ? 'B':'b',	    ps->unreachable ? 'R':'r',	    total);    if (rate >= 1000000) {	printf("(%3dMbs)", rate / 1000000);    } else if (rate >= 1000) {	printf("(%3dkbs)", rate / 1000);    } else {	printf("(%3dbps)", rate);    }}voidpeer_dump( btPeerset *pset) {    int i;    for (i=0; i<pset->len; i++) {	btPeer *p = pset->peer[i];	if (!p || p->state == PEER_ERROR) continue;        printf("%2d %15.15s:%-5d", p->ios.fd, inet_ntoa(p->ip), p->port);	if (p->state == PEER_INIT) {	    printf("(INI)");	} else if (p->state == PEER_OUTGOING) {	    printf("(OUT)");	} else if (p->state == PEER_INCOMING) {	    printf("(INC)");	} else if (p->state == PEER_ERROR) {	    printf("(ERR)");	} else {	    int gotbits = bs_countBits( &p->blocks);	    if (p->blocks.nbits == gotbits) {		printf("(ALL)");	    } else {		printf("(%2d%%)", (gotbits * 100) / p->blocks.nbits);	    }	}	printf("[");	peer_status_dump( &p->remote, p->ios.read_count);	printf("^%04d+%d][", p->currentPiece?p->currentPiece->piecenumber:-1, 		queue_len( &p->inqueue));	peer_status_dump( &p->local, p->ios.write_count);	printf("_%d]\n", queue_len(&p->queue));    }}void peer_summary( btPeerset *pset) {    int i;    int npeers = 0;    float rtime, ttime;    float rbytes, tbytes;    float rrate = 0, trate = 0;    for (i=0; i<pset->len; i++) {	btPeer *p = pset->peer[i];	if (p == NULL) continue;	if (p->state == PEER_GOOD) npeers++;	if (!p->remote.choked && p->local.interested) {	    rtime = (float)rate_timer( &p->remote, time(NULL));	    rbytes = (float)p->ios.read_count;	    rrate += rbytes / rtime;	}	if (!p->local.choked) {	    ttime = (float)rate_timer( &p->local, time(NULL));	    tbytes = (float)p->ios.write_count;	    trate += tbytes / ttime;	}    }        printf("%d Peers, Download ", npeers);    if (rrate >= 1000000) {	printf("%.0fMbs", rrate / 1000000);    } else if (rrate >= 1000) {	printf("%.0fkbs", rrate / 1000);    } else {	printf("%.0fbps", rrate);    }    printf(" Upload ");    if (trate >= 1000000) {	printf("%.0fMbs", trate / 1000000);    } else if (trate >= 1000) {	printf("%.0fkbs", trate / 1000);    } else {	printf("%.0fbps", trate);    }    printf("\r");    fflush(stdout);}/* * returns 1 if all peers are seeds. else 0. */intpeer_allcomplete( btPeerset *pset) {  int i;  for (i=0; i<pset->len; i++) {    btPeer *p = pset->peer[i];    if (p->state == PEER_ERROR)      continue;    if (p->state != PEER_GOOD) {      /* Something in progress */      return 0;    }    if (!bs_isFull (&p->blocks)) {      return 0;    }  }  return 1;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -