📄 poll.cxx
字号:
VVERBOSE(log) << fn << ": " << *this << VVERBOSE_END(log); pollFdEntry.revents = 0; continue; } Protocol * protocol = (*protocolIt).second; if ( protocol == 0 ) { VWARN(log) << fn << ": no protocol associated with fd = " << it->fd << VWARN_END(log); continue; } if ( processPriority(pollFdEntry, protocol) ) { VVERBOSE(log) << fn << ": " << *this << VVERBOSE_END(log); numberFdsActive--; pollFdEntry.revents = 0; continue; } if ( processIncoming(pollFdEntry, protocol) ) { VVERBOSE(log) << fn << ": " << *this << VVERBOSE_END(log); numberFdsActive--; pollFdEntry.revents = 0; continue; } if ( processOutgoing(pollFdEntry, protocol) ) { VVERBOSE(log) << fn << ": " << *this << VVERBOSE_END(log); numberFdsActive--; pollFdEntry.revents = 0; continue; } if ( processHangup(pollFdEntry, protocol) ) { VVERBOSE(log) << fn << ": " << *this << VVERBOSE_END(log); numberFdsActive--; pollFdEntry.revents = 0; continue; } if ( processError(pollFdEntry, protocol) ) { VVERBOSE(log) << fn << ": " << *this << VVERBOSE_END(log); numberFdsActive--; } pollFdEntry.revents = 0; } VDEBUG(log) << fn << ": End: number file descriptors active = " << numberFdsActive << ", protocols changed = " << ( myProtocolsChanged ? "true" : "false" ) << VDEBUG_END(log);}void Poll::interrupt()throw ( Vocal::SystemException ){ const string fn("Poll::interrupt"); VLog log(fn); Lock lock(myInterruptorMutex); (void)lock; myInterruptCount++; myInterruptor.writeFD().write(&myInterruptCount, 1); VDEBUG(log) << fn << ": Interrupting: " << (unsigned)myInterruptCount << " outstanding interrupts." << VDEBUG_END(log);}u_int8_t Poll::interruptCount() const{ return ( myInterruptCount );}ostream & Poll::writeTo(ostream & out) const{ out << "protocols (size: " << myProtocols.size() << ") = \n{\n"; for ( ProtocolMap::const_iterator it = myProtocols.begin(); it != myProtocols.end(); it++ ) { if ( it->first.fd == myInterruptor.readFD().getFD() ) { continue; } out << " "; if ( it->second ) { out << *it->second << "\n"; } else { out << "no protocol.\n"; } } out << "}\ninterruptor = " << myInterruptor << "\nnumber outstanding interruptions = " << (unsigned)myInterruptCount << "\nprotocolsChanged = " << ( myProtocolsChanged ? "true" : "false" ); size_t size = myFds.size(); out << "\nfds (size: " << size << ") = \n{\n"; for ( size_t i = 0; i < size; i++ ) { out << " ( " << myFds[i].fd << ", 0x" << hex << setw(2) << setfill('0') << (unsigned)(myFds[i].events) << ", 0x" << hex << setw(2) << setfill('0') << (unsigned)(myFds[i].revents) << dec << " )\n"; } out << "}"; return ( out );} void Poll::addInterruptorToFds(){ pollfd interruptorPollFd; interruptorPollFd.fd = myInterruptor.readFD().getFD(); interruptorPollFd.events = POLLIN; interruptorPollFd.revents = 0; myFds.insert(myFds.end(), interruptorPollFd);}void Poll::addProtocolsToFds(){ for ( ProtocolMap::iterator it = myProtocols.begin(); it != myProtocols.end(); it++ ) { myFds.insert(myFds.end(), it->first); }}boolPoll::processInterruptor(pollfd & pollFdEntry){ const string fn("Poll::processInterruptor"); VLog log(fn); VVERBOSE(log) << fn << ": fd = " << pollFdEntry.fd << VVERBOSE_END(log); if ( pollFdEntry.fd == myInterruptor.readFD().getFD() ) { if ( pollFdEntry.revents & POLLIN ) { u_int8_t interruptNumber = 0; IPAddress remoteAddr; Lock lock(myInterruptorMutex); (void)lock; VDEBUG(log) << fn << ": Interrupted: " << (unsigned)myInterruptCount << " outstanding interrupts." << VDEBUG_END(log); do { // Let the user catch the exception, if thrown. // myInterruptor.readFD().read(&interruptNumber, 1); } while ( interruptNumber < myInterruptCount ); myInterruptCount = 0; return ( true ); } else { return ( false ); } } return ( false );}boolPoll::processPriority(pollfd & pollFdEntry, Protocol * protocol){ const string fn("Poll::processPriority"); VLog log(fn); VDEBUG(log) << fn << ": fd = " << pollFdEntry.fd << VDEBUG_END(log); if ( pollFdEntry.revents & POLLPRI ) { VDEBUG(log) << fn << ": Priority for protocol = " << *protocol << VDEBUG_END(log); // Notify the user of a priority message on the file descriptor. // try { protocol->onPriority(); } catch ( Vocal::SystemStatus & ) { // The status indicates that we need to poll again. // This is the desfault behavior, so do nothing. } catch ( Vocal::Transport::ConnectionBrokenException & e) { return ( processHangup(pollFdEntry, protocol, &e) ); } catch ( Vocal::SystemException & e ) { return ( processError(pollFdEntry, protocol, &e) ); } catch ( ... ) { protocol->handleUnknownException(); } return ( true ); } return ( false );}boolPoll::processIncoming(pollfd & pollFdEntry, Protocol * protocol){ const string fn("Poll::processIncoming"); VLog log(fn); VVERBOSE(log) << fn << ": fd = " << pollFdEntry.fd << VVERBOSE_END(log); if ( pollFdEntry.revents & POLLIN ) { VDEBUG(log) << fn << ": IncomingAvailable for protocol = " << *protocol << VDEBUG_END(log); // Notify the user of an incoming message on the file // descriptor. // try { protocol->onIncomingAvailable(); } catch ( Vocal::SystemStatus & ) { // The status indicates that we need to poll again. // Do nothing. } catch ( Vocal::Transport::ConnectionBrokenException & e ) { return ( processHangup(pollFdEntry, protocol, &e) ); } catch ( Vocal::SystemException & e ) { return ( processError(pollFdEntry, protocol, &e) ); } catch ( ... ) { protocol->handleUnknownException(); } return ( true ); } return ( false );}boolPoll::processOutgoing(pollfd & pollFdEntry, Protocol * protocol){ const string fn("Poll::processOutgoing"); VLog log(fn); VVERBOSE(log) << fn << ": fd = " << pollFdEntry.fd << VVERBOSE_END(log); if ( pollFdEntry.revents & POLLOUT ) { VDEBUG(log) << fn << ": OutgoingAvailable for protocol = " << *protocol << VDEBUG_END(log); // Notify the user of that the file descriptor is // available to send outgoing messages. // try { protocol->onOutgoingAvailable(); } catch ( Vocal::SystemStatus ) { // The status indicates that we need to poll again. // Do nothing. } catch ( Vocal::Transport::ConnectionBrokenException & e ) { return ( processHangup(pollFdEntry, protocol, &e) ); } catch ( Vocal::SystemException & e ) { return ( processError(pollFdEntry, protocol, &e) ); } catch ( ... ) { protocol->handleUnknownException(); } return ( true ); } return ( false );}boolPoll::processHangup( pollfd & pollFdEntry, Protocol * protocol, ConnectionBrokenException * connectionBrokenException){ const string fn("Poll::processHangup"); VLog log(fn); VVERBOSE(log) << fn << ": fd = " << pollFdEntry.fd << VVERBOSE_END(log); if ( connectionBrokenException != 0 || pollFdEntry.revents & POLLHUP ) { VDEBUG(log) << fn << ": Disconnect for protocol = " << *protocol << VDEBUG_END(log); // Notify the user of that the file descriptor has hung up. // try { protocol->onDisconnect(connectionBrokenException); } catch ( Vocal::SystemStatus & ) { // The status indicates that we need to poll again. // Do nothing. } catch ( Vocal::SystemException & e ) { return ( processError(pollFdEntry, protocol, &e) ); } catch ( ... ) { protocol->handleUnknownException(); } return ( true ); } return ( false );}boolPoll::processError( pollfd & pollFdEntry, Protocol * protocol, SystemException * systemException){ const string fn("Poll::processError"); VLog log(fn); VVERBOSE(log) << fn << ": fd = " << pollFdEntry.fd << VVERBOSE_END(log); if ( systemException != 0 || pollFdEntry.revents & POLLERR || pollFdEntry.revents & POLLNVAL ) { VWARN(log) << fn << ": Error for protocol = " << *protocol << VWARN_END(log); // Notify the user of the error. // try { protocol->onError(systemException); } catch ( ... ) { protocol->handleUnknownException(); } return ( true ); } return ( false );}bool pollfdLess::operator()(const pollfd & left, const pollfd & right) const{ return ( left.fd < right.fd );}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -