Joedb 10.3.0
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server.cpp
Go to the documentation of this file.
1#include "joedb/asio/Server.h"
3
4#include <boost/asio/detached.hpp>
5#include <boost/asio/co_spawn.hpp>
6#include <boost/asio/read.hpp>
7#include <boost/asio/write.hpp>
8
9namespace joedb::asio
10{
11 void Server::log(const std::string &message) noexcept
12 {
13 try
14 {
15 logger.log(endpoint_path + ": " + message);
16 }
17 catch (...)
18 {
19 }
20 }
21
// NOTE(review): the line that names this definition (listing line 22) is
// missing from this extract. The listing's symbol index identifies the block
// as the constructor Session(Server &server, socket &&socket).
//
// Builds a session bound to `server` that takes ownership of `socket`.
23 (
24 Server &server,
25 boost::asio::local::stream_protocol::socket &&socket
26 ):
27 server(server),
// The session id is the server's current session_id, which is then incremented.
28 id(server.session_id++),
29 socket(std::move(socket)),
// The strand is built from the executor of the server's thread pool.
30 strand(server.thread_pool.get_executor())
31 {
// "start" is logged only when the server's log level is above 1.
32 if (server.log_level > 1)
33 log("start");
34 }
35
36 boost::asio::awaitable<size_t> Server::Session::read_buffer
37 (
38 const size_t offset,
39 const size_t size
40 )
41 {
42 const size_t result = co_await boost::asio::async_read
43 (
44 socket,
45 boost::asio::buffer(buffer.data + offset, size),
46 boost::asio::use_awaitable
47 );
48
49 buffer.index = offset;
50
51 co_return result;
52 }
53
54 boost::asio::awaitable<void> Server::Session::write_buffer()
55 {
56 co_await boost::asio::async_write
57 (
58 socket,
59 boost::asio::buffer(buffer.data, buffer.index),
60 boost::asio::use_awaitable
61 );
62 }
63
64 void Server::Session::log(const std::string &message) noexcept
65 {
66 try
67 {
68 server.log(std::to_string(id) + ": " + message);
69 }
70 catch (...)
71 {
72 }
73 }
74
// Session destructor: compiler-generated, defined out of line in this file.
75 Server::Session::~Session() = default;
76
77 boost::asio::awaitable<void> Server::listener()
78 {
79 while (true)
80 {
81 auto session = new_session
82 (
83 co_await acceptor.async_accept(boost::asio::use_awaitable)
84 );
85
86 auto* const session_ptr = session.get();
87
88 boost::asio::co_spawn
89 (
90 session_ptr->strand,
91 session_ptr->run(),
92 [ending_session = std::move(session), this]
93 (
94 std::exception_ptr exception_ptr
95 )
96 {
97 ending_session->cleanup();
98 if (log_level > 1)
99 ending_session->log("stop");
100 if (exception_ptr && log_level > 1)
101 {
102 try
103 {
104 std::rethrow_exception(exception_ptr);
105 }
106 catch (const std::exception &e)
107 {
108 if (log_level > 1)
109 ending_session->log(std::string("exception: ") + e.what());
110 }
111 catch (...)
112 {
113 }
114 }
115 }
116 );
117 }
118 }
119
// NOTE(review): the line that names this definition (listing line 120) is
// missing from this extract. The listing's symbol index identifies the block
// as Server(Logger &logger, int log_level, int thread_count,
// std::string endpoint_path).
//
// Constructs the server: creates the thread pool, the endpoint and acceptor
// for endpoint_path, installs a SIGINT/SIGTERM handler, and launches the
// listener() coroutine.
121 (
122 Logger &logger,
123 int log_level,
124 int thread_count,
125 std::string endpoint_path
126 ):
127 logger(logger),
128 log_level(log_level),
129 thread_count(thread_count),
130 thread_pool(thread_count),
131 joined(false),
132 endpoint_path(std::move(endpoint_path)),
// Uses the member (this->endpoint_path): the parameter of the same name was
// moved from on the previous line.
133 endpoint(this->endpoint_path),
// NOTE(review): the third argument `false` is presumably Asio's reuse_addr
// flag — confirm against the acceptor constructor documentation.
134 acceptor(thread_pool, endpoint, false),
135 interrupt_signals(thread_pool, SIGINT, SIGTERM)
136 {
137 if (log_level > 0)
138 log("start, thread_count = " + std::to_string(thread_count));
139
// Signal handler: when the wait completes without error, log which signal
// arrived (only if log_level > 0) and stop the thread pool.
140 interrupt_signals.async_wait
141 (
142 [this](const boost::system::error_code &error, int signal)
143 {
144 if (!error)
145 {
146 if (this->log_level > 0)
147 {
148 if (signal == SIGINT)
149 this->log("interrupted by SIGINT");
150 else if (signal == SIGTERM)
151 this->log("interrupted by SIGTERM");
152 }
153 this->thread_pool.stop();
154 }
155 }
156 );
157
// Launch the accept loop with the `detached` completion token.
// NOTE(review): listing line 160 (the first co_spawn argument) is missing
// from this extract.
// NOTE(review): listener() calls the pure-virtual new_session(). Because the
// coroutine is launched from this base-class constructor, confirm that a
// connection cannot be accepted before the derived constructor has finished.
158 boost::asio::co_spawn
159 (
161 listener(),
162 boost::asio::detached
163 );
164 }
165
// NOTE(review): the header of this definition (listing line 166) is missing
// from this extract, so its name and signature are not visible here.
// Body: stops the thread pool, then calls join().
167 {
168 thread_pool.stop();
169 join();
170 }
171
// NOTE(review): the header of this definition (listing line 172) is missing
// from this extract; presumably this is the join() that the preceding block
// calls — confirm.
// Body: joins the thread pool, removes the file at endpoint_path, and
// records that the server has been joined.
173 {
174 thread_pool.join();
// NOTE(review): listing line 175 is missing from this extract.
// The return value of std::remove is not checked.
176 std::remove(endpoint_path.c_str());
// The Server destructor terminates the process if this flag was never set.
177 joined = true;
178 }
179
// NOTE(review): the header of this definition (listing line 180) is missing
// from this extract. The listing's symbol index identifies the block as
// cleanup_after_join(), declared `virtual void` in the class.
// The body here is empty.
181 {
182 }
183
// NOTE(review): the header of this definition (listing line 184) is missing
// from this extract. The listing's symbol index identifies the block as the
// virtual destructor ~Server().
// If `joined` is still false at destruction, a warning is emitted through
// Destructor_Logger and the process is terminated.
185 {
186 if (!joined)
187 {
188 Destructor_Logger::warning("Server: not joined. This is a bug.");
189 std::terminate(); // it is too late to call join, since child was destroyed already
190 }
191 }
192}
static void warning(const std::string &message) noexcept
boost::asio::awaitable< void > write_buffer()
Definition Server.cpp:54
Session(Server &server, boost::asio::local::stream_protocol::socket &&socket)
Definition Server.cpp:23
boost::asio::awaitable< size_t > read_buffer(size_t offset, size_t size)
Definition Server.cpp:37
void log(const std::string &message) noexcept override
Definition Server.cpp:64
Superclass for asio servers.
Definition Server.h:19
const std::string endpoint_path
Definition Server.h:29
virtual std::unique_ptr< Session > new_session(boost::asio::local::stream_protocol::socket &&socket)=0
boost::asio::local::stream_protocol::acceptor acceptor
Definition Server.h:31
const int thread_count
Definition Server.h:25
boost::asio::signal_set interrupt_signals
Definition Server.h:32
virtual ~Server()
Definition Server.cpp:184
boost::asio::thread_pool thread_pool
Definition Server.h:26
Server(Logger &logger, int log_level, int thread_count, std::string endpoint_path)
Definition Server.cpp:121
const int log_level
Definition Server.h:22
void log(const std::string &message) noexcept override
Definition Server.cpp:11
virtual void cleanup_after_join()
Definition Server.cpp:180