    constexpr void lock() noexcept {}
    constexpr void unlock() noexcept {}
    constexpr bool try_lock() noexcept { return true; }
};

// Hint to the CPU that we are inside a spin-wait loop.
inline ALWAYS_INLINE_ATTR void cpuRelax() noexcept
{
#ifdef PLATFORM_ARCH_X86
    __builtin_ia32_pause();
#elif defined(PLATFORM_ARCH_ARM)
    asm volatile("yield" ::: "memory");
#endif
}

template<typename T, bool sp, std::size_t Capacity = 1'000>
class SyncQueue
{
    static_assert(Capacity > 1);

public:
    using container_type = std::array<std::byte, (Capacity + 1) * sizeof(T)>;
    using mutex_type = std::mutex;
    using pmutex_type = std::conditional_t<sp, noOpMutex, std::mutex>;
    using value_type = T;
    using size_type = typename container_type::size_type;
    using reference = value_type &;
    using const_reference = value_type const &;
    using opt_value_type = std::optional<std::reference_wrapper<value_type>>;

    SyncQueue();
    ~SyncQueue();

    SyncQueue(SyncQueue<T, sp, Capacity> const &) = delete;
    SyncQueue(SyncQueue<T, sp, Capacity> &&) = delete;
    SyncQueue<T, sp, Capacity> & operator=(SyncQueue<T, sp, Capacity> const &) = delete;
    SyncQueue<T, sp, Capacity> & operator=(SyncQueue<T, sp, Capacity> &&) = delete;

    [[nodiscard]] bool empty() const noexcept;
    [[nodiscard]] bool full() const noexcept;
    [[nodiscard]] size_type size() const noexcept;
    [[nodiscard]] constexpr size_type capacity() const noexcept { return Capacity; }

    int createReadEventFd(bool nonblock = false);
    [[nodiscard]] int readEventFd() const noexcept { return _readEventFd; }

    void forceQuit() noexcept;

    template<typename ... Args>
    void emplace(Args && ... args);

    [[nodiscard]] opt_value_type front();
    [[nodiscard]] opt_value_type tryFront() noexcept;
    void pop();

    void setSpinWaitCount(uint32_t counter) noexcept { _spinWaitCounter = counter; }
    void disableSpinWait() noexcept { setSpinWaitCount(0); }
    uint32_t spinWaitCount() const noexcept { return _spinWaitCounter; }

private:
    void _notifyReader() noexcept;
    void _notifyWriter() noexcept;

    alignas(T) container_type _queue;

    using atomic_itr = std::atomic<typename container_type::iterator>;
    static_assert(atomic_itr::is_always_lock_free);

    static constexpr auto alignSize = 64;

    struct alignas(alignSize) ReadSide
    {
        atomic_itr itr{};
        std::atomic<size_type> snapshot{0};
    } _read;

    struct alignas(alignSize) WriteSide
    {
        atomic_itr itr{};
        std::atomic<size_type> snapshot{0};
    } _write;

    struct alignas(alignSize) Flags
    {
        std::atomic_bool forceQuit = false;
        std::atomic_bool hasWriteWaiter = false;
        std::atomic_bool hasReadWaiter = false;
    } _flags;

    mutex_type _mutex;
    pmutex_type _producerMutex;
    std::condition_variable _cvReader;
    std::condition_variable _cvWriter;

    uint32_t _spinWaitCounter = 1'000;
    int _readEventFd = -1;
};

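// Usage sketch (illustrative only, not part of the queue): one producer thread
// emplaces values while one consumer thread blocks in front() until data
// arrives; process() is a hypothetical consumer-side function.
//
//     SyncQueue<int, /*sp=*/true> queue;
//
//     std::thread producer([&queue] {
//         for (int i = 0; i < 1'000; ++i)
//             queue.emplace(i);             // constructs the int in place
//     });
//
//     std::thread consumer([&queue] {
//         for (int received = 0; received < 1'000; ++received)
//         {
//             auto item = queue.front();    // blocks until an element is available
//             if (!item)                    // empty optional only after forceQuit()
//                 break;
//             process(item->get());         // reference to the element in the ring
//             queue.pop();                  // destroys it and frees its slot
//         }
//     });
//
//     producer.join();
//     consumer.join();
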
template<typename T, bool sp, std::size_t Capacity>
SyncQueue<T, sp, Capacity>::SyncQueue()
    : _read{ std::begin(_queue) }
    , _write{ std::begin(_queue) }
{
}

template<typename T, bool sp, std::size_t Capacity>
SyncQueue<T, sp, Capacity>::~SyncQueue()
{
    _flags.forceQuit.store(true, std::memory_order_release);

    if (_readEventFd != -1)
        close(_readEventFd);

    // Destroy any elements still sitting in the ring buffer.
    if constexpr (!std::is_trivially_destructible_v<T>)
    {
        for (auto rItr = _read.itr.load(std::memory_order_relaxed);
             rItr != _write.itr.load(std::memory_order_relaxed);)
        {
            reinterpret_cast<T *>(rItr)->~T();

            rItr += sizeof(T);
            if (rItr == std::end(_queue))
                rItr = std::begin(_queue);
        }
    }
}

template<typename T, bool sp, std::size_t Capacity>
bool SyncQueue<T, sp, Capacity>::empty() const noexcept
{
    return size() == 0;
}

template<typename T, bool sp, std::size_t Capacity>
bool SyncQueue<T, sp, Capacity>::full() const noexcept
{
    return size() == capacity();
}

template<typename T, bool sp, std::size_t Capacity>
typename SyncQueue<T, sp, Capacity>::size_type SyncQueue<T, sp, Capacity>::size() const noexcept
{
    auto const rPos = _read.snapshot.load(std::memory_order_acquire) / sizeof(T);
    auto const wPos = _write.snapshot.load(std::memory_order_acquire) / sizeof(T);

    return wPos >= rPos ? (wPos - rPos) : (Capacity + 1 + wPos - rPos);
}

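// Event-loop sketch (illustrative only): the eventfd created by the function
// below is bumped by _notifyReader() on every push, so the queue can be
// watched alongside other file descriptors. The poll() loop is an assumed
// usage pattern, not part of this file; Message and drain() are hypothetical.
//
//     SyncQueue<Message, /*sp=*/true> queue;
//     int const fd = queue.createReadEventFd(/*nonblock=*/true);
//
//     pollfd pfd{};
//     pfd.fd = fd;
//     pfd.events = POLLIN;
//
//     while (poll(&pfd, 1, -1) > 0 && (pfd.revents & POLLIN))
//     {
//         uint64_t count = 0;
//         read(fd, &count, sizeof(count));  // drain the eventfd counter
//
//         while (auto item = queue.tryFront())
//         {
//             drain(item->get());
//             queue.pop();
//         }
//     }
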
template<typename T, bool sp, std::size_t Capacity>
int SyncQueue<T, sp, Capacity>::createReadEventFd(bool nonblock)
{
    if (_readEventFd != -1)
        return _readEventFd;

    int flags = EFD_CLOEXEC;
    if (nonblock)
        flags |= EFD_NONBLOCK;

    _readEventFd = eventfd(size(), flags);
    B_ASSERT(_readEventFd != -1, "Failed to create eventfd: {}", strerror(errno));

    return _readEventFd;
}

template<typename T, bool sp, std::size_t Capacity>
void SyncQueue<T, sp, Capacity>::forceQuit() noexcept
{
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _flags.forceQuit.store(true, std::memory_order_release);
    }

    _cvReader.notify_all();
    _cvWriter.notify_all();
}

template<typename T, bool sp, std::size_t Capacity>
template<typename ... Args>
void SyncQueue<T, sp, Capacity>::emplace(Args && ... args)
{
    // Serialize producers; locking the no-op mutex is free when sp == true.
    std::unique_lock<pmutex_type> pLock(_producerMutex, std::defer_lock);
    pLock.lock();

    auto const wItr = _write.itr.load(std::memory_order_relaxed);
    auto nextWItr = wItr + sizeof(T);
    if (nextWItr == std::end(_queue))
        nextWItr = std::begin(_queue);

    auto const isQueueFull = [nextWItr, this]() noexcept {
        return nextWItr == _read.itr.load(std::memory_order_acquire);
    };

    // Spin for a bounded number of iterations before falling back to blocking.
    for (uint32_t counter = _spinWaitCounter; counter > 0 && isQueueFull(); --counter)
        cpuRelax();

    if (isQueueFull())
    {
        std::unique_lock<mutex_type> lock(_mutex);
        _flags.hasWriteWaiter.store(true, std::memory_order_release);
        _cvWriter.wait(lock, [&isQueueFull, this]() noexcept
        {
            return !isQueueFull() || UNLIKELY(_flags.forceQuit.load(std::memory_order_acquire));
        });
        _flags.hasWriteWaiter.store(false, std::memory_order_release);
    }

    if (UNLIKELY(_flags.forceQuit.load(std::memory_order_relaxed)))
        return;

    // Construct the element in place in the ring buffer.
    new (wItr) T(std::forward<Args>(args)...);

    {
        std::lock_guard<mutex_type> lock(_mutex);
        _write.itr.store(nextWItr, std::memory_order_release);

        auto offset = static_cast<size_type>(std::distance(std::begin(_queue), nextWItr));
        offset %= (Capacity + 1) * sizeof(T);
        _write.snapshot.store(offset, std::memory_order_release);
    }

    if (_flags.hasReadWaiter.load(std::memory_order_acquire) || _readEventFd != -1)
        _notifyReader();
}

template<typename T, bool sp, std::size_t Capacity>
typename SyncQueue<T, sp, Capacity>::opt_value_type SyncQueue<T, sp, Capacity>::front()
{
    auto const rItr = _read.itr.load(std::memory_order_relaxed);
    auto const isQueueEmpty = [&rItr, this]() noexcept {
        return rItr == _write.itr.load(std::memory_order_acquire);
    };

    // Spin for a bounded number of iterations before falling back to blocking.
    for (uint32_t counter = _spinWaitCounter; counter > 0 && isQueueEmpty(); --counter)
        cpuRelax();

    if (isQueueEmpty())
    {
        std::unique_lock<mutex_type> lock(_mutex);
        _flags.hasReadWaiter.store(true, std::memory_order_release);
        _cvReader.wait(lock, [&isQueueEmpty, this]() noexcept
        {
            return !isQueueEmpty() || UNLIKELY(_flags.forceQuit.load(std::memory_order_acquire));
        });
        _flags.hasReadWaiter.store(false, std::memory_order_release);
    }

    if (UNLIKELY(_flags.forceQuit.load(std::memory_order_relaxed)))
        return std::nullopt;

    return *std::launder(reinterpret_cast<T *>(rItr));
}

template<typename T, bool sp, std::size_t Capacity>
typename SyncQueue<T, sp, Capacity>::opt_value_type SyncQueue<T, sp, Capacity>::tryFront() noexcept
{
    auto const rItr = _read.itr.load(std::memory_order_relaxed);
    if (rItr == _write.itr.load(std::memory_order_acquire))
        return std::nullopt;

    return *std::launder(reinterpret_cast<T *>(rItr));
}

template<typename T, bool sp, std::size_t Capacity>
void SyncQueue<T, sp, Capacity>::pop()
{
    auto const rItr = _read.itr.load(std::memory_order_relaxed);
    B_ASSERT(rItr != _write.itr.load(std::memory_order_acquire), "Trying to pop an empty queue");

    auto nextRItr = rItr + sizeof(T);
    if (nextRItr == std::end(_queue))
        nextRItr = std::begin(_queue);

    if constexpr (!std::is_trivially_destructible_v<T>)
        reinterpret_cast<T *>(rItr)->~T();

    {
        std::lock_guard<mutex_type> lock(_mutex);
        _read.itr.store(nextRItr, std::memory_order_release);

        auto offset = static_cast<size_type>(std::distance(std::begin(_queue), nextRItr));
        offset %= (Capacity + 1) * sizeof(T);
        _read.snapshot.store(offset, std::memory_order_release);
    }

    if (_flags.hasWriteWaiter.load(std::memory_order_acquire))
        _notifyWriter();
}

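// Shutdown sketch (illustrative only): forceQuit() wakes both condition
// variables, so a consumer blocked in front() comes back with an empty
// optional and a producer blocked in emplace() returns without inserting.
// A hypothetical owner thread can therefore stop the pipeline like this:
//
//     queue.forceQuit();   // wake blocked readers and writers
//     consumer.join();     // front() observed forceQuit and returned std::nullopt
//     producer.join();     // emplace() bailed out without constructing an element
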
template<typename T, bool sp, std::size_t Capacity>
void SyncQueue<T, sp, Capacity>::_notifyReader() noexcept
{
    _cvReader.notify_one();

    if (_readEventFd != -1)
    {
        uint64_t const val = 1;
        write(_readEventFd, &val, sizeof(val));
    }
}

template<typename T, bool sp, std::size_t Capacity>
void SyncQueue<T, sp, Capacity>::_notifyWriter() noexcept
{
    _cvWriter.notify_one();