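// File: connection.cpp
// (A "font size" control label from the web code viewer followed here; it is page residue, not source.)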
#include "connection.h"

#include <cstdint>
#include <cstring>
#include <string>

#include "message.h"
#include "messagerouter.h"
#include "network.h"
#include "world.h"
// Definitions of Connection's static data members.

// Network object that accept()/recv()/send() call into (accepted(), disconnect(), getWorld()).
// Starts as NULL; nothing in this file assigns it, so it must be set elsewhere before use.
Network* Connection::_network= NULL;
// Receive-buffer length, in bytes, copied into each Connection at construction time.
int Connection::_default_recv_buf_len= 1024;
// Send-buffer length, in bytes, copied into each Connection at construction time.
int Connection::_default_send_buf_len= 1024;
// Width in bytes of the message-type field on the wire (set_type_size() accepts 2 or 4).
int Connection::_type_size= 2;
// Width in bytes of the message-length field on the wire (set_len_size() accepts 2 or 4).
int Connection::_len_size= 2;
// One INSTANTIATION_TYPE invocation per built-in arithmetic type.
// NOTE(review): the macro is defined outside this file; presumably it explicitly
// instantiates the Buffer << / >> operator templates for the given type — confirm
// against the macro definition.
INSTANTIATION_TYPE( bool );
INSTANTIATION_TYPE( char );
INSTANTIATION_TYPE( unsigned char );
INSTANTIATION_TYPE( short );
INSTANTIATION_TYPE( unsigned short );
INSTANTIATION_TYPE( int );
INSTANTIATION_TYPE( unsigned int );
INSTANTIATION_TYPE( long );
INSTANTIATION_TYPE( unsigned long );
INSTANTIATION_TYPE( float );
INSTANTIATION_TYPE( double );
INSTANTIATION_TYPE( long double );
/**
 * Extracts a NUL-terminated string from the front of the buffer.
 *
 * The terminator is searched for only inside the remaining `_len` bytes, so a
 * malformed (unterminated) payload can no longer make the read run past the
 * end of the buffer.
 *
 * @param __buffer  Source buffer; its cursor (`_buf`) is advanced past the
 *                  string and its terminator, and `_len` is reduced to match.
 * @param __t       Receives the extracted characters (without the terminator).
 * @return          `__buffer`, to allow chaining.
 * @throws int      -1 when no terminator exists within the remaining bytes
 *                  (same error convention as the matching operator <<).
 */
template<>
EXPORT Buffer& operator >> <string> ( Buffer& __buffer, string& __t )
{
    // Look for the terminator strictly inside the readable window.
    const void* terminator= NULL;
    if( __buffer._len > 0 ){
        terminator= memchr( __buffer._buf, '\0', static_cast<size_t>( __buffer._len ) );
    }
    if( terminator == NULL ){
        throw -1;
    }

    const size_t length= static_cast<size_t>( static_cast<const char*>( terminator ) - __buffer._buf );
    __t.assign( __buffer._buf, length );

    // Consume the characters plus the terminating NUL.
    __buffer._buf += length + 1;
    __buffer._len -= static_cast<int>( length + 1 );
    return __buffer;
}
/**
 * Appends a string, followed by a NUL terminator, at the buffer cursor.
 *
 * @param __buffer  Destination buffer; `_buf` is advanced past the written
 *                  bytes and `_len` is reduced by the same amount.
 * @param __t       String to write.
 * @return          `__buffer`, to allow chaining.
 * @throws int      -1 when the string plus its terminator does not fit in the
 *                  remaining `_len` bytes; the buffer is left untouched.
 */
template<>
EXPORT Buffer& operator << <string> ( Buffer& __buffer, const string& __t )
{
    const int needed= static_cast<int>( __t.size()+1 );
    if( needed > __buffer._len ){
        throw -1;
    }

    // Copy the characters, then terminate them explicitly.
    memcpy( __buffer._buf, __t.c_str(), __t.size() );
    __buffer._buf[ __t.size() ]= '\0';

    __buffer._buf += __t.size() + 1;
    __buffer._len -= needed;
    return __buffer;
}
/**
 * Queues a message for transmission.
 *
 * The message is appended to the outgoing list under `_mutex_msgs_sending`;
 * the actual write happens later in send(void). Nothing is queued when the
 * connection is not connected or the pointer is empty.
 *
 * @param __msg_ptr  Message to enqueue.
 */
void Connection::send( shared_ptr<Message>& __msg_ptr )
{
    recursive_mutex::scoped_lock guard( _mutex_msgs_sending );
    if( !_connected || !__msg_ptr ){
        return;
    }
    _msgs_sending.push_back( __msg_ptr );
}
/** @return The receive-buffer length (bytes) that newly constructed connections use. */
int Connection::get_default_recv_buf_len(void)
{
    return Connection::_default_recv_buf_len;
}
/**
 * Sets the receive-buffer length used by connections constructed afterwards.
 *
 * Non-positive values are ignored (the previous default is kept), mirroring
 * how set_type_size()/set_len_size() ignore unsupported values: the length is
 * later passed straight to `new char[...]` in the constructors.
 *
 * @param __len  New default length in bytes; must be greater than zero.
 */
void Connection::set_default_recv_buf_len( int __len )
{
    if( __len > 0 ){
        _default_recv_buf_len= __len;
    }
}
/** @return The send-buffer length (bytes) that newly constructed connections use. */
int Connection::get_default_send_buf_len(void)
{
    return Connection::_default_send_buf_len;
}
/**
 * Sets the send-buffer length used by connections constructed afterwards.
 *
 * Non-positive values are ignored (the previous default is kept), mirroring
 * how set_type_size()/set_len_size() ignore unsupported values: the length is
 * later passed straight to `new char[...]` in the constructors.
 *
 * @param __len  New default length in bytes; must be greater than zero.
 */
void Connection::set_default_send_buf_len( int __len )
{
    if( __len > 0 ){
        _default_send_buf_len= __len;
    }
}
/** @return Width in bytes of the message-type field used on the wire. */
int Connection::get_type_size(void)
{
    return Connection::_type_size;
}
/**
 * Sets the width of the message-type field.
 *
 * @param __size  Field width in bytes; only 2 and 4 are accepted, any other
 *                value leaves the current setting unchanged.
 */
void Connection::set_type_size( int __size )
{
    switch( __size ){
    case 2:
    case 4:
        _type_size= __size;
        break;
    default:
        // Unsupported width: keep the current value.
        break;
    }
}
/** @return Width in bytes of the message-length field used on the wire. */
int Connection::get_len_size(void)
{
    return Connection::_len_size;
}
/**
 * Sets the width of the message-length field.
 *
 * @param __size  Field width in bytes; only 2 and 4 are accepted, any other
 *                value leaves the current setting unchanged.
 */
void Connection::set_len_size( int __size )
{
    switch( __size ){
    case 2:
    case 4:
        _len_size= __size;
        break;
    default:
        // Unsupported width: keep the current value.
        break;
    }
}
/**
 * Creates an unconnected connection.
 *
 * Allocates the receive and send buffers using the current default lengths.
 * The socket handle and address are given defined values (invalid handle,
 * all-zero address) so that no indeterminate bytes, such as the padding of
 * sockaddr_in, are later handed to bind()/connect() by listen()/connect().
 */
Connection::Connection()
: _recv_buf_len(Connection::_default_recv_buf_len), _send_buf_len(Connection::_default_send_buf_len)
{
    _connected= false;
    _fd= INVALID_SOCKET;
    _address= sockaddr_in();   // value-initialisation zeroes every field
    _recv_buf= new char[_recv_buf_len];
    _recv_data_len= 0;
    _send_buf= new char[_send_buf_len];
    _send_data= _send_buf;     // pending data starts at the beginning of the buffer
    _send_data_len= 0;
    _remote= NULL;
}
/**
 * Wraps an already-connected socket (used by accept()).
 *
 * Allocates the buffers, switches the socket to non-blocking mode and
 * registers persistent read and write events whose callbacks are
 * connection_recv and connection_send, with this object as their argument.
 *
 * @param __fd       Connected socket handle.
 * @param __address  Peer address; copied into the connection.
 */
Connection::Connection( SOCKET __fd, sockaddr_in& __address )
: _recv_buf_len(Connection::_default_recv_buf_len), _send_buf_len(Connection::_default_send_buf_len)
{
    // Socket identity.
    _fd= __fd;
    _address= __address;
    _remote= NULL;
    _connected= true;

    // Receive side.
    _recv_data_len= 0;
    _recv_buf= new char[_recv_buf_len];

    // Send side.
    _send_data_len= 0;
    _send_buf= new char[_send_buf_len];
    _send_data= _send_buf;

    // Event registration.
    setnonblocking( _fd );
    const int event_fd= static_cast<int>(_fd);
    event_set( &_read_ev, event_fd, EV_READ | EV_PERSIST, connection_recv, this );
    event_add( &_read_ev, NULL );
    event_set( &_write_ev, event_fd, EV_WRITE | EV_PERSIST, connection_send, this );
    event_add( &_write_ev, NULL );
}
/**
 * Tears the connection down: unregisters both events and shuts the socket
 * down when connected, then releases the two buffers.
 */
Connection::~Connection()
{
    if( _connected ){
        // NOTE(review): listen() only sets up _read_ev, yet _write_ev is also
        // removed here — confirm event_del is safe on an event that was never set.
        event_del( &_read_ev );
        event_del( &_write_ev );
        socket_shutdown( _fd );
    }
    delete [] _send_buf;
    delete [] _recv_buf;
}
int Connection::listen( int __port, int __backlog ){ recursive_mutex::scoped_lock lock( _mutex ); if( !_connected ){ _fd= socket( AF_INET, SOCK_STREAM, 0 ); if( _fd==INVALID_SOCKET ){ cerr << "listen connection socket create failed." << endl; return -1; }else{ _address.sin_family= AF_INET; _address.sin_addr.s_addr= htonl(INADDR_ANY); _address.sin_port= htons(__port); int bret= ::bind( _fd, reinterpret_cast<sockaddr*>(&_address), sizeof(_address) ); if( bret ){ socket_shutdown( _fd ); cerr << "can't bind to: " << __port << endl; return -1; } int lret= ::listen( _fd, 1024 ); if( lret ){ socket_shutdown( _fd ); cerr << "can't listen at: " << __port << endl; return -1; } setnonblocking( _fd ); event_set( &_read_ev, static_cast<int>(_fd), EV_READ | EV_PERSIST, connection_accept, this );
event_add( &_read_ev, NULL );
_connected= true;
return 0; } }else{ return -1; }}int Connection::connect( const char* __server_ip, int __port )
{
recursive_mutex::scoped_lock lock( _mutex );
if( !_connected ){ _fd= socket( AF_INET, SOCK_STREAM, 0 ); if( _fd==INVALID_SOCKET ){ cerr << "connect connection socket create failed." << endl; return -1; }else{ _address.sin_family= AF_INET; _address.sin_addr.s_addr= inet_addr( __server_ip ); _address.sin_port= htons( __port ); int cret= ::connect( _fd, reinterpret_cast<sockaddr*>(&_address), sizeof(_address) ); if( cret==0 ){//succeed setnonblocking( _fd ); event_set( &_read_ev, static_cast<int>(_fd), EV_READ | EV_PERSIST, connection_recv, this );
event_add( &_read_ev, NULL );
event_set( &_write_ev, static_cast<int>(_fd), EV_WRITE | EV_PERSIST, connection_send, this );
event_add( &_write_ev, NULL ); _connected= true; return 0; }else{//can't connect to remote host socket_shutdown( _fd ); cerr << "Can't connect to remote host: " << __server_ip << " : " << __port << endl; return -1; } } }else{ return -1; }
}
void Connection::accept(void)
{
recursive_mutex::scoped_lock lock( _mutex );
SOCKET cfd;//client socket fd sockaddr_in caddress;//client address int clen= sizeof(caddress); do{ cfd= ::accept( _fd, reinterpret_cast<sockaddr*>(&caddress), &clen ); if( cfd!=INVALID_SOCKET ){ Connection* conn= new Connection( cfd, caddress ); _network->accepted( conn ); } }while( cfd!=INVALID_SOCKET );
}
// Reads a big-endian unsigned integer that is `width` bytes wide (2 or 4)
// from `src` into `out`; returns false for any other width. Uses memcpy so
// that unaligned positions and platforms with a 64-bit `long` are handled.
static bool connection_read_net_uint( const char* src, int width, unsigned long& out )
{
    switch( width ){
    case 2: {
        uint16_t raw= 0;
        memcpy( &raw, src, sizeof(raw) );
        out= ntohs( raw );
        return true;
    }
    case 4: {
        uint32_t raw= 0;
        memcpy( &raw, src, sizeof(raw) );
        out= ntohl( raw );
        return true;
    }
    default:
        return false;
    }
}

/**
 * Reads available bytes from the socket and dispatches every complete message.
 *
 * Wire format of one message, as parsed here:
 *   [length : _len_size bytes, big-endian, = _type_size + payload size]
 *   [type   : _type_size bytes, big-endian]
 *   [payload]
 *
 * For each complete message a Message is created through the world's factory,
 * loaded from the payload, pre-processed and, if pre-processing accepts it,
 * queued on the message router. Incomplete trailing data is moved to the
 * front of the receive buffer for the next call.
 *
 * The connection is reported as disconnected (`_remote->disconnected()` when
 * a remote is set, then `_network->disconnect(this)`) when the peer closes
 * the connection, on a receive error, on an unsupported field width, or when
 * a length field is invalid (smaller than the type field, or too large to
 * ever fit in the receive buffer).
 */
void Connection::recv(void)
{
    recursive_mutex::scoped_lock lock( _mutex );
    int ret= ::recv( _fd, _recv_buf+_recv_data_len, _recv_buf_len-_recv_data_len, 0 );
    if( ret > 0 ){
        _recv_data_len += ret;
        char* data= _recv_buf;
        while( _recv_data_len > 0 ){
            // Wait until a whole header is available.
            if( _recv_data_len < _type_size + _len_size ){
                break;
            }

            unsigned long raw_len= 0;
            if( !connection_read_net_uint( data, _len_size, raw_len ) ){
                cerr << "Wrong _len_size: " << _len_size << "\t(should be 2 or 4)" << endl;
                if( _remote ){
                    _remote->disconnected();
                }
                _network->disconnect( this );
                return;
            }
            const long len= static_cast<long>( raw_len );//len == _type_size + value_size

            // Reject lengths that would give a negative payload size or that
            // could never be completed inside the receive buffer.
            if( len < _type_size || len > _recv_buf_len - _len_size ){
                cerr << "Invalid message length: " << len << endl;
                if( _remote ){
                    _remote->disconnected();
                }
                _network->disconnect( this );
                return;
            }

            // Message not fully received yet: keep it for the next call.
            if( len > _recv_data_len - _len_size ){
                break;
            }
            data += _len_size;
            _recv_data_len -= _len_size;
            const long value_size= len - _type_size;

            unsigned long raw_type= 0;
            if( !connection_read_net_uint( data, _type_size, raw_type ) ){
                cerr << "Wrong _type_size: " << _type_size << "\t(should be 2 or 4)" << endl;
                if( _remote ){
                    _remote->disconnected();
                }
                _network->disconnect( this );
                return;
            }
            const int type= static_cast<int>( raw_type );
            data += _type_size;
            _recv_data_len -= _type_size;

            shared_ptr<Message> msg( static_cast<Message*>( _network->getWorld().getFactory().newInstance( type ) ) );
            if( msg ){
                try{
                    if( msg->load( BUFFER( data, value_size ) ) ){
                        if( msg->preProcess( this, _remote, & _network->getWorld(), msg ) ){
                            _network->getWorld().getMsgRouter().queueMsg( msg );
                        }else{
                            msg.reset();
                        }
                    }else{
                        cerr << "message can't load" << endl;
                        msg.reset();
                    }
                }catch( const exception& e ){
                    cerr << "network message parsing error: " << e.what() << endl;
                }catch( ... ){
                    cerr << "network message parsing error: " << "Unknown exception" << endl;
                }
            }else{
                cerr << "can't create message from type: " << type << endl;
            }

            // Skip the payload whether or not the message could be handled.
            data += value_size;
            _recv_data_len -= value_size;
        }
        // Move any partial message to the start of the buffer.
        if( data!=_recv_buf ){
            memmove( _recv_buf, data, _recv_data_len );
        }
    }else if( ret == 0 ){//the peer has performed an orderly shutdown.
        if( _remote ){
            _remote->disconnected();
        }
        _network->disconnect( this );
        return;
    }else{//error occurs
        print_error( "connection recv error" );
        if( _remote ){
            _remote->disconnected();
        }
        _network->disconnect( this );
        return;
    }
}
// Writes `value` at `dst` as a big-endian unsigned integer that is `width`
// bytes wide (2 or 4); returns false for any other width. Uses memcpy so that
// exactly `width` bytes are written, also at unaligned positions and on
// platforms with a 64-bit `long`.
static bool connection_write_net_uint( char* dst, int width, unsigned long value )
{
    switch( width ){
    case 2: {
        const uint16_t raw= htons( static_cast<uint16_t>( value ) );
        memcpy( dst, &raw, sizeof(raw) );
        return true;
    }
    case 4: {
        const uint32_t raw= htonl( static_cast<uint32_t>( value ) );
        memcpy( dst, &raw, sizeof(raw) );
        return true;
    }
    default:
        return false;
    }
}

/**
 * Serialises queued messages into the send buffer and writes it to the socket.
 *
 * Each message is framed as
 *   [length : _len_size bytes, big-endian, = _type_size + payload size]
 *   [type   : _type_size bytes, big-endian]
 *   [payload written by Message::save]
 * Messages whose type is 0 are dropped. Messages are taken from the queue in
 * order until one does not fit in the space currently left in the buffer.
 * A message that can never fit (larger than the whole buffer, or larger than
 * a 2-byte length field can express) is logged and dropped so that it cannot
 * block the queue. Data that ::send() did not accept stays buffered for the
 * next call.
 *
 * The connection is reported as disconnected (`_remote->disconnected()` when
 * a remote is set, then `_network->disconnect(this)`) on an unsupported field
 * width or when ::send() fails.
 */
void Connection::send(void)
{
    recursive_mutex::scoped_lock lock( _mutex );
    recursive_mutex::scoped_lock lock_msgs_sending( _mutex_msgs_sending );
    if( !_msgs_sending.empty() ){
        // Compact: move unsent bytes to the front to maximise free space.
        if( _send_data!=_send_buf ){
            memmove( _send_buf, _send_data, _send_data_len );
            _send_data= _send_buf;
        }
        for( list< shared_ptr<Message> >::iterator itor= _msgs_sending.begin(); itor!=_msgs_sending.end(); ){
            shared_ptr<Message>& msg= (*itor);
            if( msg->getType()==0 ){
                itor= _msgs_sending.erase(itor);
                continue;
            }

            const long frame_len= static_cast<long>( _len_size + _type_size + msg->value_size() );
            assert( frame_len <= _send_buf_len );
            // Release builds: drop what can never be sent instead of stalling.
            const bool exceeds_buffer= frame_len > _send_buf_len;
            const bool exceeds_len_field= ( _len_size==2 && frame_len - _len_size > 0xFFFFL );
            if( exceeds_buffer || exceeds_len_field ){
                cerr << "Message too large to send, ignored" << endl;
                itor= _msgs_sending.erase(itor);
                continue;
            }

            // Not enough room right now: retry after the buffer has drained.
            if( frame_len > _send_buf_len-_send_data_len ){
                break;
            }

            char* frame= _send_data+_send_data_len;
            if( !connection_write_net_uint( frame, _len_size, static_cast<unsigned long>( frame_len - _len_size ) ) ){
                cerr << "Wrong _len_size: " << _len_size << "\t(should be 2 or 4)" << endl;
                if( _remote ){
                    _remote->disconnected();
                }
                _network->disconnect( this );
                return;
            }
            if( !connection_write_net_uint( frame+_len_size, _type_size, static_cast<unsigned long>( msg->getType() ) ) ){
                cerr << "Wrong _type_size: " << _type_size << "\t(should be 2 or 4)" << endl;
                if( _remote ){
                    _remote->disconnected();
                }
                _network->disconnect( this );
                return;
            }

            // Stays false when save() throws, so the frame is not committed.
            bool ret= false;
            try{
                ret= msg->save( BUFFER( frame+_len_size+_type_size, _send_buf_len-_send_data_len-_len_size-_type_size ) );
            }catch( const exception& e ){
                cerr << "network message storing error: " << e.what() << endl;
            }catch( ... ){
                cerr << "network message storing error: " << "Unknown exception" << endl;
            }
            if( ret ){//if successed
                _send_data_len += static_cast<int>( frame_len );
            }else{//error occurs
                cerr << "Message saving to buffer failed, ignored" << endl;
            }
            itor= _msgs_sending.erase(itor);
        }
    }
    if( _send_data_len >0 ){
        int ret= ::send( _fd, _send_data, _send_data_len, 0 );
        if( ret >= _send_data_len ){//all sent
            _send_data_len= 0;
            _send_data= _send_buf;
        }else if( ret >= 0 ){
            // Partial write: remember where the unsent remainder starts.
            _send_data += ret;
            _send_data_len -= ret;
        }else{//error occurs
            print_error( "connection send error" );
            if( _remote ){
                _remote->disconnected();
            }
            _network->disconnect( this );
            return;
        }
    }
}
void connection_accept( int __fd, short _event, void* __arg )
{
Connection& con= * static_cast<Connection*>( __arg );
con.accept();
}
void connection_recv( int __fd, short _event, void* __arg )
{
Connection& con= * static_cast<Connection*>( __arg );
con.recv();
}
void connection_send( int __fd, short _event, void* __arg )
{
Connection& con= * static_cast<Connection*>( __arg );
con.send();
}
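// Keyboard-shortcut help from the web code viewer (page residue, not part of the source):
//   Copy code:           Ctrl + C
//   Search code:         Ctrl + F
//   Fullscreen mode:     F11
//   Toggle theme:        Ctrl + Shift + D
//   Show shortcuts:      ?
//   Increase font size:  Ctrl + =
//   Decrease font size:  Ctrl + -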