dfx 0.1.0
Linux-based dynamic dataflow executor
Loading...
Searching...
No Matches
Worker.hpp
1// SPDX-FileCopyrightText: 2025-2026 Vincent Leroy
2// SPDX-License-Identifier: MIT
3//
4// This file is part of dfx.
5//
6// Licensed under the MIT License. See the LICENSE file in the project root
7// for full license information.
8
9#pragma once
10
11// Standard includes
12#include <array>
13#include <atomic>
14#include <mutex>
15#include <queue>
16
17// Project includes
18#include "sinks/Sink.hpp"
19#include <dfx-core/Node.hpp>
20#include <dfx-core/messages/Message.hpp>
21#include <dfx-fdwatch/PollerFd.hpp>
22#include <dfx-fdwatch/EPollPoller.hpp>
23#include <dfx-fdwatch/Timer.hpp>
24#include <dfx-utilities/UUIDGenerator.hpp>
25
26namespace dfx::Pcapng
27{
72class Worker
73{
74public:
78 enum SinkIndex : std::size_t
79 {
83
85 };
86
87public:
93 struct SinkData
94 {
97
99 std::atomic_bool isNew{false};
100 };
101
111 {
116
118 std::string nodeName;
121
123 std::string portName;
128
130 uint32_t counter = 1;
132 uint8_t timeoutCounter = 10; // Multiple of 100 ms
133 };
134
135public:
140 explicit Worker(bool getHwAndOsInfo);
141
145
154 void setFileSink(SinkPtr fileSink);
155
164 void setPipeSink(SinkPtr pipeSink);
165
174 void setTcpSink(SinkPtr tcpSink);
175
182 void pushData(QueueData data);
183
185 void exec(std::stop_token stopToken);
186
187public:
190 void setUserAppInfo(std::string info);
191
194 void clearHwAndOsInfo() noexcept;
195
196private:
197 void _onCleanupTimerTimeout();
198 void _onQueueDataReady();
199 void _handleNewSink(SinkPtr & sink);
200
201 void _sinkWriteData(std::vector<uint8_t> const & data);
202
203private:
204 // Destroy the poller last
205 FdWatch::EPollPoller _poller;
206
207 std::queue<QueueData> _queue;
208 std::mutex _mutexQueue;
209
210 std::array<SinkData, SinkIndex::Count> _sinkDatas;
211 std::mutex _mutexSink;
212
213 std::string _userAppInfo;
214 std::string _hardwareInfo;
215 std::string _osInfo;
216
217private:
218 std::unordered_map<Utils::UUID, QueueData> _pendingData;
219
220private:
221 FdWatch::PollerFd _eventfd;
222 FdWatch::Timer _cleanupTimer;
223};
224} // !namespace dfx::Pcapng
Base class for all runtime-executed nodes in a dfx dataflow graph.
uint32_t Id
Node identifier type (unique and stable within a graph instance).
Definition Node.hpp:116
Mode
Port direction.
Definition Port.hpp:74
uint32_t Id
Identifier type of a port (unique within a node but not unique across a graph).
Definition Port.hpp:69
void exec(std::stop_token stopToken)
Starts the internal event loop and takes ownership of the current thread.
void setPipeSink(SinkPtr pipeSink)
Install or replace the pipe sink.
~Worker()
Stop the worker thread and release all resources. Ensures the thread is terminated and the poller/tim...
void setFileSink(SinkPtr fileSink)
Install or replace the file sink.
void clearHwAndOsInfo() noexcept
Disable (or remove) hardware/OS metadata in the capture output. This only affects metadata emission; ...
void setTcpSink(SinkPtr tcpSink)
Install or replace the TCP sink.
Worker(bool getHwAndOsInfo)
Construct and start the worker thread.
SinkIndex
Index of supported sinks.
Definition Worker.hpp:79
@ Count
Number of sink slots.
Definition Worker.hpp:84
@ File
File output (offline capture). See FileSink.
Definition Worker.hpp:80
@ Pipe
Pipe output (live capture into another process). See PipeSink.
Definition Worker.hpp:81
@ Tcp
TCP output (stream capture to a remote consumer). See TcpSink.
Definition Worker.hpp:82
void pushData(QueueData data)
Push a capture item into the worker queue.
void setUserAppInfo(std::string info)
Set application-specific information to embed into capture metadata.
std::unique_ptr< Message > MessagePtr
Unique ownership handle for messages.
Definition Message.hpp:27
Definition SocketClient.hpp:23
Definition Capture.hpp:28
std::unique_ptr< Sink > SinkPtr
Owning pointer type for sinks.
Definition Sink.hpp:122
Definition SystemConfigCommandHandler.hpp:15
STL namespace.
One capture item describing a message event and its context.
Definition Worker.hpp:111
Utils::UUID uuid
Identifier used to correlate and track pending items.
Definition Worker.hpp:115
Core::Node::Id nodeId
Stable node id.
Definition Worker.hpp:120
Core::Port::Mode portMode
Input vs output.
Definition Worker.hpp:127
Core::Port::Id portId
Stable port id.
Definition Worker.hpp:125
Core::MessagePtr message
Message payload.
Definition Worker.hpp:113
uint32_t counter
Worker-internal counter (tracking/aggregation).
Definition Worker.hpp:130
std::string portName
Human-readable port name.
Definition Worker.hpp:123
uint8_t timeoutCounter
Expiration counter, in 100ms units (10 => ~1s).
Definition Worker.hpp:132
std::string nodeName
Human-readable node name.
Definition Worker.hpp:118
Sink storage and "new sink" marker for hot swapping.
Definition Worker.hpp:94
SinkPtr sink
Sink held in this slot.
Definition Worker.hpp:96
std::atomic_bool isNew
True if this sink was just installed/replaced and must be handled by the worker thread.
Definition Worker.hpp:99
128-bit UUID value type.
Definition UUIDGenerator.hpp:33