⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadbalancer.cpp

📁 .net 方面的开发说明资料。
💻 CPP
字号:
// ========================================================
// Thread management/balancing
//
// Design and Implementation by Floris van den Berg
// ========================================================

#pragma warning (disable : 4275)
#pragma warning (disable : 4786)

#include <algorithm>
#include <vector>

#ifdef WIN32
#include <windows.h> // needed for SleepEx
#endif

#include <boost/thread/xtime.hpp>

#include "Agent.h"
#include "OpenNet.h"
#include "Transport.h"
#include "ThreadBalancer.h"

// --------------------------------------------------------

// Rebalancing threshold: after a transport is removed, removeEntry() calls
// balance() when the affected thread is left with this many entries or fewer.
#define BALANCE 2

// --------------------------------------------------------
// Thread functor implementation
// --------------------------------------------------------

// Binds the functor to the thread entry it will service. The pointer is
// stored as-is; nothing is allocated or copied here.
ThreadFunction::ThreadFunction(ThreadEntry *entry)
	: m_entry(entry)
{
}

void
ThreadFunction::iterate() {
	boost::mutex::scoped_lock scoped_lock(m_entry->cs_holder);

	for (DataList::iterator i = m_entry->contents.begin(); i != m_entry->contents.end(); ++i)
		(*i)->transport->PollProgress();
}

// Thread body: keeps polling the transports of this entry until the
// entry's thread_running flag is cleared, pausing briefly between passes.
void
ThreadFunction::operator()() {
	while (m_entry->thread_running) {
		// poll every transport assigned to this thread

		iterate();

		// skip the pause when a stop was requested during the pass;
		// the loop condition then ends the thread

		if (!m_entry->thread_running)
			continue;

#ifdef WIN32
		// 1 ms alertable sleep

		SleepEx(1, TRUE);
#else
		// boost has no relative sleep here: build an absolute wake-up
		// time of "now + 1,000,000 ns" and sleep until then

		boost::xtime wake_at;
		boost::xtime_get(&wake_at, boost::TIME_UTC);
		wake_at.nsec += 1000000;
		boost::thread::sleep(wake_at);
#endif
	}
}

// --------------------------------------------------------
// Thread balancer implementation
// --------------------------------------------------------

// Builds the balanced thread pool: two threads are started for every CPU
// reported by EpGetCpuCount(). Each pooled thread starts with an empty
// contents list and is flagged as balanceable.
ThreadBalancer::ThreadBalancer() :
m_threads_unbalanced(),
m_threads_balanced(),
m_cs(),
m_entry_number(0) {
	const int pool_size = EpGetCpuCount() * 2;

	for (int remaining = pool_size; remaining > 0; --remaining) {
		ThreadEntry *pooled = new ThreadEntry;

		// flags must be in place before the thread starts reading them

		pooled->allow_balancing = true;
		pooled->thread_running = true;

		pooled->thread_handle = new boost::thread(ThreadFunction(pooled));

		m_threads_balanced.push_back(pooled);
	}
}

// Stops, joins and frees every thread still owned by the balancer:
// first the dedicated (unbalanced) threads, then the balanced pool.
ThreadBalancer::~ThreadBalancer()
{
	this->AbortThreads(this->m_threads_unbalanced);
	this->AbortThreads(this->m_threads_balanced);
}

// --------------------------------------------------------
// Add and remove transports
// --------------------------------------------------------

// Registers a transport with the balancer so that one of its threads polls it.
//
// transport        the transport to poll; only the pointer is stored
// allow_balancing  true:  the transport is added to the balanced-pool thread
//                         that currently holds the fewest entries
//                  false: a new dedicated thread is started for the transport
//
// Returns the id assigned to the new entry. Pass it to removeTransport() to
// unregister the transport again.
//
// If balancing is requested but the balanced pool is empty, the transport
// gets a dedicated thread instead, so the pool is never dereferenced at end().
int
ThreadBalancer::addTransport(ITransport *transport, bool allow_balancing) {
	// allocate a new entry

	DataEntry *entry = new DataEntry;
	entry->transport = transport;
	entry->entry_id = ++m_entry_number;

	if (allow_balancing && !m_threads_balanced.empty()) {
		// find the pooled thread with the fewest entries

		ThreadList::iterator least = m_threads_balanced.begin();
		ThreadList::iterator i = least;

		for (++i; i != m_threads_balanced.end(); ++i) {
			boost::mutex::scoped_lock scoped_lock((*i)->cs_holder);

			if ((*i)->contents.size() < (*least)->contents.size())
				least = i;
		}

		// and finally add it, holding the lock of the chosen thread

		boost::mutex::scoped_lock scoped_lock((*least)->cs_holder);

		(*least)->contents.push_back(entry);
	} else {
		// dedicated thread: the entry is stored before the thread starts

		ThreadEntry *thread = new ThreadEntry;
		thread->allow_balancing = false;
		thread->thread_running = true;
		thread->contents.push_back(entry);

		ThreadFunction function(thread);
		thread->thread_handle = new boost::thread(function);

		m_threads_unbalanced.push_back(thread);
	}

	return m_entry_number;
}

// Unregisters the transport with the given entry id. The dedicated
// (unbalanced) threads are searched first; the balanced pool is searched
// only when removeEntry() reports false for the unbalanced list.
void
ThreadBalancer::removeTransport(int id) {
	if (removeEntry(m_threads_unbalanced, id))
		return;

	removeEntry(m_threads_balanced, id);
}

// Searches every thread in the given list for the entry with the given id
// and removes it.
//
// list  the thread list to search (balanced or unbalanced)
// id    entry id as returned by addTransport()
//
// Returns true if the entry was found and removed, false if no thread in
// the list holds an entry with that id.
//
// After a removal:
//  - a dedicated thread (allow_balancing == false) that is now empty is
//    stopped, joined, freed and erased from the list;
//  - a balanceable thread that is now empty, or any thread left with
//    BALANCE entries or fewer, triggers balance().
bool
ThreadBalancer::removeEntry(ThreadList &list, int id) {
	for (ThreadList::iterator i = list.begin(); i != list.end(); ++i) {
		ThreadEntry *thread = (*i);

		for (DataList::iterator j = thread->contents.begin(); j != thread->contents.end(); ++j) {
			if ((*j)->entry_id != id)
				continue;

			// j is invalid after this call; it is not used again

			int number_of_items = removeTransportFromThread(thread, j);

			// if the thread is now empty, remove the thread from the pool
			// if it is not, look at the balance threshold. when it is equal or
			// smaller than the constant BALANCE then balance the threads

			if (number_of_items == 0) {
				if (!thread->allow_balancing) {
					thread->thread_running = false;
					thread->thread_handle->join();

					delete thread->thread_handle;
					delete thread;

					list.erase(i);
				} else {
					balance();
				}
			} else if (number_of_items <= BALANCE) {
				balance();
			}

			// report success only when an entry was actually removed

			return true;
		}
	}

	return false;
}

// Frees the entry that j points at and erases it from the thread's
// contents list, all while holding that thread's mutex.
//
// Returns the number of entries left in the thread afterwards.
// The iterator j is invalid once this function returns.
int
ThreadBalancer::removeTransportFromThread(ThreadEntry *thread, DataList::iterator j) {
	boost::mutex::scoped_lock guard(thread->cs_holder);

	DataEntry *doomed = *j;
	delete doomed;

	thread->contents.erase(j);

	int remaining = thread->contents.size();
	return remaining;
}

// Shuts down every thread in the list, front to back: clears its
// thread_running flag, joins it, frees the thread handle and the entry,
// and pops it from the list. The list is empty on return.
void
ThreadBalancer::AbortThreads(ThreadList &list) {
	for (; !list.empty(); list.pop_front()) {
		ThreadEntry *victim = list.front();

		// ask the thread to leave its loop, then wait for it

		victim->thread_running = false;
		victim->thread_handle->join();

		delete victim->thread_handle;
		delete victim;
	}
}

// --------------------------------------------------------
// Balance the thread load again. This operation is
// executed when:
// 
// 1) a transport is removed and there are only BALANCE items
//    or fewer left
// 
// 2) a transport is removed and the thread is empty
// --------------------------------------------------------

void
ThreadBalancer::balance() {
	assert(m_threads_balanced.size() > 1);
	_STL::vector<DataEntry *> tmp_list;
	_STL::vector<boost::mutex::scoped_lock *> cs_list;

	// enter critical sections

	for (ThreadList::iterator m = m_threads_balanced.begin(); m != m_threads_balanced.end(); ++m)
		cs_list.push_back(new boost::mutex::scoped_lock((*m)->cs_holder));

	// first make a long list of all entries in all threads

	for (ThreadList::iterator i = m_threads_balanced.begin(); i != m_threads_balanced.end(); ++i) {
		for (DataList::iterator j = (*i)->contents.begin(); j != (*i)->contents.end(); ++j)
			tmp_list.push_back(*j);

		(*i)->contents.clear();
	}

	// spread the remaining transports over the remaining threads

	int average = tmp_list.size() / m_threads_balanced.size();
	int first = average + (tmp_list.size() % m_threads_balanced.size());

	// spread all entries evenly over the threads
	// the first thread can have more entries than the rest

	ThreadList::iterator j = m_threads_balanced.begin();
	int k = 0;

	if (first > 0) {
		for (int l = 0; l < first; ++l) {
			(*j)->contents.push_back(tmp_list[k]);
			++k;
		}

		++j;
	}

	// then all other threads

	for (; j != m_threads_balanced.end(); ++j) {
		for (int l = 0; l < average; ++l) {
			(*j)->contents.push_back(tmp_list[k]);
			++k;
		}
	}

	// leave critical sections

	for (_STL::vector<boost::mutex::scoped_lock *>::iterator l = cs_list.begin(); l != cs_list.end(); ++l)
		delete *l;
}

// --------------------------------------------------------
// Singleton implementation
// --------------------------------------------------------

// The single ThreadBalancer object. NULL until getInstance() creates it,
// and reset to NULL by destroyInstance().
ThreadBalancer *ThreadBalancer::instance = NULL;

// Returns the singleton balancer, creating it on first use.
ThreadBalancer *
ThreadBalancer::getInstance() {
	if (isInstantiated())
		return ThreadBalancer::instance;

	ThreadBalancer::instance = new ThreadBalancer;

	return ThreadBalancer::instance;
}

// Deletes the singleton balancer, if one exists, and resets the instance
// pointer to NULL. Does nothing when no instance has been created.
void
ThreadBalancer::destroyInstance() {
	if (!isInstantiated())
		return;

	delete ThreadBalancer::instance;

	ThreadBalancer::instance = NULL;
}

// Reports whether the singleton balancer currently exists.
bool
ThreadBalancer::isInstantiated() {
	if (ThreadBalancer::instance == NULL)
		return false;

	return true;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -