libpromeki main
PROfessional MEdia toolKIt
 
Loading...
Searching...
No Matches
medianode.h
Go to the documentation of this file.
1
8#pragma once
9
10#include <functional>
11#include <mutex>
14#include <promeki/core/string.h>
15#include <promeki/core/error.h>
16#include <promeki/core/list.h>
17#include <promeki/core/map.h>
19#include <promeki/core/queue.h>
23#include <promeki/proav/frame.h>
24
26
27class ThreadPool;
28class MediaNode;
29class MediaPipeline;
30
35enum class Severity {
36 Info,
37 Warning,
38 Error,
39 Fatal
40};
41
53
67
81class MediaNode : public ObjectBase {
82 PROMEKI_OBJECT(MediaNode, ObjectBase)
83 public:
91
98
104
106 virtual ~MediaNode();
107
109 State state() const { return _state; }
110
112 const String &name() const { return _name; }
113
118 void setName(const String &name) { _name = name; return; }
119
120 // ---- Port management ----
121
123 const MediaPort::PtrList &inputPorts() const { return _inputPorts; }
124
126 const MediaPort::PtrList &outputPorts() const { return _outputPorts; }
127
133 MediaPort::Ptr inputPort(int index) const;
134
140 MediaPort::Ptr outputPort(int index) const;
141
148
155
157 int inputPortCount() const { return _inputPorts.size(); }
158
160 int outputPortCount() const { return _outputPorts.size(); }
161
162 // ---- Threading ----
163
169 _threadingPolicy = policy;
170 return;
171 }
172
178 _threadingPolicy = CustomPool;
179 _customPool = pool;
180 return;
181 }
182
184 ThreadingPolicy threadingPolicy() const { return _threadingPolicy; }
185
187 ThreadPool *customThreadPool() const { return _customPool; }
188
189 // ---- Input queue ----
190
198 void setIdealQueueSize(int size) { _idealQueueSize = size; return; }
199
201 int idealQueueSize() const { return _idealQueueSize; }
202
204 int queuedFrameCount() const { return _inputQueue.size(); }
205
206 // ---- Virtual lifecycle ----
207
215 virtual Error configure();
216
224 virtual Error start();
225
231 virtual void stop();
232
238 virtual void process() = 0;
239
246 virtual void starvation();
247
248 // ---- Property interface ----
249
258
265 virtual Error setProperty(const String &name, const Variant &value);
266
273
274 // ---- Node type registry ----
275
281 static void registerNodeType(const String &typeName, std::function<MediaNode *()> factory);
282
288 static MediaNode *createNode(const String &typeName);
289
295
296 // ---- Statistics ----
297
303
306
316
317 // ---- Signals ----
318
321
324
327
328 protected:
337
347 void recordProcessTiming(double duration);
348
356
362
368
374
384
394
403
414 void emitMessage(Severity severity, const String &message, uint64_t frameNumber = 0);
415
420 void emitWarning(const String &message);
421
429 void emitError(const String &message);
430
431 private:
432 String _name;
433 State _state = Idle;
434 ThreadingPolicy _threadingPolicy = UseGraphPool;
435 ThreadPool *_customPool = nullptr;
436 int _idealQueueSize = 2;
437 MediaPort::PtrList _inputPorts;
438 MediaPort::PtrList _outputPorts;
439 Queue<Frame::Ptr> _inputQueue;
440
441 // Outgoing links (managed by MediaGraph)
442 MediaLink::PtrList _outgoingLinks;
443 friend class MediaGraph;
444 friend class MediaLink;
445 friend class MediaPipeline;
446
447 // Statistics (guarded by _statsMutex)
448 mutable std::mutex _statsMutex;
449 uint64_t _processCount = 0;
450 uint64_t _starvationCount = 0;
451 double _lastProcessDuration = 0.0;
452 double _avgProcessDuration = 0.0;
453 double _peakProcessDuration = 0.0;
454 int _peakQueueDepth = 0;
455
456 static Map<String, std::function<MediaNode *()>> &nodeRegistry();
457};
458
466#define PROMEKI_REGISTER_NODE(ClassName) \
467 static struct ClassName##Registrar { \
468 ClassName##Registrar() { \
469 MediaNode::registerNodeType(#ClassName, []() -> MediaNode * { return new ClassName(); }); \
470 } \
471 } __##ClassName##Registrar;
472
Lightweight error code wrapper for the promeki library.
Definition error.h:39
SharedPtr< Frame > Ptr
Shared pointer type for Frame.
Definition frame.h:40
Dynamic array container wrapping std::vector.
Definition list.h:40
size_t size() const noexcept
Returns the number of elements in the list.
Definition list.h:301
Ordered associative container wrapping std::map.
Definition map.h:38
Directed acyclic graph of MediaNodes connected by MediaLinks.
Definition mediagraph.h:32
Base class for all pipeline processing nodes.
Definition medianode.h:81
void deliverOutput(int portIndex, Frame::Ptr frame)
Delivers a frame to all outgoing links on the given output port.
void recordProcessTiming(double duration)
Records timing for a process() call.
int queuedFrameCount() const
Returns the current input queue depth.
Definition medianode.h:204
int idealQueueSize() const
Returns the ideal input queue size.
Definition medianode.h:201
virtual Error setProperty(const String &name, const Variant &value)
Sets a property by name.
virtual void starvation()
Called when the node's input queue is empty and data is needed.
ThreadingPolicy
Threading policy for this node.
Definition medianode.h:93
@ CustomPool
Use a custom thread pool.
Definition medianode.h:96
@ DedicatedThread
Run on a dedicated thread.
Definition medianode.h:95
@ UseGraphPool
Use the graph's default thread pool.
Definition medianode.h:94
static void registerNodeType(const String &typeName, std::function< MediaNode *()> factory)
Registers a node type for runtime creation.
MediaNode(ObjectBase *parent=nullptr)
Constructs a MediaNode.
void addOutputPort(MediaPort::Ptr port)
Adds an output port to this node.
MediaPort::Ptr outputPort(int index) const
Returns the output port at the given index.
void setIdealQueueSize(int size)
Sets the ideal input queue depth.
Definition medianode.h:198
MediaPort::Ptr inputPort(const String &name) const
Returns the input port with the given name.
void deliverOutput(Frame::Ptr frame)
Delivers a frame to all outgoing links on all output ports.
void addInputPort(MediaPort::Ptr port)
Adds an input port to this node.
virtual Error start()
Begins processing.
virtual Map< String, Variant > properties() const
Returns all configurable properties as key-value pairs.
void setState(State state)
Sets the node state and emits stateChanged.
void setThreadingPolicy(ThreadingPolicy policy)
Sets the threading policy.
Definition medianode.h:168
const MediaPort::PtrList & outputPorts() const
Returns the list of output ports.
Definition medianode.h:126
ThreadingPolicy threadingPolicy() const
Returns the current threading policy.
Definition medianode.h:184
MediaPort::Ptr inputPort(int index) const
Returns the input port at the given index.
static MediaNode * createNode(const String &typeName)
Creates a node by registered type name.
static List< String > registeredNodeTypes()
Returns the list of all registered node type names.
virtual void stop()
Stops processing.
void emitMessage(Severity severity, const String &message, uint64_t frameNumber=0)
Emits a message with the given severity and text.
Variant property(const String &name) const
Gets a single property value by name.
void emitWarning(const String &message)
Emits a Warning-severity message.
void resetStats()
Resets all statistics counters to zero.
State state() const
Returns the current state.
Definition medianode.h:109
Frame::Ptr dequeueInput()
Dequeues a frame from this node's input queue.
PROMEKI_SIGNAL(stateChanged, State)
Emitted when the node's state changes.
int inputPortCount() const
Returns the number of input ports.
Definition medianode.h:157
void setName(const String &name)
Sets the node name.
Definition medianode.h:118
void enqueueInput(Frame::Ptr frame)
Enqueues a frame into this node's input queue.
int outputPortCount() const
Returns the number of output ports.
Definition medianode.h:160
MediaPort::Ptr outputPort(const String &name) const
Returns the output port with the given name.
virtual void process()=0
Processes one cycle of data.
virtual Map< String, Variant > extendedStats() const
Returns additional node-specific statistics.
void emitError(const String &message)
Emits an Error-severity message.
PROMEKI_SIGNAL(messageEmitted, NodeMessage)
Emitted when the node produces a message.
virtual Error configure()
Validates ports and allocates resources.
const String & name() const
Returns the node name.
Definition medianode.h:112
State
Node lifecycle state.
Definition medianode.h:85
@ ErrorState
An error has occurred.
Definition medianode.h:89
@ Running
Actively processing data.
Definition medianode.h:88
@ Configured
Configured and ready to start.
Definition medianode.h:87
@ Idle
Initial state, not configured.
Definition medianode.h:86
void setThreadingPolicy(ThreadPool *pool)
Sets a custom thread pool and switches policy to CustomPool.
Definition medianode.h:177
PROMEKI_SIGNAL(errorOccurred, Error)
Emitted when an error occurs.
virtual ~MediaNode()
Virtual destructor.
ThreadPool * customThreadPool() const
Returns the custom thread pool, or nullptr if not using CustomPool.
Definition medianode.h:187
const MediaPort::PtrList & inputPorts() const
Returns the list of input ports.
Definition medianode.h:123
void recordStarvation()
Records a starvation event.
NodeStats stats() const
Returns a snapshot of the node's performance statistics.
Owns a MediaGraph and orchestrates pipeline processing.
Definition mediapipeline.h:29
promeki::List< Ptr > PtrList
List of shared pointers to MediaPort.
Definition mediaport.h:41
Base object for promeki.
Definition objectbase.h:129
ObjectBase * parent() const
Returns the parent object, if one. nullptr if none.
Definition objectbase.h:258
Encoding-aware string class with copy-on-write semantics.
Definition string.h:35
General-purpose thread pool for submitting callable tasks.
Definition threadpool.h:44
A monotonic timestamp based on std::chrono::steady_clock.
Definition timestamp.h:32
Severity
Severity level for node messages.
Definition medianode.h:35
@ Warning
Warning — non-fatal issue.
@ Info
Informational message.
@ Fatal
Fatal — pipeline should stop.
@ Error
Error — node transitions to ErrorState.
#define PROMEKI_NAMESPACE_BEGIN
Starts a promeki namespace block.
Definition namespace.h:14
#define PROMEKI_NAMESPACE_END
Ends a promeki namespace block.
Definition namespace.h:19
const T & value(const Result< T > &r)
Returns the value from a Result.
Definition result.h:56
Structured message emitted by a pipeline node.
Definition medianode.h:46
String message
Human-readable message text.
Definition medianode.h:48
MediaNode * node
The node that emitted this message.
Definition medianode.h:51
TimeStamp timestamp
When the message was created.
Definition medianode.h:50
uint64_t frameNumber
Frame number this message relates to (0 if not frame-specific).
Definition medianode.h:49
Severity severity
Message severity level.
Definition medianode.h:47
Snapshot of node performance statistics.
Definition medianode.h:58
uint64_t starvationCount
Total starvation() invocations.
Definition medianode.h:60
uint64_t processCount
Total process() invocations.
Definition medianode.h:59
double avgProcessDuration
Exponential moving average of process() duration in seconds.
Definition medianode.h:62
int peakQueueDepth
Peak input queue depth observed.
Definition medianode.h:65
double lastProcessDuration
Wall-clock time of last process() call in seconds.
Definition medianode.h:61
double peakProcessDuration
Peak process() duration in seconds.
Definition medianode.h:63
int currentQueueDepth
Current input queue depth.
Definition medianode.h:64