libpromeki 1.0.0-alpha
PROfessional MEdia toolKIt
 
Loading...
Searching...
No Matches
mediapipeline.h
Go to the documentation of this file.
1
8#pragma once
9
10
11#include <promeki/config.h>
12#if PROMEKI_ENABLE_PROAV
13#include <functional>
14#include <promeki/function.h>
15#include <promeki/clock.h>
16#include <promeki/duration.h>
17#include <promeki/error.h>
18#include <promeki/frame.h>
19#include <promeki/list.h>
20#include <promeki/logger.h>
21#include <promeki/map.h>
22#include <promeki/mediaio.h>
28#include <promeki/mutex.h>
29#include <promeki/namespace.h>
30#include <promeki/objectbase.h>
32#include <promeki/set.h>
33#include <promeki/string.h>
34#include <promeki/stringlist.h>
35
36PROMEKI_NAMESPACE_BEGIN
37
// Forward declaration only: in the declarations visible in this header,
// MediaIOStatsCollector is referred to solely by pointer (the values of the
// MediaPipeline::_statsCollectors map), so its full definition is not
// required here.
38class MediaIOStatsCollector;
39
// MediaPipeline: an ObjectBase-derived object that holds a
// MediaPipelineConfig, a set of named MediaIO stages, and three independent
// state machines (lifecycle State, PlaybackState, CaptureState). It exposes
// lifecycle / transport control methods returning Error, a set of
// PROMEKI_SIGNAL declarations, and a callback-based event subscription API.
//
// NOTE(review): this listing carries the documentation generator's
// line-number prefixes and the original API documentation blocks are not
// visible here. The comments added below state only what the declarations,
// inline bodies and surviving comments show; anything inferred from names is
// marked NOTE(review) and should be confirmed against the implementation.
77class MediaPipeline : public ObjectBase {
78 PROMEKI_OBJECT(MediaPipeline, ObjectBase)
79 public:
   // Lifecycle state returned by state(). A new pipeline starts in
   // State::Empty (see the _state initializer).
   // NOTE(review): the enumerator order suggests the progression
   // build() -> open() -> start() -> stop() -> close(); confirm which
   // transitions the implementation actually allows.
88 enum class State {
89 Empty,
90 Built,
91 Open,
92 Running,
93 Stopped,
94 Closed
95 };
96
   // Playback transport state returned by playbackState() and carried by the
   // playbackStateChanged signal. Starts as PlaybackState::Idle.
108 enum class PlaybackState {
109 Idle,
110 Playing,
111 Paused,
112 Seeking,
113 Ended
114 };
115
   // Capture transport state returned by captureState() and carried by the
   // captureStateChanged signal. Starts as CaptureState::Idle.
128 enum class CaptureState {
129 Idle,
130 Armed,
131 Recording,
132 Paused
133 };
134
   // Constructs a pipeline. `parent` is optional and defaults to nullptr;
   // the constructor is explicit, so an ObjectBase* never converts
   // implicitly to a MediaPipeline.
139 explicit MediaPipeline(ObjectBase *parent = nullptr);
140
   // Destructor; overrides the ObjectBase destructor.
142 ~MediaPipeline() override;
143
144 // ------------------------------------------------------------
145 // Lifecycle
146 // ------------------------------------------------------------
147
   // Builds the pipeline from `config`. `autoplan` defaults to false.
   // NOTE(review): what "autoplan" inserts or rewrites is not visible in
   // this header; confirm before relying on it.
168 Error build(const MediaPipelineConfig &config, bool autoplan = false);
169
   // Supplies an externally created MediaIO to the pipeline.
   // NOTE(review): presumably recorded in _injected; ownership of `io`
   // (caller vs. pipeline) cannot be determined from this header.
214 Error injectStage(MediaIO *io);
215
   // Lifecycle steps; each reports its outcome as an Error.
223 Error open();
224
230 Error start();
231
248 Error stop();
249
   // `block` defaults to true.
   // NOTE(review): presumably block=true waits for the close cascade to
   // complete while block=false returns with isClosing() still true and
   // completion reported via the `closed` signal; confirm.
284 Error close(bool block = true);
285
   // Default close-watchdog timeout, in milliseconds. Used as the initial
   // value of _closeTimeoutMs.
299 static constexpr unsigned int DefaultCloseTimeoutMs = 3000;
300
   // Stores `ms` directly into _closeTimeoutMs; no validation or locking is
   // performed here.
   // NOTE(review): whether a change takes effect for a close already in
   // progress depends on when initiateClose reads the value; confirm.
309 void setCloseTimeoutMs(unsigned int ms) { _closeTimeoutMs = ms; }
310
   // Returns the currently stored close timeout in milliseconds.
312 unsigned int closeTimeoutMs() const { return _closeTimeoutMs; }
313
   // Returns the _closing flag (see the close-cascade bookkeeping members).
320 bool isClosing() const { return _closing; }
321
   // Returns the current lifecycle state (plain read of _state, no locking).
323 State state() const { return _state; }
324
325 // ------------------------------------------------------------
326 // Playback transport
327 // ------------------------------------------------------------
328
   // Returns the current playback state (plain read of _playbackState).
337 PlaybackState playbackState() const { return _playbackState; }
338
   // Transport controls; each reports its outcome as an Error.
350 Error play();
351
363 Error pause();
364
373 Error togglePlayPause();
374
   // Playback rate accessors. Defined out of line.
   // NOTE(review): presumably delegate to the pacing stage / _pacingClock;
   // confirm valid range and behaviour when no pacing stage is resolved.
382 double rate() const;
383
396 Error setRate(double r);
397
   // Current position as a FrameNumber. Defined out of line.
406 FrameNumber currentFrame() const;
407
   // Seeks to `pos`. `mode` defaults to MediaIO_SeekDefault.
427 Error seek(FrameNumber pos, MediaIOSeekMode mode = MediaIO_SeekDefault);
428
   // Step by `n` (default 1).
   // NOTE(review): units are presumably frames and negative `n` handling is
   // not visible here; confirm.
439 Error stepForward(int64_t n = 1);
440
450 Error stepBackward(int64_t n = 1);
451
452 // ------------------------------------------------------------
453 // Capture transport
454 // ------------------------------------------------------------
455
   // Returns the current capture state (plain read of _captureState).
464 CaptureState captureState() const { return _captureState; }
465
   // Capture controls; each reports its outcome as an Error.
480 Error armCapture();
481
494 Error startCapture();
495
510 Error pauseCapture();
511
525 Error resumeCapture();
526
538 Error stopCapture();
539
   // Three ways to install a capture trigger: a MediaPipelineTrigger passed
   // by MediaPipelineTrigger::UPtr, a predicate over a Frame, or a query
   // expression string.
   // NOTE(review): UPtr is presumably an owning unique pointer, i.e. the
   // pipeline takes ownership (see _captureTrigger); the query expression
   // grammar is defined elsewhere.
552 Error setCaptureTrigger(MediaPipelineTrigger::UPtr trig);
553
562 Error setCaptureTrigger(Function<bool(const Frame &)> fn);
563
576 Error setCaptureTrigger(const String &queryExpr);
577
   // Removes any installed capture trigger.
579 Error clearCaptureTrigger();
580
581 // ------------------------------------------------------------
582 // Introspection
583 // ------------------------------------------------------------
584
   // Returns a const reference to the stored configuration (_config). The
   // reference is only valid while this pipeline is alive.
586 const MediaPipelineConfig &config() const { return _config; }
587
   // Looks up a stage by name.
   // NOTE(review): presumably returns nullptr when `name` is unknown and
   // the returned MediaIO remains owned by the pipeline; confirm.
592 MediaIO *stage(const String &name) const;
593
   // Names of the stages, as a StringList.
595 StringList stageNames() const;
596
   // Textual description of the pipeline, as a StringList.
604 StringList describe() const;
605
   // Statistics snapshots. Note that neither method is const.
623 MediaPipelineStats stats();
624
633 MediaPipelineStageStats stageStats(const String &name);
634
635 // ------------------------------------------------------------
636 // Signals
637 // ------------------------------------------------------------
638
   // Each PROMEKI_SIGNAL(name, Args...) declares a signal carrying the
   // listed argument types.
   // NOTE(review): a comment further down refers to "finishedSignal", which
   // suggests the macro generates a member named <name>Signal; confirm
   // against the macro definition. The String arguments of the stage*
   // signals are presumably stage names.
648 PROMEKI_SIGNAL(pipelineError, String, Error);
649
651 PROMEKI_SIGNAL(stageOpened, String);
652
654 PROMEKI_SIGNAL(stageStarted, String);
655
657 PROMEKI_SIGNAL(stageStopped, String);
658
660 PROMEKI_SIGNAL(stageClosed, String);
661
   // NOTE(review): the bool is presumably the "clean finish" flag tracked
   // in _cleanFinish; confirm.
675 PROMEKI_SIGNAL(finished, bool);
676
691 PROMEKI_SIGNAL(closed, Error);
692
706 PROMEKI_SIGNAL(statsUpdated, MediaPipelineStats);
707
720 PROMEKI_SIGNAL(playbackStateChanged, PlaybackState);
721
731 PROMEKI_SIGNAL(rateChanged, double);
732
740 PROMEKI_SIGNAL(positionChanged, FrameNumber);
741
751 PROMEKI_SIGNAL(captureStateChanged, CaptureState);
752
753 // ------------------------------------------------------------
754 // Event subscription
755 // ------------------------------------------------------------
756
   // Callback type for subscribers: receives each PipelineEvent by const
   // reference and returns nothing.
763 using EventCallback = Function<void(const PipelineEvent &)>;
764
   // Registers `cb` and returns an int identifying the subscription.
   // NOTE(review): the id is presumably drawn from _nextSubId and is the
   // value to pass to unsubscribe(); confirm what is returned on failure.
789 int subscribe(EventCallback cb);
790
   // Removes the subscription identified by `id`.
800 void unsubscribe(int id);
801
   // Sets the stats cadence (see _statsInterval and _statsTimerId below).
819 void setStatsInterval(Duration interval);
820
   // Returns the stored stats interval.
822 Duration statsInterval() const { return _statsInterval; }
823
824 private:
825 // One record per outgoing route. Back-pressure,
826 // per-sink frame-count caps, and the actual frame
827 // dispatch all live on the @ref MediaIOPortConnection
828 // that owns this edge's source — the pipeline only
829 // tracks enough state per edge to drive the cascade
830 // close when the source EOFs, mark the cap as
831 // observed, and look up the sink's stage name for
832 // error reporting.
833 struct EdgeState {
834 String toName;
835 MediaIO *to = nullptr;
836 MediaIOSink *toSink = nullptr;
837 bool isSinkEdge = false;
838 bool capReached = false;
839 };
840
841 // Each source-or-transit stage owns one SourceState entry.
842 // Stages with no outgoing routes (pure sinks) do not appear
843 // here — they participate only as a sink end of some
844 // upstream stage's connection. The @c connection field is
845 // owned by the pipeline and lives from @ref start until
846 // @ref close.
847 struct SourceState {
848 MediaIO *from = nullptr;
849 MediaIOPortConnection *connection = nullptr;
850 List<EdgeState> edges;
851 bool upstreamDone = false;
852 };
853
   // One registered event subscriber: its id, its callback, and an
   // EventLoop pointer.
   // NOTE(review): `loop` is presumably the loop on which `fn` is to be
   // dispatched; confirm. Unlike EdgeState and SourceState, `id` and `loop`
   // have no default member initializers, so a default-initialized
   // Subscriber leaves them indeterminate unless it is value-initialized.
854 struct Subscriber {
855 int id;
856 EventCallback fn;
857 EventLoop *loop;
858 };
859
   // Internal build / teardown helpers. Per the member comments below,
   // destroyStages also clears the pacing-stage and capture-sink
   // resolution state.
860 Error destroyStages();
   // Writes the computed stage order into `order` (out-parameter) and
   // reports failure through the returned Error.
   // NOTE(review): presumably fails on a cyclic route graph; confirm.
861 Error topologicallySort(List<String> &order) const;
862 MediaIO *instantiateStage(const MediaPipelineConfig::Stage &s);
863
876 void wireConnectionsForOpenedStage(const String &name);
877
   // Event publication helpers. Per the _subsMutex comment below, publish()
   // snapshots the subscriber list under the lock and releases it before
   // invoking callbacks.
887 void publish(PipelineEvent ev);
888
893 void publishStateChanged();
894
903 void publishTransportStateChanged(const String &scope, const String &newState);
904
912 void setPlaybackState(PlaybackState s);
913
   // Called during open() (see the comments on _pacingStage and
   // _captureSinks) to populate the pacing-stage and capture-sink members.
929 Error resolvePacingStage();
930
942 Error resolveCaptureSinks();
943
948 void setCaptureState(CaptureState s);
949
960 void onCaptureFrame(const Frame &frame);
961
969 void setCaptureGates(bool open);
970
972 void installCaptureInspectorIfNeeded();
973
975 void removeCaptureInspector();
976
981 void publishStageState(const String &stageName, const String &transition);
982
984 void installLoggerTap();
985
987 void removeLoggerTap();
988
990 void startStatsTimerIfNeeded();
991
993 void stopStatsTimer();
994
996 void emitStatsSnapshot();
997
   // Handlers keyed by the source stage name.
   // NOTE(review): presumably invoked from MediaIOPortConnection signals;
   // the threading context of these calls is not visible here.
1008 void onUpstreamDone(const String &srcName);
1009
1015 void onSourceConnectionError(const String &srcName, Error err);
1016
1022 void onSinkConnectionError(const String &srcName, MediaIOSink *sink, Error err);
1023
1032 void onSinkLimitReached(const String &srcName, MediaIOSink *sink);
1033
   // Close cascade: initiateClose latches the bookkeeping and arms the
   // watchdog, finalizeClose unwinds it and stops the watchdog, and
   // forceCloseRemaining is the watchdog escalation (all per the member
   // comments below).
1048 void initiateClose(bool clean);
1049
1061 void forceCloseRemaining();
1062
1063 // ObjectBase override — routes the close watchdog's
1064 // TimerEvent to @ref forceCloseRemaining.
1065 void timerEvent(TimerEvent *e) override;
1066
1074 void onStageClosed(const String &stageName, Error err);
1075
1085 void finalizeClose();
1086
1098 void attachStatsCollector(const String &name, MediaIO *io);
1099
1108 void clearStatsCollectors();
1109
1117 MediaPipelineStageStats buildStageStats(const String &name, MediaIO *io);
1118
   // Stored configuration and the three state machines, each with its
   // initial value.
1119 MediaPipelineConfig _config;
1120 State _state = State::Empty;
1121 PlaybackState _playbackState = PlaybackState::Idle;
1122 CaptureState _captureState = CaptureState::Idle;
1123
1124 // Pacing-stage resolution (Playback only). Populated by
1125 // @ref resolvePacingStage during @ref open, cleared by
1126 // @ref destroyStages. Null when the kind is Capture or
1127 // when no stage in the config flagged @c pacesPipeline=true.
1128 MediaIO *_pacingStage = nullptr;
1129 MediaIOPortGroup *_pacingGroup = nullptr;
1130 Clock::Ptr _pacingClock;
1131
1132 // Capture-sink resolution (Capture only). Populated by
1133 // @ref resolveCaptureSinks during @ref open, cleared by
1134 // @ref destroyStages. Empty when the kind is Playback
1135 // or when no stage in the config flagged @c captureSink=true.
1136 List<MediaIO *> _captureSinks;
1137 MediaPipelineTrigger::UPtr _captureTrigger;
   // Name-keyed stage tables.
   // NOTE(review): _injected presumably holds the stages supplied through
   // injectStage(); which of these raw pointers the pipeline owns cannot be
   // determined from this header.
1138 Map<String, MediaIO *> _stages;
1139 Map<String, MediaIO *> _injected;
1140 Map<String, MediaIOStatsCollector *> _statsCollectors;
1141 Map<String, SourceState> _sources;
1142 List<String> _topoOrder;
1143
1144 // Close-cascade bookkeeping. Latched by
1145 // @ref initiateClose and unwound in
1146 // @ref finalizeClose. @c _cleanFinish is the running
1147 // "no errors observed" bit that eventually feeds
1148 // @ref finishedSignal; drops to false on any
1149 // operational or close-time error.
   // NOTE(review): _cleanFinish is initialized to false even though it is
   // described as a "no errors observed" bit; it is presumably set from
   // initiateClose(clean) before it is read. Confirm.
1150 bool _closing = false;
1151 bool _cleanFinish = false;
1152 Set<String> _stagesAwaitingClosed;
1153 Error _closeError = Error::Ok;
1154
1155 // Close-watchdog timer. Armed by @ref initiateClose,
1156 // stopped by @ref finalizeClose, and escalates to
1157 // @ref forceCloseRemaining when it fires. Zero means
1158 // "no watchdog armed".
   // NOTE(review): the timer id below is initialized to -1, not 0, so the
   // "Zero means no watchdog armed" remark presumably refers to a zero
   // _closeTimeoutMs rather than to _closeWatchdogTimerId. Confirm.
1159 unsigned int _closeTimeoutMs = DefaultCloseTimeoutMs;
1160 int _closeWatchdogTimerId = -1;
1161
1162 // Pipeline-wide frame-count cap, cached at build time
1163 // from @ref MediaPipelineConfig::frameCount. Zero means
1164 // "no cap" (matches the @ref FrameCount::isFinite() gate).
1165 // Non-zero values are passed as the per-sink frame
1166 // limit to @ref MediaIOPortConnection::addSink for every
1167 // terminal-sink edge; the connection enforces the cap at
1168 // safe-cut-point boundaries and emits @c sinkLimitReached
1169 // when each sink hits it. @ref _terminalSinksRemaining
1170 // counts the still-pending capped sinks; when it reaches
1171 // zero the pipeline initiates a clean close.
1172 int64_t _frameCountLimit = 0;
1173 int _terminalSinksRemaining = 0;
1174
1175 // Event subscription bookkeeping. _subsMutex guards
1176 // _subscribers and the logger-listener handle so
1177 // subscribe / unsubscribe / publish can race with each
1178 // other across threads safely. The actual dispatch in
1179 // publish() snapshots the list under the lock and
1180 // releases before invoking callbacks so a callback may
1181 // (un)subscribe re-entrantly without deadlocking.
   // _subsMutex is declared mutable so that it can also be locked from
   // const member functions.
1182 mutable Mutex _subsMutex;
1183 Map<int, Subscriber> _subscribers;
1184 int _nextSubId = 0;
1185 Logger::ListenerHandle _loggerTap = 0;
1186
1187 // Stats tick. _statsInterval is the user-configured
1188 // wall-clock cadence; _statsTimerId is the active timer
1189 // id (negative when no timer is armed).
   // NOTE(review): _statsInterval is default-constructed here; a
   // default-constructed Duration presumably means "stats tick disabled".
   // Confirm.
1190 Duration _statsInterval;
1191 int _statsTimerId = -1;
1192};
1193
1194PROMEKI_NAMESPACE_END
1195
1196#endif // PROMEKI_ENABLE_PROAV