libpromeki 1.0.0-alpha
PROfessional MEdia toolKIt
 
Loading...
Searching...
No Matches
strand.h
Go to the documentation of this file.
1
8#pragma once
9
10
11#include <promeki/config.h>
12#if PROMEKI_ENABLE_CORE
13#include <functional>
14#include <type_traits>
15#include <memory>
16#include <promeki/function.h>
17#include <promeki/deque.h>
18#include <promeki/namespace.h>
19#include <promeki/mutex.h>
21#include <promeki/future.h>
22#include <promeki/promise.h>
23#include <promeki/threadpool.h>
24
25PROMEKI_NAMESPACE_BEGIN
26
60class Strand {
61 public:
63 using TaskFunc = Function<void()>;
64
69 explicit Strand(ThreadPool &pool) : _pool(pool) {}
70
78 ~Strand() { waitForIdle(); }
79
80 Strand(const Strand &) = delete;
81 Strand &operator=(const Strand &) = delete;
82 Strand(Strand &&) = delete;
83 Strand &operator=(Strand &&) = delete;
84
103 template <typename F>
104 auto submit(F &&callable, TaskFunc onCancel = {}) -> Future<std::invoke_result_t<F>> {
105 return submitImpl(std::forward<F>(callable), std::move(onCancel), false);
106 }
107
139 template <typename F>
140 auto submitUrgent(F &&callable, TaskFunc onCancel = {}) -> Future<std::invoke_result_t<F>> {
141 return submitImpl(std::forward<F>(callable), std::move(onCancel), true);
142 }
143
154 size_t cancelPending() {
155 EntryQueue toCancel;
156 {
157 Mutex::Locker lock(_mutex);
158 toCancel = std::move(_queue);
159 _queue.clear();
160 }
161 size_t count = toCancel.size();
162 for (auto &entry : toCancel) {
163 entry.cancel();
164 }
165 return count;
166 }
167
171 void waitForIdle() {
172 Mutex::Locker lock(_mutex);
173 _idleCv.wait(_mutex, [this] { return !_running; });
174 }
175
180 bool isBusy() const {
181 Mutex::Locker lock(_mutex);
182 return _running;
183 }
184
202 size_t pendingCount() const {
203 Mutex::Locker lock(_mutex);
204 return _queue.size();
205 }
206
226 void setWorkTag(ThreadPool::WorkTag tag) {
227 Mutex::Locker lock(_mutex);
228 _workTag = tag;
229 }
230
232 ThreadPool::WorkTag workTag() const {
233 Mutex::Locker lock(_mutex);
234 return _workTag;
235 }
236
237 private:
244 struct Entry {
245 TaskFunc run;
246 TaskFunc cancel;
247 };
248
257 template <typename F>
258 auto submitImpl(F &&callable, TaskFunc onCancel, bool urgent) -> Future<std::invoke_result_t<F>> {
259 using R = std::invoke_result_t<F>;
260 auto promise = std::make_shared<Promise<R>>();
261 Future<R> future = promise->future();
262
263 Entry entry;
264 entry.run = [promise, f = std::forward<F>(callable)]() mutable {
265 if constexpr (std::is_void_v<R>) {
266 f();
267 promise->setValue();
268 } else {
269 promise->setValue(f());
270 }
271 };
272 entry.cancel = [promise, onCancel = std::move(onCancel)]() mutable {
273 if (onCancel) onCancel();
274 promise->setError(Error::Cancelled);
275 };
276
277 ThreadPool::WorkTag spawnTag;
278 bool needSpawn = false;
279 {
280 Mutex::Locker lock(_mutex);
281 if (urgent) {
282 _queue.pushToFront(std::move(entry));
283 } else {
284 _queue.pushToBack(std::move(entry));
285 }
286 if (!_running) {
287 _running = true;
288 needSpawn = true;
289 }
290 spawnTag = _workTag;
291 }
292 if (needSpawn) {
293 _pool.submit(spawnTag, [this] { runNext(); });
294 }
295 return future;
296 }
297
303 void runNext() {
304 Entry entry;
305 bool haveTask = false;
306 {
307 Mutex::Locker lock(_mutex);
308 if (!_queue.isEmpty()) {
309 entry = _queue.popFromFront();
310 haveTask = true;
311 }
312 }
313
314 // Possible if cancelPending() emptied the queue
315 // between spawn and execution.
316 if (haveTask) entry.run();
317
318 // Either re-submit (more work pending) or mark idle.
319 bool reSpawn = false;
320 {
321 Mutex::Locker lock(_mutex);
322 if (_queue.isEmpty()) {
323 _running = false;
324 _idleCv.wakeAll();
325 } else {
326 reSpawn = true;
327 }
328 }
329
330 if (reSpawn) {
331 ThreadPool::WorkTag tag;
332 {
333 Mutex::Locker lock(_mutex);
334 tag = _workTag;
335 }
336 _pool.submit(tag, [this] { runNext(); });
337 }
338 }
339
341 using EntryQueue = Deque<Entry>;
342
343 ThreadPool &_pool;
344 mutable Mutex _mutex;
345 WaitCondition _idleCv;
346 EntryQueue _queue;
347 ThreadPool::WorkTag _workTag;
348 bool _running = false;
349};
350
351PROMEKI_NAMESPACE_END
352
353#endif // PROMEKI_ENABLE_CORE