libpromeki 1.0.0-alpha
PROfessional MEdia toolKIt
 
Loading...
Searching...
No Matches
rtmpmediaio.h
Go to the documentation of this file.
1
8#pragma once
9
10
11#include <promeki/config.h>
12#if PROMEKI_ENABLE_NETWORK
13#include <promeki/atomic.h>
14#include <promeki/audiodesc.h>
16#include <promeki/enums.h>
17#include <promeki/frame.h>
18#include <promeki/framenumber.h>
19#include <promeki/framerate.h>
20#include <promeki/histogram.h>
21#include <promeki/imagedesc.h>
23#include <promeki/namespace.h>
24#include <promeki/pacinggate.h>
25#include <promeki/queue.h>
26#include <promeki/rtmpsession.h>
27#include <promeki/string.h>
28#include <promeki/uniqueptr.h>
29#include <promeki/url.h>
30
31PROMEKI_NAMESPACE_BEGIN
32
33class RtmpClient;
34class Thread;
35
/// MediaIO backend for RTMP, derived from DedicatedThreadMediaIO.
///
/// As declared here, an instance owns an RtmpClient, a packetizer and a
/// depacketizer worker thread, a reader queue filled by the depacketizer,
/// and a PacingGate for strand-side video pacing. Only declarations are
/// visible in this header; all behavior lives in the implementation file.
///
/// NOTE(review): non-static members are constructed in declaration order and
/// destroyed in reverse, so the worker-thread pointers (declared after
/// _client) are destroyed before _client. Do not reorder fields casually.
117class RtmpMediaIO : public DedicatedThreadMediaIO {
118 PROMEKI_OBJECT(RtmpMediaIO, DedicatedThreadMediaIO)
119 public:
120 // ---- Telemetry keys ----
// Each constant below is a MediaIOStats::ID constructed from the string
// literal shown. The one-line notes are inferred from the key names and
// from the private counters further down — TODO confirm each against the
// implementation of executeCmd(MediaIOCommandStats &).
121
/// Key "FramesSent": frames sent (sink side).
123 static inline const MediaIOStats::ID StatsFramesSent{"FramesSent"};
/// Key "FramesReceived": frames received; likely backed by _readerFramesReceived.
125 static inline const MediaIOStats::ID StatsFramesReceived{"FramesReceived"};
/// Key "VideoMessagesSent": video messages sent.
127 static inline const MediaIOStats::ID StatsVideoMessagesSent{"VideoMessagesSent"};
/// Key "AudioMessagesSent": audio messages sent.
129 static inline const MediaIOStats::ID StatsAudioMessagesSent{"AudioMessagesSent"};
/// Key "VideoMessagesReceived": video messages received.
131 static inline const MediaIOStats::ID StatsVideoMessagesReceived{"VideoMessagesReceived"};
/// Key "AudioMessagesReceived": audio messages received.
133 static inline const MediaIOStats::ID StatsAudioMessagesReceived{"AudioMessagesReceived"};
/// Key "BytesSent": bytes sent.
135 static inline const MediaIOStats::ID StatsBytesSent{"BytesSent"};
/// Key "BytesReceived": bytes received.
137 static inline const MediaIOStats::ID StatsBytesReceived{"BytesReceived"};
/// Key "SendQueueDepth": send-queue depth (current fill vs. configured limit — confirm).
139 static inline const MediaIOStats::ID StatsSendQueueDepth{"SendQueueDepth"};
/// Key "ReadQueueDepth": read-queue depth (current fill vs. configured limit — confirm).
141 static inline const MediaIOStats::ID StatsReadQueueDepth{"ReadQueueDepth"};
/// Key "SendQueueOverflows": likely backed by _sendQueueOverflows.
143 static inline const MediaIOStats::ID StatsSendQueueOverflows{"SendQueueOverflows"};
/// Key "ConnectDurationMs": likely backed by _connectDurationMs.
145 static inline const MediaIOStats::ID StatsConnectDurationMs{"ConnectDurationMs"};
/// Key "HandshakeDurationMs": likely backed by _handshakeDurationMs.
147 static inline const MediaIOStats::ID StatsHandshakeDurationMs{"HandshakeDurationMs"};
/// Key "VideoFramesDroppedPreIdr": likely backed by _videoFramesDroppedPreIdr.
149 static inline const MediaIOStats::ID StatsVideoFramesDroppedPreIdr{"VideoFramesDroppedPreIdr"};
150
// Pacing-related keys; presumably sourced from _videoPaceGate — confirm.
/// Key "PacingTicksOnTime".
152 static inline const MediaIOStats::ID StatsPacingTicksOnTime{"PacingTicksOnTime"};
/// Key "PacingTicksLate".
154 static inline const MediaIOStats::ID StatsPacingTicksLate{"PacingTicksLate"};
/// Key "PacingTicksSkipped".
156 static inline const MediaIOStats::ID StatsPacingTicksSkipped{"PacingTicksSkipped"};
/// Key "PacingReanchors".
158 static inline const MediaIOStats::ID StatsPacingReanchors{"PacingReanchors"};
/// Key "PacingClockKind"; presumably reports the value of paceClockKind().
160 static inline const MediaIOStats::ID StatsPacingClockKind{"PacingClockKind"};
161
/// Constructs the object. @p parent is optional and defaults to nullptr;
/// the constructor is explicit, so an ObjectBase* never converts implicitly.
163 explicit RtmpMediaIO(ObjectBase *parent = nullptr);
164
/// Destructor (overrides the base-class virtual destructor).
166 ~RtmpMediaIO() override;
167
/// Returns this instance's _objectId. Per the note on _nextObjectId, the
/// id is meant to be distinct among RtmpMediaIO instances in one process.
175 uint64_t objectId() const { return _objectId; }
176
/// Override of the base-class proposeInput hook.
/// NOTE(review): presumably inspects @p offered and writes an acceptable
/// descriptor to @p preferred — the contract is defined by the base class
/// and the .cpp, neither of which is visible here.
196 Error proposeInput(const MediaDesc &offered, MediaDesc *preferred) const override;
197
198 protected:
// Command handlers overriding the base-class executeCmd overloads, one
// per command type (Open, Close, Read, Write, Stats).
199 Error executeCmd(MediaIOCommandOpen &cmd) override;
200 Error executeCmd(MediaIOCommandClose &cmd) override;
201 Error executeCmd(MediaIOCommandRead &cmd) override;
202 Error executeCmd(MediaIOCommandWrite &cmd) override;
203 Error executeCmd(MediaIOCommandStats &cmd) override;
204
/// Handler for the SetClock command.
/// NOTE(review): presumably rebinds the video pacing clock (see
/// _paceClockIsExternal) — confirm in the implementation.
223 Error executeCmd(MediaIOCommandSetClock &cmd) override;
224
225 // Wakes the reader-side executeCmd(Read) loop so close()
226 // can drain a strand parked on the reader queue pop.
227 void cancelBlockingWork() override;
228
229 private:
// Worker-thread classes; only forward-declared in this header.
230 class PacketizerThread;
231 class DepacketizerThread;
232
/// Resets per-session state. Per the note on _client below, this is
/// where the owned RtmpClient is destroyed.
234 void resetAll();
235
/// Builds an RtmpConnectOptions value from @p cfg.
/// NOTE(review): which config keys are consulted is not visible here.
237 RtmpConnectOptions buildConnectOptions(const MediaIO::Config &cfg) const;
238
/// Static predicate on @p frame; by its name, reports whether the frame
/// carries video essence — confirm the exact criteria in the .cpp.
240 static bool hasVideoEssence(const Frame &frame);
241
/// Static predicate on @p frame; by its name, reports whether the frame
/// carries audio essence — confirm the exact criteria in the .cpp.
243 static bool hasAudioEssence(const Frame &frame);
244
/// Video pacing step.
/// NOTE(review): the meaning of the bool result (e.g. send vs. skip) is
/// not visible in this header — confirm before relying on it.
256 bool paceVideoFrame();
257
/// Arms the video pace gate; presumably configures _videoPaceGate from
/// _frameRate and the skip / re-anchor thresholds — confirm.
269 void armVideoPaceGate();
270
/// Returns a String naming the pacing clock kind; presumably the value
/// published under StatsPacingClockKind.
272 String paceClockKind() const;
273
/// Disconnect notification taking the disconnect @p reason.
/// NOTE(review): presumably the handler for RtmpClient::disconnectedSignal
/// that latches _clientDisconnected / _disconnectErrorCode (see the
/// comment on those fields) — confirm the wiring in executeCmd(Open).
287 void onClientDisconnected(Error reason);
288
289 // Owned RtmpClient — instantiated in executeCmd(Open),
290 // destroyed in resetAll. Lives across the open/close
291 // cycle so the writer / reader threads it owns service
292 // the wire.
293 UniquePtr<RtmpClient> _client;
294
295 // Worker threads. Both are nullptr when idle.
296 UniquePtr<PacketizerThread> _packetizer;
297 UniquePtr<DepacketizerThread> _depacketizer;
298
299 // Strand-side bounded reader queue — populated by the
300 // depacketizer thread, drained by executeCmd(Read).
301 Queue<Frame> _readerQueue;
302
303 // Cancellation latch for executeCmd(Read). Set by
304 // cancelBlockingWork(); cleared at every Open.
305 Atomic<bool> _readCancelled{false};
306
307 // Latched when @c RtmpClient::disconnectedSignal fires
308 // (peer went away, socket I/O error, etc.). Polled by
309 // the packetizer / depacketizer worker loops so they
310 // exit cleanly instead of spewing per-frame send
311 // failures into the log, and consulted by
312 // executeCmd(Read) / executeCmd(Write) so the failure
313 // surfaces to the pipeline as a write/read error.
314 // @c _disconnectErrorCode stores the disconnect reason
315 // as an @c Error::Code (int) to keep the atomic POD;
316 // reconstruct an @c Error via @c Error(static_cast).
317 Atomic<bool> _clientDisconnected{false};
318 Atomic<int> _disconnectErrorCode{0};
319
320 // Sink-side state set at Open from the MediaConfig +
321 // pending descriptor, read by the packetizer thread.
// The initializers below are the in-class defaults; the
// per-field meanings are inferred from the names only.
322 ImageDesc _imageDesc;
323 AudioDesc _audioDesc;
324 Url _url;
325 String _streamKey;
326 bool _readerMode = false;
327 bool _dropUntilKeyframe = true;
328 bool _repeatParameterSets = true;
329 bool _enhancedRtmp = true;
330 bool _emitAnnexB = false;
331 bool _dataEnabled = true;
332 int _sendQueueDepth = 64;
333 int _readQueueDepth = 64;
334
335 // Per-instance frame counters used to populate
336 // currentFrame / frameCount on read / write commands.
337 // Strand-owned; updated only inside executeCmd().
338 FrameCount _frameCount{0};
339 FrameCount _framesSent{0};
340
341 // Cumulative stats counters bumped by the worker
342 // threads, read by executeCmd(Stats). Atomics for
343 // lock-free cross-thread aggregation.
344 Atomic<int64_t> _readerFramesReceived{0};
345 Atomic<int64_t> _videoFramesDroppedPreIdr{0};
346 Atomic<int64_t> _sendQueueOverflows{0};
347
348 // Strand-side video pacing. Mirrors RtpMediaIO's
349 // _videoGate but defaults to an Internal wall-clock
350 // binding since TCP has no kernel-pacing analog to fall
351 // back on. Touched only from the strand.
// Both thresholds default to 0; whether 0 means "disabled"
// is not visible here — TODO confirm.
352 RtmpVideoPacing _videoPacingMode{RtmpVideoPacing::Internal};
353 PacingGate _videoPaceGate;
354 FrameRate _frameRate;
355 int _paceSkipThresholdMs = 0;
356 int _paceReanchorThresholdMs = 0;
357 bool _paceClockIsExternal = false;
358
359 // Open-time telemetry — captured around the
360 // RtmpClient::open / publish calls.
361 int64_t _connectDurationMs = 0;
362 int64_t _handshakeDurationMs = 0;
363
364 // Process-local monotonic counter that gives every
365 // RtmpMediaIO a distinct id within one process.
// _nextObjectId is only declared here; its definition (and
// the assignment of _objectId from it) must live in the .cpp.
366 static Atomic<uint64_t> _nextObjectId;
367 uint64_t _objectId = 0;
368};
369
374class RtmpFactory : public MediaIOFactory {
375 public:
376 RtmpFactory() = default;
377
378 String name() const override { return String("Rtmp"); }
379 String displayName() const override { return String("RTMP Stream"); }
380 String description() const override {
381 return String("RTMP / RTMPS publisher and subscriber "
382 "(H.264 + HEVC video, AAC audio)");
383 }
384 StringList schemes() const override {
385 return StringList{String("rtmp"), String("rtmps")};
386 }
387
388 bool canBeSource() const override { return true; }
389 bool canBeSink() const override { return true; }
390
391 Config::SpecMap configSpecs() const override;
392 Error urlToConfig(const Url &url, Config *outConfig) const override;
393 MediaIO *create(const Config &config, ObjectBase *parent = nullptr) const override;
394};
395
396PROMEKI_NAMESPACE_END
397
398#endif // PROMEKI_ENABLE_NETWORK