📄 macedon_transport.cc
字号:
//Copyright (c) 2004, Charles Killian, Adolfo Rodriguez, Dejan Kostic, Sooraj Bhat, and Amin Vahdat//All rights reserved.////Redistribution and use in source and binary forms, with or without//modification, are permitted provided that the following conditions are met://// * Redistributions of source code must retain the above copyright// notice, this list of conditions and the following disclaimer.// * Redistributions in binary form must reproduce the above copyright// notice, this list of conditions and the following disclaimer in// the documentation and/or other materials provided with the// distribution.// * Neither the names of Duke University nor The University of// California, San Diego, nor the names of its contributors// may be used to endorse or promote products derived from// this software without specific prior written permission.////THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"//AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE//IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE//DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE//FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL//DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR//SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER//CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,//OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE//USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.#include "stdio.h"#include "macedon.h"#include <signal.h>int num_fds=0;macedon_transport::macedon_transport(int ipaddr, int port1, int queue_size1) : here_addr_(ipaddr), receive_handler(NULL), error_handler(NULL), receiver(NULL), port(port1), queue_size(queue_size1), macedon_connections(0), lock_held(0), initialized(0){ //pthread_mutexattr_t attr; //pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); //pthread_mutex_init(&pool_lock, &attr); pthread_mutex_init(&pool_lock, NULL); int ret; pthread_t ctid; if (!parameters.isset("quiet")) { printf("macedon_transport created; this = %x\n", this); } // create the transport thread if ((ret = pthread_create(&ctid, NULL, worker_wrapper, this)) != 0) { perror("pthread_create"); printf ("Error %d in creating main transport thread.\n", ret); exit(82); } signal ( SIGPIPE, SIG_IGN );} macedon_transport::~macedon_transport(){}// API FUNCTIONS GO HERE// ---------------------------------------------- // register_receiver// ---------------------------------------------- int macedon_transport::register_receiver (macedon_transport_receiver* receiver1, tr_handler handler1, er_handler handler2){ receive_handler = handler1; receiver = receiver1; error_handler = handler2;}// ---------------------------------------------- // send// ---------------------------------------------- int macedon_transport::send ( int destination, unsigned char *header_data, int header_size, unsigned char *data, int size ){ return send(destination, 0, header_data, header_size, data, size);}int macedon_transport::send ( int destination, int index, unsigned char *header_data, int header_size, unsigned char *data, int size ){// printf ("macedon_transport: send request destination : %x %x %d %x %d queue %d port %d\n", destination, header_data, header_size, data,size, queue_size, port);// fflush(stdout); if (size<0 || header_size<0) { exit(89); } if (destination == here_addr_) { printf("Exception: trying to send to self %.8x\n", destination); dump_hex (header_data, header_size); dump_hex (data, size); abort(); exit(99); }// if (header_size + size > 4000) {// printf("Exception: what are you doing? %d %d\n", header_size, size);// exit(98);// }#ifdef TRANSPORT_MASSIVE_TRACE { printf("macedon_transport: massive trace dump, send requested to %.8x:%d index %d hsize %d size %d\n", destination, port, index, header_size, size); printf("macedon_tcp_transport: header bytes %.8x:%d ", destination, port); dump_hex(header_data, header_size); printf("macedon_tcp_transport: data bytes %.8x:%d ", destination, port); dump_hex(data, size); }#endif lock (); from_agent = 1; macedon_connection* neighbor= find_macedon_connection (destination, index); if (!neighbor) { neighbor = new_macedon_connection (destination, htons(port), index); } if (queue_size !=0 && neighbor->send_queue_items > queue_size) {#ifdef TRANSPORT_TRACE printf ("macedon_transport: send request destination rejected : %.8x %x %d %x %d items %d queue %d port %d\n", destination, header_data, header_size, data,size, neighbor->send_queue_items, queue_size, port); fflush(stdout);#endif neighbor-> items_rejected++; from_agent = 0; unlock(); return 1; } size_t mysize = header_size + size + 2*sizeof(int); unsigned char *mydata = (unsigned char *) malloc(mysize+100); if (mydata == 0 || (int)mydata%sizeof(int)) { printf("Malloc exception 1 for size %x %d!\n", mydata, mysize); exit(72); } size_t currentPos = 0; *((int *)(mydata)) = header_size; currentPos += sizeof(int); *((int *)(mydata+currentPos)) = size; currentPos += sizeof(int); if (header_size) { bcopy(header_data, mydata+currentPos, header_size); currentPos += header_size; } if (size) { bcopy( data, mydata+currentPos, size); currentPos += size; } try { macedon_transport_send_work* work = new macedon_transport_send_work (); work->tot_size = mysize; work->running_size = mysize; work->stuff = mydata; work->running_buffer = mydata;// if (work->tot_size >= 20*sizeof(int)) {// int * dude = (int *) (work->stuff+19*sizeof(int));// if (*dude == 900) // trace scribe anycast msgs // {// printf("sending anycast msg with sizes %d %d\n", header_size, size);// dump_hex((void *)work->stuff, work->tot_size);// }// } neighbor->queue_back(work); } catch (...) { printf("Exception: out of mem allocing new send work\n"); exit(97); } neighbor->trans->send_more(neighbor); from_agent = 0; unlock (); return 0; //0 here means simply that the transport will handle the message from here out.}// ---------------------------------------------- // close// ---------------------------------------------- int macedon_transport::close (int destination){ close(destination, 0);}int macedon_transport::close (int destination, int index){ // printf("macedon_transport: close destination: %x\n", destination); lock (); macedon_connection* neighbor= find_macedon_connection (destination, index); if (!neighbor) { printf ("macedon_transport: cannot find neighbor : %.8x\n", destination); unlock (); return -1; } transport_disconnect(neighbor); unlock (); return 0;}int macedon_transport::abort_destination (int destination){ // printf("macedon_transport: close destination: %x\n", destination); lock (); macedon_connection* neighbor= find_macedon_connection (destination, 0); if (!neighbor) { printf ("macedon_transport: cannot find neighbor : %.8x\n", destination); unlock (); return -1; } transport_abort(neighbor); unlock (); return 0;}int macedon_transport::transport_abort (macedon_connection *neighbor){ printf("macedon_transport:::transport_abort empty method\n"); return 0;}// END OF API FUNCTIONS// ---------------------------------------------- // try_lock// ---------------------------------------------- int macedon_transport::try_lock (){ ASSERT(lock_held != pthread_self()); int old_type = 0; int old_state = 0;#ifdef TRANSPORT_TRACE_LOCK printf("macedon_transport: %d transport trylock %x for %x in held_by %d\n",pthread_self(),&pool_lock,this, lock_held);#endif int ret = pthread_mutex_trylock( &pool_lock ); if (ret == 0) { //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, & old_type); //pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, & old_state); ASSERT(!lock_held); lock_held = pthread_self();#ifdef TRANSPORT_TRACE_LOCK printf("macedon_transport: %d transport trylock %x for %x out (held) held_by %d\n",pthread_self(),&pool_lock,this, lock_held);#endif } else {#ifdef TRANSPORT_TRACE_LOCK printf("macedon_transport: %d transport trylock %x for %x out (not held) held_by %d\n",pthread_self(),&pool_lock,this,lock_held);#endif } return ret;}// ---------------------------------------------- // lock// ---------------------------------------------- int macedon_transport::lock (){ ASSERT(lock_held != pthread_self()); int old_type = 0; int old_state = 0;#ifdef TRANSPORT_TRACE_LOCK printf("macedon_transport: %d transport lock %x for %x held_by %d in\n",pthread_self(),&pool_lock,this, lock_held);#endif pthread_mutex_lock( &pool_lock ); //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, & old_type); //pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, & old_state); ASSERT(!lock_held); lock_held = pthread_self();#ifdef TRANSPORT_TRACE_LOCK printf("macedon_transport: %d transport lock %x for %x held_by %d out\n",pthread_self(),&pool_lock,this, lock_held);#endif return 1;}// ---------------------------------------------- // unlock// ---------------------------------------------- void macedon_transport::unlock (){#ifdef TRANSPORT_TRACE_LOCK printf("macedon_transport: %d transport unlock %x for %x held_by %d in\n",pthread_self(),&pool_lock,this,lock_held);#endif ASSERT(lock_held == pthread_self());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -