dfx 0.1.0
Linux-based dynamic dataflow executor
Loading...
Searching...
No Matches
Worker.hpp
1// SPDX-FileCopyrightText: 2025 Vincent Leroy
2// SPDX-License-Identifier: MIT
3//
4// This file is part of dfx.
5//
6// Licensed under the MIT License. See the LICENSE file in the project root
7// for full license information.
8
9#pragma once
10
11// Standard includes
12#include <array>
13#include <atomic>
14#include <mutex>
15#include <queue>
16#include <thread>
17
18// Project includes
19#include "sinks/Sink.hpp"
20#include <dfx-core/Node.hpp>
21#include <dfx-core/messages/Message.hpp>
22#include <dfx-fdwatch/PollerFd.hpp>
23#include <dfx-fdwatch/EPollPoller.hpp>
24#include <dfx-fdwatch/Timer.hpp>
25#include <dfx-utilities/UUIDGenerator.hpp>
26
27namespace dfx::Pcapng
28{
73class Worker
74{
75public:
79 enum SinkIndex : std::size_t
80 {
84
86 };
87
88public:
94 struct SinkData
95 {
98
100 std::atomic_bool isNew{false};
101 };
102
112 {
117
119 std::string nodeName;
122
124 std::string portName;
129
131 uint32_t counter = 1;
133 uint8_t timeoutCounter = 10; // Multiple of 100 ms
134 };
135
136public:
141 Worker(bool getHwAndOsInfo);
142
146
155 void setFileSink(SinkPtr fileSink);
156
165 void setPipeSink(SinkPtr pipeSink);
166
175 void setTcpSink(SinkPtr tcpSink);
176
183 void pushData(QueueData data);
184
185public:
188 void setUserAppInfo(std::string info);
189
192 void clearHwAndOsInfo() noexcept;
193
194private:
195 void _onCleanupTimerTimeout();
196 void _onQueueDataReady();
197 void _handleNewSink(SinkPtr & sink);
198
199 void _sinkWriteData(std::vector<uint8_t> const & data);
200
201private:
202 std::queue<QueueData> _queue;
203 std::mutex _mutexQueue;
204
205 std::array<SinkData, SinkIndex::Count> _sinkDatas;
206 std::mutex _mutexSink;
207
208 std::string _userAppInfo;
209 std::string _hardwareInfo;
210 std::string _osInfo;
211
212private:
213 std::unordered_map<Utils::UUID, QueueData> _pendingData;
214
215private:
216 // Destroy the poller last
217 FdWatch::EPollPoller _poller;
218
219 FdWatch::PollerFd _eventfd;
220 FdWatch::Timer _cleanupTimer;
221
222 // Destroy the thread first
223 std::jthread _thread;
224};
225} // !namespace dfx::Pcapng
Base class for all runtime-executed nodes in a dfx dataflow graph.
uint32_t Id
Node identifier type (unique and stable within a graph instance).
Definition Node.hpp:113
Mode
Port direction.
Definition Port.hpp:74
uint32_t Id
Identifier type of a port (unique within a node by not unique accross a graph).
Definition Port.hpp:69
void setPipeSink(SinkPtr pipeSink)
Install or replace the pipe sink.
~Worker()
Stop the worker thread and release all resources. Ensures the thread is terminated and the poller/tim...
void setFileSink(SinkPtr fileSink)
Install or replace the file sink.
void clearHwAndOsInfo() noexcept
Disable (or remove) hardware/OS metadata in the capture output. This only affects metadata emission; ...
void setTcpSink(SinkPtr tcpSink)
Install or replace the TCP sink.
Worker(bool getHwAndOsInfo)
Construct and start the worker thread.
SinkIndex
Index of supported sinks.
Definition Worker.hpp:80
@ Count
Number of sink slots.
Definition Worker.hpp:85
@ File
File output (offline capture). See FileSink.
Definition Worker.hpp:81
@ Pipe
Pipe output (live capture into another process). See PipeSink.
Definition Worker.hpp:82
@ Tcp
TCP output (stream capture to a remote consumer). See TcpSink.
Definition Worker.hpp:83
void pushData(QueueData data)
Push a capture item into the worker queue.
void setUserAppInfo(std::string info)
Set application-specific information to embed into capture metadata.
std::unique_ptr< Message > MessagePtr
Unique ownership handle for messages.
Definition Message.hpp:27
Definition SocketClient.hpp:23
Definition Capture.hpp:27
std::unique_ptr< Sink > SinkPtr
Owning pointer type for sinks.
Definition Sink.hpp:122
Definition SystemConfigCommandHandler.hpp:15
STL namespace.
One capture item describing a message event and its context.
Definition Worker.hpp:112
Utils::UUID uuid
Identifier used to correlate and track pending items.
Definition Worker.hpp:116
Core::Node::Id nodeId
Stable node id.
Definition Worker.hpp:121
Core::Port::Mode portMode
Input vs output.
Definition Worker.hpp:128
Core::Port::Id portId
Stable port id.
Definition Worker.hpp:126
Core::MessagePtr message
Message payload.
Definition Worker.hpp:114
uint32_t counter
Worker-internal counter (tracking/aggregation).
Definition Worker.hpp:131
std::string portName
Human-readable port name.
Definition Worker.hpp:124
uint8_t timeoutCounter
Expiration counter, in 100ms units (10 => ~1s).
Definition Worker.hpp:133
std::string nodeName
Human-readable node name.
Definition Worker.hpp:119
Sink storage and "new sink" marker for hot swapping.
Definition Worker.hpp:95
SinkPtr sink
<
Definition Worker.hpp:97
std::atomic_bool isNew
True if this sink was just installed/replaced and must be handled by the worker thread.
Definition Worker.hpp:100
128-bit UUID value type.
Definition UUIDGenerator.hpp:33