dfx 0.1.0
Linux-based dynamic dataflow executor
Loading...
Searching...
No Matches
UnixSession.hpp
1// SPDX-FileCopyrightText: 2025-2026 Vincent Leroy
2// SPDX-License-Identifier: MIT
3//
4// This file is part of dfx.
5//
6// Licensed under the MIT License. See the LICENSE file in the project root
7// for full license information.
8
9#pragma once
10
11// Standard includes
12#include <atomic>
13#include <chrono>
14#include <memory>
15
16// Third-party includes
17#include <nlohmann/json.hpp>
18
19// Project includes
20#include <dfx-fdwatch/PollerFd.hpp>
22
23namespace dfx::FdWatch
24{
25class Poller;
26} // !namespace dfx::FdWatch
27
28namespace dfx::Server
29{
30class UnixRouter;
31class UnixServer;
32
75class UnixSession : public std::enable_shared_from_this<UnixSession>
76{
77public:
79 struct PeerCreds
80 {
81 pid_t pid;
82 uid_t uid;
83 gid_t gid;
84 };
85
86public:
97 UnixSession(UnixServer & server, UnixRouter & router, std::chrono::milliseconds timeout);
98
101
104 Utils::BorrowedFd socket() const noexcept { return _socket.borrow(); }
105
107 PeerCreds peerCredentials() const noexcept { return _creds; }
108
110 uint64_t inMsgCount() const noexcept { return _inMsgCount.load(std::memory_order::relaxed); }
112 uint64_t outMsgCount() const noexcept { return _outMsgCount.load(std::memory_order::relaxed); }
114 uint64_t outErrMsgCount() const noexcept { return _outErrMsgCount.load(std::memory_order::relaxed); }
122 uint64_t outConsecutiveErrMsgCount() const noexcept { return _outConsecutiveErrMsgCount.load(std::memory_order::relaxed); }
123
131 bool subtractTimeAndCheckTimeout(std::chrono::milliseconds time);
132
134 std::chrono::milliseconds initialTimeout() const noexcept { return _initialTimeout; }
135
138 void setInitialTimeout(std::chrono::milliseconds timeout);
139
140public:
150
151public:
159 void reply(nlohmann::json response, std::string_view id);
160
166 void replyError(std::string message, std::string_view id, nlohmann::json baseJson = {})
167 { replyError(std::vector<std::string>{ std::move(message) }, id, std::move(baseJson)); }
168
180 void replyError(std::vector<std::string> const & messages, std::string_view id, nlohmann::json baseJson = {});
181
186 void terminate();
187
188private:
189 void _onSocketReadyRead(FdWatch::EventTriggers events);
190 bool _onIncommingMessage();
191 bool _readAllPendingMessages(std::vector<std::string> & messages);
192 void _sendMessage(std::string_view message);
193
194private:
195 FdWatch::PollerFd _socket;
196 UnixServer & _server;
197 UnixRouter & _router;
198
199 PeerCreds _creds;
200
201private:
202 std::atomic_uint64_t _inMsgCount = 0;
203 std::atomic_uint64_t _outMsgCount = 0;
204 std::atomic_uint64_t _outErrMsgCount = 0;
205 std::atomic_uint64_t _outConsecutiveErrMsgCount = 0;
206
207 std::chrono::milliseconds _initialTimeout;
208 std::chrono::milliseconds _timeBeforeTimeout{0};
209};
210
212using UnixSessionPtr = std::shared_ptr<UnixSession>;
214using UnixSessionWPtr = std::weak_ptr<UnixSession>;
215} // !namespace dfx::Server
Convenience macros to explicitly control copy and move semantics.
RAII wrapper for the registration of a FD in a Poller.
Definition PollerFd.hpp:42
Abstract interface for FD-based event polling.
Definition Poller.hpp:37
JSON command router for the Unix domain socket control protocol.
Definition UnixRouter.hpp:78
Asynchronous Unix domain socket control server driven by a FdWatch::Poller.
Definition UnixServer.hpp:93
uint64_t outMsgCount() const noexcept
Total number of sent response messages (success + error).
Definition UnixSession.hpp:112
DFX_DISABLE_COPY_AND_MOVE(UnixSession)
UnixSessions are neither copyable nor movable.
void replyError(std::string message, std::string_view id, nlohmann::json baseJson={})
Convenience overload to reply with a single error message.
Definition UnixSession.hpp:166
uint64_t outConsecutiveErrMsgCount() const noexcept
Number of consecutive error responses sent.
Definition UnixSession.hpp:122
UnixSession(UnixServer &server, UnixRouter &router, std::chrono::milliseconds timeout)
Create a session descriptor (not yet bound to a socket FD).
void init(Utils::OwnedFd socket, FdWatch::Poller &poller, PeerCreds creds)
Bind an accepted socket to this session and register it into a poller.
void terminate()
Terminate the session.
Utils::BorrowedFd socket() const noexcept
Borrowed socket FD for this session.
Definition UnixSession.hpp:104
uint64_t inMsgCount() const noexcept
Total number of received request messages.
Definition UnixSession.hpp:110
void setInitialTimeout(std::chrono::milliseconds timeout)
Update the initial timeout value.
bool subtractTimeAndCheckTimeout(std::chrono::milliseconds time)
Decrease the remaining time before timeout and report expiration.
uint64_t outErrMsgCount() const noexcept
Total number of sent error responses.
Definition UnixSession.hpp:114
std::chrono::milliseconds initialTimeout() const noexcept
Current configured initial timeout value.
Definition UnixSession.hpp:134
void reply(nlohmann::json response, std::string_view id)
Send a JSON response associated with a request id.
PeerCreds peerCredentials() const noexcept
Peer credentials attached to the socket.
Definition UnixSession.hpp:107
void replyError(std::vector< std::string > const &messages, std::string_view id, nlohmann::json baseJson={})
Reply with an error response containing one or more messages.
Non-owning wrapper around a file descriptor.
Definition BorrowedFd.hpp:33
Owning RAII wrapper around a file descriptor.
Definition OwnedFd.hpp:36
Definition SocketClient.hpp:23
Definition BaseCommandHandler.hpp:16
std::weak_ptr< UnixSession > UnixSessionWPtr
Weak pointer type for sessions.
Definition UnixSession.hpp:214
std::shared_ptr< UnixSession > UnixSessionPtr
Shared ownership pointer type for sessions.
Definition BaseCommandHandler.hpp:20
Peer credentials associated with the Unix socket.
Definition UnixSession.hpp:80
gid_t gid
GID of the remote program connected to the server socket.
Definition UnixSession.hpp:83
pid_t pid
PID of the remote program connected to the server socket.
Definition UnixSession.hpp:81
uid_t uid
UID of the remote program connected to the server socket.
Definition UnixSession.hpp:82