#pragma once #include #include #include #include #include #include #include #include #include #include #include #define WThreadPool_log(fmt, ...) {printf(fmt, ##__VA_ARGS__);printf("\n");fflush(stdout);} #define WPOOL_MIN_THREAD_NUM 4 #define WPOOL_MAX_THREAD_NUM 256 #define WPOOL_MANAGE_SECONDS 20 #define ADD_THREAD_BOUNDARY 1 using EventFun = std::function; using int64 = long long int; template class LockQueue { public: LockQueue() { QueueNode* node = new QueueNode(); node->next = nullptr; // head->next is the first node, _tail point to last node, not _tail->next _head = node; _tail = _head; }; virtual ~LockQueue() { clear(); delete _head; _head = nullptr; _tail = nullptr; }; struct QueueNode { T value; QueueNode* next; }; bool enQueue(T data) { QueueNode* node = new (std::nothrow) QueueNode(); if (!node) { return false; } node->value = data; node->next = nullptr; std::unique_lock locker(_mutex); _tail->next = node; _tail = node; _queueSize++; return true; } bool deQueue(T& data) { std::unique_lock locker(_mutex); QueueNode* currentFirstNode = _head->next; if (!currentFirstNode) { return false; } _head->next = currentFirstNode->next; data = currentFirstNode->value; delete currentFirstNode; _queueSize--; if (_queueSize == 0) { _tail = _head; } return true; } int64_t size() { return _queueSize; } void clear() { T data; while (deQueue(data)); } bool empty() { return (_queueSize <= 0); } private: QueueNode* _head; QueueNode* _tail; int64_t _queueSize = 0; std::mutex _mutex; }; class WThreadPool { public: WThreadPool(); virtual ~WThreadPool(); static WThreadPool * globalInstance(); void setMaxThreadNum(int maxNum); bool waitForDone(int waitMs = -1); template void concurrentRun(Func func, Arguments... args) { EventFun queunFun = std::bind(func, args...); enQueueEvent(queunFun); if (((int)_workThreadList.size() < _maxThreadNum) && (_eventQueue.size() >= ((int)_workThreadList.size() - _busyThreadNum - ADD_THREAD_BOUNDARY))) { _mgrCondVar.notify_one(); } _workCondVar.notify_one(); } template static int64_t threadIdToint64(T threadId) { std::string stid; stid.resize(32); snprintf((char *)stid.c_str(), 32, "%lld", threadId); long long int tid = std::stoll(stid); return tid; } private: int _minThreadNum = WPOOL_MIN_THREAD_NUM; int _maxThreadNum = 8; std::atomic _busyThreadNum = {0}; int _stepThreadNum = 4; volatile bool _exitAllFlag = false; std::atomic _reduceThreadNum = {0}; std::shared_ptr _mgrThread; LockQueue _eventQueue; std::list> _workThreadList; std::mutex _threadIsRunMutex; std::map _threadIsRunMap; std::condition_variable _workCondVar; std::mutex _workMutex; std::condition_variable _mgrCondVar; std::mutex _mgrMutex; static std::shared_ptr s_threadPool; static std::mutex s_globleMutex; void enQueueEvent(EventFun fun); EventFun deQueueEvent(); void run(); void managerThread(); void stop(); void startWorkThread(); void stopWorkThread(); void adjustWorkThread(); };