libpromeki 1.0.0-alpha
PROfessional MEdia toolKIt
 
Loading...
Searching...
No Matches
rtpmediaio.h
Go to the documentation of this file.
1
8#pragma once
9
10
11#include <promeki/config.h>
12#if PROMEKI_ENABLE_NETWORK
13#include <promeki/atomic.h>
14#include <promeki/audiobuffer.h>
15#include <promeki/audiodesc.h>
16#include <promeki/clockdomain.h>
18#include <promeki/eui64.h>
19#include <promeki/macaddress.h>
20#include <promeki/frame.h>
21#include <promeki/histogram.h>
22#include <promeki/imagedesc.h>
24#include <promeki/mutex.h>
25#include <promeki/namespace.h>
26#include <promeki/ntptime.h>
27#include <promeki/pacinggate.h>
29#include <promeki/pixelformat.h>
30#include <promeki/queue.h>
40#include <promeki/rtppacket.h>
45#include <promeki/rtptxthread.h>
47#include <promeki/sdpsession.h>
49#include <promeki/string.h>
50#include <promeki/timestamp.h>
53
54PROMEKI_NAMESPACE_BEGIN
55
56class RtpSession;
57class RtpPayload;
58class UdpSocketTransport;
59class Thread;
60
379class RtpMediaIO : public DedicatedThreadMediaIO {
380 PROMEKI_OBJECT(RtpMediaIO, DedicatedThreadMediaIO)
381 public:
383 static inline const MediaIOStats::ID StatsFramesSent{"FramesSent"};
385 static inline const MediaIOStats::ID StatsPacketsSent{"PacketsSent"};
387 static inline const MediaIOStats::ID StatsBytesSent{"BytesSent"};
389 static inline const MediaIOStats::ID StatsFramesReceived{"FramesReceived"};
391 static inline const MediaIOStats::ID StatsPacketsReceived{"PacketsReceived"};
393 static inline const MediaIOStats::ID StatsBytesReceived{"BytesReceived"};
395 static inline const MediaIOStats::ID StatsTxVideoFrameIntervalUs{"TxVideoFrameIntervalUs"};
397 static inline const MediaIOStats::ID StatsTxVideoSendDurationUs{"TxVideoSendDurationUs"};
399 static inline const MediaIOStats::ID StatsRxVideoPacketIntervalUs{"RxVideoPacketIntervalUs"};
401 static inline const MediaIOStats::ID StatsRxVideoFrameIntervalUs{"RxVideoFrameIntervalUs"};
403 static inline const MediaIOStats::ID StatsRxVideoFrameAssembleUs{"RxVideoFrameAssembleUs"};
415 static inline const MediaIOStats::ID StatsAudioSilencePacketsEmitted{"AudioSilencePacketsEmitted"};
424 static inline const MediaIOStats::ID StatsAudioSilenceSamplesEmitted{"AudioSilenceSamplesEmitted"};
425
426 // ----------------------------------------------------------
427 // Reader-side per-stream RFC 3550 §A counters,
428 // aggregated across every active reader stream
429 // (video + audio + data). All published by
430 // @c executeCmd(MediaIOCommandStats); see
431 // @c devplan/network/rtp-rx.md for the full
432 // ReaderStream::Stats block these correspond to.
433 // ----------------------------------------------------------
434
436 static inline const MediaIOStats::ID StatsRxExtendedHighestSeq{"RxExtendedHighestSeq"};
438 static inline const MediaIOStats::ID StatsRxPacketsExpected{"RxPacketsExpected"};
440 static inline const MediaIOStats::ID StatsRxCumulativeLost{"RxCumulativeLost"};
442 static inline const MediaIOStats::ID StatsRxFractionLost{"RxFractionLost"};
444 static inline const MediaIOStats::ID StatsRxDuplicatePackets{"RxDuplicatePackets"};
446 static inline const MediaIOStats::ID StatsRxReorderedPackets{"RxReorderedPackets"};
448 static inline const MediaIOStats::ID StatsRxInterarrivalJitter{"RxInterarrivalJitter"};
449
451 static inline const MediaIOStats::ID StatsRxSsrcChanges{"RxSsrcChanges"};
452
454 static inline const MediaIOStats::ID StatsRxReorderEmittedInOrder{"RxReorderEmittedInOrder"};
456 static inline const MediaIOStats::ID StatsRxReorderEmittedOnDeadline{"RxReorderEmittedOnDeadline"};
458 static inline const MediaIOStats::ID StatsRxReorderDroppedOverflow{"RxReorderDroppedOverflow"};
460 static inline const MediaIOStats::ID StatsRxReorderDroppedDuplicate{"RxReorderDroppedDuplicate"};
461
463 static inline const MediaIOStats::ID StatsRxVideoQueueDepth{"RxVideoQueueDepth"};
465 static inline const MediaIOStats::ID StatsRxAudioQueueDepth{"RxAudioQueueDepth"};
467 static inline const MediaIOStats::ID StatsRxDataQueueDepth{"RxDataQueueDepth"};
469 static inline const MediaIOStats::ID StatsRxReaderQueueDepth{"RxReaderQueueDepth"};
470
475 static inline const MediaIOStats::ID StatsRxFramesReassembled{"RxFramesReassembled"};
479 static inline const MediaIOStats::ID StatsRxFramesDroppedValidate{"RxFramesDroppedValidate"};
484 static inline const MediaIOStats::ID StatsRxFramesWaitingParamSets{"RxFramesWaitingParamSets"};
490 static inline const MediaIOStats::ID StatsRxFramesDroppedSsrcReset{"RxFramesDroppedSsrcReset"};
491
495 static inline const MediaIOStats::ID StatsRxSrObserved{"RxSrObserved"};
500 static inline const MediaIOStats::ID StatsRxLastSrAgeUs{"RxLastSrAgeUs"};
505 static inline const MediaIOStats::ID StatsRxFirstSrLatencyUs{"RxFirstSrLatencyUs"};
506
508 static inline const MediaIOParamsID ParamGetSdp{"GetSdp"};
510 static inline const MediaIOParamsID ParamSdp{"Sdp"};
511
513 RtpMediaIO(ObjectBase *parent = nullptr);
514
516 ~RtpMediaIO() override;
517
530 uint64_t objectId() const { return _objectId; }
531
545 static String buildDefaultCname(int64_t pid, uint64_t objectId, const String &host);
546
565 static String pickEgressHostForCname(const SocketAddress &destination);
566
567 protected:
568 Error executeCmd(MediaIOCommandOpen &cmd) override;
569 Error executeCmd(MediaIOCommandClose &cmd) override;
570 Error executeCmd(MediaIOCommandRead &cmd) override;
571 Error executeCmd(MediaIOCommandWrite &cmd) override;
572 Error executeCmd(MediaIOCommandParams &cmd) override;
573 Error executeCmd(MediaIOCommandStats &cmd) override;
593 Error executeCmd(MediaIOCommandSetClock &cmd) override;
594
595 // Wakes the reader-side executeCmd(Read) loop so close()
596 // can drain a strand parked on _readerQueue.pop().
597 void cancelBlockingWork() override;
598
599 private:
600
601 // describe() / proposeInput overrides intentionally
602 // omitted for v1: each RFC 9134 / RFC 2435 / RFC 4175
603 // payload type has its own accepted-shape constraints
604 // (e.g. JPEG payload = 8-bit YUV422 only; raw payload
605 // = configurable subsampling / bit depth; L16 audio =
606 // 16-bit BE PCM at fixed channel counts). Encoding
607 // those rules per-payload is its own follow-up — for
608 // now the planner's open() fallback inspects the SDP
609 // (when configured) to learn the live shape, and
610 // bridges insert as needed via the configured payload
611 // type's own runtime checks.
612
621 // Per-stream packetizer + TX worker classes. Each
622 // active stream gets one of each: the packetizer
623 // pulls Frames off the strand-side @c PayloadQueue
624 // and emits payload-bytes-only packets onto the
625 // sink-side queue; the TX thread pops the latter,
626 // stamps the full RTP header, and dispatches to
627 // the wire. Packetizer and TX talk only through
628 // their own per-stream @c Queue, so stream-level
629 // jitter (heavy IDR encode, audio cadence) never
630 // bleeds across stream boundaries. Concrete
631 // subclasses live as nested classes inside
632 // @c rtpmediaio.cpp because they need access to
633 // RtpMediaIO state (parameter-set cache, pacing
634 // mode, SDP sprop refresh, etc.).
635 class VideoPacketizerThread;
636 class AudioPacketizerThread;
637 class DataPacketizerThread;
638 class VideoTxThread;
639 class AudioTxThread;
640 class DataTxThread;
641
674 struct Stream {
675 Stream() = default;
676 Stream(Stream &&o) noexcept
677 : transport(o.transport),
678 session(o.session),
679 payload(o.payload),
680 destination(std::move(o.destination)),
681 payloadType(o.payloadType),
682 clockRate(o.clockRate),
683 dscp(o.dscp),
684 ssrc(o.ssrc),
685 mediaType(std::move(o.mediaType)),
686 rtpmap(std::move(o.rtpmap)),
687 fmtp(std::move(o.fmtp)),
688 active(o.active),
689 clockDomain(o.clockDomain),
690 tsRefClkMode(o.tsRefClkMode),
691 ptpGrandmaster(o.ptpGrandmaster),
692 ptpDomain(o.ptpDomain),
693 refClockLocalMac(std::move(o.refClockLocalMac)),
694 mediaClkOffset(o.mediaClkOffset) {
695 // Null pointers on the moved-
696 // from instance so a stray
697 // reset can't double-delete.
698 o.transport = nullptr;
699 o.session = nullptr;
700 o.payload = nullptr;
701 }
702 Stream(const Stream &) = delete;
703 Stream &operator=(const Stream &) = delete;
704 Stream &operator=(Stream &&) = delete;
705 UdpSocketTransport *transport = nullptr;
706 RtpSession *session = nullptr;
707 RtpPayload *payload = nullptr;
708 SocketAddress destination;
709 uint8_t payloadType = 0;
710 uint32_t clockRate = 90000;
711 int dscp = 0;
712 uint32_t ssrc = 0;
713 String mediaType;
714 String rtpmap;
715 String fmtp;
716 bool active = false;
717 ClockDomain clockDomain;
718 RtpRefClockMode tsRefClkMode = RtpRefClockMode::None;
719 EUI64 ptpGrandmaster;
720 uint8_t ptpDomain = 0;
721 MacAddress refClockLocalMac;
722 int32_t mediaClkOffset = 0;
723 };
724
739 struct WriterStream : Stream {
740 WriterStream() = default;
741 WriterStream(WriterStream &&o) noexcept
742 : Stream(std::move(o)),
743 packetizer(o.packetizer),
744 tx(o.tx),
745 packetsSent(o.packetsSent.value()),
746 bytesSent(o.bytesSent.value()),
747 senderOctets(o.senderOctets.value()),
748 txFrameInterval(std::move(o.txFrameInterval)),
749 txSendDuration(std::move(o.txSendDuration)),
750 txLastSendStart(o.txLastSendStart),
751 txHasLastSend(o.txHasLastSend) {
752 o.packetizer = nullptr;
753 o.tx = nullptr;
754 }
755 WriterStream(const WriterStream &) = delete;
756 WriterStream &operator=(const WriterStream &) = delete;
757 WriterStream &operator=(WriterStream &&) = delete;
758 RtpPacketizerThread *packetizer = nullptr;
759 RtpTxThread *tx = nullptr;
760 Atomic<int64_t> packetsSent{0};
761 Atomic<int64_t> bytesSent{0};
762 Atomic<int64_t> senderOctets{0};
763 Histogram txFrameInterval;
764 Histogram txSendDuration;
765 TimeStamp txLastSendStart;
766 bool txHasLastSend = false;
767 };
768
785 struct ReaderStream : Stream {
786 ReaderStream() = default;
787 ReaderStream(ReaderStream &&o) noexcept
788 : Stream(std::move(o)),
789 packetsReceived(o.packetsReceived.value()),
790 bytesReceived(o.bytesReceived.value()),
791 framesReceived(o.framesReceived),
792 packetsLost(o.packetsLost.value()),
793 readerImageDesc(std::move(o.readerImageDesc)),
794 readerAudioDesc(std::move(o.readerAudioDesc)),
795 reasmTimestamp(o.reasmTimestamp),
796 reasmHasTimestamp(o.reasmHasTimestamp),
797 reasmLastSeq(o.reasmLastSeq),
798 reasmHaveLastSeq(o.reasmHaveLastSeq),
799 reasmPackets(std::move(o.reasmPackets)),
800 rxPacketInterval(std::move(o.rxPacketInterval)),
801 rxFrameInterval(std::move(o.rxFrameInterval)),
802 rxFrameAssembleTime(std::move(o.rxFrameAssembleTime)),
803 rxLastPacketTime(o.rxLastPacketTime),
804 rxLastFrameTime(o.rxLastFrameTime),
805 rxFrameStartTime(o.rxFrameStartTime),
806 rxHasLastPacket(o.rxHasLastPacket),
807 rxHasLastFrame(o.rxHasLastFrame),
808 rxHasFrameStart(o.rxHasFrameStart),
809 streamClock(std::move(o.streamClock)),
810 lastSrArrivedAt(o.lastSrArrivedAt),
811 hasSr(o.hasSr),
812 seqTracker(std::move(o.seqTracker)),
813 reorderBuffer(std::move(o.reorderBuffer)),
814 reorderQueue(std::move(o.reorderQueue)),
815 depacketizer(std::move(o.depacketizer)),
816 ssrcChanges(o.ssrcChanges.value()),
817 framesReassembled(o.framesReassembled.value()),
818 framesDroppedValidate(o.framesDroppedValidate.value()),
819 framesWaitingParamSets(o.framesWaitingParamSets.value()),
820 framesDroppedSsrcReset(o.framesDroppedSsrcReset.value()) {}
821 ReaderStream(const ReaderStream &) = delete;
822 ReaderStream &operator=(const ReaderStream &) = delete;
823 ReaderStream &operator=(ReaderStream &&) = delete;
824 Atomic<int64_t> packetsReceived{0};
825 Atomic<int64_t> bytesReceived{0};
826 FrameCount framesReceived{0};
827 Atomic<int64_t> packetsLost{0};
828 ImageDesc readerImageDesc;
829 AudioDesc readerAudioDesc;
830 uint32_t reasmTimestamp = 0;
831 bool reasmHasTimestamp = false;
832 uint16_t reasmLastSeq = 0;
833 bool reasmHaveLastSeq = false;
834 RtpPacket::List reasmPackets;
835 Histogram rxPacketInterval;
836 Histogram rxFrameInterval;
837 Histogram rxFrameAssembleTime;
838 TimeStamp rxLastPacketTime;
839 TimeStamp rxLastFrameTime;
840 TimeStamp rxFrameStartTime;
841 bool rxHasLastPacket = false;
842 bool rxHasLastFrame = false;
843 bool rxHasFrameStart = false;
857 RtpStreamClock streamClock;
865 TimeStamp lastSrArrivedAt;
868 bool hasSr = false;
869
881 UniquePtr<RtpSeqTracker> seqTracker;
882
887 UniquePtr<RtpSeqReorderBuffer> reorderBuffer;
888
896 UniquePtr<RtpPacket::Queue> reorderQueue;
897
906 UniquePtr<RtpDepacketizerThread> depacketizer;
907
920 Atomic<uint32_t> resetEpoch{0};
921
932 Atomic<int64_t> lastPacketArrivalNs{0};
933
941 bool wireSilenceEosSignaled = false;
942
953 Atomic<int64_t> ssrcChanges{0};
954
964 Atomic<int64_t> framesReassembled{0};
965
975 Atomic<int64_t> framesDroppedValidate{0};
976
986 Atomic<int64_t> framesWaitingParamSets{0};
987
995 Atomic<int64_t> framesDroppedSsrcReset{0};
996 };
997
1014 struct VideoStream : WriterStream {
1015 VideoStream() = default;
1016 VideoStream(VideoStream &&o) noexcept
1017 : WriterStream(std::move(o)),
1018 imageDesc(std::move(o.imageDesc)),
1019 cachedSps(std::move(o.cachedSps)),
1020 cachedPps(std::move(o.cachedPps)),
1021 cachedVps(std::move(o.cachedVps)) {}
1022 VideoStream(const VideoStream &) = delete;
1023 VideoStream &operator=(const VideoStream &) = delete;
1024 VideoStream &operator=(VideoStream &&) = delete;
1033 ImageDesc imageDesc;
1034 Buffer cachedSps;
1035 Buffer cachedPps;
1036 Buffer cachedVps;
1037 };
1038
1049 struct DataStream : WriterStream {
1050 DataStream() = default;
1051 DataStream(DataStream &&o) noexcept = default;
1052 DataStream(const DataStream &) = delete;
1053 DataStream &operator=(const DataStream &) = delete;
1054 DataStream &operator=(DataStream &&) = delete;
1055 };
1056
1075 struct AudioStream : WriterStream {
1076 AudioStream() = default;
1077 AudioStream(AudioStream &&o) noexcept
1078 : WriterStream(std::move(o)),
1079 storageDesc(std::move(o.storageDesc)),
1080 packetSamples(o.packetSamples),
1081 packetBytes(o.packetBytes),
1082 packetTimeUs(o.packetTimeUs),
1083 prerollSamples(o.prerollSamples),
1084 silencePacketsEmitted(
1085 o.silencePacketsEmitted.value()),
1086 silenceSamplesEmitted(
1087 o.silenceSamplesEmitted.value()) {}
1088 AudioStream(const AudioStream &) = delete;
1089 AudioStream &operator=(const AudioStream &) = delete;
1090 AudioStream &operator=(AudioStream &&) = delete;
1098 AudioDesc storageDesc;
1100 size_t packetSamples = 0;
1102 size_t packetBytes = 0;
1104 int packetTimeUs = 0;
1113 size_t prerollSamples = 0;
1122 Atomic<int64_t> silencePacketsEmitted{0};
1127 Atomic<int64_t> silenceSamplesEmitted{0};
1128 };
1129
1140 struct VideoReaderStream : ReaderStream {
1141 VideoReaderStream() = default;
1142 VideoReaderStream(VideoReaderStream &&o) noexcept
1143 : ReaderStream(std::move(o)),
1144 payloadQueue(std::move(o.payloadQueue)) {}
1145 VideoReaderStream(const VideoReaderStream &) = delete;
1146 VideoReaderStream &operator=(const VideoReaderStream &) = delete;
1147 VideoReaderStream &operator=(VideoReaderStream &&) = delete;
1158 UniquePtr<Queue<RxVideoFrame>> payloadQueue;
1159 };
1160
1177 struct AudioReaderStream : ReaderStream {
1178 AudioReaderStream() = default;
1179 AudioReaderStream(AudioReaderStream &&o) noexcept
1180 : ReaderStream(std::move(o)),
1181 fifo(std::move(o.fifo)),
1182 payloadQueue(std::move(o.payloadQueue)) {}
1183 AudioReaderStream(const AudioReaderStream &) = delete;
1184 AudioReaderStream &operator=(const AudioReaderStream &) = delete;
1185 AudioReaderStream &operator=(AudioReaderStream &&) = delete;
1195 AudioBuffer fifo;
1199 UniquePtr<Queue<RxAudioChunk>> payloadQueue;
1200 };
1201
1215 struct DataReaderStream : ReaderStream {
1216 DataReaderStream() = default;
1217 DataReaderStream(DataReaderStream &&o) noexcept
1218 : ReaderStream(std::move(o)),
1219 payloadQueue(std::move(o.payloadQueue)),
1220 ancPayloadQueue(std::move(o.ancPayloadQueue)) {}
1221 DataReaderStream(const DataReaderStream &) = delete;
1222 DataReaderStream &operator=(const DataReaderStream &) = delete;
1223 DataReaderStream &operator=(DataReaderStream &&) = delete;
1229 UniquePtr<Queue<RxDataMessage>> payloadQueue;
1235 UniquePtr<Queue<RxAncFrame>> ancPayloadQueue;
1236 };
1237
1245 static constexpr size_t VideoPayloadQueueDepth = 4;
1246 static constexpr size_t AudioPayloadQueueDepth = 100;
1247 static constexpr size_t DataPayloadQueueDepth = 8;
1248
1249 Error configureVideoStream(const MediaIO::Config &cfg, const MediaDesc &mediaDesc);
1250 Error configureAudioStream(const MediaIO::Config &cfg, const MediaDesc &mediaDesc);
1251 Error configureDataStream(const MediaIO::Config &cfg);
1252
1259 void applyClockReferenceConfig(const MediaIO::Config &cfg);
1260
1261 Error openStream(WriterStream &s, bool enableMulticastLoopback);
1262 Error openReaderStream(ReaderStream &s, bool enableMulticastLoopback);
1263
1297 Error injectParameterSets(const uint8_t *data, size_t size, Buffer &healed);
1298
1313 bool paceVideoFrame();
1314
1315 // sendVideo / sendAudio / sendData were per-stream
1316 // strand-side helpers in the previous architecture.
1317 // They are gone in Phase 2; their packetization
1318 // logic lives in the per-stream
1319 // @c VideoPacketizerThread / @c AudioPacketizerThread
1320 // / @c DataPacketizerThread (declared as nested
1321 // classes inside @c rtpmediaio.cpp), and their wire-
1322 // pacing logic lives in the matching
1323 // @c VideoTxThread / @c AudioTxThread /
1324 // @c DataTxThread.
1325
1343 void refreshStreamClock(ReaderStream &s);
1344
1364 TimeStamp ntpToSteady(const NtpTime &ntp) const;
1365
1366 // Reader path.
1367 Error applySdp(const SdpSession &sdp, MediaIO::Config &cfg, MediaDesc &mediaDesc);
1368 Error openAllReaders();
1369 void pushReaderFrame(Frame frame);
1370
1371 void buildSdp();
1372 Error writeSdpFile(const String &path);
1373
1379 void resetStreamCommon(Stream &s);
1384 void resetWriterStream(WriterStream &s);
1390 void resetReaderStream(ReaderStream &s);
1391 void resetAll();
1392
1404 RtcpSchedulerContext buildRtcpSchedulerContext();
1405
1406 // Per-mode stream lists — writer-mode populates the
1407 // first three, reader-mode the last three. Reader
1408 // and writer modes are mutually exclusive on a
1409 // single @c RtpMediaIO instance, so only one set is
1410 // populated at any given time. Each kind is a list
1411 // (instead of a single slot) so the routing, SDP
1412 // builder, RTCP scheduler, and per-stream threads
1413 // can iterate uniformly regardless of how many
1414 // streams of each kind a session carries. Today's
1415 // @c configureVideoStream / @c configureAudioStream /
1416 // @c configureDataStream populate one entry per kind
1417 // on the matching list (per @c _readerMode); the
1418 // collective shape is ready for multi-stream config
1419 // that lands later.
1420 List<VideoStream> _videos;
1421 List<AudioStream> _audios;
1422 List<DataStream> _datas;
1423 List<VideoReaderStream> _videoReaders;
1424 List<AudioReaderStream> _audioReaders;
1425 List<DataReaderStream> _dataReaders;
1426
1427 // Transport-global config
1428 SocketAddress _localAddress;
1429 String _sessionName;
1430 String _sessionOrigin;
1431 String _multicastInterface;
1432 int _multicastTTL = 0;
1433 int _recvBufferBytes = 0;
1434 int _sendBufferBytes = 0;
1435 Enum _pacingMode;
1436 Enum _dataFormat;
1437
1438 // Runtime
1439 FrameRate _frameRate;
1440 FrameCount _frameCount{0};
1441 FrameCount _framesSent{0};
1442
1443 // Mode
1444 bool _readerMode = false;
1445
1446 // RFC 4175 wire-format PixelFormat. When the input
1447 // pixel format doesn't match what RFC 4175 expects
1448 // on the wire (e.g. YUYV vs UYVY), the
1449 // VideoPacketizerThread calls
1450 // UncompressedVideoPayload::convert() to the wire
1451 // format before packing. Invalid means no conversion
1452 // needed.
1453 PixelFormat _videoWirePixelFormat;
1454
1455 // Reader runtime
1456 Queue<Frame> _readerQueue;
1457 int _readerMaxDepth = 4;
1458 int _readerJitterMs = 50;
1459 int _wireSilenceTimeoutMs = 0;
1460 bool _videoWatchdogEnabled = false;
1461 FrameCount _readerFramesReceived{0};
1462 // Set by cancelBlockingWork() so the executeCmd(Read)
1463 // pop loop can break out of its short-timeout polling
1464 // when MediaIO::close is unwinding the strand. Cleared
1465 // at every Open so a closed-then-reopened RtpMediaIO
1466 // doesn't carry the previous instance's cancellation
1467 // forward.
1468 Atomic<bool> _readCancelled{false};
1469
1486 TimeStamp _readerSteadyAnchor;
1487 NtpTime _readerNtpAnchor;
1488 bool _readerHasAnchor = false;
1495 TimeStamp _openedAt;
1502 UniquePtr<RtpAggregatorThread> _aggregator;
1503
1504 // SDP — the active session description built at
1505 // open time. Reader mode leaves this empty (the
1506 // reader consumes an externally-supplied SDP via
1507 // RtpSdp); writer mode populates it so the
1508 // GetSdp params command and the RtpSaveSdpPath
1509 // export path can serve it.
1510 SdpSession _sdpSession;
1511 String _sdpPath;
1512
1513 // RTCP — one scheduler thread per RtpMediaIO. Wakes
1514 // every @c _rtcpIntervalMs and emits an SR + SDES
1515 // compound on every active writer stream's
1516 // RtpSession plus an RR for every active reader
1517 // stream. The wallclock anchor captured at open
1518 // time is shared across all streams so a single
1519 // receiver-side observation of any stream's first
1520 // SR is sufficient for cross-stream correlation.
1521 // Disabled in reader mode (we are not a sender
1522 // there) and when @c MediaConfig::RtpRtcpEnabled
1523 // is @c false. The scheduler itself lives in
1524 // @c rtcpscheduler.h; this io populates an
1525 // @c RtcpSchedulerContext at @c executeCmd(Open)
1526 // time and hands it to the scheduler's constructor.
1527 UniquePtr<RtcpScheduler> _rtcpScheduler;
1528 bool _rtcpEnabled = true;
1529 int _rtcpIntervalMs = 5000;
1530 String _rtcpCname;
1531
1532 // Process-local monotonic counter that gives every
1533 // RtpMediaIO a distinct id within one process — see
1534 // objectId() and buildDefaultCname(). Starts at 0,
1535 // first instance gets 1.
1536 static Atomic<uint64_t> _nextObjectId;
1537 uint64_t _objectId = 0;
1538
1539 // SR-anchor seeding gate. At @c openStream time
1540 // every active session is anchored with
1541 // @c (NtpTime::now(), 0) so an SR can be emitted
1542 // even before the first frame arrives (better than
1543 // a structurally-invalid SR for late receivers).
1544 // The very first @c executeCmd(Write) refines the
1545 // anchor from the Frame's @ref Frame::captureTime —
1546 // we use a compare-exchange on this flag so even
1547 // future changes that take the Write path off the
1548 // single-threaded strand still seed exactly once
1549 // per opening.
1550 Atomic<bool> _anchorSeeded;
1551
1552 // External writer-mode video pacing — null clock means
1553 // the upstream pump's natural cadence is the only
1554 // timing source. Set via
1555 // executeCmd(MediaIOCommandSetClock); read on the
1556 // dedicated worker thread in executeCmd(Write) (same
1557 // thread as the setter, no synchronization required).
1558 // Audio is not paced separately by the gate — AES67
1559 // packet timing is governed by the per-packet RTP
1560 // timestamp stride that the audio FIFO maintains.
1561 PacingGate _videoGate;
1562
1563 // Out-of-band parameter sets for H.264 / HEVC writers,
1564 // populated lazily by injectParameterSets() the first
1565 // time a complete set passes through the bitstream
1566 // (typically frame 0). Stored as the base64-encoded
1567 // SDP-ready string — for H.264 it's
1568 // @c "<sps-base64>,<pps-base64>" (matches RFC 6184
1569 // @c sprop-parameter-sets); for HEVC it's the three
1570 // separate sprop-vps / -sps / -pps strings (the bool
1571 // tracks whether they have been populated). Empty
1572 // until the encoder's first IDR / IRAP flows through;
1573 // once populated, buildSdp() embeds them in the
1574 // @c a=fmtp line so a receiver that reads the SDP
1575 // (e.g. ffplay) can populate its decoder's extradata
1576 // before the first packet arrives. ffmpeg's H.264 RTP
1577 // demuxer requires this — without it, a receiver that
1578 // joins after the first IDR fails its initial codec
1579 // probe and never recovers, even with in-band
1580 // parameter sets repeated on every IDR.
1581 String _h264SpropParameterSets;
1582 String _h265SpropVps;
1583 String _h265SpropSps;
1584 String _h265SpropPps;
1585
1599 void refreshSdpSprop();
1600};
1601
1606class RtpFactory : public MediaIOFactory {
1607 public:
1608 RtpFactory() = default;
1609
1610 String name() const override { return String("Rtp"); }
1611 String displayName() const override { return String("RTP Stream"); }
1612 String description() const override {
1613 return String("RTP video + audio + metadata reader / writer "
1614 "(MJPEG / JPEG XS / H.264 / H.265 / raw / L16 / JSON)");
1615 }
1616 // An SDP file on disk implies the Rtp reader; writers
1617 // never open via a filesystem path, but extension-based
1618 // dispatch still uses this list so `-i foo.sdp` in
1619 // mediaplay picks the Rtp backend automatically.
1620 StringList extensions() const override {
1621 return {String("sdp")};
1622 }
1623
1624 bool canBeSource() const override { return true; }
1625 bool canBeSink() const override { return true; }
1626
1627 bool canHandleDevice(IODevice *device) const override;
1628 Config::SpecMap configSpecs() const override;
1629
1630 MediaIO *create(const Config &config, ObjectBase *parent = nullptr) const override;
1631};
1632
1633PROMEKI_NAMESPACE_END
1634
1635#endif // PROMEKI_ENABLE_NETWORK