Extra2D/src/core/scheduler.cpp

323 lines
7.9 KiB
C++
Raw Normal View History

#include <core/scheduler.h>
#include <algorithm>
#include <thread>
#include <future>
namespace extra2d {
namespace {
/// Repeating timer that invokes a callback whenever the accumulated time
/// reaches `interval`.
///
/// - `repeat == 0` means the timer never finishes by itself; otherwise it is
///   marked done once the callback has fired `repeat` times.
/// - `delay > 0` adds an initial waiting period that is consumed before the
///   interval countdown starts.
/// - At most one trigger happens per update() call, even if `dt` spans
///   several intervals.
///
/// NOTE(review): when no delay is requested, `elapsed_` starts at
/// `-interval_`, so the first trigger needs two full intervals of accumulated
/// time. Confirm this is intentional.
class IntervalTimer : public Timer {
public:
    IntervalTimer(Scheduler::Cb cb, float interval, uint32 repeat, float delay)
        : cb_(std::move(cb)) {
        interval_ = interval;
        delay_ = delay;
        repeat_ = repeat;
        runForever_ = (repeat == 0);
        useDelay_ = (delay > 0.0f);
        if (useDelay_) {
            elapsed_ = 0.0f;
        } else {
            elapsed_ = -interval_;
        }
    }

    /// Advances the timer by `dt` and fires the callback when due.
    void update(float dt) override {
        if (paused_ || done_) {
            return;
        }

        elapsed_ += dt;

        // Consume the one-off initial delay first.
        if (useDelay_) {
            if (elapsed_ < delay_) {
                return;
            }
            elapsed_ -= delay_;
            useDelay_ = false;
        }

        // Written as a negated >= so the comparison behaves exactly like the
        // positive form for every float value.
        if (!(elapsed_ >= interval_)) {
            return;
        }

        trigger();
        elapsed_ -= interval_;

        if (runForever_) {
            return;
        }
        ++timesExecuted_;
        if (timesExecuted_ >= repeat_) {
            done_ = true;
        }
    }

    /// Invokes the callback (if any) with the currently accumulated time.
    void trigger() override {
        if (cb_) {
            cb_(elapsed_);
        }
    }

private:
    Scheduler::Cb cb_;  ///< User callback receiving the accumulated time.
};
/// One-shot timer: fires its callback the first time the accumulated time
/// reaches `delay`, then marks itself done.
class OnceTimer : public Timer {
public:
    OnceTimer(Scheduler::VoidCb cb, float delay)
        : cb_(std::move(cb)) {
        elapsed_ = 0.0f;
        delay_ = delay;
    }

    /// Advances the timer by `dt`; triggers and finishes once the delay has passed.
    void update(float dt) override {
        if (paused_ || done_) {
            return;
        }

        elapsed_ += dt;

        // Negated >= keeps the comparison identical to the positive form for
        // every float value.
        if (!(elapsed_ >= delay_)) {
            return;
        }

        trigger();
        done_ = true;
    }

    /// Invokes the callback if one was supplied.
    void trigger() override {
        if (cb_) {
            cb_();
        }
    }

private:
    Scheduler::VoidCb cb_;  ///< User callback taking no arguments.
};
}
// SafePriorityQueue implementation
/// Inserts a copy of `entry` into the queue while holding mutex_.
void Scheduler::SafePriorityQueue::push(const UpdateEntry& entry) {
    const std::lock_guard<std::mutex> guard(mutex_);
    queue_.push(entry);
}
/// Removes the top element and copies it into `entry`.
///
/// @param entry Receives the removed element; left untouched when the queue
///              is empty.
/// @return true if an element was removed, false if the queue was empty.
bool Scheduler::SafePriorityQueue::pop(UpdateEntry& entry) {
    const std::lock_guard<std::mutex> guard(mutex_);
    const bool hasEntry = !queue_.empty();
    if (hasEntry) {
        entry = queue_.top();
        queue_.pop();
    }
    return hasEntry;
}
/// Reports, under mutex_, whether the queue currently holds no elements.
bool Scheduler::SafePriorityQueue::empty() const {
    const std::lock_guard<std::mutex> guard(mutex_);
    const bool isEmpty = queue_.empty();
    return isEmpty;
}
size_t Scheduler::SafePriorityQueue::size() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.size();
}
/// Discards every queued element while holding mutex_.
void Scheduler::SafePriorityQueue::clear() {
    const std::lock_guard<std::mutex> guard(mutex_);
    // Elements are popped one at a time until nothing is left.
    for (; !queue_.empty(); queue_.pop()) {
    }
}
/// Returns the process-wide Scheduler, constructed on first use as a
/// function-local static.
Scheduler& Scheduler::inst() {
    static Scheduler singleton;
    return singleton;
}
/// Registers `target` so that its update() is called from Scheduler::update().
///
/// @param target Object to update; a null pointer is rejected.
/// @param pri    Priority value stored in the new entry.
/// @return A freshly generated handle, or INVALID_HDL when `target` is null.
TimerHdl Scheduler::scheduleUpdate(TimerTarget* target, int pri) {
    if (target == nullptr) {
        return INVALID_HDL;
    }

    const std::lock_guard<std::mutex> guard(updateIndexMutex_);
    // The new entry starts with both of its boolean flags cleared.
    UpdateEntry fresh{target, pri, false, false};
    updates_.push_back(fresh);
    // Remember the slot of the entry that was just appended.
    updateIndex_[target] = updates_.size() - 1;
    return genHdl();
}
/// Stops per-frame updates for `target`.
///
/// The matching entries are only flagged (`markedForDel`); they are physically
/// removed from `updates_` at the end of the next update pass.
///
/// Entries are located by comparing target pointers rather than by the slot
/// cached in `updateIndex_`: the update pass compacts `updates_`, which shifts
/// positions and leaves cached slots stale, so trusting the slot could flag a
/// different target's entry (or none at all).
///
/// @param target Object to stop updating; null or unknown targets are ignored.
void Scheduler::unscheduleUpdate(TimerTarget* target) {
    if (!target) return;

    std::lock_guard<std::mutex> lock(updateIndexMutex_);

    auto it = updateIndex_.find(target);
    if (it == updateIndex_.end()) {
        return;  // not currently scheduled
    }
    updateIndex_.erase(it);

    // Flag every entry that belongs to this target, including duplicates
    // created by scheduling the same target more than once.
    for (auto& entry : updates_) {
        if (entry.target == target) {
            entry.markedForDel = true;
        }
    }
}
/// Creates an IntervalTimer for `cb` and registers it under a new handle.
///
/// @param cb       Callback to invoke; an empty callback is rejected.
/// @param interval Time between triggers.
/// @param repeat   Number of triggers before the timer finishes; 0 repeats forever.
/// @param delay    Initial delay before the interval countdown starts.
/// @return The handle of the new timer, or INVALID_HDL when `cb` is empty.
TimerHdl Scheduler::schedule(Cb cb, float interval, uint32 repeat, float delay) {
    if (!cb) {
        return INVALID_HDL;
    }

    auto intervalTimer = makePtr<IntervalTimer>(std::move(cb), interval, repeat, delay);
    const TimerHdl handle = genHdl();
    intervalTimer->hdl_ = handle;

    {
        // Only the map insertion needs the lock.
        const std::lock_guard<std::mutex> guard(timersMutex_);
        timers_[handle] = intervalTimer;
    }
    return handle;
}
/// Creates a OnceTimer that fires `cb` a single time after `delay`.
///
/// @param cb    Callback to invoke; an empty callback is rejected.
/// @param delay Time to wait before firing.
/// @return The handle of the new timer, or INVALID_HDL when `cb` is empty.
TimerHdl Scheduler::scheduleOnce(VoidCb cb, float delay) {
    if (!cb) {
        return INVALID_HDL;
    }

    auto onceTimer = makePtr<OnceTimer>(std::move(cb), delay);
    const TimerHdl handle = genHdl();
    onceTimer->hdl_ = handle;

    {
        // Only the map insertion needs the lock.
        const std::lock_guard<std::mutex> guard(timersMutex_);
        timers_[handle] = onceTimer;
    }
    return handle;
}
/// Convenience wrapper around schedule(): repeats `cb` every `interval`
/// without an initial delay and without a repeat limit.
TimerHdl Scheduler::scheduleForever(Cb cb, float interval) {
    const uint32 repeatForever = 0;  // schedule() treats 0 as "never finish"
    const float noDelay = 0.0f;
    return schedule(std::move(cb), interval, repeatForever, noDelay);
}
/// Removes the timer registered under `hdl`; unknown handles are ignored.
void Scheduler::unschedule(TimerHdl hdl) {
    const std::lock_guard<std::mutex> guard(timersMutex_);
    timers_.erase(hdl);
}
/// Drops every timer, every per-frame update entry, and everything in the
/// update queue. Each container is cleared under its own lock, one after
/// another, so the three steps are not atomic as a whole.
void Scheduler::unscheduleAll() {
    {
        const std::lock_guard<std::mutex> timersGuard(timersMutex_);
        timers_.clear();
    }

    {
        const std::lock_guard<std::mutex> updatesGuard(updateIndexMutex_);
        updates_.clear();
        updateIndex_.clear();
    }

    // SafePriorityQueue::clear() takes its own internal lock.
    updateQueue_.clear();
}
/// Pauses the timer registered under `hdl`; unknown handles are ignored.
void Scheduler::pause(TimerHdl hdl) {
    const std::lock_guard<std::mutex> guard(timersMutex_);
    const auto found = timers_.find(hdl);
    if (found == timers_.end()) {
        return;
    }
    found->second->pause();
}
/// Resumes the timer registered under `hdl`; unknown handles are ignored.
void Scheduler::resume(TimerHdl hdl) {
    const std::lock_guard<std::mutex> guard(timersMutex_);
    const auto found = timers_.find(hdl);
    if (found == timers_.end()) {
        return;
    }
    found->second->resume();
}
void Scheduler::update(float dt) {
float scaledDt = dt * timeScale_.load();
locked_ = true;
// 更新所有 update 回调
{
std::lock_guard<std::mutex> lock(updateIndexMutex_);
for (auto& entry : updates_) {
if (!entry.markedForDel && !entry.paused && entry.target) {
entry.target->update(scaledDt);
}
}
updates_.erase(
std::remove_if(updates_.begin(), updates_.end(),
[](const UpdateEntry& e) { return e.markedForDel; }),
updates_.end()
);
}
// 更新定时器
std::vector<TimerHdl> toRemove;
{
std::lock_guard<std::mutex> lock(timersMutex_);
for (auto it = timers_.begin(); it != timers_.end(); ++it) {
auto& timer = it->second;
timer->update(scaledDt);
if (timer->isDone()) {
toRemove.push_back(it->first);
}
}
for (auto hdl : toRemove) {
timers_.erase(hdl);
}
}
locked_ = false;
}
void Scheduler::updateParallel(float dt) {
float scaledDt = dt * timeScale_.load();
locked_ = true;
// 并行更新所有 update 回调(使用标准库线程)
{
std::lock_guard<std::mutex> lock(updateIndexMutex_);
size_t numThreads = std::thread::hardware_concurrency();
if (numThreads == 0) numThreads = 4;
size_t batchSize = updates_.size() / numThreads;
if (batchSize == 0) batchSize = 1;
std::vector<std::future<void>> futures;
for (size_t t = 0; t < numThreads && t * batchSize < updates_.size(); ++t) {
size_t start = t * batchSize;
size_t end = (t == numThreads - 1) ? updates_.size() : (t + 1) * batchSize;
futures.push_back(std::async(std::launch::async, [this, start, end, scaledDt]() {
for (size_t i = start; i < end; ++i) {
auto& entry = updates_[i];
if (!entry.markedForDel && !entry.paused && entry.target) {
entry.target->update(scaledDt);
}
}
}));
}
for (auto& f : futures) {
f.wait();
}
updates_.erase(
std::remove_if(updates_.begin(), updates_.end(),
[](const UpdateEntry& e) { return e.markedForDel; }),
updates_.end()
);
}
// 更新定时器
std::vector<TimerHdl> toRemove;
{
std::lock_guard<std::mutex> lock(timersMutex_);
for (auto it = timers_.begin(); it != timers_.end(); ++it) {
auto& timer = it->second;
timer->update(scaledDt);
if (timer->isDone()) {
toRemove.push_back(it->first);
}
}
for (auto hdl : toRemove) {
timers_.erase(hdl);
}
}
locked_ = false;
}
/// Returns true when a timer is currently registered under `hdl`.
bool Scheduler::isScheduled(TimerHdl hdl) const {
    const std::lock_guard<std::mutex> guard(timersMutex_);
    const auto found = timers_.find(hdl);
    const bool present = (found != timers_.end());
    return present;
}
/// Returns the number of timers currently registered (per-frame update
/// targets are not included).
size_t Scheduler::count() const {
    const std::lock_guard<std::mutex> guard(timersMutex_);
    const size_t timerCount = timers_.size();
    return timerCount;
}
/// Produces the next handle by atomically post-incrementing nextHdl_.
/// Relaxed ordering is used: only the counter value itself is needed.
TimerHdl Scheduler::genHdl() {
    const auto issued = nextHdl_.fetch_add(1, std::memory_order_relaxed);
    return issued;
}
} // namespace extra2d