gioprope.cc
来自「编译工具」· CC 代码 · 共 883 行 · 第 1/2 页
CC
883 行
<< " . No connection was opened.\n"; } } } if ( s->state()== giopStrand::DYING ) { remove = 1; avail = s->safeDelete(); // If safeDelete() returns 1, this strand // can be regarded as deleted. Therefore, we // flag avail to 1 to wake up any threads waiting // on the rope to have a chance to create // another strand. } else if ( (s->biDir && !s->isClient()) || !giopStreamList::is_empty(s->clients) ) { // We do not cache the GIOP_C if this is server side bidirectional or // we already have other GIOP_Cs active or available. remove = 1; avail = 0; } else { OMNIORB_ASSERT(giop_c->state() == IOP_C::Idle); giop_c->giopStreamList::insert(s->clients); // The strand is definitely idle from this point onwards, we // reset the idle counter so that it will be retired at the right time. if ( s->isClient() && !s->biDir_has_callbacks ) s->startIdleCounter(); } if (remove) { delete giop_c; } else { giop_c->cleanup(); } // If any thread is waiting for a strand to become available, we signal // it here. if (avail && pd_nwaiting) pd_cond.signal();}////////////////////////////////////////////////////////////////////////voidgiopRope::realIncrRefCount() { ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,1); OMNIORB_ASSERT(pd_refcount >= 0); if (pd_refcount == 0 && !RopeLink::is_empty(pd_strands)) { // This Rope still has some strands in the giopStrand::active_timedout list // put there by decrRefCount() when the reference count goes to 0 // previously. We move these stands back to the giopStrand::active list so // that they can be used straight away. RopeLink* p = pd_strands.next; for (; p != &pd_strands; p = p->next) { giopStrand* g = (giopStrand*)p; if (g->state() != giopStrand::DYING) { g->StrandList::remove(); g->state(giopStrand::ACTIVE); g->StrandList::insert(giopStrand::active); } } } pd_refcount++;}////////////////////////////////////////////////////////////////////////voidgiopRope::incrRefCount() { ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,0); omni_tracedmutex_lock sync(*omniTransportLock); realIncrRefCount();}////////////////////////////////////////////////////////////////////////voidgiopRope::decrRefCount() { ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,0); omni_tracedmutex_lock sync(*omniTransportLock); pd_refcount--; OMNIORB_ASSERT(pd_refcount >=0); if (pd_refcount) return; // This Rope is not used by any object reference. // If this rope has no strand, we can remove this instance straight away. // Otherwise, we move all the strands from the giopStrand::active list to // the giopStrand::active_timedout list. Eventually when all the strands are // retired by time out, this instance will also be deleted. if (RopeLink::is_empty(pd_strands) && !pd_nwaiting) { RopeLink::remove(); delete this; } else { RopeLink* p = pd_strands.next; for (; p != &pd_strands; p = p->next) { giopStrand* g = (giopStrand*)p; if (g->state() != giopStrand::DYING) { g->state(giopStrand::TIMEDOUT); // The strand may already be on the active_timedout list. However // it is OK to remove and reinsert again. g->StrandList::remove(); g->StrandList::insert(giopStrand::active_timedout); } } }}////////////////////////////////////////////////////////////////////////const giopAddress*giopRope::notifyCommFailure(const giopAddress* addr, CORBA::Boolean heldlock) { if (heldlock) { ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,1); } else { ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,0); omniTransportLock->lock(); } const giopAddress* addr_in_use; addr_in_use = pd_addresses[pd_addresses_order[pd_address_in_use]]; if (addr == addr_in_use) { pd_address_in_use++; if (pd_address_in_use >= pd_addresses_order.size()) pd_address_in_use = 0; addr_in_use = pd_addresses[pd_addresses_order[pd_address_in_use]]; if (omniORB::trace(20)) { omniORB::logger l; l << "Switch rope to use address " << addr_in_use->address() << "\n"; } } if (!heldlock) { omniTransportLock->unlock(); } return addr_in_use;}////////////////////////////////////////////////////////////////////////intgiopRope::selectRope(const giopAddressList& addrlist, omniIOR::IORInfo* info, Rope*& r,CORBA::Boolean& loc) { omni_tracedmutex_lock sync(*omniTransportLock); // Check if we have to use a bidirectional connection. if (BiDirServerRope::selectRope(addrlist,info,r)) { loc = 0; return 1; } // Check if these are our addresses giopAddressList::const_iterator i, last; i = addrlist.begin(); last = addrlist.end(); for (; i != last; i++) { if (omniObjAdapter::matchMyEndpoints((*i)->address())) { r = 0; loc = 1; return 1; } } giopRope* gr; // Check if there already exists a rope that goes to the same addresses RopeLink* p = giopRope::ropes.next; while ( p != &giopRope::ropes ) { gr = (giopRope*)p; if (gr->match(addrlist)) { gr->realIncrRefCount(); r = (Rope*)gr; loc = 0; return 1; } else if (gr->pd_refcount == 0 && RopeLink::is_empty(gr->pd_strands) && !gr->pd_nwaiting) { // garbage rope, remove it p = p->next; gr->RopeLink::remove(); delete gr; } else { p = p->next; } } // Reach here because we cannot find an existing rope that matches, // must create a new one. omnivector<CORBA::ULong> prefer_list; CORBA::Boolean use_bidir; filterAndSortAddressList(addrlist,prefer_list,use_bidir); if (!use_bidir) { gr = new giopRope(addrlist,prefer_list); } else { if (omniObjAdapter::isInitialised()) { gr = new BiDirClientRope(addrlist,prefer_list); } else { omniORB::logs(10, "Client policies specify a bidirectional connection, " "but no object adapters have been initialised. Using a " "non-bidirectional connection."); gr = new giopRope(addrlist,prefer_list); } } gr->RopeLink::insert(giopRope::ropes); gr->realIncrRefCount(); r = (Rope*)gr; loc = 0; return 1;}////////////////////////////////////////////////////////////////////////CORBA::BooleangiopRope::match(const giopAddressList& addrlist) const{ if (addrlist.size() != pd_addresses.size()) return 0; giopAddressList::const_iterator i, last, j; i = addrlist.begin(); j = pd_addresses.begin(); last = addrlist.end(); for (; i != last; i++, j++) { if (!omni::ptrStrMatch((*i)->address(),(*j)->address())) return 0; } return 1;}////////////////////////////////////////////////////////////////////////voidgiopRope::filterAndSortAddressList(const giopAddressList& addrlist, omnivector<CORBA::ULong>& ordered_list, CORBA::Boolean& use_bidir){ // We consult the clientTransportRules to decide which address is more // preferable than others. The rules may forbid the use of some of the // addresses and these will be filtered out. We then record the order // of the remaining addresses in order_list. // If any of the non-exlusion clientTransportRules have the "bidir" // attribute, use_bidir will be set to 1, otherwise it is set to 0. use_bidir = 0; // For each address, find the rule that is applicable. Record the // rules priority in the priority list. omnivector<CORBA::ULong> prioritylist; CORBA::ULong index; CORBA::ULong total = addrlist.size(); for (index = 0; index < total; index++) { transportRules::sequenceString actions; CORBA::ULong matchedRule; if ( transportRules::clientRules().match(addrlist[index]->address(), actions,matchedRule) ) { const char* transport = strchr(addrlist[index]->type(),':'); OMNIORB_ASSERT(transport); transport++; CORBA::ULong i; CORBA::Boolean matched = 0; CORBA::Boolean usebidir = 0; CORBA::ULong priority; for (i = 0; i < actions.length(); i++ ) { size_t len = strlen(actions[i]); if (strncmp(actions[i],transport,len) == 0 ) { priority = (matchedRule << 16) + i; matched = 1; } else if ( strcmp(actions[i],"none") == 0 ) { break; } else if ( strcmp(actions[i],"bidir") == 0 ) { usebidir = 1; } } if (matched) { ordered_list.push_back(index); prioritylist.push_back(priority); if (usebidir && orbParameters::offerBiDirectionalGIOP) { use_bidir = 1; } } } } // If we have more than 1 addresses to use, sort them according to // their value in prioritylist. if ( ordered_list.size() > 1 ) { // Won't it be nice to just use stl qsort? It is tempting to just // forget about old C++ compiler and use stl. Until the time has come // use shell sort to sort the addresses in order. int n = ordered_list.size(); for (int gap=n/2; gap > 0; gap=gap/2 ) { for (int i=gap; i < n ; i++) for (int j =i-gap; j>=0; j=j-gap) { if ( prioritylist[j] > prioritylist[j+gap] ) { CORBA::ULong temp = ordered_list[j]; ordered_list[j] = ordered_list[j+gap]; ordered_list[j+gap] = temp; temp = prioritylist[j]; prioritylist[j] = prioritylist[j+gap]; prioritylist[j+gap] = temp; } } } }#if 0 { omniORB::logger log; log << "Sorted addresses are: \n"; for (int i=0; i<ordered_list.size(); i++) { log << addrlist[ordered_list[i]]->address() << "\n"; } }#endif}/////////////////////////////////////////////////////////////////////////////// Handlers for Configuration Options ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////class oneCallPerConnectionHandler : public orbOptions::Handler {public: oneCallPerConnectionHandler() : orbOptions::Handler("oneCallPerConnection", "oneCallPerConnection = 0 or 1", 1, "-ORBoneCallPerConnection < 0 | 1 >") {} void visit(const char* value,orbOptions::Source) throw (orbOptions::BadParam) { CORBA::Boolean v; if (!orbOptions::getBoolean(value,v)) { throw orbOptions::BadParam(key(),value, orbOptions::expect_boolean_msg); } orbParameters::oneCallPerConnection = v; } void dump(orbOptions::sequenceString& result) { orbOptions::addKVBoolean(key(),orbParameters::oneCallPerConnection, result); }};static oneCallPerConnectionHandler oneCallPerConnectionHandler_;/////////////////////////////////////////////////////////////////////////////class maxGIOPConnectionPerServerHandler : public orbOptions::Handler {public: maxGIOPConnectionPerServerHandler() : orbOptions::Handler("maxGIOPConnectionPerServer", "maxGIOPConnectionPerServer = n > 0", 1, "-ORBmaxGIOPConnectionPerServer < n > 0 >") {} void visit(const char* value,orbOptions::Source) throw (orbOptions::BadParam) { CORBA::ULong v; if (!orbOptions::getULong(value,v) || v < 1) { throw orbOptions::BadParam(key(),value, orbOptions::expect_greater_than_zero_ulong_msg); } orbParameters::maxGIOPConnectionPerServer = v; } void dump(orbOptions::sequenceString& result) { orbOptions::addKVULong(key(),orbParameters::maxGIOPConnectionPerServer, result); }};static maxGIOPConnectionPerServerHandler maxGIOPConnectionPerServerHandler_;/////////////////////////////////////////////////////////////////////////////// Module initialiser ///////////////////////////////////////////////////////////////////////////////class omni_giopRope_initialiser : public omniInitialiser {public: omni_giopRope_initialiser() { orbOptions::singleton().registerHandler(oneCallPerConnectionHandler_); orbOptions::singleton().registerHandler(maxGIOPConnectionPerServerHandler_); } void attach() { } void detach() { // Get rid of any remaining ropes. By now they should all be strand-less. omni_tracedmutex_lock sync(*omniTransportLock); RopeLink* p = giopRope::ropes.next; giopRope* gr; int i=0; while (p != &giopRope::ropes) { gr = (giopRope*)p; OMNIORB_ASSERT(gr->pd_refcount == 0 && RopeLink::is_empty(gr->pd_strands) && !gr->pd_nwaiting); p = p->next; gr->RopeLink::remove(); delete gr; ++i; } if (omniORB::trace(15)) { omniORB::logger l; l << i << " remaining rope" << (i == 1 ? "" : "s") << " deleted.\n"; } }};static omni_giopRope_initialiser initialiser;omniInitialiser& omni_giopRope_initialiser_ = initialiser;OMNI_NAMESPACE_END(omni)
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?