📄 clientcontroller.java
字号:
// notify application application.onLeave( client ); } } /** * Dispatches closure vent to status listeners */ public void dispatchClosure ( ) { // System.out.println( System.currentTimeMillis() + " " + id + " ClientController.dispatchClosure " ); // create status object WrapperMap info = new WrapperMap( Library.STATUSKEYS , Library.CLOSUREARR ); // if listener exist dispatch if ( statusListener != null ) statusListener.onEvent( new StatusEvent( info , client ) ); } /** * Synchronized packet pushing, multiple client threads can trigger this function * @param packetX RtmpPacket */ public void addOutgoingPacket ( RtmpPacket packetX ) { // System.out.println( System.currentTimeMillis( ) + " " + id + " ClientController.addOutgoingPacket " + packetX ); synchronized ( outgoingList ) { outgoingList.add( packetX ); } } /** * Execution step * syncing is needed from previous function **/ public void step ( ) { // System.out.println( System.currentTimeMillis( ) + " " + id + " ClientController.step " + outgoingList.size() ); synchronized ( outgoingList ) { // packets exchange socketController.takePackets( outgoingList ); socketController.giveDataPackets( incomingList ); try { // decode packets for ( RtmpPacket packet : incomingList ) receivePacket( packet ); } catch ( Exception exception ) { // notify user on standard output about the error System.out.println( Library.CODEEX ); exception.printStackTrace( ); } } // update rtmp message every one second if ( ++counter > stepRound ) { update( ); counter = 0; } } /** * updates RTMP flow related events **/ public void update ( ) { // System.out.println( System.currentTimeMillis( ) + " " + id + " ClientController.update" ); currentTime = System.currentTimeMillis( ); if ( accepted ) { // sending out ping if ( currentTime - lastPing > Library.PINGTIME ) sendPing( 6 , ( int ) lastPing & 0xffffffff , -1 , -1 ); // updating bandwidth information if ( currentTime - lastBand > Library.BANDTIME ) updateBand( currentTime - lastBand ); } } /** * Updates bandwidth data * @param delayX time delay since last band update **/ public void updateBand ( long delayX ) { // System.out.println( System.currentTimeMillis( ) + " " + id + " ClientController.updateBand " + socketController.bytesIn ); lastBand = System.currentTimeMillis( ); bytesIn = socketController.bytesIn; bytesOut = socketController.bytesOut; bandIn = ( bytesIn - lastBytesIn ) / ( delayX / 1000 ); bandOut = ( bytesOut - lastBytesOut ) / ( delayX / 1000 ); lastBytesIn = bytesIn; lastBytesOut = bytesOut; // if we exceeded incoming byte step, send new read if ( bytesIn - lastRead > readStepClient ) sendRead( ); } /** * Sorts incoming data packets * @param packetX RtmpPacket packet to sort **/ public void receivePacket ( RtmpPacket packetX ) { // System.out.println( System.currentTimeMillis() + " " + id + " ClientController.receivePacket " + packetX ); switch ( packetX.bodyType ) { // 0x03 - Read amount // 0x04 - Ping received // 0x05 - Server or donwloading bandwidth // 0x06 - Clint or uploading bandwidth // 0x12 - Stream metadata // 0x14 - Invoke case 0x03 : receiveRead( packetX ); break; case 0x04 : receivePing( packetX ); break; case 0x05 : receiveReadStepClient( packetX ); break; case 0x06 : receiveReadStepServer( packetX ); break; case 0x12 : receiveInvoke( packetX ); break; case 0x14 : receiveInvoke( packetX ); break; default : receiveUnknown( packetX ); break; } } /** * Sends read bytes amount to client **/ public void sendRead ( ) { // System.out.println( System.currentTimeMillis( ) + " " + id + " ClientController.sendRead " + bytesIn ); RtmpPacket packet = new RtmpPacket( ); // sending received bytes amount as a 4 byte plain integer packet.rtmpChannel = 0x02; packet.bodyType = 0x03; packet.body = Encoder.concatenate( Encoder.intToBytes( bytesIn , 8 ) ); // System.out.println( System.currentTimeMillis( ) + " " + id + " ClientController.sendRead " + Encoder.getHexa( packet.body ) ); lastRead = bytesIn; addOutgoingPacket( packet ); } /** * Receives read notification from client, have to compare with our write amount, if there is a big * difference, we have to decrease data output * @param packetX RtmpPacket incoming packet **/ public void receiveRead ( RtmpPacket packetX ) { // System.out.println( System.currentTimeMillis( ) + " " + id + " ClientController.receiveRead " + packetX ); long clientRead = Encoder.bytesToInt( packetX.body ); // System.out.println( System.currentTimeMillis() + " clientRead: " + clientRead + " difference: " + difference ); // if difference is big, drop frames if ( clientRead < ( bytesOut - 10000 ) ) streamController.addDropping( ); // if difference is small, remove dropping if ( clientRead >= ( bytesOut - 10000 ) ) streamController.removeDropping( ); } /** * Sends a ping message * @param typeX type of ping * @param p1X...p3X ping parts **/ public void sendPing ( int typeX , int p1X , int p2X , int p3X ) { // System.out.println( System.currentTimeMillis( ) + " " + id + " ClientController.sendPing " + typeX + " " + p1X + " " +p2X + " " + p3X ); // checking ping timeout if ( ping > Library.PINGTIMEOUT ) detach( ); else { addOutgoingPacket( RtmpFactory.ping( typeX , p1X , p2X , p3X ) ); lastPing = System.currentTimeMillis( ); } } /** * Receives ping from client * receivePing : rtmp flow control ping uzenetek. elso ket byte a tipus. 00 - stream reset , 01 - stream buffer clear, * 03 - stream buffer meret beallitas , 06 - ping , 07 -pong, 08 - first ping talan * @param packetX **/ public void receivePing ( RtmpPacket packetX ) { // System.out.println( System.currentTimeMillis( ) + " " + id + " RTMPFlowController.receivePing " + packetX ); byte [ ] body = packetX.body; // extracting parts from body int type = Encoder.bytesToInt( new byte [ ] { body[ 0 ] , body[ 1 ] } ); int part1 = Encoder.bytesToInt( new byte [ ] { body[ 2 ] , body[ 3 ] , body[ 4 ] , body[ 5 ] } ); int part2 = body.length > 6 ? Encoder.bytesToInt( new byte [ ] { body[ 6 ] , body[ 7 ] , body[ 8 ] , body[ 9 ] } ) : 0; int part3 = body.length > 10 ? Encoder.bytesToInt( new byte [ ] { body[ 10 ] , body[ 11 ] , body[ 12 ] , body[13]} ) : 0; switch ( type ) { // stream buffer length, sending it to router, and sending buffer clear ping message // part1 is the flv channel this case case 3 : streamController.setBufferLength( part1 , part2 ); break; // normal ping request, sending pong case 6 : sendPing( 7 , part1 , part2 , part3 ); break; // normal pong, checking roundtrip delay case 7 : ping = System.currentTimeMillis() - lastPing; break; // first ping ? case 8 : break; // unknown // default : System.out.println( "Ping: " + Encoder.getHexa( packetX.body ) ); break; } } /** * Sends server ( server side download ) bandwidth in passive connection * @param packetX **/ public void sendReadStepServer ( int stepX ) { // System.out.println( System.currentTimeMillis( ) + " " + id + " ClientController.sendReadStepServer " ); RtmpPacket packet = new RtmpPacket ( ); packet.bodyType = 0x05; packet.rtmpChannel = 0x02; packet.body = Encoder.intToBytes( stepX , 4 ); addOutgoingPacket( packet ); } /** * Receives server ( server side download ) bandwidth in active connection * @param packetX */ public void receiveReadStepClient ( RtmpPacket packetX ) { // System.out.println( System.currentTimeMillis() + " " + id + " ClientController.receiveReadStepClient " + packetX ); // System.out.println( Encoder.bytesToInt( packetX.body ) ); } /** * Sends client ( client side download ) bandwidth in passive connection * @param packetX **/ public void sendReadStepClient ( int multiX ) { // System.out.println( System.currentTimeMillis( ) + " " + id + " ClientController.sendReadStepClient " ); RtmpPacket packet = new RtmpPacket ( ); packet.bodyType = 0x06; packet.rtmpChannel = 0x02; packet.body = Encoder.concatenate( Encoder.intToBytes( multiX , 4 ) , new byte [ ] { 0x02 } ); addOutgoingPacket( packet ); } /** * Receives client ( client side download ) bandwidth in active connection * @param packetX **/ public void receiveReadStepServer ( RtmpPacket packetX ) { // System.out.println( System.currentTimeMillis() + " " + id + " RTMPFlowController.receiveReadStepServer " + packetX ); // System.out.println( Encoder.bytesToInt( packetX.body ) ); } /** * Unknown packet type * @param packet */ public void receiveUnknown ( RtmpPacket packetX ) { // System.out.println( System.currentTimeMillis() + " " + id + "Unknown RtmpPacket " + packetX.toString( ) ); } /** * Receives invoke * @param packetX rtmp packet **/ public void receiveInvoke ( RtmpPacket packetX ) { // System.out.println( System.currentTimeMillis( ) + " " + id + " ClientController.receiveInvoke " ); try { // decode packet WrapperList arguments = Encoder.decode( packetX.body ); String invokeId = arguments.getString( 0 ); // System.out.println( "invoke: " + invokeId ); if ( invokeId.equals( "receiveAudio" ) ) streamController.onAudioReceiveState ( arguments , packetX ); else if ( invokeId.equals( "receiveVideo" ) ) streamController.onVideoReceiveState ( arguments , packetX ); else if ( invokeId.equals( "createStream" ) ) streamController.onStreamCreateRequest ( arguments , packetX ); else if ( invokeId.equals( "deleteStream" ) ) streamController.onStreamDeleteRequest ( arguments , packetX ); else if ( invokeId.equals( "closeStream" ) ) streamController.onStreamCloseRequest ( arguments , packetX ); else if ( invokeId.equals( "play" ) ) streamController.onStreamPlayRequest ( arguments , packetX ); else if ( invokeId.equals( "pause" ) ) streamController.onStreamPauseRequest ( arguments , packetX ); else if ( invokeId.equals( "seek" ) ) streamController.onStreamSeekRequest ( arguments , packetX ); else if ( invokeId.equals( "speed" ) ) streamController.onStreamSpeedRequest ( arguments , packetX ); else if ( invokeId.equals( "publish" ) ) streamController.onStreamPublishRequest( arguments , packetX ); else
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -