dfx 0.1.0
Linux-based dynamic dataflow executor
SyncQueue.hpp
// SPDX-FileCopyrightText: 2025 Vincent Leroy
// SPDX-License-Identifier: MIT
//
// This file is part of dfx.
//
// Licensed under the MIT License. See the LICENSE file in the project root
// for full license information.

#pragma once

// C includes
#include <sys/eventfd.h>
#include <unistd.h>

// C++ includes
#include <array>
#include <atomic>
#include <cerrno>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <mutex>
#include <new>
#include <optional>
#include <type_traits>

// Project includes
#include <dfx-utilities/Exception.hpp>
#include <dfx-utilities/CompilerSupport.hpp>

namespace dfx::Core::details
{
struct noOpMutex
{
    constexpr void lock() noexcept {}
    constexpr void unlock() noexcept {}
    constexpr bool try_lock() noexcept { return true; }
};
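
// Note: noOpMutex satisfies the Lockable requirements, so in single-producer
// mode the std::unique_lock<noOpMutex> taken in emplace() below compiles down
// to nothing, while the multi-producer configuration keeps a real std::mutex.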

inline ALWAYS_INLINE_ATTR void cpuRelax() noexcept
{
#ifdef PLATFORM_ARCH_X86
    __builtin_ia32_pause();
#elif defined(PLATFORM_ARCH_ARM)
    asm volatile("yield" ::: "memory");
#endif
}

// 'sp' selects single-producer mode: the producer-side mutex becomes a no-op.
// The read side takes no dedicated lock, so a single consumer is assumed
// in both configurations.
template<typename T, bool sp, std::size_t Capacity = 1'000>
class SyncQueue
{
    static_assert(Capacity > 1);

public:
    // We add 1 to the capacity of the internal container because of the way we
    // detect isEmpty / isFull: isEmpty is readItr == writeItr and isFull is
    // readItr == (writeItr + 1). Without this extra slot, isFull would return
    // true at Capacity - 1, which would be unexpected from the user's point
    // of view.
    using container_type = std::array<std::byte, (Capacity + 1) * sizeof(T)>;
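    // Illustrative walk-through (hypothetical Capacity = 2, hence 3 slots):
    // starting from readItr == writeItr == slot 0 (empty), two emplace() calls
    // move writeItr to slot 2, and a producer now sees
    // readItr == (writeItr + 1) and reports full. Without the spare slot,
    // "full" and "empty" would both look like readItr == writeItr and be
    // indistinguishable.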
    using mutex_type = std::mutex;
    using pmutex_type = std::conditional_t<sp, noOpMutex, std::mutex>;
    using value_type = T;
    using size_type = typename container_type::size_type;
    using reference = value_type &;
    using const_reference = value_type const &;
    using opt_value_type = std::optional<std::reference_wrapper<value_type>>;

public:
    SyncQueue();
    SyncQueue(SyncQueue<T, sp, Capacity> const &) = delete;
    SyncQueue(SyncQueue<T, sp, Capacity> &&) = delete;
    ~SyncQueue();

    SyncQueue<T, sp, Capacity> & operator=(SyncQueue<T, sp, Capacity> const &) = delete;
    SyncQueue<T, sp, Capacity> & operator=(SyncQueue<T, sp, Capacity> &&) = delete;

public:
    [[nodiscard]] bool empty() const noexcept;
    [[nodiscard]] bool full() const noexcept;
    [[nodiscard]] size_type size() const noexcept;
    [[nodiscard]] constexpr size_type capacity() const noexcept { return Capacity; }

    int createReadEventFd(bool nonblock = false);
    [[nodiscard]] int readEventFd() const noexcept { return _readEventFd; }

    void forceQuit() noexcept;

public:
    template<typename ... Args>
    void emplace(Args && ... args);

    [[nodiscard]] opt_value_type front();
    [[nodiscard]] opt_value_type tryFront() noexcept;
    void pop();

public:
    void setSpinWaitCount(uint32_t counter) noexcept { _spinWaitCounter = counter; }
    void disableSpinWait() noexcept { setSpinWaitCount(0); }
    uint32_t spinWaitCount() const noexcept { return _spinWaitCounter; }

private:
    void _notifyReader() noexcept;
    void _notifyWriter() noexcept;

private:
    alignas(T) container_type _queue;

private:
    using atomic_itr = std::atomic<typename container_type::iterator>;
    static_assert(atomic_itr::is_always_lock_free);

    // Keep the read and write sides on separate 64-byte-aligned cache lines
    // to limit false sharing
    static constexpr auto alignSize = 64;

    struct alignas(alignSize) ReadSide
    {
        atomic_itr itr;
        std::atomic<size_type> snapshot{0};
    } _read;

    struct alignas(alignSize) WriteSide
    {
        atomic_itr itr;
        std::atomic<size_type> snapshot{0};
    } _write;

    struct alignas(alignSize) Flags
    {
        std::atomic_bool forceQuit = false;
        std::atomic_bool hasWriteWaiter = false;
        std::atomic_bool hasReadWaiter = false;
    } _flags;

private:
    mutex_type _mutex;
    pmutex_type _producerMutex;
    std::condition_variable _cvReader;
    std::condition_variable _cvWriter;

private:
    uint32_t _spinWaitCounter = 1'000;
    int _readEventFd = -1;
};

template<typename T, bool sp, std::size_t Capacity>
SyncQueue<T, sp, Capacity>::SyncQueue()
    : _read { std::begin(_queue) }
    , _write { std::begin(_queue) }
{
}

template<typename T, bool sp, std::size_t Capacity>
SyncQueue<T, sp, Capacity>::~SyncQueue()
{
    _flags.forceQuit.store(true, std::memory_order_release);

    _notifyReader();
    _notifyWriter();

    if (_readEventFd != -1)
    {
        close(_readEventFd);
        _readEventFd = -1; // Guard against any later write to a closed (and possibly reused) fd
    }

    if constexpr (!std::is_trivially_destructible_v<T>)
    {
        // We are in the dtor: no need for heavy synchronization mechanisms
        for (auto rItr = _read.itr.load(std::memory_order_relaxed); rItr != _write.itr.load(std::memory_order_relaxed);)
        {
            reinterpret_cast<T *>(rItr)->~T(); // Elements were placement-new'd, so destroy them explicitly

            rItr += sizeof(T);
            if (rItr == std::end(_queue))
                rItr = std::begin(_queue);
        }
    }
}

template<typename T, bool sp, std::size_t Capacity>
bool SyncQueue<T, sp, Capacity>::empty() const noexcept
{
    return size() == 0;
}

template<typename T, bool sp, std::size_t Capacity>
bool SyncQueue<T, sp, Capacity>::full() const noexcept
{
    return size() == capacity();
}

template<typename T, bool sp, std::size_t Capacity>
typename SyncQueue<T, sp, Capacity>::size_type SyncQueue<T, sp, Capacity>::size() const noexcept
{
    auto const rPos = _read.snapshot.load(std::memory_order_acquire) / sizeof(T);
    auto const wPos = _write.snapshot.load(std::memory_order_acquire) / sizeof(T);

    return wPos >= rPos ? (wPos - rPos) : (Capacity + 1 + wPos - rPos);
}
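// Worked example of the wrapped branch (hypothetical Capacity = 4, so 5
// slots): with rPos = 3 and wPos = 1 the write side has wrapped around, and
// size() = 4 + 1 + 1 - 3 = 3, matching the elements sitting in slots 3, 4
// and 0.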

template<typename T, bool sp, std::size_t Capacity>
int SyncQueue<T, sp, Capacity>::createReadEventFd(bool nonblock)
{
    if (_readEventFd != -1)
        return _readEventFd;

    int flags = EFD_CLOEXEC;
    if (nonblock)
        flags |= EFD_NONBLOCK;

    // Seed the counter with the current size so already-queued items are signaled
    _readEventFd = eventfd(static_cast<unsigned int>(size()), flags);
    B_ASSERT(_readEventFd != -1, "Failed to create eventfd: {}", strerror(errno));
    return _readEventFd;
}
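// Usage sketch (illustrative, not part of this header; assumes <poll.h> and a
// hypothetical 'consume' handler): the returned eventfd can be multiplexed
// with poll()/epoll. Since the fd is a plain (non-EFD_SEMAPHORE) eventfd,
// read() returns the accumulated counter and resets it, i.e. the number of
// notifications since the last read, not the current queue size.
//
//   struct pollfd pfd { queue.createReadEventFd(/*nonblock=*/true), POLLIN, 0 };
//   if (poll(&pfd, 1, /*timeout_ms=*/-1) == 1 && (pfd.revents & POLLIN))
//   {
//       uint64_t count = 0;
//       read(pfd.fd, &count, sizeof(count)); // Drain the counter
//       while (auto item = queue.tryFront())
//       {
//           consume(item->get());
//           queue.pop();
//       }
//   }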

template<typename T, bool sp, std::size_t Capacity>
void SyncQueue<T, sp, Capacity>::forceQuit() noexcept
{
    {
        std::lock_guard<mutex_type> lock(_mutex);
        _flags.forceQuit.store(true, std::memory_order_release);
    }

    _cvReader.notify_all();
    _cvWriter.notify_all();
}

template<typename T, bool sp, std::size_t Capacity>
template<typename ... Args>
void SyncQueue<T, sp, Capacity>::emplace(Args && ... args)
{
    std::unique_lock<pmutex_type> pLock(_producerMutex, std::defer_lock);
    if constexpr (!sp)
        pLock.lock();

    auto const wItr = _write.itr.load(std::memory_order_relaxed);
    auto nextWItr = wItr + sizeof(T);
    if (nextWItr == std::end(_queue))
        nextWItr = std::begin(_queue);
    auto const isQueueFull = [nextWItr, this]() noexcept { return nextWItr == _read.itr.load(std::memory_order_acquire); };

    if (isQueueFull())
    {
        for (uint32_t counter = _spinWaitCounter; counter > 0 && isQueueFull(); --counter)
            cpuRelax();

        if (isQueueFull())
        {
            std::unique_lock<mutex_type> lock(_mutex);
            _flags.hasWriteWaiter.store(true, std::memory_order_release);
            _cvWriter.wait(lock, [&isQueueFull, this]() noexcept
            {
                // Stop waiting when the queue is no longer full, or on force quit
                return !isQueueFull() || UNLIKELY(_flags.forceQuit.load(std::memory_order_acquire));
            });
            // No need to worry about RAII here as everything is marked noexcept
            _flags.hasWriteWaiter.store(false, std::memory_order_release);

            if (UNLIKELY(_flags.forceQuit.load(std::memory_order_relaxed)))
                return;
        }
    }

    new (wItr) T(std::forward<Args>(args)...);

    {
        // The mutex has to be locked here even though we use an atomic,
        // otherwise the following scenario can occur (and in fact has):
        // - thread A, waiting on _cvReader, wakes spuriously
        // - thread B stores to _write.itr
        // - thread B calls _cvReader.notify_one()
        // - thread A resumes waiting on _cvReader, so the notification is missed
        // c.f. https://stackoverflow.com/a/36130475/3368370
        std::lock_guard<mutex_type> lock(_mutex);
        _write.itr.store(nextWItr, std::memory_order_release);

        auto offset = static_cast<size_type>(std::distance(std::begin(_queue), nextWItr));
        offset %= (Capacity + 1) * sizeof(T);
        _write.snapshot.store(offset, std::memory_order_release);
    }

    if (_flags.hasReadWaiter.load(std::memory_order_acquire) || _readEventFd != -1)
        _notifyReader();
}

template<typename T, bool sp, std::size_t Capacity>
typename SyncQueue<T, sp, Capacity>::opt_value_type SyncQueue<T, sp, Capacity>::front()
{
    auto const rItr = _read.itr.load(std::memory_order_relaxed);
    auto const isQueueEmpty = [&rItr, this]() noexcept { return rItr == _write.itr.load(std::memory_order_acquire); };

    if (isQueueEmpty())
    {
        for (uint32_t counter = _spinWaitCounter; counter > 0 && isQueueEmpty(); --counter)
            cpuRelax();

        if (isQueueEmpty())
        {
            std::unique_lock<mutex_type> lock(_mutex);
            _flags.hasReadWaiter.store(true, std::memory_order_release);
            _cvReader.wait(lock, [&isQueueEmpty, this]() noexcept
            {
                // Stop waiting when the queue is no longer empty, or on force quit
                return !isQueueEmpty() || UNLIKELY(_flags.forceQuit.load(std::memory_order_acquire));
            });
            // No need to worry about RAII here as everything is marked noexcept
            _flags.hasReadWaiter.store(false, std::memory_order_release);

            if (UNLIKELY(_flags.forceQuit.load(std::memory_order_relaxed)))
                return {};
        }
    }

    return *std::launder(reinterpret_cast<T *>(rItr));
}

template<typename T, bool sp, std::size_t Capacity>
typename SyncQueue<T, sp, Capacity>::opt_value_type SyncQueue<T, sp, Capacity>::tryFront() noexcept
{
    auto const rItr = _read.itr.load(std::memory_order_relaxed);
    if (rItr == _write.itr.load(std::memory_order_acquire)) // Queue is empty
        return {};

    return *std::launder(reinterpret_cast<T *>(rItr));
}

template<typename T, bool sp, std::size_t Capacity>
void SyncQueue<T, sp, Capacity>::pop()
{
    auto const rItr = _read.itr.load(std::memory_order_relaxed);
    B_ASSERT(rItr != _write.itr.load(std::memory_order_acquire), "Trying to pop an empty queue");

    auto nextRItr = rItr + sizeof(T);
    if (nextRItr == std::end(_queue))
        nextRItr = std::begin(_queue);

    if constexpr (!std::is_trivially_destructible_v<T>)
        reinterpret_cast<T *>(rItr)->~T(); // Elements were placement-new'd, so destroy them explicitly

    {
        // The mutex has to be locked here even though we use an atomic,
        // otherwise the following scenario can occur (and in fact has):
        // - thread A, waiting on _cvWriter, wakes spuriously
        // - thread B stores to _read.itr
        // - thread B calls _cvWriter.notify_one()
        // - thread A resumes waiting on _cvWriter, so the notification is missed
        // c.f. https://stackoverflow.com/a/36130475/3368370
        std::lock_guard<mutex_type> lock(_mutex);
        _read.itr.store(nextRItr, std::memory_order_release);

        auto offset = static_cast<size_type>(std::distance(std::begin(_queue), nextRItr));
        offset %= (Capacity + 1) * sizeof(T);
        _read.snapshot.store(offset, std::memory_order_release);
    }

    if (_flags.hasWriteWaiter.load(std::memory_order_acquire))
        _notifyWriter();
}

template<typename T, bool sp, std::size_t Capacity>
void SyncQueue<T, sp, Capacity>::_notifyReader() noexcept
{
    _cvReader.notify_one();

    if (_readEventFd != -1)
    {
        uint64_t const val = 1;
        // Best-effort notification: a failed write is not actionable in a noexcept context
        [[maybe_unused]] auto const rc = write(_readEventFd, &val, sizeof(val));
    }
}

template<typename T, bool sp, std::size_t Capacity>
void SyncQueue<T, sp, Capacity>::_notifyWriter() noexcept
{
    _cvWriter.notify_one();
}
} // !namespace dfx::Core::details
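
// Usage sketch (illustrative, not part of this header; assumes <thread> and a
// hypothetical 'handle' function). 'sp' is taken to mean single producer, as
// suggested by pmutex_type above; the read side is single-consumer either way.
//
//   dfx::Core::details::SyncQueue<int, /*sp=*/true, 128> queue;
//
//   std::thread producer([&queue] {
//       for (int i = 0; i < 1'000; ++i)
//           queue.emplace(i); // Spins, then blocks, while the queue is full
//   });
//
//   std::thread consumer([&queue] {
//       for (;;)
//       {
//           auto item = queue.front(); // Spins, then blocks, while the queue is empty
//           if (!item)
//               break; // Empty optional: forceQuit() was called
//           handle(item->get());
//           queue.pop();
//       }
//   });
//
//   producer.join();
//   queue.forceQuit(); // The consumer drains remaining items, then exits
//   consumer.join();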