11#include <promeki/config.h>
12#if PROMEKI_ENABLE_CORE
21PROMEKI_NAMESPACE_BEGIN
65template <
typename T>
class Queue {
87 void setMaxSize(
size_t n) {
88 Mutex::Locker locker(_mutex);
100 size_t maxSize()
const {
101 Mutex::Locker locker(_mutex);
115 void push(
const T &val) {
116 Mutex::Locker locker(_mutex);
127 Mutex::Locker locker(_mutex);
128 _queue.push(std::move(val));
138 template <
typename... Args>
void emplace(Args &&...args) {
139 Mutex::Locker locker(_mutex);
140 _queue.emplace(std::forward<Args>(args)...);
149 void push(
const List<T> &list) {
150 Mutex::Locker locker(_mutex);
151 for (
const auto &item : list) _queue.push(item);
175 Error pushBlocking(
const T &val,
unsigned int timeoutMs = 0) {
176 Mutex::Locker locker(_mutex);
177 Error err = waitForCapacity(timeoutMs);
178 if (err != Error::Ok)
return err;
187 Error pushBlocking(T &&val,
unsigned int timeoutMs = 0) {
188 Mutex::Locker locker(_mutex);
189 Error err = waitForCapacity(timeoutMs);
190 if (err != Error::Ok)
return err;
191 _queue.push(std::move(val));
199 template <
typename... Args> Error emplaceBlocking(
unsigned int timeoutMs, Args &&...args) {
200 Mutex::Locker locker(_mutex);
201 Error err = waitForCapacity(timeoutMs);
202 if (err != Error::Ok)
return err;
203 _queue.emplace(std::forward<Args>(args)...);
252 size_t pushDropOldest(
const T &val) {
253 Mutex::Locker locker(_mutex);
254 if (_cancelled)
return 0;
256 while (_maxSize > 0 && _queue.size() >= _maxSize) {
262 if (dropped > 0) _notFull.wakeAll();
269 size_t pushDropOldest(T &&val) {
270 Mutex::Locker locker(_mutex);
271 if (_cancelled)
return 0;
273 while (_maxSize > 0 && _queue.size() >= _maxSize) {
277 _queue.push(std::move(val));
279 if (dropped > 0) _notFull.wakeAll();
293 Result<T> pop(
unsigned int timeoutMs = 0) {
294 Mutex::Locker locker(_mutex);
295 Error err = _cv.wait(_mutex,
296 [
this] {
return _cancelled || !_queue.empty(); },
298 if (err != Error::Ok)
return Result<T>(T{}, err);
299 if (_cancelled && _queue.empty())
return Result<T>(T{}, Error::Cancelled);
300 T ret = std::move(_queue.front());
302 if (_queue.empty()) _cv.wakeAll();
306 return makeResult(std::move(ret));
316 Mutex::Locker locker(_mutex);
317 if (_queue.empty())
return Result<T>(T{}, Error::Empty);
318 T ret = std::move(_queue.front());
320 if (_queue.empty()) _cv.wakeAll();
322 return makeResult(std::move(ret));
334 Result<T> peek(
unsigned int timeoutMs = 0) {
335 Mutex::Locker locker(_mutex);
336 Error err = _cv.wait(_mutex,
337 [
this] {
return _cancelled || !_queue.empty(); },
339 if (err != Error::Ok)
return Result<T>(T{}, err);
340 if (_cancelled && _queue.empty())
return Result<T>(T{}, Error::Cancelled);
341 return makeResult(T(_queue.front()));
350 Result<T> tryPeek() {
351 Mutex::Locker locker(_mutex);
352 if (_queue.empty())
return Result<T>(T{}, Error::Empty);
353 return makeResult(T(_queue.front()));
369 Error waitForEmpty(
unsigned int timeoutMs = 0) {
370 Mutex::Locker locker(_mutex);
371 return _cv.wait(_mutex, [
this] {
return _queue.empty() || _cancelled; }, timeoutMs);
378 bool isEmpty()
const {
379 Mutex::Locker locker(_mutex);
380 return _queue.empty();
387 size_t size()
const {
388 Mutex::Locker locker(_mutex);
389 return _queue.size();
401 Mutex::Locker locker(_mutex);
403 std::swap(_queue, empty);
430 void cancelWaiters() {
431 Mutex::Locker locker(_mutex);
447 Mutex::Locker locker(_mutex);
456 bool isCancelled()
const {
457 Mutex::Locker locker(_mutex);
472 Error waitForCapacity(
unsigned int timeoutMs) {
473 if (_cancelled)
return Error::Cancelled;
474 if (_maxSize == 0)
return Error::Ok;
479 Error err = _notFull.wait(_mutex,
480 [
this] {
return _cancelled || _maxSize == 0 || _queue.size() < _maxSize; },
482 if (err != Error::Ok)
return err;
483 if (_cancelled)
return Error::Cancelled;
487 mutable Mutex _mutex;
489 WaitCondition _notFull;
490 std::queue<T> _queue;
492 bool _cancelled =
false;