libpromeki 1.0.0-alpha
PROfessional MEdia toolKIt
 
Loading...
Searching...
No Matches
eventloop.h
Go to the documentation of this file.
1
8#pragma once
9
10
11#include <promeki/config.h>
12#if PROMEKI_ENABLE_CORE
13#include <variant>
14#include <functional>
15#include <promeki/function.h>
16#include <promeki/namespace.h>
17#include <promeki/atomic.h>
18#include <promeki/duration.h>
19#include <promeki/hashmap.h>
20#include <promeki/mutex.h>
21#include <promeki/queue.h>
22#include <promeki/list.h>
23#include <promeki/string.h>
25#include <promeki/timestamp.h>
26#include <promeki/event.h>
27#include <promeki/uniqueptr.h>
28
29PROMEKI_NAMESPACE_BEGIN
30
31class ObjectBase;
32
33// Defined in eventloop.cpp; owns the platform-specific wake file
34// descriptor (eventfd on Linux, self-pipe elsewhere) used to
35// interrupt poll() when new work is posted.
36class EventLoopWakeFd;
37
80class EventLoop {
81 public:
83 enum ProcessEventsFlag : uint32_t {
84 ExcludeTimers = 0x01,
85 ExcludePosted = 0x02,
86 WaitForMore = 0x04
87 };
88
105 using Label = StringRegistry<"EventLoopLabel">::Item;
106
111 static EventLoop *current();
112
120 EventLoop();
121
129 ~EventLoop();
130
131 EventLoop(const EventLoop &) = delete;
132 EventLoop &operator=(const EventLoop &) = delete;
133
138 int exec();
139
149 void processEvents(uint32_t flags = 0, unsigned int timeoutMs = 0);
150
159 void quit(int returnCode = 0);
160
169 void postCallable(Function<void()> func);
170
200 void postCallable(Label label, Function<void()> func);
201
212 void postEvent(ObjectBase *receiver, Event *event);
213
228 int startTimer(ObjectBase *receiver, unsigned int intervalMs, bool singleShot = false);
229
242 int startTimer(unsigned int intervalMs, Function<void()> func, bool singleShot = false);
243
258 void stopTimer(int timerId);
259
264 bool isRunning() const { return _running.value(); }
265
270 int exitCode() const { return _exitCode.value(); }
271
284 unsigned int nextTimerTimeout() const;
285
296 static constexpr uint32_t IoRead = 0x01;
297 static constexpr uint32_t IoWrite = 0x02;
298 static constexpr uint32_t IoError = 0x04;
311 using IoCallback = Function<void(int fd, uint32_t events)>;
312
356 int addIoSource(int fd, uint32_t events, IoCallback cb);
357
369 void removeIoSource(int handle);
370
410 struct Report {
422 struct EventStat {
423 Duration elapsed;
424 int64_t count = 0;
425 };
426
427 String loopName;
428 Duration wallElapsed;
429 Duration sleep;
430 Duration queueWait;
431 Duration timers;
432 Duration events;
433 Duration callables;
434 Duration io;
435 Duration overhead;
436
437 int64_t timersCount = 0;
438 int64_t eventsCount = 0;
439 int64_t callablesCount = 0;
440 int64_t ioCount = 0;
441
443 HashMap<int, EventStat> eventsByType;
444
454 HashMap<uint64_t, EventStat> callablesByLabel;
455 };
456
468 using ReportFunction = Function<void(const Report &)>;
469
488 void setName(const String &name);
489
494 String name() const;
495
526 void installMonitor(const Duration &interval, ReportFunction fn = {});
527
547 void removeMonitor();
548
557 bool hasMonitor() const;
558
571 Report peekStats() const;
572
583 Report consumeStats();
584
585 private:
586 struct CallableItem {
587 Function<void()> func;
595 uint64_t labelId = Label::InvalidID;
596 };
597 struct EventItem {
598 ObjectBase *receiver;
599 Event *event;
600 };
601 struct QuitItem {
602 int code;
603 };
604 using Item = std::variant<CallableItem, EventItem, QuitItem>;
605
606 struct TimerInfo {
607 int id;
608 ObjectBase *receiver;
609 Function<void()> func;
610 unsigned int intervalMs;
611 bool singleShot;
612 TimeStamp nextFire;
613 };
614
615 static thread_local EventLoop *_current;
616
617 Queue<Item> _queue;
618 Atomic<bool> _running;
619 Atomic<int> _exitCode;
620
621 // Timer list access is guarded by _timersMutex. Any
622 // thread may install or stop timers via startTimer /
623 // stopTimer, so the mutex is acquired on every touch.
624 // processTimers() takes a snapshot of the ready-to-fire
625 // entries under the lock, releases the lock, and then
626 // invokes callbacks — this avoids deadlocks if a timer
627 // callback calls startTimer() or stopTimer() on the
628 // same event loop, and keeps the lock hold time bounded.
629 mutable Mutex _timersMutex;
630 List<TimerInfo> _timers;
631 Atomic<int> _nextTimerId{1};
632
633 // Platform wake fd (eventfd on Linux, self-pipe
634 // elsewhere). Owned by the EventLoop; opened in the
635 // constructor, closed in the destructor. Written by
636 // postCallable / postEvent / quit / startTimer /
637 // stopTimer to unblock poll() in waitOnSources, and
638 // included in every poll set as index 0.
639 using WakeFdUPtr = UniquePtr<EventLoopWakeFd>;
640 WakeFdUPtr _wake;
641
642 // I/O source registration. Mutation under _ioMutex,
643 // poll set built under _ioMutex into a short-lived
644 // stack copy, callbacks fired after the lock is
645 // released so a callback may call addIoSource /
646 // removeIoSource on the same EventLoop without
647 // deadlocking.
648 struct IoSource {
649 int handle;
650 int fd;
651 uint32_t events;
652 IoCallback cb;
653 bool pendingRemove = false;
654 };
655 mutable Mutex _ioMutex;
656 List<IoSource> _ioSources;
657 Atomic<int> _nextIoHandle{1};
658
668 void wakeSelf();
669
670 bool dispatchItem(Item &item);
671 void processTimers();
672
673 // Waits on the wake fd + registered I/O sources via
674 // poll(), for up to @p waitMs milliseconds (0 = wait
675 // indefinitely — callers clamp by timers before
676 // calling). On return, drains the wake fd and fires
677 // any ready I/O source callbacks. On non-POSIX
678 // platforms, falls back to the condvar-based
679 // Queue::pop wait so Windows / Emscripten builds
680 // still compile and behave as before (minus
681 // IoSource support).
682 void waitOnSources(unsigned int waitMs);
683
684 // ----------------------------------------------------
685 // Stats / monitor support. The hot-path bracket reads
686 // _monitorActive once at construction; when false it
687 // skips the timestamp grab, the lock, and the
688 // accumulator update. When true it stamps once on
689 // construction and once on destruction, takes the
690 // mutex, adds the elapsed nanoseconds + 1 to the
691 // attributed duration / count buckets, and updates
692 // _eventsByType when an event-type is supplied.
693 // ----------------------------------------------------
694
695 struct EventStatNs {
696 int64_t elapsed = 0;
697 int64_t count = 0;
698 };
699
700 Atomic<bool> _monitorActive;
701 mutable Mutex _statsMutex;
702 TimeStamp _statsLastSnapshot;
703
704 int64_t _sleepNs = 0;
705 int64_t _queueWaitNs = 0;
706 int64_t _timersNs = 0;
707 int64_t _eventsNs = 0;
708 int64_t _callablesNs = 0;
709 int64_t _ioNs = 0;
710
711 int64_t _timersCount = 0;
712 int64_t _eventsCount = 0;
713 int64_t _callablesCount = 0;
714 int64_t _ioCount = 0;
715
716 HashMap<int, EventStatNs> _eventsByType;
717
721 HashMap<uint64_t, EventStatNs> _callablesByLabel;
722
723 int _monitorTimerId = 0;
724 ReportFunction _monitorFn;
725 String _name;
726
727 // RAII helper that brackets a single dispatch site.
728 // Reads _monitorActive ONCE in the constructor and
729 // caches the boolean; both the constructor's
730 // TimeStamp::now() grab and the destructor's
731 // accumulator update key off that cached value so
732 // an enable-mid-bracket cannot leave the destructor
733 // trying to attribute time without a start
734 // timestamp. When the cached gate is false, no
735 // timestamp is stored and the destructor early-outs
736 // without locking.
737 class StatsBracket {
738 public:
739 StatsBracket(EventLoop *loop, int64_t *durBucket,
740 int64_t *countBucket);
741 ~StatsBracket();
742 void attributeEventType(int type) { _eventType = type; }
743 void attributeCallableLabel(uint64_t labelId) {
744 _callableLabel = labelId;
745 }
746
747 private:
748 EventLoop *_loop;
749 int64_t *_durBucket;
750 int64_t *_countBucket;
751 TimeStamp _start;
752 int _eventType = -1;
753 uint64_t _callableLabel = Label::InvalidID;
754 bool _active = false;
755 };
756
757 friend class StatsBracket;
758
759 // Default formatter used when _monitorFn is empty.
760 // Defined in eventloop.cpp and exposed via
761 // installMonitor's "empty fn = default" semantics.
762 static void defaultMonitorReporter(const Report &r);
763};
764
765PROMEKI_NAMESPACE_END
766
767#endif // PROMEKI_ENABLE_CORE