📄 rtpinterface.cpp
字号:
// Read from the TCP connection: bytesRead = 0; unsigned totBytesToRead = fNextTCPReadSize; if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize; unsigned curBytesToRead = totBytesToRead; int curBytesRead; while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum, &buffer[bytesRead], curBytesToRead, fromAddress)) > 0) { bytesRead += curBytesRead; if (bytesRead >= totBytesToRead) break; curBytesToRead -= curBytesRead; } fNextTCPReadSize -= bytesRead; if (curBytesRead == 0 && curBytesToRead > 0) { packetReadWasIncomplete = True; return True; } else if (curBytesRead < 0) { bytesRead = 0; readSuccess = False; } else { readSuccess = True; } fNextTCPReadStreamSocketNum = -1; // default, for next time } if (readSuccess && fAuxReadHandlerFunc != NULL) { // Also pass the newly-read packet data to our auxilliary handler: (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead); } return readSuccess;}void RTPInterface::stopNetworkReading() { // Normal case envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum()); // Also turn off read handling on each of our TCP connections: for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) { deregisterSocket(envir(), streams->fStreamSocketNum, streams->fStreamChannelId); }}////////// Helper Functions - Implementation /////////void sendRTPOverTCP(unsigned char* packet, unsigned packetSize, int socketNum, unsigned char streamChannelId) {#ifdef DEBUG fprintf(stderr, "sendRTPOverTCP: %d bytes over channel %d (socket %d)\n", packetSize, streamChannelId, socketNum); fflush(stderr);#endif // Send RTP over TCP, using the encoding defined in // RFC 2326, section 10.12: do { char const dollar = '$'; if (send(socketNum, &dollar, 1, 0) != 1) break; if (send(socketNum, (char*)&streamChannelId, 1, 0) != 1) break; char netPacketSize[2]; netPacketSize[0] = (char) ((packetSize&0xFF00)>>8); netPacketSize[1] = (char) (packetSize&0xFF); if (send(socketNum, netPacketSize, 2, 0) != 2) break; if (send(socketNum, (char*)packet, packetSize, 0) != (int)packetSize) break;#ifdef DEBUG fprintf(stderr, "sendRTPOverTCP: completed\n"); fflush(stderr);#endif return; } while (0);#ifdef DEBUG fprintf(stderr, "sendRTPOverTCP: failed!\n"); fflush(stderr);#endif}SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum) :fEnv(env), fOurSocketNum(socketNum), fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)), fServerRequestAlternativeByteHandler(NULL), fServerRequestAlternativeByteHandlerClientData(NULL), fTCPReadingState(AWAITING_DOLLAR) {}SocketDescriptor::~SocketDescriptor() { delete fSubChannelHashTable;}void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId, RTPInterface* rtpInterface) { Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty(); fSubChannelHashTable->Add((char const*)(long)streamChannelId, rtpInterface); if (isFirstRegistration) { // Arrange to handle reads on this TCP socket: TaskScheduler::BackgroundHandlerProc* handler = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler; fEnv.taskScheduler(). turnOnBackgroundReadHandling(fOurSocketNum, handler, this); }}RTPInterface* SocketDescriptor::lookupRTPInterface(unsigned char streamChannelId) { char const* lookupArg = (char const*)(long)streamChannelId; return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg));}void SocketDescriptor::deregisterRTPInterface(unsigned char streamChannelId) { fSubChannelHashTable->Remove((char const*)(long)streamChannelId); if (fSubChannelHashTable->IsEmpty()) { // No more interfaces are using us, so it's curtains for us now fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum); removeSocketDescription(fEnv, fOurSocketNum); delete this; }}void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor, int mask) { socketDescriptor->tcpReadHandler1(mask);}void SocketDescriptor::tcpReadHandler1(int mask) { // We expect the following data over the TCP channel: // optional RTSP command or response bytes (before the first '$' character) // a '$' character // a 1-byte channel id // a 2-byte packet size (in network byte order) // the packet data. // However, because the socket is being read asynchronously, this data might arrive in pieces. u_int8_t c; struct sockaddr_in fromAddress; if (fTCPReadingState != AWAITING_PACKET_DATA) { int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress); if (result != 1) { // error reading TCP socket, or no more data available if (result < 0) { // error fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum); // stops further calls to us } return; } } switch (fTCPReadingState) { case AWAITING_DOLLAR: { if (c == '$') { fTCPReadingState = AWAITING_STREAM_CHANNEL_ID; } else { // This character is part of a RTSP request or command, which is handled separately: if (fServerRequestAlternativeByteHandler != NULL) { (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, c); } } break; } case AWAITING_STREAM_CHANNEL_ID: { // The byte that we read is the stream channel id. fStreamChannelId = c; fTCPReadingState = AWAITING_SIZE1; break; } case AWAITING_SIZE1: { // The byte that we read is the first (high) byte of the 16-bit RTP or RTCP packet 'size'. fSizeByte1 = c; fTCPReadingState = AWAITING_SIZE2; break; } case AWAITING_SIZE2: { // The byte that we read is the second (low) byte of the 16-bit RTP or RTCP packet 'size'. unsigned short size = (fSizeByte1<<8)|c; // Record the information about the packet data that will be read next: RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId); if (rtpInterface != NULL) { rtpInterface->fNextTCPReadSize = size; rtpInterface->fNextTCPReadStreamSocketNum = fOurSocketNum; rtpInterface->fNextTCPReadStreamChannelId = fStreamChannelId; } fTCPReadingState = AWAITING_PACKET_DATA; break; } case AWAITING_PACKET_DATA: { // Call the appropriate read handler to get the packet data from the TCP stream: RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId); if (rtpInterface != NULL) { if (rtpInterface->fNextTCPReadSize == 0) { // We've already read all the data for this packet. fTCPReadingState = AWAITING_DOLLAR; break; } if (rtpInterface->fReadHandlerProc != NULL) {#ifdef DEBUG fprintf(stderr, "SocketDescriptor::tcpReadHandler() reading %d bytes on channel %d\n", rtpInterface->fNextTCPReadSize, rtpInterface->fNextTCPReadStreamChannelId);#endif rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask); } } return; } }}////////// tcpStreamRecord implementation //////////tcpStreamRecord::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId, tcpStreamRecord* next) : fNext(next), fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) {}tcpStreamRecord::~tcpStreamRecord() { delete fNext;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -