⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 iocpserver.cpp

📁 这个是网络编程
💻 CPP
📖 第 1 页 / 共 4 页
字号:
                     WaitEvents[MAX_COMPLETION_THREAD_COUNT],
                     hrc;
    int              endpointcount=0,
                     waitcount=0,
                     interval,
                     rc,
                     i;
    struct addrinfo *res=NULL,
                    *ptr=NULL;

    // Validate the command line
    ValidateArgs(argc, argv);

    // Load Winsock
    if (WSAStartup(MAKEWORD(2,2), &wsd) != 0)
    {
        fprintf(stderr, "unable to load Winsock!\n");
        return -1;
    }

    InitializeCriticalSection(&gSocketListCs);
    InitializeCriticalSection(&gBufferListCs);
    InitializeCriticalSection(&gPendingCritSec);

    // Create the completion port used by this server
    CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
    if (CompletionPort == NULL)
    {
        fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
        return -1;
    }

    // Find out how many processors are on this system
    GetSystemInfo(&sysinfo);

    if (sysinfo.dwNumberOfProcessors > MAX_COMPLETION_THREAD_COUNT)
    {
        sysinfo.dwNumberOfProcessors = MAX_COMPLETION_THREAD_COUNT;
    }

    // Round the buffer size to the next increment of the page size
    if ((gBufferSize % sysinfo.dwPageSize) != 0)
    {
        gBufferSize = ((gBufferSize / sysinfo.dwPageSize) + 1) * sysinfo.dwPageSize;
    }

    printf("Buffer size = %lu (page size = %lu)\n", 
        gBufferSize, sysinfo.dwPageSize);
    
    // Create the worker threads to service the completion notifications
    for(waitcount=0; waitcount < (int)sysinfo.dwNumberOfProcessors ;waitcount++)
    {
        WaitEvents[waitcount] = CreateThread(NULL, 0, CompletionThread, (LPVOID)CompletionPort, 0, NULL);
        if (WaitEvents[waitcount] == NULL)
        {
            fprintf(stderr, "CreatThread failed: %d\n", GetLastError());
            return -1;
        }
    }

    printf("Local address: %s; Port: %s; Family: %d\n",
            gBindAddr, gBindPort, gAddressFamily);

    // Obtain the "wildcard" addresses for all the available address families
    res = ResolveAddress(gBindAddr, gBindPort, gAddressFamily, gSocketType, gProtocol);
    if (res == NULL)
    {
        fprintf(stderr, "ResolveAddress failed to return any addresses!\n");
        return -1;
    }

    // For each local address returned, create a listening/receiving socket
    ptr = res;
    while (ptr)
    {
        printf("Listening address: ");
        PrintAddress(ptr->ai_addr, ptr->ai_addrlen); 
        printf("\n");

        listenobj = (LISTEN_OBJ *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(LISTEN_OBJ));
        if (listenobj == NULL)
        {
            fprintf(stderr, "Out of memory!\n");
            return -1;
        }
        
        listenobj->LoWaterMark = gInitialAccepts;

        InitializeCriticalSection(&listenobj->ListenCritSec);
        
        // Save off the address family of this socket
        listenobj->AddressFamily = ptr->ai_family;

        // create the socket
        listenobj->s = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
        if (listenobj->s == INVALID_SOCKET)
        {
            fprintf(stderr, "socket failed: %d\n", WSAGetLastError());
            return -1;
        }

        // Create an event to register for FD_ACCEPT events on
        listenobj->AcceptEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
        if (listenobj->AcceptEvent == NULL)
        {
            fprintf(stderr, "CreateEvent failed: %d\n", GetLastError());
            return -1;
        }

        listenobj->RepostAccept = CreateEvent(NULL, TRUE, FALSE, NULL);
        if (listenobj->RepostAccept == NULL)
        {
            fprintf(stderr, "CreateSemaphore failed: %d\n", GetLastError());
            return -1;
        }

        // Add the event to the liste of waiting events
        WaitEvents[waitcount++] = listenobj->AcceptEvent;

        WaitEvents[waitcount++] = listenobj->RepostAccept;

        // Associate the socket and its SOCKET_OBJ to the completion port
        hrc = CreateIoCompletionPort((HANDLE)listenobj->s, CompletionPort, (ULONG_PTR)listenobj, 0);
        if (hrc == NULL)
        {
            fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
            return -1;
        }

        // bind the socket to a local address and port
        rc = bind(listenobj->s, ptr->ai_addr, ptr->ai_addrlen);
        if (rc == SOCKET_ERROR)
        {
            fprintf(stderr, "bind failed: %d\n", WSAGetLastError());
            return -1;
        }

        // Need to load the Winsock extension functions from each provider
        //    -- e.g. AF_INET and AF_INET6. 
        rc = WSAIoctl(
                listenobj->s,
                SIO_GET_EXTENSION_FUNCTION_POINTER,
               &guidAcceptEx,
                sizeof(guidAcceptEx),
               &listenobj->lpfnAcceptEx,
                sizeof(listenobj->lpfnAcceptEx),
               &bytes,
                NULL,
                NULL
                );
        if (rc == SOCKET_ERROR)
        {
            fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d\n",
                    WSAGetLastError());
            return -1;
        }

        // Load the Winsock extensions from each provider
        rc = WSAIoctl(
                listenobj->s,
                SIO_GET_EXTENSION_FUNCTION_POINTER,
               &guidGetAcceptExSockaddrs,
                sizeof(guidGetAcceptExSockaddrs),
               &listenobj->lpfnGetAcceptExSockaddrs,
                sizeof(listenobj->lpfnGetAcceptExSockaddrs),
               &bytes,
                NULL,
                NULL
                );
        if (rc == SOCKET_ERROR)
        {
            fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER faled: %d\n",
                    WSAGetLastError());
            return -1;
        }

        // Put the socket into listening mode
        rc = listen(listenobj->s, 200);
        if (rc == SOCKET_ERROR)
        {
            fprintf(stderr, "listen failed: %d\n", WSAGetLastError());
            return -1;
        }

        // Register for FD_ACCEPT notification on listening socket
        rc = WSAEventSelect(listenobj->s, listenobj->AcceptEvent, FD_ACCEPT);
        if (rc == SOCKET_ERROR)
        {
            fprintf(stderr, "WSAEventSelect failed: %d\n", WSAGetLastError());
            return -1;
        }

        // Initiate the initial accepts for each listen socket
        for(i=0; i < gInitialAccepts ;i++)
        {
            acceptobj = GetBufferObj(gBufferSize);
            if (acceptobj == NULL)
            {
                fprintf(stderr, "Out of memory!\n");
                return -1;
            }

            acceptobj->PostAccept = listenobj->AcceptEvent;

            InsertPendingAccept(listenobj, acceptobj);

            PostAccept(listenobj, acceptobj);
        }

        //
        // Maintain a list of the listening socket structures
        //
        if (ListenSockets == NULL)
        {
            ListenSockets = listenobj;
        }
        else
        {
            listenobj->next = ListenSockets;
            ListenSockets   = listenobj;
        }

        endpointcount++;
        ptr = ptr->ai_next;
    }
    // free the addrinfo structure for the 'bind' address
    freeaddrinfo(res);

    gStartTime = gStartTimeLast = GetTickCount();

    interval = 0;
    while (1)
    {
        rc = WSAWaitForMultipleEvents(
                waitcount,
                WaitEvents,
                FALSE,
                5000,
                FALSE
                );
        if (rc == WAIT_FAILED)
        {
            fprintf(stderr, "WSAWaitForMultipleEvents failed: %d\n", WSAGetLastError());
            break;
        }
        else if (rc == WAIT_TIMEOUT)
        {
            interval++;

            PrintStatistics();

            if (interval == 36)
            {
                int          optval,
                             optlen;

                // For TCP, cycle through all the outstanding AcceptEx operations
                //   to see if any of the client sockets have been connected but
                //   haven't received any data. If so, close them as they could be
                //   a denial of service attack.

                listenobj = ListenSockets;
                while (listenobj)
                {
                    EnterCriticalSection(&listenobj->ListenCritSec);

                    acceptobj = listenobj->PendingAccepts;

                    while (acceptobj)
                    {
                        optlen = sizeof(optval);
                        rc = getsockopt(
                                acceptobj->sclient,
                                SOL_SOCKET,
                                SO_CONNECT_TIME,
                                (char *)&optval,
                               &optlen
                                );
                        if (rc == SOCKET_ERROR)
                        {
                            fprintf(stderr, "getsockopt: SO_CONNECT_TIME failed: %d\n",
                                    WSAGetLastError());
                        }
                        else
                        {
                            // If the socket has been connected for more than 5 minutes,
                            //    close it. If closed, the AcceptEx call will fail in the
                            //    completion thread.
                            if ((optval != 0xFFFFFFFF) && (optval > 300))
                            {
                                printf("closing stale handle\n");
                                closesocket(acceptobj->sclient);
                                acceptobj->sclient = INVALID_SOCKET;
                            }
                        }

                        acceptobj = acceptobj->next;
                    }
                    LeaveCriticalSection(&listenobj->ListenCritSec);

                    listenobj = listenobj->next;
                }
                interval = 0;
            }
        }
        else
        {
            int index;

            index = rc - WAIT_OBJECT_0;

            for( ; index < waitcount ; index++)
            {
                rc = WaitForSingleObject(WaitEvents[index], 0);
                if (rc == WAIT_FAILED || rc == WAIT_TIMEOUT)
                {
                    continue;
                }
                if (index < (int)sysinfo.dwNumberOfProcessors)
                {
                    // One of the completion threads exited
                    //   This is bad so just bail - a real server would want
                    //   to gracefully exit but this is just a sample ...
                    ExitProcess(-1);
                }
                else
                {
                    // An FD_ACCEPT event occured
                    listenobj = ListenSockets;
                    while (listenobj)
                    {
                        if ((listenobj->AcceptEvent == WaitEvents[index]) || 
                                (listenobj->RepostAccept  == WaitEvents[index]))
                            break;
                        listenobj = listenobj->next;
                    }

                    if (listenobj)
                    {
                        WSANETWORKEVENTS ne;
                        int              limit=0;

                        if (listenobj->AcceptEvent == WaitEvents[index])
                        {
                            // EnumNetworkEvents to see if FD_ACCEPT was set
                            rc = WSAEnumNetworkEvents(
                                    listenobj->s,
                                    listenobj->AcceptEvent,
                                    &ne
                                                     );
                            if (rc == SOCKET_ERROR)
                            {
                                fprintf(stderr, "WSAEnumNetworkEvents failed: %d\n",
                                        WSAGetLastError());
                            }
                            if ((ne.lNetworkEvents & FD_ACCEPT) == FD_ACCEPT)
                            {
                                // We got an FD_ACCEPT so post multiple accepts to 
                                // cover the burst
                                limit = BURST_ACCEPT_COUNT;
                            }
                        }
                        else if (listenobj->RepostAccept == WaitEvents[index])
                        {
                            // Semaphore is signaled
                            limit = InterlockedExchange(&listenobj->RepostCount, 0);

                            ResetEvent(listenobj->RepostAccept);
                        }

                        i = 0;
                        while ( (i++ < limit) &&
                                (listenobj->PendingAcceptCount < gMaxAccepts) )
                        {
                            acceptobj = GetBufferObj(gBufferSize);
                            if (acceptobj)
                            {
                                acceptobj->PostAccept = listenobj->AcceptEvent;

                                InsertPendingAccept(listenobj, acceptobj);

                                PostAccept(listenobj, acceptobj);
                            }
                        }
                    }
                }
            }
        }
    }

    WSACleanup();
    return 0;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -