⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 netmanager.cc

📁 nandflash文件系统源代码
💻 CC
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: NetManager.cc 207 2008-11-03 20:29:50Z sriramsrao $ //// Created 2006/03/14// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2006-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// //----------------------------------------------------------------------------#include <sys/poll.h>#include <cerrno>#include <boost/scoped_array.hpp>#include "NetManager.h"#include "TcpSocket.h"#include "Globals.h"#include "common/log.h"using std::mem_fun;using std::list;using namespace KFS;using namespace KFS::libkfsio;NetManager::NetManager() : mDiskOverloaded(false), mNetworkOverloaded(false), mMaxOutgoingBacklog(0){    pthread_mutexattr_t mutexAttr;    int rval;    rval = pthread_mutexattr_init(&mutexAttr);    assert(rval == 0);    rval = pthread_mutexattr_settype(&mutexAttr, PTHREAD_MUTEX_RECURSIVE);    assert(rval == 0);    mSelectTimeout.tv_sec = 10;    mSelectTimeout.tv_usec = 0;}NetManager::NetManager(const struct timeval &selectTimeout) :     mDiskOverloaded(false), mNetworkOverloaded(false), mMaxOutgoingBacklog(0){    mSelectTimeout.tv_sec = selectTimeout.tv_sec;    mSelectTimeout.tv_usec = selectTimeout.tv_usec;}NetManager::~NetManager(){    NetConnectionListIter_t iter;    NetConnectionPtr conn;        mTimeoutHandlers.clear();    mConnections.clear();}voidNetManager::AddConnection(NetConnectionPtr &conn){    mConnections.push_back(conn);}voidNetManager::RegisterTimeoutHandler(ITimeout *handler){    mTimeoutHandlers.push_back(handler);}voidNetManager::UnRegisterTimeoutHandler(ITimeout *handler){    list<ITimeout *>::iterator iter;    ITimeout *tm;        for (iter = mTimeoutHandlers.begin(); iter != mTimeoutHandlers.end();          ++iter) {        tm = *iter;        if (tm == handler) {            mTimeoutHandlers.erase(iter);            return;        }    }}voidNetManager::MainLoop(){    boost::scoped_array<struct pollfd> pollfds;    uint32_t pollFdSize = 1024;    int numPollFds, fd, res;    NetConnectionPtr conn;    NetConnectionListIter_t iter, eltToRemove;    int pollTimeoutMs;    // if we have too many bytes to send, throttle incoming    int64_t totalNumBytesToSend = 0;    bool overloaded = false;    pollfds.reset(new struct pollfd[pollFdSize]);    while (1) {        if (mConnections.size() > pollFdSize) {            pollFdSize = mConnections.size();            pollfds.reset(new struct pollfd[pollFdSize]);        }        // build poll vector:         // make sure we are listening to the net kicker        fd = globals().netKicker.GetFd();        pollfds[0].fd = fd;        pollfds[0].events = POLLIN;        pollfds[0].revents = 0;        numPollFds = 1;        overloaded = IsOverloaded(totalNumBytesToSend);        int numBytesToSend;        totalNumBytesToSend = 0;        for (iter = mConnections.begin(); iter != mConnections.end(); ++iter) {            conn = *iter;            fd = conn->GetFd();            if (fd < 0) {                // we'll get rid of this connection in the while loop below                conn->mPollVectorIndex = -2;                continue;            }            if (fd == globals().netKicker.GetFd()) {                conn->mPollVectorIndex = -1;                continue;            }            conn->mPollVectorIndex = numPollFds;            pollfds[numPollFds].fd = fd;            pollfds[numPollFds].events = 0;            pollfds[numPollFds].revents = 0;            if (conn->IsReadReady(overloaded)) {                // By default, each connection is read ready.  We                // expect there to be 2-way data transmission, and so                // we are read ready.  In overloaded state, we only                // add the fd to the poll vector if the fd is given a                // special pass                pollfds[numPollFds].events |= POLLIN;            }            numBytesToSend = conn->GetNumBytesToWrite();            if (numBytesToSend > 0) {                totalNumBytesToSend += numBytesToSend;                // An optimization: if we are not sending any data for                // this fd in this round of poll, don't bother adding                // it to the poll vector.                pollfds[numPollFds].events |= POLLOUT;            }            numPollFds++;        }        if (!overloaded) {            overloaded = IsOverloaded(totalNumBytesToSend);            if (overloaded)                continue;        }        struct timeval startTime, endTime;        gettimeofday(&startTime, NULL);        pollTimeoutMs = mSelectTimeout.tv_sec * 1000;        res = poll(pollfds.get(), numPollFds, pollTimeoutMs);        if ((res < 0) && (errno != EINTR)) {            perror("poll(): ");            continue;        }        gettimeofday(&endTime, NULL);        // list of timeout handlers...call them back        fd = globals().netKicker.GetFd();        if (pollfds[0].revents & POLLIN) {            globals().netKicker.Drain();            globals().diskManager.ReapCompletedIOs();        }        for_each (mTimeoutHandlers.begin(), mTimeoutHandlers.end(),                   mem_fun(&ITimeout::TimerExpired));        iter = mConnections.begin();        while (iter != mConnections.end()) {            conn = *iter;            // Something happened and the connection has closed.  So,            // remove the connection from our list.            if (conn->GetFd() < 0) {                eltToRemove = iter;                ++iter;                mConnections.erase(eltToRemove);                continue;            }            if ((conn->GetFd() == globals().netKicker.GetFd()) ||                (conn->mPollVectorIndex < 0)) {                ++iter;                continue;            }            if (pollfds[conn->mPollVectorIndex].revents & POLLIN) {                fd = conn->GetFd();                if (fd > 0) {                    conn->HandleReadEvent(overloaded);                }            }            // conn could have closed due to errors during read.  so,            // need to re-get the fd and check that all is good            if (pollfds[conn->mPollVectorIndex].revents & POLLOUT) {                fd = conn->GetFd();                if (fd > 0) {                    conn->HandleWriteEvent();                }            }            if ((pollfds[conn->mPollVectorIndex].revents & POLLERR) ||                (pollfds[conn->mPollVectorIndex].revents & POLLHUP)) {                fd = conn->GetFd();                if (fd > 0) {                    conn->HandleErrorEvent();                }            }            ++iter;        }    }}boolNetManager::IsOverloaded(int64_t numBytesToSend){    static bool wasOverloaded = false;    if (mMaxOutgoingBacklog > 0) {        if (!mNetworkOverloaded) {            mNetworkOverloaded = (numBytesToSend > mMaxOutgoingBacklog);        } else if (numBytesToSend <= mMaxOutgoingBacklog / 2) {            // network was overloaded and that has now cleared            mNetworkOverloaded = false;        }    }    bool isOverloaded = mDiskOverloaded || mNetworkOverloaded;    if (!wasOverloaded && isOverloaded) {        KFS_LOG_VA_INFO("System is now in overloaded state (%ld bytes to send; %d disk IO's) ",                        numBytesToSend, globals().diskManager.NumDiskIOOutstanding());    } else if (wasOverloaded && !isOverloaded) {        KFS_LOG_VA_INFO("Clearing system overload state (%ld bytes to send; %d disk IO's)",                        numBytesToSend, globals().diskManager.NumDiskIOOutstanding());    }    wasOverloaded = isOverloaded;    return isOverloaded;}voidNetManager::ChangeDiskOverloadState(bool v){    if (mDiskOverloaded == v)        return;    mDiskOverloaded = v;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -