📄 socket.cc
字号:
Socket::cleanup(CleanupStage){ if (_active >= 0 && _active != _fd) { close(_active); _active = -1; } if (_rq) _rq->kill(); if (_wq) _wq->kill(); if (_fd >= 0) { // shut down the listening socket in case we forked#ifdef SHUT_RDWR shutdown(_fd, SHUT_RDWR);#else shutdown(_fd, 2);#endif close(_fd); if (_family == AF_UNIX) unlink(_local_pathname.c_str()); _fd = -1; }}boolSocket::allowed(IPAddress addr){ IPAddress gw; if (_allow && _allow->lookup_route(addr, gw) >= 0) return true; else if (_deny && _deny->lookup_route(addr, gw) >= 0) return false; else return true;}voidSocket::close_active(void){ if (_active >= 0) { remove_select(_active, SELECT_READ | SELECT_WRITE); close(_active); if (_verbose) click_chatter("%s: closed connection %d", declaration().c_str(), _active); _active = -1; }}voidSocket::selected(int fd){ int len; union { struct sockaddr_in in; struct sockaddr_un un; } from; socklen_t from_len = sizeof(from); bool allow; if (noutputs()) { // accept new connections if (_socktype == SOCK_STREAM && !_client && _active < 0 && fd == _fd) { _active = accept(_fd, (struct sockaddr *)&from, &from_len); if (_active < 0) { if (errno != EAGAIN) click_chatter("%s: accept: %s", declaration().c_str(), strerror(errno)); return; } if (_family == AF_INET) { allow = allowed(IPAddress(from.in.sin_addr)); if (_verbose) click_chatter("%s: %s connection %d from %s:%d", declaration().c_str(), allow ? "opened" : "denied", _active, IPAddress(from.in.sin_addr).unparse().c_str(), ntohs(from.in.sin_port)); if (!allow) { close(_active); _active = -1; return; } } else { if (_verbose) click_chatter("%s: opened connection %d from %s", declaration().c_str(), _active, from.un.sun_path); } fcntl(_active, F_SETFL, O_NONBLOCK); fcntl(_active, F_SETFD, FD_CLOEXEC); add_select(_active, SELECT_READ | SELECT_WRITE); _events = SELECT_READ | SELECT_WRITE; } // read data from socket if (!_rq) _rq = Packet::make(_headroom, 0, _snaplen, 0); if (_rq) { if (_socktype == SOCK_STREAM) len = read(_active, _rq->data(), _rq->length()); else if (_client) len = recv(_active, _rq->data(), _rq->length(), MSG_TRUNC); else { // datagram server, find out who we are talking to len = recvfrom(_active, _rq->data(), _rq->length(), MSG_TRUNC, (struct sockaddr *)&from, &from_len); if (_family == AF_INET && !allowed(IPAddress(from.in.sin_addr))) { if (_verbose) click_chatter("%s: dropped datagram from %s:%d", declaration().c_str(), IPAddress(from.in.sin_addr).unparse().c_str(), ntohs(from.in.sin_port)); len = -1; errno = EAGAIN; } else if (len > 0) { memcpy(&_remote, &from, from_len); _remote_len = from_len; } } // this segment OK if (len > 0) { if (len > _snaplen) { // truncate packet to max length (should never happen) assert(_rq->length() == (uint32_t)_snaplen); SET_EXTRA_LENGTH_ANNO(_rq, len - _snaplen); } else { // trim packet to actual length _rq->take(_snaplen - len); } // set timestamp if (_timestamp) _rq->timestamp_anno().set_now(); // push packet output(0).push(_rq); _rq = 0; } // connection terminated or fatal error else if (len == 0 || errno != EAGAIN) { if (errno != EAGAIN && _verbose) click_chatter("%s: %s", declaration().c_str(), strerror(errno)); close_active(); return; } } } if (ninputs() && input_is_pull(0)) run_task(0);}intSocket::write_packet(Packet *p){ int len; assert(_active >= 0); while (p->length()) { if (!IPAddress(_remote_ip) && _client && _family == AF_INET && _socktype != SOCK_STREAM) { // If the IP address specified when the element was created is 0.0.0.0, // send the packet to its IP destination annotation address _remote.in.sin_addr = p->dst_ip_anno(); } // write segment if (_socktype == SOCK_STREAM) len = write(_active, p->data(), p->length()); else len = sendto(_active, p->data(), p->length(), 0, (struct sockaddr *)&_remote, _remote_len); // error if (len < 0) { // out of memory or would block if (errno == ENOBUFS || errno == EAGAIN) return -1; // interrupted by signal, try again immediately else if (errno == EINTR) continue; // connection probably terminated or other fatal error else { if (_verbose) click_chatter("%s: %s", declaration().c_str(), strerror(errno)); close_active(); break; } } else // this segment OK p->pull(len); } p->kill(); return 0;}voidSocket::push(int, Packet *p){ fd_set fds; int err; if (_active >= 0) { // block do { FD_ZERO(&fds); FD_SET(_active, &fds); err = select(_active + 1, NULL, &fds, NULL, NULL); } while (err < 0 && errno == EINTR); if (err >= 0) { // write do { err = write_packet(p); } while (err < 0 && (errno == ENOBUFS || errno == EAGAIN)); } if (err < 0) { if (_verbose) click_chatter("%s: %s, dropping packet", declaration().c_str(), strerror(err)); p->kill(); } } else p->kill();}boolSocket::run_task(Task *){ assert(ninputs() && input_is_pull(0)); bool any = false; if (_active >= 0) { Packet *p = 0; int err = 0; // write as much as we can do { p = _wq ? _wq : input(0).pull(); _wq = 0; if (p) { any = true; err = write_packet(p); } } while (p && err >= 0); if (err < 0) { // queue packet for writing when socket becomes available _wq = p; p = 0; add_select(_active, SELECT_WRITE); } else if (_signal) // more pending _task.fast_reschedule(); else // wrote all we could and no more pending remove_select(_active, SELECT_WRITE); } // true if we wrote at least one packet return any;}voidSocket::add_handlers(){ add_task_handlers(&_task);}CLICK_ENDDECLSELEMENT_REQUIRES(userlevel IPRouteTable)EXPORT_ELEMENT(Socket)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -