libpromeki 1.0.0-alpha
PROfessional MEdia toolKIt
 
Loading...
Searching...
No Matches
queue.h
Go to the documentation of this file.
1
8#pragma once
9
10
11#include <promeki/config.h>
12#if PROMEKI_ENABLE_CORE
13#include <queue>
14#include <promeki/namespace.h>
15#include <promeki/error.h>
16#include <promeki/list.h>
17#include <promeki/mutex.h>
18#include <promeki/result.h>
20
21PROMEKI_NAMESPACE_BEGIN
22
65template <typename T> class Queue {
66 public:
67 Queue() = default;
68 ~Queue() = default;
69
87 void setMaxSize(size_t n) {
88 Mutex::Locker locker(_mutex);
89 _maxSize = n;
90 // A larger / disabled cap may have just made room for
91 // producers waiting on a smaller cap that was set
92 // earlier. No-op when no producer is parked.
93 _notFull.wakeAll();
94 }
95
100 size_t maxSize() const {
101 Mutex::Locker locker(_mutex);
102 return _maxSize;
103 }
104
115 void push(const T &val) {
116 Mutex::Locker locker(_mutex);
117 _queue.push(val);
118 _cv.wakeOne();
119 return;
120 }
121
126 void push(T &&val) {
127 Mutex::Locker locker(_mutex);
128 _queue.push(std::move(val));
129 _cv.wakeOne();
130 return;
131 }
132
138 template <typename... Args> void emplace(Args &&...args) {
139 Mutex::Locker locker(_mutex);
140 _queue.emplace(std::forward<Args>(args)...);
141 _cv.wakeOne();
142 return;
143 }
144
149 void push(const List<T> &list) {
150 Mutex::Locker locker(_mutex);
151 for (const auto &item : list) _queue.push(item);
152 _cv.wakeAll();
153 return;
154 }
155
175 Error pushBlocking(const T &val, unsigned int timeoutMs = 0) {
176 Mutex::Locker locker(_mutex);
177 Error err = waitForCapacity(timeoutMs);
178 if (err != Error::Ok) return err;
179 _queue.push(val);
180 _cv.wakeOne();
181 return Error::Ok;
182 }
183
187 Error pushBlocking(T &&val, unsigned int timeoutMs = 0) {
188 Mutex::Locker locker(_mutex);
189 Error err = waitForCapacity(timeoutMs);
190 if (err != Error::Ok) return err;
191 _queue.push(std::move(val));
192 _cv.wakeOne();
193 return Error::Ok;
194 }
195
199 template <typename... Args> Error emplaceBlocking(unsigned int timeoutMs, Args &&...args) {
200 Mutex::Locker locker(_mutex);
201 Error err = waitForCapacity(timeoutMs);
202 if (err != Error::Ok) return err;
203 _queue.emplace(std::forward<Args>(args)...);
204 _cv.wakeOne();
205 return Error::Ok;
206 }
207
252 size_t pushDropOldest(const T &val) {
253 Mutex::Locker locker(_mutex);
254 if (_cancelled) return 0;
255 size_t dropped = 0;
256 while (_maxSize > 0 && _queue.size() >= _maxSize) {
257 _queue.pop();
258 ++dropped;
259 }
260 _queue.push(val);
261 _cv.wakeOne();
262 if (dropped > 0) _notFull.wakeAll();
263 return dropped;
264 }
265
269 size_t pushDropOldest(T &&val) {
270 Mutex::Locker locker(_mutex);
271 if (_cancelled) return 0;
272 size_t dropped = 0;
273 while (_maxSize > 0 && _queue.size() >= _maxSize) {
274 _queue.pop();
275 ++dropped;
276 }
277 _queue.push(std::move(val));
278 _cv.wakeOne();
279 if (dropped > 0) _notFull.wakeAll();
280 return dropped;
281 }
282
293 Result<T> pop(unsigned int timeoutMs = 0) {
294 Mutex::Locker locker(_mutex);
295 Error err = _cv.wait(_mutex,
296 [this] { return _cancelled || !_queue.empty(); },
297 timeoutMs);
298 if (err != Error::Ok) return Result<T>(T{}, err);
299 if (_cancelled && _queue.empty()) return Result<T>(T{}, Error::Cancelled);
300 T ret = std::move(_queue.front());
301 _queue.pop();
302 if (_queue.empty()) _cv.wakeAll();
303 // A consumer popping always frees a slot for any
304 // producer parked on the not-full condition.
305 _notFull.wakeOne();
306 return makeResult(std::move(ret));
307 }
308
315 Result<T> tryPop() {
316 Mutex::Locker locker(_mutex);
317 if (_queue.empty()) return Result<T>(T{}, Error::Empty);
318 T ret = std::move(_queue.front());
319 _queue.pop();
320 if (_queue.empty()) _cv.wakeAll();
321 _notFull.wakeOne();
322 return makeResult(std::move(ret));
323 }
324
334 Result<T> peek(unsigned int timeoutMs = 0) {
335 Mutex::Locker locker(_mutex);
336 Error err = _cv.wait(_mutex,
337 [this] { return _cancelled || !_queue.empty(); },
338 timeoutMs);
339 if (err != Error::Ok) return Result<T>(T{}, err);
340 if (_cancelled && _queue.empty()) return Result<T>(T{}, Error::Cancelled);
341 return makeResult(T(_queue.front()));
342 }
343
350 Result<T> tryPeek() {
351 Mutex::Locker locker(_mutex);
352 if (_queue.empty()) return Result<T>(T{}, Error::Empty);
353 return makeResult(T(_queue.front()));
354 }
355
369 Error waitForEmpty(unsigned int timeoutMs = 0) {
370 Mutex::Locker locker(_mutex);
371 return _cv.wait(_mutex, [this] { return _queue.empty() || _cancelled; }, timeoutMs);
372 }
373
378 bool isEmpty() const {
379 Mutex::Locker locker(_mutex);
380 return _queue.empty();
381 }
382
387 size_t size() const {
388 Mutex::Locker locker(_mutex);
389 return _queue.size();
390 }
391
400 void clear() {
401 Mutex::Locker locker(_mutex);
402 std::queue<T> empty;
403 std::swap(_queue, empty);
404 _notFull.wakeAll();
405 return;
406 }
407
430 void cancelWaiters() {
431 Mutex::Locker locker(_mutex);
432 _cancelled = true;
433 _cv.wakeAll();
434 _notFull.wakeAll();
435 }
436
446 void reset() {
447 Mutex::Locker locker(_mutex);
448 _cancelled = false;
449 }
450
456 bool isCancelled() const {
457 Mutex::Locker locker(_mutex);
458 return _cancelled;
459 }
460
461 private:
472 Error waitForCapacity(unsigned int timeoutMs) {
473 if (_cancelled) return Error::Cancelled;
474 if (_maxSize == 0) return Error::Ok;
475 // Re-check after each wake: a consumer may have
476 // popped one slot and woken multiple parked
477 // producers, so any one of them must verify the
478 // cap before proceeding.
479 Error err = _notFull.wait(_mutex,
480 [this] { return _cancelled || _maxSize == 0 || _queue.size() < _maxSize; },
481 timeoutMs);
482 if (err != Error::Ok) return err;
483 if (_cancelled) return Error::Cancelled;
484 return Error::Ok;
485 }
486
487 mutable Mutex _mutex;
488 WaitCondition _cv;
489 WaitCondition _notFull;
490 std::queue<T> _queue;
491 size_t _maxSize = 0;
492 bool _cancelled = false;
493};
494
495PROMEKI_NAMESPACE_END
496
497#endif // PROMEKI_ENABLE_CORE