📄 giopstream.cc
字号:
} } if (omniORB::trace(25)) { omniORB::logger log; log << "inputMessage: from " << pd_strand->connection->peeraddress() << " " << buf->last - buf->start << " bytes\n"; } if (omniORB::trace(30)) { dumpbuf((unsigned char*)buf+buf->start,buf->last - buf->start); } buf->size = ensureSaneHeader(__FILE__,__LINE__,buf,buf->start); if (buf->size > (buf->last - buf->start)) { // Not enough data in the buffer. Try to fetch as much as can be fit // into the buffer. CORBA::ULong total = buf->size; if (total > (buf->end - buf->start)) { total = buf->end - buf->start; } total -= (buf->last - buf->start); while (total) { int rsz = pd_strand->connection->Recv((void*) ((omni::ptr_arith_t)buf+buf->last), (size_t) total, pd_deadline_secs, pd_deadline_nanosecs); if (rsz > 0) { if (omniORB::trace(25)) { omniORB::logger log; log << "inputMessage: (body) from " << pd_strand->connection->peeraddress() << " " << rsz << " bytes\n"; } if (omniORB::trace(30)) { dumpbuf((unsigned char*)buf+buf->last,rsz); } buf->last += rsz; total -= rsz; } else { errorOnReceive(rsz,__FILE__,__LINE__,buf,0); // never reaches here. } } } else if (buf->size < (buf->last - buf->start)) { // Too much data in the buffer. Locate the beginning of the next // message header(s) and uses a separate Buffer for each message. CORBA::ULong first = buf->start + buf->size; giopStream_Buffer** tail = &pd_strand->head; while (*tail) tail = &(*tail)->next; int splitcount = 0; do { CORBA::ULong sz = buf->last - first; giopStream_Buffer* newbuf; if (sz >= 12) { CORBA::ULong msz = ensureSaneHeader(__FILE__,__LINE__,buf,first); if (msz <= sz) { sz = msz; } else { if (msz > giopStream::bufferSize) msz = giopStream::bufferSize; if (msz < sz) { // Don't think this could happen because the code at present // never allocate buffer bigger than giopStream::bufferSize // and so we will never have received data bigger than can be // accomodated in a buffer of size giopStream::bufferSize. // However, this may well change in the future and we // have to prepare for this. In this case, we allocate a // buffer that is multiple of 8 bytes in size and can // store all the data received so far. msz = omni::align_to((omni::ptr_arith_t)sz,omni::ALIGN_8); } } newbuf = giopStream_Buffer::newBuffer(msz); } else { // incomplete header, we don't know the size of the message. // allocate a normal buffer to accomodate the rest of the message newbuf = giopStream_Buffer::newBuffer(); } memcpy((void*)((omni::ptr_arith_t)newbuf+newbuf->start), (void*)((omni::ptr_arith_t)buf + first), sz); newbuf->last += sz; if (omniORB::trace(40)) { omniORB::logger log; log << "Split to new buffer\n"; } splitcount++; *tail = newbuf; tail = &(newbuf->next); first += sz; } while (first != buf->last); buf->last = buf->start + buf->size; if (omniORB::trace(30)) { omniORB::logger log; log << "Split input data into " << splitcount << " messages\n"; } } return buf;}////////////////////////////////////////////////////////////////////////giopStream_Buffer*giopStream::inputChunk(CORBA::ULong maxsize) { OMNIORB_ASSERT(pd_rdlocked); giopStream_Buffer* buf; if (pd_strand->head) { // We are expecting a chunk of a message and yet what comes // in is another message. This indicates something seriously // wrong with the data sent by the other end. pd_strand->state(giopStrand::DYING); CORBA::ULong minor; CORBA::Boolean retry; notifyCommFailure(0,minor,retry); CommFailure::_raise(minor,(CORBA::CompletionStatus)completion(),retry, __FILE__,__LINE__); // never reaches here. } else if (pd_strand->spare) { buf = pd_strand->spare; pd_strand->spare = buf->next; buf->next = 0; buf->last = buf->start; } else { buf = giopStream_Buffer::newBuffer(); } if (maxsize > (buf->end - buf->start)) { maxsize = buf->end - buf->start; } while (maxsize) { int rsz = pd_strand->connection->Recv((void*) ((omni::ptr_arith_t)buf+buf->last), (size_t) maxsize, pd_deadline_secs, pd_deadline_nanosecs); if (rsz > 0) { buf->last += rsz; maxsize -= rsz; } else { errorOnReceive(rsz,__FILE__,__LINE__,buf,0); // never reaches here. } } if (omniORB::trace(25)) { omniORB::logger log; log << "inputChunk: from " << pd_strand->connection->peeraddress() << " " << buf->last - buf->start << " bytes\n"; } if (omniORB::trace(30)) { dumpbuf((unsigned char*)buf+buf->start,buf->last - buf->start); } return buf;}////////////////////////////////////////////////////////////////////////voidgiopStream::inputCopyChunk(void* dest, CORBA::ULong size) { OMNIORB_ASSERT(pd_rdlocked); if (pd_strand->head) { // We are expecting a chunk of a message and yet what comes // in is another message. This indicates something seriously // wrong with the data sent by the other end. pd_strand->state(giopStrand::DYING); CORBA::ULong minor; CORBA::Boolean retry; notifyCommFailure(0,minor,retry); CommFailure::_raise(minor,(CORBA::CompletionStatus)completion(),retry, __FILE__,__LINE__); // never reaches here. } char* p = (char*) dest; if (omniORB::trace(25)) { omniORB::logger log; log << "inputCopyChunk: from " << pd_strand->connection->peeraddress() << " " << size << " bytes\n"; } while (size) { int rsz = pd_strand->connection->Recv((void*)p,(size_t) size, pd_deadline_secs, pd_deadline_nanosecs); if (rsz > 0) { if (omniORB::trace(30)) { dumpbuf((unsigned char*)p,rsz); } p += rsz; size -= rsz; } else { errorOnReceive(rsz,__FILE__,__LINE__,0,0); // never reaches here. } }}////////////////////////////////////////////////////////////////////////voidgiopStream::sendChunk(giopStream_Buffer* buf) { if (!pd_strand->connection) { OMNIORB_ASSERT(pd_strand->address); if (pd_strand->state() != giopStrand::DYING) { if (omniORB::trace(25)) { omniORB::logger log; log << "Client attempt to connect to " << pd_strand->address->address() << "\n"; } giopActiveConnection* c = pd_strand->address->Connect(pd_deadline_secs, pd_deadline_nanosecs); if (c) pd_strand->connection = &(c->getConnection()); } if (!pd_strand->connection) { errorOnSend(TRANSIENT_ConnectFailed,__FILE__,__LINE__,0); } if (omniORB::trace(20)) { omniORB::logger log; log << "Client opened connection to " << pd_strand->connection->peeraddress() << "\n"; } } CORBA::ULong first = buf->start; size_t total; if (omniORB::trace(25)) { omniORB::logger log; log << "sendChunk: to " << pd_strand->connection->peeraddress() << " " << buf->last - buf->start << " bytes\n"; } if (omniORB::trace(30)) { dumpbuf((unsigned char*)buf+buf->start,buf->last-buf->start); } while ((total = buf->last - first)) { int ssz = pd_strand->connection->Send((void*) ((omni::ptr_arith_t)buf+first), total, pd_deadline_secs, pd_deadline_nanosecs); if (ssz > 0) { first += ssz; } else { errorOnSend(ssz,__FILE__,__LINE__,0); // never reaches here. } }}////////////////////////////////////////////////////////////////////////voidgiopStream::sendCopyChunk(void* buf,CORBA::ULong size) { if (!pd_strand->connection) { OMNIORB_ASSERT(pd_strand->address); if (pd_strand->state() != giopStrand::DYING) { giopActiveConnection* c = pd_strand->address->Connect(pd_deadline_secs, pd_deadline_nanosecs); if (c) pd_strand->connection = &(c->getConnection()); } if (!pd_strand->connection) { errorOnSend(TRANSIENT_ConnectFailed,__FILE__,__LINE__,0); } if (omniORB::trace(20)) { omniORB::logger log; log << "Client opened connection to " << pd_strand->connection->peeraddress() << "\n"; } } if (omniORB::trace(25)) { omniORB::logger log; log << "sendCopyChunk: to " << pd_strand->connection->peeraddress() << " " << size << " bytes\n"; } if (omniORB::trace(30)) { dumpbuf((unsigned char*)buf,size); } while (size) { int ssz = pd_strand->connection->Send(buf, size, pd_deadline_secs, pd_deadline_nanosecs); if (ssz > 0) { size -= ssz; buf = (void*)((omni::ptr_arith_t)buf + ssz); } else { errorOnSend(ssz,__FILE__,__LINE__,0); // never reaches here. } }}////////////////////////////////////////////////////////////////////////voidgiopStream::errorOnSend(int rc, const char* filename, CORBA::ULong lineno, CORBA::Boolean heldlock) { CORBA::ULong minor; CORBA::Boolean retry; notifyCommFailure(heldlock,minor,retry); if (rc == 0) { // Timeout. // We do not use the return code from the function. pd_strand->state(giopStrand::DYING); retry = 0; minor = TRANSIENT_CallTimedout; } else if (rc == TRANSIENT_ConnectFailed) { pd_strand->state(giopStrand::DYING); minor = rc; } else { pd_strand->state(giopStrand::DYING); } CommFailure::_raise(minor,(CORBA::CompletionStatus)completion(),retry, filename,lineno); // never reaches here.}/////////////////////////////////////////////////////////////////////////static inline char printable_char(char c) { return (c < 32 || c > 126) ? '.' : c;}/////////////////////////////////////////////////////////////////////////void giopStream::dumpbuf(unsigned char* buf, size_t sz){ static omni_tracedmutex lock; omni_tracedmutex_lock sync(lock); unsigned i; char row[80]; omniORB::logger l; const size_t dumplimit = 128; if (!omniORB::trace(40) && sz > dumplimit) { l << dumplimit << " bytes out of " << sz << "\n"; sz = dumplimit; } else { l << "\n"; } // Do complete rows of 16 octets. while( sz >= 16u ) { sprintf(row, "%02x%02x %02x%02x %02x%02x %02x%02x " "%02x%02x %02x%02x %02x%02x %02x%02x ", (int) buf[0], (int) buf[1], (int) buf[2], (int) buf[3], (int) buf[4], (int) buf[5], (int) buf[6], (int) buf[7], (int) buf[8], (int) buf[9], (int) buf[10], (int) buf[11], (int) buf[12], (int) buf[13], (int) buf[14], (int) buf[15]); l << row; char* p = row; for( i = 0u; i < 16u; i++ ) *p++ = printable_char(*buf++); *p++ = '\0'; l << row << "\n"; sz -= 16u; } if( sz ) { unsigned row_pos = 0; // The final part-row. for( i = 0u; i < sz; i++ ) { sprintf(row+row_pos, (i & 1u) ? "%02x ":"%02x", (int) buf[i]); row_pos += (i%2 == 0)? 2 : 3; } for( ; i < 16u; i++ ) { sprintf(row+row_pos, (i & 1u) ? " ":" "); row_pos += (i%2 == 0)? 2 : 3; } for( i = 0u; i < sz; i++ ) { sprintf(row+row_pos, "%c", printable_char(buf[i])); row_pos++; } l << row << "\n"; }}OMNI_NAMESPACE_END(omni)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -