dfx 0.1.0
Linux-based dynamic dataflow executor
Loading...
Searching...
No Matches
Scheduler.hpp
1// SPDX-FileCopyrightText: 2025-2026 Vincent Leroy
2// SPDX-License-Identifier: MIT
3//
4// This file is part of dfx.
5//
6// Licensed under the MIT License. See the LICENSE file in the project root
7// for full license information.
8
9#pragma once
10
11// Standard includes
12#include <mutex>
13
14// Project includes
15#include "FdListener.hpp"
16#include "ThreadPool.hpp"
17#include <dfx-utilities/BorrowedFd.hpp>
18#include <dfx-utilities/OwnedFd.hpp>
19#include <dfx-hooks/Delivery.hpp>
20#include <dfx-hooks/Graph.hpp>
21#include <dfx-runtime-api/NodeReactor.hpp>
23#include <dfx-utilities/SystemConfig.hpp>
24
25namespace dfx::Runtime
26{
89{
90private:
91 struct FdData
92 {
95 Core::NodeWPtr node;
96 FdWatch::EventInterests currentInterests;
97 bool isArmed;
98 };
99
100 struct DeferedCbData
101 {
102 FdWatch::Callback callback;
103 Core::NodeWPtr node;
104 };
105
106 using FdDatas = std::vector<FdData>;
107
108public:
115 explicit Scheduler(Utils::SystemConfig & sysConfig);
116
119
123 ~Scheduler() override;
124
125public:
130 template<Api::DerivedFromTask T, typename ... Args>
131 void emplaceTask(Args && ... args)
132 { _threadPool.emplaceTask<T>(std::forward<Args>(args)...); }
133
136 { _threadPool.pushTask(std::move(task)); }
137
138public:
143 void start();
144
149 void stop();
150
152 bool isRunning() const noexcept { return _isRunning; }
153
154public:
164 Core::MessagePtr & message) override;
165
173 HookResult postDelivery(Core::Endpoint const & src, Core::Endpoint const & dst) override;
174
184 void registerNodeFd(Utils::BorrowedFd fd, FdWatch::EventInterests events, FdWatch::FdCallback cb, Core::NodeWPtr node) override;
185
190 void deregisterNodeFd(Utils::BorrowedFd fd) noexcept override;
191
197
199 void updateNodeFdEvents(Utils::BorrowedFd fd, FdWatch::EventInterests events) override;
200
205 void onNodeCreated(Core::NodePtr node) override;
206
212 void onNodeRemoved(Core::NodePtr node) override;
213
218 void onChannelCreated(Core::ChannelPtr channel) override;
219
224 void onChannelRemoved(Core::ChannelPtr channel) override;
225
226private:
227 void _deliverTaskToNode(Api::TaskPtr task, Core::NodeWPtr node);
228 void _deliverTasksToNode(std::span<Api::TaskPtr> tasks, Core::NodeWPtr node);
229
230 FdDatas::iterator _getFdData(Utils::BorrowedFd fd) noexcept;
231 FdDatas::const_iterator _getFdData(Utils::BorrowedFd fd) const noexcept;
232
233private:
234 void _onFdTriggered(std::vector<FdListener::FdInfo> fdInfos);
235
236private:
237 std::atomic_bool _isRunning = false;
238
239private:
240 std::atomic_uint32_t _maxNesting = 0;
241
242private:
243 std::list<Core::NodeWPtr> _nodes;
244 std::mutex _nodeMutex;
245
246private:
247 FdDatas _fdData;
248 mutable std::mutex _fdMutex;
249
250private:
251 // Destroy the FdListener then the ThreadPool
252 // in order to avoid issue with a pending task
253 // that will execute code in the scheduler that
254 // is actively being destroyed
255 ThreadPool _threadPool;
256 FdListener _fdListener;
257};
258} // !namespace dfx::Runtime
Convenience macros to explicitly control copy and move semantics.
Definition Endpoint.hpp:24
Hook interface invoked around message enqueue in an dfx::Core::Channel.
Definition Delivery.hpp:39
HookResult
Result code controlling pushMessage() behavior.
Definition Delivery.hpp:63
Hooks related to the lifecycle of graph objects (nodes and channels).
Definition Graph.hpp:54
Node-aware reactor API for FD watching and deferred execution in the runtime.
Definition NodeReactor.hpp:60
Background FD event listener running its own polling thread.
Definition FdListener.hpp:50
void start()
Start all known nodes.
void onChannelCreated(Core::ChannelPtr channel) override
Graph hook invoked when a channel is created.
void deregisterNodeFd(Utils::BorrowedFd fd) noexcept override
Deregister a previously registered FD.
void onNodeCreated(Core::NodePtr node) override
Graph hook invoked when a node is created.
HookResult preDelivery(Core::Endpoint const &src, Core::Endpoint const &dst, Core::MessagePtr &message) override
Delivery hook executed before a message is enqueued into a channel.
void stop()
Stop all known nodes.
HookResult postDelivery(Core::Endpoint const &src, Core::Endpoint const &dst) override
Delivery hook executed after a message has been enqueued into a channel.
void onNodeRemoved(Core::NodePtr node) override
Graph hook invoked when a node is removed.
void pushTask(Api::TaskPtr task)
Submit an already-constructed task to the worker pool.
Definition Scheduler.hpp:135
~Scheduler() override
Destroy the scheduler and stop it if running.
void emplaceTask(Args &&... args)
Convenience helper to construct and submit a task directly to the worker pool.
Definition Scheduler.hpp:131
bool isRunning() const noexcept
Whether the runtime is currently started.
Definition Scheduler.hpp:152
void deferNodeCall(FdWatch::Callback cb, Core::NodeWPtr node) override
Defer a callback to execute under the node lock on a worker thread.
void onChannelRemoved(Core::ChannelPtr channel) override
Graph hook invoked when a channel is removed.
void updateNodeFdEvents(Utils::BorrowedFd fd, FdWatch::EventInterests events) override
Update the list of event that this fd is attached to.
void registerNodeFd(Utils::BorrowedFd fd, FdWatch::EventInterests events, FdWatch::FdCallback cb, Core::NodeWPtr node) override
Register an FD watch on behalf of a node.
DFX_DISABLE_COPY_AND_MOVE(Scheduler)
Scheduler is neither copyable nor movable.
Scheduler(Utils::SystemConfig &sysConfig)
Construct a scheduler and register runtime settings into sysConfig.
Runtime thread pool executing Api::Task instances with priorities and FIFO ordering.
Definition ThreadPool.hpp:56
Non-owning wrapper around a file descriptor.
Definition BorrowedFd.hpp:33
System configuration registry with typed values, entry metadata, and change callbacks.
Definition SystemConfig.hpp:84
Convenience concept for constraining templates to Task-derived types.
Definition Task.hpp:24
std::unique_ptr< Message > MessagePtr
Unique ownership handle for messages.
Definition Message.hpp:27
std::shared_ptr< Node > NodePtr
Shared ownership pointer type for Nodes..
Definition Endpoint.hpp:21
std::weak_ptr< Node > NodeWPtr
Weak pointer type for Nodes.
Definition Node.hpp:66
std::shared_ptr< Channel > ChannelPtr
Shared ownership handle for channels.
Definition Channel.hpp:137
std::move_only_function< void()> Callback
Generic move-only callback with no arguments.
Definition Callback.hpp:31
std::function< void(Utils::BorrowedFd, EventTriggers)> FdCallback
Invoked when events occur on a watched file descriptor.
Definition Callback.hpp:28
std::unique_ptr< Task > TaskPtr
Unique ownership pointer for tasks.
Definition Task.hpp:102
Definition Node.hpp:48