📄 thread.cc
字号:
#ifdef POSIXextern "C" {#include <sys/time.h>}#endif#include "common.h"#include "logging.h"#include "thread.h"#include "time.h"#define MSDEV_SET_THREAD_NAME 0x406D1388namespace utils_base {ThreadManager g_thmgr;#ifdef POSIXpthread_key_t ThreadManager::key_;ThreadManager::ThreadManager() { pthread_key_create(&key_, NULL); main_thread_ = new Thread(); SetCurrent(main_thread_);}ThreadManager::~ThreadManager() { pthread_key_delete(key_); delete main_thread_;}Thread *ThreadManager::CurrentThread() { return (Thread *)pthread_getspecific(key_);}void ThreadManager::SetCurrent(Thread *thread) { pthread_setspecific(key_, thread);}#endif#ifdef WIN32DWORD ThreadManager::key_;ThreadManager::ThreadManager() { key_ = TlsAlloc(); main_thread_ = new Thread(); SetCurrent(main_thread_);}ThreadManager::~ThreadManager() { TlsFree(key_); delete main_thread_;}Thread *ThreadManager::CurrentThread() { return (Thread *)TlsGetValue(key_);}void ThreadManager::SetCurrent(Thread *thread) { TlsSetValue(key_, thread);}#endifvoid ThreadManager::Add(Thread *thread) { CritScope cs(&crit_); threads_.push_back(thread);}void ThreadManager::Remove(Thread *thread) { CritScope cs(&crit_); threads_.erase(std::remove(threads_.begin(), threads_.end(), thread), threads_.end());}Thread::Thread(SocketServer* ss) : MessageQueue(ss), priority_(PRIORITY_NORMAL) { g_thmgr.Add(this); started_ = false; has_sends_ = false;}Thread::~Thread() { Stop(); if (active_) Clear(NULL); g_thmgr.Remove(this);}#ifdef POSIXvoid Thread::Start() { pthread_attr_t attr; pthread_attr_init(&attr); if (priority_ == PRIORITY_IDLE) { struct sched_param param; pthread_attr_getschedparam(&attr, ¶m); param.sched_priority = 15; // +15 = pthread_attr_setschedparam(&attr, ¶m); } pthread_create(&thread_, &attr, PreRun, this); started_ = true;}void Thread::Join() { if (started_) { void *pv; pthread_join(thread_, &pv); }}#endif#ifdef WIN32typedef struct tagTHREADNAME_INFO{ DWORD dwType; LPCSTR szName; DWORD dwThreadID; DWORD dwFlags;} THREADNAME_INFO;void SetThreadName( DWORD dwThreadID, LPCSTR szThreadName){ THREADNAME_INFO info; { info.dwType = 0x1000; info.szName = szThreadName; info.dwThreadID = dwThreadID; info.dwFlags = 0; } __try { RaiseException(MSDEV_SET_THREAD_NAME, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info ); } __except(EXCEPTION_CONTINUE_EXECUTION) { }}void Thread::Start() { DWORD flags = 0; if (priority_ != PRIORITY_NORMAL) { flags = CREATE_SUSPENDED; } thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PreRun, this, flags, NULL); if (thread_) { if (priority_ != PRIORITY_NORMAL) { if (priority_ == PRIORITY_IDLE) { ::SetThreadPriority(thread_, THREAD_PRIORITY_IDLE); } ::ResumeThread(thread_); } } started_ = true;}void Thread::Join() { if (started_) { WaitForSingleObject(thread_, INFINITE); CloseHandle(thread_); started_ = false; }}#endifvoid *Thread::PreRun(void *pv) { Thread *thread = (Thread *)pv; ThreadManager::SetCurrent(thread);#if defined(WIN32) && defined(_DEBUG) char buf[256]; _snprintf(buf, sizeof(buf), "Thread 0x%.8x", thread); SetThreadName(GetCurrentThreadId(), buf);#endif thread->Run(); return NULL;}void Thread::Run() { ProcessMessages(kForever);}void Thread::Stop() { MessageQueue::Stop(); Join();}void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) { if (fStop_) return; // Sent messages are sent to the MessageHandler directly, in the context // of "thread", like Win32 SendMessage. If in the right context, // call the handler directly. Message msg; msg.phandler = phandler; msg.message_id = id; msg.pdata = pdata; if (IsCurrent()) { phandler->OnMessage(&msg); return; } AutoThread thread; Thread *current_thread = Thread::Current(); ASSERT(current_thread != NULL); // AutoThread ensures this bool ready = false; { CritScope cs(&crit_); EnsureActive(); _SendMessage smsg; smsg.thread = current_thread; smsg.msg = msg; smsg.ready = &ready; sendlist_.push_back(smsg); has_sends_ = true; } // Wait for a reply ss_->WakeUp(); bool waited = false; while (!ready) { current_thread->ReceiveSends(); current_thread->socketserver()->Wait(kForever, false); waited = true; } // Our Wait loop above may have consumed some WakeUp events for this // MessageQueue, that weren't relevant to this Send. Losing these WakeUps can // cause problems for some SocketServers. // // Concrete example: // Win32SocketServer on thread A calls Send on thread B. While processing the // message, thread B Posts a message to A. We consume the wakeup for that // Post while waiting for the Send to complete, which means that when we exit // this loop, we need to issue another WakeUp, or else the Posted message // won't be processed in a timely manner. if (waited) { current_thread->socketserver()->WakeUp(); }}void Thread::ReceiveSends() { // Before entering critical section, check boolean. if (!has_sends_) return; // Receive a sent message. Cleanup scenarios: // - thread sending exits: We don't allow this, since thread can exit // only via Join, so Send must complete. // - thread receiving exits: Wakeup/set ready in Thread::Clear() // - object target cleared: Wakeup/set ready in Thread::Clear() crit_.Enter(); while (!sendlist_.empty()) { _SendMessage smsg = sendlist_.front(); sendlist_.pop_front(); crit_.Leave(); smsg.msg.phandler->OnMessage(&smsg.msg); crit_.Enter(); *smsg.ready = true; smsg.thread->socketserver()->WakeUp(); } has_sends_ = false; crit_.Leave();}void Thread::Clear(MessageHandler *phandler, uint32 id) { CritScope cs(&crit_); // Remove messages on sendlist_ with phandler // Object target cleared: remove from send list, wakeup/set ready // if sender not NULL. std::list<_SendMessage>::iterator iter = sendlist_.begin(); while (iter != sendlist_.end()) { _SendMessage smsg = *iter; if (phandler == NULL || smsg.msg.phandler == phandler) { if (id == (uint32)-1 || smsg.msg.message_id == id) { iter = sendlist_.erase(iter); *smsg.ready = true; smsg.thread->socketserver()->WakeUp(); continue; } } ++iter; } MessageQueue::Clear(phandler, id);}bool Thread::ProcessMessages(int cmsLoop) { uint32 msEnd; if (cmsLoop != kForever) msEnd = GetMillisecondCount() + cmsLoop; int cmsNext = cmsLoop; while (true) { Message msg; if (!Get(&msg, cmsNext)) return false; Dispatch(&msg); if (cmsLoop != kForever) { uint32 msCur = GetMillisecondCount(); if (msCur >= msEnd) return true; cmsNext = msEnd - msCur; } }}AutoThread::AutoThread(SocketServer* ss) : Thread(ss) { if (!ThreadManager::CurrentThread()) { ThreadManager::SetCurrent(this); }}AutoThread::~AutoThread() { if (ThreadManager::CurrentThread() == this) { ThreadManager::SetCurrent(NULL); }}} // namespace talk_base
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -