⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 connection.c

📁 关于tor匿名通信的源代码
💻 C
📖 第 1 页 / 共 5 页
字号:
  int priority = conn->type != CONN_TYPE_DIR;
  int conn_bucket = -1;
  int global_bucket = global_read_bucket;

  if (connection_speaks_cells(conn)) {
    or_connection_t *or_conn = TO_OR_CONN(conn);
    if (conn->state == OR_CONN_STATE_OPEN)
      conn_bucket = or_conn->read_bucket;
  }

  if (!connection_is_rate_limited(conn)) {
    /* be willing to read on local conns even if our buckets are empty */
    return conn_bucket>=0 ? conn_bucket : 1<<14;
  }

  if (connection_counts_as_relayed_traffic(conn, now) &&
      global_relayed_read_bucket <= global_read_bucket)
    global_bucket = global_relayed_read_bucket;

  return connection_bucket_round_robin(base, priority,
                                       global_bucket, conn_bucket);
}

/** How many bytes at most can we write onto this connection? */
ssize_t
connection_bucket_write_limit(connection_t *conn, time_t now)
{
  int base = connection_speaks_cells(conn) ?
               CELL_NETWORK_SIZE : RELAY_PAYLOAD_SIZE;
  int priority = conn->type != CONN_TYPE_DIR;
  int global_bucket = global_write_bucket;

  if (!connection_is_rate_limited(conn)) {
    /* be willing to write to local conns even if our buckets are empty */
    return conn->outbuf_flushlen;
  }

  if (connection_counts_as_relayed_traffic(conn, now) &&
      global_relayed_write_bucket <= global_write_bucket)
    global_bucket = global_relayed_write_bucket;

  return connection_bucket_round_robin(base, priority, global_bucket,
                                       conn->outbuf_flushlen);
}

/** Return 1 if the global write buckets are low enough that we
 * shouldn't send <b>attempt</b> bytes of low-priority directory stuff
 * out to <b>conn</b>. Else return 0.

 * Priority is 1 for v1 requests (directories and running-routers),
 * and 2 for v2 requests (statuses and descriptors). But see FFFF in
 * directory_handle_command_get() for why we don't use priority 2 yet.
 *
 * There are a lot of parameters we could use here:
 * - global_relayed_write_bucket. Low is bad.
 * - global_write_bucket. Low is bad.
 * - bandwidthrate. Low is bad.
 * - bandwidthburst. Not a big factor?
 * - attempt. High is bad.
 * - total bytes queued on outbufs. High is bad. But I'm wary of
 *   using this, since a few slow-flushing queues will pump up the
 *   number without meaning what we meant to mean. What we really
 *   mean is "total directory bytes added to outbufs recently", but
 *   that's harder to quantify and harder to keep track of.
 */
int
global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
{
  int smaller_bucket = global_write_bucket < global_relayed_write_bucket ?
                       global_write_bucket : global_relayed_write_bucket;
  if (authdir_mode(get_options()) && priority>1)
    return 0; /* there's always room to answer v2 if we're an auth dir */

  if (!connection_is_rate_limited(conn))
    return 0; /* local conns don't get limited */

  if (smaller_bucket < (int)attempt)
    return 1; /* not enough space no matter the priority */

  if (write_buckets_empty_last_second)
    return 1; /* we're already hitting our limits, no more please */

  if (priority == 1) { /* old-style v1 query */
    /* Could we handle *two* of these requests within the next two seconds? */
    or_options_t *options = get_options();
    int64_t can_write = (int64_t)smaller_bucket
      + 2*(options->RelayBandwidthRate ? options->RelayBandwidthRate :
                                         options->BandwidthRate);
    if (can_write < 2*(int64_t)attempt)
      return 1;
  } else { /* v2 query */
    /* no further constraints yet */
  }
  return 0;
}

/** We just read num_read and wrote num_written onto conn.
 * Decrement buckets appropriately. */
static void
connection_buckets_decrement(connection_t *conn, time_t now,
                             size_t num_read, size_t num_written)
{
  if (!connection_is_rate_limited(conn))
    return; /* local IPs are free */
  if (num_written >= INT_MAX || num_read >= INT_MAX) {
    log_err(LD_BUG, "Value out of range. num_read=%lu, num_written=%lu, "
             "connection type=%s, state=%s",
             (unsigned long)num_read, (unsigned long)num_written,
             conn_type_to_string(conn->type),
             conn_state_to_string(conn->type, conn->state));
    if (num_written >= INT_MAX) num_written = 1;
    if (num_read >= INT_MAX) num_read = 1;
    tor_fragile_assert();
  }

  if (num_read > 0)
    rep_hist_note_bytes_read(num_read, now);
  if (num_written > 0)
    rep_hist_note_bytes_written(num_written, now);

  if (connection_counts_as_relayed_traffic(conn, now)) {
    global_relayed_read_bucket -= (int)num_read;
    global_relayed_write_bucket -= (int)num_written;
  }
  global_read_bucket -= (int)num_read;
  global_write_bucket -= (int)num_written;
  if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN)
    TO_OR_CONN(conn)->read_bucket -= (int)num_read;
}

/** If we have exhausted our global buckets, or the buckets for conn,
 * stop reading. */
static void
connection_consider_empty_read_buckets(connection_t *conn)
{
  const char *reason;

  if (global_read_bucket <= 0) {
    reason = "global read bucket exhausted. Pausing.";
  } else if (connection_counts_as_relayed_traffic(conn, time(NULL)) &&
             global_relayed_read_bucket <= 0) {
    reason = "global relayed read bucket exhausted. Pausing.";
  } else if (connection_speaks_cells(conn) &&
             conn->state == OR_CONN_STATE_OPEN &&
             TO_OR_CONN(conn)->read_bucket <= 0) {
    reason = "connection read bucket exhausted. Pausing.";
  } else
    return; /* all good, no need to stop it */

  LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason));
  conn->read_blocked_on_bw = 1;
  connection_stop_reading(conn);
}

/** If we have exhausted our global buckets, or the buckets for conn,
 * stop writing. */
static void
connection_consider_empty_write_buckets(connection_t *conn)
{
  const char *reason;

  if (global_write_bucket <= 0) {
    reason = "global write bucket exhausted. Pausing.";
  } else if (connection_counts_as_relayed_traffic(conn, time(NULL)) &&
             global_relayed_write_bucket <= 0) {
    reason = "global relayed write bucket exhausted. Pausing.";
#if 0
  } else if (connection_speaks_cells(conn) &&
             conn->state == OR_CONN_STATE_OPEN &&
             TO_OR_CONN(conn)->write_bucket <= 0) {
    reason = "connection write bucket exhausted. Pausing.";
#endif
  } else
    return; /* all good, no need to stop it */

  LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason));
  conn->write_blocked_on_bw = 1;
  connection_stop_writing(conn);
}

/** Initialize the global read bucket to options-\>BandwidthBurst. */
void
connection_bucket_init(void)
{
  or_options_t *options = get_options();
  /* start it at max traffic */
  global_read_bucket = (int)options->BandwidthBurst;
  global_write_bucket = (int)options->BandwidthBurst;
  if (options->RelayBandwidthRate) {
    global_relayed_read_bucket = (int)options->RelayBandwidthBurst;
    global_relayed_write_bucket = (int)options->RelayBandwidthBurst;
  } else {
    global_relayed_read_bucket = (int)options->BandwidthBurst;
    global_relayed_write_bucket = (int)options->BandwidthBurst;
  }
}

/** Refill a single <b>bucket</b> called <b>name</b> with bandwith rate
 * <b>rate</b> and bandwidth burst <b>burst</b>, assuming that
 * <b>seconds_elapsed</b> seconds have passed since the last call.
 **/
static void
connection_bucket_refill_helper(int *bucket, int rate, int burst,
                                int seconds_elapsed, const char *name)
{
  int starting_bucket = *bucket;
  if (starting_bucket < burst && seconds_elapsed) {
    if (((burst - starting_bucket)/seconds_elapsed) < rate) {
      *bucket = burst;  /* We would overflow the bucket; just set it to
                         * the maximum. */
    } else {
      int incr = rate*seconds_elapsed;
      *bucket += incr;
      if (*bucket > burst || *bucket < starting_bucket) {
        /* If we overflow the burst, or underflow our starting bucket,
         * cap the bucket value to burst. */
        /* XXXX021 this might be redundant now, but it doesn't show up
         * in profiles.  Remove it after analysis. */
        *bucket = burst;
      }
    }
    log(LOG_DEBUG, LD_NET,"%s now %d.", name, *bucket);
  }
}

/** A second has rolled over; increment buckets appropriately. */
void
connection_bucket_refill(int seconds_elapsed, time_t now)
{
  or_options_t *options = get_options();
  smartlist_t *conns = get_connection_array();
  int relayrate, relayburst;

  if (options->RelayBandwidthRate) {
    relayrate = (int)options->RelayBandwidthRate;
    relayburst = (int)options->RelayBandwidthBurst;
  } else {
    relayrate = (int)options->BandwidthRate;
    relayburst = (int)options->BandwidthBurst;
  }

  tor_assert(seconds_elapsed >= 0);

  write_buckets_empty_last_second =
    global_relayed_write_bucket == 0 || global_write_bucket == 0;

  /* refill the global buckets */
  connection_bucket_refill_helper(&global_read_bucket,
                                  (int)options->BandwidthRate,
                                  (int)options->BandwidthBurst,
                                  seconds_elapsed, "global_read_bucket");
  connection_bucket_refill_helper(&global_write_bucket,
                                  (int)options->BandwidthRate,
                                  (int)options->BandwidthBurst,
                                  seconds_elapsed, "global_write_bucket");
  connection_bucket_refill_helper(&global_relayed_read_bucket,
                                  relayrate, relayburst, seconds_elapsed,
                                  "global_relayed_read_bucket");
  connection_bucket_refill_helper(&global_relayed_write_bucket,
                                  relayrate, relayburst, seconds_elapsed,
                                  "global_relayed_write_bucket");

  /* refill the per-connection buckets */
  SMARTLIST_FOREACH(conns, connection_t *, conn,
  {
    if (connection_speaks_cells(conn)) {
      or_connection_t *or_conn = TO_OR_CONN(conn);
      if (connection_read_bucket_should_increase(or_conn)) {
        connection_bucket_refill_helper(&or_conn->read_bucket,
                                        or_conn->bandwidthrate,
                                        or_conn->bandwidthburst,
                                        seconds_elapsed,
                                        "or_conn->read_bucket");
        //log_fn(LOG_DEBUG,"Receiver bucket %d now %d.", i,
        //       conn->read_bucket);
      }
    }

    if (conn->read_blocked_on_bw == 1 /* marked to turn reading back on now */
        && global_read_bucket > 0 /* and we're allowed to read */
        && (!connection_counts_as_relayed_traffic(conn, now) ||
            global_relayed_read_bucket > 0) /* even if we're relayed traffic */
        && (!connection_speaks_cells(conn) ||
            conn->state != OR_CONN_STATE_OPEN ||
            TO_OR_CONN(conn)->read_bucket > 0)) {
        /* and either a non-cell conn or a cell conn with non-empty bucket */
      LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
                         "waking up conn (fd %d) for read", conn->s));
      conn->read_blocked_on_bw = 0;
      connection_start_reading(conn);
    }

    if (conn->write_blocked_on_bw == 1
        && global_write_bucket > 0 /* and we're allowed to write */
        && (!connection_counts_as_relayed_traffic(conn, now) ||
            global_relayed_write_bucket > 0)) {
            /* even if we're relayed traffic */
      LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
                         "waking up conn (fd %d) for write", conn->s));
      conn->write_blocked_on_bw = 0;
      connection_start_writing(conn);
    }
  });
}

/** Is the receiver bucket for connection <b>conn</b> low enough that we
 * should add another pile of tokens to it?
 */
static int
connection_read_bucket_should_increase(or_connection_t *conn)
{
  tor_assert(conn);

  if (conn->_base.state != OR_CONN_STATE_OPEN)
    return 0; /* only open connections play the rate limiting game */
  if (conn->read_bucket >= conn->bandwidthburst)
    return 0;

  return 1;
}

/** Read bytes from conn-\>s and process them.
 *
 * This function gets called from conn_read() in main.c, either
 * when poll() has declared that conn wants to read, or (for OR conns)
 * when there are pending TLS bytes.
 *
 * It calls connection_read_to_buf() to bring in any new bytes,
 * and then calls connection_process_inbuf() to process them.
 *
 * Mark the connection and return -1 if you want to close it, else
 * return 0.
 */
int
connection_handle_read(connection_t *conn)
{
  int max_to_read=-1, try_to_read;
  size_t before, n_read = 0;

  if (conn->marked_for_close)
    return 0; /* do nothing */

  conn->timestamp_lastread = time(NULL);

  switch (conn->type) {
    case CONN_TYPE_OR_LISTENER:
      return connection_handle_listener_read(conn, CONN_TYPE_OR);
    case CONN_TYPE_AP_LISTENER:
    case CONN_TYPE_AP_TRANS_LISTENER:
    case CONN_TYPE_AP_NATD_LISTENER:
      return connection_handle_listener_read(conn, CONN_TYPE_AP);
    case CONN_TYPE_DIR_LISTENER:
      return connection_handle_listener_read(conn, CONN_TYPE_DIR);
    case CONN_TYPE_CONTROL_LISTENER:
      return connection_handle_listener_read(conn, CONN_TYPE_CONTROL);
    case CONN_TYPE_AP_DNS_LISTENER:
      /* This should never happen; eventdns.c handles the reads here. */
      tor_fragile_assert();
      return 0;
  }

loop_again:
  try_to_read = max_to_read;
  tor_assert(!conn->marked_for_close);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -