// Magic_Game/include/WThreadPool.h
// 179 lines, 3.9 KiB, C
// 2024-05-08 20:54:36 +08:00
#pragma once
#include <assert.h>
#include <stdio.h>

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <list>
#include <locale>
#include <map>
#include <memory>
#include <mutex>
#include <new>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
// Logging helper: prints the printf-style message, appends a newline and
// flushes stdout. `##__VA_ARGS__` is a GNU/MSVC extension that drops the
// trailing comma when no variadic arguments are supplied.
#define WThreadPool_log(fmt, ...) {printf(fmt, ##__VA_ARGS__);printf("\n");fflush(stdout);}
// Default value of WThreadPool::_minThreadNum.
#define WPOOL_MIN_THREAD_NUM 4
// NOTE(review): presumably the upper bound accepted for the worker count; it is
// not referenced in this header — confirm against the implementation file.
#define WPOOL_MAX_THREAD_NUM 256
// NOTE(review): presumably the manager thread's wake-up period in seconds; it is
// not referenced in this header — confirm against the implementation file.
#define WPOOL_MANAGE_SECONDS 20
// Slack subtracted from the idle-worker estimate in WThreadPool::concurrentRun
// when deciding whether to notify the manager condition variable.
#define ADD_THREAD_BOUNDARY 1
// Type of a queued task: a callable taking no arguments and returning nothing.
using EventFun = std::function<void ()>;
// Convenience alias for a signed 64-bit-or-wider integer.
using int64 = long long int;
/// Mutex-protected FIFO queue built on a singly linked list with a sentinel
/// head node.
///
/// Layout invariant: `_head` always points at the sentinel, `_head->next` is
/// the oldest element (or nullptr when empty) and `_tail` points at the newest
/// element (or at the sentinel when empty).
///
/// Every member that touches the list or the element counter takes `_mutex`,
/// so the public operations may be called concurrently from several threads.
/// `T` must be default-constructible (the sentinel holds one) and
/// move-assignable.
template <typename T>
class LockQueue
{
public:
    /// One list cell; the sentinel's `value` is never handed out.
    struct QueueNode
    {
        T value;
        QueueNode* next;
    };

    /// Creates an empty queue consisting of just the sentinel node.
    LockQueue()
    {
        QueueNode* sentinel = new QueueNode();
        sentinel->next = nullptr;
        _head = sentinel;
        _tail = sentinel;
    }

    /// Releases every pending element and then the sentinel.
    virtual ~LockQueue()
    {
        clear();
        delete _head;
        _head = nullptr;
        _tail = nullptr;
    }

    // The queue owns raw nodes and a mutex; copying it is never meaningful.
    LockQueue(const LockQueue&) = delete;
    LockQueue& operator=(const LockQueue&) = delete;

    /// Appends `data` at the back of the queue.
    /// @return false when the node could not be allocated, true otherwise.
    bool enQueue(T data)
    {
        // Allocate and fill the node before taking the lock so the critical
        // section is only the pointer splice. unique_ptr frees the node should
        // the assignment throw.
        std::unique_ptr<QueueNode> node(new (std::nothrow) QueueNode());
        if (!node)
        {
            return false;
        }
        node->value = std::move(data);
        node->next = nullptr;

        std::lock_guard<std::mutex> locker(_mutex);
        _tail->next = node.get();
        _tail = node.release();
        ++_queueSize;
        return true;
    }

    /// Removes the oldest element and stores it in `data`.
    /// @return false (leaving `data` untouched) when the queue is empty.
    bool deQueue(T& data)
    {
        QueueNode* first = nullptr;
        {
            std::lock_guard<std::mutex> locker(_mutex);
            first = _head->next;
            if (!first)
            {
                return false;
            }
            _head->next = first->next;
            if (!_head->next)
            {
                // The last element was removed: the tail falls back to the sentinel.
                _tail = _head;
            }
            --_queueSize;
        }
        // The node is now unreachable from the list, so the value transfer and
        // the node's destruction can happen without holding the lock.
        std::unique_ptr<QueueNode> owner(first);
        data = std::move(owner->value);
        return true;
    }

    /// @return the number of elements currently queued.
    int64_t size() const
    {
        std::lock_guard<std::mutex> locker(_mutex);
        return _queueSize;
    }

    /// Discards every queued element.
    void clear()
    {
        QueueNode* chain = nullptr;
        {
            // Detach the whole chain in one critical section ...
            std::lock_guard<std::mutex> locker(_mutex);
            chain = _head->next;
            _head->next = nullptr;
            _tail = _head;
            _queueSize = 0;
        }
        // ... and destroy the elements outside of it.
        while (chain)
        {
            QueueNode* next = chain->next;
            delete chain;
            chain = next;
        }
    }

    /// @return true when no element is queued.
    bool empty() const
    {
        std::lock_guard<std::mutex> locker(_mutex);
        return (_queueSize <= 0);
    }

private:
    QueueNode* _head;
    QueueNode* _tail;
    int64_t _queueSize = 0;
    // mutable so that the const observers size()/empty() can lock it.
    mutable std::mutex _mutex;
};
class WThreadPool
{
public:
WThreadPool();
virtual ~WThreadPool();
static WThreadPool * globalInstance();
void setMaxThreadNum(int maxNum);
bool waitForDone(int waitMs = -1);
template<typename Func, typename ...Arguments >
void concurrentRun(Func func, Arguments... args) {
EventFun queunFun = std::bind(func, args...);
enQueueEvent(queunFun);
if (((int)_workThreadList.size() < _maxThreadNum) &&
(_eventQueue.size() >= ((int)_workThreadList.size() - _busyThreadNum - ADD_THREAD_BOUNDARY)))
{
_mgrCondVar.notify_one();
}
_workCondVar.notify_one();
}
template<typename T> static int64_t threadIdToint64(T threadId)
{
std::string stid;
stid.resize(32);
snprintf((char *)stid.c_str(), 32, "%lld", threadId);
long long int tid = std::stoll(stid);
return tid;
}
private:
int _minThreadNum = WPOOL_MIN_THREAD_NUM;
int _maxThreadNum = 8;
std::atomic<int> _busyThreadNum = {0};
int _stepThreadNum = 4;
volatile bool _exitAllFlag = false;
std::atomic<int> _reduceThreadNum = {0};
std::shared_ptr<std::thread> _mgrThread;
LockQueue<EventFun> _eventQueue;
std::list<std::shared_ptr<std::thread>> _workThreadList;
std::mutex _threadIsRunMutex;
std::map<std::thread::id, bool> _threadIsRunMap;
std::condition_variable _workCondVar;
std::mutex _workMutex;
std::condition_variable _mgrCondVar;
std::mutex _mgrMutex;
static std::shared_ptr<WThreadPool> s_threadPool;
static std::mutex s_globleMutex;
void enQueueEvent(EventFun fun);
EventFun deQueueEvent();
void run();
void managerThread();
void stop();
void startWorkThread();
void stopWorkThread();
void adjustWorkThread();
};