📄 table.c
字号:
return GNUNET_OK; } selected -= distance; } } GNUNET_GE_BREAK (NULL, 0); GNUNET_mutex_unlock (lock); return GNUNET_SYSERR;}/** * Send a discovery message to the other peer. * * @param cls NULL or pre-build discovery message */static voidbroadcast_dht_discovery (const GNUNET_PeerIdentity * other, void *cls){ P2P_DHT_Discovery *disco = cls; unsigned int pc; unsigned int i; GNUNET_PeerIdentity *pos; if (stats != NULL) stats->change (stat_dht_advertisements, 1); if (disco != NULL) { coreAPI->ciphertext_send (other, &disco->header, GNUNET_EXTREME_PRIORITY / 4, MAINTAIN_FREQUENCY * MAINTAIN_CHANCE / 2); return; } pc = total_peers; if (pc > MAINTAIN_ADV_CAP) pc = MAINTAIN_ADV_CAP; if (pc == 0) pc = 1; disco = GNUNET_malloc (pc * sizeof (GNUNET_PeerIdentity) + sizeof (P2P_DHT_Discovery)); disco->header.type = htons (GNUNET_P2P_PROTO_DHT_DISCOVERY); disco->space_available = -1; /* FIXME */ pos = (GNUNET_PeerIdentity *) & disco[1]; i = 0; if (total_peers == 0) { /* put in our own identity (otherwise we get into a storm of empty discovery messages) */ pos[0] = *coreAPI->my_identity; i = 1; } while (i < pc) { if (GNUNET_OK != GNUNET_DHT_select_peer (&pos[i], &other->hashPubKey, pos, i)) pc--; else i++; } disco->header.size = htons (pc * sizeof (GNUNET_PeerIdentity) + sizeof (P2P_DHT_Discovery)); coreAPI->ciphertext_send (other, &disco->header, 0, MAINTAIN_FREQUENCY * MAINTAIN_CHANCE / 2); GNUNET_free (disco);}static voidbroadcast_dht_discovery_prob (const GNUNET_PeerIdentity * other, void *cls){ if (GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, MAINTAIN_CHANCE) != 0) return; broadcast_dht_discovery (other, cls);}/** * Cron job to maintain DHT routing table. 
*/static voidmaintain_dht_job (void *unused){ P2P_DHT_Discovery disc; if (total_peers == 0) { disc.header.size = htons (sizeof (P2P_DHT_Discovery)); disc.header.type = htons (GNUNET_P2P_PROTO_DHT_DISCOVERY); disc.space_available = -1; /* FIXME */ coreAPI->p2p_connections_iterate (&broadcast_dht_discovery_prob, &disc); } else { coreAPI->p2p_connections_iterate (&broadcast_dht_discovery_prob, NULL); }}/** * We have received a pong from a peer and know it is still * there. */static voidpongNotify (void *cls){ GNUNET_PeerIdentity *peer = cls; PeerInfo *pi; pi = findPeerEntry (peer); if (pi != NULL) { pi->lastActivity = GNUNET_get_time (); pi->expected_latency = pi->lastActivity - pi->lastTimePingSend; pi->response_count++; } GNUNET_free (peer);}/** * Send a ping to the given peer to check if it is still * running. */static voidpingPeer (PeerInfo * pi){ GNUNET_PeerIdentity *p; p = GNUNET_malloc (sizeof (GNUNET_PeerIdentity)); *p = pi->id; if (GNUNET_OK == pingpong->ping (p, &pongNotify, p, GNUNET_NO, rand ())) { pi->lastTimePingSend = GNUNET_get_time (); pi->request_count++; }}/** * Check if pi is still up and running. May also try * to confirm that the peer is still live. * * @return GNUNET_YES if the peer should be removed from the DHT table */static intcheckExpired (PeerInfo * pi){ GNUNET_CronTime now; now = GNUNET_get_time (); if (pi->lastActivity >= now) return GNUNET_NO; if (now - pi->lastActivity > MAINTAIN_PEER_TIMEOUT) return GNUNET_YES; if (now - pi->lastActivity > MAINTAIN_PEER_TIMEOUT / 2) pingPeer (pi); return GNUNET_NO;}/** * Check for expired peers in the given bucket. 
*/static voidcheckExpiration (PeerBucket * bucket){ unsigned int i; PeerInfo *peer; for (i = 0; i < bucket->peers_size; i++) { peer = bucket->peers[i]; if (checkExpired (peer) == GNUNET_YES) { total_peers--; if (stats != NULL) stats->change (stat_dht_total_peers, -1); GNUNET_free (peer); bucket->peers[i] = bucket->peers[bucket->peers_size - 1]; GNUNET_array_grow (bucket->peers, bucket->peers_size, bucket->peers_size - 1); i--; } }}/** * Consider adding the given peer to the DHT. */static voidconsiderPeer (const GNUNET_PeerIdentity * sender, const GNUNET_PeerIdentity * peer){ PeerInfo *pi; PeerBucket *bucket; P2P_DHT_ASK_HELLO ask; GNUNET_MessageHello *hello; bucket = findBucketFor (peer); if (bucket == NULL) return; /* peers[i] == self */ if (bucket->peers_size >= MAINTAIN_BUCKET_SIZE) checkExpiration (bucket); if (bucket->peers_size >= MAINTAIN_BUCKET_SIZE) return; /* do not care */ if (NULL != findPeerEntryInBucket (bucket, peer)) return; /* already have this peer in buckets */ /* do we know how to contact this peer? 
*/ hello = identity->identity2Hello (peer, GNUNET_TRANSPORT_PROTOCOL_NUMBER_ANY, GNUNET_NO); if (hello == NULL) { /* if identity not known, ask sender for HELLO of other peer */ ask.header.size = htons (sizeof (P2P_DHT_ASK_HELLO)); ask.header.type = htons (sizeof (GNUNET_P2P_PROTO_DHT_ASK_HELLO)); ask.reserved = 0; ask.peer = *peer; coreAPI->ciphertext_send (sender, &ask.header, 0, /* FIXME: priority */ 5 * GNUNET_CRON_SECONDS); return; } GNUNET_free (hello); /* check if connected, if not, send discovery */ if (GNUNET_OK != coreAPI->p2p_connection_status_check (peer, NULL, NULL)) { /* not yet connected; connect sending DISCOVERY */ broadcast_dht_discovery (peer, NULL); return; } /* we are connected (in core), add to bucket */ pi = GNUNET_malloc (sizeof (PeerInfo)); memset (pi, 0, sizeof (PeerInfo)); pi->id = *peer; pingPeer (pi); GNUNET_array_grow (bucket->peers, bucket->peers_size, bucket->peers_size + 1); bucket->peers[bucket->peers_size - 1] = pi; total_peers++; if (stats != NULL) stats->change (stat_dht_total_peers, 1);}/** * Handle discovery message. 
*/
static int
handleDiscovery (const GNUNET_PeerIdentity * sender,
                 const GNUNET_MessageHeader * msg)
{
  unsigned int size;
  unsigned int pc;
  unsigned int i;
  const P2P_DHT_Discovery *disco;
  const GNUNET_PeerIdentity *peers;

  /*
   * Returns GNUNET_OK once the sender and all advertised peers have been
   * passed to considerPeer, GNUNET_SYSERR if the message size is not
   * acceptable.
   */
  size = ntohs (msg->size);
  if (size < sizeof (P2P_DHT_Discovery))
    {
      /* reject explicitly instead of relying on the unsigned
         subtraction below wrapping around to a huge count */
      GNUNET_GE_BREAK_OP (coreAPI->ectx, 0);
      return GNUNET_SYSERR;     /* too short for the fixed part */
    }
  pc = (size - sizeof (P2P_DHT_Discovery)) / sizeof (GNUNET_PeerIdentity);
  if (pc > MAINTAIN_ADV_CAP * 8)
    {
      GNUNET_GE_BREAK_OP (coreAPI->ectx, 0);
      return GNUNET_SYSERR;     /* far too big */
    }
  if (size !=
      sizeof (P2P_DHT_Discovery) + pc * sizeof (GNUNET_PeerIdentity))
    {
      GNUNET_GE_BREAK_OP (coreAPI->ectx, 0);
      return GNUNET_SYSERR;     /* malformed */
    }
  disco = (const P2P_DHT_Discovery *) msg;
  if (stats != NULL)
    stats->change (stat_dht_discoveries, 1);
  if (pc == 0)
    {
      /* if peer has 0 connections, be sure to send discovery back */
      broadcast_dht_discovery (sender, NULL);
    }
  GNUNET_mutex_lock (lock);
  considerPeer (sender, sender);
  /* advertised identities follow directly behind the fixed part */
  peers = (const GNUNET_PeerIdentity *) &disco[1];
  for (i = 0; i < pc; i++)
    considerPeer (sender, &peers[i]);
  GNUNET_mutex_unlock (lock);
  return GNUNET_OK;
}

/**
 * Handle ask hello message.
*/static inthandleAskHello (const GNUNET_PeerIdentity * sender, const GNUNET_MessageHeader * msg){ const P2P_DHT_ASK_HELLO *ask; GNUNET_MessageHello *hello; if (ntohs (msg->size) != sizeof (P2P_DHT_ASK_HELLO)) { GNUNET_GE_BREAK_OP (coreAPI->ectx, 0); return GNUNET_SYSERR; } ask = (const P2P_DHT_ASK_HELLO *) msg; if (NULL == findBucketFor (&ask->peer)) return GNUNET_OK; hello = identity->identity2Hello (&ask->peer, GNUNET_TRANSPORT_PROTOCOL_NUMBER_ANY, GNUNET_NO); if (hello == NULL) return GNUNET_OK; coreAPI->ciphertext_send (sender, &hello->header, 0, 5 * GNUNET_CRON_SECONDS); GNUNET_free (hello); return GNUNET_OK;}static voidpeer_disconnect_handler (const GNUNET_PeerIdentity * peer, void *unused){ PeerBucket *bucket; PeerInfo *info; GNUNET_mutex_lock (lock); bucket = findBucketFor (peer); if (bucket != NULL) { info = findPeerEntryInBucket (bucket, peer); if (info != NULL) { info->lastActivity = 0; checkExpiration (bucket); } } GNUNET_mutex_unlock (lock);}/** * Initialize table DHT component. 
*
 * @param capi the core API
 * @return GNUNET_OK on success
 */
int
GNUNET_DHT_table_init (GNUNET_CoreAPIForPlugins * capi)
{
  unsigned long long n;

  coreAPI = capi;
  /* use less than 50% of peer's ideal number of connections
     for DHT table size; never fewer than 4 buckets */
  n = capi->core_slots_count () / MAINTAIN_BUCKET_SIZE / 2;
  n = (n < 4) ? 4 : n;
  GNUNET_array_grow (buckets, bucketCount, n);
  /* split the range [0, 512) evenly across the buckets */
  n = 0;
  while (n < bucketCount)
    {
      buckets[n].bstart = 512 * n / bucketCount;
      buckets[n].bend = 512 * (n + 1) / bucketCount;
      n++;
    }
  lock = capi->global_lock_get ();
  /* statistics are optional; identity and pingpong are asserted below */
  stats = capi->service_request ("stats");
  if (stats != NULL)
    {
      stat_dht_total_peers =
        stats->create (gettext_noop ("# dht connections"));
      stat_dht_discoveries =
        stats->create (gettext_noop ("# dht discovery messages received"));
      stat_dht_route_looks =
        stats->create (gettext_noop ("# dht route host lookups performed"));
      stat_dht_advertisements =
        stats->create (gettext_noop ("# dht discovery messages sent"));
    }
  identity = capi->service_request ("identity");
  GNUNET_GE_ASSERT (capi->ectx, identity != NULL);
  pingpong = capi->service_request ("pingpong");
  GNUNET_GE_ASSERT (capi->ectx, pingpong != NULL);
  capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_DHT_DISCOVERY,
                                         &handleDiscovery);
  capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_DHT_ASK_HELLO,
                                         &handleAskHello);
  capi->peer_disconnect_notification_register (&peer_disconnect_handler,
                                               NULL);
  GNUNET_cron_add_job (capi->cron,
                       &maintain_dht_job,
                       MAINTAIN_FREQUENCY, MAINTAIN_FREQUENCY, NULL);
  return GNUNET_OK;
}

/**
 * Shutdown table DHT component.
*
 * @return GNUNET_OK on success
 */
int
GNUNET_DHT_table_done ()
{
  unsigned int b;
  unsigned int p;

  /* stop all callbacks into this module first */
  coreAPI->peer_disconnect_notification_unregister (&peer_disconnect_handler,
                                                    NULL);
  coreAPI->p2p_ciphertext_handler_unregister (GNUNET_P2P_PROTO_DHT_DISCOVERY,
                                              &handleDiscovery);
  coreAPI->p2p_ciphertext_handler_unregister (GNUNET_P2P_PROTO_DHT_ASK_HELLO,
                                              &handleAskHello);
  GNUNET_cron_del_job (coreAPI->cron,
                       &maintain_dht_job, MAINTAIN_FREQUENCY, NULL);

  /* release the services obtained during initialization */
  if (stats != NULL)
    {
      coreAPI->service_release (stats);
      stats = NULL;
    }
  coreAPI->service_release (identity);
  identity = NULL;
  coreAPI->service_release (pingpong);
  pingpong = NULL;

  /* free every peer entry, then each bucket's array, then the buckets */
  b = 0;
  while (b < bucketCount)
    {
      p = 0;
      while (p < buckets[b].peers_size)
        {
          GNUNET_free (buckets[b].peers[p]);
          p++;
        }
      GNUNET_array_grow (buckets[b].peers, buckets[b].peers_size, 0);
      b++;
    }
  GNUNET_array_grow (buckets, bucketCount, 0);
  lock = NULL;
  return GNUNET_OK;
}

/* end of table.c */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -