libpromeki 1.0.0-alpha
PROfessional MEdia toolKIt
 
Loading...
Searching...
No Matches
rtpsession.h
Go to the documentation of this file.
1
8#pragma once
9
10
11#include <promeki/config.h>
12#if PROMEKI_ENABLE_NETWORK
13#include <cstdint>
14#include <promeki/atomic.h>
15#include <promeki/list.h>
16#include <promeki/mutex.h>
17#include <promeki/objectbase.h>
18#include <promeki/error.h>
19#include <promeki/buffer.h>
20#include <promeki/duration.h>
21#include <promeki/queue.h>
22#include <promeki/rtcppacket.h>
24#include <promeki/ntptime.h>
25#include <promeki/rtppacket.h>
28#include <promeki/string.h>
29#include <promeki/timestamp.h>
30
31PROMEKI_NAMESPACE_BEGIN
32
// Forward declarations. RtpSeqTracker and RtpSeqReorderBuffer are used in
// this header only through the raw pointers held by
// RtpSession::StreamReceiver, so an incomplete type is sufficient here.
// NOTE(review): Thread is not referenced by name in the visible part of this
// header — confirm it is still needed before removing it.
33class Thread;
34class RtpSeqTracker;
35class RtpSeqReorderBuffer;
36
90class RtpSession : public ObjectBase {
91 PROMEKI_OBJECT(RtpSession, ObjectBase)
92 public:
97 RtpSession(ObjectBase *parent = nullptr);
98
100 ~RtpSession() override;
101
113 Error start(const SocketAddress &localAddr);
114
126 Error start(PacketTransport *transport);
127
129 void stop();
130
132 bool isRunning() const { return _running; }
133
144 void setRemote(const SocketAddress &dest) { _remote = dest; }
145
147 const SocketAddress &remote() const { return _remote; }
148
158 Error sendPacket(const Buffer &payload, uint32_t timestamp, uint8_t payloadType, bool marker = false);
159
196 Error sendPackets(RtpPacketBatch &batch);
197
220 struct StreamReceiver {
225 RtpPacket::Queue *outQueue = nullptr;
226
229 RtpSeqTracker *seqTracker = nullptr;
230
234 RtpSeqReorderBuffer *reorderBuffer = nullptr;
235
246 uint32_t clockRateHz = 0;
247
258 uint8_t payloadType = 0;
259 };
260
307 Error startReceiving(List<StreamReceiver> receivers,
308 const String &threadName = "rtp-rx");
309
318 void stopReceiving();
319
321 bool isReceiving() const { return _receiving.value(); }
322
333 void setReceivePollIntervalMs(unsigned int timeoutMs) {
334 _receivePollMs = timeoutMs == 0 ? 200 : timeoutMs;
335 }
336
338 unsigned int receivePollIntervalMs() const { return _receivePollMs; }
339
353 Error setPacingRate(uint64_t bytesPerSec);
354
356 uint32_t ssrc() const { return _ssrc; }
357
359 void setSsrc(uint32_t ssrc) { _ssrc = ssrc; }
360
362 uint16_t sequenceNumber() const { return _sequenceNumber; }
363
365 void setPayloadType(uint8_t pt) { _payloadType = pt; }
366
368 uint8_t payloadType() const { return _payloadType; }
369
371 void setClockRate(uint32_t hz) { _clockRate = hz; }
372
374 uint32_t clockRate() const { return _clockRate; }
375
413 void setRtpAnchor(NtpTime captureNtp, uint32_t rtpTs);
414
423 NtpTime anchorNtp() const;
424
429 uint32_t anchorRtpTs() const;
430
462 void noteRtpEmission(uint32_t rtpTs);
463
473 bool hasEmissionRecord() const;
474
476 const String &cname() const { return _cname; }
477
479 void setCname(const String &cname) { _cname = cname; }
480
502 Error emitRtcpSr(uint32_t senderPacketCount, uint32_t senderOctetCount);
503
522 Error emitRtcpRr(const RtcpPacket::ReportBlock &block);
523
534 Error emitRtcpBye();
535
550 struct ReceivedSr {
551 NtpTime ntp;
552 uint32_t rtpTs = 0;
553 TimeStamp arrivedAt;
554 bool valid = false;
555 };
556
568 ReceivedSr receivedSr() const;
569
579 uint32_t srObservedCount() const;
580
590 TimeStamp firstSrAt() const;
591
607 NtpTime currentSrNtp() const;
608
620 PacketTransport *transport() const { return _transport; }
621
630 PROMEKI_SIGNAL(packetReceived, Buffer, uint32_t, uint8_t, bool);
631
633 PROMEKI_SIGNAL(ssrcCollision, uint32_t);
634
649 PROMEKI_SIGNAL(ssrcChange, uint32_t, uint32_t, uint8_t);
650
662 PROMEKI_SIGNAL(byeReceived, uint32_t);
663
664 private:
665 class ReceiveThread;
666 friend class ReceiveThread;
667
686 void handleRtcp(const uint8_t *data, size_t size);
687
688 void fillHeader(RtpPacket &pkt, uint8_t pt, bool marker, uint32_t timestamp);
700 void fillTransportHeader(RtpPacket &pkt);
701 void generateSsrc();
702
703 PacketTransport *_transport = nullptr;
704 PacketTransport::UPtr _ownedTransport;
705 bool _running = false;
706 SocketAddress _remote;
707 uint32_t _ssrc = 0;
708 uint16_t _sequenceNumber = 0;
709 uint8_t _payloadType = 96;
710 uint32_t _clockRate = 90000;
711
712 // RTCP SR / SDES state. CNAME is the SDES item every
713 // SR-bearing compound packet carries. The
714 // capture-anchor is established once at openStream
715 // time and refined by the first arriving Frame's
716 // @ref Frame::captureTime; thereafter, @c emitRtcpSr
717 // derives the SR's NTP from
718 // @c anchorNtp + (lastEmissionRtpTs - anchorRtpTs) /
719 // clockRate without sampling the system clock.
720 // @c _hasEmission gates SR emission on whether any
721 // packet has actually gone out, so the scheduler
722 // never sends an SR for a session that has yet to
723 // produce wire activity. Mutex-guarded because the
724 // noter runs on the per-stream TX thread while the
725 // scheduler runs on its own thread.
726 mutable Mutex _rtcpMutex;
727 NtpTime _anchorNtp;
728 uint32_t _anchorRtpTs = 0;
729 uint32_t _lastEmissionRtpTs = 0;
730 bool _hasEmission = false;
731 String _cname;
736 uint32_t _srObservedCount = 0;
743 TimeStamp _firstSrAt;
744
745 // Most-recently parsed inbound SR. The receive thread
746 // demuxes RTCP from RTP via the second byte of every
747 // datagram (PT in [200..223] → RTCP) and walks each
748 // RTCP compound for SRs. Reader-side helpers like
749 // @ref RtpStreamClock pick this up via
750 // @ref receivedSr to map any future RTP-TS on this
751 // session to a wallclock instant for cross-stream
752 // alignment.
753 ReceivedSr _lastReceivedSr;
754
755 // Receive path. @c _streamReceivers is populated
756 // by @ref startReceiving and consumed by the recv
757 // socket thread; per-stream depacketizer threads
758 // pull from the post-reorder queues each entry
759 // points at.
760 using ReceiveThreadUPtr = UniquePtr<ReceiveThread>;
761 ReceiveThreadUPtr _receiveThread;
762 List<StreamReceiver> _streamReceivers;
763 Atomic<bool> _receiving;
764 unsigned int _receivePollMs = 200;
765
766 // Per-stream-receiver SSRC pin state. Sized to
767 // match @c _streamReceivers.size() at
768 // @c startReceiving time. Index-parallel — entry
769 // @c i tracks the SSRC pin for receivers[i].
770 struct SsrcPinState {
771 uint32_t expectedSsrc = 0;
772 bool pinned = false;
773 uint32_t mismatchCount = 0;
774 TimeStamp mismatchFirstTime;
775 };
776 List<SsrcPinState> _ssrcPinStates;
777};
778
779PROMEKI_NAMESPACE_END
780
781#endif // PROMEKI_ENABLE_NETWORK