dfx 0.1.0
Linux-based dynamic dataflow executor
Loading...
Searching...
No Matches
Node.hpp
Go to the documentation of this file.
1// SPDX-FileCopyrightText: 2025-2026 Vincent Leroy
2// SPDX-License-Identifier: MIT
3//
4// This file is part of dfx.
5//
6// Licensed under the MIT License. See the LICENSE file in the project root
7// for full license information.
8
9#pragma once
10
11// Standard includes
12#include <concepts>
13#include <future>
14#include <mutex>
15#include <string>
16
17// Third-party includes
18#include <nlohmann/json.hpp>
19
20// Project includes
21#include "ports/InputPort.hpp"
22#include "ports/OutputPort.hpp"
23#include <dfx-utilities/CompilerSupport.hpp>
25#include <dfx-utilities/MonotonicIdAllocator.hpp>
26#include <dfx-utilities/StringMap.hpp>
27
41
42namespace dfx::Graph
43{
44class NodeFactory;
45} // !namespace dfx::Graph
46
48{
49class NodeReactor;
50class PollerProxy;
51class NodeTaskExecutor;
52using NodeTaskExecutorPtr = std::unique_ptr<NodeTaskExecutor>;
53} // !namespace dfx::Runtime::Api
54
55namespace dfx::FdWatch
56{
57class Poller;
58} // !namespace dfx::FdWatch
59
60namespace dfx::Core
61{
62class Node;
64using NodePtr = std::shared_ptr<Node>;
66using NodeWPtr = std::weak_ptr<Node>;
67
69template<typename T>
70concept DerivedFromNode = std::derived_from<T, Node>;
71
93class Node : public std::enable_shared_from_this<Node>
94{
95 friend class Graph::NodeFactory;
96 friend class InputPort;
97
98 using InputPortContainer = std::vector<std::pair<std::string, InputPort>>;
99 using OutputPortContainer = std::vector<std::pair<std::string, OutputPort>>;
100
101public:
113
114public:
116 using Id = uint32_t;
117
118public:
126 Node(std::string type, Id id, std::string name);
129 virtual ~Node() = 0;
130
133
135 std::string const & type() const noexcept { return _type; }
137 std::string const & name() const noexcept { return _name; }
139 Id id() const noexcept { return _id; }
141 nlohmann::json const & config() const noexcept { return _config; }
143 ExecutionFlowPolicy executionFlowPolicy() const noexcept { return _execPolicy; }
145 bool isRunning() const noexcept { return _isRunning.load(std::memory_order::acquire); }
152 bool allowsMimeTypePropagation() const noexcept { return _allowsMimeTypePropagation; }
153
155
156public:
166 void setName(std::string newName);
167
168public:
172 template<DerivedFromNode T>
173 bool is() const noexcept { return dynamic_cast<T const *>(this) != nullptr; }
174
181 template<DerivedFromNode T>
182 T & as() noexcept { return static_cast<T &>(*this); }
183
190 template<DerivedFromNode T>
191 T const & as() const noexcept { return static_cast<T const &>(*this); }
192
193public:
196
206 void start();
207
214 void stop();
215
223
227 bool stopRequested() const noexcept { return _stopRequested.test(std::memory_order::acquire); }
228
230
231public:
234
236 InputPortContainer const & inputPorts() const noexcept { return _inputPorts; }
238 OutputPortContainer const & outputPorts() const noexcept { return _outputPorts; }
239
241 bool hasInputPort(std::string_view portName) const noexcept
242 { return _getInputPortItr(portName) != _inputPorts.end(); }
243
244 bool hasOutputPort(std::string_view portName) const noexcept
245 { return _getOutputPortItr(portName) != _outputPorts.end(); }
246
248 bool hasInputPort(Port::Id id) const noexcept
249 { return _getInputPortItr(id) != _inputPorts.end(); }
250
251 bool hasOutputPort(Port::Id id) const noexcept
252 { return _getOutputPortItr(id) != _outputPorts.end(); }
253
256 InputPort const & inputPort(std::string_view portName) const;
259 OutputPort const & outputPort(std::string_view portName) const;
262 InputPort & inputPort(std::string_view portName);
265 OutputPort & outputPort(std::string_view portName);
266
269 InputPort const & inputPort(Port::Id id) const;
272 OutputPort const & outputPort(Port::Id id) const;
275 InputPort & inputPort(Port::Id id);
279
281
282public:
285
292 void sendMessage(Port::Id id, MessagePtr message);
293
300 void sendMessage(std::string_view portName, MessagePtr message);
301
306 void sendMessage(OutputPort & port, MessagePtr message);
307
309
310public:
313
315 void lock() { _nodeLock.lock(); }
317 void unlock() { _nodeLock.unlock(); }
319 bool try_lock() noexcept { return _nodeLock.try_lock(); }
320
322
323public:
335 static void validateNodeName(std::string_view name);
336
337public:
340
349
351 Runtime::Api::NodeReactor * reactor() const noexcept { return _reactor; }
352
362
368 template<typename T = Runtime::Api::NodeTaskExecutor>
369 T * customExecutor() const noexcept { return static_cast<T *>(_customExecutor.get()); }
370
371protected:
383
387 void revokeTaskExecutor() noexcept;
388
390
391public:
394
399 virtual nlohmann::json configSchema() const = 0;
404 virtual nlohmann::json metadata() const = 0;
405
407
408protected:
416 void initialize(nlohmann::json config);
417
422 void setAllowsMimeTypePropagation(bool allowed) { _allowsMimeTypePropagation = allowed; }
423
424protected:
427
429 virtual void startImpl() {}
434 virtual void stopImpl() { onNodeStopped(); }
436 virtual void initializeImpl(nlohmann::json config) = 0;
437
443
445
446protected:
449
460 OutputPort & registerOutputPort(std::string name, MimeType mimeType = MimeType::Any, std::optional<bool> allowsMimeTypePropagation = std::nullopt);
461
472 InputPort & registerInputPort(std::string name, MimeTypes supportedMimeTypes = {}, std::optional<bool> allowsMimeTypePropagation = std::nullopt);
473
482
490 InputPort & registerControlInputPort(std::string name);
491
493
494protected:
499 virtual void setExecutionFlowPolicy(ExecutionFlowPolicy policy) { _execPolicy = policy; }
500
508 virtual void handleMessage(InputPort const & port, MessagePtr message) = 0;
509
510private:
511 OutputPort & _registerOutputPort(std::string name, Kind kind, MimeType mimeType, std::optional<bool> allowsMimeTypePropagation);
512 InputPort & _registerInputPort(std::string name, Kind kind, MimeTypes supportedMimeTypes, std::optional<bool> allowsMimeTypePropagation);
513
514 InputPortContainer::iterator _getInputPortItr(std::string_view name) noexcept;
515 InputPortContainer::const_iterator _getInputPortItr(std::string_view name) const noexcept;
516 OutputPortContainer::iterator _getOutputPortItr(std::string_view name) noexcept;
517 OutputPortContainer::const_iterator _getOutputPortItr(std::string_view name) const noexcept;
518
519 InputPortContainer::iterator _getInputPortItr(Port::Id id) noexcept;
520 InputPortContainer::const_iterator _getInputPortItr(Port::Id id) const noexcept;
521 OutputPortContainer::iterator _getOutputPortItr(Port::Id id) noexcept;
522 OutputPortContainer::const_iterator _getOutputPortItr(Port::Id id) const noexcept;
523
524private:
525 Id _id;
526 std::string _name;
527 std::string _type;
528 nlohmann::json _config;
529 std::atomic_bool _isRunning = false;
530 std::atomic_flag _stopRequested;
531 std::promise<void> _stopPromise;
532 std::shared_future<void> _stopFuture;
533 bool _allowsMimeTypePropagation = true;
534
536
537 std::recursive_mutex _nodeLock;
538 Runtime::Api::NodeReactor * _reactor = nullptr;
539 std::unique_ptr<Runtime::Api::PollerProxy> _pollerProxy;
540 Runtime::Api::NodeTaskExecutorPtr _customExecutor;
541
542private:
543 InputPortContainer _inputPorts;
544 OutputPortContainer _outputPorts;
545
546private:
548};
549} // !namespace dfx::Core
550
569#define DFX_NODE(typeName) \
570public: \
571 static constexpr auto type = typeName; \
572 \
573public: \
574 static nlohmann::json staticConfigSchema(); \
575 nlohmann::json configSchema() const override { return staticConfigSchema(); } \
576 static nlohmann::json staticMetadata(); \
577 nlohmann::json metadata() const override { return staticMetadata(); }\
578private:
579
Macro-based enum <-> string utilities for dfx.
#define DFX_DECLARE_ENUM_STRING_FUNCTIONS(E)
Declare the enum string API (and std::formatter) for enum type E.
Definition EnumString.hpp:327
MIME type value object.
Definition MimeType.hpp:44
@ Any
"" (empty string), meaning "accept anything"
Definition MimeType.hpp:67
bool isRunning() const noexcept
Check of the node is currently running.
Definition Node.hpp:145
virtual void setExecutionFlowPolicy(ExecutionFlowPolicy policy)
Set the execution flow policy used by the runtime scheduler.
Definition Node.hpp:499
ExecutionFlowPolicy
Policy controlling how execution is chained after message delivery.
Definition Node.hpp:109
@ AlwaysDispatch
Always enqueue next task for deferred execution.
Definition Node.hpp:110
@ InlineIfAvailable
Run inline on the current thread when possible.
Definition Node.hpp:111
virtual ~Node()=0
Virtual destructor. Pure virtual to ensure Node is abstract, but defined out-of-line.
OutputPortContainer const & outputPorts() const noexcept
Get all output ports of all Kind.
Definition Node.hpp:238
virtual nlohmann::json configSchema() const =0
Return the JSON schema describing the node configuration. Used by the dfx::Graph::NodeFactory / tooli...
bool allowsMimeTypePropagation() const noexcept
Whether this node allows mime-type propagation through its ports.
Definition Node.hpp:152
OutputPort const & outputPort(std::string_view portName) const
Get an output port by name.
virtual void stopImpl()
Called by stop before the node is marked stopped.
Definition Node.hpp:434
void start()
Start the node.
uint32_t Id
Node identifier type (unique and stable within a graph instance).
Definition Node.hpp:116
ExecutionFlowPolicy executionFlowPolicy() const noexcept
Get the node execution policy.
Definition Node.hpp:143
InputPort const & inputPort(std::string_view portName) const
Get an input port by name.
OutputPort & outputPort(std::string_view portName)
Get an output port by name (mutable).
void setName(std::string newName)
Rename the node.
virtual void handleMessage(InputPort const &port, MessagePtr message)=0
Handle an incoming message on an input port.
bool hasOutputPort(std::string_view portName) const noexcept
Check whether an output port exists by name.
Definition Node.hpp:244
static void validateNodeName(std::string_view name)
Validate a node name.
T const & as() const noexcept
Cast this node to a derived type (unchecked).
Definition Node.hpp:191
void lock()
Lock the node's internal recursive mutex.
Definition Node.hpp:315
void unlock()
Unlock the node's internal recursive mutex.
Definition Node.hpp:317
InputPort const & inputPort(Port::Id id) const
Get an input port by id.
void setAllowsMimeTypePropagation(bool allowed)
Enable/disable global mime-type propagation for this node. This affects how the runtime may propagate...
Definition Node.hpp:422
bool hasInputPort(std::string_view portName) const noexcept
Check whether an input port exists by name.
Definition Node.hpp:241
bool hasOutputPort(Port::Id id) const noexcept
Check whether an output port exists by id.
Definition Node.hpp:251
void assignTaskExecutor(Runtime::Api::NodeTaskExecutorPtr executor)
Assigns a custom task executor to this node.
void setReactor(Runtime::Api::NodeReactor *reactor)
Attach the runtime reactor used by this node.
bool stopRequested() const noexcept
Check if stop has already been called or not.
Definition Node.hpp:227
InputPort & inputPort(Port::Id id)
Get an input port by id (mutable).
virtual void startImpl()
Called by start before the node is marked running.
Definition Node.hpp:429
bool is() const noexcept
Check whether this node is of a given derived type.
Definition Node.hpp:173
FdWatch::Poller & poller()
Access a poller instance associated with this node.
Id id() const noexcept
Get the node id.
Definition Node.hpp:139
Node(std::string type, Id id, std::string name)
Construct a node instance.
InputPort & registerInputPort(std::string name, MimeTypes supportedMimeTypes={}, std::optional< bool > allowsMimeTypePropagation=std::nullopt)
Register a data input port.
OutputPort & outputPort(Port::Id id)
Get an output port by id (mutable).
void sendMessage(OutputPort &port, MessagePtr message)
Send a message on the provided output port.
InputPort & inputPort(std::string_view portName)
Get an input port by name (mutable).
T & as() noexcept
Cast this node to a derived type (unchecked).
Definition Node.hpp:182
void onNodeStopped()
Must be called by classes that re-implement the stopImpl function.
nlohmann::json const & config() const noexcept
Get the node configuration.
Definition Node.hpp:141
bool try_lock() noexcept
Try to lock the node's internal recursive mutex.
Definition Node.hpp:319
void waitForStopped()
Block the current thread until the node is stopped.
OutputPort & registerOutputPort(std::string name, MimeType mimeType=MimeType::Any, std::optional< bool > allowsMimeTypePropagation=std::nullopt)
Register a data output port.
bool hasInputPort(Port::Id id) const noexcept
Check whether an input port exists by id.
Definition Node.hpp:248
InputPort & registerControlInputPort(std::string name)
Register a control input port.
virtual void initializeImpl(nlohmann::json config)=0
Called by initialize; derived nodes must parse config here.
void sendMessage(std::string_view portName, MessagePtr message)
Send a message on an output port by name.
std::string const & name() const noexcept
Get the node name.
Definition Node.hpp:137
void stop()
Stop the node.
void revokeTaskExecutor() noexcept
Revokes a custom task executor, returning this node to the default execution model.
Runtime::Api::NodeReactor * reactor() const noexcept
Get the currently attached reactor (may be null if not set).
Definition Node.hpp:351
virtual nlohmann::json metadata() const =0
Return node metadata (description, categories, ports, etc.). Intended for UI/tooling and discovery.
void sendMessage(Port::Id id, MessagePtr message)
Send a message on an output port by id.
std::string const & type() const noexcept
Get the node type.
Definition Node.hpp:135
OutputPort & registerControlOutputPort(std::string name)
Register a control output port.
OutputPort const & outputPort(Port::Id id) const
Get an output port by id.
T * customExecutor() const noexcept
Access the customer executor of this node if any.
Definition Node.hpp:369
void initialize(nlohmann::json config)
Initialize the node with its configuration.
InputPortContainer const & inputPorts() const noexcept
Get all input ports of all Kind.
Definition Node.hpp:236
Outgoing message endpoint attached to a node.
Definition OutputPort.hpp:47
uint32_t Id
Identifier type of a port (unique within a node by not unique accross a graph).
Definition Port.hpp:69
Abstract interface for FD-based event polling.
Definition Poller.hpp:37
Runtime node instantiation and validation facility (builders + config/metadata schemas).
Definition NodeFactory.hpp:60
Node-aware reactor API for FD watching and deferred execution in the runtime.
Definition NodeReactor.hpp:60
Monotonically increasing ID allocator.
Definition MonotonicIdAllocator.hpp:52
Concept used by dfx::Core::Node::is() and dfx::Core::Node::as() to constrain types.
Definition Node.hpp:70
Definition Channel.hpp:25
std::unique_ptr< Message > MessagePtr
Unique ownership handle for messages.
Definition Message.hpp:27
std::vector< MimeType > MimeTypes
Convenience alias for a list of MIME types.
Definition MimeType.hpp:130
std::shared_ptr< Node > NodePtr
Shared ownership pointer type for Nodes..
Definition Endpoint.hpp:21
std::weak_ptr< Node > NodeWPtr
Weak pointer type for Nodes.
Definition Node.hpp:66
Kind
Port kind (connection domain).
Definition Kind.hpp:29
Definition SocketClient.hpp:23
Definition Node.hpp:43
Definition MimeTypeRouter.hpp:18
Definition Node.hpp:48
std::unique_ptr< NodeTaskExecutor > NodeTaskExecutorPtr
Unique ownership pointer type for NodeTaskExecutor.
Definition Node.hpp:52