dfx 0.1.0
Linux-based dynamic dataflow executor
Loading...
Searching...
No Matches
ThreadPool.hpp
1// SPDX-FileCopyrightText: 2025 Vincent Leroy
2// SPDX-License-Identifier: MIT
3//
4// This file is part of dfx.
5//
6// Licensed under the MIT License. See the LICENSE file in the project root
7// for full license information.
8
9#pragma once
10
11// Standard includes
12#include <condition_variable>
13#include <list>
14#include <mutex>
15#include <optional>
16#include <shared_mutex>
17#include <thread>
18#include <vector>
19
20// Project includes
21#include "tasks/Task.hpp"
22#include "ThreadData.hpp"
23#include <dfx-utilities/MonotonicIdAllocator.hpp>
24
25namespace dfx::Runtime
26{
56{
// Entry of the pending-task queue (_queue): an owned task plus an ordering key.
57 struct QueueItem
58 {
59 TaskPtr task; // owning pointer to the queued task (TaskPtr is std::unique_ptr<Task>)
60 uint64_t seq; // tie-breaker read by QueueItemCompare: among equal priorities the smaller seq is served first (presumably assigned in submission order — confirm in the implementation file)
61 };
62
// Ordering predicate for QueueItem: returns true when `a` must be served after `b`.
// NOTE(review): this "less-than means served later" shape is what the standard heap
// algorithms / std::priority_queue expect — presumably _queue is maintained as a heap; confirm in the .cpp.
63 struct QueueItemCompare
64 {
// Both a.task and b.task are dereferenced unconditionally, so neither may be null.
65 bool operator()(QueueItem const & a, QueueItem const & b) const noexcept
66 {
67 // higher priority first
68 auto const ap = a.task->priority();
69 auto const bp = b.task->priority();
70 if (ap != bp)
71 return ap < bp; // a has the lower priority value, so it ranks after b
72
73 // FIFO among equal priorities: smaller seq first
74 return a.seq > b.seq; // a carries the larger seq, so it ranks after b
75 }
76 };
77
78public:
84 ThreadPool(std::size_t threadCount = 1);
85
92
98 void addThread(std::size_t count = 1);
99
110 void removeThread(std::size_t count = 1);
111
// Returns the number of entries in _threads, read while holding _mutexThreads
// (the mutex is declared mutable, which is what allows locking it from this const member).
// NOTE(review): the function is noexcept yet acquires a std::mutex; a failing lock() would terminate — confirm this is intended.
113 std::size_t threadCount() const noexcept { std::lock_guard lock(_mutexThreads); return _threads.size(); }
114
115public:
118 void pushTask(TaskPtr task);
119
122 void pushTasks(std::vector<TaskPtr> tasks);
123
// Builds a task of type T from `args` and submits it through pushTask().
// T must satisfy DerivedFromTask and be constructible from Args...; the arguments
// are perfectly forwarded to T's constructor.
129 template<DerivedFromTask T, typename ... Args>
130 requires std::is_constructible_v<T, Args...>
131 void emplaceTask(Args && ... args)
132 { pushTask(std::make_unique<T>(std::forward<Args>(args)...)); } // the std::unique_ptr<T> is handed to pushTask, which takes a TaskPtr
133
134public:
136 using MaybeThreadData = std::optional<std::reference_wrapper<ThreadData>>;
138 using MaybeCThreadData = std::optional<std::reference_wrapper<ThreadData const>>;
139
145
151
152private:
153 void _work(std::stop_token stopToken);
154
155private:
156 std::vector<QueueItem> _queue;
157 std::mutex _mutexQueue;
159 std::condition_variable_any _cvQueue;
160
161private:
162 // Use a list instead of a map so iterators and references
163 // aren't invalidated when inserting / removing elements
164 std::list<ThreadData> _threadDatas;
165 mutable std::shared_mutex _mutexThreadData;
166
167private:
168 mutable std::mutex _mutexThreads;
169 // The threads have to be destroyed first in the dtor, so _threads is declared last
170 std::vector<std::jthread> _threads;
171};
172} // !namespace dfx::Runtime
void emplaceTask(Args &&... args)
Construct a task in-place and submit it.
Definition ThreadPool.hpp:131
std::size_t threadCount() const noexcept
Current number of worker threads.
Definition ThreadPool.hpp:113
MaybeThreadData threadData()
Retrieve the calling thread's ThreadData if it belongs to this pool.
void pushTasks(std::vector< TaskPtr > tasks)
Submit multiple tasks for execution.
std::optional< std::reference_wrapper< ThreadData > > MaybeThreadData
Optional reference to mutable thread data.
Definition ThreadPool.hpp:136
void pushTask(TaskPtr task)
Submit a task for execution.
std::optional< std::reference_wrapper< ThreadData const > > MaybeCThreadData
Optional reference to immutable thread data.
Definition ThreadPool.hpp:138
MaybeCThreadData threadData() const
Retrieve the calling thread's ThreadData if it belongs to this pool.
void addThread(std::size_t count=1)
Add worker threads to the pool.
~ThreadPool()
Stop all worker threads and abandon remaining tasks.
ThreadPool(std::size_t threadCount=1)
Construct a thread pool with an initial number of worker threads.
void removeThread(std::size_t count=1)
Remove worker threads from the pool.
Monotonically increasing ID allocator.
Definition MonotonicIdAllocator.hpp:52
Convenience concept for constraining templates to Task-derived types.
Definition Task.hpp:24
Definition Node.hpp:47
std::unique_ptr< Task > TaskPtr
Unique ownership pointer for tasks.
Definition Task.hpp:102