dfx 0.1.0
Linux-based dynamic dataflow executor
Loading...
Searching...
No Matches
SpscFixedQueue.hpp
1// SPDX-FileCopyrightText: 2026 Vincent Leroy
2// SPDX-License-Identifier: MIT
3//
4// This file is part of dfx.
5//
6// Licensed under the MIT License. See the LICENSE file in the project root
7// for full license information.
8
9#pragma once
10
// C++ includes
#include <array>
#include <atomic>
#include <cstddef>
#include <iterator>
#include <new>
#include <optional>
#include <type_traits>
#include <utility>

// Project includes
#include "../CompilerSupport.hpp"
20
21namespace dfx::Utils
22{
45template<typename T, std::size_t Capacity>
47{
48 static_assert(Capacity > 1);
49
50public:
51 // We add 1 to the capacity of the internal container because of the way we handle
52 // the detection of isEmpty / isFull. isEmpty is readItr == writeItr and isFull
53 // is readItr == (writeItr + 1) so if we didn't add this extra capacity
54 // isFull would return true at Capacity - 1 which would be unexpected
55 // from the user point of view
56 using container_type = std::array<std::byte, (Capacity + 1) * sizeof(T)>;
57 using value_type = T;
58 using size_type = typename container_type::size_type;
59 using reference = value_type &;
60 using const_reference = value_type const &;
61
62public:
67
69
70public:
72 [[nodiscard]] bool empty() const noexcept;
73
75 [[nodiscard]] bool full() const noexcept;
76
83 [[nodiscard]] size_type size() const noexcept;
84
86 [[nodiscard]] constexpr size_type capacity() const noexcept { return Capacity; }
87
88public:
98 template<typename U>
99 requires std::is_same_v<std::decay_t<U>, T>
100 bool tryPush(U && value);
101
108 std::optional<T> tryPop();
109
110private:
111 alignas(T) container_type _queue;
112
113private:
114 using atomic_itr = std::atomic<typename container_type::iterator>;
115
116#ifdef __cpp_lib_hardware_interference_size
117 DFX_DIAGNOSTIC_PUSH
118 // We ignore this warning because this queue is not part of any public API
119 DFX_DIAGNOSTIC_IGNORE("-Winterference-size")
120 static constexpr auto alignSize = std::hardware_destructive_interference_size;
121 DFX_DIAGNOSTIC_POP
122#else
123 static constexpr auto alignSize = 64;
124#endif
125
126 struct alignas(alignSize) ReadSide
127 {
128 atomic_itr itr;
129 } _read;
130
131 struct alignas(alignSize) WriteSide
132 {
133 atomic_itr itr;
134 } _write;
135};
136
137template<typename T, std::size_t Capacity>
139 : _read { std::begin(_queue) }
140 , _write { std::begin(_queue) }
141{
142}
143
144template<typename T, std::size_t Capacity>
146{
147 if constexpr (!std::is_trivially_destructible_v<T>)
148 {
149 // We are in the dtor: no need for heavy synchronization mechanism
150 for (auto rItr = _read.itr.load(std::memory_order_relaxed); rItr != _write.itr.load(std::memory_order_relaxed);)
151 {
152 reinterpret_cast<T *>(rItr)->~T(); // Placement new == explicit call to dtor
153
154 rItr += sizeof(T);
155 if (rItr == std::end(_queue))
156 rItr = std::begin(_queue);
157 }
158 }
159}
160
161template<typename T, std::size_t Capacity>
162inline bool SpscFixedQueue<T, Capacity>::empty() const noexcept
163{
164 return size() == 0;
165}
166
167template<typename T, std::size_t Capacity>
168inline bool SpscFixedQueue<T, Capacity>::full() const noexcept
169{
170 return size() == capacity();
171}
172
173template<typename T, std::size_t Capacity>
174inline SpscFixedQueue<T, Capacity>::size_type SpscFixedQueue<T, Capacity>::size() const noexcept
175{
176 using const_iterator = container_type::const_iterator;
177
178 auto const rItr = const_iterator(_read.itr.load(std::memory_order_acquire));
179 auto const wItr = const_iterator(_write.itr.load(std::memory_order_acquire));
180
181 auto const rPos = static_cast<size_type>(std::distance(std::begin(_queue), rItr)) / sizeof(T);
182 auto const wPos = static_cast<size_type>(std::distance(std::begin(_queue), wItr)) / sizeof(T);
183
184 return wPos >= rPos ? (wPos - rPos) : (Capacity + 1 + wPos - rPos);
185}
186
187template<typename T, std::size_t Capacity>
188template<typename U>
189requires std::is_same_v<std::decay_t<U>, T>
191{
192 auto const wItr = _write.itr.load(std::memory_order_relaxed);
193 auto nextWItr = wItr + sizeof(T);
194 if (nextWItr == std::end(_queue))
195 nextWItr = std::begin(_queue);
196 auto const isQueueFull = [nextWItr, this] noexcept { return nextWItr == _read.itr.load(std::memory_order_acquire); };
197
198 if (isQueueFull())
199 return false;
200
201 new (wItr) T(std::forward<U>(value));
202
203 _write.itr.store(nextWItr, std::memory_order_release);
204 return true;
205}
206
207template<typename T, std::size_t Capacity>
208inline std::optional<T> SpscFixedQueue<T, Capacity>::tryPop()
209{
210 auto const rItr = _read.itr.load(std::memory_order_relaxed);
211 auto const isQueueEmpty = [&rItr, this] noexcept { return rItr == _write.itr.load(std::memory_order_acquire); };
212
213 if (isQueueEmpty())
214 return std::nullopt;
215
216 auto nextRItr = rItr + sizeof(T);
217 if (nextRItr == std::end(_queue))
218 nextRItr = std::begin(_queue);
219
220 auto value = std::move(*std::launder(reinterpret_cast<T *>(rItr)));
221
222 if constexpr (!std::is_trivially_destructible_v<T>)
223 reinterpret_cast<T *>(rItr)->~T(); // Placement new == explicit call to dtor
224
225 _read.itr.store(nextRItr, std::memory_order_release);
226
227 return value;
228}
229} // !namespace dfx::Utils
Convenience macros to explicitly control copy and move semantics.
#define DFX_DISABLE_COPY_AND_MOVE(ClassName)
Disable both copy and move.
Definition CopyMoveControl.hpp:37
bool full() const noexcept
Definition SpscFixedQueue.hpp:168
constexpr size_type capacity() const noexcept
Returns the maximum capacity of the queue.
Definition SpscFixedQueue.hpp:86
size_type size() const noexcept
Definition SpscFixedQueue.hpp:174
SpscFixedQueue()
Default constructor.
Definition SpscFixedQueue.hpp:138
bool empty() const noexcept
Checks if the queue contains no elements.
Definition SpscFixedQueue.hpp:162
std::optional< T > tryPop()
Attempts to pop an element from the queue.
Definition SpscFixedQueue.hpp:208
~SpscFixedQueue()
Destructor.
Definition SpscFixedQueue.hpp:145
bool tryPush(U &&value)
Attempts to push an element into the queue.
Definition SpscFixedQueue.hpp:190
Definition SystemConfigCommandHandler.hpp:15
STL namespace.