6#include <boost/asio/redirect_error.hpp>
7#include <boost/asio/use_awaitable.hpp>
15 const std::map<char, const char *> Server::request_description
18 {
'H',
"get fast SHA-256 hash code"},
19 {
'I',
"get full SHA-256 hash code"},
20 {
'r',
"read a range of bytes"},
21 {
'D',
"get checkpoint"},
33 Server::Session::Session
37 boost::asio::local::stream_protocol::socket &&socket
39 joedb::asio::Server::Session(server, std::move(socket)),
41 lock_timeout_timer(strand)
46 void Server::Session::remove_from_queue(std::deque<Session *> &queue)
49 const auto it = std::find(queue.begin(), queue.end(),
this);
50 if (it != queue.end())
55 void Server::Session::cleanup()
61 remove_from_queue(get_server().lock_waiters);
62 remove_from_queue(get_server().pull_waiters);
66 boost::asio::awaitable<void> Server::Session::lock()
70 throw Exception(
"error: locking an already locked session");
73 if (!get_server().locked)
75 if (get_server().log_level > 2)
76 log(
"obtained lock immediately");
80 if (get_server().log_level > 2)
81 log(
"waiting for lock");
84 timer.expires_after(boost::asio::steady_timer::duration::max());
86 get_server().lock_waiters.emplace_back(
this);
88 boost::system::error_code ec;
89 co_await timer.async_wait
91 boost::asio::redirect_error(boost::asio::use_awaitable, ec)
94 get_server().lock_waiters.pop_front();
96 if (get_server().log_level > 2)
97 log(
"obtained lock after waiting");
100 get_server().locked =
true;
102 refresh_lock_timeout();
104 if (!get_server().client_lock)
106 if (!get_server().writable_journal_client)
108 log(
"error: locking pull-only server");
109 buffer.data[0] =
'R';
112 get_server().client_lock.emplace(*get_server().writable_journal_client);
118 void Server::Session::unlock()
123 if (get_server().log_level > 2)
127 lock_timeout_timer.cancel();
129 if (get_server().lock_waiters.empty())
131 get_server().locked =
false;
132 if (get_server().client.is_shared() && get_server().client_lock)
134 get_server().client_lock->unlock();
135 get_server().client_lock.reset();
139 get_server().lock_waiters.front()->timer.cancel();
144 void Server::Session::refresh_lock_timeout()
147 if (locking && get_server().lock_timeout.count() > 0)
149 lock_timeout_timer.expires_after(get_server().lock_timeout);
150 lock_timeout_timer.async_wait
152 [
this](std::error_code error)
156 if (get_server().log_level > 2)
173 boost::asio::awaitable<void> Server::Session::send(Async_Reader reader)
178 "sending to client, from = " + std::to_string(reader.get_current()) +
179 ", until = " + std::to_string(reader.get_end())
183 buffer.write<int64_t>(reader.get_end());
185 const int64_t display_step = std::max
188 reader.get_remaining() >> 6
190 int64_t next_display = reader.get_remaining() - display_step;
192 while (buffer.index + reader.get_remaining() > 0)
196 get_server().log_level > 3 &&
197 reader.get_remaining() > 0 &&
198 reader.get_remaining() <= next_display
201 next_display -= display_step;
202 log(
"remaining_size = " + std::to_string(reader.get_remaining()));
205 buffer.index += reader.read
207 buffer.data + buffer.index,
208 buffer.size - buffer.index
211 if (reader.is_end_of_file())
212 throw Exception(
"unexpected end of file");
214 refresh_lock_timeout();
215 co_await write_buffer();
221 boost::asio::awaitable<void> Server::Session::check_hash()
224 co_await read_buffer(1, 40);
226 const auto checkpoint = buffer.read<int64_t>();
227 const auto hash = buffer.read<SHA_256::Hash>();
229 const Readonly_Journal &journal = get_server().client.get_journal();
233 checkpoint > journal.get_checkpoint() ||
234 hash != (buffer.data[0] ==
'H'
235 ? Journal_Hasher::get_fast_hash(journal, checkpoint)
236 : Journal_Hasher::get_full_hash(journal, checkpoint))
239 buffer.data[0] =
'h';
242 if (get_server().log_level > 2)
246 "hash for checkpoint = " + std::to_string(checkpoint) +
247 ", result = " + buffer.data[0]
252 co_await write_buffer();
256 boost::asio::awaitable<void> Server::Session::handshake()
259 co_await read_buffer(0, 13);
261 if (buffer.read<std::array<char, 5>>() != Header::joedb)
262 throw Exception(
"handshake does not start by joedb");
264 const int64_t client_version = buffer.read<int64_t>();
265 if (get_server().log_level > 0)
266 log(
"client_version = " + std::to_string(client_version));
268 const int64_t version =
271 const bool writable =
272 get_server().writable_journal_client &&
273 !get_server().client.is_pullonly();
276 buffer.write<int64_t>(version);
277 buffer.write<int64_t>(id);
278 buffer.write<int64_t>(get_server().client.get_journal_checkpoint());
279 buffer.write<
char>(writable ?
'W' :
'R');
281 co_await write_buffer();
285 boost::asio::awaitable<void> Server::Session::read()
288 co_await read_buffer(1, 16);
290 int64_t from = buffer.read<int64_t>();
291 int64_t until = buffer.read<int64_t>();
293 if (until > get_server().client.get_journal_checkpoint())
294 until = get_server().client.get_journal_checkpoint();
300 get_server().client.get_journal().get_async_reader(from, until)
305 boost::asio::awaitable<void> Server::Session::pull
312 co_await read_buffer(1, 16);
314 const std::chrono::milliseconds wait{buffer.read<int64_t>()};
315 int64_t pull_checkpoint = buffer.read<int64_t>();
317 if (!get_server().client_lock)
318 get_server().client.pull();
323 pull_checkpoint == get_server().client.get_journal_checkpoint()
326 if (get_server().log_level > 2)
330 "waiting at checkpoint = " + std::to_string(pull_checkpoint) +
331 " for " + std::to_string(
double(wait.count()) * 0.001) +
's'
335 timer.expires_after(wait);
337 get_server().pull_waiters.emplace_back(
this);
339 boost::system::error_code ec;
340 co_await timer.async_wait
342 boost::asio::redirect_error(boost::asio::use_awaitable, ec)
345 remove_from_queue(get_server().pull_waiters);
352 pull_checkpoint = get_server().client.get_journal_checkpoint();
356 get_server().client.get_journal().get_async_tail_reader(pull_checkpoint)
361 boost::asio::awaitable<void> Server::Session::push
367 co_await read_buffer(1, 16);
369 const int64_t from = buffer.read<int64_t>();
370 const int64_t until = buffer.read<int64_t>();
372 if (get_server().locked && !locking)
373 throw Exception(
"trying to push while someone else is locking");
374 else if (!get_server().locked)
376 if (get_server().log_level > 2)
377 log(
"taking the lock for push attempt.");
381 const bool conflict =
384 from != get_server().client.get_journal().get_checkpoint()
387 push_status = buffer.data[0];
389 if (!get_server().writable_journal_client)
395 if (get_server().client_lock)
399 get_server().client_lock->get_journal().get_async_tail_writer()
404 if (get_server().log_level > 2)
408 "receiving from client, from = " + std::to_string(from) +
409 ", until = " + std::to_string(until)
413 int64_t remaining_size = until - from;
415 while (remaining_size > 0)
417 refresh_lock_timeout();
419 const size_t size =
co_await read_buffer
422 size_t(std::min(remaining_size, int64_t(buffer.size)))
426 push_writer->write(buffer.data, size);
428 remaining_size -= int64_t(size);
429 if (get_server().log_level > 3 && remaining_size > 0)
430 log(
"remaining_size = " + std::to_string(remaining_size));
437 if (get_server().client_lock)
439 get_server().client_lock->get_journal().soft_checkpoint_at
441 push_writer->get_position()
444 if (get_server().client.is_shared() && unlock_after)
446 get_server().client_lock->checkpoint_and_push_unlock();
447 get_server().client_lock.reset();
450 get_server().client_lock->checkpoint_and_push();
455 if (get_server().log_level > 2)
456 log(std::string(
"done pushing, status = ") + push_status);
461 for (
auto *waiter: get_server().pull_waiters)
462 waiter->timer.cancel();
463 get_server().pull_waiters.clear();
465 buffer.data[0] = push_status;
467 co_await write_buffer();
471 boost::asio::awaitable<void> Server::Session::run()
474 co_await handshake();
478 co_await read_buffer(0, 1);
480 const char code = buffer.read<
char>();
482 if (server.get_log_level() > 2)
484 const auto i = request_description.find(code);
485 if (i != request_description.end())
486 log(std::string(
"received request ") + code +
": " + i->second);
488 log(std::string(
"unknown code: ") + code);
494 co_await check_hash();
501 case 'D':
case 'E':
case 'F':
case 'G':
502 co_await pull(code & 1, code & 2);
507 co_await write_buffer();
511 co_await push(code ==
'O');
528 std::string endpoint_path,
530 std::chrono::milliseconds lock_timeout
532 joedb::asio::
Server(logger, log_level, thread_count, std::move(endpoint_path)),
535 lock_timeout(lock_timeout),
540 if (writable_journal_client)
542 if (!client.is_shared())
543 client_lock.emplace(*writable_journal_client);
553 client_lock->unlock();
Handle concurrent access to a file with a joedb::Connection.
void cleanup_after_join() override
int64_t push_if_ahead(int64_t until) override
constexpr int protocol_version
#define JOEDB_RELEASE_ASSERT(x)
always-tested assertion (release and debug mode)