17#include <nlohmann/json.hpp>
20#include "ports/InputPort.hpp"
21#include "ports/OutputPort.hpp"
22#include <dfx-utilities/CompilerSupport.hpp>
24#include <dfx-utilities/MonotonicIdAllocator.hpp>
25#include <dfx-utilities/StringMap.hpp>
90class Node :
public std::enable_shared_from_this<Node>
93 friend class InputPort;
95 using InputPortContainer = std::vector<std::pair<std::string, InputPort>>;
96 using OutputPortContainer = std::vector<std::pair<std::string, OutputPort>>;
132 std::string
const &
type() const noexcept {
return _type; }
134 std::string
const &
name() const noexcept {
return _name; }
136 Id id() const noexcept {
return _id; }
138 nlohmann::json
const &
config() const noexcept {
return _config; }
169 template<DerivedFromNode T>
170 bool is() const noexcept {
return dynamic_cast<T
const *
>(
this) !=
nullptr; }
178 template<DerivedFromNode T>
179 T &
as() noexcept {
return static_cast<T &
>(*this); }
187 template<DerivedFromNode T>
188 T
const &
as() const noexcept {
return static_cast<T
const &
>(*this); }
218 InputPortContainer
const &
inputPorts() const noexcept {
return _inputPorts; }
220 OutputPortContainer
const &
outputPorts() const noexcept {
return _outputPorts; }
224 {
return _getInputPortItr(portName) != _inputPorts.end(); }
227 {
return _getOutputPortItr(portName) != _outputPorts.end(); }
231 {
return _getInputPortItr(
id) != _inputPorts.end(); }
234 {
return _getOutputPortItr(
id) != _outputPorts.end(); }
238 InputPort
const &
inputPort(std::string_view portName)
const;
297 void lock() { _nodeLock.lock(); }
301 bool try_lock() noexcept {
return _nodeLock.try_lock(); }
449 InputPortContainer::iterator _getInputPortItr(std::string_view
name)
noexcept;
450 InputPortContainer::const_iterator _getInputPortItr(std::string_view
name)
const noexcept;
451 OutputPortContainer::iterator _getOutputPortItr(std::string_view
name)
noexcept;
452 OutputPortContainer::const_iterator _getOutputPortItr(std::string_view
name)
const noexcept;
454 InputPortContainer::iterator _getInputPortItr(
Port::Id id)
noexcept;
455 InputPortContainer::const_iterator _getInputPortItr(
Port::Id id)
const noexcept;
456 OutputPortContainer::iterator _getOutputPortItr(
Port::Id id)
noexcept;
457 OutputPortContainer::const_iterator _getOutputPortItr(
Port::Id id)
const noexcept;
463 nlohmann::json _config;
464 bool _isRunning =
false;
465 bool _allowsMimeTypePropagation =
true;
469 std::recursive_mutex _nodeLock;
471 std::unique_ptr<Runtime::Api::PollerProxy> _pollerProxy;
474 InputPortContainer _inputPorts;
475 OutputPortContainer _outputPorts;
501#define DFX_NODE(typeName) \
503 static constexpr auto type = typeName; \
506 static nlohmann::json staticConfigSchema(); \
507 nlohmann::json configSchema() const override { return staticConfigSchema(); } \
508 static nlohmann::json staticMetadata(); \
509 nlohmann::json metadata() const override { return staticMetadata(); }\
Macro-based enum <-> string utilities for dfx.
#define DECLARE_ENUM_STRING_FUNCTIONS(E)
Declare the enum string API (and std::formatter) for enum type E.
Definition EnumString.hpp:327
MIME type value object.
Definition MimeType.hpp:44
@ Any
"" (empty string), meaning "accept anything"
Definition MimeType.hpp:67
bool isRunning() const noexcept
Check of the node is currently running.
Definition Node.hpp:142
virtual void setExecutionFlowPolicy(ExecutionFlowPolicy policy)
Set the execution flow policy used by the runtime scheduler.
Definition Node.hpp:434
ExecutionFlowPolicy
Policy controlling how execution is chained after message delivery.
Definition Node.hpp:106
@ AlwaysDispatch
Always enqueue next task for deferred execution.
Definition Node.hpp:107
@ InlineIfAvailable
Run inline on the current thread when possible.
Definition Node.hpp:108
virtual ~Node()=0
Virtual destructor. Pure virtual to ensure Node is abstract, but defined out-of-line.
OutputPortContainer const & outputPorts() const noexcept
Get all output ports of all Kind.
Definition Node.hpp:220
virtual nlohmann::json configSchema() const =0
Return the JSON schema describing the node configuration. Used by the dfx::Graph::NodeFactory / tooli...
bool allowsMimeTypePropagation() const noexcept
Whether this node allows mime-type propagation through its ports.
Definition Node.hpp:149
OutputPort const & outputPort(std::string_view portName) const
Get an output port by name.
virtual void stopImpl()
Called by stop before the node is marked stopped.
Definition Node.hpp:375
void start()
Start the node.
uint32_t Id
Node identifier type (unique and stable within a graph instance).
Definition Node.hpp:113
ExecutionFlowPolicy executionFlowPolicy() const noexcept
Get the node execution policy.
Definition Node.hpp:140
InputPort const & inputPort(std::string_view portName) const
Get an input port by name.
OutputPort & outputPort(std::string_view portName)
Get an output port by name (mutable).
void setName(std::string newName)
Rename the node.
virtual void handleMessage(InputPort const &port, MessagePtr message)=0
Handle an incoming message on an input port.
bool hasOutputPort(std::string_view portName) const noexcept
Check whether an output port exists by name.
Definition Node.hpp:226
static void validateNodeName(std::string_view name)
Validate a node name.
T const & as() const noexcept
Cast this node to a derived type (unchecked).
Definition Node.hpp:188
void lock()
Lock the node's internal recursive mutex.
Definition Node.hpp:297
void unlock()
Unlock the node's internal recursive mutex.
Definition Node.hpp:299
InputPort const & inputPort(Port::Id id) const
Get an input port by id.
void setAllowsMimeTypePropagation(bool allowed)
Enable/disable global mime-type propagation for this node. This affects how the runtime may propagate...
Definition Node.hpp:366
bool hasInputPort(std::string_view portName) const noexcept
Check whether an input port exists by name.
Definition Node.hpp:223
bool hasOutputPort(Port::Id id) const noexcept
Check whether an output port exists by id.
Definition Node.hpp:233
void setReactor(Runtime::Api::NodeReactor *reactor)
Attach the runtime reactor used by this node.
InputPort & inputPort(Port::Id id)
Get an input port by id (mutable).
virtual void startImpl()
Called by start before the node is marked running.
Definition Node.hpp:373
bool is() const noexcept
Check whether this node is of a given derived type.
Definition Node.hpp:170
FdWatch::Poller & poller()
Access a poller instance associated with this node.
Id id() const noexcept
Get the node id.
Definition Node.hpp:136
Node(std::string type, Id id, std::string name)
Construct a node instance.
OutputPort & outputPort(Port::Id id)
Get an output port by id (mutable).
void sendMessage(OutputPort &port, MessagePtr message)
Send a message on the provided output port.
InputPort & inputPort(std::string_view portName)
Get an input port by name (mutable).
T & as() noexcept
Cast this node to a derived type (unchecked).
Definition Node.hpp:179
nlohmann::json const & config() const noexcept
Get the node configuration.
Definition Node.hpp:138
InputPort & registerInputPort(std::string name, MimeTypes supportedMimeTypes={}, std::optional< bool > allowsMimeTypePropagation={})
Register a data input port.
bool try_lock() noexcept
Try to lock the node's internal recursive mutex.
Definition Node.hpp:301
OutputPort & registerOutputPort(std::string name, MimeType mimeType=MimeType::Any, std::optional< bool > allowsMimeTypePropagation={})
Register a data output port.
bool hasInputPort(Port::Id id) const noexcept
Check whether an input port exists by id.
Definition Node.hpp:230
InputPort & registerControlInputPort(std::string name)
Register a control input port.
virtual void initializeImpl(nlohmann::json config)=0
Called by initialize; derived nodes must parse config here.
void sendMessage(std::string_view portName, MessagePtr message)
Send a message on an output port by name.
std::string const & name() const noexcept
Get the node name.
Definition Node.hpp:134
void stop()
Stop the node.
Runtime::Api::NodeReactor * reactor() const noexcept
Get the currently attached reactor (may be null if not set).
Definition Node.hpp:330
virtual nlohmann::json metadata() const =0
Return node metadata (description, categories, ports, etc.). Intended for UI/tooling and discovery.
void sendMessage(Port::Id id, MessagePtr message)
Send a message on an output port by id.
std::string const & type() const noexcept
Get the node type.
Definition Node.hpp:132
OutputPort & registerControlOutputPort(std::string name)
Register a control output port.
OutputPort const & outputPort(Port::Id id) const
Get an output port by id.
void initialize(nlohmann::json config)
Initialize the node with its configuration.
InputPortContainer const & inputPorts() const noexcept
Get all input ports of all Kind.
Definition Node.hpp:218
Outgoing message endpoint attached to a node.
Definition OutputPort.hpp:47
uint32_t Id
Identifier type of a port (unique within a node by not unique accross a graph).
Definition Port.hpp:69
Abstract interface for FD-based event polling.
Definition Poller.hpp:37
Runtime node instantiation and validation facility (builders + config/metadata schemas).
Definition NodeFactory.hpp:60
Node-aware reactor API for FD watching and deferred execution in the runtime.
Definition NodeReactor.hpp:59
Monotonically increasing ID allocator.
Definition MonotonicIdAllocator.hpp:52
Concept used by dfx::Core::Node::is() and dfx::Core::Node::as() to constrain types.
Definition Node.hpp:67
Definition Channel.hpp:22
std::unique_ptr< Message > MessagePtr
Unique ownership handle for messages.
Definition Message.hpp:27
std::vector< MimeType > MimeTypes
Convenience alias for a list of MIME types.
Definition MimeType.hpp:130
std::shared_ptr< Node > NodePtr
Shared ownership pointer type for Nodes..
Definition Node.hpp:61
std::weak_ptr< Node > NodeWPtr
Weak pointer type for Nodes.
Definition Node.hpp:63
Kind
Port kind (connection domain).
Definition Kind.hpp:29
Definition SocketClient.hpp:23