18#include <nlohmann/json.hpp>
21#include "ports/InputPort.hpp"
22#include "ports/OutputPort.hpp"
23#include <dfx-utilities/CompilerSupport.hpp>
25#include <dfx-utilities/MonotonicIdAllocator.hpp>
26#include <dfx-utilities/StringMap.hpp>
51class NodeTaskExecutor;
/// Shared ownership pointer type for Node instances.
64using NodePtr = std::shared_ptr<Node>;
93class Node :
public std::enable_shared_from_this<Node>
96 friend class InputPort;
/// Container of input ports, stored as (port name, port) pairs in a vector.
98 using InputPortContainer = std::vector<std::pair<std::string, InputPort>>;
/// Container of output ports, stored as (port name, port) pairs in a vector.
99 using OutputPortContainer = std::vector<std::pair<std::string, OutputPort>>;
/// Get the node type.
/// @return Reference to the stored type string (_type); no copy is made.
135 std::string
const &
type() const noexcept {
return _type; }
/// Get the node name.
/// @return Reference to the stored name string (_name); no copy is made.
137 std::string
const &
name() const noexcept {
return _name; }
/// Get the node id.
/// @return The stored identifier (_id), by value.
139 Id id() const noexcept {
return _id; }
/// Get the node configuration.
/// @return Reference to the stored JSON configuration (_config); no copy is made.
141 nlohmann::json
const &
config() const noexcept {
return _config; }
/// Check whether the node is currently running.
/// Reads the atomic _isRunning flag with acquire ordering.
145 bool isRunning() const noexcept {
return _isRunning.load(std::memory_order::acquire); }
/// Check whether this node is of the derived type T.
/// Implemented with dynamic_cast on `this`, so the answer reflects the object's runtime type.
/// @tparam T Type constrained by the DerivedFromNode concept.
/// @return true when the dynamic_cast to `T const *` yields a non-null pointer.
172 template<DerivedFromNode T>
173 bool is() const noexcept {
return dynamic_cast<T
const *
>(
this) !=
nullptr; }
/// Cast this node to the derived type T (unchecked, mutable overload).
/// Uses static_cast, so no runtime type verification happens here.
/// NOTE(review): presumably callers are expected to check is<T>() first — confirm at call sites.
/// @tparam T Type constrained by the DerivedFromNode concept.
181 template<DerivedFromNode T>
182 T &
as() noexcept {
return static_cast<T &
>(*this); }
/// Cast this node to the derived type T (unchecked, const overload).
/// Uses static_cast, so no runtime type verification happens here.
/// NOTE(review): presumably callers are expected to check is<T>() first — confirm at call sites.
/// @tparam T Type constrained by the DerivedFromNode concept.
190 template<DerivedFromNode T>
191 T
const &
as() const noexcept {
return static_cast<T
const &
>(*this); }
/// Check whether a stop has already been requested.
/// Tests the _stopRequested atomic flag with acquire ordering; the flag is not modified.
227 bool stopRequested() const noexcept {
return _stopRequested.test(std::memory_order::acquire); }
/// Get all input ports of this node.
/// @return Read-only reference to the (name, port) container _inputPorts.
236 InputPortContainer
const &
inputPorts() const noexcept {
return _inputPorts; }
/// Get all output ports of this node.
/// @return Read-only reference to the (name, port) container _outputPorts.
238 OutputPortContainer
const &
outputPorts() const noexcept {
return _outputPorts; }
242 {
return _getInputPortItr(portName) != _inputPorts.end(); }
245 {
return _getOutputPortItr(portName) != _outputPorts.end(); }
249 {
return _getInputPortItr(
id) != _inputPorts.end(); }
252 {
return _getOutputPortItr(
id) != _outputPorts.end(); }
/// Get an input port by name (read-only overload).
/// NOTE(review): not declared noexcept; behavior for an unknown name is defined out-of-line — confirm in the implementation.
/// @param portName Name of the input port to look up.
256 InputPort
const &
inputPort(std::string_view portName)
const;
/// Lock the node's internal recursive mutex (_nodeLock).
/// Not noexcept: the call forwards directly to the mutex's lock().
315 void lock() { _nodeLock.lock(); }
/// Try to lock the node's internal recursive mutex (_nodeLock) without blocking.
/// @return The result of _nodeLock.try_lock().
319 bool try_lock() noexcept {
return _nodeLock.try_lock(); }
/// Access the custom executor of this node, if any.
/// Returns the pointer held by _customExecutor converted to T* with static_cast;
/// no runtime type check is performed.
/// NOTE(review): presumably null when no custom executor is assigned — confirm against assignTaskExecutor()/revokeTaskExecutor().
/// @tparam T Executor type to view the pointer as; defaults to Runtime::Api::NodeTaskExecutor.
368 template<
typename T = Runtime::Api::NodeTaskExecutor>
369 T *
customExecutor() const noexcept {
return static_cast<T *
>(_customExecutor.get()); }
/// Port lookup helpers keyed by port name.
/// Each returns an iterator into _inputPorts / _outputPorts; callers in this class
/// compare the result against the container's end() to detect a missing port.
514 InputPortContainer::iterator _getInputPortItr(std::string_view
name)
noexcept;
515 InputPortContainer::const_iterator _getInputPortItr(std::string_view
name)
const noexcept;
516 OutputPortContainer::iterator _getOutputPortItr(std::string_view
name)
noexcept;
517 OutputPortContainer::const_iterator _getOutputPortItr(std::string_view
name)
const noexcept;
/// Port lookup helpers keyed by port id; same end()-on-miss convention as the name-based overloads.
519 InputPortContainer::iterator _getInputPortItr(
Port::Id id)
noexcept;
520 InputPortContainer::const_iterator _getInputPortItr(
Port::Id id)
const noexcept;
521 OutputPortContainer::iterator _getOutputPortItr(
Port::Id id)
noexcept;
522 OutputPortContainer::const_iterator _getOutputPortItr(
Port::Id id)
const noexcept;
/// Node configuration; exposed read-only by config().
528 nlohmann::json _config;
/// Running flag read by isRunning() with acquire ordering; initially false.
529 std::atomic_bool _isRunning =
false;
/// Stop-request flag tested by stopRequested().
530 std::atomic_flag _stopRequested;
/// NOTE(review): promise/shared_future pair presumably backing waitForStopped() — confirm in the implementation file.
531 std::promise<void> _stopPromise;
532 std::shared_future<void> _stopFuture;
/// Node-level mime-type propagation switch; defaults to true.
533 bool _allowsMimeTypePropagation =
true;
/// Recursive mutex driven by lock() / try_lock().
537 std::recursive_mutex _nodeLock;
/// NOTE(review): presumably backs the poller() accessor — confirm in the implementation file.
539 std::unique_ptr<Runtime::Api::PollerProxy> _pollerProxy;
/// Input ports as (name, port) pairs; exposed read-only by inputPorts().
543 InputPortContainer _inputPorts;
/// Output ports as (name, port) pairs; exposed read-only by outputPorts().
544 OutputPortContainer _outputPorts;
569#define DFX_NODE(typeName) \
571 static constexpr auto type = typeName; \
574 static nlohmann::json staticConfigSchema(); \
575 nlohmann::json configSchema() const override { return staticConfigSchema(); } \
576 static nlohmann::json staticMetadata(); \
577 nlohmann::json metadata() const override { return staticMetadata(); }\
Macro-based enum <-> string utilities for dfx.
#define DFX_DECLARE_ENUM_STRING_FUNCTIONS(E)
Declare the enum string API (and std::formatter) for enum type E.
Definition EnumString.hpp:327
MIME type value object.
Definition MimeType.hpp:44
@ Any
"" (empty string), meaning "accept anything"
Definition MimeType.hpp:67
bool isRunning() const noexcept
Check if the node is currently running.
Definition Node.hpp:145
virtual void setExecutionFlowPolicy(ExecutionFlowPolicy policy)
Set the execution flow policy used by the runtime scheduler.
Definition Node.hpp:499
ExecutionFlowPolicy
Policy controlling how execution is chained after message delivery.
Definition Node.hpp:109
@ AlwaysDispatch
Always enqueue next task for deferred execution.
Definition Node.hpp:110
@ InlineIfAvailable
Run inline on the current thread when possible.
Definition Node.hpp:111
virtual ~Node()=0
Virtual destructor. Pure virtual to ensure Node is abstract, but defined out-of-line.
OutputPortContainer const & outputPorts() const noexcept
Get all output ports of all Kind.
Definition Node.hpp:238
virtual nlohmann::json configSchema() const =0
Return the JSON schema describing the node configuration. Used by the dfx::Graph::NodeFactory / tooli...
bool allowsMimeTypePropagation() const noexcept
Whether this node allows mime-type propagation through its ports.
Definition Node.hpp:152
OutputPort const & outputPort(std::string_view portName) const
Get an output port by name.
virtual void stopImpl()
Called by stop before the node is marked stopped.
Definition Node.hpp:434
void start()
Start the node.
uint32_t Id
Node identifier type (unique and stable within a graph instance).
Definition Node.hpp:116
ExecutionFlowPolicy executionFlowPolicy() const noexcept
Get the node execution policy.
Definition Node.hpp:143
InputPort const & inputPort(std::string_view portName) const
Get an input port by name.
OutputPort & outputPort(std::string_view portName)
Get an output port by name (mutable).
void setName(std::string newName)
Rename the node.
virtual void handleMessage(InputPort const &port, MessagePtr message)=0
Handle an incoming message on an input port.
bool hasOutputPort(std::string_view portName) const noexcept
Check whether an output port exists by name.
Definition Node.hpp:244
static void validateNodeName(std::string_view name)
Validate a node name.
T const & as() const noexcept
Cast this node to a derived type (unchecked).
Definition Node.hpp:191
void lock()
Lock the node's internal recursive mutex.
Definition Node.hpp:315
void unlock()
Unlock the node's internal recursive mutex.
Definition Node.hpp:317
InputPort const & inputPort(Port::Id id) const
Get an input port by id.
void setAllowsMimeTypePropagation(bool allowed)
Enable/disable global mime-type propagation for this node. This affects how the runtime may propagate...
Definition Node.hpp:422
bool hasInputPort(std::string_view portName) const noexcept
Check whether an input port exists by name.
Definition Node.hpp:241
bool hasOutputPort(Port::Id id) const noexcept
Check whether an output port exists by id.
Definition Node.hpp:251
void assignTaskExecutor(Runtime::Api::NodeTaskExecutorPtr executor)
Assigns a custom task executor to this node.
void setReactor(Runtime::Api::NodeReactor *reactor)
Attach the runtime reactor used by this node.
bool stopRequested() const noexcept
Check if stop has already been called or not.
Definition Node.hpp:227
InputPort & inputPort(Port::Id id)
Get an input port by id (mutable).
virtual void startImpl()
Called by start before the node is marked running.
Definition Node.hpp:429
bool is() const noexcept
Check whether this node is of a given derived type.
Definition Node.hpp:173
FdWatch::Poller & poller()
Access a poller instance associated with this node.
Id id() const noexcept
Get the node id.
Definition Node.hpp:139
Node(std::string type, Id id, std::string name)
Construct a node instance.
InputPort & registerInputPort(std::string name, MimeTypes supportedMimeTypes={}, std::optional< bool > allowsMimeTypePropagation=std::nullopt)
Register a data input port.
OutputPort & outputPort(Port::Id id)
Get an output port by id (mutable).
void sendMessage(OutputPort &port, MessagePtr message)
Send a message on the provided output port.
InputPort & inputPort(std::string_view portName)
Get an input port by name (mutable).
T & as() noexcept
Cast this node to a derived type (unchecked).
Definition Node.hpp:182
void onNodeStopped()
Must be called by classes that re-implement the stopImpl function.
nlohmann::json const & config() const noexcept
Get the node configuration.
Definition Node.hpp:141
bool try_lock() noexcept
Try to lock the node's internal recursive mutex.
Definition Node.hpp:319
void waitForStopped()
Block the current thread until the node is stopped.
OutputPort & registerOutputPort(std::string name, MimeType mimeType=MimeType::Any, std::optional< bool > allowsMimeTypePropagation=std::nullopt)
Register a data output port.
bool hasInputPort(Port::Id id) const noexcept
Check whether an input port exists by id.
Definition Node.hpp:248
InputPort & registerControlInputPort(std::string name)
Register a control input port.
virtual void initializeImpl(nlohmann::json config)=0
Called by initialize; derived nodes must parse config here.
void sendMessage(std::string_view portName, MessagePtr message)
Send a message on an output port by name.
std::string const & name() const noexcept
Get the node name.
Definition Node.hpp:137
void stop()
Stop the node.
void revokeTaskExecutor() noexcept
Revokes a custom task executor, returning this node to the default execution model.
Runtime::Api::NodeReactor * reactor() const noexcept
Get the currently attached reactor (may be null if not set).
Definition Node.hpp:351
virtual nlohmann::json metadata() const =0
Return node metadata (description, categories, ports, etc.). Intended for UI/tooling and discovery.
void sendMessage(Port::Id id, MessagePtr message)
Send a message on an output port by id.
std::string const & type() const noexcept
Get the node type.
Definition Node.hpp:135
OutputPort & registerControlOutputPort(std::string name)
Register a control output port.
OutputPort const & outputPort(Port::Id id) const
Get an output port by id.
T * customExecutor() const noexcept
Access the custom executor of this node, if any.
Definition Node.hpp:369
void initialize(nlohmann::json config)
Initialize the node with its configuration.
InputPortContainer const & inputPorts() const noexcept
Get all input ports of all Kind.
Definition Node.hpp:236
Outgoing message endpoint attached to a node.
Definition OutputPort.hpp:47
uint32_t Id
Identifier type of a port (unique within a node but not unique across a graph).
Definition Port.hpp:69
Abstract interface for FD-based event polling.
Definition Poller.hpp:37
Runtime node instantiation and validation facility (builders + config/metadata schemas).
Definition NodeFactory.hpp:60
Node-aware reactor API for FD watching and deferred execution in the runtime.
Definition NodeReactor.hpp:60
Monotonically increasing ID allocator.
Definition MonotonicIdAllocator.hpp:52
Concept used by dfx::Core::Node::is() and dfx::Core::Node::as() to constrain types.
Definition Node.hpp:70
Definition Channel.hpp:25
std::unique_ptr< Message > MessagePtr
Unique ownership handle for messages.
Definition Message.hpp:27
std::vector< MimeType > MimeTypes
Convenience alias for a list of MIME types.
Definition MimeType.hpp:130
std::shared_ptr< Node > NodePtr
Shared ownership pointer type for Nodes.
Definition Endpoint.hpp:21
std::weak_ptr< Node > NodeWPtr
Weak pointer type for Nodes.
Definition Node.hpp:66
Kind
Port kind (connection domain).
Definition Kind.hpp:29
Definition SocketClient.hpp:23
Definition MimeTypeRouter.hpp:18
std::unique_ptr< NodeTaskExecutor > NodeTaskExecutorPtr
Unique ownership pointer type for NodeTaskExecutor.
Definition Node.hpp:52