📄 cpuworker.c
字号:
/* Copyright (c) 2003-2004, Roger Dingledine.
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2008, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/* $Id$ */
/** Revision-identification string for this file. The literal is a
 * version-control keyword placeholder; presumably it is expanded by the
 * VCS at checkout so the revision can be found in a built binary
 * (NOTE(review): confirm keyword expansion is enabled for this file). */
const char cpuworker_c_id[] =
"$Id$";
/**
* \file cpuworker.c
* \brief Implements a farm of 'CPU worker' processes to perform
* CPU-intensive tasks in another thread or process, to not
* interrupt the main thread.
*
* Right now, we only use this for processing onionskins.
**/
#include "or.h"
#include <string.h>
/** The maximum number of cpuworker processes we will keep around. */
#define MAX_CPUWORKERS 16
/** The minimum number of cpuworker processes we will keep around. */
#define MIN_CPUWORKERS 1
/** Length in bytes of the opaque tag that identifies which circuit an
 * onionskin came from. As packed by tag_pack(): a 4-byte address, a
 * 2-byte port and a 2-byte circuit ID. */
#define TAG_LEN 8
/** How many bytes are sent from the cpuworker back to tor: one
 * success/failure byte, the tag, the onionskin reply, and the negotiated
 * key material. */
#define LEN_ONION_RESPONSE \
(1+TAG_LEN+ONIONSKIN_REPLY_LEN+CPATH_KEY_MATERIAL_LEN)
/** How many cpuworkers we have running right now. */
static int num_cpuworkers=0;
/** How many of the running cpuworkers have an assigned task right now. */
static int num_cpuworkers_busy=0;
/** We need to spawn new cpuworkers whenever we rotate the onion keys
 * on platforms where execution contexts==processes. This variable stores
 * the last time we got a key rotation event. Workers whose connection was
 * created before this time are retired once they finish their task. */
static time_t last_rotation_time=0;
/* Forward declarations for the file-local helpers used below. */
/** Entry point run by each cpuworker; declared as never returning. */
static void cpuworker_main(void *data) ATTR_NORETURN;
/** NOTE(review): presumably launches a single new cpuworker; the definition
 * is not visible in this part of the file. */
static int spawn_cpuworker(void);
/** NOTE(review): presumably launches workers until the desired number is
 * running; the definition is not visible in this part of the file. */
static void spawn_enough_cpuworkers(void);
/** NOTE(review): presumably hands the next queued task (if any) to the
 * now-idle <b>cpuworker</b>; the definition is not visible in this part of
 * the file. */
static void process_pending_task(connection_t *cpuworker);
/** Bring up the cpuworker subsystem.
 *
 * Start-up is handled as if it were a key rotation: all of the work is
 * delegated to cpuworkers_rotate().
 */
void
cpu_init(void)
{
  cpuworkers_rotate();
}
/** Callback invoked once a request has been completely written out to the
 * cpuworker connection <b>conn</b>.
 *
 * With nothing left to send, stop watching the connection for
 * writability. Always returns 0.
 */
int
connection_cpu_finished_flushing(connection_t *conn)
{
  /* Sanity checks: this handler is only valid for cpuworker connections. */
  tor_assert(conn);
  tor_assert(conn->type == CONN_TYPE_CPUWORKER);

  connection_stop_writing(conn);

  return 0;
}
/** Pack <b>addr</b>, <b>port</b>, and <b>circ_id</b> into the 8-byte buffer
 * <b>tag</b>. (See note on cpuworker_main for wire format.)
 *
 * Layout: bytes 0-3 hold addr, bytes 4-5 hold port, bytes 6-7 hold circ_id.
 * Each value is stored in its native in-memory representation; no byte
 * swapping is performed here.
 *
 * The fields are copied with memcpy() rather than stored through a casted
 * pointer: <b>tag</b> is a plain char buffer with no alignment guarantee,
 * so a direct uint32_t/uint16_t store could fault on strict-alignment
 * platforms and violates the strict-aliasing rules.
 */
static void
tag_pack(char *tag, uint32_t addr, uint16_t port, uint16_t circ_id)
{
  memcpy(tag, &addr, sizeof(addr));
  memcpy(tag+4, &port, sizeof(port));
  memcpy(tag+6, &circ_id, sizeof(circ_id));
}
/** Unpack the 8-byte buffer <b>tag</b> into *<b>addr</b>, *<b>port</b>, and
 * *<b>circ_id</b>, and log the result at debug level.
 *
 * This is the inverse of tag_pack(): bytes 0-3 are the address, bytes 4-5
 * the port, and bytes 6-7 the circuit ID, each in native representation.
 *
 * The fields are extracted with memcpy() rather than read through a casted
 * pointer: <b>tag</b> points into a char buffer with no alignment
 * guarantee, so a direct uint32_t/uint16_t load could fault on
 * strict-alignment platforms and violates the strict-aliasing rules.
 */
static void
tag_unpack(const char *tag, uint32_t *addr, uint16_t *port, uint16_t *circ_id)
{
  struct in_addr in;
  char addrbuf[INET_NTOA_BUF_LEN];

  memcpy(addr, tag, sizeof(*addr));
  memcpy(port, tag+4, sizeof(*port));
  memcpy(circ_id, tag+6, sizeof(*circ_id));

  /* Convert the address to network order only to render it for the log. */
  in.s_addr = htonl(*addr);
  tor_inet_ntoa(&in, addrbuf, sizeof(addrbuf));
  log_debug(LD_OR,
            "onion was from %s:%d, circ_id %d.",
            addrbuf, (int)*port, (int)*circ_id);
}
/** Called when the onion key has changed and we need to spawn new
* cpuworkers. Close all currently idle cpuworkers, and mark the last
* rotation time as now.
*/
void
cpuworkers_rotate(void)
{
connection_t *cpuworker;
while ((cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER,
CPUWORKER_STATE_IDLE))) {
connection_mark_for_close(cpuworker);
--num_cpuworkers;
}
last_rotation_time = time(NULL);
if (server_mode(get_options()))
spawn_enough_cpuworkers();
}
/** Handle end-of-file on the cpuworker connection <b>conn</b>, which means
 * the worker has gone away.
 *
 * Fix up the worker counters, try to spawn replacements, and mark the
 * connection for close. Always returns 0.
 */
int
connection_cpu_reached_eof(connection_t *conn)
{
  const int had_task = (conn->state != CPUWORKER_STATE_IDLE);

  log_warn(LD_GENERAL,"Read eof. CPU worker died unexpectedly.");

  if (had_task) {
    /* We have no way to find out which circ this worker was handling, so
     * that circ has to wait until it gets culled in
     * run_connection_housekeeping(). */
    log_warn(LD_GENERAL,"...and it left a circuit queued; abandoning circ.");
    --num_cpuworkers_busy;
  }

  --num_cpuworkers;

  /* Try to regrow the pool; hopefully we don't end up spinning. */
  spawn_enough_cpuworkers();

  connection_mark_for_close(conn);
  return 0;
}
/** Called when we get data from the cpuworker connection <b>conn</b>.
 *
 * If the answer is not complete yet, return and wait for more data. Once a
 * complete LEN_ONION_RESPONSE-byte answer is buffered, consume it: look up
 * the circuit named by the tag, and either close the circuit (the worker
 * reported failure, or onionskin_answer() failed), drop the answer (the
 * circuit no longer exists), or pass the reply and key material to
 * onionskin_answer().
 *
 * Whenever an answer was consumed, the worker is marked idle again. It is
 * then retired and replaced if it predates the last key rotation;
 * otherwise it is offered the next pending task.
 *
 * Always returns 0.
 */
int
connection_cpu_process_inbuf(connection_t *conn)
{
/* First byte of the response: zero means the worker reported failure. */
char success;
/* Receives the rest of the response: tag, onionskin reply, key material. */
char buf[LEN_ONION_RESPONSE];
/* Fields recovered from the tag. */
uint32_t addr;
uint16_t port;
uint16_t circ_id;
or_connection_t *p_conn;
circuit_t *circ;
tor_assert(conn);
tor_assert(conn->type == CONN_TYPE_CPUWORKER);
/* Nothing buffered: nothing to do. */
if (!buf_datalen(conn->inbuf))
return 0;
if (conn->state == CPUWORKER_STATE_BUSY_ONION) {
if (buf_datalen(conn->inbuf) < LEN_ONION_RESPONSE) /* answer available? */
return 0; /* not yet */
/* Exactly one complete response must be buffered at this point. */
tor_assert(buf_datalen(conn->inbuf) == LEN_ONION_RESPONSE);
/* Pull off the status byte, then the remaining LEN_ONION_RESPONSE-1 bytes
 * (so buf starts with the tag, not with the status byte). */
connection_fetch_from_buf(&success,1,conn);
connection_fetch_from_buf(buf,LEN_ONION_RESPONSE-1,conn);
/* parse out the circ it was talking about */
tag_unpack(buf, &addr, &port, &circ_id);
circ = NULL;
/* (Here we use connection_or_exact_get_by_addr_port rather than
* get_by_identity_digest: we want a specific port here in
* case there are multiple connections.) */
p_conn = connection_or_exact_get_by_addr_port(addr,port);
/* circ stays NULL if the OR connection itself is gone. */
if (p_conn)
circ = circuit_get_by_circid_orconn(circ_id, p_conn);
/* The worker reported failure: close the circuit if it still exists. */
if (success == 0) {
log_debug(LD_OR,
"decoding onionskin failed. "
"(Old key or bad software.) Closing.");
if (circ)
circuit_mark_for_close(circ, END_CIRC_REASON_TORPROTOCOL);
goto done_processing;
}
if (!circ) {
/* This happens because somebody sends us a destroy cell and the
* circuit goes away, while the cpuworker is working. This is also
* why our tag doesn't include a pointer to the circ, because we'd
* never know if it's still valid.
*/
log_debug(LD_OR,"processed onion for a circ that's gone. Dropping.");
goto done_processing;
}
tor_assert(! CIRCUIT_IS_ORIGIN(circ));
/* In buf, the onionskin reply follows the tag and the key material
 * follows the reply. */
if (onionskin_answer(TO_OR_CIRCUIT(circ), CELL_CREATED, buf+TAG_LEN,
buf+TAG_LEN+ONIONSKIN_REPLY_LEN) < 0) {
log_warn(LD_OR,"onionskin_answer failed. Closing.");
circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL);
goto done_processing;
}
log_debug(LD_OR,"onionskin_answer succeeded. Yay.");
/* The success path falls through to done_processing below. */
} else {
tor_assert(0); /* don't ask me to do handshakes yet */
}
/* Common exit for every path that consumed a response: the worker is no
 * longer busy. */
done_processing:
conn->state = CPUWORKER_STATE_IDLE;
num_cpuworkers_busy--;
/* A worker created before the last key rotation is retired and replaced;
 * otherwise it is handed to process_pending_task(). */
if (conn->timestamp_created < last_rotation_time) {
connection_mark_for_close(conn);
num_cpuworkers--;
spawn_enough_cpuworkers();
} else {
process_pending_task(conn);
}
return 0;
}
/** Implement a cpuworker. 'data' is an fdarray as returned by socketpair.
* Reads from and writes to fdarray[1]. Reads requests, writes answers.
*
* Request format:
* Task type [1 byte, always CPUWORKER_TASK_ONION]
* Opaque tag TAG_LEN
* Onionskin challenge ONIONSKIN_CHALLENGE_LEN
* Response format:
* Success/failure [1 byte, boolean.]
* Opaque tag TAG_LEN
* Onionskin reply ONIONSKIN_REPLY_LEN
* Negotiated keys KEY_LEN*2+DIGEST_LEN*2
*
* (Note: this _should_ be by addr/port, since we're concerned with specific
* connections, not with routers (where we'd use identity).)
*/
static void
cpuworker_main(void *data)
{
char question[ONIONSKIN_CHALLENGE_LEN];
uint8_t question_type;
int *fdarray = data;
int fd;
/* variables for onion processing */
char keys[CPATH_KEY_MATERIAL_LEN];
char reply_to_proxy[ONIONSKIN_REPLY_LEN];
char buf[LEN_ONION_RESPONSE];
char tag[TAG_LEN];
crypto_pk_env_t *onion_key = NULL, *last_onion_key = NULL;
fd = fdarray[1]; /* this side is ours */
#ifndef TOR_IS_MULTITHREADED
tor_close_socket(fdarray[0]); /* this is the side of the socketpair the
* parent uses */
tor_free_all(1); /* so the child doesn't hold the parent's fd's open */
handle_signals(0); /* ignore interrupts from the keyboard, etc */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -