📄 chattersocket.cc
字号:
/*
 * chattersocket.{cc,hh} -- element echoes chatter to TCP/IP or Unix-domain
 * sockets
 * Eddie Kohler
 *
 * Copyright (c) 2000 Massachusetts Institute of Technology
 * Copyright (c) 2001 International Computer Science Institute
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the "Software"),
 * to deal in the Software without restriction, subject to the conditions
 * listed in the Click LICENSE file. These conditions include: you must
 * preserve this copyright notice, and you cannot mention the copyright
 * holders in advertising related to the Software without their permission.
 * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
 * notice is a summary of the Click LICENSE file; the license in that file is
 * legally binding.
 */

#include <click/config.h>
#include "chattersocket.hh"
#include <click/confparse.hh>
#include <click/error.hh>
#include <click/router.hh>
#include <click/straccum.hh>
#include <clicknet/tcp.h>       /* for SEQ_LT, etc. */
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>              /* errno, EINVAL, EAGAIN, EINTR */
#include <string.h>             /* strerror, memcpy, memset */
CLICK_DECLS

const char ChatterSocket::protocol_version[] = "1.0";

/*
 * Error handler veneer that forwards every message both to the wrapped
 * handler and to each ChatterSocket registered on it.
 */
struct ChatterSocketErrorHandler : public ErrorVeneer {

  // ChatterSockets currently listening on this channel (no duplicates).
  Vector<ChatterSocket *> _chatter_sockets;

 public:

  ChatterSocketErrorHandler(ErrorHandler *errh)
    : ErrorVeneer(errh)                 { }

  // The handler this veneer wraps.
  ErrorHandler *base_errh() const       { return _errh; }

  // Number of ChatterSockets registered on this channel.
  int nchatter_sockets() const          { return _chatter_sockets.size(); }

  void add_chatter_socket(ChatterSocket *);
  void remove_chatter_socket(ChatterSocket *);

  void handle_text(Seriousness, const String &);

};

/* Register `cs' on this channel; does nothing if it is already registered. */
void
ChatterSocketErrorHandler::add_chatter_socket(ChatterSocket *cs)
{
  for (int i = 0; i < _chatter_sockets.size(); i++)
    if (_chatter_sockets[i] == cs)
      return;
  _chatter_sockets.push_back(cs);
}

/*
 * Unregister `cs' from this channel, if present.  The removed slot is filled
 * with the last element, so the order of the remaining sockets may change.
 */
void
ChatterSocketErrorHandler::remove_chatter_socket(ChatterSocket *cs)
{
  for (int i = 0; i < _chatter_sockets.size(); i++)
    if (_chatter_sockets[i] == cs) {
      _chatter_sockets[i] = _chatter_sockets.back();
      _chatter_sockets.pop_back();
      return;
    }
}

/*
 * Deliver message `m' to the wrapped handler and to every registered
 * ChatterSocket.  A nonempty message that does not end in a newline gets one
 * appended first, so all recipients see the same newline-terminated text.
 */
void
ChatterSocketErrorHandler::handle_text(Seriousness seriousness, const String &m)
{
  String actual_m = m;
  if (m.length() > 0 && m.back() != '\n')
    actual_m += '\n';
  _errh->handle_text(seriousness, actual_m);
  for (int i = 0; i < _chatter_sockets.size(); i++)
    _chatter_sockets[i]->handle_text(seriousness, actual_m);
}

// Handler installed as the default error handler for the "default" channel
// (null until the first "default"-channel ChatterSocket is configured).
static ChatterSocketErrorHandler *chatter_socket_errh;
// The default error handler that was in place before chatter_socket_errh
// replaced it; restored when the "default" channel is torn down.
static ErrorHandler *base_default_errh;


ChatterSocket::ChatterSocket()
  : _socket_fd(-1), _channel("default"), _retry_timer(0)
{
}

ChatterSocket::~ChatterSocket()
{
}

/*
 * Parse the configuration and hook this element onto its chatter channel.
 *
 * The first argument is the socket type (`TCP' or `UNIX', case-insensitive);
 * the second is a port number (TCP) or a filename (UNIX).  Keyword arguments
 * CHANNEL, QUIET_CHANNEL, GREETING, RETRIES and RETRY_WARNINGS are removed
 * from `conf' before the positional arguments are parsed.
 *
 * Returns 0 on success, or a negative value after reporting to `errh'.
 */
int
ChatterSocket::configure(Vector<String> &conf, ErrorHandler *errh)
{
  String socktype;
  if (cp_va_parse(conf, this, errh,
                  cpString, "type of socket (`TCP' or `UNIX')", &socktype,
                  cpIgnoreRest, cpEnd) < 0)
    return -1;

  // remove keyword arguments
  bool quiet_channel = true, greeting = true, retry_warnings = true;
  _retries = 0;
  // QUIET_CHANNEL is stored into a bool, so it is parsed as cpBool like the
  // other flag keywords (it was previously, and wrongly, cpElement).
  if (cp_va_parse_remove_keywords(conf, 2, this, errh,
                "CHANNEL", cpWord, "chatter channel", &_channel,
                "QUIET_CHANNEL", cpBool, "channel is quiet?", &quiet_channel,
                "GREETING", cpBool, "greet connectors?", &greeting,
                "RETRIES", cpInteger, "number of retries", &_retries,
                "RETRY_WARNINGS", cpBool, "warn on unsuccessful socket attempt?", &retry_warnings,
                cpEnd) < 0)
    return -1;
  _greeting = greeting;
  _retry_warnings = retry_warnings;

  socktype = socktype.upper();
  if (socktype == "TCP") {
    _tcp_socket = true;
    unsigned short portno;
    if (cp_va_parse(conf, this, errh,
                    cpIgnore,
                    cpUnsignedShort, "port number", &portno,
                    cpEnd) < 0)
      return -1;
    // The port is kept in _unix_pathname as text; initialize_socket()
    // converts it back to an integer.
    _unix_pathname = String(portno);

  } else if (socktype == "UNIX") {
    _tcp_socket = false;
    if (cp_va_parse(conf, this, errh,
                    cpIgnore,
                    cpString, "filename", &_unix_pathname,
                    cpEnd) < 0)
      return -1;
    // The name plus its terminating NUL must fit in sockaddr_un::sun_path.
    if (_unix_pathname.length() >= (int)sizeof(((struct sockaddr_un *)0)->sun_path))
      return errh->error("filename too long");

  } else
    return errh->error("unknown socket type `%s'", socktype.cc());

  // Create channel now, so that other configure() methods will get it.
  ChatterSocketErrorHandler *cserrh;
  if (_channel == "default" && chatter_socket_errh)
    cserrh = chatter_socket_errh;
  else if (_channel == "default") {
    // First user of the default channel: wrap the current default handler
    // and install the wrapper in its place.
    base_default_errh = ErrorHandler::default_handler();
    chatter_socket_errh = new ChatterSocketErrorHandler(base_default_errh);
    ErrorHandler::set_default_handler(chatter_socket_errh);
    cserrh = chatter_socket_errh;
  } else if (void *v = router()->attachment("ChatterChannel." + _channel))
    cserrh = (ChatterSocketErrorHandler *)v;
  else {
    // New named channel: wrap either the silent handler or the original
    // default handler, and publish it as a router attachment.
    ErrorHandler *base = (quiet_channel ? ErrorHandler::silent_handler() : base_default_errh);
    if (!base)
      base = ErrorHandler::default_handler();
    cserrh = new ChatterSocketErrorHandler(base);
    router()->set_attachment("ChatterChannel." + _channel, cserrh);
  }

  // install ChatterSocketErrorHandler
  cserrh->add_chatter_socket(this);

  return 0;
}


/*
 * Report a failure of `syscall' while setting up the listening socket and
 * close the socket if it was opened.
 *
 * While retries remain (_retries >= 0) this is only a warning (and only if
 * retry warnings are enabled) and -EINVAL is returned; otherwise the failure
 * is reported as an error and the result of errh->error() is returned.
 */
int
ChatterSocket::initialize_socket_error(ErrorHandler *errh, const char *syscall)
{
  int e = errno;                // preserve errno

  if (_socket_fd >= 0) {
    close(_socket_fd);
    _socket_fd = -1;
  }

  if (_retries >= 0) {
    if (_retry_warnings)
      errh->warning("%s: %s (%d %s left)", syscall, strerror(e), _retries + 1, (_retries == 0 ? "try" : "tries"));
    return -EINVAL;
  } else
    return errh->error("%s: %s", syscall, strerror(e));
}

/*
 * Make one attempt to create, bind and listen on the configured socket.
 * Each call uses up one retry.  On success the socket is made nonblocking
 * and close-on-exec, registered for read selection, and 0 is returned; on
 * failure the value of initialize_socket_error() is returned.
 */
int
ChatterSocket::initialize_socket(ErrorHandler *errh)
{
  _retries--;

  // open socket, set options, bind to address
  if (_tcp_socket) {
    _socket_fd = socket(PF_INET, SOCK_STREAM, 0);
    if (_socket_fd < 0)
      return initialize_socket_error(errh, "socket");
    int sockopt = 1;
    if (setsockopt(_socket_fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sockopt, sizeof(sockopt)) < 0)
      errh->warning("setsockopt: %s", strerror(errno));

    // bind to port
    int portno;
    (void) cp_integer(_unix_pathname, &portno);
    struct sockaddr_in sa;
    // Zero the whole structure so that fields not set below are not left
    // holding stack garbage when it is handed to bind().
    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(portno);
    sa.sin_addr = inet_makeaddr(0, 0);
    if (bind(_socket_fd, (struct sockaddr *)&sa, sizeof(sa)) < 0)
      return initialize_socket_error(errh, "bind");

  } else {
    _socket_fd = socket(PF_UNIX, SOCK_STREAM, 0);
    if (_socket_fd < 0)
      return initialize_socket_error(errh, "socket");

    // bind to port
    struct sockaddr_un sa;
    // Zero first: bind() is given sizeof(sa), which covers all of sun_path,
    // not just the bytes copied below.
    memset(&sa, 0, sizeof(sa));
    sa.sun_family = AF_UNIX;
    // configure() has checked that the name and its NUL fit in sun_path.
    memcpy(sa.sun_path, _unix_pathname.cc(), _unix_pathname.length() + 1);
    if (bind(_socket_fd, (struct sockaddr *)&sa, sizeof(sa)) < 0)
      return initialize_socket_error(errh, "bind");
  }

  // start listening
  if (listen(_socket_fd, 2) < 0)
    return initialize_socket_error(errh, "listen");

  // nonblocking I/O and close-on-exec for the socket
  fcntl(_socket_fd, F_SETFL, O_NONBLOCK);
  fcntl(_socket_fd, F_SETFD, FD_CLOEXEC);

  add_select(_socket_fd, SELECT_READ);
  return 0;
}

/*
 * Timer callback that retries socket setup.  `thunk' is the ChatterSocket.
 * Does nothing once the socket exists; otherwise tries again, rescheduling
 * itself in one second while retries remain and asking the router to stop
 * the driver when they are exhausted.
 */
void
ChatterSocket::retry_hook(Timer *t, void *thunk)
{
  ChatterSocket *cs = (ChatterSocket *)thunk;
  if (cs->_socket_fd >= 0)
    /* nada */;
  else if (cs->initialize_socket(ErrorHandler::default_handler()) >= 0)
    /* nada */;
  else if (cs->_retries >= 0)
    t->reschedule_after_s(1);
  else
    cs->router()->please_stop_driver();
}

/*
 * Reset the message position and connection count, then try to open the
 * listening socket.  If the first attempt fails but retries remain, a timer
 * is scheduled to try again in one second and initialization still succeeds.
 * Returns 0 on success (or pending retry), -1 on failure.
 */
int
ChatterSocket::initialize(ErrorHandler *errh)
{
  _max_pos = 0;
  _live_fds = 0;
  if (initialize_socket(errh) >= 0)
    return 0;
  else if (_retries >= 0) {
    _retry_timer = new Timer(retry_hook, this);
    _retry_timer->initialize(this);
    _retry_timer->schedule_after_s(1);
    return 0;
  } else
    return -1;
}

/*
 * Take over the listening socket, buffered messages and client connections
 * of `e', if `e' is a ChatterSocket with the same socket type, address and
 * channel.  Reports an error and changes nothing if this element already has
 * a socket or if the two elements are incompatible.
 */
void
ChatterSocket::take_state(Element *e, ErrorHandler *errh)
{
  ChatterSocket *cs = (ChatterSocket *)e->cast("ChatterSocket");
  if (!cs)
    return;

  if (_socket_fd >= 0) {
    errh->error("already initialized, can't take state");
    return;
  } else if (_tcp_socket != cs->_tcp_socket
             || _unix_pathname != cs->_unix_pathname
             || _channel != cs->_channel) {
    errh->error("incompatible ChatterSockets");
    return;
  }

  // Steal the listening socket; the old element must not close it.
  _socket_fd = cs->_socket_fd;
  cs->_socket_fd = -1;
  _messages.swap(cs->_messages);
  _message_pos.swap(cs->_message_pos);
  _max_pos = cs->_max_pos;
  _fd_alive.swap(cs->_fd_alive);
  _fd_pos.swap(cs->_fd_pos);
  _live_fds = cs->_live_fds;
  cs->_live_fds = 0;

  // Re-register the inherited descriptors with this element.
  if (_socket_fd >= 0)
    add_select(_socket_fd, SELECT_READ);
  for (int i = 0; i < _fd_alive.size(); i++)
    if (_fd_alive[i])
      add_select(i, SELECT_WRITE);
}

/*
 * Unregister `cs' from the channel handler referenced by `cserrh'.  When the
 * last socket leaves, the handler is deleted and `cserrh' is set to null; if
 * it was the default-channel handler, the original default error handler is
 * restored first.
 */
static void
remove_chatter_channel(ChatterSocketErrorHandler *&cserrh, ChatterSocket *cs)
{
  if (cserrh) {
    cserrh->remove_chatter_socket(cs);
    if (!cserrh->nchatter_sockets()) {
      if (cserrh == chatter_socket_errh)
        ErrorHandler::set_default_handler(base_default_errh);
      delete cserrh;
      cserrh = 0;
    }
  }
}

/*
 * Release everything this element holds: the listening socket (removing the
 * socket file for Unix-domain sockets), all client connections, the retry
 * timer, and the registration on the chatter channel.
 */
void
ChatterSocket::cleanup(CleanupStage)
{
  if (_socket_fd >= 0) {
    // shut down the listening socket in case we forked
#ifdef SHUT_RDWR
    shutdown(_socket_fd, SHUT_RDWR);
#else
    shutdown(_socket_fd, 2);
#endif
    close(_socket_fd);
    if (!_tcp_socket)
      unlink(_unix_pathname.c_str());
    _socket_fd = -1;
  }
  // _fd_alive is indexed by file descriptor number.
  for (int i = 0; i < _fd_alive.size(); i++)
    if (_fd_alive[i]) {
      close(i);
      _fd_alive[i] = 0;
    }
  _live_fds = 0;
  if (_retry_timer) {
    _retry_timer->cleanup();
    delete _retry_timer;
    _retry_timer = 0;
  }

  // unhook from chatter socket error handler
  if (_channel == "default")
    remove_chatter_channel(chatter_socket_errh, this);
  else
    remove_chatter_channel
      ((ChatterSocketErrorHandler *&)(router()->force_attachment("ChatterChannel." + _channel)), this);
}

/*
 * Write as much pending chatter as possible to client connection `fd'.
 *
 * Returns the index in _messages of the first message `fd' has not fully
 * received (equal to the number of messages when `fd' is up to date or is
 * not a live connection), or -1 if no buffered message covers the client's
 * position or a write error occurred.  The connection is closed in the -1
 * case and also when the client has fallen more than MAX_BACKLOG behind.
 */
int
ChatterSocket::flush(int fd)
{
  // check file descriptor
  if (fd >= _fd_alive.size() || !_fd_alive[fd])
    return _messages.size();

  // check if all data written
  if (_fd_pos[fd] == _max_pos)
    return _messages.size();

  // find first useful message (binary search)
  // NOTE(review): SEQ_LT/SEQ_GEQ come from <clicknet/tcp.h>; presumably
  // sequence-number comparisons that tolerate wraparound -- confirm.
  uint32_t fd_pos = _fd_pos[fd];
  int l = 0, r = _messages.size() - 1, useful_message = -1;
  while (l <= r) {
    int m = (l + r) >> 1;
    if (SEQ_LT(fd_pos, _message_pos[m]))
      r = m - 1;
    else if (SEQ_GEQ(fd_pos, _message_pos[m] + _messages[m].length()))
      l = m + 1;
    else {
      useful_message = m;
      break;
    }
  }

  // if messages found, write data until blocked or closed
  if (useful_message >= 0) {
    while (useful_message < _message_pos.size()) {
      const String &m = _messages[useful_message];
      int mpos = _message_pos[useful_message];
      // Resume inside the message at the client's current position.
      const char *data = m.data() + (fd_pos - mpos);
      int len = m.length() - (fd_pos - mpos);
      int w = write(fd, data, len);
      if (w < 0 && errno != EINTR) {
        if (errno != EAGAIN)    // drop connection on error, except WOULDBLOCK
          useful_message = -1;
        break;
      } else if (w > 0)
        fd_pos += w;
      // Advance only once the whole message is out; a short or interrupted
      // write retries the same message.
      if (SEQ_GEQ(fd_pos, mpos + m.length()))
        useful_message++;
    }
  }

  // store changed fd_pos
  _fd_pos[fd] = fd_pos;

  // close out on error, or if socket falls too far behind
  if (useful_message < 0 || SEQ_LT(fd_pos, _max_pos - MAX_BACKLOG)) {
    close(fd);
    remove_select(fd, SELECT_WRITE);
    _fd_alive[fd] = 0;
    _live_fds--;
  } else if (fd_pos == _max_pos)
    remove_select(fd, SELECT_WRITE);
  else
    add_select(fd, SELECT_WRITE);

  return useful_message;
}

/*
 * Flush pending chatter to every live client, then discard buffered messages
 * that no client still needs.  Culling is deferred until at least 10
 * messages can be dropped at once.
 */
void
ChatterSocket::flush()
{
  int min_useful_message = _messages.size();
  if (min_useful_message)
    for (int i = 0; i < _fd_alive.size(); i++)
      // Only live connections need flushing.  (The former test,
      // `_fd_alive[i] >= 0', also called flush(i) for dead descriptors,
      // which returns _messages.size() without doing anything.)
      if (_fd_alive[i]) {
        int m = flush(i);
        if (m < min_useful_message)
          min_useful_message = m;
      }

  // cull old messages
  if (min_useful_message >= 10) {
    _messages.erase(_messages.begin(), _messages.begin() + min_useful_message);
    _message_pos.erase(_message_pos.begin(), _message_pos.begin() + min_useful_message);
  }
}

/*
 * Select callback.  When `fd' is the listening socket, accept one new
 * connection, make it nonblocking and close-on-exec, start it at the current
 * end of the message stream, and optionally send the protocol greeting.
 * In every case, finish by flushing pending data to the (new or existing)
 * client descriptor.
 */
void
ChatterSocket::selected(int fd)
{
  if (fd == _socket_fd) {
    union { struct sockaddr_in in; struct sockaddr_un un; } sa;
#ifdef __APPLE__ /* macos x ??? */
    int sa_len;
#else
    socklen_t sa_len;
#endif
    sa_len = sizeof(sa);
    int new_fd = accept(_socket_fd, (struct sockaddr *)&sa, &sa_len);

    if (new_fd < 0) {
      if (errno != EAGAIN)
        click_chatter("%s: accept: %s", declaration().cc(), strerror(errno));
      return;
    }

    fcntl(new_fd, F_SETFL, O_NONBLOCK);
    fcntl(new_fd, F_SETFD, FD_CLOEXEC);

    // The per-connection tables are indexed by descriptor number; grow them
    // until new_fd is a valid index.
    while (new_fd >= _fd_alive.size()) {
      _fd_alive.push_back(0);
      _fd_pos.push_back(0);
    }
    _fd_alive[new_fd] = 1;
    // A new client only receives chatter produced from now on.
    _fd_pos[new_fd] = _max_pos;
    _live_fds++;

    fd = new_fd;
    // no need to SELECT_WRITE; flush(fd) will do it if required

    if (_greeting) {
      // XXX - assume that this write will succeed
      String s = String("Click::ChatterSocket/") + protocol_version + "\r\n";
      int w = write(fd, s.data(), s.length());
      if (w != s.length())
        click_chatter("%s fd %d: unable to write greeting!", declaration().cc(), fd);
    }
  }

  flush(fd);
}

CLICK_ENDDECLS
ELEMENT_REQUIRES(userlevel)
EXPORT_ELEMENT(ChatterSocket)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -