📄 peer.c
字号:
int32_t nbo_len; int len; char msg[80]; char *nmsg; int res = 0; int err; assert(peer->download<ctx->downloadcount); err = kStream_fpeek( &peer->ios, (char *)&nbo_len, sizeof(nbo_len)); if (err < 0) return -1; assert(err == sizeof(nbo_len)); len = ntohl(nbo_len); if (len <= 80) { nmsg = msg; } else if (len <= MAXMESSAGE) { nmsg = malloc(len+sizeof(int32_t)); } else { /* Too big a packet, kill the peer. */ peer->ios.error = BTERR_LARGEPACKET; return -1; }#if 0 printf("A/%d: looking for %d bytes %d available (addr %d)\n", peer->ios.fd, len+4, kStream_iqlen( &peer->ios), kStream_in_addr(&peer->ios));#endif if (len == 0) { /* keep alive */ err = kStream_fread( &peer->ios, (char *)&nbo_len, sizeof(nbo_len)); assert(err == sizeof(nbo_len)); return 0; } assert(len <= MAXMESSAGE && len >= 0); err = kStream_fread( &peer->ios, nmsg, len + sizeof(int32_t)); if (err < 0) goto cleanup; assert( err == len + (int)sizeof(int32_t));#if 0 printf("A/%d: got message %d (len %d)\n", peer->ios.fd, nmsg[4], len+sizeof(int32_t));#endif /* got message */ switch (nmsg[4]) { case BT_MSG_CHOKE: { if (peer->remote.choked == 0) { int rst; peer->remote.choked = 1; if (peer->currentPiece) { btPartialPiece *piece = peer->currentPiece; /* cancel this peer's piece and reassign */ bs_clr( &dl->requested, piece->piecenumber); rst = bs_firstClr( &piece->filled); if (rst >= 0) { piece->nextbyteReq = rst; } } peer->currentPiece=NULL; clear_request_queue( &peer->inqueue); if (peer->local.interested) { /* stop the rate counter */ stop_rate_timer( &peer->remote, time(NULL)); } } } break; case BT_MSG_UNCHOKE: time(&peer->lastreceived); if (peer->remote.choked == 1) { peer->remote.choked = 0; /* queue requests */ if (!peer->local.interested) { /* recheck interest */ int interest = update_interested( ctx, peer); if (!interest) break; } start_rate_timer( &peer->remote, time(NULL)); if (peer_send_request( ctx, peer)) { res = -1; goto cleanup; } } break; case BT_MSG_INTERESTED: peer->remote.interested = 1; ctx->downloads[peer->download]->peerset.interestedpeers++; break; case BT_MSG_NOTINTERESTED: peer->remote.interested = 0; ctx->downloads[peer->download]->peerset.interestedpeers--; break; case BT_MSG_HAVE: { int block = ntohl(*(int *)(nmsg+5)); int interest; bs_set( &peer->blocks, block); interest = update_interested( ctx, peer); if (interest && !peer->remote.snubbed && !peer->remote.choked) { start_rate_timer( &peer->remote, time(NULL)); if (peer_send_request( ctx, peer)) { res = -1; goto cleanup; } } } break; case BT_MSG_BITFIELD: kBitSet_readBytes( &peer->blocks, nmsg+5, len-1); update_interested( ctx, peer); break; case BT_MSG_REQUEST: { int piece = ntohl(*(int *)(nmsg+5)); int offs = ntohl(*(int *)(nmsg+9)); int len = ntohl(*(int *)(nmsg+13)); if (peer->local.choked) { /* ignore requests from choked peers */ break; } if (!bs_isSet(&peer->blocks, piece) && bs_isSet(&ctx->downloads[peer->download]->fileset.completed, piece)) { queue_request( &peer->queue, piece, offs, len); } else { time_t now; time(&now); send_choke( peer, 1); stop_rate_timer( &peer->local, now); printf("%d: Choked by invalid request for block %d (%s have it)\n", peer->ios.fd, piece, bs_isSet(&peer->blocks, piece)?"they":"we don't"); } } break; case BT_MSG_PIECE: { int32_t ibuf; btPartialPiece *pp=peer->currentPiece; int piece; int offs; memcpy(&ibuf,nmsg+5,sizeof(int32_t)); piece = ntohl(ibuf); memcpy(&ibuf,nmsg+9,sizeof(int32_t)); offs = ntohl(ibuf); #if 0 printf("%d: got piece %d, offs %d, len %d\n", peer->ios.fd, piece, offs, len-9);#endif /* FIXME: check requests with ones we've sent */ time(&peer->lastreceived); if (!pp || !remove_queued_request( &peer->inqueue, piece, offs, len-9)) { printf("%d: Unneeded data: piece %d %d+%d\n", peer->ios.fd, piece, offs, len-9); } else { int done = seg_writebuf( &dl->fileset, piece, offs, nmsg+13, len-9); if (done < 0) { bts_perror(errno, "error writing buffer"); abort(); } if (done) { int i; bs_clr( &dl->interested, piece);#if 0 bs_dump( "completed", &dl->fileset.completed);#endif for (i=0; i<dl->peerset.len; i++) { btPeer *p = dl->peerset.peer[i]; if (p->currentPiece == pp) { btRequest *req; p->currentPiece = NULL; /* send cancels, check that requests work */ while ((req = dequeue_request( &p->inqueue)) != NULL) { if (send_cancel(p, req->block, req->offset, req->length )) { p->state = PEER_ERROR; err = -1; goto cleanup; } } peer_send_request( ctx, p); } if (p->state == PEER_GOOD) { if (send_have( p, piece)) { p->state = PEER_ERROR; err = -1; goto cleanup; } } } /* for i in peerset.len */ } /* if done */ if (peer_send_request( ctx, peer)) { res = -1; goto cleanup; } } /* if !unknown piece */ } break; case BT_MSG_CANCEL: { int piece, offs, len; int32_t ibuf; /* this cancels a specific request */ memcpy(&ibuf,nmsg+5,sizeof(int32_t)); piece = ntohl(ibuf); memcpy(&ibuf,nmsg+9,sizeof(int32_t)); offs = ntohl(ibuf); memcpy(&ibuf,nmsg+13,sizeof(int32_t)); len = ntohl(ibuf); #if 0 printf("%d: got cancel %d, offs %d, len %d\n", peer->ios.fd, piece, offs, len);#endif remove_queued_request( &peer->queue, piece, offs, len); } break; default: /* unknown message */ res = -2; break; }cleanup: /* cleanup */ if (err < 0) { /* if there has been an error, report it */ res = -1; } if (res == 0) { /* check if there is another message waiting */ err = kStream_fpeek( &peer->ios, (char *)&nbo_len, sizeof(nbo_len)); if (err == sizeof(nbo_len)) { int tlen; tlen = ntohl(nbo_len) + sizeof(nbo_len);#if 0 printf("B/looking for %d bytes %d available (addr %d)\n", tlen, kStream_iqlen( &peer->ios), kStream_in_addr(&peer->ios));#endif assert(tlen <= MAXMESSAGE && tlen >= 0); if (kStream_iqlen( &peer->ios) >= tlen) res = 1; } } if (len > 80) { free(nmsg); } return res;}/* * returns * 0 - no error * -1 - permanent error sending msg size * -2 - permanent error sending message */#define SHIFT_INT32(ptr,nbo,ival) \ (nbo=htonl(ival), memcpy(ptr,&nbo,sizeof(int32_t)), ptr+=sizeof(int32_t))#define SHIFT_BYTE(ptr,ival) ((*((unsigned char *)(ptr))++) = ival)intsend_message( btPeer *peer, char *buf, int len) { int nslen = htonl( len);#if 0 printf("%d: send message %d len=%d addr=%d\n", peer->ios.fd, buf?buf[0]:-1, len, kStream_out_addr(&peer->ios));#endif if (kStream_fwrite( &peer->ios, (void*)&nslen, sizeof(nslen)) < 0) { return -1; } if (len > 0) { if (kStream_fwrite( &peer->ios, buf, len) < 0) { return -2; } } return 0;}intsend_keepalive( btPeer *peer) { return send_message( peer, NULL, 0);}intsend_choke( btPeer *peer, int choke) { char type=choke?BT_MSG_CHOKE:BT_MSG_UNCHOKE; if (peer->local.choked != choke) { /* cancel all requests */ clear_request_queue( &peer->queue); peer->local.choked = choke; } return send_message( peer, &type, 1);}intsend_interested( btPeer *peer, int interest) { char type=interest?BT_MSG_INTERESTED:BT_MSG_NOTINTERESTED; peer->local.interested = interest; return send_message( peer, &type, 1);}intsend_have( btPeer *peer, int piece) { char buf[5]; char *p=buf; int32_t nbo; SHIFT_BYTE( p, BT_MSG_HAVE); SHIFT_INT32( p, nbo, piece); return send_message( peer, buf, p-buf);}int send_bitfield( btPeer *peer, kBitSet *set) { int32_t nslen; char type=BT_MSG_BITFIELD; int res; int nbytes = (set->nbits + 7)/8; nslen = htonl( 1 + nbytes); res = kStream_fwrite( &peer->ios, (void *)&nslen, sizeof(nslen)); if (res < 0) return -1; res = kStream_fwrite( &peer->ios, &type, 1); if (res < 0) return -2; res = kStream_fwrite( &peer->ios, set->bits, nbytes); if (res < 0) return -3; return res;}int send_request( btPeer *peer, int piece, int offs, int len) { char buf[13]; char *p=buf; int32_t nbo; SHIFT_BYTE( p, BT_MSG_REQUEST); SHIFT_INT32( p, nbo, piece); SHIFT_INT32( p, nbo, offs); SHIFT_INT32( p, nbo, len);#if 0 printf("%d: send_request( ..., %d, %d, %d)\n", peer->ios.fd, piece, offs, len);#endif return send_message( peer, buf, p-buf);}int send_piece( btPeer *peer, int piece, int offs, char* cbuf, int len) { int tlen; int32_t nslen; char buf[9]; char *p=buf; int res; int32_t nbo; SHIFT_BYTE( p, BT_MSG_PIECE); SHIFT_INT32( p, nbo, piece); SHIFT_INT32( p, nbo, offs); tlen = (p-buf) + len; nslen = htonl( tlen);#if 0 printf("%d: send message %d len=%d addr=%d\n", peer->ios.fd, buf[0], tlen, kStream_out_addr(&peer->ios));#endif res = kStream_fwrite( &peer->ios, (void *)&nslen, sizeof(nslen)); if (res < 0) return -1; res = kStream_fwrite( &peer->ios, buf, p-buf); if (res < 0) return -2; res = kStream_fwrite( &peer->ios, cbuf, len); if (res < 0) return -3; return res;}int process_queue( btFileSet *fs, btPeer *peer) { btRequest *req; int err; req = dequeue_request( &peer->queue); if (!req) return 0; /* send the requested block */ err = seg_readbuf( fs, req->block, req->offset, g_filebuffer, req->length); if (err < 0) return err; err = send_piece( peer, req->block, req->offset, g_filebuffer, req->length); if (err < 0) return err; fs->ul += req->length; return 1;}int send_cancel( btPeer *peer, int piece, int offs, int len) { char buf[13]; char *p=buf; int32_t nbo; SHIFT_BYTE( p, 8); SHIFT_INT32( p, nbo, piece); SHIFT_INT32( p, nbo, offs); SHIFT_INT32( p, nbo, len);#if 0 printf("%d: send_cancel (%d, %d + %d)\n", peer->ios.fd, piece, offs, len);#endif return send_message( peer, buf, p-buf);}#if 0int peer_send_cancel( btPeer *peer) { for (i = 0; i < queue_size( &peer->iqueue); i++) { btRequest rq = queue_pop( &peer->iqueue); send_cancel( peer, rq->block, rq->offs, rq->len); }}#endifint peer_answer( btContext *ctx, int sock) { /*struct btPeerset *pset = &ctx->peerset;*/ struct btPeer *p; struct sockaddr_in sin; int sin_len; long flags;#if 0 struct hostent *hent;#endif /* allocate status */ if (ctx_addstatus( ctx, sock)) { /* over the connection limit */ close( sock); return -1; } /* initialize peer */ p = calloc(1, sizeof(struct btPeer)); /* get remote hostname */ sin_len = sizeof(struct sockaddr_in); getpeername( sock, (struct sockaddr*)&sin, &sin_len);#if 0 hent = gethostbyaddr( &sin.sin_addr, sizeof(struct in_addr), AF_INET); /* memcpy(p->id, id, IDSIZE); */ if (hent && hent->h_name) { p->ip = strdup(hent->h_name); } else #endif memcpy(&p->ip, &sin.sin_addr, sizeof(p->ip)); printf("%d: New peer connected %s\n", sock, inet_ntoa(p->ip)); p->port = 0; /* inbound connection */ p->currentPiece = NULL; p->remote.choked = 1; p->local.choked = 1; p->state = PEER_INCOMING; p->download = INT_MAX; kStream_create( &p->ios, sock);#if 0 idx = pset->len++; pset->peer = realloc( pset->peer, sizeof(struct btPeer*) * pset->len); /* index the peer by peerid */ pset->peer[ idx] = p;#endif /* index the peer by socket */ ctx->sockpeer[sock]=p; /* change socket to non-blocking */#if WIN32
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -