dfx 0.1.0
Linux-based dynamic dataflow executor
Loading...
Searching...
No Matches
ThreadPool.hpp
1// SPDX-FileCopyrightText: 2025-2026 Vincent Leroy
2// SPDX-License-Identifier: MIT
3//
4// This file is part of dfx.
5//
6// Licensed under the MIT License. See the LICENSE file in the project root
7// for full license information.
8
9#pragma once
10
11// Standard includes
12#include <list>
13#include <mutex>
14#include <optional>
15#include <shared_mutex>
16#include <span>
17#include <thread>
18#include <vector>
19
20// Project includes
21#include "ThreadData.hpp"
22#include <dfx-runtime-api/Task.hpp>
23#include <dfx-utilities/sync-queues/PriorityStableQueue.hpp>
24
25namespace dfx::Runtime
26{
56{
57public:
63 explicit ThreadPool(std::size_t threadCount = 1);
64
71
77 void addThread(std::size_t count = 1);
78
89 void removeThread(std::size_t count = 1);
90
92 std::size_t threadCount() const noexcept { std::lock_guard lock(_mutexThreads); return _threads.size(); }
93
94public:
98
101 void pushTasks(std::span<Api::TaskPtr> tasks);
102
108 template<Api::DerivedFromTask T, typename ... Args>
109 requires std::is_constructible_v<T, Args...>
110 void emplaceTask(Args && ... args)
111 { pushTask(std::make_unique<T>(std::forward<Args>(args)...)); }
112
113public:
115 using MaybeThreadData = std::optional<std::reference_wrapper<ThreadData>>;
117 using MaybeCThreadData = std::optional<std::reference_wrapper<ThreadData const>>;
118
124
130
131private:
132 void _work(std::stop_token stopToken);
133
134private:
136
137private:
138 // Use a list instead of a map so iterators and references
139 // aren't invalidated when inserting / removing elements
140 std::list<ThreadData> _threadDatas;
141 mutable std::shared_mutex _mutexThreadData;
142
143private:
144 mutable std::mutex _mutexThreads;
145 // The threads have to be destroyed first in the dtor, so this member must stay declared last
146 std::vector<std::jthread> _threads;
147};
148} // !namespace dfx::Runtime
void emplaceTask(Args &&... args)
Construct a task in-place and submit it.
Definition ThreadPool.hpp:110
std::size_t threadCount() const noexcept
Current number of worker threads.
Definition ThreadPool.hpp:92
MaybeThreadData threadData()
Retrieve the calling thread's ThreadData if it belongs to this pool.
std::optional< std::reference_wrapper< ThreadData > > MaybeThreadData
Optional reference to mutable thread data.
Definition ThreadPool.hpp:115
std::optional< std::reference_wrapper< ThreadData const > > MaybeCThreadData
Optional reference to immutable thread data.
Definition ThreadPool.hpp:117
MaybeCThreadData threadData() const
Retrieve the calling thread's ThreadData if it belongs to this pool.
void addThread(std::size_t count=1)
Add worker threads to the pool.
void pushTask(Api::TaskPtr task)
Submit a task for execution.
~ThreadPool()
Stop all worker threads and abandon remaining tasks.
ThreadPool(std::size_t threadCount=1)
Construct a thread pool with an initial number of worker threads.
void pushTasks(std::span< Api::TaskPtr > tasks)
Submit multiple tasks for execution.
void removeThread(std::size_t count=1)
Remove worker threads from the pool.
A thread-safe, priority-based queue that maintains insertion order for equal priorities.
Definition PriorityStableQueue.hpp:50
Convenience concept for constraining templates to Task-derived types.
Definition Task.hpp:24
std::unique_ptr< Task > TaskPtr
Unique ownership pointer for tasks.
Definition Task.hpp:102
Definition Node.hpp:48