dfx 0.1.0
Linux-based dynamic dataflow executor
Loading...
Searching...
No Matches
FileSource.hpp
1// SPDX-FileCopyrightText: 2025 Vincent Leroy
2// SPDX-License-Identifier: MIT
3//
4// This file is part of dfx.
5//
6// Licensed under the MIT License. See the LICENSE file in the project root
7// for full license information.
8
9#pragma once
10
11// Standard includes
12#include <chrono>
13
14// Project includes
15#include <dfx-core/Node.hpp>
16#include <dfx-fdwatch/InotifyWatcher.hpp>
17#include <dfx-fdwatch/OwnedFd.hpp>
18#include <dfx-fdwatch/Timer.hpp>
19#include <dfx-magic/Context.hpp>
21#include <dfx-utilities/FileSystem.hpp>
22
23namespace dfx::Node
24{
// Dataflow node declared with the type string "FileSource"; derives from
// Core::Node, the base class for runtime-executed nodes in a dfx graph.
// NOTE(review): judging by the name and members, this node reads a file at
// `_path` and emits its content downstream - confirm against the .cpp.
25class FileSource : public Core::Node
26{
  // Declares the node type string and the metadata/schema hooks.
27 DFX_NODE("FileSource")
28
29public:
  // NOTE(review): presumably selects how the file content is cut into
  // outgoing messages (per delimiter-separated line, per fixed-size block,
  // or the whole file at once) - confirm against the implementation.
30 enum class EmitMode
31 {
32 Line,
33 Block,
34 Whole,
35 };
36
  // NOTE(review): presumably selects what causes a read (once, on an
  // incoming message, on a timer, or when the file changes) - confirm
  // against the implementation.
37 enum class Trigger
38 {
39 OneShot,
40 Manual,
41 Periodic,
42 Tail,
43 };
44
45public:
  // Constructs the node from its graph identifier and its name.
46 FileSource(Id id, std::string name);
47 ~FileSource();
48
49protected:
  // Core::Node hooks overridden by this node.
  // startImpl: called by start before the node is marked running.
50 void startImpl() override;
  // stopImpl: called by stop before the node is marked stopped.
51 void stopImpl() override;
  // initializeImpl: called by initialize; the node configuration is parsed here.
52 void initializeImpl(nlohmann::json config) override;
  // handleMessage: handles an incoming message on an input port.
53 void handleMessage(Core::InputPort const & port, Core::MessagePtr message) override;
54
55private:
  // Internal read/emit helpers; their bodies are not visible in this header.
  // NOTE(review): _doNextRead presumably performs or schedules the next read
  // step - confirm.
56 void _doNextRead();
  // NOTE(review): presumably the InotifyWatcher callback, receiving the
  // affected path and the inotify event mask - confirm.
57 void _onInotifyEvent(fs::path const & p, uint32_t mask);
58
  // NOTE(review): presumably reads from the file into `buffer` and returns
  // the number of bytes read - confirm.
59 std::size_t _readOnceInto(std::vector<uint8_t> & buffer);
60
  // NOTE(review): presumably emit one line / one block and report through the
  // bool result whether something was sent - confirm.
61 bool _sendNextLine();
62 bool _sendNextBlock();
63
64private:
  // Helper objects.
  // _magic: wrapper around a libmagic "magic_set" context.
65 Magic::Context _magic;
  // _periodicTimer: timerfd-backed timer.
  // NOTE(review): presumably drives Trigger::Periodic - confirm.
67 FdWatch::Timer _periodicTimer;
  // _watcher: owned inotify watcher; a null pointer until one is created.
  // NOTE(review): presumably only created for Trigger::Tail - confirm.
68 std::unique_ptr<FdWatch::InotifyWatcher> _watcher;
69
70private:
  // Settings with their in-class defaults: emit mode Whole, trigger OneShot,
  // delimiter "\n", block size 4096, interval 10 ms.
  // NOTE(review): presumably overwritten from the JSON config in
  // initializeImpl; `_blockSize` is presumably in bytes - confirm.
71 fs::path _path;
72 EmitMode _emitMode = EmitMode::Whole;
73 Trigger _trigger = Trigger::OneShot;
74 std::string _delimiter = "\n";
75 std::size_t _blockSize = 4096;
76 std::chrono::milliseconds _interval{10};
77
78private:
  // Byte buffer kept as node state.
  // NOTE(review): presumably holds data read but not yet emitted - confirm.
79 std::vector<uint8_t> _buffer;
80};
81} // !namespace dfx::Node
82
// Declare the enum <-> string API (and std::formatter) for both FileSource enums.
83DECLARE_ENUM_STRING_FUNCTIONS(dfx::Node::FileSource::EmitMode);
84DECLARE_ENUM_STRING_FUNCTIONS(dfx::Node::FileSource::Trigger);
Macro-based enum <-> string utilities for dfx.
#define DECLARE_ENUM_STRING_FUNCTIONS(E)
Declare the enum string API (and std::formatter) for enum type E.
Definition EnumString.hpp:327
Base class for all runtime-executed nodes in a dfx dataflow graph.
#define DFX_NODE(typeName)
Convenience macro to declare the node type string and metadata/schema hooks.
Definition Node.hpp:501
Incoming message endpoint attached to a node.
Definition InputPort.hpp:55
Abstract base class for all nodes in the dfx runtime.
Definition Node.hpp:91
uint32_t Id
Node identifier type (unique and stable within a graph instance).
Definition Node.hpp:113
nlohmann::json const & config() const noexcept
Get the node configuration.
Definition Node.hpp:138
std::string const & name() const noexcept
Get the node name.
Definition Node.hpp:134
Owning RAII wrapper around a file descriptor.
Definition OwnedFd.hpp:36
FD-integrated timer utility (timerfd-backed).
Definition Timer.hpp:52
RAII wrapper around a libmagic "magic_set" context.
Definition Context.hpp:49
void handleMessage(Core::InputPort const &port, Core::MessagePtr message) override
Handle an incoming message on an input port.
void initializeImpl(nlohmann::json config) override
Called by initialize; derived nodes must parse config here.
void startImpl() override
Called by start before the node is marked running.
void stopImpl() override
Called by stop before the node is marked stopped.
std::unique_ptr< Message > MessagePtr
Unique ownership handle for messages.
Definition Message.hpp:27
Definition MimeTypeRouter.hpp:18