SwitchGame/source/Tool/ThreadPool.h

81 lines
2.2 KiB
C++

#pragma once
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>
#include <memory>
#include <map>
#include <stdexcept>
class ThreadPool
{
public:
// 获取单例实例
static ThreadPool &GetInstance();
// 删除拷贝构造函数和赋值运算符
ThreadPool(const ThreadPool &) = delete;
ThreadPool &operator=(const ThreadPool &) = delete;
// 向指定线程添加任务
template <class F, class... Args>
void enqueueToThread(int threadId, F &&f, Args &&...args)
{
if (threadId < 0 || threadId >= static_cast<int>(workers.size()))
throw std::runtime_error("Invalid thread ID");
{
std::unique_lock<std::mutex> lock(queue_mutex);
// 不允许在停止后添加新任务
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
// 将任务添加到指定线程的队列
threadTasks[threadId].emplace([=]() mutable
{ std::invoke(f, args...); });
}
// 通知指定线程
threadConditions[threadId]->notify_one();
}
// 向任意线程添加任务(负载均衡)
template <class F, class... Args>
void enqueue(F &&f, Args &&...args)
{
// 使用轮询方式选择线程
static std::atomic<int> nextThreadId(0);
int threadId = nextThreadId.load();
nextThreadId = (nextThreadId + 1) % workers.size();
enqueueToThread(threadId, std::forward<F>(f), std::forward<Args>(args)...);
}
~ThreadPool();
void shutdown();
// 获取线程池大小
size_t size() const;
// 获取指定线程的负载(待处理任务数)
size_t getThreadLoad(int threadId) const;
private:
// 私有构造函数
ThreadPool(size_t numThreads);
private:
std::vector<std::thread> workers;
std::vector<std::queue<std::function<void()>>> threadTasks; // 每个线程有自己的任务队列
mutable std::mutex queue_mutex;
std::vector<std::condition_variable *> threadConditions; // 使用指针存储条件变量
std::atomic<bool> stop;
};