giopserver.cc

来自「编译工具」· CC 代码 · 共 1,880 行 · 第 1/4 页

CC
1,880
字号
	omniORB::logger log;	log << "Cannot create a rendezvouser for this endpoint: ";	log << (*i)->address();	log << "\n";      }      delete task;      i++;      continue;    }    pd_endpoints.erase(i);    task->insert(pd_rendezvousers);  }  omnivector<giopStrand*>::iterator j;  j = pd_bidir_strands.begin();  while (j != pd_bidir_strands.end()) {    giopStrand* g = *j;    pd_bidir_strands.erase(j);    connectionState* cs = csInsert(g);    if (cs->connection->pd_has_dedicated_thread) {      giopWorker* task = new giopWorker(cs->strand,this,0);      if (!orbAsyncInvoker->insert(task)) {	// Cannot start serving this new connection.	if (omniORB::trace(1)) {	  omniORB::logger log;	  log << "Cannot create a worker for this bidirectional connection: "	      << " to "	      << cs->connection->peeraddress()	      << "\n";	}	delete task;	cs->connection->Shutdown();	csRemove(cs->connection);	pd_lock.unlock();	delete cs;	pd_lock.lock();	continue;      }      task->insert(cs->workers);      cs->connection->pd_n_workers++;      pd_n_dedicated_workers++;    }    else {      pd_lock.unlock();      cs->connection->setSelectable(1);      pd_lock.lock();    }  }  {    omnivector<giopActiveCollection*>::iterator i;    i = pd_bidir_collections.begin();    while (i != pd_bidir_collections.end()) {      giopMonitor* task = new giopMonitor(*i,this);      if (!orbAsyncInvoker->insert(task)) {	// Cannot start serving this collection.	// Do not raise an exception. Instead, just moan about it.	if (omniORB::trace(1)) {	  omniORB::logger log;	  log << "Cannot create a monitor for this bidir collection type: ";	  log << (*i)->type();	  log << "\n";	}	delete task;      }      else {	task->insert(pd_bidir_monitors);      }      pd_bidir_collections.erase(i);    }  }}////////////////////////////////////////////////////////////////////////static inline voidsendCloseConnection(giopStrand* s){  // Send close connection message.  char hdr[12];  hdr[0] = 'G'; hdr[1] = 'I'; hdr[2] = 'O'; hdr[3] = 'P';  hdr[4] = s->version.major;   hdr[5] = s->version.minor;  hdr[6] = _OMNIORB_HOST_BYTE_ORDER_;  hdr[7] = (char)GIOP::CloseConnection;  hdr[8] = hdr[9] = hdr[10] = hdr[11] = 0;  if (omniORB::trace(25)) {    omniORB::logger log;    log << "sendCloseConnection: to " << s->connection->peeraddress()	<< " 12 bytes\n";  }  if (omniORB::trace(30))    giopStream::dumpbuf((unsigned char*)hdr, 12);  s->connection->Send(hdr,12);}////////////////////////////////////////////////////////////////////////////voidgiopServer::deactivate(){  ASSERT_OMNI_TRACEDMUTEX_HELD(pd_lock, 1);  OMNIORB_ASSERT(pd_state == ACTIVE);  pd_state = INFLUX;  CORBA::Boolean waitforcompletion;  CORBA::Boolean rendezvousers_terminated = 0;  omniORB::logs(25, "giopServer deactivate...");  unsigned long s, ns;  unsigned long timeout;  if (orbParameters::scanGranularity)    timeout = orbParameters::scanGranularity;  else    timeout = 5;  if (pd_nconnections) {    omniORB::logs(25, "Close connections with threads and monitors...");    CORBA::ULong closed_count = 0;    for (CORBA::ULong i=0; i < connectionState::hashsize; i++) {      connectionState** head = &(pd_connectionState[i]);      while (*head) {	if ((*head)->connection->pd_has_dedicated_thread ||	    (*head)->strand->biDir) {	  sendCloseConnection((*head)->strand);	  (*head)->connection->Shutdown();	  ++closed_count;	}	head = &((*head)->next);      }    }    if (omniORB::trace(25)) {      omniORB::logger l;      l << "Closed " << closed_count << " connection" << plural(closed_count)	<< " out of " << pd_nconnections << ".\n";    }    if (pd_n_dedicated_workers) {      if (omniORB::trace(25)) {	omniORB::logger l;	l << "Wait for " << pd_n_dedicated_workers	  << " dedicated thread" << plural(pd_n_dedicated_workers)	  << " to finish...\n";      }      omni_thread::get_time(&s, &ns, timeout);      int go = 1;      while (go && pd_n_dedicated_workers) {	go = pd_cond.timedwait(s, ns);      }      if (omniORB::trace(25)) {	omniORB::logger l;	if (!go)	  l << "Timed out. ";	l << pd_nconnections << " connection" << plural(pd_nconnections)	  << " and " << pd_n_dedicated_workers << " dedicated worker"	  << plural(pd_n_dedicated_workers) << " remaining.\n";      }    }  }  // Stop rendezvousers  Link* p = pd_rendezvousers.next;  if (p != &pd_rendezvousers) {    omniORB::logs(25, "Terminate rendezvousers...");    for (; p != &pd_rendezvousers; p = p->next) {      ((giopRendezvouser*)p)->terminate();    }    omni_thread::get_time(&s, &ns, timeout);    int go = 1;    while (go && pd_rendezvousers.next != & pd_rendezvousers) {      go = pd_cond.timedwait(s, ns);    }    if (omniORB::trace(25)) {      omniORB::logger l;      if (go)	l << "Rendezvousers terminated.\n";      else	l << "Timed out waiting for rendezvousers to terminate.\n";    }  }  // Stop bidir monitors  if (!Link::is_empty(pd_bidir_monitors)) {    omniORB::logs(25, "Deactivate bidirectional monitors...");    Link* m = pd_bidir_monitors.next;    for (; m != &pd_bidir_monitors; m = m->next) {      ((giopMonitor*)m)->deactivate();    }    omni_thread::get_time(&s, &ns, timeout);    int go = 1;    while (go && !Link::is_empty(pd_bidir_monitors)) {      go = pd_cond.timedwait(s, ns);    }    if (omniORB::trace(25)) {      omniORB::logger l;      if (go)	l << "Monitors deactivated.\n";      else	l << "Timed out waiting for monitors to deactivate.\n";    }  }  if (pd_nconnections) {    omniORB::logs(25, "Close remaining connections...");    CORBA::ULong closed_count = 0;    for (CORBA::ULong i=0; i < connectionState::hashsize; i++) {      connectionState** head = &(pd_connectionState[i]);      while (*head) {	if (!((*head)->connection->pd_has_dedicated_thread ||	      (*head)->strand->biDir)) {	  sendCloseConnection((*head)->strand);	  (*head)->connection->Shutdown();	  // Start a worker to notice the connection closure.	  giopWorker* task = new giopWorker((*head)->strand, this, 1);	  orbAsyncInvoker->insert(task);	  task->insert((*head)->workers);	  ++(*head)->connection->pd_n_workers;	  ++pd_n_temporary_workers;	  ++closed_count;	}	head = &((*head)->next);      }    }    if (omniORB::trace(25)) {      omniORB::logger l;      l << "Closed " << closed_count << " connection" << plural(closed_count)	<< " out of " << pd_nconnections << ".\n";    }  }  if (pd_n_temporary_workers) {    if (omniORB::trace(25)) {      omniORB::logger l;      l << "Wait for " << pd_n_temporary_workers << " temporary worker"	<< plural(pd_n_temporary_workers) << "...\n";    }    int go = 1;    while (go && pd_n_temporary_workers) {      go = pd_cond.timedwait(s, ns);    }    if (omniORB::trace(25)) {      omniORB::logger l;      if (!go)	l << "Timed out. ";      l << pd_n_temporary_workers << " temporary worker"	<< plural(pd_n_temporary_workers) << " remaining.\n";    }  }  // Close bidir connections  if (pd_bidir_strands.size()) {    if (omniORB::trace(25)) {      omniORB::logger l;      l << "Close " << pd_bidir_strands.size()	<< " bidirectional connections...\n";    }    omnivector<giopStrand*>::iterator i = pd_bidir_strands.begin();    while (i != pd_bidir_strands.end()) {      giopStrand* g = *i;      pd_bidir_strands.erase(i);      g->connection->Shutdown();      g->deleteStrandAndConnection();    }  }  pd_state = IDLE;  pd_cond.broadcast();  omniORB::logs(25, "giopServer deactivated.");}////////////////////////////////////////////////////////////////////////////voidgiopServer::ensureNotInFlux(){  while (pd_state == INFLUX) {    pd_cond.wait();    // Note: we could have more than one thread blocking here so must    //       be wake up by a broadcast. Or else we have to keep a count    //       on how many is waiting.  }}////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////CORBA::ULong giopServer::connectionState::hashsize = 103;////////////////////////////////////////////////////////////////////////////giopServer::connectionState::connectionState(giopConnection* c,giopStrand* s) :  connection(c), strand(s), next(0){  omni_tracedmutex_lock sync(*omniTransportLock);  c->incrRefCount();}////////////////////////////////////////////////////////////////////////////giopServer::connectionState::~connectionState(){  OMNIORB_ASSERT(Link::is_empty(workers));  omni_tracedmutex_lock sync(*omniTransportLock);  strand->deleteStrandAndConnection();}////////////////////////////////////////////////////////////////////////////giopServer::connectionState*giopServer::csLocate(giopConnection* conn){  ASSERT_OMNI_TRACEDMUTEX_HELD(pd_lock,1);  connectionState** head = &(pd_connectionState[((omni::ptr_arith_t)conn)%						connectionState::hashsize]);  while (*head) {    if ((*head)->connection == conn)      break;    else {      head = &((*head)->next);    }  }  return *head;}////////////////////////////////////////////////////////////////////////////giopServer::connectionState*giopServer::csRemove(giopConnection* conn){  ASSERT_OMNI_TRACEDMUTEX_HELD(pd_lock,1);  connectionState* cs = 0;  connectionState** head = &(pd_connectionState[((omni::ptr_arith_t)conn)%						connectionState::hashsize]);  while (*head) {    if ((*head)->connection == conn) {      cs = *head;      *head = cs->next;      pd_nconnections--;      if (orbParameters::threadPerConnectionPolicy) {	// Check the number of connection and decide if we need to	// re-enable the one thread per connection policy that has	// been temporarily suspended.	if (!pd_thread_per_connection &&	    pd_nconnections <= orbParameters::threadPerConnectionLowerLimit) {	  pd_thread_per_connection = 1;	}      }      break;    }    else {      head = &((*head)->next);    }  }  return cs;}////////////////////////////////////////////////////////////////////////////giopServer::connectionState*giopServer::csInsert(giopConnection* conn){  ASSERT_OMNI_TRACEDMUTEX_HELD(pd_lock,1);  giopStrand* s = new giopStrand(conn,this);  s->version.major = 1; s->version.minor = 0;  {    ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,0);    omni_tracedmutex_lock sync(*omniTransportLock);    s->StrandList::insert(giopStrand::passive);    s->startIdleCounter();  }  connectionState* cs =  new connectionState(conn,s);  connectionState** head = &(pd_connectionState[((omni::ptr_arith_t)conn)%						connectionState::hashsize]);  cs->next = *head;  *head = cs;  pd_nconnections++;  if (orbParameters::threadPerConnectionPolicy) {    // Check the number of connection and decide if we need to    // turn off the one thread per connection policy temporarily.    if (pd_thread_per_connection &&	pd_nconnections >= orbParameters::threadPerConnectionUpperLimit) {      pd_thread_per_connection = 0;    }  }  conn->pd_has_dedicated_thread = pd_thread_per_connection;  return cs;}////////////////////////////////////////////////////////////////////////////giopServer::connectionState*giopServer::csInsert(giopStrand* s){  ASSERT_OMNI_TRACEDMUTEX_HELD(pd_lock,1);  OMNIORB_ASSERT(s->biDir && s->isClient());  giopConnection* conn = s->connection;  connectionState* cs =  new connectionState(conn,s);  connectionState** head = &(pd_connectionState[((omni::ptr_arith_t)conn)%						connectionState::hashsize]);  cs->next = *head;  *head = cs;  pd_nconnections++;  if (orbParameters::threadPerConnectionPolicy) {    // Check the number of connection and decide if we need to    // turn off the one thread per connection policy temporarily.    if (pd_thread_per_connection &&	pd_nconnections >= orbParameters::threadPerConnectionUpperLimit) {      pd_thread_per_connection = 0;    }  }  conn->pd_has_dedicated_thread = pd_thread_per_connection;  {    omni_tracedmutex_lock sync(*omniTransportLock);    s->connection->decrRefCount();  }  return cs;}////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////voidgiopServer::notifyRzNewConnection(giopRendezvouser* r, giopConnection* conn){  omni_tracedmutex_lock sync(pd_lock);  switch (pd_state) {  case ACTIVE:    {      connectionState* cs = csInsert(conn);

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?