⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 netrecv.cpp

📁 最近在学习directshow, Directshow实务精选的源代码
💻 CPP
字号:
//------------------------------------------------------------------------------
// File: Netrecv.cpp
//
// Desc: DirectShow sample code - implementation of DSNetwork sample filters
//
// Copyright (c) 2000-2002  Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------------------------

#include "precomp.h"
#include "nutil.h"
#include "dsnetifc.h"
#include "le.h"
#include "buffpool.h"
#include "netrecv.h"
#include "dsrecv.h"

//  ---------------------------------------------------------------------------
//  ---------------------------------------------------------------------------

CNetReceiver::CNetReceiver (
    IN  CBufferPool *               pBufferPool,
    IN  CNetworkReceiverFilter *    pRecvFilter,
    OUT HRESULT *                   phr
    ) : m_hAsyncSocket              (INVALID_SOCKET),
        m_hThread                   (NULL),
        m_pBufferPool               (pBufferPool),
        m_lReadsPended              (0),
        m_pRecvFilter               (pRecvFilter)
/*++
    Routine Description:

        Constructs the receiver: initializes the critical section, starts
        winsock, and creates the event objects.  No socket or thread is
        created here.

    Arguments:

        pBufferPool -   pool that buffers for pended reads are taken from
        pRecvFilter -   filter that successfully completed buffers are
                        handed to
        phr         -   written only on failure: E_FAIL if winsock startup
                        fails, or the win32 error (as an HRESULT) if an
                        event cannot be created; left untouched on success

    Return Values:

        none
--*/
{
    //  these cannot fail, so do them before anything that can bail out
    InitializeCriticalSection (& m_crt) ;
    ZeroMemory (& m_hEvents, EVENT_COUNT * sizeof (HANDLE)) ;

    //  request winsock version 2.0
    const int iStartup = WSAStartup (MAKEWORD(2, 0), & m_wsaData) ;
    if (iStartup != 0) {
        (* phr) = E_FAIL ;
        return ;
    }

    //  create each event as manual-reset and initially non-signaled; stop at
    //  the first failure, leaving the remaining slots NULL
    for (DWORD dwIndex = 0; dwIndex < EVENT_COUNT; dwIndex++) {
        const HANDLE hEvent = CreateEvent (NULL, TRUE, FALSE, NULL) ;
        m_hEvents [dwIndex] = hEvent ;

        if (hEvent == NULL) {
            const DWORD dwLastError = GetLastError () ;
            (* phr) = HRESULT_FROM_WIN32 (dwLastError) ;
            return ;
        }
    }
}

CNetReceiver::~CNetReceiver (
    )
/*++
    Routine Description:

        Destroys the receiver: closes the event handles, releases winsock,
        and deletes the critical section.  The socket is expected to have
        been closed already.

    Arguments:

        none

    Return Values:

        none
--*/
{
    ASSERT (m_hAsyncSocket == INVALID_SOCKET) ;

    //  the constructor fills the event array in index order and stops at the
    //  first creation failure, so the first NULL slot marks the end of the
    //  handles that need closing
    for (DWORD dwIndex = 0;
         dwIndex < EVENT_COUNT && m_hEvents [dwIndex] != NULL;
         dwIndex++) {

        CloseHandle (m_hEvents [dwIndex]) ;
    }

    WSACleanup () ;

    DeleteCriticalSection (& m_crt) ;
}

void
CALLBACK
CNetReceiver::AsyncCompletionCallback (
    IN  DWORD           dwError,
    IN  DWORD           dwBytesReceived,
    IN  LPWSAOVERLAPPED pOverlapped,
    IN  DWORD           dwFlags
    )
/*++
    Description:

        Completion routine handed to WSARecv for every pended read.  The
        callback is made on the thread that pended the IO when it becomes
        alertable.  It recovers the buffer and the receiver that own the
        operation and forwards the completion to the receiver.

    Parameters:

        dwError         - win32 error code, if any; NO_ERROR if successful

        dwBytesReceived - number of bytes received into the buffer

        pOverlapped     - OVERLAPPED struct used for the pended read; its
                            .hEvent member holds the CBuffer pointer stored
                            when the read was pended

        dwFlags         - flags; not used here

    Return Values:

        none

--*/
{
    //  .hEvent carries the CBuffer that was read into
    CBuffer * const pCompletedBuffer =
        reinterpret_cast <CBuffer *> (pOverlapped -> hEvent) ;

    //  the buffer's completion context carries the receiver that pended it
    CNetReceiver * const pOwner =
        reinterpret_cast <CNetReceiver *> (pCompletedBuffer -> GetCompletionContext ()) ;

    //  record how much of the buffer holds valid data, then hand it over
    pCompletedBuffer -> SetPayloadLength (dwBytesReceived) ;
    pOwner -> ReadCompletion (pCompletedBuffer, dwError) ;
}

void
CNetReceiver::ReadCompletion (
    IN  CBuffer *   pBuffer,
    IN  DWORD       dwError
    )
/*++
    Routine Description:

        Handles one completed read once the completion callback has
        recovered the buffer and the receiver.  Updates the outstanding
        read count, tries to pend replacement reads, forwards the buffer
        to the filter if the read succeeded, and drops the IO's reference
        on the buffer.

    Arguments:

        pBuffer -   CBuffer object the read was made into

        dwError -   win32 error, if any; NO_ERROR if successful

    Return Values:

        none
--*/
{
    //  this read is no longer outstanding
    InterlockedDecrement (& m_lReadsPended) ;

    //  replenish the pended reads first, before spending time on this buffer
    PendReads_ () ;

    //  a failed read has no valid payload, so it is not passed to the filter
    const bool fReadSucceeded = (dwError == NO_ERROR) ;
    if (fReadSucceeded) {
        m_pRecvFilter -> ProcessBuffer (pBuffer) ;
    }

    //  release the reference that was held on behalf of the IO
    pBuffer -> Release () ;
}

void
CNetReceiver::PendReads_ (
    IN  DWORD   dwBufferWaitMax
    )
/*++
    Routine Description:

        Pends overlapped reads on the socket until MAX_READ_PENDS reads are
        outstanding, a buffer cannot be obtained, or WSARecv fails.  Does
        nothing if the socket is not open.  Runs under the object's lock.

    Arguments:

        dwBufferWaitMax - how long the caller is willing to wait (block) for
                            a buffer to pend an IO into

    Return Values:

        none
--*/
{
    Lock_ () ;

    //  no socket means we are not joined, or are shutting down; nothing to do
    if (m_hAsyncSocket == INVALID_SOCKET) {
        Unlock_ () ;
        return ;
    }

    for (;;) {

        //  already have as many reads outstanding as we allow
        if (m_lReadsPended >= MAX_READ_PENDS) {
            break ;
        }

        CBuffer * const pReadBuffer = m_pBufferPool -> GetBuffer (
                                            m_hEvents [EVENT_GET_BLOCK],
                                            dwBufferWaitMax
                                            ) ;

        //  no buffer; most likely every buffer is in use and the wait
        //  timed out
        if (pReadBuffer == NULL) {
            break ;
        }

        //  stash the buffer pointer in the overlapped struct so that the
        //  completion routine can recover it; using .hEvent for this is
        //  legal because a completion routine is supplied to WSARecv
        pReadBuffer -> GetOverlapped () -> Offset       = 0 ;
        pReadBuffer -> GetOverlapped () -> OffsetHigh   = 0 ;
        pReadBuffer -> GetOverlapped () -> hEvent       = static_cast <HANDLE> (pReadBuffer) ;

        //  and stash ourselves in the buffer for the same reason
        pReadBuffer -> SetCompletionContext (reinterpret_cast <DWORD_PTR> (this)) ;

        WSABUF  wsaReadBuf ;
        wsaReadBuf.buf  = reinterpret_cast <char *> (pReadBuffer -> GetBuffer ()) ;
        wsaReadBuf.len  = pReadBuffer -> GetBufferLength () ;

        DWORD   dwRecvFlags = 0 ;       //  no flags
        DWORD   dwBytesRead ;

        //  the reference obtained with the buffer becomes the IO's reference
        const int iRecvResult = WSARecv (
                                    m_hAsyncSocket,
                                    & wsaReadBuf,
                                    1,
                                    & dwBytesRead,
                                    & dwRecvFlags,
                                    pReadBuffer -> GetOverlapped (),
                                    CNetReceiver::AsyncCompletionCallback
                                    ) ;

        //  WSA_IO_PENDING just means the read completes later; anything
        //  else is a real failure, so give the buffer back and stop
        if (iRecvResult == SOCKET_ERROR &&
            WSAGetLastError () != WSA_IO_PENDING) {

            pReadBuffer -> Release () ;
            break ;
        }

        //  whether the read completed synchronously or is still pending, a
        //  completion notification is queued to us and is dequeued in
        //  sequence with the others, so the read counts as outstanding
        InterlockedIncrement (& m_lReadsPended) ;
    }

    Unlock_ () ;
}

HRESULT
CNetReceiver::JoinMulticast_ (
    IN  ULONG   ulIP,           //  IP; network order
    IN  USHORT  usPort,         //  port; network order
    IN  ULONG   ulNIC           //  network interface; network order
    )
/*++
    Routine Description:

        Creates the overlapped UDP socket, binds it to the port, and joins
        the multicast group on the specified interface.  If any step
        fails, the socket is closed again before returning.  Runs under
        the object's lock.

    Arguments:

        ulIP    - multicast IP; must be class D; network order
        usPort  - multicast port; network order
        ulNIC   - NIC; network order

    Return Values:

        S_OK            - success
        failed HRESULT  - failure; built from the winsock error of the step
                          that failed
--*/
{
    bool    fJoined = false ;

    Lock_ () ;

    //  single-pass block; any failing step breaks out to the common
    //  failure handling below
    do {
        //  create our socket; async
        m_hAsyncSocket = WSASocket(
                            AF_INET,
                            SOCK_DGRAM,
                            0,
                            NULL,
                            0,
                            WSA_FLAG_MULTIPOINT_C_LEAF | WSA_FLAG_MULTIPOINT_D_LEAF | WSA_FLAG_OVERLAPPED
                            ) ;
        if (m_hAsyncSocket == INVALID_SOCKET) {
            break ;
        }

        //  turn on SO_REUSEADDR for the socket
        BOOL    fReuseAddr = TRUE ;
        if (setsockopt(
                m_hAsyncSocket,
                SOL_SOCKET,
                SO_REUSEADDR,
                (char *)& fReuseAddr,
                sizeof fReuseAddr
                ) == SOCKET_ERROR) {
            break ;
        }

        //  bind to the UDP port on any interface
        struct sockaddr_in  bindAddr ;
        ZeroMemory (& bindAddr, sizeof bindAddr) ;
        bindAddr.sin_family             = AF_INET ;
        bindAddr.sin_port               = usPort ;
        bindAddr.sin_addr.S_un.S_addr   = INADDR_ANY ;

        if (bind(
                m_hAsyncSocket,
                (LPSOCKADDR) & bindAddr,
                sizeof bindAddr
                ) == SOCKET_ERROR) {
            break ;
        }

        //  join the group; the port was fixed by the bind above, the
        //  interface is chosen here
        struct ip_mreq      membership ;
        ZeroMemory (& membership, sizeof membership) ;
        membership.imr_multiaddr.s_addr = ulIP ;
        membership.imr_interface.s_addr = ulNIC ;

        if (setsockopt (
                m_hAsyncSocket,
                IPPROTO_IP,
                IP_ADD_MEMBERSHIP,
                (char *) & membership,
                sizeof membership
                ) == SOCKET_ERROR) {
            break ;
        }

        fJoined = true ;

    } while (false) ;

    if (fJoined) {
        Unlock_ () ;
        return S_OK ;
    }

    //  capture the error before closing the socket
    const DWORD dwSocketError = WSAGetLastError () ;
    LeaveMulticast_ () ;

    Unlock_ () ;

    return HRESULT_FROM_WIN32 (dwSocketError) ;
}

void
CNetReceiver::LeaveMulticast_ (
    )
/*++
    Routine Description:

        Leaves a multicast, if we are currently joined.  Does nothing if not
        joined.

    Arguments:

        none

    Return Values:

        none
--*/
{
    Lock_ () ;

    if (m_hAsyncSocket != INVALID_SOCKET) {
        closesocket (m_hAsyncSocket) ;
        m_hAsyncSocket = INVALID_SOCKET ;
    }

    Unlock_ () ;
}

void
CNetReceiver::ThreadProc (
    )
/*++
    Routine Description:

        Worker thread proc.  Thread loops until the stop event becomes
        signaled.  Thread blocks in an alertable state so it can handle
        async io completions on the socket.  If it times out, it tries to
        pend reads.

    Arguments:

        none

    Return Values:

        none
--*/
{
    DWORD   r ;

    for (;;) {
        r = WaitForMultipleObjectsEx (EVENT_COUNT, m_hEvents, FALSE, PULSE_MILLIS, TRUE) ;

        //  stop event was signaled; break
        if (r - WAIT_OBJECT_0 == EVENT_STOP) {
            break ;
        }

        //  timed out; try to pend more reads
        if (r == WAIT_TIMEOUT) {
            PendReads_ () ;
        }
    }

    return ;
}

HRESULT
CNetReceiver::Activate (
    IN  ULONG   ulIP,           //  IP; network order
    IN  USHORT  usPort,         //  port; network order
    IN  ULONG   ulNIC           //  network interface; network order
    )
/*++
    Routine Description:

        Activates this object.  When this object is active, it is joined to
        a multicast group (ulIP, usPort) on the specified NIC; it hosts a
        thread that pends async IO on the socket, and processes completions
        by posting the buffers into the filter.

        If the join or the thread creation fails, the object is left
        inactive (no socket, no thread) and the failure is returned to the
        caller.

    Arguments:

        ulIP    - multicast IP; must be class D; network order
        usPort  - multicast port; network order
        ulNIC   - NIC; network order

    Return Values:

        S_OK            - success
        failed HRESULT  - failure; either the error from joining the
                          multicast group or the win32 error from
                          CreateThread

--*/
{
    HRESULT hr ;
    DWORD   dw ;

    Lock_ () ;

    ASSERT (IsMulticastIP (ulIP)) ;              //  class d
    ASSERT (m_hThread == NULL) ;                //  no thread
    ASSERT (m_hEvents [EVENT_STOP] != NULL) ;   //  we can stop it

    //  initialize; the stop event is manual-reset, so it must be cleared
    //  explicitly in case a previous activation was stopped
    ResetEvent (m_hEvents [EVENT_STOP]) ;
    m_lReadsPended = 0 ;

    //  join the specified multicast group
    hr = JoinMulticast_ (ulIP, usPort, ulNIC) ;
    if (SUCCEEDED (hr)) {

        //  create the thread; default security and stack size, runs
        //  immediately (creation flags are 0)
        m_hThread = CreateThread (
                        NULL,
                        0,
                        (LPTHREAD_START_ROUTINE) CNetReceiver::ThreadEntry,
                        this,
                        0,
                        & dw
                        ) ;
        if (m_hThread) {

            //  success
            hr = S_OK ;
            ASSERT (m_hAsyncSocket != INVALID_SOCKET) ;
        }
        else {
            //  failed; capture the error before the cleanup can overwrite
            //  it, then shut everything down
            dw = GetLastError () ;
            LeaveMulticast_ () ;
            hr = HRESULT_FROM_WIN32 (dw) ;
        }
    }
    else {
        //  a failed join closes its own socket
        ASSERT (m_hAsyncSocket == INVALID_SOCKET) ;
    }

    Unlock_ () ;

    //  report the real outcome; failures must not be masked as S_OK
    return hr ;
}

HRESULT
CNetReceiver::Stop (
    )
/*++
    Routine Description:

        Deactivates this object: signals the worker thread to stop, closes
        the socket, then waits for the thread to exit and closes its
        handle.  Does not wait if there is no thread.

    Arguments:

        none

    Return Values:

        S_OK, always
--*/
{
    //  tell the worker thread to exit its loop
    SetEvent (m_hEvents [EVENT_STOP]) ;

    //  close the socket, which leaves the multicast
    LeaveMulticast_ () ;

    //  no thread was ever started, or it has been cleaned up already
    if (m_hThread == NULL) {
        return S_OK ;
    }

    //  block until the thread has exited, then drop our handle to it
    WaitForSingleObject (m_hThread, INFINITE) ;
    CloseHandle (m_hThread) ;
    m_hThread = NULL ;

    return S_OK ;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -