⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcp_srv.cc

📁 Source code for an Numeric Cmputer
💻 CC
📖 第 1 页 / 共 4 页
字号:
	server->kill_server();	break;    case REMOTE_CMS_CLOSE_CHANNEL_REQUEST_TYPE:	client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_head();	while (NULL != client_port_to_check) {	    if (client_port_to_check->socket_fd ==		_client_tcp_port->socket_fd) {		break;	    }	    client_port_to_check =		(CLIENT_TCP_PORT *) client_ports->get_next();	}	FD_CLR(_client_tcp_port->socket_fd, &read_fd_set);	close(_client_tcp_port->socket_fd);	current_clients--;	if (NULL != _client_tcp_port->subscriptions) {	    remove_subscription_client(_client_tcp_port, buffer_number);	}	_client_tcp_port->socket_fd = -1;	delete _client_tcp_port;	client_ports->delete_current_node();	break;    case REMOTE_CMS_GET_KEYS_REQUEST_TYPE:	server->get_keys_req.buffer_number = buffer_number;	if (recvn(_client_tcp_port->socket_fd,		server->get_keys_req.name, 16, 0, -1, NULL) < 0) {	    _client_tcp_port->errors++;	    return;	}	server->get_keys_reply =	    (REMOTE_GET_KEYS_REPLY *) server->process_request(&server->	    get_keys_req);	if (NULL == server->get_keys_reply) {	    rcs_print_error("Server could not process request.\n");	    memset(temp_buffer, 0, 20);	    *((u_long *) temp_buffer) =		htonl(_client_tcp_port->serial_number);	    server->gen_random_key(((char *) temp_buffer) + 4, 2);	    server->gen_random_key(((char *) temp_buffer) + 12, 2);	    sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);	    return;	} else {	    *((u_long *) temp_buffer) =		htonl(_client_tcp_port->serial_number);	    memcpy(((char *) temp_buffer) + 4, server->get_keys_reply->key1,		8);	    memcpy(((char *) temp_buffer) + 12, server->get_keys_reply->key2,		8);	    /* successful ? */	    sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);	    return;	}	break;    case REMOTE_CMS_LOGIN_REQUEST_TYPE:	server->login_req.buffer_number = buffer_number;	if (recvn(_client_tcp_port->socket_fd,		server->login_req.name, 16, 0, -1, NULL) < 0) {	    _client_tcp_port->errors++;	    return;	}	if (recvn(_client_tcp_port->socket_fd,		server->login_req.passwd, 16, 0, -1, NULL) < 0) {	    _client_tcp_port->errors++;	    return;	}	server->login_reply =	    (REMOTE_LOGIN_REPLY *) server->process_request(&server->	    login_req);	if (NULL == server->login_reply) {	    rcs_print_error("Server could not process request.\n");	    *((u_long *) temp_buffer) =		htonl(_client_tcp_port->serial_number);	    *((u_long *) temp_buffer + 1) = htonl(0);	/* not successful */	    sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);	    return;	} else {	    *((u_long *) temp_buffer) =		htonl(_client_tcp_port->serial_number);	    *((u_long *) temp_buffer + 1) =		htonl(server->login_reply->success);	    /* successful ? */	    sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);	    return;	}	break;    case REMOTE_CMS_SET_SUBSCRIPTION_REQUEST_TYPE:	server->set_subscription_req.buffer_number = buffer_number;	server->set_subscription_req.subscription_type =	    ntohl(*((u_long *) temp_buffer + 3));	server->set_subscription_req.poll_interval_millis =	    ntohl(*((u_long *) temp_buffer + 4));	server->set_subscription_reply =	    (REMOTE_SET_SUBSCRIPTION_REPLY *) server->	    process_request(&server->set_subscription_req);	if (NULL == server->set_subscription_reply) {	    rcs_print_error("Server could not process request.\n");	    *((u_long *) temp_buffer) =		htonl(_client_tcp_port->serial_number);	    *((u_long *) temp_buffer + 1) = htonl(0);	/* not successful */	    sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);	    return;	} else {	    if (server->set_subscription_reply->success) {		if (server->set_subscription_req.subscription_type ==		    CMS_POLLED_SUBSCRIPTION		    || server->set_subscription_req.subscription_type ==		    CMS_VARIABLE_SUBSCRIPTION) {		    add_subscription_client(buffer_number,			server->set_subscription_req.			subscription_type,			server->set_subscription_req.			poll_interval_millis, _client_tcp_port);		}		if (server->set_subscription_req.subscription_type ==		    CMS_NO_SUBSCRIPTION) {		    remove_subscription_client(_client_tcp_port,			buffer_number);		}	    }	    *((u_long *) temp_buffer) =		htonl(_client_tcp_port->serial_number);	    *((u_long *) temp_buffer + 1) =		htonl(server->set_subscription_reply->success);	    /* successful ? */	    sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);	    return;	}	break;    default:	_client_tcp_port->errors++;	rcs_print_error("Unrecognized request type received.(%ld)\n",	    request_type);	break;    }}void CMS_SERVER_REMOTE_TCP_PORT::add_subscription_client(int buffer_number,    int subscription_type, int poll_interval_millis, CLIENT_TCP_PORT * clnt){    if (NULL == subscription_buffers) {	subscription_buffers = new LinkedList();    }    if (NULL == subscription_buffers) {	rcs_print_error("Can`t create subscription_buffers list.\n");    }    TCP_BUFFER_SUBSCRIPTION_INFO *buf_info =	(TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head();    while (NULL != buf_info) {	if (buf_info->buffer_number == buffer_number) {	    break;	}	buf_info =	    (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next();    }    if (NULL == buf_info) {	buf_info = new TCP_BUFFER_SUBSCRIPTION_INFO();	buf_info->buffer_number = buffer_number;	buf_info->sub_clnt_info = new LinkedList();	buf_info->list_id =	    subscription_buffers->store_at_tail(buf_info, sizeof(*buf_info),	    0);    }    buf_info->min_last_id = 0;    if (NULL == clnt->subscriptions) {	clnt->subscriptions = new LinkedList();    }    TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info =	(TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_head();    while (temp_clnt_info != NULL) {	if (temp_clnt_info->buffer_number == buffer_number) {	    break;	}	temp_clnt_info =	    (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_next();    }    if (NULL == temp_clnt_info) {	temp_clnt_info = new TCP_CLIENT_SUBSCRIPTION_INFO();	temp_clnt_info->last_sub_sent_time = 0.0;	temp_clnt_info->buffer_number = buffer_number;	temp_clnt_info->subscription_paused = 0;	temp_clnt_info->last_id_read = 0;	temp_clnt_info->sub_buf_info = buf_info;	temp_clnt_info->clnt_port = clnt;	temp_clnt_info->last_sub_sent_time = etime();	temp_clnt_info->subscription_list_id =	    clnt->subscriptions->store_at_tail(temp_clnt_info,	    sizeof(*temp_clnt_info), 0);	buf_info->sub_clnt_info->store_at_tail(temp_clnt_info,	    sizeof(*temp_clnt_info), 0);    }    temp_clnt_info->subscription_type = subscription_type;    temp_clnt_info->poll_interval_millis = poll_interval_millis;    recalculate_polling_interval();}void CMS_SERVER_REMOTE_TCP_PORT::remove_subscription_client(CLIENT_TCP_PORT *    clnt, int buffer_number){    TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info =	(TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_head();    while (temp_clnt_info != NULL) {	if (temp_clnt_info->buffer_number == buffer_number) {	    if (NULL != temp_clnt_info->sub_buf_info) {		if (NULL != temp_clnt_info->sub_buf_info->sub_clnt_info) {		    temp_clnt_info->sub_buf_info->sub_clnt_info->			delete_node(temp_clnt_info->subscription_list_id);		    if (temp_clnt_info->sub_buf_info->sub_clnt_info->			list_size == 0) {			subscription_buffers->delete_node(temp_clnt_info->			    sub_buf_info->list_id);			delete temp_clnt_info->sub_buf_info->sub_clnt_info;			temp_clnt_info->sub_buf_info->sub_clnt_info = NULL;			delete temp_clnt_info->sub_buf_info;			temp_clnt_info->sub_buf_info = NULL;		    }		}	    }	    delete temp_clnt_info;	    temp_clnt_info = NULL;	    break;	}	temp_clnt_info =	    (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_next();    }    recalculate_polling_interval();}void CMS_SERVER_REMOTE_TCP_PORT::recalculate_polling_interval(){    int min_poll_interval_millis = 30000;    polling_enabled = 0;    TCP_BUFFER_SUBSCRIPTION_INFO *buf_info =	(TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head();    while (NULL != buf_info) {	TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info =	    (TCP_CLIENT_SUBSCRIPTION_INFO *) buf_info->sub_clnt_info->	    get_head();	while (temp_clnt_info != NULL) {	    if (temp_clnt_info->poll_interval_millis <		min_poll_interval_millis		&& temp_clnt_info->subscription_type ==		CMS_POLLED_SUBSCRIPTION) {		min_poll_interval_millis =		    temp_clnt_info->poll_interval_millis;		polling_enabled = 1;	    }	    temp_clnt_info = (TCP_CLIENT_SUBSCRIPTION_INFO *)		buf_info->sub_clnt_info->get_next();	}	buf_info =	    (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next();    }    if (min_poll_interval_millis >= ((int) (clk_tck() * 1000.0))) {	current_poll_interval_millis = min_poll_interval_millis;    } else {	current_poll_interval_millis = ((int) (clk_tck() * 1000.0));    }    select_timeout.tv_sec = current_poll_interval_millis / 1000;    select_timeout.tv_usec = (current_poll_interval_millis % 1000) * 1000;    dtimeout = (current_poll_interval_millis + 10) * 1000.0;    if (dtimeout < 0.5) {	dtimeout = 0.5;    }}void CMS_SERVER_REMOTE_TCP_PORT::update_subscriptions(){    pid_t pid = getpid();    pid_t tid = 0;    CMS_SERVER *server;    server = find_server(pid, tid);    if (NULL == server) {	rcs_print_error	    ("CMS_SERVER_REMOTE_TCP_PORT::update_subscriptions Cannot find server object for pid = %d.\n",	    pid);	return;    }    if (NULL == subscription_buffers) {	return;    }    double cur_time = etime();    TCP_BUFFER_SUBSCRIPTION_INFO *buf_info =	(TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head();    while (NULL != buf_info) {	server->read_req.buffer_number = buf_info->buffer_number;	server->read_req.access_type = CMS_READ_ACCESS;	server->read_req.last_id_read = buf_info->min_last_id;	server->read_reply =	    (REMOTE_READ_REPLY *) server->process_request(&server->read_req);	if (NULL == server->read_reply) {	    rcs_print_error("Server could not process request.\n");	    buf_info = (TCP_BUFFER_SUBSCRIPTION_INFO *)		subscription_buffers->get_next();	    continue;	}	if (server->read_reply->write_id == buf_info->min_last_id ||	    server->read_reply->size < 1) {	    buf_info = (TCP_BUFFER_SUBSCRIPTION_INFO *)		subscription_buffers->get_next();	    continue;	}	*((u_long *) temp_buffer) = 0;	*((u_long *) temp_buffer + 1) = htonl(server->read_reply->status);	*((u_long *) temp_buffer + 2) = htonl(server->read_reply->size);	*((u_long *) temp_buffer + 3) = htonl(server->read_reply->write_id);	*((u_long *) temp_buffer + 4) = htonl(server->read_reply->was_read);	TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info =	    (TCP_CLIENT_SUBSCRIPTION_INFO *) buf_info->sub_clnt_info->	    get_head();	buf_info->min_last_id = server->read_reply->write_id;	while (temp_clnt_info != NULL) {	    double time_diff = cur_time - temp_clnt_info->last_sub_sent_time;	    int time_diff_millis = (int) ((double) time_diff * 1000.0);	    rcs_print_debug(PRINT_SERVER_SUBSCRIPTION_ACTIVITY,		"Subscription time_diff_millis=%d\n", time_diff_millis);	    if (((temp_clnt_info->subscription_type == CMS_POLLED_SUBSCRIPTION			&& time_diff_millis + 10 >=			temp_clnt_info->poll_interval_millis)		    || temp_clnt_info->subscription_type ==		    CMS_VARIABLE_SUBSCRIPTION)		&& temp_clnt_info->last_id_read !=		server->read_reply->write_id) {		temp_clnt_info->last_id_read = server->read_reply->write_id;		temp_clnt_info->last_sub_sent_time = cur_time;		temp_clnt_info->clnt_port->serial_number++;		*((u_long *) temp_buffer) =		    htonl(temp_clnt_info->clnt_port->serial_number);		if (server->read_reply->size < 0x2000 - 20		    && server->read_reply->size > 0) {		    memcpy(temp_buffer + 20, server->read_reply->data,			server->read_reply->size);		    if (sendn			(temp_clnt_info->clnt_port->socket_fd, temp_buffer,			    20 + server->read_reply->size, 0, dtimeout) < 0) {			temp_clnt_info->clnt_port->errors++;			return;		    }		} else {		    if (sendn(temp_clnt_info->clnt_port->socket_fd,			    temp_buffer, 20, 0, dtimeout) < 0) {			temp_clnt_info->clnt_port->errors++;			return;		    }		    if (server->read_reply->size > 0) {			if (sendn(temp_clnt_info->clnt_port->socket_fd,				server->read_reply->data,				server->read_reply->size, 0, dtimeout) < 0) {			    temp_clnt_info->clnt_port->errors++;			    return;			}		    }		}	    }	    if (temp_clnt_info->last_id_read < buf_info->min_last_id) {		buf_info->min_last_id = temp_clnt_info->last_id_read;	    }	    temp_clnt_info = (TCP_CLIENT_SUBSCRIPTION_INFO *)		buf_info->sub_clnt_info->get_next();	}	buf_info =	    (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next();    }}TCP_BUFFER_SUBSCRIPTION_INFO::TCP_BUFFER_SUBSCRIPTION_INFO(){    buffer_number = -1;    min_last_id = 0;    list_id = -1;    sub_clnt_info = NULL;}TCP_BUFFER_SUBSCRIPTION_INFO::~TCP_BUFFER_SUBSCRIPTION_INFO(){    buffer_number = -1;    min_last_id = 0;    list_id = -1;    if (NULL != sub_clnt_info) {	delete sub_clnt_info;	sub_clnt_info = NULL;    }}TCP_CLIENT_SUBSCRIPTION_INFO::TCP_CLIENT_SUBSCRIPTION_INFO(){    subscription_type = CMS_NO_SUBSCRIPTION;    poll_interval_millis = 30000;    last_sub_sent_time = 0.0;    subscription_list_id = -1;    buffer_number = -1;    subscription_paused = 0;    last_id_read = 0;    sub_buf_info = NULL;    clnt_port = NULL;}TCP_CLIENT_SUBSCRIPTION_INFO::~TCP_CLIENT_SUBSCRIPTION_INFO(){    subscription_type = CMS_NO_SUBSCRIPTION;    poll_interval_millis = 30000;    last_sub_sent_time = 0.0;    subscription_list_id = -1;    buffer_number = -1;    subscription_paused = 0;    last_id_read = 0;    sub_buf_info = NULL;    clnt_port = NULL;}CLIENT_TCP_PORT::CLIENT_TCP_PORT(){    serial_number = 0;    errors = 0;    max_errors = 50;    address.sin_port = 0;    address.sin_family = AF_INET;    address.sin_addr.s_addr = htonl(INADDR_ANY);    socket_fd = -1;    subscriptions = NULL;    tid = -1;    pid = -1;    blocking_read_req = NULL;    threadId = 0;    diag_info = NULL;}CLIENT_TCP_PORT::~CLIENT_TCP_PORT(){    if (socket_fd > 0) {	close(socket_fd);	socket_fd = -1;    }    if (NULL != subscriptions) {	TCP_CLIENT_SUBSCRIPTION_INFO *sub_info =	    (TCP_CLIENT_SUBSCRIPTION_INFO *) subscriptions->get_head();	while (NULL != sub_info) {	    delete sub_info;	    sub_info =		(TCP_CLIENT_SUBSCRIPTION_INFO *) subscriptions->get_next();	}	delete subscriptions;	subscriptions = NULL;    }#ifdef NO_THREADS    if (NULL != blocking_read_req) {	delete blocking_read_req;	blocking_read_req = NULL;    }#endif    if (NULL != diag_info) {	delete diag_info;	diag_info = NULL;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -