⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 macedon_transport.cc

📁 这是一个著名的应用层组播中间件的源码
💻 CC
📖 第 1 页 / 共 2 页
字号:
//Copyright (c) 2004, Charles Killian, Adolfo Rodriguez, Dejan Kostic, Sooraj Bhat, and Amin Vahdat
//All rights reserved.
//
//Redistribution and use in source and binary forms, with or without
//modification, are permitted provided that the following conditions are met:
//
//   * Redistributions of source code must retain the above copyright
//     notice, this list of conditions and the following disclaimer.
//   * Redistributions in binary form must reproduce the above copyright
//     notice, this list of conditions and the following disclaimer in
//     the documentation and/or other materials provided with the
//     distribution.
//   * Neither the names of Duke University nor The University of
//     California, San Diego, nor the names of its contributors
//     may be used to endorse or promote products derived from
//     this software without specific prior written permission.
//
//THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
//AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
//IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE//FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL//DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR//SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER//CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,//OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE//USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.#include "stdio.h"#include "macedon.h"#include <signal.h>int num_fds=0;macedon_transport::macedon_transport(int ipaddr, int port1, 		  int queue_size1)  : here_addr_(ipaddr), receive_handler(NULL), error_handler(NULL), receiver(NULL), port(port1), queue_size(queue_size1), macedon_connections(0), lock_held(0), initialized(0){  //pthread_mutexattr_t attr;  //pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);  //pthread_mutex_init(&pool_lock, &attr);  pthread_mutex_init(&pool_lock, NULL);  int ret;  pthread_t ctid;  if (!parameters.isset("quiet")) {    printf("macedon_transport created; this = %x\n", this);  }  // create the transport thread  if ((ret = pthread_create(&ctid, NULL, worker_wrapper, this)) != 0) {    perror("pthread_create");    printf ("Error %d in creating main transport thread.\n", ret);    exit(82);  }  signal ( SIGPIPE, SIG_IGN );} macedon_transport::~macedon_transport(){}//   API FUNCTIONS GO HERE// ---------------------------------------------- // register_receiver// ---------------------------------------------- int  macedon_transport::register_receiver (macedon_transport_receiver* receiver1, tr_handler handler1, er_handler handler2){  receive_handler = handler1;  receiver = receiver1;  error_handler = handler2;}// ---------------------------------------------- // send// ---------------------------------------------- int  macedon_transport::send ( int destination, unsigned char *header_data,  int header_size, unsigned char *data, 
 int size ){  return send(destination, 0, header_data, header_size, data, size);}int  macedon_transport::send ( int destination, int index, unsigned char *header_data,  int header_size, unsigned char *data,  int size ){//        printf ("macedon_transport: send request destination  : %x %x %d %x %d queue %d port %d\n", destination, header_data, header_size, data,size,  queue_size, port);//        fflush(stdout);  if (size<0 || header_size<0) {    exit(89);  }  if (destination == here_addr_) {    printf("Exception: trying to send to self %.8x\n", destination);    dump_hex (header_data, header_size);    dump_hex (data, size);    abort();    exit(99);  }//    if (header_size + size > 4000) {//      printf("Exception: what are you doing? %d %d\n", header_size, size);//      exit(98);//    }#ifdef TRANSPORT_MASSIVE_TRACE  {    printf("macedon_transport: massive trace dump, send requested to %.8x:%d index %d hsize %d size %d\n", destination, port, index, header_size, size);    printf("macedon_tcp_transport: header bytes %.8x:%d ", destination, port);    dump_hex(header_data, header_size);    printf("macedon_tcp_transport: data bytes %.8x:%d ", destination, port);    dump_hex(data, size);  }#endif  lock ();    from_agent = 1;  macedon_connection* neighbor= find_macedon_connection (destination, index);  if (!neighbor) {    neighbor = new_macedon_connection (destination, htons(port), index);  }    if (queue_size !=0 &&      neighbor->send_queue_items > queue_size)    {#ifdef TRANSPORT_TRACE      printf ("macedon_transport: send request destination rejected : %.8x %x %d %x %d items %d queue %d port %d\n", destination, header_data, header_size, data,size, neighbor->send_queue_items, queue_size, port);      fflush(stdout);#endif      neighbor-> items_rejected++;      from_agent = 0;      unlock();      return 1;    }    size_t mysize =  header_size + size + 2*sizeof(int);  unsigned char *mydata = (unsigned char *) malloc(mysize+100);  if (mydata == 0 || 
(int)mydata%sizeof(int)) {    printf("Malloc exception 1 for size %x %d!\n", mydata, mysize);    exit(72);  }  size_t currentPos = 0;  *((int *)(mydata)) = header_size;  currentPos += sizeof(int);  *((int *)(mydata+currentPos)) = size;  currentPos += sizeof(int);  if (header_size) {    bcopy(header_data, mydata+currentPos, header_size);    currentPos += header_size;  }  if (size) {    bcopy( data, mydata+currentPos, size);    currentPos += size;  }    try {    macedon_transport_send_work* work = new macedon_transport_send_work ();    work->tot_size =  mysize;    work->running_size =  mysize;    work->stuff = mydata;    work->running_buffer = mydata;//     if (work->tot_size >= 20*sizeof(int)) {//       int * dude = (int *) (work->stuff+19*sizeof(int));//       if (*dude == 900) // trace scribe anycast msgs // 	{// 	  printf("sending anycast msg with sizes %d %d\n", header_size, size);// 	  dump_hex((void *)work->stuff, work->tot_size);// 	}//     }        neighbor->queue_back(work);  }  catch (...) 
{    printf("Exception: out of mem allocing new send work\n");    exit(97);  }  neighbor->trans->send_more(neighbor);  from_agent = 0;  unlock ();    return 0; //0 here means simply that the transport will handle the message from here out.}// ---------------------------------------------- // close// ---------------------------------------------- int  macedon_transport::close (int destination){  close(destination, 0);}int  macedon_transport::close (int destination, int index){  //  printf("macedon_transport: close destination: %x\n", destination);  lock ();  macedon_connection* neighbor= find_macedon_connection (destination, index);  if (!neighbor) {    printf ("macedon_transport: cannot find neighbor : %.8x\n", destination);    unlock ();    return -1;  }    transport_disconnect(neighbor);  unlock ();  return 0;}int  macedon_transport::abort_destination (int destination){  //  printf("macedon_transport: close destination: %x\n", destination);  lock ();  macedon_connection* neighbor= find_macedon_connection (destination, 0);  if (!neighbor) {    printf ("macedon_transport: cannot find neighbor : %.8x\n", destination);    unlock ();    return -1;  }    transport_abort(neighbor);  unlock ();  return 0;}int  macedon_transport::transport_abort (macedon_connection *neighbor){  printf("macedon_transport:::transport_abort empty method\n");  return 0;}// END OF API FUNCTIONS// ---------------------------------------------- // try_lock// ---------------------------------------------- int macedon_transport::try_lock (){  ASSERT(lock_held != pthread_self());  int old_type = 0;  int old_state = 0;#ifdef TRANSPORT_TRACE_LOCK  printf("macedon_transport: %d transport trylock %x for %x in held_by %d\n",pthread_self(),&pool_lock,this, lock_held);#endif  int ret = pthread_mutex_trylock( &pool_lock );  if (ret == 0) {    //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, & old_type);    //pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, & old_state);    ASSERT(!lock_held);    lock_held = 
pthread_self();#ifdef TRANSPORT_TRACE_LOCK    printf("macedon_transport: %d transport trylock %x for %x out (held) held_by %d\n",pthread_self(),&pool_lock,this, lock_held);#endif  } else {#ifdef TRANSPORT_TRACE_LOCK    printf("macedon_transport: %d transport trylock %x for %x out (not held) held_by %d\n",pthread_self(),&pool_lock,this,lock_held);#endif  }  return ret;}// ---------------------------------------------- // lock// ---------------------------------------------- int macedon_transport::lock (){  ASSERT(lock_held != pthread_self());  int old_type = 0;  int old_state = 0;#ifdef TRANSPORT_TRACE_LOCK  printf("macedon_transport: %d transport lock %x for %x held_by %d in\n",pthread_self(),&pool_lock,this, lock_held);#endif  pthread_mutex_lock( &pool_lock );  //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, & old_type);  //pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, & old_state);  ASSERT(!lock_held);  lock_held = pthread_self();#ifdef TRANSPORT_TRACE_LOCK  printf("macedon_transport: %d transport lock %x for %x held_by %d out\n",pthread_self(),&pool_lock,this, lock_held);#endif  return 1;}// ---------------------------------------------- // unlock// ---------------------------------------------- void macedon_transport::unlock (){#ifdef TRANSPORT_TRACE_LOCK  printf("macedon_transport: %d transport unlock %x for %x held_by %d in\n",pthread_self(),&pool_lock,this,lock_held);#endif  ASSERT(lock_held == pthread_self());

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -