11#include <promeki/config.h>
12#if PROMEKI_ENABLE_NETWORK
54PROMEKI_NAMESPACE_BEGIN
/// Forward declaration only; a pointer to this type is held as the
/// `transport` member further down, so the full definition is not needed here.
58class UdpSocketTransport;
/// RTP media I/O class, publicly derived from DedicatedThreadMediaIO.
/// NOTE(review): this copy of the header is a lossy extraction. Every line
/// carries a fused numeric prefix and some lines (access specifiers, a few
/// members, closing braces) are absent, so the remarks here describe only
/// what is visible.
379class RtpMediaIO :
public DedicatedThreadMediaIO {
/// Project macro invoked with this class and its base class; its expansion
/// is not visible in this header view.
380 PROMEKI_OBJECT(RtpMediaIO, DedicatedThreadMediaIO)
/// Statistics keys. Each constant below is a MediaIOStats::ID built from a
/// string literal equal to the constant's name without the "Stats" prefix.
/// The per-group descriptions are inferred from the names only; confirm
/// against the code that populates them.
///
/// Frame / packet / byte totals for the send and receive directions.
383 static inline const MediaIOStats::ID StatsFramesSent{
"FramesSent"};
385 static inline const MediaIOStats::ID StatsPacketsSent{
"PacketsSent"};
387 static inline const MediaIOStats::ID StatsBytesSent{
"BytesSent"};
389 static inline const MediaIOStats::ID StatsFramesReceived{
"FramesReceived"};
391 static inline const MediaIOStats::ID StatsPacketsReceived{
"PacketsReceived"};
393 static inline const MediaIOStats::ID StatsBytesReceived{
"BytesReceived"};
/// Video timing keys for Tx and Rx. The "Us" suffix presumably means
/// microseconds (TODO confirm).
395 static inline const MediaIOStats::ID StatsTxVideoFrameIntervalUs{
"TxVideoFrameIntervalUs"};
397 static inline const MediaIOStats::ID StatsTxVideoSendDurationUs{
"TxVideoSendDurationUs"};
399 static inline const MediaIOStats::ID StatsRxVideoPacketIntervalUs{
"RxVideoPacketIntervalUs"};
401 static inline const MediaIOStats::ID StatsRxVideoFrameIntervalUs{
"RxVideoFrameIntervalUs"};
403 static inline const MediaIOStats::ID StatsRxVideoFrameAssembleUs{
"RxVideoFrameAssembleUs"};
/// Audio silence emission keys (packets and samples).
415 static inline const MediaIOStats::ID StatsAudioSilencePacketsEmitted{
"AudioSilencePacketsEmitted"};
424 static inline const MediaIOStats::ID StatsAudioSilenceSamplesEmitted{
"AudioSilenceSamplesEmitted"};
/// Receive-side sequence, loss, jitter and SSRC keys. The names resemble
/// RTCP receiver-report fields; presumably they mirror them (TODO confirm).
436 static inline const MediaIOStats::ID StatsRxExtendedHighestSeq{
"RxExtendedHighestSeq"};
438 static inline const MediaIOStats::ID StatsRxPacketsExpected{
"RxPacketsExpected"};
440 static inline const MediaIOStats::ID StatsRxCumulativeLost{
"RxCumulativeLost"};
442 static inline const MediaIOStats::ID StatsRxFractionLost{
"RxFractionLost"};
444 static inline const MediaIOStats::ID StatsRxDuplicatePackets{
"RxDuplicatePackets"};
446 static inline const MediaIOStats::ID StatsRxReorderedPackets{
"RxReorderedPackets"};
448 static inline const MediaIOStats::ID StatsRxInterarrivalJitter{
"RxInterarrivalJitter"};
451 static inline const MediaIOStats::ID StatsRxSsrcChanges{
"RxSsrcChanges"};
/// Receive reorder-stage keys (emitted in order / on deadline, dropped on
/// overflow / duplicate).
454 static inline const MediaIOStats::ID StatsRxReorderEmittedInOrder{
"RxReorderEmittedInOrder"};
456 static inline const MediaIOStats::ID StatsRxReorderEmittedOnDeadline{
"RxReorderEmittedOnDeadline"};
458 static inline const MediaIOStats::ID StatsRxReorderDroppedOverflow{
"RxReorderDroppedOverflow"};
460 static inline const MediaIOStats::ID StatsRxReorderDroppedDuplicate{
"RxReorderDroppedDuplicate"};
/// Receive queue depth keys, one per media kind plus the reader queue.
463 static inline const MediaIOStats::ID StatsRxVideoQueueDepth{
"RxVideoQueueDepth"};
465 static inline const MediaIOStats::ID StatsRxAudioQueueDepth{
"RxAudioQueueDepth"};
467 static inline const MediaIOStats::ID StatsRxDataQueueDepth{
"RxDataQueueDepth"};
469 static inline const MediaIOStats::ID StatsRxReaderQueueDepth{
"RxReaderQueueDepth"};
/// Receive frame reassembly / drop keys.
475 static inline const MediaIOStats::ID StatsRxFramesReassembled{
"RxFramesReassembled"};
479 static inline const MediaIOStats::ID StatsRxFramesDroppedValidate{
"RxFramesDroppedValidate"};
484 static inline const MediaIOStats::ID StatsRxFramesWaitingParamSets{
"RxFramesWaitingParamSets"};
490 static inline const MediaIOStats::ID StatsRxFramesDroppedSsrcReset{
"RxFramesDroppedSsrcReset"};
/// "Sr" keys. Presumably these refer to RTCP sender reports (TODO confirm).
495 static inline const MediaIOStats::ID StatsRxSrObserved{
"RxSrObserved"};
500 static inline const MediaIOStats::ID StatsRxLastSrAgeUs{
"RxLastSrAgeUs"};
505 static inline const MediaIOStats::ID StatsRxFirstSrLatencyUs{
"RxFirstSrLatencyUs"};
/// Parameter-command keys. Each is a MediaIOParamsID built from a string
/// literal equal to the constant's name without the "Param" prefix.
508 static inline const MediaIOParamsID ParamGetSdp{
"GetSdp"};
510 static inline const MediaIOParamsID ParamSdp{
"Sdp"};
/// Constructor. `parent` is optional and defaults to nullptr.
/// NOTE(review): the constructor is not marked `explicit`, so an ObjectBase*
/// can convert implicitly to RtpMediaIO. Confirm that is intended.
513 RtpMediaIO(ObjectBase *parent =
nullptr);
/// Destructor overriding the base class destructor; declared here with no
/// body in this header view.
516 ~RtpMediaIO()
override;
/// Returns the value of the `_objectId` member. Const: does not modify the
/// object.
530 uint64_t objectId()
const {
return _objectId; }
/// Static helper returning a String from a process id, an object id and a
/// host string. The definition is not in this view; presumably it composes
/// the default RTCP CNAME from those three inputs (TODO confirm).
545 static String buildDefaultCname(int64_t pid, uint64_t objectId,
const String &host);
/// Static helper returning a String for the given destination address.
/// The definition is not in this view; presumably it selects the local host
/// string to embed in the CNAME (TODO confirm).
565 static String pickEgressHostForCname(
const SocketAddress &destination);
/// Command handlers overriding the base class virtuals. There is one
/// overload per command type (Open, Close, Read, Write, Params, Stats,
/// SetClock); each takes the command by non-const reference and returns an
/// Error. The bodies are defined elsewhere.
568 Error executeCmd(MediaIOCommandOpen &cmd)
override;
569 Error executeCmd(MediaIOCommandClose &cmd)
override;
570 Error executeCmd(MediaIOCommandRead &cmd)
override;
571 Error executeCmd(MediaIOCommandWrite &cmd)
override;
572 Error executeCmd(MediaIOCommandParams &cmd)
override;
573 Error executeCmd(MediaIOCommandStats &cmd)
override;
593 Error executeCmd(MediaIOCommandSetClock &cmd)
override;
/// Override of the base class cancellation hook; takes no arguments and
/// returns nothing. NOTE(review): presumably related to the `_readCancelled`
/// flag declared further down. Confirm in the implementation.
597 void cancelBlockingWork()
override;
/// Forward declarations of three nested classes, one per media kind (video,
/// audio, data). Their definitions are not part of this view.
635 class VideoPacketizerThread;
636 class AudioPacketizerThread;
637 class DataPacketizerThread;
/// Move constructor of Stream (the struct's opening line is not part of
/// this view). Plain values and the transport pointer are copied, while
/// destination, mediaType, rtpmap, fmtp and refClockLocalMac are moved.
/// NOTE(review): no initializer for `session` or `payload` is visible, and
/// the fused line numbers skip values inside this list, so those
/// initializers may sit on elided lines. Confirm against the full header.
676 Stream(Stream &&o) noexcept
677 : transport(o.transport),
680 destination(std::move(o.destination)),
681 payloadType(o.payloadType),
682 clockRate(o.clockRate),
685 mediaType(std::move(o.mediaType)),
686 rtpmap(std::move(o.rtpmap)),
687 fmtp(std::move(o.fmtp)),
689 clockDomain(o.clockDomain),
690 tsRefClkMode(o.tsRefClkMode),
691 ptpGrandmaster(o.ptpGrandmaster),
692 ptpDomain(o.ptpDomain),
693 refClockLocalMac(std::move(o.refClockLocalMac)),
694 mediaClkOffset(o.mediaClkOffset) {
/// Clear the source's transport pointer so that, after the move, only the
/// new object refers to the transport.
698 o.transport =
nullptr;
/// Copy construction, copy assignment and move assignment are deleted; move
/// construction is the only transfer operation declared in this view.
702 Stream(
const Stream &) =
delete;
703 Stream &operator=(
const Stream &) =
delete;
704 Stream &operator=(Stream &&) =
delete;
/// Raw pointer members, all defaulting to nullptr.
/// NOTE(review): who owns and frees the pointees is not visible here.
705 UdpSocketTransport *transport =
nullptr;
706 RtpSession *session =
nullptr;
707 RtpPayload *payload =
nullptr;
/// Destination address, payload type (default 0) and clock rate (default
/// 90000; presumably Hz, TODO confirm).
708 SocketAddress destination;
709 uint8_t payloadType = 0;
710 uint32_t clockRate = 90000;
/// Clock-reference related members: clock domain, reference-clock mode
/// (default RtpRefClockMode::None), PTP grandmaster id and domain, a local
/// MAC address and a media clock offset (default 0).
/// NOTE(review): the names match SDP ts-refclk / mediaclk signalling;
/// presumably that is their purpose. Confirm.
717 ClockDomain clockDomain;
718 RtpRefClockMode tsRefClkMode = RtpRefClockMode::None;
719 EUI64 ptpGrandmaster;
720 uint8_t ptpDomain = 0;
721 MacAddress refClockLocalMac;
722 int32_t mediaClkOffset = 0;
/// Send-side stream state, derived from Stream. Adds packetizer / tx thread
/// pointers, atomic send counters and send-timing histograms.
739 struct WriterStream : Stream {
740 WriterStream() =
default;
/// Move constructor. The Stream base is move-constructed first; the
/// WriterStream members of `o` are read afterwards, which works because the
/// base move only touches Stream's own members. Atomic counters are
/// re-seeded from the source's current value() rather than moved. The
/// histograms are moved and the timestamp / flag are copied.
/// NOTE(review): no initializer for `tx` is visible; it may sit on an
/// elided line. Confirm.
741 WriterStream(WriterStream &&o) noexcept
742 : Stream(std::move(o)),
743 packetizer(o.packetizer),
745 packetsSent(o.packetsSent.value()),
746 bytesSent(o.bytesSent.value()),
747 senderOctets(o.senderOctets.value()),
748 txFrameInterval(std::move(o.txFrameInterval)),
749 txSendDuration(std::move(o.txSendDuration)),
750 txLastSendStart(o.txLastSendStart),
751 txHasLastSend(o.txHasLastSend) {
/// Clear the source's packetizer pointer so only the new object refers to
/// it.
752 o.packetizer =
nullptr;
/// Copy construction, copy assignment and move assignment are deleted.
755 WriterStream(
const WriterStream &) =
delete;
756 WriterStream &operator=(
const WriterStream &) =
delete;
757 WriterStream &operator=(WriterStream &&) =
delete;
/// Raw pointers to the packetizer and tx threads, both defaulting to
/// nullptr. NOTE(review): ownership is not visible here.
758 RtpPacketizerThread *packetizer =
nullptr;
759 RtpTxThread *tx =
nullptr;
/// Atomic 64-bit counters, all starting at 0.
760 Atomic<int64_t> packetsSent{0};
761 Atomic<int64_t> bytesSent{0};
762 Atomic<int64_t> senderOctets{0};
/// Send-timing histograms and the state used with them: the last send
/// start timestamp and a flag (default false) that presumably marks whether
/// that timestamp is valid (TODO confirm).
763 Histogram txFrameInterval;
764 Histogram txSendDuration;
765 TimeStamp txLastSendStart;
766 bool txHasLastSend =
false;
/// Receive-side stream state, derived from Stream. Holds receive counters,
/// reassembly state, timing histograms, a stream clock and owning pointers
/// to the sequence tracker, reorder buffer, reorder queue and depacketizer.
785 struct ReaderStream : Stream {
786 ReaderStream() =
default;
/// Move constructor. The Stream base is move-constructed first. Atomic
/// counters are re-seeded from the source's current value(). Descriptors,
/// the packet list, histograms, the stream clock and the UniquePtr members
/// are moved; the remaining scalars and timestamps are copied.
/// NOTE(review): `resetEpoch`, `lastPacketArrivalNs` and
/// `wireSilenceEosSignaled` have no visible initializer here. Unless an
/// elided line supplies one, they take their in-class defaults in the new
/// object instead of the source's values. Confirm that is intended.
787 ReaderStream(ReaderStream &&o) noexcept
788 : Stream(std::move(o)),
789 packetsReceived(o.packetsReceived.value()),
790 bytesReceived(o.bytesReceived.value()),
791 framesReceived(o.framesReceived),
792 packetsLost(o.packetsLost.value()),
793 readerImageDesc(std::move(o.readerImageDesc)),
794 readerAudioDesc(std::move(o.readerAudioDesc)),
795 reasmTimestamp(o.reasmTimestamp),
796 reasmHasTimestamp(o.reasmHasTimestamp),
797 reasmLastSeq(o.reasmLastSeq),
798 reasmHaveLastSeq(o.reasmHaveLastSeq),
799 reasmPackets(std::move(o.reasmPackets)),
800 rxPacketInterval(std::move(o.rxPacketInterval)),
801 rxFrameInterval(std::move(o.rxFrameInterval)),
802 rxFrameAssembleTime(std::move(o.rxFrameAssembleTime)),
803 rxLastPacketTime(o.rxLastPacketTime),
804 rxLastFrameTime(o.rxLastFrameTime),
805 rxFrameStartTime(o.rxFrameStartTime),
806 rxHasLastPacket(o.rxHasLastPacket),
807 rxHasLastFrame(o.rxHasLastFrame),
808 rxHasFrameStart(o.rxHasFrameStart),
809 streamClock(std::move(o.streamClock)),
810 lastSrArrivedAt(o.lastSrArrivedAt),
812 seqTracker(std::move(o.seqTracker)),
813 reorderBuffer(std::move(o.reorderBuffer)),
814 reorderQueue(std::move(o.reorderQueue)),
815 depacketizer(std::move(o.depacketizer)),
816 ssrcChanges(o.ssrcChanges.value()),
817 framesReassembled(o.framesReassembled.value()),
818 framesDroppedValidate(o.framesDroppedValidate.value()),
819 framesWaitingParamSets(o.framesWaitingParamSets.value()),
820 framesDroppedSsrcReset(o.framesDroppedSsrcReset.value()) {}
/// Copy construction, copy assignment and move assignment are deleted.
821 ReaderStream(
const ReaderStream &) =
delete;
822 ReaderStream &operator=(
const ReaderStream &) =
delete;
823 ReaderStream &operator=(ReaderStream &&) =
delete;
/// Receive counters, all starting at 0. `framesReceived` is a plain
/// FrameCount; the others are atomics.
824 Atomic<int64_t> packetsReceived{0};
825 Atomic<int64_t> bytesReceived{0};
826 FrameCount framesReceived{0};
827 Atomic<int64_t> packetsLost{0};
/// Image / audio descriptors held for the reader side.
828 ImageDesc readerImageDesc;
829 AudioDesc readerAudioDesc;
/// Reassembly state: a timestamp and a last sequence number, each paired
/// with a "has" flag (default false), plus the list of packets collected so
/// far. NOTE(review): presumably one list per in-progress frame. Confirm.
830 uint32_t reasmTimestamp = 0;
831 bool reasmHasTimestamp =
false;
832 uint16_t reasmLastSeq = 0;
833 bool reasmHaveLastSeq =
false;
834 RtpPacket::List reasmPackets;
/// Receive timing histograms with the timestamps and validity flags
/// (default false) used alongside them.
835 Histogram rxPacketInterval;
836 Histogram rxFrameInterval;
837 Histogram rxFrameAssembleTime;
838 TimeStamp rxLastPacketTime;
839 TimeStamp rxLastFrameTime;
840 TimeStamp rxFrameStartTime;
841 bool rxHasLastPacket =
false;
842 bool rxHasLastFrame =
false;
843 bool rxHasFrameStart =
false;
/// Per-stream RTP clock object, and the timestamp of the last "Sr" arrival
/// (presumably an RTCP sender report, TODO confirm).
857 RtpStreamClock streamClock;
865 TimeStamp lastSrArrivedAt;
/// Owning pointers (UniquePtr) to the receive pipeline pieces: sequence
/// tracker, reorder buffer, reorder queue and depacketizer thread.
881 UniquePtr<RtpSeqTracker> seqTracker;
887 UniquePtr<RtpSeqReorderBuffer> reorderBuffer;
896 UniquePtr<RtpPacket::Queue> reorderQueue;
906 UniquePtr<RtpDepacketizerThread> depacketizer;
/// Members below this point that are not named in the visible move
/// constructor initializer list: an atomic epoch counter, an atomic
/// last-arrival time (the "Ns" suffix presumably means nanoseconds, TODO
/// confirm) and a flag defaulting to false.
920 Atomic<uint32_t> resetEpoch{0};
932 Atomic<int64_t> lastPacketArrivalNs{0};
941 bool wireSilenceEosSignaled =
false;
/// Atomic event counters, all starting at 0; each is carried across a move
/// via value().
953 Atomic<int64_t> ssrcChanges{0};
964 Atomic<int64_t> framesReassembled{0};
975 Atomic<int64_t> framesDroppedValidate{0};
986 Atomic<int64_t> framesWaitingParamSets{0};
995 Atomic<int64_t> framesDroppedSsrcReset{0};
/// Video send stream, derived from WriterStream. Adds an image descriptor
/// and cached parameter-set members.
1014 struct VideoStream : WriterStream {
1015 VideoStream() =
default;
/// Move constructor: move-constructs the WriterStream base, then moves
/// imageDesc, cachedSps, cachedPps and cachedVps.
/// NOTE(review): the declarations of the three cached* members are not
/// part of this view. Presumably they hold H.264 / H.265 SPS / PPS / VPS
/// data. Confirm.
1016 VideoStream(VideoStream &&o) noexcept
1017 : WriterStream(std::move(o)),
1018 imageDesc(std::move(o.imageDesc)),
1019 cachedSps(std::move(o.cachedSps)),
1020 cachedPps(std::move(o.cachedPps)),
1021 cachedVps(std::move(o.cachedVps)) {}
/// Copy construction, copy assignment and move assignment are deleted.
1022 VideoStream(
const VideoStream &) =
delete;
1023 VideoStream &operator=(
const VideoStream &) =
delete;
1024 VideoStream &operator=(VideoStream &&) =
delete;
/// Image descriptor for this video stream.
1033 ImageDesc imageDesc;
/// Data send stream, derived from WriterStream. No additional members are
/// visible in this view, and the move constructor is defaulted, so it
/// delegates to the WriterStream move constructor.
1049 struct DataStream : WriterStream {
1050 DataStream() =
default;
1051 DataStream(DataStream &&o)
noexcept =
default;
/// Copy construction, copy assignment and move assignment are deleted.
1052 DataStream(
const DataStream &) =
delete;
1053 DataStream &operator=(
const DataStream &) =
delete;
1054 DataStream &operator=(DataStream &&) =
delete;
/// Audio send stream, derived from WriterStream. Adds an audio descriptor,
/// per-packet sizing values, a preroll sample count and atomic silence
/// counters.
1075 struct AudioStream : WriterStream {
1076 AudioStream() =
default;
/// Move constructor: move-constructs the WriterStream base, moves
/// storageDesc, copies the sizing scalars and re-seeds the two atomic
/// silence counters from the source's current value().
1077 AudioStream(AudioStream &&o) noexcept
1078 : WriterStream(std::move(o)),
1079 storageDesc(std::move(o.storageDesc)),
1080 packetSamples(o.packetSamples),
1081 packetBytes(o.packetBytes),
1082 packetTimeUs(o.packetTimeUs),
1083 prerollSamples(o.prerollSamples),
1084 silencePacketsEmitted(
1085 o.silencePacketsEmitted.value()),
1086 silenceSamplesEmitted(
1087 o.silenceSamplesEmitted.value()) {}
/// Copy construction, copy assignment and move assignment are deleted.
1088 AudioStream(
const AudioStream &) =
delete;
1089 AudioStream &operator=(
const AudioStream &) =
delete;
1090 AudioStream &operator=(AudioStream &&) =
delete;
/// Audio descriptor for this stream.
1098 AudioDesc storageDesc;
/// Per-packet sizing, all defaulting to 0: sample count, byte count and a
/// packet time (the "Us" suffix presumably means microseconds, TODO
/// confirm).
1100 size_t packetSamples = 0;
1102 size_t packetBytes = 0;
1104 int packetTimeUs = 0;
/// Preroll sample count, default 0.
1113 size_t prerollSamples = 0;
/// Atomic counters of emitted silence packets and samples, starting at 0.
1122 Atomic<int64_t> silencePacketsEmitted{0};
1127 Atomic<int64_t> silenceSamplesEmitted{0};
/// Video receive stream, derived from ReaderStream. Adds an owning pointer
/// to a queue of RxVideoFrame.
1140 struct VideoReaderStream : ReaderStream {
1141 VideoReaderStream() =
default;
/// Move constructor: move-constructs the ReaderStream base, then moves the
/// payload queue pointer.
1142 VideoReaderStream(VideoReaderStream &&o) noexcept
1143 : ReaderStream(std::move(o)),
1144 payloadQueue(std::move(o.payloadQueue)) {}
/// Copy construction, copy assignment and move assignment are deleted.
1145 VideoReaderStream(
const VideoReaderStream &) =
delete;
1146 VideoReaderStream &operator=(
const VideoReaderStream &) =
delete;
1147 VideoReaderStream &operator=(VideoReaderStream &&) =
delete;
/// Owning pointer to the queue of received video frames.
1158 UniquePtr<Queue<RxVideoFrame>> payloadQueue;
/// Audio receive stream, derived from ReaderStream. Adds a `fifo` member
/// and an owning pointer to a queue of RxAudioChunk.
1177 struct AudioReaderStream : ReaderStream {
1178 AudioReaderStream() =
default;
/// Move constructor: move-constructs the ReaderStream base, then moves
/// `fifo` and the payload queue pointer.
/// NOTE(review): the declaration of `fifo` is not part of this view.
1179 AudioReaderStream(AudioReaderStream &&o) noexcept
1180 : ReaderStream(std::move(o)),
1181 fifo(std::move(o.fifo)),
1182 payloadQueue(std::move(o.payloadQueue)) {}
/// Copy construction, copy assignment and move assignment are deleted.
1183 AudioReaderStream(
const AudioReaderStream &) =
delete;
1184 AudioReaderStream &operator=(
const AudioReaderStream &) =
delete;
1185 AudioReaderStream &operator=(AudioReaderStream &&) =
delete;
/// Owning pointer to the queue of received audio chunks.
1199 UniquePtr<Queue<RxAudioChunk>> payloadQueue;
/// Data receive stream, derived from ReaderStream. Adds two owning queue
/// pointers: one of RxDataMessage and one of RxAncFrame.
1215 struct DataReaderStream : ReaderStream {
1216 DataReaderStream() =
default;
/// Move constructor: move-constructs the ReaderStream base, then moves
/// both queue pointers.
1217 DataReaderStream(DataReaderStream &&o) noexcept
1218 : ReaderStream(std::move(o)),
1219 payloadQueue(std::move(o.payloadQueue)),
1220 ancPayloadQueue(std::move(o.ancPayloadQueue)) {}
/// Copy construction, copy assignment and move assignment are deleted.
1221 DataReaderStream(
const DataReaderStream &) =
delete;
1222 DataReaderStream &operator=(
const DataReaderStream &) =
delete;
1223 DataReaderStream &operator=(DataReaderStream &&) =
delete;
/// Owning pointer to the queue of received data messages.
1229 UniquePtr<Queue<RxDataMessage>> payloadQueue;
/// Owning pointer to the queue of received ANC frames.
1235 UniquePtr<Queue<RxAncFrame>> ancPayloadQueue;
/// Compile-time depth constants, one per media kind (4, 100 and 8).
/// NOTE(review): presumably the capacities of the reader streams'
/// payloadQueue members. Confirm where the queues are created.
1245 static constexpr size_t VideoPayloadQueueDepth = 4;
1246 static constexpr size_t AudioPayloadQueueDepth = 100;
1247 static constexpr size_t DataPayloadQueueDepth = 8;
/// Helper declarations; all bodies are defined elsewhere, so behaviour
/// notes below are inferred from names and signatures only (TODO confirm).
///
/// Per-kind stream configuration from the I/O config. The video and audio
/// variants also take the media descriptor. Each returns an Error.
1249 Error configureVideoStream(
const MediaIO::Config &cfg,
const MediaDesc &mediaDesc);
1250 Error configureAudioStream(
const MediaIO::Config &cfg,
const MediaDesc &mediaDesc);
1251 Error configureDataStream(
const MediaIO::Config &cfg);
/// Takes the I/O config and returns nothing.
1259 void applyClockReferenceConfig(
const MediaIO::Config &cfg);
/// Open a writer or reader stream; the bool selects multicast loopback.
/// Each returns an Error.
1261 Error openStream(WriterStream &s,
bool enableMulticastLoopback);
1262 Error openReaderStream(ReaderStream &s,
bool enableMulticastLoopback);
/// Takes an input byte range (`data`, `size`) and an output Buffer passed
/// by non-const reference (`healed`); returns an Error.
1297 Error injectParameterSets(
const uint8_t *data,
size_t size, Buffer &healed);
/// Returns a bool; the meaning of the result is not visible here.
1313 bool paceVideoFrame();
/// Takes a reader stream by non-const reference and returns nothing.
1343 void refreshStreamClock(ReaderStream &s);
/// Const member converting an NtpTime to a TimeStamp.
1364 TimeStamp ntpToSteady(
const NtpTime &ntp)
const;
/// Takes an SDP session (read-only) plus the config and media descriptor
/// by non-const reference, so both can be modified; returns an Error.
1367 Error applySdp(
const SdpSession &sdp, MediaIO::Config &cfg, MediaDesc &mediaDesc);
/// No-argument helper returning an Error.
1368 Error openAllReaders();
/// Takes a Frame by value and returns nothing.
1369 void pushReaderFrame(Frame frame);
/// Takes a path string and returns an Error.
1372 Error writeSdpFile(
const String &path);
/// Reset helpers, one per stream layer (Stream, WriterStream,
/// ReaderStream). Each takes the stream by non-const reference.
1379 void resetStreamCommon(Stream &s);
1384 void resetWriterStream(WriterStream &s);
1390 void resetReaderStream(ReaderStream &s);
/// Returns an RtcpSchedulerContext by value.
1404 RtcpSchedulerContext buildRtcpSchedulerContext();
/// Data members. Purposes are inferred from names and types unless an
/// initializer shows otherwise (TODO confirm against the implementation).
///
/// Per-kind stream lists: three writer-side lists and three reader-side
/// lists.
1420 List<VideoStream> _videos;
1421 List<AudioStream> _audios;
1422 List<DataStream> _datas;
1423 List<VideoReaderStream> _videoReaders;
1424 List<AudioReaderStream> _audioReaders;
1425 List<DataReaderStream> _dataReaders;
/// Session / socket configuration: local address, session name and origin
/// strings, multicast interface and TTL, and receive / send buffer sizes.
/// The integer settings default to 0.
1428 SocketAddress _localAddress;
1429 String _sessionName;
1430 String _sessionOrigin;
1431 String _multicastInterface;
1432 int _multicastTTL = 0;
1433 int _recvBufferBytes = 0;
1434 int _sendBufferBytes = 0;
/// Frame rate and frame counters; the counters start at 0.
1439 FrameRate _frameRate;
1440 FrameCount _frameCount{0};
1441 FrameCount _framesSent{0};
/// Reader-mode flag, default false.
1444 bool _readerMode =
false;
/// Pixel format member for video on the wire.
1453 PixelFormat _videoWirePixelFormat;
/// Reader-side state: a queue of Frame with its tuning values (max depth 4,
/// jitter 50, wire-silence timeout 0; the "Ms" suffix presumably means
/// milliseconds), a watchdog flag (default false) and a received-frame
/// counter starting at 0.
1456 Queue<Frame> _readerQueue;
1457 int _readerMaxDepth = 4;
1458 int _readerJitterMs = 50;
1459 int _wireSilenceTimeoutMs = 0;
1460 bool _videoWatchdogEnabled =
false;
1461 FrameCount _readerFramesReceived{0};
/// Atomic flag, initially false.
1468 Atomic<bool> _readCancelled{
false};
/// Anchor pair (a TimeStamp and an NtpTime) with a validity flag defaulting
/// to false. NOTE(review): presumably consumed by ntpToSteady(). Confirm.
1486 TimeStamp _readerSteadyAnchor;
1487 NtpTime _readerNtpAnchor;
1488 bool _readerHasAnchor =
false;
/// Timestamp member; the name suggests it records when the object was
/// opened.
1495 TimeStamp _openedAt;
/// Owning pointer to the aggregator thread.
1502 UniquePtr<RtpAggregatorThread> _aggregator;
/// SDP session object held by this instance.
1510 SdpSession _sdpSession;
/// RTCP state: owning pointer to the scheduler, an enable flag (default
/// true) and an interval defaulting to 5000 (presumably milliseconds).
1527 UniquePtr<RtcpScheduler> _rtcpScheduler;
1528 bool _rtcpEnabled =
true;
1529 int _rtcpIntervalMs = 5000;
/// Static atomic counter (declaration only; its definition is not in this
/// view) and the per-instance id returned by objectId(), default 0.
1536 static Atomic<uint64_t> _nextObjectId;
1537 uint64_t _objectId = 0;
/// Atomic flag with no explicit initializer.
/// NOTE(review): unlike `_readCancelled{false}` above, the initial value
/// here depends on Atomic's default constructor. Confirm that it
/// initializes to false.
1550 Atomic<bool> _anchorSeeded;
/// Pacing gate object for video.
1561 PacingGate _videoGate;
/// Strings holding sprop values for H.264 and H.265 (VPS / SPS / PPS).
1581 String _h264SpropParameterSets;
1582 String _h265SpropVps;
1583 String _h265SpropSps;
1584 String _h265SpropPps;
/// No-argument helper returning nothing. NOTE(review): presumably refreshes
/// the sprop strings above in the SDP. Confirm in the implementation.
1599 void refreshSdpSprop();
/// MediaIOFactory subclass describing the RTP backend.
/// NOTE(review): some lines of this class (access specifiers, closing
/// braces) are absent from this view.
1606class RtpFactory :
public MediaIOFactory {
1608 RtpFactory() =
default;
/// Returns the string "Rtp".
1610 String name()
const override {
return String(
"Rtp"); }
/// Returns the string "RTP Stream".
1611 String displayName()
const override {
return String(
"RTP Stream"); }
/// Returns a one-sentence description; the two adjacent string literals
/// are concatenated by the compiler into a single string.
1612 String description()
const override {
1613 return String(
"RTP video + audio + metadata reader / writer "
1614 "(MJPEG / JPEG XS / H.264 / H.265 / raw / L16 / JSON)");
/// Returns a list containing the single extension "sdp".
1620 StringList extensions()
const override {
1621 return {String(
"sdp")};
/// Both return true: the backend can act as a source and as a sink.
1624 bool canBeSource()
const override {
return true; }
1625 bool canBeSink()
const override {
return true; }
/// Declared here, defined elsewhere: device probing and config specs.
1627 bool canHandleDevice(IODevice *device)
const override;
1628 Config::SpecMap configSpecs()
const override;
/// Declared here, defined elsewhere. Returns a raw MediaIO pointer for the
/// given config; `parent` is optional and defaults to nullptr.
/// NOTE(review): ownership of the returned pointer is not visible here.
1630 MediaIO *create(
const Config &config, ObjectBase *parent =
nullptr)
const override;
1633PROMEKI_NAMESPACE_END