📄 threadbalancer.cpp
字号:
// ========================================================
// Thread management/balancing
//
// Design and Implementation by Floris van den Berg
// ========================================================
#pragma warning (disable : 4275)
#pragma warning (disable : 4786)
#include <algorithm>
#include <vector>
#ifdef WIN32
#include <windows.h> // needed for SleepEx
#endif
#include <boost/thread/xtime.hpp>
#include "Agent.h"
#include "OpenNet.h"
#include "Transport.h"
#include "ThreadBalancer.h"
// --------------------------------------------------------
#define BALANCE 2
// --------------------------------------------------------
// Thread functor implementation
// --------------------------------------------------------
ThreadFunction::ThreadFunction(ThreadEntry *entry) :
m_entry(entry) {
}
void
ThreadFunction::iterate() {
boost::mutex::scoped_lock scoped_lock(m_entry->cs_holder);
for (DataList::iterator i = m_entry->contents.begin(); i != m_entry->contents.end(); ++i)
(*i)->transport->PollProgress();
}
void
ThreadFunction::operator()() {
while (m_entry->thread_running) {
// call all transports in the chain
iterate();
// boost sure has a lame system to do a simple sleep
if (m_entry->thread_running) {
#ifdef WIN32
SleepEx(1, TRUE);
#else
boost::xtime xt;
boost::xtime_get(&xt, boost::TIME_UTC);
xt.nsec += 1000000;
boost::thread::sleep(xt);
#endif
}
}
}
// --------------------------------------------------------
// Thread balancer implementation
// --------------------------------------------------------
ThreadBalancer::ThreadBalancer() :
m_threads_unbalanced(),
m_threads_balanced(),
m_cs(),
m_entry_number(0) {
int cpu_count = EpGetCpuCount() * 2;
for (int i = 0; i < cpu_count; ++i) {
ThreadEntry *thread = new ThreadEntry;
thread->thread_running = true;
thread->allow_balancing = true;
ThreadFunction function(thread);
thread->thread_handle = new boost::thread(function);
m_threads_balanced.push_back(thread);
}
}
ThreadBalancer::~ThreadBalancer() {
AbortThreads(m_threads_unbalanced);
AbortThreads(m_threads_balanced);
}
// --------------------------------------------------------
// Add and remove transports
// --------------------------------------------------------
int
ThreadBalancer::addTransport(ITransport *transport, bool allow_balancing) {
// allocate a new entry
DataEntry *entry = new DataEntry;
entry->transport = transport;
entry->entry_id = ++m_entry_number;
// If there are still threads unused in the pool, spawn one.
// Else add the entry to an existing thread
if (allow_balancing) {
int entry_count = 0;
ThreadList::iterator i;
ThreadList::iterator j;
i = j = m_threads_balanced.begin();
// find a nice thread to put the transport in
for (++i; i != m_threads_balanced.end(); ++i) {
boost::mutex::scoped_lock scoped_lock((*i)->cs_holder);
if ((*i)->contents.size() < (*j)->contents.size()) {
entry_count = (*i)->contents.size();
j = i;
}
}
// and finally add it
boost::mutex::scoped_lock scoped_lock((*j)->cs_holder);
(*j)->contents.push_back(entry);
} else {
ThreadEntry *thread = new ThreadEntry;
thread->allow_balancing = false;
thread->thread_running = true;
thread->contents.push_back(entry);
ThreadFunction function(thread);
thread->thread_handle = new boost::thread(function);
m_threads_unbalanced.push_back(thread);
}
return m_entry_number;
}
void
ThreadBalancer::removeTransport(int id) {
if (!removeEntry(m_threads_unbalanced, id))
removeEntry(m_threads_balanced, id);
}
bool
ThreadBalancer::removeEntry(ThreadList &list, int id) {
for (ThreadList::iterator i = list.begin(); i != list.end(); ++i) {
ThreadEntry *thread = (*i);
for (DataList::iterator j = thread->contents.begin(); j != thread->contents.end(); ++j) {
DataEntry *tmp = (*j);
if ((*j)->entry_id == id) {
int number_of_items = removeTransportFromThread(thread, j);
// if the thread is now empty, remove the thread from the pool
// if it is not, look at the balance treshold. when it is equal or
// smaller than the constant BALANCE then balance the threads
if (number_of_items == 0) {
if (!thread->allow_balancing) {
thread->thread_running = false;
thread->thread_handle->join();
delete thread->thread_handle;
delete thread;
list.erase(i);
} else {
balance();
}
} else if (number_of_items <= BALANCE) {
balance();
}
}
return true;
}
}
return false;
}
int
ThreadBalancer::removeTransportFromThread(ThreadEntry *thread, DataList::iterator j) {
boost::mutex::scoped_lock scoped_lock(thread->cs_holder);
delete (*j);
thread->contents.erase(j);
return thread->contents.size();
}
void
ThreadBalancer::AbortThreads(ThreadList &list) {
while (!list.empty()) {
ThreadEntry *thread = *list.begin();
thread->thread_running = false;
thread->thread_handle->join();
delete thread->thread_handle;
delete thread;
list.pop_front();
}
}
// --------------------------------------------------------
// Balance the thread load again. This operation is
// executed when:
//
// 1) a transport is removed and there only BALANCE items
// or fewer left
//
// 2) a transport is removed and the thread is empty
// --------------------------------------------------------
void
ThreadBalancer::balance() {
assert(m_threads_balanced.size() > 1);
_STL::vector<DataEntry *> tmp_list;
_STL::vector<boost::mutex::scoped_lock *> cs_list;
// enter critical sections
for (ThreadList::iterator m = m_threads_balanced.begin(); m != m_threads_balanced.end(); ++m)
cs_list.push_back(new boost::mutex::scoped_lock((*m)->cs_holder));
// first make a long list of all entries in all threads
for (ThreadList::iterator i = m_threads_balanced.begin(); i != m_threads_balanced.end(); ++i) {
for (DataList::iterator j = (*i)->contents.begin(); j != (*i)->contents.end(); ++j)
tmp_list.push_back(*j);
(*i)->contents.clear();
}
// spread the remaining transports over the remaining threads
int average = tmp_list.size() / m_threads_balanced.size();
int first = average + (tmp_list.size() % m_threads_balanced.size());
// spread all entries evenly over the threads
// the first thread can have more entries than the rest
ThreadList::iterator j = m_threads_balanced.begin();
int k = 0;
if (first > 0) {
for (int l = 0; l < first; ++l) {
(*j)->contents.push_back(tmp_list[k]);
++k;
}
++j;
}
// then all other threads
for (; j != m_threads_balanced.end(); ++j) {
for (int l = 0; l < average; ++l) {
(*j)->contents.push_back(tmp_list[k]);
++k;
}
}
// leave critical sections
for (_STL::vector<boost::mutex::scoped_lock *>::iterator l = cs_list.begin(); l != cs_list.end(); ++l)
delete *l;
}
// --------------------------------------------------------
// Singleton implementation
// --------------------------------------------------------
ThreadBalancer *ThreadBalancer::instance = NULL;
ThreadBalancer *
ThreadBalancer::getInstance() {
if (!isInstantiated())
ThreadBalancer::instance = new ThreadBalancer;
return ThreadBalancer::instance;
}
void
ThreadBalancer::destroyInstance() {
if (isInstantiated()) {
delete ThreadBalancer::instance;
ThreadBalancer::instance = NULL;
}
}
bool
ThreadBalancer::isInstantiated() {
return (ThreadBalancer::instance != NULL);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -