libpromeki 1.0.0-alpha
PROfessional MEdia toolKIt
 
Loading...
Searching...
No Matches
httpconnection.h
Go to the documentation of this file.
1
8#pragma once
9
10
11#include <promeki/config.h>
12#if PROMEKI_ENABLE_HTTP
13#include <functional>
14#include <promeki/function.h>
15#include <promeki/namespace.h>
16#include <promeki/objectbase.h>
17#include <promeki/error.h>
18#include <promeki/buffer.h>
19#include <promeki/string.h>
20#include <promeki/uniqueptr.h>
21#include <promeki/list.h>
22#include <promeki/httprequest.h>
24
25PROMEKI_NAMESPACE_BEGIN
26
// Forward declarations. HttpConnection only refers to these types through
// pointers (constructor argument, _socket and _loop members), so the full
// definitions are not required by this header.
class TcpSocket;
class EventLoop;
29
72class HttpConnection : public ObjectBase {
73 PROMEKI_OBJECT(HttpConnection, ObjectBase)
74 public:
76 using List = ::promeki::List<HttpConnection *>;
77
79 static constexpr unsigned int DefaultIdleTimeoutMs = 60'000;
80
82 static constexpr int64_t DefaultMaxBodyBytes = 64 * 1024 * 1024;
83
93 using RequestHandler = Function<void(HttpRequest &request, HttpResponse &response)>;
94
106 HttpConnection(TcpSocket *socket, ObjectBase *parent = nullptr);
107
122 void setNeedsServerHandshake();
123
125 ~HttpConnection() override;
126
136 void setRequestHandler(RequestHandler handler);
137
145 Error start();
146
153 void close();
154
156 bool isOpen() const { return _state != State::Closed; }
157
159 String peerAddress() const;
160
161 // ----------------------------------------------------
162 // Limits / tuning
163 // ----------------------------------------------------
164
166 void setIdleTimeoutMs(unsigned int ms) { _idleTimeoutMs = ms; }
167
174 void setMaxBodyBytes(int64_t bytes) { _maxBodyBytes = bytes; }
175
187 Error postResponse(HttpResponse response);
188
190 PROMEKI_SIGNAL(requestReceived, HttpRequest);
191
193 PROMEKI_SIGNAL(responseSent, HttpRequest, HttpResponse);
194
196 PROMEKI_SIGNAL(closed);
197
199 PROMEKI_SIGNAL(errorOccurred, Error);
200
201 private:
202 // High-level state machine. We never re-pipeline more
203 // than one in-flight request; once parsed we wait for
204 // the dispatcher to populate a response, drain it,
205 // then either resume reading (keep-alive) or close.
206 enum class State {
207 Handshaking,
208 Reading,
209 AwaitingResponse,
210 Writing,
211 Closing,
212 Closed
213 };
214
215 struct Impl;
216 UniquePtr<Impl> _impl;
217
218 TcpSocket *_socket = nullptr;
219 EventLoop *_loop = nullptr;
220 int _ioHandle = -1;
221 int _timerId = -1;
222 State _state = State::Reading;
223
224 RequestHandler _handler;
225 Buffer _readBuf;
226 Buffer _writeQueue;
227 size_t _writeOffset = 0;
228
229 HttpRequest _pendingRequest;
230 HttpRequest _lastRequest;
231 IODevice::Shared _streamSource;
232 int64_t _streamRemaining = -1;
233 bool _streamChunked = false;
234 bool _keepAlive = true;
235
236 // Async-read parking: when the body IODevice returns
237 // read()==0 with atEnd()==false, the pump unsubscribes
238 // from IoWrite and waits on the device's readyRead
239 // signal. _streamReadyReadSlotId is the slot id from
240 // Signal::connect — non-negative while parked.
241 size_t _streamReadyReadSlotId = 0;
242 bool _streamReadyReadConnected = false;
243 bool _streamParked = false;
244
245 // Protocol upgrade. When the response carries an
246 // upgrade hook (HttpResponse::upgradeHook()) and is a
247 // 101 Switching Protocols, _pendingUpgradeHook is set
248 // here and fires once the response finishes writing.
249 HttpResponse::UpgradeHook _pendingUpgradeHook;
250
251 unsigned int _idleTimeoutMs = DefaultIdleTimeoutMs;
252 int64_t _maxBodyBytes = DefaultMaxBodyBytes;
253 int64_t _bodyBytesSoFar = 0;
254 bool _needsServerHandshake = false;
255
256 // Header-collection scratch. llhttp emits header
257 // fields and values as separate callbacks (and may
258 // fragment a single field across multiple callbacks)
259 // so we accumulate into these strings until both
260 // sides are complete, then push into _pendingRequest.
261 String _hdrField;
262 String _hdrValue;
263 bool _hdrFieldComplete = false;
264 bool _hdrValueComplete = false;
265 String _urlBuf;
266
267 void onIoReady(int fd, uint32_t events);
268 void onIdleTimeout();
269 void readSome();
270 void pumpWrite();
271 void deliverRequest();
272 void enqueueResponse(HttpResponse response);
273 void scheduleStreamPump();
274 void completeProtocolUpgrade();
275
276 // Async-read parking: hook the stream's readyRead so
277 // pumpWrite resumes when the producer pushes more
278 // bytes; detach when the stream finishes or the
279 // connection closes. Both are no-ops if the stream
280 // isn't a sequential async device.
281 void attachStreamReadyRead();
282 void detachStreamReadyRead();
283 void onStreamReadyRead();
284
285 void flushPendingHeaderPair();
286 void resetForNextRequest();
287
288 // llhttp callbacks (defined in cpp; bridge via parser->data).
289 static int cbMessageBegin(void *parser);
290 static int cbUrl(void *parser, const char *at, size_t len);
291 static int cbHeaderField(void *parser, const char *at, size_t len);
292 static int cbHeaderValue(void *parser, const char *at, size_t len);
293 static int cbHeadersComplete(void *parser);
294 static int cbBody(void *parser, const char *at, size_t len);
295 static int cbMessageComplete(void *parser);
296};
297
298PROMEKI_NAMESPACE_END
299
300#endif // PROMEKI_ENABLE_HTTP