dfx 0.1.0
Linux-based dynamic dataflow executor
Loading...
Searching...
No Matches
UnixSession.hpp
1// SPDX-FileCopyrightText: 2025 Vincent Leroy
2// SPDX-License-Identifier: MIT
3//
4// This file is part of dfx.
5//
6// Licensed under the MIT License. See the LICENSE file in the project root
7// for full license information.
8
9#pragma once
10
11// Standard includes
12#include <atomic>
13#include <chrono>
14#include <memory>
15
16// Third-party includes
17#include <nlohmann/json.hpp>
18
19// Project includes
20#include <dfx-fdwatch/PollerFd.hpp>
22
// Forward declaration only: Poller is the abstract interface for FD-based
// event polling (its definition lives in Poller.hpp). Declaring it here lets
// this header name the type without pulling in its full definition.
23namespace dfx::FdWatch
24{
25class Poller;
26} // !namespace dfx::FdWatch
27
28namespace dfx::Server
29{
// Forward declarations of the two collaborators a session holds references to.
// UnixRouter: JSON command router for the Unix domain socket control protocol.
30class UnixRouter;
// UnixServer: asynchronous Unix domain socket control server driven by a
// FdWatch::Poller.
31class UnixServer;
32
/// @brief Session object used by the Unix domain socket control server.
///
/// A session is created unbound (no socket FD yet), keeps references to the
/// UnixServer and UnixRouter it was created with, tracks message counters and
/// a timeout budget, and exposes reply helpers keyed by a request id.
/// It derives from std::enable_shared_from_this; the matching shared/weak
/// pointer aliases are declared after the class.
///
/// NOTE(review): the numerals fused to the start of each code line are the
/// line numbers of the rendered source listing, not part of the C++ code.
/// NOTE(review): the member index of this page also lists
/// `void init(FdWatch::OwnedFd socket, FdWatch::Poller &poller)` and
/// `DISABLE_COPY_AND_MOVE(UnixSession)`, neither of which appears in this
/// listing (listing lines 90-91 and 129-136 are absent) - confirm against the
/// original header before editing the class body.
75class UnixSession : public std::enable_shared_from_this<UnixSession>
76{
77public:
    /// @brief Create a session descriptor (not yet bound to a socket FD).
    /// @param server  Server this session keeps a reference to.
    /// @param router  Router this session keeps a reference to.
    /// @param timeout Timeout for the session - presumably the initial timeout
    ///                value reported by initialTimeout(); TODO confirm in the
    ///                out-of-line constructor.
88 UnixSession(UnixServer & server, UnixRouter & router, std::chrono::milliseconds timeout);
89
92
    /// @brief Borrowed (non-owning) socket FD for this session.
95 FdWatch::BorrowedFd socket() const noexcept { return _socket.borrow(); }
96
    // The four counter accessors below read their atomic with relaxed memory
    // ordering: each returns a value of the counter without ordering it
    // against other memory operations.

    /// @brief Total number of received request messages.
98 uint64_t inMsgCount() const noexcept { return _inMsgCount.load(std::memory_order::relaxed); }
    /// @brief Total number of sent response messages (success + error).
100 uint64_t outMsgCount() const noexcept { return _outMsgCount.load(std::memory_order::relaxed); }
    /// @brief Total number of sent error responses.
102 uint64_t outErrMsgCount() const noexcept { return _outErrMsgCount.load(std::memory_order::relaxed); }
    /// @brief Number of consecutive error responses sent.
110 uint64_t outConsecutiveErrMsgCount() const noexcept { return _outConsecutiveErrMsgCount.load(std::memory_order::relaxed); }
111
    /// @brief Decrease the remaining time before timeout and report expiration.
    /// @param time Amount of time to subtract from the remaining budget.
    /// @return Expiration status - presumably true once the session has timed
    ///         out; TODO confirm against the out-of-line definition.
119 bool subtractTimeAndCheckTimeout(std::chrono::milliseconds time);
120
    /// @brief Current configured initial timeout value.
122 std::chrono::milliseconds initialTimeout() const noexcept { return _initialTimeout; }
123
    /// @brief Update the initial timeout value.
    /// @param timeout New initial timeout.
126 void setInitialTimeout(std::chrono::milliseconds timeout);
127
128public:
137
138public:
    /// @brief Send a JSON response associated with a request id.
    /// @param response JSON payload of the response (taken by value).
    /// @param id       Id of the request being answered.
146 void reply(nlohmann::json response, std::string_view id);
147
    /// @brief Convenience overload to reply with a single error message.
    ///
    /// Wraps @p message in a one-element vector and forwards to the
    /// multi-message overload, moving @p baseJson along.
    /// @param message  The error message.
    /// @param id       Id of the request being answered.
    /// @param baseJson JSON value forwarded unchanged to the multi-message
    ///                 overload; defaults to an empty-brace-initialized value.
153 void replyError(std::string message, std::string_view id, nlohmann::json baseJson = {})
154 { replyError(std::vector<std::string>{ std::move(message) }, id, std::move(baseJson)); }
155
    /// @brief Reply with an error response containing one or more messages.
    /// @param messages Error messages to report.
    /// @param id       Id of the request being answered.
    /// @param baseJson JSON value passed along with the error - presumably the
    ///                 base object the error response is built on; TODO confirm.
167 void replyError(std::vector<std::string> const & messages, std::string_view id, nlohmann::json baseJson = {});
168
    /// @brief Terminate the session.
173 void terminate();
174
175private:
    // Internal helpers, declared here and defined out of line (their bodies
    // are not visible in this header). Names suggest socket-readiness
    // handling, incoming-message processing, draining of pending messages and
    // sending of a single message - verify against the implementation file.
176 void _onSocketReadyRead(FdWatch::EventTriggers events);
177 bool _onIncommingMessage();
178 bool _readAllPendingMessages(std::vector<std::string> & messages);
179 void _sendMessage(std::string_view message);
180
181private:
    // Session socket, held as a PollerFd (described on this page as the RAII
    // wrapper for the registration of a FD in a Poller).
182 FdWatch::PollerFd _socket;
    // Non-owning references to the server and router passed to the constructor.
183 UnixServer & _server;
184 UnixRouter & _router;
185
186private:
    // Message counters backing the public accessors; all start at zero.
187 std::atomic_uint64_t _inMsgCount = 0;
188 std::atomic_uint64_t _outMsgCount = 0;
189 std::atomic_uint64_t _outErrMsgCount = 0;
190 std::atomic_uint64_t _outConsecutiveErrMsgCount = 0;
191
    // Configured initial timeout (no in-class initializer; presumably set by
    // the constructor) and the remaining budget, which starts at 0 ms here.
192 std::chrono::milliseconds _initialTimeout;
193 std::chrono::milliseconds _timeBeforeTimeout{0};
194};
195
/// Shared ownership pointer type for sessions.
197using UnixSessionPtr = std::shared_ptr<UnixSession>;
/// Weak (non-owning) pointer type for sessions.
199using UnixSessionWPtr = std::weak_ptr<UnixSession>;
200} // !namespace dfx::Server
Convenience macros to explicitly control copy and move semantics.
Non-owning wrapper around a file descriptor.
Definition BorrowedFd.hpp:37
Owning RAII wrapper around a file descriptor.
Definition OwnedFd.hpp:36
RAII wrapper for the registration of a FD in a Poller.
Definition PollerFd.hpp:42
Abstract interface for FD-based event polling.
Definition Poller.hpp:37
JSON command router for the Unix domain socket control protocol.
Definition UnixRouter.hpp:78
Asynchronous Unix domain socket control server driven by a FdWatch::Poller.
Definition UnixServer.hpp:93
uint64_t outMsgCount() const noexcept
Total number of sent response messages (success + error).
Definition UnixSession.hpp:100
void replyError(std::string message, std::string_view id, nlohmann::json baseJson={})
Convenience overload to reply with a single error message.
Definition UnixSession.hpp:153
uint64_t outConsecutiveErrMsgCount() const noexcept
Number of consecutive error responses sent.
Definition UnixSession.hpp:110
UnixSession(UnixServer &server, UnixRouter &router, std::chrono::milliseconds timeout)
Create a session descriptor (not yet bound to a socket FD).
void terminate()
Terminate the session.
uint64_t inMsgCount() const noexcept
Total number of received request messages.
Definition UnixSession.hpp:98
void init(FdWatch::OwnedFd socket, FdWatch::Poller &poller)
Bind an accepted socket to this session and register it into a poller.
FdWatch::BorrowedFd socket() const noexcept
Borrowed socket FD for this session.
Definition UnixSession.hpp:95
void setInitialTimeout(std::chrono::milliseconds timeout)
Update the initial timeout value.
bool subtractTimeAndCheckTimeout(std::chrono::milliseconds time)
Decrease the remaining time before timeout and report expiration.
uint64_t outErrMsgCount() const noexcept
Total number of sent error responses.
Definition UnixSession.hpp:102
DISABLE_COPY_AND_MOVE(UnixSession)
UnixSessions are neither copyable nor movable.
std::chrono::milliseconds initialTimeout() const noexcept
Current configured initial timeout value.
Definition UnixSession.hpp:122
void reply(nlohmann::json response, std::string_view id)
Send a JSON response associated with a request id.
void replyError(std::vector< std::string > const &messages, std::string_view id, nlohmann::json baseJson={})
Reply with an error response containing one or more messages.
Definition SocketClient.hpp:23
Definition BaseCommandHandler.hpp:16
std::weak_ptr< UnixSession > UnixSessionWPtr
Weak pointer type for sessions.
Definition UnixSession.hpp:199
std::shared_ptr< UnixSession > UnixSessionPtr
Shared ownership pointer type for sessions.
Definition BaseCommandHandler.hpp:20