⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 io.c

📁 用c++包装好的线程库,直接拿来使用,提高效率.
💻 C
字号:
//// This file is part of the C++ threads package.//// Copyright (C) 2000 Orn E. Hansen// A C++ implementation of threaded IO.//#include "io.h"#include "buffer.h"#include "exception.h"#include "signal_num.h"#include <iostream>#if( GCC_VERSION >= 2096 )#include <sstream>#endif#include <cstdio>#include <cerrno>#include <csignal>// Yeah, it's ugly.#if( GCC_VERSIOON < 3000 )# include <strstream>#endif#include "sig_handler.h"namespace cpp_threads {  PthreadIO::PthreadIO(const char *ptr_p, int port_p, AsynchronousIO::async_type_t t,__socket_type st,int ipp)    :AsynchronousIO(t)  {    _client = 0;    Pthread::debug("PthreadIO::PthreadIO");    _fd.open(SocketAddress::inet,st,ipp);    _is_socket = false;    _fd.connect(ptr_p,port_p);    run();  }  PthreadIO::PthreadIO(int port_p, AsynchronousIO::async_type_t t,__socket_type st,int ipp)    :AsynchronousIO(t)  {    _client = 0;    _fd.open(SocketAddress::inet,st,ipp);    _fd.bind(port_p);    _is_socket = true;  }  PthreadIO::PthreadIO(int port_p, int blog_p, AsynchronousIO::async_type_t t,__socket_type st,int ipp)    :AsynchronousIO(t)  {    if( st != SOCK_STREAM && st != SOCK_SEQPACKET )      exception::fatal( "Wrong socket type" );    _client = 0;    _fd.open(SocketAddress::inet,st,ipp);    _fd.bind(port_p);    _fd.listen(blog_p);    _is_socket = true;    run();  }  PthreadIO::PthreadIO(int fd_p,AsynchronousIO::async_type_t t)    :AsynchronousIO(t)  {    _client = 0;    _fd.open(fd_p);    _is_socket = false;  }  PthreadIO::PthreadIO(AsynchronousIO::async_type_t t)    :AsynchronousIO(t)  {    _client = 0;    _is_socket = false;  }  PthreadIO::~PthreadIO()  {  }  void  PthreadIO::incomingConnection()  {    SocketBuffer *n_fd;    _active_set.add(n_fd = SocketBuffer::wrapSocket(_fd.accept()));    open(*n_fd);    Pthread::debug("%d now opened",(int)*n_fd);  }  void  PthreadIO::selectTimeout()  {  }  void  PthreadIO::urgent()  {  }  void  PthreadIO::signalUrgent(void *)  {    PthreadIO *pio = (PthreadIO *)Pthread::ptr(getpid());    if( pio->tryLock() == EBUSY )      pio->flushBuffers();    else      pio->urgent();    pio->unLock();  }  void  PthreadIO::flushBuffers()  {    if( getpid() != id() )      signal( s_urgent );    else      _do_buffers = !_sock_queue.empty();;  }  void  PthreadIO::queueSocket(SocketBuffer *bsno_p)  {    if ( bsno_p == 0 )      bsno_p = &fd();    _sock_queue.remove( bsno_p );    _sock_queue.push_back( bsno_p );    Pthread::debug("%d: queued.",(int)(*bsno_p));  }  int  PthreadIO::fillSocketBuffer(SocketBuffer& sb_p)  {    return sb_p.inputBuf().put( sb_p );  }  void  PthreadIO::got(SocketBuffer&)  {    exception::fatal( "data going to the void" );  }  int  PthreadIO::put(SocketBuffer& fd_p,char val_p)  {    fd_p.outputBuf().put( val_p );    _avail_set.add( &fd_p );    signal( s_urgent );    return 1;  }  int  PthreadIO::put(SocketBuffer& fd_p,const std::string& str_p)  {    int len = -1;    fd_p.outputBuf().put( str_p );    _avail_set.add( &fd_p );    signal( s_urgent );    return len;  }  int  PthreadIO::put(char ch_p)  {    return put( fd(),ch_p );  }  int  PthreadIO::put(const std::string& str_p)  {    Pthread::debug( "putting [%s]",str_p.c_str() );    return put( fd(),str_p );  }  int  PthreadIO::putf(const char *fmt_p, ...)  {    std::string s;    va_list ap;    va_start( ap,fmt_p );#if( GCC_VERSION < 3000 )    strstream sstr;    sstr.vform( fmt_p,ap );    s = sstr.str();#else    char *string_storage;    vasprintf( &string_storage,fmt_p,ap );    s = string_storage;    delete string_storage;#endif    va_end( ap );    return put( s );  }  int  PthreadIO::putf(SocketBuffer& fd_p, const char *fmt_p, ...)  {    std::string s;    va_list ap;    va_start( ap,fmt_p );#if( GCC_VERSION < 3000 )    strstream sstr;    sstr.vform( fmt_p,ap );    s = sstr.str();#else    char *string_storage;    vasprintf( &string_storage,fmt_p,ap );    s = string_storage;    delete string_storage;#endif    va_end( ap );    return put( fd_p,s );  }  bool  PthreadIO::failure(int)  {    extern int errno;    return true;  }  void  PthreadIO::open(Socket& fd_p)  {  }  void  PthreadIO::close(Socket& fd_p)  {    AsynchronousIO* ai = asyncObject(&fd_p);    Pthread::debug("PthreadIO::close(%d)",(int)fd_p);    if( ai != this ) {      unregisterFD(ai);      ai->close(fd_p);    }    _active_set.del( fd_p );    _except_set.del( fd_p );    _avail_set.del( fd_p );    Pthread::debug("closing again?");    fd_p.close();    Pthread::debug("%d officially closed.",(int)fd_p);  }  void  PthreadIO::registerFD(AsynchronousIO *class_p, int sel_p)  {    iterator i;    if ( class_p && class_p->fd() >  0  ) {      for( i=_others.begin();i!=_others.end();i++ )	if ( (*i)->fd() == class_p->fd() )	  break;      if ( sel_p & readSelect_e )	_active_set.add( &class_p->fd() );      if ( sel_p & excSelect_e )	_except_set.add( &class_p->fd() );      if ( _active_set.isSet(class_p->fd()) || 	   _except_set.isSet(class_p->fd()) &&	   i == _others.end() )	_others.push_back( class_p );      signal( s_urgent );    }  }  void  PthreadIO::unregisterFD(AsynchronousIO *class_p, int sel_p)  {    iterator i;    for( i=_others.begin();i!=_others.end();i++ )      if ( (*i) == class_p )	break;    if ( i != _others.end() ) {      _others.erase( i );      if ( sel_p & readSelect_e )	_active_set.del( (*i)->fd() );      if ( sel_p & excSelect_e )	_except_set.del( (*i)->fd() );      if ( !(_active_set.isSet((*i)->fd()) || _except_set.isSet((*i)->fd())) )	_others.erase( i );      signal( s_urgent );    }  }  int  PthreadIO::thread(void *)  {    signalling<PthreadIO>    ssyst;    DescriptorSet::iterator  i;    DescriptorSet            read_set, exc_set, write_set;    struct timeval           time_out;    int                      rv;    if( _mode == AsynchronousIO::dupl_half_e )      _pid = getpid();    ssyst.connect(s_urgent,this,&PthreadIO::signalUrgent);    _active_set.zero();    _avail_set.zero();    _timeout.tv_sec  = 0;    _timeout.tv_usec = 0;    if ( _fd >= 0 )      _active_set.add( &_fd );    _do_buffers  = false;    while( true ) {      read_set   = _active_set;      write_set  = _avail_set;      exc_set    = _except_set;      time_out   = _timeout;      _client    = 0;      rv         = EINTR;      if( _do_buffers == false )	rv       = select(FD_SETSIZE,read_set,write_set,exc_set,&time_out);      if( rv < 0 ) {	switch( errno ) {	default:	case ENOMEM: // Out of memory error... definately fatal.	  error(errno);	  exception::fatal( ENOMEM );	case EINTR:	  SocketBuffer *sb;	  _do_buffers = false;	  //	  // Go through the queue, and find the next socket	  // that can be delivered.	  Pthread::debug("signal intr");	  if ( _sock_queue.empty() == false ) {	    sb = _sock_queue.front();	    _sock_queue.pop_front();	    asyncObject(sb)->jump( sb,1 );	  }	  continue;	case EBADF:	  // We assume that the reason here is, that a descriptor	  // has been closed, while inside the select loop.  We	  // check all active descriptors and delete each, that is	  // a bad one.	  Pthread::debug("bad file descriptor");	  for( i=_active_set.begin();i != _active_set.end();i++ ) {	    if( (bool)*(*i) == false && errno == EBADF ) {	      Pthread::debug("%d closing",(int)*(SocketBuffer *)(*i));	      close( *(*i) );	      _sock_queue.remove( (SocketBuffer *)(*i) );	    }	  }	  continue;	case EINVAL: // sleep, until we are signaled to resume.	  Pthread::debug("slumber chamber");	  error(errno);	  suspend();	  continue;	}      } else if( rv == 0 ) {	_timeout.tv_sec  = 0;	_timeout.tv_usec = 0;	selectTimeout();	continue;      }      for( i=read_set.begin();i != read_set.end();i++ )	if ( read_set.isSet(*(*i)) ) {	  _client = (SocketBuffer *)(*i);	  if ( _is_socket && *(*i) == fd() ) {	    incomingConnection();	  } else {	    int nbytes = fillSocketBuffer( *_client );	    if ( nbytes <= 0 ) {	      if ( nbytes == 0 || failure(*_client) )		close( *_client );	      Pthread::debug("finished closing");	    } else	      asyncObject(_client)->jump( _client,nbytes );	  }	} else if ( exc_set.isSet(*(*i)) )	  asyncObject(_client)->jump();      for( i=write_set.begin();i != write_set.end();i++ )	if( write_set.isSet(*(*i)) ) {	  _client = (SocketBuffer *)(*i);	  std::string s;	  _client->outputBuf().get( *_client );	  if( _client->outputBuf().empty() )	    _avail_set.del( *(*i) );	}    }  }  AsynchronousIO *  PthreadIO::asyncObject(Socket* fd_p)  {    iterator j;        if ( fd_p == 0 )      return this;    for( j=_others.begin();j!=_others.end();j++ )      if ( (*j)->fd() == *fd_p )	break;    if ( j == _others.end() )      return this;    return (*j);  }}; // namespace

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -