dfx 0.1.0
Linux-based dynamic dataflow executor
Loading...
Searching...
No Matches
Process.hpp
1// SPDX-FileCopyrightText: 2025 Vincent Leroy
2// SPDX-License-Identifier: MIT
3//
4// This file is part of dfx.
5//
6// Licensed under the MIT License. See the LICENSE file in the project root
7// for full license information.
8
9#pragma once
10
11// Standard includes
12#include <queue>
13#include <unordered_map>
14
15// Project includes
16#include <dfx-core/Node.hpp>
17#include <dfx-core/messages/DataMessage.hpp>
19#include <dfx-subprocess/Process.hpp>
21#include <dfx-utilities/FileSystem.hpp>
22#include <dfx-utilities/MonotonicIdAllocator.hpp>
23
24namespace dfx::Node
25{
// Dataflow node type "Process" (declared through DFX_NODE), derived from
// Core::Node. It stores a Subprocess::Process::Config, a table of ChildInfo
// records keyed by a uint32_t id, and one IO settings object per output stream.
//
// NOTE(review): this listing looks like it was extracted from a generated
// source browser: each line carries its original line number as a prefix and
// original lines 42 and 114 are absent. Confirm against the real header.
26class Process : public Core::Node
27{
 // Declares the node type string and metadata/schema hooks for this node.
28 DFX_NODE("Process")
29
30private:
 // Parser bookkeeping kept once per stream (see ChildInfo). Presumably used to
 // track delimiter nesting when emitting JSON values - TODO confirm in the
 // implementation file, which is not visible here.
31 struct JsonParserState
32 {
33 bool hasEmitWarnForFirstNonValidChar = false;
34 char openDelimiter = '\0';
35 char closeDelimiter = '\0';
36 uint32_t counter = 0;
37 std::size_t idx = 0;
38 };
39
 // Per-child record: the Subprocess::Process itself, one std::string buffer
 // for each of stdout/stderr, and one JsonParserState for each stream.
40 struct ChildInfo
41 {
 // NOTE(review): the constructor signature (original line 42) is missing from
 // this listing. Only the member-initializer is visible; it forwards `poller`
 // and a moved `config` to `process`.
43 : process(poller, std::move(config))
44 {}
45
46 Subprocess::Process process;
47 std::string stdoutData;
48 std::string stderrData;
49
50 JsonParserState stdoutParserState;
51 JsonParserState stderrParserState;
52 };
53
 // Child table keyed by a uint32_t id, the same id type the _on* callbacks
 // below receive.
54 using ChildInfos = std::unordered_map<uint32_t, ChildInfo>;
55
56public:
 // Lifecycle policy of the node (stored in _lifecycle). The meaning of each
 // value is defined by the implementation, which is not visible here.
 // NOTE(review): `Trigerred` is spelled differently from `TriggeredParallel`.
 // It is part of the public enum, so renaming it would affect callers and
 // presumably its string form - TODO confirm before changing.
57 enum class LifeCycle
58 {
59 OneShot,
60 Trigerred,
61 TriggeredParallel,
62 Daemon,
63 };
64
 // Emit mode selectable per stream through IO::emitMode. Presumably controls
 // how child output is split into messages - TODO confirm in implementation.
65 enum class EmitMode
66 {
67 Null,
68 Line,
69 Chunk,
70 Whole,
71 Json,
72 };
73
 // Per-stream settings; emitMode defaults to EmitMode::Line.
74 struct IO
75 {
76 EmitMode emitMode = EmitMode::Line;
77 };
78
79public:
 // Construct the node from its identifier and name.
80 Process(Id id, std::string name);
81 ~Process();
82
83protected:
 // Called by start before the node is marked running.
84 void startImpl() override;
 // Called by stop before the node is marked stopped.
85 void stopImpl() override;
86
 // Called by initialize; the node parses its configuration here.
87 void initializeImpl(nlohmann::json config) override;
 // Handle an incoming message on an input port. `message` is a uniquely
 // owned handle (Core::MessagePtr), so ownership passes to this call.
88 void handleMessage(Core::InputPort const & port, Core::MessagePtr message) override;
89
90private:
 // Returns an iterator into the ChildInfos table; by its name it starts a
 // new child and returns its entry - TODO confirm in implementation.
91 ChildInfos::iterator _startNewChild();
92
 // Takes the target child's record and a data message; presumably forwards
 // the message payload to that child - TODO confirm.
93 void _sendMessageToChild(ChildInfo & childInfo, Core::DataMessage const & message);
94
 // Callbacks identified by child id. The two ReadyRead variants also receive
 // a std::string of data by value.
95 void _onStdoutReadyRead(uint32_t id, std::string data);
96 void _onStderrReadyRead(uint32_t id, std::string data);
97 void _onChildTerminated(uint32_t id);
98
 // Builds zero or more messages for the given child; `stream` identifies
 // which standard stream is being referred to.
99 std::vector<Core::MessagePtr> _buildMessages(Subprocess::Process::Which stream, ChildInfo & childInfo);
100
101private:
 // Process spawn and I/O configuration.
102 Subprocess::Process::Config _childConfig;
103
 // No default initializer here; presumably assigned in initializeImpl or
 // the constructor - TODO confirm.
104 LifeCycle _lifecycle;
105 bool _shouldRestart = true;
106
 // Defaults to 0 ms.
107 std::chrono::milliseconds _stopTimeout{0};
108
 // Queue of owned messages; by its name, input waiting to be written to a
 // child's stdin - TODO confirm.
109 std::queue<Core::MessagePtr> _stdinBuffer;
 // Per-stream settings for stdout and stderr.
110 IO _stdout;
111 IO _stderr;
112
113private:
 // NOTE(review): original line 114 is missing from this listing.
115 ChildInfos _childInfos;
116};
117} // !namespace dfx::Node
118
// Declare the enum string API (and std::formatter) for the two public enums
// of dfx::Node::Process, using the project's macro-based enum <-> string
// utilities.
119DECLARE_ENUM_STRING_FUNCTIONS(dfx::Node::Process::LifeCycle);
120DECLARE_ENUM_STRING_FUNCTIONS(dfx::Node::Process::EmitMode);
Macro-based enum <-> string utilities for dfx.
#define DECLARE_ENUM_STRING_FUNCTIONS(E)
Declare the enum string API (and std::formatter) for enum type E.
Definition EnumString.hpp:327
Event interest and trigger flags for file-descriptor watching (Linux/epoll).
Base class for all runtime-executed nodes in a dfx dataflow graph.
#define DFX_NODE(typeName)
Convenience macro to declare the node type string and metadata/schema hooks.
Definition Node.hpp:501
Data-plane message containing an owned byte buffer.
Definition DataMessage.hpp:42
Incoming message endpoint attached to a node.
Definition InputPort.hpp:55
Abstract base class for all nodes in the dfx runtime.
Definition Node.hpp:91
uint32_t Id
Node identifier type (unique and stable within a graph instance).
Definition Node.hpp:113
FdWatch::Poller & poller()
Access a poller instance associated with this node.
nlohmann::json const & config() const noexcept
Get the node configuration.
Definition Node.hpp:138
std::string const & name() const noexcept
Get the node name.
Definition Node.hpp:134
Abstract interface for FD-based event polling.
Definition Poller.hpp:37
void initializeImpl(nlohmann::json config) override
Called by initialize; derived nodes must parse config here.
void startImpl() override
Called by start before the node is marked running.
void handleMessage(Core::InputPort const &port, Core::MessagePtr message) override
Handle an incoming message on an input port.
void stopImpl() override
Called by stop before the node is marked stopped.
Subprocess wrapper with optional stdio piping/capture and event-driven callbacks.
Definition Process.hpp:59
Which
Identifies which standard stream is being referred to.
Definition Process.hpp:87
Monotonically increasing ID allocator.
Definition MonotonicIdAllocator.hpp:52
std::unique_ptr< Message > MessagePtr
Unique ownership handle for messages.
Definition Message.hpp:27
Definition MimeTypeRouter.hpp:18
Definition Process.hpp:75
Process spawn and I/O configuration.
Definition Process.hpp:123