📄 connection.c
字号:
int priority = conn->type != CONN_TYPE_DIR;
int conn_bucket = -1;
int global_bucket = global_read_bucket;
if (connection_speaks_cells(conn)) {
or_connection_t *or_conn = TO_OR_CONN(conn);
if (conn->state == OR_CONN_STATE_OPEN)
conn_bucket = or_conn->read_bucket;
}
if (!connection_is_rate_limited(conn)) {
/* be willing to read on local conns even if our buckets are empty */
return conn_bucket>=0 ? conn_bucket : 1<<14;
}
if (connection_counts_as_relayed_traffic(conn, now) &&
global_relayed_read_bucket <= global_read_bucket)
global_bucket = global_relayed_read_bucket;
return connection_bucket_round_robin(base, priority,
global_bucket, conn_bucket);
}
/** Return the maximum number of bytes we are currently willing to write
 * onto <b>conn</b>.
 *
 * Connections that are not rate limited may flush everything queued
 * (conn-\>outbuf_flushlen). For everything else the answer is computed by
 * connection_bucket_round_robin() from the applicable global write bucket:
 * the relayed-traffic bucket when <b>conn</b> counts as relayed traffic at
 * time <b>now</b> and that bucket is no larger than the plain global one,
 * otherwise the plain global write bucket.
 */
ssize_t
connection_bucket_write_limit(connection_t *conn, time_t now)
{
  int base;
  int priority;
  int global_bucket;

  /* Unit size handed to the round-robin helper: a whole cell for
   * cell-speaking connections, a relay payload otherwise. */
  if (connection_speaks_cells(conn))
    base = CELL_NETWORK_SIZE;
  else
    base = RELAY_PAYLOAD_SIZE;

  /* Directory connections get priority 0, everything else priority 1. */
  priority = (conn->type != CONN_TYPE_DIR);

  if (!connection_is_rate_limited(conn)) {
    /* be willing to write to local conns even if our buckets are empty */
    return conn->outbuf_flushlen;
  }

  /* Use whichever applicable global bucket is the tighter constraint. */
  if (connection_counts_as_relayed_traffic(conn, now) &&
      global_relayed_write_bucket <= global_write_bucket)
    global_bucket = global_relayed_write_bucket;
  else
    global_bucket = global_write_bucket;

  return connection_bucket_round_robin(base, priority, global_bucket,
                                       conn->outbuf_flushlen);
}
/** Return 1 if the global write buckets are low enough that we
 * shouldn't send <b>attempt</b> bytes of low-priority directory stuff
 * out to <b>conn</b>. Else return 0.
 *
 * Priority is 1 for v1 requests (directories and running-routers),
 * and 2 for v2 requests (statuses and descriptors). But see FFFF in
 * directory_handle_command_get() for why we don't use priority 2 yet.
 *
 * There are a lot of parameters we could use here:
 * - global_relayed_write_bucket. Low is bad.
 * - global_write_bucket. Low is bad.
 * - bandwidthrate. Low is bad.
 * - bandwidthburst. Not a big factor?
 * - attempt. High is bad.
 * - total bytes queued on outbufs. High is bad. But I'm wary of
 *   using this, since a few slow-flushing queues will pump up the
 *   number without meaning what we meant to mean. What we really
 *   mean is "total directory bytes added to outbufs recently", but
 *   that's harder to quantify and harder to keep track of.
 *
 * Decision order, as implemented below:
 * - authoritative directories always answer priority>1 requests (0);
 * - connections that are not rate limited are never refused (0);
 * - if <b>attempt</b> exceeds the smaller of the two global write
 *   buckets, refuse (1);
 * - if the write buckets were empty during the last second, refuse (1);
 * - for priority 1, refuse (1) unless the smaller bucket plus two seconds
 *   of refill could cover two such requests;
 * - otherwise accept (0).
 */
int
global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
{
  /* The tighter of the two global write buckets is the binding limit. */
  int smaller_bucket = global_write_bucket < global_relayed_write_bucket ?
                       global_write_bucket : global_relayed_write_bucket;

  if (authdir_mode(get_options()) && priority>1)
    return 0; /* there's always room to answer v2 if we're an auth dir */

  if (!connection_is_rate_limited(conn))
    return 0; /* local conns don't get limited */

  /* Check the range of attempt before narrowing it to int: a value above
   * INT_MAX can never fit in an int-sized bucket, and casting it blindly
   * could yield a negative number that slips past this comparison. */
  if (attempt > (size_t)INT_MAX || smaller_bucket < (int)attempt)
    return 1; /* not enough space no matter the priority */

  if (write_buckets_empty_last_second)
    return 1; /* we're already hitting our limits, no more please */

  if (priority == 1) { /* old-style v1 query */
    /* Could we handle *two* of these requests within the next two seconds? */
    or_options_t *options = get_options();
    int64_t can_write = (int64_t)smaller_bucket
      + 2*(options->RelayBandwidthRate ? options->RelayBandwidthRate :
                                         options->BandwidthRate);
    if (can_write < 2*(int64_t)attempt)
      return 1;
  } else { /* v2 query */
    /* no further constraints yet */
  }
  return 0;
}
/** Account for <b>num_read</b> bytes read from and <b>num_written</b>
 * bytes written to <b>conn</b> at time <b>now</b>.
 *
 * Connections that are not rate limited are ignored entirely. Otherwise
 * the byte counts are reported to rep_hist and subtracted from the global
 * read/write buckets, from the relayed-traffic buckets when <b>conn</b>
 * counts as relayed traffic, and (reads only) from the connection's own
 * read bucket when it is an open cell-speaking connection.
 *
 * A count of INT_MAX or more is logged as a bug and replaced by 1 so the
 * int arithmetic below stays in range.
 */
static void
connection_buckets_decrement(connection_t *conn, time_t now,
                             size_t num_read, size_t num_written)
{
  int read_delta;
  int written_delta;

  if (!connection_is_rate_limited(conn))
    return; /* local IPs are free */

  if (num_written >= INT_MAX || num_read >= INT_MAX) {
    log_err(LD_BUG, "Value out of range. num_read=%lu, num_written=%lu, "
            "connection type=%s, state=%s",
            (unsigned long)num_read, (unsigned long)num_written,
            conn_type_to_string(conn->type),
            conn_state_to_string(conn->type, conn->state));
    if (num_written >= INT_MAX)
      num_written = 1;
    if (num_read >= INT_MAX)
      num_read = 1;
    tor_fragile_assert();
  }

  /* Only report non-zero transfers to the history module. */
  if (num_read > 0)
    rep_hist_note_bytes_read(num_read, now);
  if (num_written > 0)
    rep_hist_note_bytes_written(num_written, now);

  /* Both values are now known to be below INT_MAX, so narrowing is safe. */
  read_delta = (int)num_read;
  written_delta = (int)num_written;

  if (connection_counts_as_relayed_traffic(conn, now)) {
    global_relayed_read_bucket -= read_delta;
    global_relayed_write_bucket -= written_delta;
  }
  global_read_bucket -= read_delta;
  global_write_bucket -= written_delta;

  /* Only reads are tracked per connection here. */
  if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN)
    TO_OR_CONN(conn)->read_bucket -= read_delta;
}
/** Stop reading on <b>conn</b> if a read bucket that applies to it has run
 * out.
 *
 * Checked in order: the global read bucket; the global relayed read bucket
 * (only when <b>conn</b> currently counts as relayed traffic); and the
 * connection's own read bucket (only for open cell-speaking connections).
 * On the first exhausted bucket found, the reason is logged at debug
 * level, conn-\>read_blocked_on_bw is set, and reading is stopped.
 */
static void
connection_consider_empty_read_buckets(connection_t *conn)
{
  const char *why = NULL;

  if (global_read_bucket <= 0) {
    why = "global read bucket exhausted. Pausing.";
  } else if (connection_counts_as_relayed_traffic(conn, time(NULL)) &&
             global_relayed_read_bucket <= 0) {
    why = "global relayed read bucket exhausted. Pausing.";
  } else if (connection_speaks_cells(conn) &&
             conn->state == OR_CONN_STATE_OPEN &&
             TO_OR_CONN(conn)->read_bucket <= 0) {
    why = "connection read bucket exhausted. Pausing.";
  }

  if (!why)
    return; /* all good, no need to stop it */

  LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", why));
  conn->read_blocked_on_bw = 1;
  connection_stop_reading(conn);
}
/** Stop writing on <b>conn</b> if a write bucket that applies to it has
 * run out.
 *
 * Checked in order: the global write bucket, then the global relayed write
 * bucket (only when <b>conn</b> currently counts as relayed traffic). On
 * the first exhausted bucket found, the reason is logged at debug level,
 * conn-\>write_blocked_on_bw is set, and writing is stopped.
 *
 * The per-connection write bucket check is compiled out with "#if 0".
 */
static void
connection_consider_empty_write_buckets(connection_t *conn)
{
  const char *why = NULL;

  if (global_write_bucket <= 0) {
    why = "global write bucket exhausted. Pausing.";
  } else if (connection_counts_as_relayed_traffic(conn, time(NULL)) &&
             global_relayed_write_bucket <= 0) {
    why = "global relayed write bucket exhausted. Pausing.";
  }
#if 0
  else if (connection_speaks_cells(conn) &&
           conn->state == OR_CONN_STATE_OPEN &&
           TO_OR_CONN(conn)->write_bucket <= 0) {
    why = "connection write bucket exhausted. Pausing.";
  }
#endif

  if (!why)
    return; /* all good, no need to stop it */

  LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", why));
  conn->write_blocked_on_bw = 1;
  connection_stop_writing(conn);
}
/** Fill all four global token buckets to their burst size.
 *
 * The plain read and write buckets start at options-\>BandwidthBurst. The
 * relayed read and write buckets start at options-\>RelayBandwidthBurst
 * when options-\>RelayBandwidthRate is set, and at options-\>BandwidthBurst
 * otherwise.
 */
void
connection_bucket_init(void)
{
  or_options_t *options = get_options();
  /* Relayed traffic only has its own burst when a relay rate is set. */
  int relayed_burst = options->RelayBandwidthRate ?
                        (int)options->RelayBandwidthBurst :
                        (int)options->BandwidthBurst;

  /* start it at max traffic */
  global_read_bucket = (int)options->BandwidthBurst;
  global_write_bucket = (int)options->BandwidthBurst;

  global_relayed_read_bucket = relayed_burst;
  global_relayed_write_bucket = relayed_burst;
}
/** Refill a single <b>bucket</b> called <b>name</b> with bandwidth rate
 * <b>rate</b> and bandwidth burst <b>burst</b>, assuming that
 * <b>seconds_elapsed</b> seconds have passed since the last call.
 *
 * Nothing happens (and nothing is logged) when the bucket is already at or
 * above <b>burst</b>, or when <b>seconds_elapsed</b> is zero. Otherwise the
 * bucket grows by rate*seconds_elapsed, capped at <b>burst</b>, and the new
 * value is logged at debug level.
 **/
static void
connection_bucket_refill_helper(int *bucket, int rate, int burst,
                                int seconds_elapsed, const char *name)
{
  const int before = *bucket;

  if (before >= burst || !seconds_elapsed)
    return; /* already full, or no time has passed */

  if ((burst - before) / seconds_elapsed < rate) {
    /* We would overflow the bucket; just set it to the maximum. */
    *bucket = burst;
  } else {
    *bucket += rate * seconds_elapsed;
    /* If we overflow the burst, or underflow our starting bucket,
     * cap the bucket value to burst. */
    /* XXXX021 this might be redundant now, but it doesn't show up
     * in profiles. Remove it after analysis. */
    if (*bucket > burst || *bucket < before)
      *bucket = burst;
  }

  log(LOG_DEBUG, LD_NET,"%s now %d.", name, *bucket);
}
/** A second has rolled over; increment buckets appropriately.
 *
 * <b>seconds_elapsed</b> is the (non-negative) number of seconds since the
 * last refill and <b>now</b> is the current time, used to decide whether a
 * connection counts as relayed traffic.
 *
 * Steps:
 * - record in write_buckets_empty_last_second whether either global write
 *   bucket was exhausted before this refill;
 * - refill the four global buckets (the relayed buckets use the
 *   RelayBandwidth* options when RelayBandwidthRate is set, else the plain
 *   Bandwidth* options);
 * - refill the read bucket of every cell-speaking connection for which
 *   connection_read_bucket_should_increase() says so;
 * - restart reading/writing on connections that were blocked on bandwidth
 *   and whose applicable buckets are positive again.
 */
void
connection_bucket_refill(int seconds_elapsed, time_t now)
{
  or_options_t *options = get_options();
  smartlist_t *conns = get_connection_array();
  int relayrate, relayburst;

  if (options->RelayBandwidthRate) {
    relayrate = (int)options->RelayBandwidthRate;
    relayburst = (int)options->RelayBandwidthBurst;
  } else {
    relayrate = (int)options->BandwidthRate;
    relayburst = (int)options->BandwidthBurst;
  }

  tor_assert(seconds_elapsed >= 0);

  /* connection_buckets_decrement() subtracts without clamping, so an
   * exhausted bucket is usually negative rather than exactly zero. Treat
   * any non-positive value as empty, matching the "<= 0" tests used when
   * deciding to stop reading or writing. */
  write_buckets_empty_last_second =
    global_relayed_write_bucket <= 0 || global_write_bucket <= 0;

  /* refill the global buckets */
  connection_bucket_refill_helper(&global_read_bucket,
                                  (int)options->BandwidthRate,
                                  (int)options->BandwidthBurst,
                                  seconds_elapsed, "global_read_bucket");
  connection_bucket_refill_helper(&global_write_bucket,
                                  (int)options->BandwidthRate,
                                  (int)options->BandwidthBurst,
                                  seconds_elapsed, "global_write_bucket");
  connection_bucket_refill_helper(&global_relayed_read_bucket,
                                  relayrate, relayburst, seconds_elapsed,
                                  "global_relayed_read_bucket");
  connection_bucket_refill_helper(&global_relayed_write_bucket,
                                  relayrate, relayburst, seconds_elapsed,
                                  "global_relayed_write_bucket");

  /* refill the per-connection buckets */
  SMARTLIST_FOREACH(conns, connection_t *, conn,
  {
    if (connection_speaks_cells(conn)) {
      or_connection_t *or_conn = TO_OR_CONN(conn);
      if (connection_read_bucket_should_increase(or_conn)) {
        connection_bucket_refill_helper(&or_conn->read_bucket,
                                        or_conn->bandwidthrate,
                                        or_conn->bandwidthburst,
                                        seconds_elapsed,
                                        "or_conn->read_bucket");
      }
    }

    if (conn->read_blocked_on_bw == 1 /* marked to turn reading back on now */
        && global_read_bucket > 0 /* and we're allowed to read */
        && (!connection_counts_as_relayed_traffic(conn, now) ||
            global_relayed_read_bucket > 0) /* even if we're relayed traffic */
        && (!connection_speaks_cells(conn) ||
            conn->state != OR_CONN_STATE_OPEN ||
            TO_OR_CONN(conn)->read_bucket > 0)) {
        /* and either a non-cell conn or a cell conn with non-empty bucket */
      LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
                         "waking up conn (fd %d) for read", conn->s));
      conn->read_blocked_on_bw = 0;
      connection_start_reading(conn);
    }

    if (conn->write_blocked_on_bw == 1
        && global_write_bucket > 0 /* and we're allowed to write */
        && (!connection_counts_as_relayed_traffic(conn, now) ||
            global_relayed_write_bucket > 0)) {
        /* even if we're relayed traffic */
      LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
                         "waking up conn (fd %d) for write", conn->s));
      conn->write_blocked_on_bw = 0;
      connection_start_writing(conn);
    }
  });
}
/** Return 1 if the read bucket of <b>conn</b> should be given more tokens,
 * else 0.
 *
 * That is the case only when the connection is in state OR_CONN_STATE_OPEN
 * (only open connections play the rate limiting game) and its read bucket
 * is still below its bandwidthburst.
 */
static int
connection_read_bucket_should_increase(or_connection_t *conn)
{
  tor_assert(conn);

  return conn->_base.state == OR_CONN_STATE_OPEN &&
         conn->read_bucket < conn->bandwidthburst;
}
/** Read bytes from conn-\>s and process them.
*
* This function gets called from conn_read() in main.c, either
* when poll() has declared that conn wants to read, or (for OR conns)
* when there are pending TLS bytes.
*
* It calls connection_read_to_buf() to bring in any new bytes,
* and then calls connection_process_inbuf() to process them.
*
* Mark the connection and return -1 if you want to close it, else
* return 0.
*/
int
connection_handle_read(connection_t *conn)
{
int max_to_read=-1, try_to_read;
size_t before, n_read = 0;
if (conn->marked_for_close)
return 0; /* do nothing */
conn->timestamp_lastread = time(NULL);
switch (conn->type) {
case CONN_TYPE_OR_LISTENER:
return connection_handle_listener_read(conn, CONN_TYPE_OR);
case CONN_TYPE_AP_LISTENER:
case CONN_TYPE_AP_TRANS_LISTENER:
case CONN_TYPE_AP_NATD_LISTENER:
return connection_handle_listener_read(conn, CONN_TYPE_AP);
case CONN_TYPE_DIR_LISTENER:
return connection_handle_listener_read(conn, CONN_TYPE_DIR);
case CONN_TYPE_CONTROL_LISTENER:
return connection_handle_listener_read(conn, CONN_TYPE_CONTROL);
case CONN_TYPE_AP_DNS_LISTENER:
/* This should never happen; eventdns.c handles the reads here. */
tor_fragile_assert();
return 0;
}
loop_again:
try_to_read = max_to_read;
tor_assert(!conn->marked_for_close);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -