11#include <promeki/config.h>
12#if PROMEKI_ENABLE_CORE
25PROMEKI_NAMESPACE_BEGIN
63 using TaskFunc = Function<void()>;
69 explicit Strand(ThreadPool &pool) : _pool(pool) {}
78 ~Strand() { waitForIdle(); }
80 Strand(
const Strand &) =
delete;
81 Strand &operator=(
const Strand &) =
delete;
82 Strand(Strand &&) =
delete;
83 Strand &operator=(Strand &&) =
delete;
103 template <
typename F>
104 auto submit(F &&callable, TaskFunc onCancel = {}) -> Future<std::invoke_result_t<F>> {
105 return submitImpl(std::forward<F>(callable), std::move(onCancel),
false);
139 template <
typename F>
140 auto submitUrgent(F &&callable, TaskFunc onCancel = {}) -> Future<std::invoke_result_t<F>> {
141 return submitImpl(std::forward<F>(callable), std::move(onCancel),
true);
154 size_t cancelPending() {
157 Mutex::Locker lock(_mutex);
158 toCancel = std::move(_queue);
161 size_t count = toCancel.size();
162 for (
auto &entry : toCancel) {
172 Mutex::Locker lock(_mutex);
173 _idleCv.wait(_mutex, [
this] {
return !_running; });
180 bool isBusy()
const {
181 Mutex::Locker lock(_mutex);
202 size_t pendingCount()
const {
203 Mutex::Locker lock(_mutex);
204 return _queue.size();
226 void setWorkTag(ThreadPool::WorkTag tag) {
227 Mutex::Locker lock(_mutex);
232 ThreadPool::WorkTag workTag()
const {
233 Mutex::Locker lock(_mutex);
257 template <
typename F>
258 auto submitImpl(F &&callable, TaskFunc onCancel,
bool urgent) -> Future<std::invoke_result_t<F>> {
259 using R = std::invoke_result_t<F>;
260 auto promise = std::make_shared<Promise<R>>();
261 Future<R> future = promise->future();
264 entry.run = [promise, f = std::forward<F>(callable)]()
mutable {
265 if constexpr (std::is_void_v<R>) {
269 promise->setValue(f());
272 entry.cancel = [promise, onCancel = std::move(onCancel)]()
mutable {
273 if (onCancel) onCancel();
274 promise->setError(Error::Cancelled);
277 ThreadPool::WorkTag spawnTag;
278 bool needSpawn =
false;
280 Mutex::Locker lock(_mutex);
282 _queue.pushToFront(std::move(entry));
284 _queue.pushToBack(std::move(entry));
293 _pool.submit(spawnTag, [
this] { runNext(); });
305 bool haveTask =
false;
307 Mutex::Locker lock(_mutex);
308 if (!_queue.isEmpty()) {
309 entry = _queue.popFromFront();
316 if (haveTask) entry.run();
319 bool reSpawn =
false;
321 Mutex::Locker lock(_mutex);
322 if (_queue.isEmpty()) {
331 ThreadPool::WorkTag tag;
333 Mutex::Locker lock(_mutex);
336 _pool.submit(tag, [
this] { runNext(); });
341 using EntryQueue = Deque<Entry>;
344 mutable Mutex _mutex;
345 WaitCondition _idleCv;
347 ThreadPool::WorkTag _workTag;
348 bool _running =
false;