libpromeki 1.0.0-alpha
PROfessional MEdia toolKIt
 
Loading...
Searching...
No Matches
threadpool.h
Go to the documentation of this file.
1
8#pragma once
9
10
11#include <promeki/config.h>
12#if PROMEKI_ENABLE_CORE
13#include <functional>
14#include <memory>
15#include <type_traits>
16#include <promeki/basicthread.h>
17#include <promeki/function.h>
18#include <promeki/namespace.h>
19#include <promeki/atomic.h>
20#include <promeki/duration.h>
21#include <promeki/string.h>
22#include <promeki/error.h>
23#include <promeki/hashmap.h>
24#include <promeki/mutex.h>
27#include <promeki/timestamp.h>
28#include <promeki/uniqueptr.h>
30#include <promeki/future.h>
31#include <promeki/list.h>
32
33PROMEKI_NAMESPACE_BEGIN
34
67class ThreadPool {
68 public:
92 using WorkTag = StringRegistry<"ThreadPoolWorkTag">::Item;
93
101 struct WorkStats {
102 WorkTag tag;
103 String name;
104 Duration totalWall;
105 Duration totalCpu;
106 Duration totalQueueWait;
107 int64_t count = 0;
108 };
109
121 ThreadPool(int maxThreadCount = -1, bool lazy = true);
122
126 ~ThreadPool();
127
128 ThreadPool(const ThreadPool &) = delete;
129 ThreadPool &operator=(const ThreadPool &) = delete;
130 ThreadPool(ThreadPool &&) = delete;
131 ThreadPool &operator=(ThreadPool &&) = delete;
132
151 template <typename F> auto submit(F &&callable) -> Future<std::invoke_result_t<F>> {
152 return submit(WorkTag(), std::forward<F>(callable));
153 }
154
174 template <typename F> auto submit(WorkTag tag, F &&callable) -> Future<std::invoke_result_t<F>> {
175 using R = std::invoke_result_t<F>;
176 auto task = std::make_shared<std::packaged_task<R()>>(std::forward<F>(callable));
177 Future<R> fut(task->get_future());
178 bool runInline = false;
179 TaggedTask entry;
180 entry.tag = tag;
181 entry.enqueuedAt = TimeStamp::now();
182 entry.callable = [task]() { (*task)(); };
183 {
184 Mutex::Locker locker(_mutex);
185 if (_maxThreadCount == 0) {
186 runInline = true;
187 } else {
188 _tasks.pushToBack(std::move(entry));
189 maybeSpawnOne();
190 _cv.wakeOne();
191 }
192 }
193 if (runInline) {
194 runTaskWithStats(entry);
195 }
196 return fut;
197 }
198
209 void setNamePrefix(const String &prefix);
210
215 String namePrefix() const;
216
228 void setThreadCount(int count, bool lazy = true);
229
234 int maxThreadCount() const;
235
240 int threadCount() const;
241
246 int activeThreadCount() const;
247
251 void waitForDone();
252
258 Error waitForDone(unsigned int timeoutMs);
259
263 void clear();
264
276 void setName(const String &name);
277
279 String name() const;
280
291 List<WorkStats> snapshotWorkStats() const;
292
303 void resetWorkStats();
304
317 static List<ThreadPool *> allPools();
318
319 private:
320 using Task = Function<void()>;
321
324 struct TaggedTask {
325 Task callable;
326 WorkTag tag;
327 TimeStamp enqueuedAt;
328 };
329
334 struct WorkRecord {
335 Atomic<int64_t> totalWallNs{0};
336 Atomic<int64_t> totalCpuNs{0};
337 Atomic<int64_t> totalQueueWaitNs{0};
338 Atomic<int64_t> count{0};
339 String name;
340 WorkTag tag;
341 };
342
343 void workerFunc(int index);
344 void spawnThreads(int count);
345 void maybeSpawnOne();
346
353 void runTaskWithStats(TaggedTask &t);
354
358 WorkRecord *recordFor(WorkTag tag);
359
363 static Mutex &registryMutex();
364 static List<ThreadPool *> &registry();
365
366 mutable Mutex _mutex;
367 WaitCondition _cv;
368 WaitCondition _doneCv;
369 List<TaggedTask> _tasks;
370 List<BasicThread> _threads;
371 String _namePrefix;
372 String _name;
373 int _maxThreadCount = 0;
374 int _threadCount = 0;
375 int _activeCount = 0;
376 int _waitingCount = 0;
377 bool _shutdown = false;
378
379 mutable ReadWriteLock _statsLock;
380 HashMap<uint64_t, UniquePtr<WorkRecord>> _stats;
381};
382
383PROMEKI_NAMESPACE_END
384
385#endif // PROMEKI_ENABLE_CORE