libpromeki 1.0.0-alpha
PROfessional MEdia toolKIt
 
Loading...
Searching...
No Matches
rtmpchunkstream.h
Go to the documentation of this file.
1
8#pragma once
9
10
11#include <promeki/config.h>
12#if PROMEKI_ENABLE_NETWORK
13#include <cstdint>
14
15#include <promeki/atomic.h>
16#include <promeki/error.h>
17#include <promeki/hashmap.h>
18#include <promeki/mutex.h>
19#include <promeki/namespace.h>
20#include <promeki/objectbase.h>
21#include <promeki/result.h>
22#include <promeki/rtmpmessage.h>
23
24PROMEKI_NAMESPACE_BEGIN
25
26class IODevice;
27
82class RtmpChunkStream : public ObjectBase {
83 PROMEKI_OBJECT(RtmpChunkStream, ObjectBase)
84 public:
86 static constexpr int DefaultChunkSize = 128;
87
89 static constexpr int MinChunkSize = 128;
90
99 static constexpr int MaxChunkSize = 65535;
100
102 static constexpr int DefaultWindowAckSize = 5'000'000;
103
111 explicit RtmpChunkStream(IODevice *device, ObjectBase *parent = nullptr);
112
114 ~RtmpChunkStream() override;
115
116 // ---- Write side ----
117
134 Error writeMessage(const RtmpMessage &msg);
135
136 // ---- Read side ----
137
165 Result<RtmpMessage> readMessage(unsigned int timeoutMs = 0);
166
167 // ---- Configuration ----
168
170 int localChunkSize() const { return _localChunkSize.value(); }
171
181 Error setLocalChunkSize(int bytes);
182
184 int peerChunkSize() const { return _peerChunkSize.value(); }
185
187 int localWindowAckSize() const { return _localWindowAckSize.value(); }
188
193 Error setLocalWindowAckSize(int bytes);
194
196 int peerWindowAckSize() const { return _peerWindowAckSize.value(); }
197
198 // ---- Counters ----
199
201 int64_t bytesSent() const { return _bytesSent.value(); }
202
204 int64_t bytesReceived() const { return _bytesReceived.value(); }
205
207 int64_t lastAckBytesAcked() const { return _lastAckBytesSent.value(); }
208
209 // ---- Signals ----
210
217 PROMEKI_SIGNAL(controlMessageReceived, RtmpMessage);
218
220 PROMEKI_SIGNAL(peerChunkSizeChanged, int);
221
227 PROMEKI_SIGNAL(peerAck, uint32_t);
228
229 private:
235 struct WriteState {
236 uint32_t timestamp = 0;
237 uint32_t delta = 0;
238 uint32_t messageLength = 0;
239 uint8_t messageTypeId = 0;
240 uint32_t messageStreamId = 0;
241 bool established = false;
242 };
243
249 struct ReadState {
250 uint32_t timestamp = 0;
251 uint32_t delta = 0;
252 uint32_t messageLength = 0;
253 uint8_t messageTypeId = 0;
254 uint32_t messageStreamId = 0;
255 bool extendedTimestamp = false;
256 bool established = false;
257 Buffer reassembly;
258 uint32_t reassemblyBytes = 0;
259 };
260
261 // ---- Internal helpers ----
262
263 Error writeBytes(const uint8_t *data, size_t len);
264 Error readBytesExact(uint8_t *data, size_t len, unsigned int timeoutMs);
265 Error sendSetChunkSize(int bytes);
266 Error sendWindowAckSize(int bytes);
267 Error sendAck(uint32_t cumulative);
268 void handleControl(const RtmpMessage &msg);
269
270 int defaultCsidForType(RtmpMessage::Type type) const;
271
272 IODevice *_device;
273
274 Atomic<int32_t> _localChunkSize{DefaultChunkSize};
275 Atomic<int32_t> _peerChunkSize{DefaultChunkSize};
276 Atomic<int32_t> _localWindowAckSize{DefaultWindowAckSize};
277 Atomic<int32_t> _peerWindowAckSize{0};
278
279 Atomic<int64_t> _bytesSent{0};
280 Atomic<int64_t> _bytesReceived{0};
281 Atomic<int64_t> _lastAckBytesSent{0};
282
283 Mutex _writeMutex;
284 Mutex _readMutex;
285 HashMap<uint32_t, WriteState> _writeStates;
286 HashMap<uint32_t, ReadState> _readStates;
287};
288
289PROMEKI_NAMESPACE_END
290
291#endif // PROMEKI_ENABLE_NETWORK