📄 peer.c
字号:
if (err < 0) return -1; assert(err == sizeof(nbo_len)); len = ntohl(nbo_len); if (len <= 80) { nmsg = msg; } else if (len > 0 && len <= MAXMESSAGE) { nmsg = btmalloc(len+sizeof(int32_t)); } else { /* Too big a packet, kill the peer. */ peer->ios.error = BTERR_LARGEPACKET; return -1; }#if 0 printf("A/%d: looking for %d bytes %d available (addr %d)\n", peer->ios.fd, len+4, kStream_iqlen( &peer->ios), kStream_in_addr(&peer->ios));#endif if (len == 0) { /* keep alive */ err = kStream_fread( &peer->ios, (char *)&nbo_len, sizeof(nbo_len)); DIE_UNLESS(err == sizeof(nbo_len)); return 0; } DIE_UNLESS(len <= MAXMESSAGE && len >= 0); err = kStream_fread( &peer->ios, nmsg, len + sizeof(int32_t)); if (err < 0) goto cleanup; DIE_UNLESS( err == len + (int)sizeof(int32_t));#if 0 printf("A/%d: got message %d (len %d)\n", peer->ios.fd, nmsg[4], len+sizeof(int32_t));#endif /* got message */ param = nmsg+5; switch (nmsg[4]) { case BT_MSG_CHOKE: { if (peer->remote.choked == 0) { int rst; peer->remote.choked = 1; if (peer->currentPiece) { btPartialPiece *piece = peer->currentPiece; /* cancel this peer's piece and reassign */ bs_clr( &dl->requested, piece->piecenumber); rst = bs_firstClr( &piece->filled); if (rst >= 0) { piece->nextbyteReq = rst; } } peer->currentPiece=NULL; clear_request_queue( &peer->inqueue); if (peer->local.interested) { /* stop the rate counter */ stop_rate_timer( &peer->remote, time(NULL)); } } } break; case BT_MSG_UNCHOKE: time(&peer->lastreceived); if (peer->remote.choked == 1) { peer->remote.choked = 0; /* queue requests */ if (!peer->local.interested) { /* recheck interest */ int interest = update_interested( ctx, peer); if (!interest) break; } start_rate_timer( &peer->remote, time(NULL)); if (peer_send_request( ctx, peer)) { res = -1; goto cleanup; } } break; case BT_MSG_INTERESTED: peer->remote.interested = 1; ctx->downloads[peer->download]->peerset.interestedpeers++; break; case BT_MSG_NOTINTERESTED: peer->remote.interested = 0; ctx->downloads[peer->download]->peerset.interestedpeers--; break; case BT_MSG_HAVE: { int block; int interest; UNSHIFT_INT32(param,nbo,block); bs_set( &peer->blocks, block); if (bs_isFull( &peer->blocks)) { if (peer->remote.complete == 0) { peer->remote.complete = 1; dl->peerset.incomplete--; } } interest = update_interested( ctx, peer); if (interest && !peer->remote.snubbed && !peer->remote.choked) { start_rate_timer( &peer->remote, time(NULL)); if (peer_send_request( ctx, peer)) { res = -1; goto cleanup; } } } break; case BT_MSG_BITFIELD: kBitSet_readBytes( &peer->blocks, nmsg+5, len-1); if (peer->blocks.nbits == bs_countBits( &peer->blocks)) { if (peer->remote.complete == 0) { peer->remote.complete = 1; dl->peerset.incomplete--; } } update_interested( ctx, peer); break; case BT_MSG_REQUEST: { int piece; int offs; int len; UNSHIFT_INT32(param,nbo,piece); UNSHIFT_INT32(param,nbo,offs); UNSHIFT_INT32(param,nbo,len); if (peer->local.choked) { /* ignore requests from choked peers */ break; } if (!bs_isSet(&peer->blocks, piece) && bs_isSet(&ctx->downloads[peer->download]->fileset.completed, piece)) { queue_request( &peer->queue, piece, offs, len); } else { time_t now; time(&now); send_choke( peer, 1); stop_rate_timer( &peer->local, now); printf("%d: Choked by invalid request for block %d (%s have it)\n", peer->ios.fd, piece, bs_isSet(&peer->blocks, piece)?"they":"we don't"); } } break; case BT_MSG_PIECE: { btPartialPiece *pp=peer->currentPiece; int piece; int offs; UNSHIFT_INT32(param,nbo,piece); UNSHIFT_INT32(param,nbo,offs); #if 0 printf("%d: got piece %d, offs %d, len %d\n", peer->ios.fd, piece, offs, len-9);#endif /* FIXME: check requests with ones we've sent */ time(&peer->lastreceived); if (!pp || !remove_queued_request( &peer->inqueue, piece, offs, len-9)) { printf("%d: Unneeded data: piece %d %d+%d\n", peer->ios.fd, piece, offs, len-9); } else { int done = seg_writebuf( &dl->fileset, piece, offs, nmsg+13, len-9); if (done < 0) { bts_perror(errno, "error writing buffer"); abort(); } if (done) { int i; bs_clr( &dl->interested, piece);#if 0 bs_dump( "completed", &dl->fileset.completed);#endif for (i=0; i<dl->peerset.len; i++) { btPeer *p = dl->peerset.peer[i]; if (p->currentPiece == pp) { btRequest *req; p->currentPiece = NULL; /* send cancels, check that requests work */ while ((req = dequeue_request( &p->inqueue)) != NULL) { if (send_cancel(p, req->block, req->offset, req->length )) { p->state = PEER_ERROR; err = -1; goto cleanup; } } peer_send_request( ctx, p); } if (p->state == PEER_GOOD) { if (send_have( p, piece)) { p->state = PEER_ERROR; err = -1; goto cleanup; } } } /* for i in peerset.len */ } /* if done */ if (peer_send_request( ctx, peer)) { res = -1; goto cleanup; } } /* if !unknown piece */ } break; case BT_MSG_CANCEL: { int piece, offs, len; /* this cancels a specific request */ UNSHIFT_INT32(param,nbo,piece); UNSHIFT_INT32(param,nbo,offs); UNSHIFT_INT32(param,nbo,len); #if 0 printf("%d: got cancel %d, offs %d, len %d\n", peer->ios.fd, piece, offs, len);#endif remove_queued_request( &peer->queue, piece, offs, len); } break; default: /* unknown message */ res = -2; break; }cleanup: /* cleanup */ if (err < 0) { /* if there has been an error, report it */ res = -1; } if (res == 0) { /* check if there is another message waiting */ err = kStream_fpeek( &peer->ios, (char *)&nbo_len, sizeof(nbo_len)); if (err == sizeof(nbo_len)) { int tlen; tlen = ntohl(nbo_len) + sizeof(nbo_len);#if 0 printf("B/looking for %d bytes %d available (addr %d)\n", tlen, kStream_iqlen( &peer->ios), kStream_in_addr(&peer->ios));#endif if (tlen < 0 || tlen > MAXMESSAGE) { /* out of sync with peer, or packet too large */ peer->ios.error = BTERR_LARGEPACKET; return -1; } DIE_UNLESS(tlen <= MAXMESSAGE && tlen >= 0); if (kStream_iqlen( &peer->ios) >= tlen) res = 1; } } if (len > 80) { btfree(nmsg); } return res;}/* * returns * 0 - no error * -1 - permanent error sending msg size * -2 - permanent error sending message */intsend_message( btPeer *peer, char *buf, int len) { int nslen = htonl( len);#if 0 printf("%d: send message %d len=%d addr=%d\n", peer->ios.fd, buf?buf[0]:-1, len, kStream_out_addr(&peer->ios));#endif if (kStream_fwrite( &peer->ios, (void*)&nslen, sizeof(nslen)) < 0) { return -1; } if (len > 0) { if (kStream_fwrite( &peer->ios, buf, len) < 0) { return -2; } } return 0;}intsend_keepalive( btPeer *peer) { return send_message( peer, NULL, 0);}intsend_choke( btPeer *peer, int choke) { char type=choke?BT_MSG_CHOKE:BT_MSG_UNCHOKE; if (peer->local.choked != choke) { /* cancel all requests */ clear_request_queue( &peer->queue); peer->local.choked = choke; } return send_message( peer, &type, 1);}intsend_interested( btPeer *peer, int interest) { char type=interest?BT_MSG_INTERESTED:BT_MSG_NOTINTERESTED; peer->local.interested = interest; return send_message( peer, &type, 1);}intsend_have( btPeer *peer, int piece) { char buf[5]; char *p=buf; int32_t nbo; SHIFT_BYTE( p, BT_MSG_HAVE); SHIFT_INT32( p, nbo, piece); return send_message( peer, buf, p-buf);}int send_bitfield( btPeer *peer, kBitSet *set) { int32_t nslen; char type=BT_MSG_BITFIELD; int res; int nbytes = (set->nbits + 7)/8; nslen = htonl( 1 + nbytes); res = kStream_fwrite( &peer->ios, (void *)&nslen, sizeof(nslen)); if (res < 0) return -1; res = kStream_fwrite( &peer->ios, &type, 1); if (res < 0) return -2; res = kStream_fwrite( &peer->ios, set->bits, nbytes); if (res < 0) return -3; return res;}int send_request( btPeer *peer, int piece, int offs, int len) { char buf[13]; char *p=buf; int32_t nbo; SHIFT_BYTE( p, BT_MSG_REQUEST); SHIFT_INT32( p, nbo, piece); SHIFT_INT32( p, nbo, offs); SHIFT_INT32( p, nbo, len);#if 0 printf("%d: send_request( ..., %d, %d, %d)\n", peer->ios.fd, piece, offs, len);#endif return send_message( peer, buf, p-buf);}int send_piece( btPeer *peer, int piece, int offs, char* cbuf, int len) { int tlen; int32_t nslen; char buf[9]; char *p=buf; int res; int32_t nbo; SHIFT_BYTE( p, BT_MSG_PIECE); SHIFT_INT32( p, nbo, piece); SHIFT_INT32( p, nbo, offs); tlen = (p-buf) + len; nslen = htonl( tlen);#if 0 printf("%d: send message %d len=%d addr=%d\n", peer->ios.fd, buf[0], tlen, kStream_out_addr(&peer->ios));#endif res = kStream_fwrite( &peer->ios, (void *)&nslen, sizeof(nslen)); if (res < 0) return -1; res = kStream_fwrite( &peer->ios, buf, p-buf); if (res < 0) return -2; res = kStream_fwrite( &peer->ios, cbuf, len); if (res < 0) return -3; return res;}int process_queue( btFileSet *fs, btPeer *peer) { btRequest *req; int err; req = dequeue_request( &peer->queue); if (!req) return 0; /* send the requested block */ err = seg_readbuf( fs, req->block, req->offset, g_filebuffer, req->length); if (err < 0) return err; err = send_piece( peer, req->block, req->offset, g_filebuffer, req->length); if (err < 0) return err; fs->ul += req->length; return 1;}int send_cancel( btPeer *peer, int piece, int offs, int len) { char buf[13]; char *p=buf; int32_t nbo; SHIFT_BYTE( p, 8); SHIFT_INT32( p, nbo, piece); SHIFT_INT32( p, nbo, offs); SHIFT_INT32( p, nbo, len);#if 0 printf("%d: send_cancel (%d, %d + %d)\n", peer->ios.fd, piece, offs, len);#endif return send_message( peer, buf, p-buf);}#if 0int peer_send_cancel( btPeer *peer) { for (i = 0; i < queue_size( &peer->iqueue); i++) { btRequest rq = queue_pop( &peer->iqueue); send_cancel( peer, rq->block, rq->offs, rq->len); }}#endifint peer_answer( btContext *ctx, int sock) { /*struct btPeerset *pset = &ctx->peerset;*/ struct btPeer *p; struct sockaddr_in sin; int sin_len; long flags;#if 0 struct hostent *hent;#endif /* allocate status */ if (ctx_addstatus( ctx, sock)) { /* over the connection limit */ close( sock); return -1; } /* initialize peer */ p = btcalloc(1, sizeof(struct btPeer)); /* get remote hostname */ sin_len = sizeof(struct sockaddr_in); getpeername( sock, (struct sockaddr*)&sin, &sin_len);#if 0 hent = gethostbyaddr( &sin.sin_addr, sizeof(struct in_addr), AF_INET); /* memcpy(p->id, id, IDSIZE); */ if (hent && hent->h_name) { p->ip = strdup(hent->h_name); } else #endif memcpy(&p->ip, &sin.sin_addr, sizeof(p->ip)); printf("%d: New peer connected %s\n", sock, inet_ntoa(p->ip)); p->port = 0; /* inbound connection */ p->currentPiece = NULL; p->remote.choked = 1; p->local.choked = 1; p->state = PEER_INCOMING; p->download = INT_MAX; kStream_create( &p->ios, sock);#if 0 idx = pset->len++; pset->peer = btrealloc( pset->peer, sizeof(struct btPeer*) * pset->len); /* index the peer by peerid */ pset->peer[ idx] = p;#endif /* index the peer by socket */ ctx->sockpeer[sock]=p; /* change socket to non-blocking */#if WIN32 flags = ioctlsocket( sock, FIONBIO, (unsigned long *) 1); if (flags != 0) { bts_perror(errno, "ioctlsocket"); return -1; }#else flags = fcntl( sock, F_GETFL); if (flags < 0) { bts_perror(errno, "fcntl F_GETFL"); return -1; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -