dfx 0.1.0
Linux-based dynamic dataflow executor
Loading...
Searching...
No Matches
PriorityStableQueue.hpp
1// SPDX-FileCopyrightText: 2025 Vincent Leroy
2// SPDX-License-Identifier: MIT
3//
4// This file is part of dfx.
5//
6// Licensed under the MIT License. See the LICENSE file in the project root
7// for full license information.
8
9#pragma once
10
11// Standard includes
12#include <algorithm>
13#include <concepts>
14#include <condition_variable>
15#include <mutex>
16#include <optional>
17#include <span>
18#include <vector>
19
20// Project includes
22#include "../MonotonicIdAllocator.hpp"
23
24namespace dfx::Utils
25{
29template<typename Mapper, typename T>
30concept PriorityMapper = requires(Mapper m, T const & val)
31{
32 { m(val) } -> std::convertible_to<int>;
33};
34
48template<typename T>
50{
51private:
52 struct QueueItem
53 {
54 T item;
55 int priority;
56 uint64_t seq;
57 };
58
59 struct QueueItemCompare
60 {
61 bool operator()(QueueItem const & a, QueueItem const & b) const noexcept
62 {
63 // higher priority first
64 auto const ap = a.priority;
65 auto const bp = b.priority;
66 if (ap != bp)
67 return ap < bp;
68
69 // FIFO among equal priorities: smaller seq first
70 return a.seq > b.seq;
71 }
72 };
73
74public:
75 using value_type = T;
76 using size_type = std::size_t;
77 using difference_type = std::ptrdiff_t;
78 using reference = value_type &;
79 using const_reference = value_type const &;
80 using pointer = value_type *;
81 using const_pointer = value_type const *;
82
83public:
86
88
89public:
94 void push(T item, int priority = 0);
95
103 void pushRange(std::span<T> items, int priority = 0);
104
113 template<typename Mapper>
115 void pushRange(std::span<T> items, Mapper && priorityMapper);
116
126 std::optional<T> pop(std::stop_token stopToken = {});
127
132 std::optional<T> tryPop();
133
140 std::vector<T> drain();
141
143 void clear();
144
150 bool empty() const;
151
155 size_type size() const;
156
157private:
158 mutable std::mutex _mutex;
159 std::vector<QueueItem> _stableQueue;
161 std::condition_variable_any _cvQueue;
162};
163
164template<typename T>
165inline void PriorityStableQueue<T>::push(T item, int priority)
166{
167 {
168 std::lock_guard lock(_mutex);
169 _stableQueue.emplace_back(std::move(item), priority, _seqAllocator.next());
170 std::push_heap(_stableQueue.begin(), _stableQueue.end(), QueueItemCompare());
171 }
172
173 _cvQueue.notify_one();
174}
175
176template<typename T>
177inline void PriorityStableQueue<T>::pushRange(std::span<T> items, int priority)
178{
179 if (items.empty())
180 return ;
181
182 {
183 std::lock_guard lock(_mutex);
184
185 for (auto & item : items)
186 _stableQueue.emplace_back(std::move(item), priority, _seqAllocator.next());
187
188 std::make_heap(_stableQueue.begin(), _stableQueue.end(), QueueItemCompare{});
189 }
190
191 _cvQueue.notify_all();
192}
193
194template<typename T>
195template<typename Mapper>
197inline void PriorityStableQueue<T>::pushRange(std::span<T> items, Mapper && priorityMapper)
198{
199 if (items.empty())
200 return ;
201
202 {
203 std::lock_guard lock(_mutex);
204
205 for (auto & item : items)
206 _stableQueue.emplace_back(std::move(item), priorityMapper(item), _seqAllocator.next());
207
208 std::make_heap(_stableQueue.begin(), _stableQueue.end(), QueueItemCompare{});
209 }
210
211 _cvQueue.notify_all();
212}
213
214template<typename T>
215inline std::optional<T> PriorityStableQueue<T>::pop(std::stop_token stopToken)
216{
217 std::unique_lock lock(_mutex);
218 if (!_cvQueue.wait(lock, stopToken, [this] noexcept { return !_stableQueue.empty(); }))
219 return std::nullopt;
220
221 // std::conditional_variable_any::wait return value, in case a stop is requested,
222 // is not always false but the value returned by the predicate at this moment
223 // So here we ensure that if a stop was requested, it will be honored asap
224 if (stopToken.stop_requested())
225 return std::nullopt;
226
227 std::pop_heap(_stableQueue.begin(), _stableQueue.end(), QueueItemCompare());
228 auto item = std::move(_stableQueue.back().item);
229 _stableQueue.pop_back();
230
231 return item;
232}
233
234template<typename T>
235inline std::optional<T> PriorityStableQueue<T>::tryPop()
236{
237 std::lock_guard lock(_mutex);
238 if (_stableQueue.empty())
239 return std::nullopt;
240
241 std::pop_heap(_stableQueue.begin(), _stableQueue.end(), QueueItemCompare());
242 auto item = std::move(_stableQueue.back().item);
243 _stableQueue.pop_back();
244
245 return item;
246}
247
248template<typename T>
249inline std::vector<T> PriorityStableQueue<T>::drain()
250{
251 std::vector<QueueItem> items;
252
253 {
254 std::lock_guard lock(_mutex);
255 if (_stableQueue.empty())
256 return {};
257
258 items = std::move(_stableQueue);
259 _stableQueue.clear();
260 }
261
262 // std::sort is faster than std::sort_heap
263 std::sort(items.begin(), items.end(), QueueItemCompare());
264 // We want the highest priority first
265 std::reverse(items.begin(), items.end());
266
267 std::vector<T> result;
268 result.reserve(items.size());
269 for (auto & qi : items)
270 result.push_back(std::move(qi.item));
271
272 return result;
273}
274
275template<typename T>
277{
278 std::lock_guard lock(_mutex);
279 _stableQueue.clear();
280}
281
282template<typename T>
284{
285 std::lock_guard lock(_mutex);
286 return _stableQueue.empty();
287}
288
289template<typename T>
290inline PriorityStableQueue<T>::size_type PriorityStableQueue<T>::size() const
291{
292 std::lock_guard lock(_mutex);
293 return _stableQueue.size();
294}
295} // !namespace dfx::Utils
Convenience macros to explicitly control copy and move semantics.
#define DFX_DISABLE_COPY_AND_MOVE(ClassName)
Disable both copy and move.
Definition CopyMoveControl.hpp:37
Monotonically increasing ID allocator.
Definition MonotonicIdAllocator.hpp:52
void push(T item, int priority=0)
Pushes a single value into the queue.
Definition PriorityStableQueue.hpp:165
void pushRange(std::span< T > items, int priority=0)
Pushes a range of values into the queue with a shared priority.
Definition PriorityStableQueue.hpp:177
size_type size() const
Returns the current number of elements in the queue.
Definition PriorityStableQueue.hpp:290
std::optional< T > pop(std::stop_token stopToken={})
Removes and returns the highest priority element (blocking).
Definition PriorityStableQueue.hpp:215
PriorityStableQueue()=default
Default constructor.
bool empty() const
Checks if the queue is empty.
Definition PriorityStableQueue.hpp:283
std::vector< T > drain()
Removes all elements from the queue and returns them in priority order.
Definition PriorityStableQueue.hpp:249
void clear()
Removes all elements from the queue.
Definition PriorityStableQueue.hpp:276
std::optional< T > tryPop()
Attempts to remove and return the highest priority element (non-blocking).
Definition PriorityStableQueue.hpp:235
Concept for a priority extractor.
Definition PriorityStableQueue.hpp:30
Definition SystemConfigCommandHandler.hpp:15