dfx 0.1.0
Linux-based dynamic dataflow executor
Loading...
Searching...
No Matches
Node.hpp
Go to the documentation of this file.
1// SPDX-FileCopyrightText: 2025 Vincent Leroy
2// SPDX-License-Identifier: MIT
3//
4// This file is part of dfx.
5//
6// Licensed under the MIT License. See the LICENSE file in the project root
7// for full license information.
8
9#pragma once
10
11// Standard includes
12#include <concepts>
13#include <mutex>
14#include <string>
15
16// Third-party includes
17#include <nlohmann/json.hpp>
18
19// Project includes
20#include "ports/InputPort.hpp"
21#include "ports/OutputPort.hpp"
22#include <dfx-utilities/CompilerSupport.hpp>
24#include <dfx-utilities/MonotonicIdAllocator.hpp>
25#include <dfx-utilities/StringMap.hpp>
26
40
41namespace dfx::Graph
42{
43class NodeFactory;
44} // !namespace dfx::Graph
45
47{
48class NodeReactor;
49class PollerProxy;
50} // !namespace dfx::Runtime::Api
51
52namespace dfx::FdWatch
53{
54class Poller;
55} // !namespace dfx::FdWatch
56
57namespace dfx::Core
58{
59class Node;
61using NodePtr = std::shared_ptr<Node>;
63using NodeWPtr = std::weak_ptr<Node>;
64
66template<typename T>
67concept DerivedFromNode = std::derived_from<T, Node>;
68
90class Node : public std::enable_shared_from_this<Node>
91{
92 friend class Graph::NodeFactory;
93 friend class InputPort;
94
95 using InputPortContainer = std::vector<std::pair<std::string, InputPort>>;
96 using OutputPortContainer = std::vector<std::pair<std::string, OutputPort>>;
97
98public:
110
111public:
113 using Id = uint32_t;
114
115public:
123 Node(std::string type, Id id, std::string name);
126 virtual ~Node() = 0;
127
130
132 std::string const & type() const noexcept { return _type; }
134 std::string const & name() const noexcept { return _name; }
136 Id id() const noexcept { return _id; }
138 nlohmann::json const & config() const noexcept { return _config; }
140 ExecutionFlowPolicy executionFlowPolicy() const noexcept { return _execPolicy; }
142 bool isRunning() const noexcept { return _isRunning; }
149 bool allowsMimeTypePropagation() const noexcept { return _allowsMimeTypePropagation; }
150
152
153public:
163 void setName(std::string newName);
164
165public:
169 template<DerivedFromNode T>
170 bool is() const noexcept { return dynamic_cast<T const *>(this) != nullptr; }
171
178 template<DerivedFromNode T>
179 T & as() noexcept { return static_cast<T &>(*this); }
180
187 template<DerivedFromNode T>
188 T const & as() const noexcept { return static_cast<T const &>(*this); }
189
190public:
193
202 void start();
203
209 void stop();
210
212
213public:
216
218 InputPortContainer const & inputPorts() const noexcept { return _inputPorts; }
220 OutputPortContainer const & outputPorts() const noexcept { return _outputPorts; }
221
223 bool hasInputPort(std::string_view portName) const noexcept
224 { return _getInputPortItr(portName) != _inputPorts.end(); }
225
226 bool hasOutputPort(std::string_view portName) const noexcept
227 { return _getOutputPortItr(portName) != _outputPorts.end(); }
228
230 bool hasInputPort(Port::Id id) const noexcept
231 { return _getInputPortItr(id) != _inputPorts.end(); }
232
233 bool hasOutputPort(Port::Id id) const noexcept
234 { return _getOutputPortItr(id) != _outputPorts.end(); }
235
238 InputPort const & inputPort(std::string_view portName) const;
241 OutputPort const & outputPort(std::string_view portName) const;
244 InputPort & inputPort(std::string_view portName);
247 OutputPort & outputPort(std::string_view portName);
248
251 InputPort const & inputPort(Port::Id id) const;
254 OutputPort const & outputPort(Port::Id id) const;
257 InputPort & inputPort(Port::Id id);
261
263
264public:
267
274 void sendMessage(Port::Id id, MessagePtr message);
275
282 void sendMessage(std::string_view portName, MessagePtr message);
283
288 void sendMessage(OutputPort & port, MessagePtr message);
289
291
292public:
295
297 void lock() { _nodeLock.lock(); }
299 void unlock() { _nodeLock.unlock(); }
301 bool try_lock() noexcept { return _nodeLock.try_lock(); }
302
304
305public:
317 static void validateNodeName(std::string_view name);
318
319public:
322
328
330 Runtime::Api::NodeReactor * reactor() const noexcept { return _reactor; }
331
341
343
344public:
347 virtual nlohmann::json configSchema() const = 0;
350 virtual nlohmann::json metadata() const = 0;
351
352protected:
360 void initialize(nlohmann::json config);
361
366 void setAllowsMimeTypePropagation(bool allowed) { _allowsMimeTypePropagation = allowed; }
367
368protected:
371
373 virtual void startImpl() {}
375 virtual void stopImpl() {}
377 virtual void initializeImpl(nlohmann::json config) = 0;
378
380
381protected:
384
395 OutputPort & registerOutputPort(std::string name, MimeType mimeType = MimeType::Any, std::optional<bool> allowsMimeTypePropagation = {});
396
407 InputPort & registerInputPort(std::string name, MimeTypes supportedMimeTypes = {}, std::optional<bool> allowsMimeTypePropagation = {});
408
417
425 InputPort & registerControlInputPort(std::string name);
426
428
429protected:
434 virtual void setExecutionFlowPolicy(ExecutionFlowPolicy policy) { _execPolicy = policy; }
435
443 virtual void handleMessage(InputPort const & port, MessagePtr message) = 0;
444
445private:
446 OutputPort & _registerOutputPort(std::string name, Kind kind, MimeType mimeType, std::optional<bool> allowsMimeTypePropagation);
447 InputPort & _registerInputPort(std::string name, Kind kind, MimeTypes supportedMimeTypes, std::optional<bool> allowsMimeTypePropagation);
448
449 InputPortContainer::iterator _getInputPortItr(std::string_view name) noexcept;
450 InputPortContainer::const_iterator _getInputPortItr(std::string_view name) const noexcept;
451 OutputPortContainer::iterator _getOutputPortItr(std::string_view name) noexcept;
452 OutputPortContainer::const_iterator _getOutputPortItr(std::string_view name) const noexcept;
453
454 InputPortContainer::iterator _getInputPortItr(Port::Id id) noexcept;
455 InputPortContainer::const_iterator _getInputPortItr(Port::Id id) const noexcept;
456 OutputPortContainer::iterator _getOutputPortItr(Port::Id id) noexcept;
457 OutputPortContainer::const_iterator _getOutputPortItr(Port::Id id) const noexcept;
458
459private:
460 Id _id;
461 std::string _name;
462 std::string _type;
463 nlohmann::json _config;
464 bool _isRunning = false;
465 bool _allowsMimeTypePropagation = true;
466
468
469 std::recursive_mutex _nodeLock;
470 Runtime::Api::NodeReactor * _reactor = nullptr;
471 std::unique_ptr<Runtime::Api::PollerProxy> _pollerProxy;
472
473private:
474 InputPortContainer _inputPorts;
475 OutputPortContainer _outputPorts;
476
477private:
478 Utils::MonotonicIdAllocator<Port::Id> _inputPortIdAllocator;
479 Utils::MonotonicIdAllocator<Port::Id> _outputPortIdAllocator;
480};
481} // !namespace dfx::Core
482
501#define DFX_NODE(typeName) \
502public: \
503 static constexpr auto type = typeName; \
504 \
505public: \
506 static nlohmann::json staticConfigSchema(); \
507 nlohmann::json configSchema() const override { return staticConfigSchema(); } \
508 static nlohmann::json staticMetadata(); \
509 nlohmann::json metadata() const override { return staticMetadata(); }\
510private:
511
Macro-based enum <-> string utilities for dfx.
#define DECLARE_ENUM_STRING_FUNCTIONS(E)
Declare the enum string API (and std::formatter) for enum type E.
Definition EnumString.hpp:327
MIME type value object.
Definition MimeType.hpp:44
@ Any
"" (empty string), meaning "accept anything"
Definition MimeType.hpp:67
bool isRunning() const noexcept
Check of the node is currently running.
Definition Node.hpp:142
virtual void setExecutionFlowPolicy(ExecutionFlowPolicy policy)
Set the execution flow policy used by the runtime scheduler.
Definition Node.hpp:434
ExecutionFlowPolicy
Policy controlling how execution is chained after message delivery.
Definition Node.hpp:106
@ AlwaysDispatch
Always enqueue next task for deferred execution.
Definition Node.hpp:107
@ InlineIfAvailable
Run inline on the current thread when possible.
Definition Node.hpp:108
virtual ~Node()=0
Virtual destructor. Pure virtual to ensure Node is abstract, but defined out-of-line.
OutputPortContainer const & outputPorts() const noexcept
Get all output ports of all Kind.
Definition Node.hpp:220
virtual nlohmann::json configSchema() const =0
Return the JSON schema describing the node configuration. Used by the dfx::Graph::NodeFactory / tooli...
bool allowsMimeTypePropagation() const noexcept
Whether this node allows mime-type propagation through its ports.
Definition Node.hpp:149
OutputPort const & outputPort(std::string_view portName) const
Get an output port by name.
virtual void stopImpl()
Called by stop before the node is marked stopped.
Definition Node.hpp:375
void start()
Start the node.
uint32_t Id
Node identifier type (unique and stable within a graph instance).
Definition Node.hpp:113
ExecutionFlowPolicy executionFlowPolicy() const noexcept
Get the node execution policy.
Definition Node.hpp:140
InputPort const & inputPort(std::string_view portName) const
Get an input port by name.
OutputPort & outputPort(std::string_view portName)
Get an output port by name (mutable).
void setName(std::string newName)
Rename the node.
virtual void handleMessage(InputPort const &port, MessagePtr message)=0
Handle an incoming message on an input port.
bool hasOutputPort(std::string_view portName) const noexcept
Check whether an output port exists by name.
Definition Node.hpp:226
static void validateNodeName(std::string_view name)
Validate a node name.
T const & as() const noexcept
Cast this node to a derived type (unchecked).
Definition Node.hpp:188
void lock()
Lock the node's internal recursive mutex.
Definition Node.hpp:297
void unlock()
Unlock the node's internal recursive mutex.
Definition Node.hpp:299
InputPort const & inputPort(Port::Id id) const
Get an input port by id.
void setAllowsMimeTypePropagation(bool allowed)
Enable/disable global mime-type propagation for this node. This affects how the runtime may propagate...
Definition Node.hpp:366
bool hasInputPort(std::string_view portName) const noexcept
Check whether an input port exists by name.
Definition Node.hpp:223
bool hasOutputPort(Port::Id id) const noexcept
Check whether an output port exists by id.
Definition Node.hpp:233
void setReactor(Runtime::Api::NodeReactor *reactor)
Attach the runtime reactor used by this node.
InputPort & inputPort(Port::Id id)
Get an input port by id (mutable).
virtual void startImpl()
Called by start before the node is marked running.
Definition Node.hpp:373
bool is() const noexcept
Check whether this node is of a given derived type.
Definition Node.hpp:170
FdWatch::Poller & poller()
Access a poller instance associated with this node.
Id id() const noexcept
Get the node id.
Definition Node.hpp:136
Node(std::string type, Id id, std::string name)
Construct a node instance.
OutputPort & outputPort(Port::Id id)
Get an output port by id (mutable).
void sendMessage(OutputPort &port, MessagePtr message)
Send a message on the provided output port.
InputPort & inputPort(std::string_view portName)
Get an input port by name (mutable).
T & as() noexcept
Cast this node to a derived type (unchecked).
Definition Node.hpp:179
nlohmann::json const & config() const noexcept
Get the node configuration.
Definition Node.hpp:138
InputPort & registerInputPort(std::string name, MimeTypes supportedMimeTypes={}, std::optional< bool > allowsMimeTypePropagation={})
Register a data input port.
bool try_lock() noexcept
Try to lock the node's internal recursive mutex.
Definition Node.hpp:301
OutputPort & registerOutputPort(std::string name, MimeType mimeType=MimeType::Any, std::optional< bool > allowsMimeTypePropagation={})
Register a data output port.
bool hasInputPort(Port::Id id) const noexcept
Check whether an input port exists by id.
Definition Node.hpp:230
InputPort & registerControlInputPort(std::string name)
Register a control input port.
virtual void initializeImpl(nlohmann::json config)=0
Called by initialize; derived nodes must parse config here.
void sendMessage(std::string_view portName, MessagePtr message)
Send a message on an output port by name.
std::string const & name() const noexcept
Get the node name.
Definition Node.hpp:134
void stop()
Stop the node.
Runtime::Api::NodeReactor * reactor() const noexcept
Get the currently attached reactor (may be null if not set).
Definition Node.hpp:330
virtual nlohmann::json metadata() const =0
Return node metadata (description, categories, ports, etc.). Intended for UI/tooling and discovery.
void sendMessage(Port::Id id, MessagePtr message)
Send a message on an output port by id.
std::string const & type() const noexcept
Get the node type.
Definition Node.hpp:132
OutputPort & registerControlOutputPort(std::string name)
Register a control output port.
OutputPort const & outputPort(Port::Id id) const
Get an output port by id.
void initialize(nlohmann::json config)
Initialize the node with its configuration.
InputPortContainer const & inputPorts() const noexcept
Get all input ports of all Kind.
Definition Node.hpp:218
Outgoing message endpoint attached to a node.
Definition OutputPort.hpp:47
uint32_t Id
Identifier type of a port (unique within a node by not unique accross a graph).
Definition Port.hpp:69
Abstract interface for FD-based event polling.
Definition Poller.hpp:37
Runtime node instantiation and validation facility (builders + config/metadata schemas).
Definition NodeFactory.hpp:60
Node-aware reactor API for FD watching and deferred execution in the runtime.
Definition NodeReactor.hpp:59
Monotonically increasing ID allocator.
Definition MonotonicIdAllocator.hpp:52
Concept used by dfx::Core::Node::is() and dfx::Core::Node::as() to constrain types.
Definition Node.hpp:67
Definition Channel.hpp:22
std::unique_ptr< Message > MessagePtr
Unique ownership handle for messages.
Definition Message.hpp:27
std::vector< MimeType > MimeTypes
Convenience alias for a list of MIME types.
Definition MimeType.hpp:130
std::shared_ptr< Node > NodePtr
Shared ownership pointer type for Nodes..
Definition Node.hpp:61
std::weak_ptr< Node > NodeWPtr
Weak pointer type for Nodes.
Definition Node.hpp:63
Kind
Port kind (connection domain).
Definition Kind.hpp:29
Definition SocketClient.hpp:23
Definition Node.hpp:42
Definition Node.hpp:47