📄 siptcpconnection.cxx
字号:
buf[numBytes] = '\0'; cpLog(LOG_DEBUG_STACK, "appending data: "); cpLog(LOG_DEBUG_STACK, "::::::::::" ); cpLog(LOG_DEBUG_STACK, "%s", buf); cpLog(LOG_DEBUG_STACK, "::::::::::"); myBuf += buf; cpLog(LOG_DEBUG_STACK, "has read data, will check"); cpLog(LOG_DEBUG_STACK, "%s ", myBuf.logData()); } return tcpfd;}intNTcpConnInfo::setTCPFds(fd_set* fdSet){ LockHelper lLock(mutex); map < TcpFd, Sptr < NTcpStuff > > ::iterator i; int maxfd = -1; i = myMap.begin(); while (i != myMap.end()) { int mapfd = i->first; if(mapfd <= 0) continue; FD_SET(mapfd, fdSet); if (mapfd > maxfd) { maxfd = mapfd; } ++i; } // special for connected but unaccepted connections return maxfd;}namespace Vocal{ class SipTcpConnection_impl_{ public: /// SipTcpConnection_impl_(Fifo <SipMsgContainer*> * fifo, int port = SIP_PORT); /// ~SipTcpConnection_impl_(); /// void send(SipMsgContainer *msgPtr, const Data& host, const Data& port); private: /// void readOnFdSet(fd_set* fdSet, TcpServerSocket* tcpStack); /// Sptr < Connection > createRequestTransaction(Sptr<SipCommand> command); /// Sptr < Connection > createOrGetPersistentConnection(NetworkAddress); /// void processMsgsIfReady(int fd); /// void prepareEvent(SipMsgContainer* sipMsg); /// static void* receiveThreadWrapper(void* p); /// void* receiveMain(); /// static void* sendThreadWrapper(void* p); /// void* sendMain(); /// static void* processThreadWrapper(void* p); /// void* processMain(); /// TcpServerSocket mytcpStack; /// NTcpConnInfo tcpConnInfo; /// VThread sendThread; /// VThread receiveThread; /// VThread processThread; /// Fifo <SipMsgContainer*> sendFifo; Fifo <int > processFifo; /// Fifo <SipMsgContainer*> * recFifo; /// bool shutdownNow; /// map <string, Sptr < Connection > > myDestinationMap;}; }SipTcpConnection_impl_::SipTcpConnection_impl_( Fifo <SipMsgContainer*> * fifo, int port) : mytcpStack(port), sendFifo(), recFifo(fifo), shutdownNow(false){ // put the tcpStack into the tcpConnInfo map int fd = mytcpStack.getServerConn().getConnId(); Sptr < Connection > myConn = new Connection(mytcpStack.getServerConn()); tcpConnInfo.setConnNSenderIp(fd, myConn, Vocal::theSystem.gethostAddress()); sendThread.spawn(sendThreadWrapper, this); receiveThread.spawn(receiveThreadWrapper, this); processThread.spawn(processThreadWrapper, this);}SipTcpConnection_impl_::~SipTcpConnection_impl_(){ // also need to wait on the appropriate threads after they've been killed // wake the sender with an appropriate message shutdownNow = true; sendFifo.add(0); processFifo.add(0); sendThread.join(); receiveThread.join(); processThread.join();}voidSipTcpConnection_impl_::send(SipMsgContainer* msg, const Data& host, const Data& port){ Data nhost; int nport=0; if(host.length()) { nhost = host; nport = port.convertInt(); } else { Sptr<SipCommand> command; if(command.dynamicCast(msg->msg.in)!=0) { Sptr<SipUrl> sipDest; sipDest.dynamicCast(command->getRequestLine().getUrl()); if(sipDest != 0) { nhost = sipDest->getMaddrParam(); if(!nhost.length()) nhost = sipDest->getHost(); nport = sipDest->getPort().convertInt(); cpLog( LOG_DEBUG_STACK, "got host from request line: %s:%d", nhost.logData(), nport); if ( nport == 0 ) { nport = SIP_PORT; cpLog( LOG_DEBUG_STACK, "Changed port to %d", nport); } } } } if(nhost.length()) { try { LockHelper lo(msg->myLock); msg->msg.netAddr = new NetworkAddress(nhost.convertString(), nport); } catch(NetworkAddress::UnresolvedException& e) { cpLog(LOG_ERR, "Destination (%s) is not reachable, reason:%s.", nhost.logData(), e.getDescription().c_str()); } } sendFifo.add(msg);}void*SipTcpConnection_impl_::receiveThreadWrapper(void* p){ return static_cast < SipTcpConnection_impl_* > (p)->receiveMain();}void*SipTcpConnection_impl_::processThreadWrapper(void* p){ return static_cast < SipTcpConnection_impl_* > (p)->processMain();}void*SipTcpConnection_impl_::sendThreadWrapper(void* p){ return static_cast < SipTcpConnection_impl_* > (p)->sendMain();}void*SipTcpConnection_impl_::sendMain(){ while (1) { SipMsgContainer* sipMsg = sendFifo.getNext(); if ( !shutdownNow ) { int type = sipMsg->msg.type; // If connection is persistent, try connecting atleast these times for(int i = 0; i < 3; i++) { try { prepareEvent(sipMsg); break; } catch(VNetworkException& e) { cpLog(LOG_ERR, "Network error, reason %s", e.getDescription().c_str()); if(sipMsg->msg.in->nextHopIsAProxy()) { continue; } else { //TODO send error to application ?? break; } } } /// also inform the cleanup if(type != SIP_INVITE) { SipTransactionGC::instance()-> collect(sipMsg, MESSAGE_CLEANUP_DELAY); } } else { return 0; } } return 0;}void*SipTcpConnection_impl_::processMain(){ while (1) { int fdToRead = processFifo.getNext(); if ( !shutdownNow ) { processMsgsIfReady(fdToRead); } else { return 0; } } return 0;}Sptr < Connection >SipTcpConnection_impl_::createRequestTransaction(Sptr < SipCommand > command){ //come here only if this is a command. assert (command != 0); Sptr<SipUrl> dest = command->postProcessRouteAndGetNextHop(); assert (dest != 0); Data host = dest->getMaddrParam(); if(host == "") host = dest->getHost(); int port = dest->getPort().convertInt(); cpLog( LOG_DEBUG_STACK,"Using destination: %s:%d", host.logData(), port); if ( port == 0 ) { port = SIP_PORT; cpLog( LOG_DEBUG_STACK, "Changed port to %d", port); } NetworkAddress nwaddr(host.convertString(), port); TcpClientSocket clientSocket(nwaddr, false ); try { clientSocket.connect(); } catch (VNetworkException& exception) { cpLog(LOG_ERR, exception.getDescription().c_str()); } Sptr<Connection> connection = new Connection(clientSocket.getConn()); cpLog(LOG_DEBUG_STACK, "Adding non-persistent connection %d",connection->getConnId()); char buf[56]; ostrstream strm(buf,56); strm << nwaddr.getIpName() << ":" << nwaddr.getPort() << ends; tcpConnInfo.setConnNSenderIp(connection->getConnId(),connection,strm.str()); return connection;}Sptr < Connection >SipTcpConnection_impl_::createOrGetPersistentConnection( NetworkAddress nwaddr){ //come here only if this is a command. // NetworkAddress nwaddr(host.getData(lo), port); string ipName = nwaddr.getIpName().convertString(); char buf[56]; sprintf(buf, "%s:%d", ipName.c_str(), nwaddr.getPort()); if(myDestinationMap.count(buf)) { //Found an existing connection cpLog( LOG_DEBUG_STACK, "Found existing connection for %s", buf ); if(myDestinationMap[buf]->isLive()) { return myDestinationMap[buf]; } else { //close the connection , remove it from the map that //would re-establish it cpLog(LOG_ERR, "Stale connection from (%s), re-connecting..", buf); myDestinationMap.erase(buf); } } cpLog( LOG_DEBUG_STACK, "Creating persistent connection for %s", buf ); TcpClientSocket clientSocket(nwaddr, false ); try { clientSocket.connect(); Sptr < Connection > connection = new Connection(clientSocket.getConn()); // insert the appropriate connection myDestinationMap[buf] = connection; cpLog(LOG_DEBUG_STACK, "Adding persistent connection %d" , connection->getConnId()); tcpConnInfo.setConnNSenderIp(connection->getConnId(), connection, buf); return connection; } catch (VNetworkException& exception) { cpLog(LOG_ERR, exception.getDescription().c_str()); myDestinationMap.erase(buf); return 0; }}voidSipTcpConnection_impl_::prepareEvent(SipMsgContainer* sipMsg){ Sptr < Connection > conn; LocalScopeAllocator lo; // create or use the current TCP details as appropriate //If for a duplicate request a previous response is being //sent, there in no information on transport to send to. if(sipMsg->msg.in == 0) { cpLog( LOG_DEBUG_STACK, "retransmission of the message :%s being ignored", sipMsg->msg.out.logData() ); return; } int type = sipMsg->msg.in->getType(); Sptr < SipCommand > sipCommand; sipCommand.dynamicCast(sipMsg->msg.in); Sptr < StatusMsg > statusMsg; statusMsg.dynamicCast(sipMsg->msg.in); if ((type == SIP_STATUS) && (statusMsg != 0)) { conn = tcpConnInfo.getStatusMsgConn(sipMsg->msg.in); } else if (sipCommand != 0) { if (sipMsg->msg.netAddr == 0){ cpLog(LOG_WARNING, "TCP Send is NULL"); return;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -