Joedb 10.4.1
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server.cpp
Go to the documentation of this file.
1#include "joedb/asio/Server.h"
3
4#include <boost/asio/detached.hpp>
5#include <boost/asio/co_spawn.hpp>
6#include <boost/asio/read.hpp>
7#include <boost/asio/write.hpp>
8
9namespace joedb::asio
10{
11 void Server::log(beman::cstring_view message) noexcept
12 {
13 try
14 {
15 std::string s = endpoint_path + ": ";
16 s += message;
17 logger.log(s);
18 }
19 catch (...)
20 {
21 }
22 }
23
25 (
26 Server &server,
27 boost::asio::local::stream_protocol::socket &&socket
28 ):
29 server(server),
30 id(server.session_id++),
31 socket(std::move(socket)),
32 strand(server.thread_pool.get_executor())
33 {
34 if (server.log_level > 1)
35 log("start");
36 }
37
38 boost::asio::awaitable<size_t> Server::Session::read_buffer
39 (
40 const size_t offset,
41 const size_t size
42 )
43 {
44 const size_t result = co_await boost::asio::async_read
45 (
46 socket,
47 boost::asio::buffer(buffer.data + offset, size),
48 boost::asio::use_awaitable
49 );
50
51 buffer.index = offset;
52
53 co_return result;
54 }
55
56 boost::asio::awaitable<void> Server::Session::write_buffer()
57 {
58 co_await boost::asio::async_write
59 (
60 socket,
61 boost::asio::buffer(buffer.data, buffer.index),
62 boost::asio::use_awaitable
63 );
64 }
65
66 void Server::Session::log(beman::cstring_view message) noexcept
67 {
68 try
69 {
70 std::string s = std::to_string(id) + ": ";
71 s += message;
72 server.log(s);
73 }
74 catch (...)
75 {
76 }
77 }
78
// Session destructor, defined here in the .cpp file as defaulted: it performs
// no work beyond destroying the members.
79 Server::Session::~Session() = default;
80
81 boost::asio::awaitable<void> Server::listener()
82 {
83 while (true)
84 {
85 auto session = new_session
86 (
87 co_await acceptor.async_accept(boost::asio::use_awaitable)
88 );
89
90 auto* const session_ptr = session.get();
91
92 boost::asio::co_spawn
93 (
94 session_ptr->strand,
95 session_ptr->run(),
96 [ending_session = std::move(session), this]
97 (
98 std::exception_ptr exception_ptr
99 )
100 {
101 ending_session->cleanup();
102 if (log_level > 1)
103 ending_session->log("stop");
104 if (exception_ptr && log_level > 1)
105 {
106 try
107 {
108 std::rethrow_exception(exception_ptr);
109 }
110 catch (const std::exception &e)
111 {
112 if (log_level > 1)
113 ending_session->log(std::string("exception: ") + e.what());
114 }
115 catch (...)
116 {
117 }
118 }
119 }
120 );
121 }
122 }
123
125 (
126 Logger &logger,
127 int log_level,
128 int thread_count,
129 std::string endpoint_path
130 ):
131 logger(logger),
132 log_level(log_level),
133 thread_count(thread_count),
134 thread_pool(thread_count),
135 joined(false),
136 endpoint_path(std::move(endpoint_path)),
137 endpoint(this->endpoint_path),
138 acceptor(thread_pool, endpoint, false),
139 interrupt_signals(thread_pool, SIGINT, SIGTERM)
140 {
141 if (log_level > 0)
142 log("start, thread_count = " + std::to_string(thread_count));
143
144 interrupt_signals.async_wait
145 (
146 [this](const boost::system::error_code &error, int signal)
147 {
148 if (!error)
149 {
150 if (this->log_level > 0)
151 {
152 if (signal == SIGINT)
153 this->log("interrupted by SIGINT");
154 else if (signal == SIGTERM)
155 this->log("interrupted by SIGTERM");
156 }
157 this->thread_pool.stop();
158 }
159 }
160 );
161
162 boost::asio::co_spawn
163 (
165 listener(),
166 boost::asio::detached
167 );
168 }
169
171 {
172 thread_pool.stop();
173 join();
174 }
175
177 {
178 thread_pool.join();
180 std::remove(endpoint_path.c_str());
181 joined = true;
182 }
183
185 {
186 }
187
189 {
190 if (!joined)
191 {
192 Destructor_Logger::warning("Server: not joined. This is a bug.");
193 std::terminate(); // it is too late to call join, since child was destroyed already
194 }
195 }
196}
static void warning(beman::cstring_view message) noexcept
boost::asio::awaitable< void > write_buffer()
Definition Server.cpp:56
Session(Server &server, boost::asio::local::stream_protocol::socket &&socket)
Definition Server.cpp:25
boost::asio::awaitable< size_t > read_buffer(size_t offset, size_t size)
Definition Server.cpp:39
void log(beman::cstring_view message) noexcept override
Definition Server.cpp:66
Superclass for asio servers.
Definition Server.h:19
const std::string endpoint_path
Definition Server.h:29
virtual std::unique_ptr< Session > new_session(boost::asio::local::stream_protocol::socket &&socket)=0
boost::asio::local::stream_protocol::acceptor acceptor
Definition Server.h:31
const int thread_count
Definition Server.h:25
boost::asio::signal_set interrupt_signals
Definition Server.h:32
virtual ~Server()
Definition Server.cpp:188
boost::asio::thread_pool thread_pool
Definition Server.h:26
Server(Logger &logger, int log_level, int thread_count, std::string endpoint_path)
Definition Server.cpp:125
void log(beman::cstring_view message) noexcept override
Definition Server.cpp:11
const int log_level
Definition Server.h:22
virtual void cleanup_after_join()
Definition Server.cpp:184