📄 reliable.c
字号:
gc_free (&gc); return false;}/* make sure that incoming packet ID won't deadlock the receive buffer */boolreliable_wont_break_sequentiality (const struct reliable *rel, packet_id_type id){ struct gc_arena gc = gc_new (); int ret; if ((int)id < (int)rel->packet_id + rel->size) { ret = true; } else { msg (D_REL_LOW, "ACK " packet_id_format " breaks sequentiality: %s", (packet_id_print_type)id, reliable_print_ids (rel, &gc)); ret = false; } gc_free (&gc); return ret;}/* grab a free buffer */struct buffer *reliable_get_buf (struct reliable *rel){ int i; for (i = 0; i < rel->size; ++i) { struct reliable_entry *e = &rel->array[i]; if (!e->active) { ASSERT (buf_init (&e->buf, rel->offset)); return &e->buf; } } return NULL;}/* grab a free buffer, fail if buffer clogged by unacknowledged low packet IDs */struct buffer *reliable_get_buf_output_sequenced (struct reliable *rel){ struct gc_arena gc = gc_new (); int i; packet_id_type min_id = 0; bool min_id_defined = false; struct buffer *ret = NULL; /* find minimum active packet_id */ for (i = 0; i < rel->size; ++i) { const struct reliable_entry *e = &rel->array[i]; if (e->active) { if (!min_id_defined || e->packet_id < min_id) { min_id_defined = true; min_id = e->packet_id; } } } if (!min_id_defined || (int)(rel->packet_id - min_id) < rel->size) { ret = reliable_get_buf (rel); } else { msg (D_REL_LOW, "ACK output sequence broken: %s", reliable_print_ids (rel, &gc)); } gc_free (&gc); return ret;}/* get active buffer for next sequentially increasing key ID */struct buffer *reliable_get_buf_sequenced (struct reliable *rel){ int i; for (i = 0; i < rel->size; ++i) { struct reliable_entry *e = &rel->array[i]; if (e->active && e->packet_id == rel->packet_id) { return &e->buf; } } return NULL;}/* return true if reliable_send would return a non-NULL result */boolreliable_can_send (const struct reliable *rel){ struct gc_arena gc = gc_new (); int i; int n_active = 0, n_current = 0; for (i = 0; i < rel->size; ++i) { const struct reliable_entry *e = &rel->array[i]; if (e->active) { ++n_active; if (now >= e->next_try) ++n_current; } } msg (D_REL_DEBUG, "ACK reliable_can_send active=%d current=%d : %s", n_active, n_current, reliable_print_ids (rel, &gc)); gc_free (&gc); return n_current > 0;}/* return a unique point-in-time to trigger retry */static time_treliable_unique_retry (struct reliable *rel, time_t retry){ int i; while (true) { for (i = 0; i < rel->size; ++i) { struct reliable_entry *e = &rel->array[i]; if (e->active && e->next_try == retry) goto again; } break; again: ++retry; } return retry;}/* return next buffer to send to remote */struct buffer *reliable_send (struct reliable *rel, int *opcode){ int i; struct reliable_entry *best = NULL; const time_t local_now = now; for (i = 0; i < rel->size; ++i) { struct reliable_entry *e = &rel->array[i]; if (e->active && local_now >= e->next_try) { if (!best || e->packet_id < best->packet_id) best = e; } } if (best) {#if 1 /* exponential backoff */ best->next_try = reliable_unique_retry (rel, local_now + best->timeout); best->timeout *= 2;#else /* constant timeout, no backoff */ best->next_try = local_now + best->timeout;#endif *opcode = best->opcode; msg (D_REL_DEBUG, "ACK reliable_send ID " packet_id_format " (size=%d to=%d)", (packet_id_print_type)best->packet_id, best->buf.len, (int)(best->next_try - local_now)); return &best->buf; } return NULL;}/* schedule all pending packets for immediate retransmit */voidreliable_schedule_now (struct reliable *rel){ int i; msg (D_REL_DEBUG, "ACK reliable_schedule_now"); for (i = 0; i < rel->size; ++i) { struct reliable_entry *e = &rel->array[i]; if (e->active) { e->next_try = now; e->timeout = rel->initial_timeout; } }}/* in how many seconds should we wake up to check for timeout *//* if we return BIG_TIMEOUT, nothing to wait for */interval_treliable_send_timeout (const struct reliable *rel){ struct gc_arena gc = gc_new (); interval_t ret = BIG_TIMEOUT; int i; const time_t local_now = now; for (i = 0; i < rel->size; ++i) { const struct reliable_entry *e = &rel->array[i]; if (e->active) { if (e->next_try <= local_now) { ret = 0; break; } else { ret = min_int (ret, e->next_try - local_now); } } } msg (D_REL_DEBUG, "ACK reliable_send_timeout %d %s", (int) ret, reliable_print_ids (rel, &gc)); gc_free (&gc); return ret;}/* * Enable an incoming buffer previously returned by a get function as active. */voidreliable_mark_active_incoming (struct reliable *rel, struct buffer *buf, packet_id_type pid, int opcode){ int i; for (i = 0; i < rel->size; ++i) { struct reliable_entry *e = &rel->array[i]; if (buf == &e->buf) { e->active = true; /* packets may not arrive in sequential order */ e->packet_id = pid; /* check for replay */ ASSERT (pid >= rel->packet_id); e->opcode = opcode; e->next_try = 0; e->timeout = 0; msg (D_REL_DEBUG, "ACK mark active incoming ID " packet_id_format, (packet_id_print_type)e->packet_id); return; } } ASSERT (0); /* buf not found in rel */}/* * Enable an outgoing buffer previously returned by a get function as active. */voidreliable_mark_active_outgoing (struct reliable *rel, struct buffer *buf, int opcode){ int i; for (i = 0; i < rel->size; ++i) { struct reliable_entry *e = &rel->array[i]; if (buf == &e->buf) { /* Write mode, increment packet_id (i.e. sequence number) linearly and prepend id to packet */ packet_id_type net_pid; e->packet_id = rel->packet_id++; net_pid = htonpid (e->packet_id); ASSERT (buf_write_prepend (buf, &net_pid, sizeof (net_pid))); e->active = true; e->opcode = opcode; e->next_try = 0; e->timeout = rel->initial_timeout; msg (D_REL_DEBUG, "ACK mark active outgoing ID " packet_id_format, (packet_id_print_type)e->packet_id); return; } } ASSERT (0); /* buf not found in rel */}/* delete a buffer previously activated by reliable_mark_active() */voidreliable_mark_deleted (struct reliable *rel, struct buffer *buf, bool inc_pid){ int i; for (i = 0; i < rel->size; ++i) { struct reliable_entry *e = &rel->array[i]; if (buf == &e->buf) { e->active = false; if (inc_pid) rel->packet_id = e->packet_id + 1; return; } } ASSERT (0);}#if 0voidreliable_ack_debug_print (const struct reliable_ack *ack, char *desc){ int i; printf ("********* struct reliable_ack %s\n", desc); for (i = 0; i < ack->len; ++i) { printf (" %d: " packet_id_format "\n", i, (packet_id_print_type) ack->packet_id[i]); }}voidreliable_debug_print (const struct reliable *rel, char *desc){ int i; update_time (); printf ("********* struct reliable %s\n", desc); printf (" initial_timeout=%d\n", (int)rel->initial_timeout); printf (" packet_id=" packet_id_format "\n", rel->packet_id); printf (" now=" time_format "\n", now); for (i = 0; i < rel->size; ++i) { const struct reliable_entry *e = &rel->array[i]; if (e->active) { printf (" %d: packet_id=" packet_id_format " len=%d", i, e->packet_id, e->buf.len); printf (" next_try=" time_format, e->next_try); printf ("\n"); } }}#endif#elsestatic void dummy(void) {}#endif /* USE_CRYPTO && USE_SSL*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -