dfx 0.1.0
Linux-based dynamic dataflow executor
Loading...
Searching...
No Matches
Scheduler.hpp
1// SPDX-FileCopyrightText: 2025 Vincent Leroy
2// SPDX-License-Identifier: MIT
3//
4// This file is part of dfx.
5//
6// Licensed under the MIT License. See the LICENSE file in the project root
7// for full license information.
8
9#pragma once
10
11// Standard includes
12#include <mutex>
13
14// Project includes
15#include "FdListener.hpp"
16#include "ThreadPool.hpp"
17#include <dfx-fdwatch/BorrowedFd.hpp>
18#include <dfx-fdwatch/OwnedFd.hpp>
19#include <dfx-hooks/Delivery.hpp>
20#include <dfx-hooks/Graph.hpp>
21#include <dfx-runtime-api/NodeReactor.hpp>
23#include <dfx-utilities/SystemConfig.hpp>
24
25namespace dfx::Runtime
26{
89{
90private:
// Bookkeeping entry stored in the FdDatas vector (one element per entry).
// The visible member is a weak (non-owning) reference to a node.
// NOTE(review): source lines 93-94 are absent from this listing, so further
// members (presumably the watched FD and its callback) may exist - confirm
// against the real header.
91 struct FdData
92 {
95 Core::NodeWPtr node;
96 };
97
// Pairs a move-only, argument-less callback with a weak (non-owning)
// reference to a node.
// NOTE(review): presumably carries the arguments of deferNodeCall() to the
// worker thread - confirm in the implementation file. The name is spelled
// "Defered" (sic); renaming it would affect code outside this listing.
98 struct DeferedCbData
99 {
100 FdWatch::Callback callback;
101 Core::NodeWPtr node;
102 };
103
104 using FdDatas = std::vector<FdData>;
105
106public:
114
117
121 ~Scheduler() override;
122
123public:
// Convenience helper that forwards its arguments to
// _threadPool.emplaceTask<T>(), which is expected to construct the task
// and submit it to the worker pool.
// T is constrained by the DerivedFromTask concept; args are perfectly
// forwarded, so lvalues are passed by reference and rvalues are moved.
128 template<DerivedFromTask T, typename ... Args>
129 void emplaceTask(Args && ... args)
130 { _threadPool.emplaceTask<T>(std::forward<Args>(args)...); }
131
// Hand an already-constructed task over to the thread pool.
// task is taken by value and moved into _threadPool.pushTask(), so the
// caller gives up ownership of the task.
133 void pushTask(TaskPtr task)
134 { _threadPool.pushTask(std::move(task)); }
135
136public:
141 void start();
142
147 void stop();
148
// Return the current value of the atomic _isRunning flag.
// The implicit conversion from std::atomic_bool performs an atomic load.
150 bool isRunning() const noexcept { return _isRunning; }
151
152public:
163 Core::MessagePtr & message) override;
164
173 Core::NodePtr dst, Core::InputPort & in) override;
174
184 void registerNodeFd(FdWatch::BorrowedFd fd, FdWatch::EventInterests events, FdWatch::FdCallback cb, Core::NodeWPtr node) override;
185
190 void deregisterNodeFd(FdWatch::BorrowedFd fd) noexcept override;
191
197
202 void onNodeCreated(Core::NodePtr node) override;
203
209 void onNodeRemoved(Core::NodePtr node) override;
210
215 void onChannelCreated(Core::ChannelPtr channel) override;
216
221 void onChannelRemoved(Core::ChannelPtr channel) override;
222
223private:
224 FdDatas::iterator _getFdData(FdWatch::BorrowedFd fd) noexcept;
225 FdDatas::const_iterator _getFdData(FdWatch::BorrowedFd fd) const noexcept;
226
227private:
228 void _onFdTriggered(FdWatch::BorrowedFd fd, FdWatch::EventTriggers events);
229
230private:
231 std::atomic_bool _isRunning = false;
232
233private:
234 std::atomic_uint32_t _maxNesting = 0;
235
236private:
237 std::list<Core::NodeWPtr> _nodes;
238 std::mutex _nodeMutex;
239
240private:
241 FdDatas _fdData;
242 mutable std::mutex _fdMutex;
243
244private:
245 // Members are destroyed in reverse declaration order, so the
246 // FdListener is destroyed first and the ThreadPool second. This
247 // avoids issues with a pending task executing code in a scheduler
248 // that is actively being destroyed.
249 ThreadPool _threadPool;
250 FdListener _fdListener;
251};
252} // !namespace dfx::Runtime
Convenience macros to explicitly control copy and move semantics.
Incoming message endpoint attached to a node.
Definition InputPort.hpp:55
Outgoing message endpoint attached to a node.
Definition OutputPort.hpp:47
Non-owning wrapper around a file descriptor.
Definition BorrowedFd.hpp:37
Hook interface invoked around message enqueue in a dfx::Core::Channel.
Definition Delivery.hpp:50
HookResult
Result code controlling pushMessage() behavior.
Definition Delivery.hpp:74
Hooks related to the lifecycle of graph objects (nodes and channels).
Definition Graph.hpp:54
Node-aware reactor API for FD watching and deferred execution in the runtime.
Definition NodeReactor.hpp:59
Background FD event listener running its own polling thread.
Definition FdListener.hpp:50
void start()
Start all known nodes.
void onChannelCreated(Core::ChannelPtr channel) override
Graph hook invoked when a channel is created.
void deregisterNodeFd(FdWatch::BorrowedFd fd) noexcept override
Deregister a previously registered FD.
HookResult postDelivery(Core::NodePtr src, Core::OutputPort const &out, Core::NodePtr dst, Core::InputPort &in) override
Delivery hook executed after a message has been enqueued into a channel.
void onNodeCreated(Core::NodePtr node) override
Graph hook invoked when a node is created.
void stop()
Stop all known nodes.
void onNodeRemoved(Core::NodePtr node) override
Graph hook invoked when a node is removed.
~Scheduler() override
Destroy the scheduler and stop it if running.
void pushTask(TaskPtr task)
Submit an already-constructed task to the worker pool.
Definition Scheduler.hpp:133
void emplaceTask(Args &&... args)
Convenience helper to construct and submit a task directly to the worker pool.
Definition Scheduler.hpp:129
void registerNodeFd(FdWatch::BorrowedFd fd, FdWatch::EventInterests events, FdWatch::FdCallback cb, Core::NodeWPtr node) override
Register an FD watch on behalf of a node.
bool isRunning() const noexcept
Whether the runtime is currently started.
Definition Scheduler.hpp:150
HookResult preDelivery(Core::NodePtr src, Core::OutputPort const &out, Core::NodePtr dst, Core::InputPort &in, Core::MessagePtr &message) override
Delivery hook executed before a message is enqueued into a channel.
void deferNodeCall(FdWatch::Callback cb, Core::NodeWPtr node) override
Defer a callback to execute under the node lock on a worker thread.
void onChannelRemoved(Core::ChannelPtr channel) override
Graph hook invoked when a channel is removed.
Scheduler(Utils::SystemConfig &sysConfig)
Construct a scheduler and register runtime settings into sysConfig.
DISABLE_COPY_AND_MOVE(Scheduler)
Scheduler is neither copyable nor movable.
Runtime thread pool executing Task instances with priorities and FIFO ordering.
Definition ThreadPool.hpp:56
System configuration registry with typed values, entry metadata, and change callbacks.
Definition SystemConfig.hpp:81
Convenience concept for constraining templates to Task-derived types.
Definition Task.hpp:24
std::unique_ptr< Message > MessagePtr
Unique ownership handle for messages.
Definition Message.hpp:27
std::shared_ptr< Node > NodePtr
Shared ownership pointer type for Nodes.
Definition Node.hpp:61
std::weak_ptr< Node > NodeWPtr
Weak pointer type for Nodes.
Definition Node.hpp:63
std::shared_ptr< Channel > ChannelPtr
Shared ownership handle for channels.
Definition Channel.hpp:147
std::function< void(BorrowedFd, EventTriggers)> FdCallback
Invoked when events occur on a watched file descriptor.
Definition Callback.hpp:28
std::move_only_function< void()> Callback
Generic move-only callback with no arguments.
Definition Callback.hpp:31
Definition Node.hpp:47
std::unique_ptr< Task > TaskPtr
Unique ownership pointer for tasks.
Definition Task.hpp:102