dfx 0.1.0
Linux-based dynamic dataflow executor
dfx::Runtime::Scheduler Class Reference

Runtime scheduler and execution bridge between graph delivery, node execution, and FD events.

#include <dfx-runtime/Scheduler.hpp>


Public Member Functions

 Scheduler (Utils::SystemConfig &sysConfig)
 Construct a scheduler and register runtime settings into sysConfig.
 DISABLE_COPY_AND_MOVE (Scheduler)
 Scheduler is neither copyable nor movable.
 ~Scheduler () override
 Destroy the scheduler and stop it if running.
template<DerivedFromTask T, typename ... Args>
void emplaceTask (Args &&... args)
 Convenience helper to construct and submit a task directly to the worker pool.
void pushTask (TaskPtr task)
 Submit an already-constructed task to the worker pool.
void start ()
 Start all known nodes.
void stop ()
 Stop all known nodes.
bool isRunning () const noexcept
 Whether the runtime is currently started.
HookResult preDelivery (Core::NodePtr src, Core::OutputPort const &out, Core::NodePtr dst, Core::InputPort &in, Core::MessagePtr &message) override
 Delivery hook executed before a message is enqueued into a channel.
HookResult postDelivery (Core::NodePtr src, Core::OutputPort const &out, Core::NodePtr dst, Core::InputPort &in) override
 Delivery hook executed after a message has been enqueued into a channel.
void registerNodeFd (FdWatch::BorrowedFd fd, FdWatch::EventInterests events, FdWatch::FdCallback cb, Core::NodeWPtr node) override
 Register an FD watch on behalf of a node.
void deregisterNodeFd (FdWatch::BorrowedFd fd) noexcept override
 Deregister a previously registered FD.
void deferNodeCall (FdWatch::Callback cb, Core::NodeWPtr node) override
 Defer a callback to execute under the node lock on a worker thread.
void onNodeCreated (Core::NodePtr node) override
 Graph hook invoked when a node is created.
void onNodeRemoved (Core::NodePtr node) override
 Graph hook invoked when a node is removed.
void onChannelCreated (Core::ChannelPtr channel) override
 Graph hook invoked when a channel is created.
void onChannelRemoved (Core::ChannelPtr channel) override
 Graph hook invoked when a channel is removed.

Additional Inherited Members

Public Types inherited from dfx::Hooks::Delivery
enum class  HookResult { Proceed , Skip , Fail }
 Result code controlling pushMessage() behavior.

Detailed Description

Runtime scheduler and execution bridge between graph delivery, node execution, and FD events.

Scheduler is the central runtime component that:

  • orchestrates node lifecycle start/stop across a ThreadPool,
  • hooks into message delivery (via Hooks::Delivery) to decide inline vs deferred execution,
  • provides a node-aware FD reactor (via Api::NodeReactor) backed by a dedicated FdListener thread,
  • tracks graph topology changes (via Hooks::Graph) to attach/detach hooks and perform cleanup.

In practice, this is the "runtime spine" of dfx: channels deliver messages, the scheduler decides how the next node gets executed, and external events (FD readiness) are funneled back into node work items.

Execution model
Scheduler uses two layers:
  • FdListener : detects FD readiness on its own thread and invokes a single callback.
  • ThreadPool : executes runtime tasks (mostly node-bound tasks) on worker threads.

Any work that must touch a node is scheduled as a node-bound task (NodeTask), which:

  • checks node lifetime at execution time (weak pointer lock),
  • serializes entry into the node (node lock), ensuring no concurrent re-entry into the same node.
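The two guarantees above can be illustrated with a minimal sketch. It is not the dfx implementation: `Node` and this `NodeTask` are hypothetical stand-ins that only assume the node exposes a mutex and is referenced weakly by the task.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical stand-in for Core::Node; only the lock matters here.
struct Node {
    std::mutex lock;  // serializes all entry into the node
    int handled = 0;  // counts executed work items, for illustration only
};

// Hypothetical node-bound task: holds the node weakly, runs work under its lock.
class NodeTask {
public:
    NodeTask(std::weak_ptr<Node> node, std::function<void(Node&)> work)
        : node_(std::move(node)), work_(std::move(work)) {
        assert(work_ && "a node-bound task needs a callable");
    }

    // Returns false if the node was destroyed before the task ran.
    bool run() {
        std::shared_ptr<Node> node = node_.lock();  // lifetime check at execution time
        if (!node) {
            return false;  // node is gone: silently drop the work
        }
        std::lock_guard<std::mutex> guard(node->lock);  // no concurrent re-entry
        work_(*node);
        return true;
    }

private:
    std::weak_ptr<Node> node_;
    std::function<void(Node&)> work_;
};
```

Because the task keeps only a weak reference, a queued task never extends the lifetime of a node that the graph has already removed.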
Inline delivery optimization (fast-path)
During preDelivery(), the scheduler may skip enqueuing into the channel and instead call directly into the destination input port, so that the current thread runs the next node's work immediately.

This is used to reduce latency and queue overhead when safe.

The inline fast-path is only taken when all of the following hold:

  • the runtime is started,
  • the destination node is already running,
  • the source node allows it (see Core::Node::ExecutionFlowPolicy, AlwaysDispatch forbids inline),
  • the current thread is a pool worker and has not exceeded the max nesting limit,
  • the destination node lock can be acquired immediately (try-lock).

If the inline fast-path is taken, the hook returns Hooks::Delivery::HookResult::Skip to prevent the channel from enqueuing the message.
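The decision can be sketched as a single predicate over the listed conditions. The `InlineConditions` struct and the `canRunInline()` and `decide()` functions are hypothetical names used for illustration. Two points are assumptions rather than documented behavior: that the normal path returns `Proceed`, and that "not exceeded" means the current depth is strictly below the limit.

```cpp
#include <cassert>

// Mirrors the enumerators of Hooks::Delivery::HookResult.
enum class HookResult { Proceed, Skip, Fail };

// Hypothetical snapshot of the conditions listed above.
struct InlineConditions {
    bool runtimeStarted;
    bool destinationRunning;
    bool sourceAllowsInline;       // false when the policy is AlwaysDispatch
    bool onPoolWorker;
    int nestedLevel;               // current inline depth on this thread
    int maxNesting;                // configured limit
    bool destinationLockAcquired;  // outcome of a try-lock on the destination
};

// True only when every condition holds.
inline bool canRunInline(const InlineConditions& c) {
    return c.runtimeStarted
        && c.destinationRunning
        && c.sourceAllowsInline
        && c.onPoolWorker
        && c.nestedLevel < c.maxNesting  // assumption: strict comparison
        && c.destinationLockAcquired;
}

// Skip tells the channel not to enqueue; Proceed (assumed) takes the normal path.
inline HookResult decide(const InlineConditions& c) {
    return canRunInline(c) ? HookResult::Skip : HookResult::Proceed;
}
```

In a real scheduler the try-lock would presumably be attempted last, after the cheaper checks have passed. Here it is reduced to a boolean to keep the sketch self-contained.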

Nesting / stack-safety
To protect against deep or cyclic graphs causing unbounded recursion, the scheduler enforces a maximum number of consecutive inline calls per worker thread.

The current nesting depth is tracked per worker thread via ThreadData::nestedLevel (retrieved via ThreadPool::threadData()). Once the configured limit is reached, the scheduler forces delivery to be deferred through the normal queue + task mechanism.
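A per-thread depth counter of this kind might look like the sketch below. The `thread_local` variable stands in for ThreadData::nestedLevel, and `NestingGuard` and `deliver()` are hypothetical helpers. The recursion simulates a cyclic graph in which every node tries to deliver inline to the next one.

```cpp
#include <cassert>

// Hypothetical per-thread state; the real runtime keeps this in ThreadData::nestedLevel.
thread_local int nestedLevel = 0;

// RAII guard: increments the depth on entering an inline call, decrements on leaving.
class NestingGuard {
public:
    NestingGuard() { ++nestedLevel; }
    ~NestingGuard() {
        assert(nestedLevel > 0);
        --nestedLevel;
    }
    NestingGuard(const NestingGuard&) = delete;
    NestingGuard& operator=(const NestingGuard&) = delete;
};

// Returns how many deliveries ran inline before one had to be deferred.
inline int deliver(int maxNesting) {
    if (nestedLevel >= maxNesting) {
        return 0;  // limit reached: this delivery would go through the queue instead
    }
    NestingGuard guard;              // depth is restored even if the call throws
    return 1 + deliver(maxNesting);  // the "next node" delivers again
}
```

With a limit of 3, `deliver(3)` runs three nested inline calls and then stops recursing, which is the bounded-stack behavior described above.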

Graph integration
  • On channel creation, the scheduler attaches itself as the delivery hook.
  • On channel removal, it detaches and warns if messages are dropped.
  • On node creation, it sets itself as the node reactor and keeps a weak reference.
  • On node removal, it clears the node reactor and automatically deregisters any FD registered by that node.
Configuration
The scheduler registers runtime-tunable settings into the provided Utils::SystemConfig :
  • dfx::Utils::SystemConfigKeys::Runtime::threadPoolSize adjusts the worker count (the pool always keeps at least 1 thread).
  • dfx::Utils::SystemConfigKeys::Runtime::threadMaxNesting controls the inline nesting limit used to avoid stack overflow.
Warning
Node work (start/stop, input processing, FD callbacks, deferred callbacks) executes under the node lock. Avoid long blocking operations while holding the node lock, or you will stall all work for that node.

Constructor & Destructor Documentation

◆ Scheduler()

dfx::Runtime::Scheduler::Scheduler ( Utils::SystemConfig & sysConfig)

Construct a scheduler and register runtime settings into sysConfig.

The scheduler creates:

  • a ThreadPool initially sized to 1 worker thread,
  • a FdListener that forwards FD events to the scheduler, which then dispatches them to the thread pool as node-bound tasks.

◆ ~Scheduler()

dfx::Runtime::Scheduler::~Scheduler ( )
override

Destroy the scheduler and stop it if running.

Calls stop() to ensure nodes are stopped before destruction.

Member Function Documentation

◆ deferNodeCall()

void dfx::Runtime::Scheduler::deferNodeCall ( FdWatch::Callback cb,
Core::NodeWPtr node )
override virtual

Defer a callback to execute under the node lock on a worker thread.

Wraps cb into a node-bound task and submits it to the ThreadPool. Deferring a null callback is considered a bug, and an exception is thrown.

Implements dfx::Runtime::Api::NodeReactor.

◆ deregisterNodeFd()

void dfx::Runtime::Scheduler::deregisterNodeFd ( FdWatch::BorrowedFd fd)
override virtual noexcept

Deregister a previously registered FD.

Removes the FD from the FdListener and erases internal tracking. Invalid FDs are ignored.

Implements dfx::Runtime::Api::NodeReactor.

◆ DISABLE_COPY_AND_MOVE()

dfx::Runtime::Scheduler::DISABLE_COPY_AND_MOVE ( Scheduler )

Scheduler is neither copyable nor movable.

◆ emplaceTask()

template<DerivedFromTask T, typename ... Args>
void dfx::Runtime::Scheduler::emplaceTask ( Args &&... args)
inline

Convenience helper to construct and submit a task directly to the worker pool.

Template Parameters
T    Task type derived from Task.
Parameters
args    Constructor arguments forwarded to T.

◆ isRunning()

bool dfx::Runtime::Scheduler::isRunning ( ) const
inline noexcept

Whether the runtime is currently started.

◆ onChannelCreated()

void dfx::Runtime::Scheduler::onChannelCreated ( Core::ChannelPtr channel)
override virtual

Graph hook invoked when a channel is created.

Sets this scheduler as the channel delivery hook (channel->setDeliveryHook(this)), enabling inline delivery decisions and deferred execution scheduling.

Implements dfx::Hooks::Graph.

◆ onChannelRemoved()

void dfx::Runtime::Scheduler::onChannelRemoved ( Core::ChannelPtr channel)
override virtual

Graph hook invoked when a channel is removed.

Clears the delivery hook and warns if messages were still pending in the channel, as those messages will be dropped as a result of channel removal.

Implements dfx::Hooks::Graph.

◆ onNodeCreated()

void dfx::Runtime::Scheduler::onNodeCreated ( Core::NodePtr node)
override virtual

Graph hook invoked when a node is created.

  • Sets this scheduler as the node reactor (node->setReactor(this)).
  • Stores a weak reference to the node for lifecycle management (start/stop, cleanup).

Implements dfx::Hooks::Graph.

◆ onNodeRemoved()

void dfx::Runtime::Scheduler::onNodeRemoved ( Core::NodePtr node)
override virtual

Graph hook invoked when a node is removed.

  • Clears the node reactor (node->setReactor(nullptr)).
  • Automatically deregisters any FD previously registered by that node.
  • Removes the node from the internal node list.

Implements dfx::Hooks::Graph.

◆ postDelivery()

HookResult dfx::Runtime::Scheduler::postDelivery ( Core::NodePtr src,
Core::OutputPort const & out,
Core::NodePtr dst,
Core::InputPort & in )
override virtual

Delivery hook executed after a message has been enqueued into a channel.

If the destination node is running, schedules a node-bound task that drains the input port via in.processAllPendingMessages().

If the destination node is not running, no task is scheduled: messages remain queued in the channel until the node is started.
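The two outcomes can be sketched with simplified stand-ins. `InputPort` here carries plain integers, `TaskQueue` replaces the ThreadPool, and this free `postDelivery()` function is hypothetical. The sketch only shows that a drain task is queued for a running destination and that nothing is queued otherwise.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <vector>

// Hypothetical stand-in for Core::InputPort, carrying int messages.
struct InputPort {
    std::deque<int> pending;     // messages enqueued by the channel
    std::vector<int> processed;  // messages handed to the node, in order

    void processAllPendingMessages() {
        while (!pending.empty()) {
            processed.push_back(pending.front());
            pending.pop_front();
        }
    }
};

// Stands in for the ThreadPool: tasks are stored and run by the caller.
using TaskQueue = std::vector<std::function<void()>>;

// Returns true if a drain task was scheduled.
// The port must outlive the queued task, since it is captured by reference.
inline bool postDelivery(bool destinationRunning, InputPort& in, TaskQueue& tasks) {
    if (!destinationRunning) {
        return false;  // messages stay queued in the channel
    }
    tasks.push_back([&in] { in.processAllPendingMessages(); });
    return true;
}
```

A single drain task handles every message pending at the time it runs, so a burst of deliveries does not require one processing pass per message.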

Implements dfx::Hooks::Delivery.

◆ preDelivery()

HookResult dfx::Runtime::Scheduler::preDelivery ( Core::NodePtr src,
Core::OutputPort const & out,
Core::NodePtr dst,
Core::InputPort & in,
Core::MessagePtr & message )
override virtual

Delivery hook executed before a message is enqueued into a channel.

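Decides whether to:

  • run the destination input port inline on the current thread and return Hooks::Delivery::HookResult::Skip, so that the channel does not enqueue the message (see the inline fast-path conditions in the detailed description), or
  • let the channel enqueue the message as usual, leaving execution to the task scheduled by postDelivery().

This method is also where MIME-type mismatch warnings are emitted.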

Implements dfx::Hooks::Delivery.

◆ pushTask()

void dfx::Runtime::Scheduler::pushTask ( TaskPtr task)
inline

Submit an already-constructed task to the worker pool.

◆ registerNodeFd()

void dfx::Runtime::Scheduler::registerNodeFd ( FdWatch::BorrowedFd fd,
FdWatch::EventInterests events,
FdWatch::FdCallback cb,
Core::NodeWPtr node )
override virtual

Register an FD watch on behalf of a node.

  • Registers the FD with the FdListener (which monitors it on its own thread).
  • Stores the node + callback association internally.

When the FD triggers, the scheduler schedules a node-bound task that runs the callback under the node lock. Invalid FDs are ignored. Registering a null callback is considered a bug, and an exception is thrown.
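The internal bookkeeping can be pictured with the following sketch. `FdRegistry`, its member names, the use of a plain `int` for the descriptor, and the choice of `std::invalid_argument` are all assumptions made for illustration. The actual exception type and the order of the two checks are not documented here.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <stdexcept>
#include <unordered_map>
#include <utility>

struct Node {};  // hypothetical stand-in for Core::Node

// Hypothetical FD -> (callback, node) table.
class FdRegistry {
public:
    using Callback = std::function<void()>;

    void registerFd(int fd, Callback cb, std::weak_ptr<Node> node) {
        if (!cb) {
            throw std::invalid_argument("null FD callback");  // treated as a bug
        }
        if (fd < 0) {
            return;  // invalid FDs are ignored
        }
        entries_[fd] = Entry{std::move(cb), std::move(node)};
    }

    void deregisterFd(int fd) noexcept { entries_.erase(fd); }

    // Drops every FD registered on behalf of the given node.
    void removeNode(const std::shared_ptr<Node>& node) {
        for (auto it = entries_.begin(); it != entries_.end();) {
            it = (it->second.node.lock() == node) ? entries_.erase(it) : ++it;
        }
    }

    // Runs the callback if the FD is known and its node is still alive.
    // A real scheduler would wrap this in a node-bound task instead of calling inline.
    bool dispatch(int fd) {
        auto it = entries_.find(fd);
        if (it == entries_.end() || it->second.node.expired()) {
            return false;
        }
        it->second.cb();
        return true;
    }

    std::size_t size() const noexcept { return entries_.size(); }

private:
    struct Entry {
        Callback cb;
        std::weak_ptr<Node> node;
    };
    std::unordered_map<int, Entry> entries_;
};
```

Keeping the node as a weak reference in each entry is what makes the automatic cleanup in onNodeRemoved() possible without the registry owning the node.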

Implements dfx::Runtime::Api::NodeReactor.

◆ start()

void dfx::Runtime::Scheduler::start ( )

Start all known nodes.

For each registered node, a task is scheduled to call node->start() under the node lock. The call blocks until all start tasks have completed.

◆ stop()

void dfx::Runtime::Scheduler::stop ( )

Stop all known nodes.

For each registered node, a task is scheduled to call node->stop() under the node lock. The call blocks until all stop tasks have completed.


The documentation for this class was generated from the following file: