📄 io.c
字号:
//// This file is part of the C++ threads package.//// Copyright (C) 2000 Orn E. Hansen// A C++ implementation of threaded IO.//#include "io.h"#include "buffer.h"#include "exception.h"#include "signal_num.h"#include <iostream>#if( GCC_VERSION >= 2096 )#include <sstream>#endif#include <cstdio>#include <cerrno>#include <csignal>// Yeah, it's ugly.#if( GCC_VERSIOON < 3000 )# include <strstream>#endif#include "sig_handler.h"namespace cpp_threads { PthreadIO::PthreadIO(const char *ptr_p, int port_p, AsynchronousIO::async_type_t t,__socket_type st,int ipp) :AsynchronousIO(t) { _client = 0; Pthread::debug("PthreadIO::PthreadIO"); _fd.open(SocketAddress::inet,st,ipp); _is_socket = false; _fd.connect(ptr_p,port_p); run(); } PthreadIO::PthreadIO(int port_p, AsynchronousIO::async_type_t t,__socket_type st,int ipp) :AsynchronousIO(t) { _client = 0; _fd.open(SocketAddress::inet,st,ipp); _fd.bind(port_p); _is_socket = true; } PthreadIO::PthreadIO(int port_p, int blog_p, AsynchronousIO::async_type_t t,__socket_type st,int ipp) :AsynchronousIO(t) { if( st != SOCK_STREAM && st != SOCK_SEQPACKET ) exception::fatal( "Wrong socket type" ); _client = 0; _fd.open(SocketAddress::inet,st,ipp); _fd.bind(port_p); _fd.listen(blog_p); _is_socket = true; run(); } PthreadIO::PthreadIO(int fd_p,AsynchronousIO::async_type_t t) :AsynchronousIO(t) { _client = 0; _fd.open(fd_p); _is_socket = false; } PthreadIO::PthreadIO(AsynchronousIO::async_type_t t) :AsynchronousIO(t) { _client = 0; _is_socket = false; } PthreadIO::~PthreadIO() { } void PthreadIO::incomingConnection() { SocketBuffer *n_fd; _active_set.add(n_fd = SocketBuffer::wrapSocket(_fd.accept())); open(*n_fd); Pthread::debug("%d now opened",(int)*n_fd); } void PthreadIO::selectTimeout() { } void PthreadIO::urgent() { } void PthreadIO::signalUrgent(void *) { PthreadIO *pio = (PthreadIO *)Pthread::ptr(getpid()); if( pio->tryLock() == EBUSY ) pio->flushBuffers(); else pio->urgent(); pio->unLock(); } void PthreadIO::flushBuffers() { if( getpid() != id() ) signal( s_urgent ); else _do_buffers = !_sock_queue.empty();; } void PthreadIO::queueSocket(SocketBuffer *bsno_p) { if ( bsno_p == 0 ) bsno_p = &fd(); _sock_queue.remove( bsno_p ); _sock_queue.push_back( bsno_p ); Pthread::debug("%d: queued.",(int)(*bsno_p)); } int PthreadIO::fillSocketBuffer(SocketBuffer& sb_p) { return sb_p.inputBuf().put( sb_p ); } void PthreadIO::got(SocketBuffer&) { exception::fatal( "data going to the void" ); } int PthreadIO::put(SocketBuffer& fd_p,char val_p) { fd_p.outputBuf().put( val_p ); _avail_set.add( &fd_p ); signal( s_urgent ); return 1; } int PthreadIO::put(SocketBuffer& fd_p,const std::string& str_p) { int len = -1; fd_p.outputBuf().put( str_p ); _avail_set.add( &fd_p ); signal( s_urgent ); return len; } int PthreadIO::put(char ch_p) { return put( fd(),ch_p ); } int PthreadIO::put(const std::string& str_p) { Pthread::debug( "putting [%s]",str_p.c_str() ); return put( fd(),str_p ); } int PthreadIO::putf(const char *fmt_p, ...) { std::string s; va_list ap; va_start( ap,fmt_p );#if( GCC_VERSION < 3000 ) strstream sstr; sstr.vform( fmt_p,ap ); s = sstr.str();#else char *string_storage; vasprintf( &string_storage,fmt_p,ap ); s = string_storage; delete string_storage;#endif va_end( ap ); return put( s ); } int PthreadIO::putf(SocketBuffer& fd_p, const char *fmt_p, ...) { std::string s; va_list ap; va_start( ap,fmt_p );#if( GCC_VERSION < 3000 ) strstream sstr; sstr.vform( fmt_p,ap ); s = sstr.str();#else char *string_storage; vasprintf( &string_storage,fmt_p,ap ); s = string_storage; delete string_storage;#endif va_end( ap ); return put( fd_p,s ); } bool PthreadIO::failure(int) { extern int errno; return true; } void PthreadIO::open(Socket& fd_p) { } void PthreadIO::close(Socket& fd_p) { AsynchronousIO* ai = asyncObject(&fd_p); Pthread::debug("PthreadIO::close(%d)",(int)fd_p); if( ai != this ) { unregisterFD(ai); ai->close(fd_p); } _active_set.del( fd_p ); _except_set.del( fd_p ); _avail_set.del( fd_p ); Pthread::debug("closing again?"); fd_p.close(); Pthread::debug("%d officially closed.",(int)fd_p); } void PthreadIO::registerFD(AsynchronousIO *class_p, int sel_p) { iterator i; if ( class_p && class_p->fd() > 0 ) { for( i=_others.begin();i!=_others.end();i++ ) if ( (*i)->fd() == class_p->fd() ) break; if ( sel_p & readSelect_e ) _active_set.add( &class_p->fd() ); if ( sel_p & excSelect_e ) _except_set.add( &class_p->fd() ); if ( _active_set.isSet(class_p->fd()) || _except_set.isSet(class_p->fd()) && i == _others.end() ) _others.push_back( class_p ); signal( s_urgent ); } } void PthreadIO::unregisterFD(AsynchronousIO *class_p, int sel_p) { iterator i; for( i=_others.begin();i!=_others.end();i++ ) if ( (*i) == class_p ) break; if ( i != _others.end() ) { _others.erase( i ); if ( sel_p & readSelect_e ) _active_set.del( (*i)->fd() ); if ( sel_p & excSelect_e ) _except_set.del( (*i)->fd() ); if ( !(_active_set.isSet((*i)->fd()) || _except_set.isSet((*i)->fd())) ) _others.erase( i ); signal( s_urgent ); } } int PthreadIO::thread(void *) { signalling<PthreadIO> ssyst; DescriptorSet::iterator i; DescriptorSet read_set, exc_set, write_set; struct timeval time_out; int rv; if( _mode == AsynchronousIO::dupl_half_e ) _pid = getpid(); ssyst.connect(s_urgent,this,&PthreadIO::signalUrgent); _active_set.zero(); _avail_set.zero(); _timeout.tv_sec = 0; _timeout.tv_usec = 0; if ( _fd >= 0 ) _active_set.add( &_fd ); _do_buffers = false; while( true ) { read_set = _active_set; write_set = _avail_set; exc_set = _except_set; time_out = _timeout; _client = 0; rv = EINTR; if( _do_buffers == false ) rv = select(FD_SETSIZE,read_set,write_set,exc_set,&time_out); if( rv < 0 ) { switch( errno ) { default: case ENOMEM: // Out of memory error... definately fatal. error(errno); exception::fatal( ENOMEM ); case EINTR: SocketBuffer *sb; _do_buffers = false; // // Go through the queue, and find the next socket // that can be delivered. Pthread::debug("signal intr"); if ( _sock_queue.empty() == false ) { sb = _sock_queue.front(); _sock_queue.pop_front(); asyncObject(sb)->jump( sb,1 ); } continue; case EBADF: // We assume that the reason here is, that a descriptor // has been closed, while inside the select loop. We // check all active descriptors and delete each, that is // a bad one. Pthread::debug("bad file descriptor"); for( i=_active_set.begin();i != _active_set.end();i++ ) { if( (bool)*(*i) == false && errno == EBADF ) { Pthread::debug("%d closing",(int)*(SocketBuffer *)(*i)); close( *(*i) ); _sock_queue.remove( (SocketBuffer *)(*i) ); } } continue; case EINVAL: // sleep, until we are signaled to resume. Pthread::debug("slumber chamber"); error(errno); suspend(); continue; } } else if( rv == 0 ) { _timeout.tv_sec = 0; _timeout.tv_usec = 0; selectTimeout(); continue; } for( i=read_set.begin();i != read_set.end();i++ ) if ( read_set.isSet(*(*i)) ) { _client = (SocketBuffer *)(*i); if ( _is_socket && *(*i) == fd() ) { incomingConnection(); } else { int nbytes = fillSocketBuffer( *_client ); if ( nbytes <= 0 ) { if ( nbytes == 0 || failure(*_client) ) close( *_client ); Pthread::debug("finished closing"); } else asyncObject(_client)->jump( _client,nbytes ); } } else if ( exc_set.isSet(*(*i)) ) asyncObject(_client)->jump(); for( i=write_set.begin();i != write_set.end();i++ ) if( write_set.isSet(*(*i)) ) { _client = (SocketBuffer *)(*i); std::string s; _client->outputBuf().get( *_client ); if( _client->outputBuf().empty() ) _avail_set.del( *(*i) ); } } } AsynchronousIO * PthreadIO::asyncObject(Socket* fd_p) { iterator j; if ( fd_p == 0 ) return this; for( j=_others.begin();j!=_others.end();j++ ) if ( (*j)->fd() == *fd_p ) break; if ( j == _others.end() ) return this; return (*j); }}; // namespace
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -