giopserver.cc
来自「编译工具」· CC 代码 · 共 1,880 行 · 第 1/4 页
CC
1,880 行
omniORB::logger log; log << "Cannot create a rendezvouser for this endpoint: "; log << (*i)->address(); log << "\n"; } delete task; i++; continue; } pd_endpoints.erase(i); task->insert(pd_rendezvousers); } omnivector<giopStrand*>::iterator j; j = pd_bidir_strands.begin(); while (j != pd_bidir_strands.end()) { giopStrand* g = *j; pd_bidir_strands.erase(j); connectionState* cs = csInsert(g); if (cs->connection->pd_has_dedicated_thread) { giopWorker* task = new giopWorker(cs->strand,this,0); if (!orbAsyncInvoker->insert(task)) { // Cannot start serving this new connection. if (omniORB::trace(1)) { omniORB::logger log; log << "Cannot create a worker for this bidirectional connection: " << " to " << cs->connection->peeraddress() << "\n"; } delete task; cs->connection->Shutdown(); csRemove(cs->connection); pd_lock.unlock(); delete cs; pd_lock.lock(); continue; } task->insert(cs->workers); cs->connection->pd_n_workers++; pd_n_dedicated_workers++; } else { pd_lock.unlock(); cs->connection->setSelectable(1); pd_lock.lock(); } } { omnivector<giopActiveCollection*>::iterator i; i = pd_bidir_collections.begin(); while (i != pd_bidir_collections.end()) { giopMonitor* task = new giopMonitor(*i,this); if (!orbAsyncInvoker->insert(task)) { // Cannot start serving this collection. // Do not raise an exception. Instead, just moan about it. if (omniORB::trace(1)) { omniORB::logger log; log << "Cannot create a monitor for this bidir collection type: "; log << (*i)->type(); log << "\n"; } delete task; } else { task->insert(pd_bidir_monitors); } pd_bidir_collections.erase(i); } }}////////////////////////////////////////////////////////////////////////static inline voidsendCloseConnection(giopStrand* s){ // Send close connection message. char hdr[12]; hdr[0] = 'G'; hdr[1] = 'I'; hdr[2] = 'O'; hdr[3] = 'P'; hdr[4] = s->version.major; hdr[5] = s->version.minor; hdr[6] = _OMNIORB_HOST_BYTE_ORDER_; hdr[7] = (char)GIOP::CloseConnection; hdr[8] = hdr[9] = hdr[10] = hdr[11] = 0; if (omniORB::trace(25)) { omniORB::logger log; log << "sendCloseConnection: to " << s->connection->peeraddress() << " 12 bytes\n"; } if (omniORB::trace(30)) giopStream::dumpbuf((unsigned char*)hdr, 12); s->connection->Send(hdr,12);}////////////////////////////////////////////////////////////////////////////voidgiopServer::deactivate(){ ASSERT_OMNI_TRACEDMUTEX_HELD(pd_lock, 1); OMNIORB_ASSERT(pd_state == ACTIVE); pd_state = INFLUX; CORBA::Boolean waitforcompletion; CORBA::Boolean rendezvousers_terminated = 0; omniORB::logs(25, "giopServer deactivate..."); unsigned long s, ns; unsigned long timeout; if (orbParameters::scanGranularity) timeout = orbParameters::scanGranularity; else timeout = 5; if (pd_nconnections) { omniORB::logs(25, "Close connections with threads and monitors..."); CORBA::ULong closed_count = 0; for (CORBA::ULong i=0; i < connectionState::hashsize; i++) { connectionState** head = &(pd_connectionState[i]); while (*head) { if ((*head)->connection->pd_has_dedicated_thread || (*head)->strand->biDir) { sendCloseConnection((*head)->strand); (*head)->connection->Shutdown(); ++closed_count; } head = &((*head)->next); } } if (omniORB::trace(25)) { omniORB::logger l; l << "Closed " << closed_count << " connection" << plural(closed_count) << " out of " << pd_nconnections << ".\n"; } if (pd_n_dedicated_workers) { if (omniORB::trace(25)) { omniORB::logger l; l << "Wait for " << pd_n_dedicated_workers << " dedicated thread" << plural(pd_n_dedicated_workers) << " to finish...\n"; } omni_thread::get_time(&s, &ns, timeout); int go = 1; while (go && pd_n_dedicated_workers) { go = pd_cond.timedwait(s, ns); } if (omniORB::trace(25)) { omniORB::logger l; if (!go) l << "Timed out. "; l << pd_nconnections << " connection" << plural(pd_nconnections) << " and " << pd_n_dedicated_workers << " dedicated worker" << plural(pd_n_dedicated_workers) << " remaining.\n"; } } } // Stop rendezvousers Link* p = pd_rendezvousers.next; if (p != &pd_rendezvousers) { omniORB::logs(25, "Terminate rendezvousers..."); for (; p != &pd_rendezvousers; p = p->next) { ((giopRendezvouser*)p)->terminate(); } omni_thread::get_time(&s, &ns, timeout); int go = 1; while (go && pd_rendezvousers.next != & pd_rendezvousers) { go = pd_cond.timedwait(s, ns); } if (omniORB::trace(25)) { omniORB::logger l; if (go) l << "Rendezvousers terminated.\n"; else l << "Timed out waiting for rendezvousers to terminate.\n"; } } // Stop bidir monitors if (!Link::is_empty(pd_bidir_monitors)) { omniORB::logs(25, "Deactivate bidirectional monitors..."); Link* m = pd_bidir_monitors.next; for (; m != &pd_bidir_monitors; m = m->next) { ((giopMonitor*)m)->deactivate(); } omni_thread::get_time(&s, &ns, timeout); int go = 1; while (go && !Link::is_empty(pd_bidir_monitors)) { go = pd_cond.timedwait(s, ns); } if (omniORB::trace(25)) { omniORB::logger l; if (go) l << "Monitors deactivated.\n"; else l << "Timed out waiting for monitors to deactivate.\n"; } } if (pd_nconnections) { omniORB::logs(25, "Close remaining connections..."); CORBA::ULong closed_count = 0; for (CORBA::ULong i=0; i < connectionState::hashsize; i++) { connectionState** head = &(pd_connectionState[i]); while (*head) { if (!((*head)->connection->pd_has_dedicated_thread || (*head)->strand->biDir)) { sendCloseConnection((*head)->strand); (*head)->connection->Shutdown(); // Start a worker to notice the connection closure. giopWorker* task = new giopWorker((*head)->strand, this, 1); orbAsyncInvoker->insert(task); task->insert((*head)->workers); ++(*head)->connection->pd_n_workers; ++pd_n_temporary_workers; ++closed_count; } head = &((*head)->next); } } if (omniORB::trace(25)) { omniORB::logger l; l << "Closed " << closed_count << " connection" << plural(closed_count) << " out of " << pd_nconnections << ".\n"; } } if (pd_n_temporary_workers) { if (omniORB::trace(25)) { omniORB::logger l; l << "Wait for " << pd_n_temporary_workers << " temporary worker" << plural(pd_n_temporary_workers) << "...\n"; } int go = 1; while (go && pd_n_temporary_workers) { go = pd_cond.timedwait(s, ns); } if (omniORB::trace(25)) { omniORB::logger l; if (!go) l << "Timed out. "; l << pd_n_temporary_workers << " temporary worker" << plural(pd_n_temporary_workers) << " remaining.\n"; } } // Close bidir connections if (pd_bidir_strands.size()) { if (omniORB::trace(25)) { omniORB::logger l; l << "Close " << pd_bidir_strands.size() << " bidirectional connections...\n"; } omnivector<giopStrand*>::iterator i = pd_bidir_strands.begin(); while (i != pd_bidir_strands.end()) { giopStrand* g = *i; pd_bidir_strands.erase(i); g->connection->Shutdown(); g->deleteStrandAndConnection(); } } pd_state = IDLE; pd_cond.broadcast(); omniORB::logs(25, "giopServer deactivated.");}////////////////////////////////////////////////////////////////////////////voidgiopServer::ensureNotInFlux(){ while (pd_state == INFLUX) { pd_cond.wait(); // Note: we could have more than one thread blocking here so must // be wake up by a broadcast. Or else we have to keep a count // on how many is waiting. }}////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////CORBA::ULong giopServer::connectionState::hashsize = 103;////////////////////////////////////////////////////////////////////////////giopServer::connectionState::connectionState(giopConnection* c,giopStrand* s) : connection(c), strand(s), next(0){ omni_tracedmutex_lock sync(*omniTransportLock); c->incrRefCount();}////////////////////////////////////////////////////////////////////////////giopServer::connectionState::~connectionState(){ OMNIORB_ASSERT(Link::is_empty(workers)); omni_tracedmutex_lock sync(*omniTransportLock); strand->deleteStrandAndConnection();}////////////////////////////////////////////////////////////////////////////giopServer::connectionState*giopServer::csLocate(giopConnection* conn){ ASSERT_OMNI_TRACEDMUTEX_HELD(pd_lock,1); connectionState** head = &(pd_connectionState[((omni::ptr_arith_t)conn)% connectionState::hashsize]); while (*head) { if ((*head)->connection == conn) break; else { head = &((*head)->next); } } return *head;}////////////////////////////////////////////////////////////////////////////giopServer::connectionState*giopServer::csRemove(giopConnection* conn){ ASSERT_OMNI_TRACEDMUTEX_HELD(pd_lock,1); connectionState* cs = 0; connectionState** head = &(pd_connectionState[((omni::ptr_arith_t)conn)% connectionState::hashsize]); while (*head) { if ((*head)->connection == conn) { cs = *head; *head = cs->next; pd_nconnections--; if (orbParameters::threadPerConnectionPolicy) { // Check the number of connection and decide if we need to // re-enable the one thread per connection policy that has // been temporarily suspended. if (!pd_thread_per_connection && pd_nconnections <= orbParameters::threadPerConnectionLowerLimit) { pd_thread_per_connection = 1; } } break; } else { head = &((*head)->next); } } return cs;}////////////////////////////////////////////////////////////////////////////giopServer::connectionState*giopServer::csInsert(giopConnection* conn){ ASSERT_OMNI_TRACEDMUTEX_HELD(pd_lock,1); giopStrand* s = new giopStrand(conn,this); s->version.major = 1; s->version.minor = 0; { ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,0); omni_tracedmutex_lock sync(*omniTransportLock); s->StrandList::insert(giopStrand::passive); s->startIdleCounter(); } connectionState* cs = new connectionState(conn,s); connectionState** head = &(pd_connectionState[((omni::ptr_arith_t)conn)% connectionState::hashsize]); cs->next = *head; *head = cs; pd_nconnections++; if (orbParameters::threadPerConnectionPolicy) { // Check the number of connection and decide if we need to // turn off the one thread per connection policy temporarily. if (pd_thread_per_connection && pd_nconnections >= orbParameters::threadPerConnectionUpperLimit) { pd_thread_per_connection = 0; } } conn->pd_has_dedicated_thread = pd_thread_per_connection; return cs;}////////////////////////////////////////////////////////////////////////////giopServer::connectionState*giopServer::csInsert(giopStrand* s){ ASSERT_OMNI_TRACEDMUTEX_HELD(pd_lock,1); OMNIORB_ASSERT(s->biDir && s->isClient()); giopConnection* conn = s->connection; connectionState* cs = new connectionState(conn,s); connectionState** head = &(pd_connectionState[((omni::ptr_arith_t)conn)% connectionState::hashsize]); cs->next = *head; *head = cs; pd_nconnections++; if (orbParameters::threadPerConnectionPolicy) { // Check the number of connection and decide if we need to // turn off the one thread per connection policy temporarily. if (pd_thread_per_connection && pd_nconnections >= orbParameters::threadPerConnectionUpperLimit) { pd_thread_per_connection = 0; } } conn->pd_has_dedicated_thread = pd_thread_per_connection; { omni_tracedmutex_lock sync(*omniTransportLock); s->connection->decrRefCount(); } return cs;}////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////voidgiopServer::notifyRzNewConnection(giopRendezvouser* r, giopConnection* conn){ omni_tracedmutex_lock sync(pd_lock); switch (pd_state) { case ACTIVE: { connectionState* cs = csInsert(conn);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?