7#include <boost/asio/redirect_error.hpp>
8#include <boost/asio/use_awaitable.hpp>
// Human-readable descriptions of request codes, keyed by the one-character
// code received from a client. Session::run() looks a received code up in
// this table when it logs the request.
// NOTE(review): only the entries 'H', 'I', 'r' and 'D' are visible in this
// listing; the complete table may contain more.
16 const std::map<char, const char *> Server::request_description
19 {
'H',
"get fast SHA-256 hash code"},
20 {
'I',
"get full SHA-256 hash code"},
21 {
'r',
"read a range of bytes"},
22 {
'D',
"get checkpoint"},
// Session constructor.
// Moves `socket` (a local stream-protocol socket) into the
// joedb::asio::Server::Session base together with `server`, and constructs
// lock_timeout_timer from `strand`.
// NOTE(review): the full parameter list and the constructor body are not
// visible in this listing.
34 Server::Session::Session
38 boost::asio::local::stream_protocol::socket &&socket
40 joedb::asio::Server::Session(server, std::move(socket)),
42 lock_timeout_timer(strand)
// Searches `queue` for this session with a linear std::find.
// NOTE(review): the statement guarded by `it != queue.end()` is not visible
// in this listing; presumably it erases the entry - confirm.
47 void Server::Session::remove_from_queue(std::deque<Session *> &queue)
50 const auto it = std::find(queue.begin(), queue.end(),
this);
51 if (it != queue.end())
// Calls remove_from_queue() on both waiter queues of the server
// (lock_waiters and pull_waiters).
// NOTE(review): other cleanup steps may exist on lines not shown here.
56 void Server::Session::cleanup()
62 remove_from_queue(get_server().lock_waiters);
63 remove_from_queue(get_server().pull_waiters);
// Coroutine that acquires the server-wide lock on behalf of this session.
// Throws Exception("error: locking an already locked session"); the condition
// guarding that throw is not visible in this listing.
// When the server is not locked, the lock is taken at once. Otherwise the
// session appends itself to lock_waiters and suspends on `timer`; unlock()
// cancels the front waiter's timer.
// Afterwards the server is marked locked, the lock timeout is refreshed and,
// if no client_lock exists yet, one is created from writable_journal_client.
67 boost::asio::awaitable<void> Server::Session::lock()
71 throw Exception(
"error: locking an already locked session");
74 if (!get_server().locked)
// Messages in this function are only logged when log_level is above 2.
76 if (get_server().log_level > 2)
77 log(
"obtained lock immediately");
81 if (get_server().log_level > 2)
82 log(
"waiting for lock");
// Expiry is set to the maximum duration; the wait is ended by the
// timer.cancel() call that unlock() performs on the front waiter.
85 timer.expires_after(boost::asio::steady_timer::duration::max());
87 get_server().lock_waiters.emplace_back(
this);
// redirect_error stores the outcome of the wait in `ec` instead of throwing,
// so a cancelled wait does not raise an exception here.
89 boost::system::error_code ec;
90 co_await timer.async_wait
92 boost::asio::redirect_error(boost::asio::use_awaitable, ec)
// NOTE(review): pop_front() assumes this session is the front entry of
// lock_waiters when the coroutine resumes - confirm.
95 get_server().lock_waiters.pop_front();
97 if (get_server().log_level > 2)
98 log(
"obtained lock after waiting");
101 get_server().locked =
true;
103 refresh_lock_timeout();
// No client_lock yet: without a writable_journal_client an error is logged
// and 'R' is stored in buffer.data[0]; the emplace below is presumably the
// alternative branch (the `else` is not visible in this listing).
105 if (!get_server().client_lock)
107 if (!get_server().writable_journal_client)
109 log(
"error: locking pull-only server");
110 buffer.data[0] =
'R';
113 get_server().client_lock.emplace(*get_server().writable_journal_client);
// Releases the server-wide lock.
// Cancels lock_timeout_timer. When lock_waiters is empty, the server's
// `locked` flag is cleared and, if the client is shared and a client_lock
// exists, that lock is unlocked and reset.
// NOTE(review): the final statement, which cancels the front waiter's timer,
// is presumably the branch taken when lock_waiters is not empty; the guarding
// lines are not visible in this listing.
119 void Server::Session::unlock()
124 if (get_server().log_level > 2)
128 lock_timeout_timer.cancel();
130 if (get_server().lock_waiters.empty())
132 get_server().locked =
false;
133 if (get_server().client.is_shared() && get_server().client_lock)
135 get_server().client_lock->unlock();
136 get_server().client_lock.reset();
// Cancelling the timer ends the wait of the session suspended in lock().
140 get_server().lock_waiters.front()->timer.cancel();
// Re-arms lock_timeout_timer, but only when this session is `locking` and the
// server's lock_timeout is strictly positive.
// The expiry is set to lock_timeout from now and an asynchronous wait is
// started whose handler captures `this`.
// NOTE(review): the handler body is mostly not visible in this listing, so
// what happens on expiry cannot be stated from here.
145 void Server::Session::refresh_lock_timeout()
148 if (locking && get_server().lock_timeout.count() > 0)
150 lock_timeout_timer.expires_after(get_server().lock_timeout);
151 lock_timeout_timer.async_wait
153 [
this](std::error_code error)
157 if (get_server().log_level > 2)
// Coroutine that sends the content of `reader` to the client.
// Writes reader.get_end() into the buffer as an int64, then repeatedly fills
// the rest of the buffer from the reader and sends it with write_buffer().
// Throws Exception("unexpected end of file") when reader.is_end_of_file().
174 boost::asio::awaitable<void> Server::Session::send(Async_Reader reader)
179 "sending to client, from = " + std::to_string(reader.get_current()) +
180 ", until = " + std::to_string(reader.get_end())
184 buffer.write<int64_t>(reader.get_end());
185 Progress_Bar progress_bar(reader.get_remaining(), *
this);
// Keep looping while the buffer holds bytes (buffer.index) or the reader
// still has bytes remaining.
187 while (buffer.index + reader.get_remaining() > 0)
// Read into the free tail of the buffer; buffer.index advances by the amount
// that reader.read() returns.
189 buffer.index += reader.read
191 buffer.data + buffer.index,
192 buffer.size - buffer.index
195 if (reader.is_end_of_file())
196 throw Exception(
"unexpected end of file");
// The lock timeout is refreshed before every write_buffer() call.
198 refresh_lock_timeout();
199 co_await write_buffer();
// Progress output is only printed when log_level is above 3.
202 if (get_server().log_level > 3)
203 progress_bar.print_remaining(reader.get_remaining());
// Coroutine handling a hash-check request.
// After read_buffer(1, 40) it decodes an int64 checkpoint followed by a
// SHA_256::Hash from the buffer (8 + 32 = 40 bytes, assuming a 32-byte hash).
// The hash is compared with the journal's hash at that checkpoint: the fast
// hash when buffer.data[0] is 'H', the full hash otherwise.
// NOTE(review): the visible condition (checkpoint beyond the journal
// checkpoint, or hash mismatch) presumably guards the assignment of 'h' to
// buffer.data[0]; the `if` line itself is not visible in this listing.
// The reply is sent with write_buffer().
208 boost::asio::awaitable<void> Server::Session::check_hash()
211 co_await read_buffer(1, 40);
213 const auto checkpoint = buffer.read<int64_t>();
214 const auto hash = buffer.read<SHA_256::Hash>();
216 const Readonly_Journal &journal = get_server().client.get_journal();
220 checkpoint > journal.get_checkpoint() ||
221 hash != (buffer.data[0] ==
'H'
222 ? Journal_Hasher::get_fast_hash(journal, checkpoint)
223 : Journal_Hasher::get_full_hash(journal, checkpoint))
226 buffer.data[0] =
'h';
229 if (get_server().log_level > 2)
233 "hash for checkpoint = " + std::to_string(checkpoint) +
234 ", result = " + buffer.data[0]
239 co_await write_buffer();
// Coroutine performing the initial handshake with a client.
// After read_buffer(0, 13) it decodes a 5-character header followed by an
// int64 client version (5 + 8 = 13 bytes).
// Throws Exception("handshake does not start by joedb") when the header
// differs from Header::joedb.
// The reply holds, in order: `version`, `id`, the journal checkpoint, and
// one character that is 'W' when the server is writable and 'R' otherwise.
243 boost::asio::awaitable<void> Server::Session::handshake()
246 co_await read_buffer(0, 13);
248 if (buffer.read<std::array<char, 5>>() != Header::joedb)
249 throw Exception(
"handshake does not start by joedb");
251 const int64_t client_version = buffer.read<int64_t>();
252 if (get_server().log_level > 0)
253 log(
"client_version = " + std::to_string(client_version));
// NOTE(review): the expression that initializes `version` is not visible in
// this listing.
255 const int64_t version =
// Writable means: a writable_journal_client exists and the client is not
// pull-only.
258 const bool writable =
259 get_server().writable_journal_client &&
260 !get_server().client.is_pullonly();
263 buffer.write<int64_t>(version);
264 buffer.write<int64_t>(id);
265 buffer.write<int64_t>(get_server().client.get_journal_checkpoint());
266 buffer.write<
char>(writable ?
'W' :
'R');
268 co_await write_buffer();
// Coroutine handling a range-read request.
// After read_buffer(1, 16) it decodes two int64 values, `from` and `until`,
// and lowers `until` to the journal checkpoint when it exceeds it.
// An async reader for (from, until) is then obtained from the journal.
// NOTE(review): the statement consuming that reader is not visible in this
// listing; presumably it is passed to send() - confirm.
272 boost::asio::awaitable<void> Server::Session::read()
275 co_await read_buffer(1, 16);
277 int64_t from = buffer.read<int64_t>();
278 int64_t until = buffer.read<int64_t>();
280 if (until > get_server().client.get_journal_checkpoint())
281 until = get_server().client.get_journal_checkpoint();
287 get_server().client.get_journal().get_async_reader(from, until)
// Coroutine handling a pull request.
// NOTE(review): the parameter list is not visible in this listing; run()
// calls this function with two arguments derived from the request code.
// After read_buffer(1, 16) it decodes an int64 wait time in milliseconds and
// an int64 pull_checkpoint. When no client_lock is held, the client is
// pulled first.
// When pull_checkpoint equals the journal checkpoint (the complete condition
// is only partly visible), the session registers itself in pull_waiters and
// suspends on `timer` for at most `wait`; push() cancels the timers of all
// pull waiters. A tail reader starting at pull_checkpoint is finally
// obtained from the journal.
292 boost::asio::awaitable<void> Server::Session::pull
299 co_await read_buffer(1, 16);
301 const std::chrono::milliseconds wait{buffer.read<int64_t>()};
302 int64_t pull_checkpoint = buffer.read<int64_t>();
304 if (!get_server().client_lock)
305 get_server().client.pull();
310 pull_checkpoint == get_server().client.get_journal_checkpoint()
313 if (get_server().log_level > 2)
// The wait time is logged in seconds (milliseconds * 0.001).
317 "waiting at checkpoint = " + std::to_string(pull_checkpoint) +
318 " for " + std::to_string(
double(wait.count()) * 0.001) +
's'
322 timer.expires_after(wait);
324 get_server().pull_waiters.emplace_back(
this);
// redirect_error stores the outcome of the wait in `ec` instead of throwing,
// so a cancelled wait does not raise an exception here.
326 boost::system::error_code ec;
327 co_await timer.async_wait
329 boost::asio::redirect_error(boost::asio::use_awaitable, ec)
332 remove_from_queue(get_server().pull_waiters);
// NOTE(review): the condition under which pull_checkpoint is overwritten
// with the journal checkpoint is not visible in this listing.
339 pull_checkpoint = get_server().client.get_journal_checkpoint();
343 get_server().client.get_journal().get_async_tail_reader(pull_checkpoint)
// Coroutine handling a push request from a client.
// NOTE(review): the parameter list is not visible in this listing; the body
// uses `unlock_after`, and run() calls push(code == 'O').
// After read_buffer(1, 16) it decodes two int64 values, `from` and `until`.
// Throws Exception("trying to push while someone else is locking") when the
// server is locked but this session is not the one locking.
// The pushed bytes (until - from of them) are received in buffer-sized
// chunks and written to push_writer; then the journal is checkpointed, all
// pull waiters are woken up, and push_status is sent back in buffer.data[0].
348 boost::asio::awaitable<void> Server::Session::push
354 co_await read_buffer(1, 16);
356 const int64_t from = buffer.read<int64_t>();
357 const int64_t until = buffer.read<int64_t>();
359 if (get_server().locked && !locking)
360 throw Exception(
"trying to push while someone else is locking");
361 else if (!get_server().locked)
363 if (get_server().log_level > 2)
364 log(
"taking the lock for push attempt.");
// NOTE(review): only part of the conflict condition is visible: `from`
// differing from the journal checkpoint.
368 const bool conflict =
371 from != get_server().client.get_journal().get_checkpoint()
374 push_status = buffer.data[0];
376 if (!get_server().writable_journal_client)
// NOTE(review): the tail writer obtained here is presumably stored in
// push_writer; the assignment is not visible in this listing.
382 if (get_server().client_lock)
386 get_server().client_lock->get_journal().get_async_tail_writer()
391 if (get_server().log_level > 2)
395 "receiving from client, from = " + std::to_string(from) +
396 ", until = " + std::to_string(until)
400 int64_t remaining_size = until - from;
401 Progress_Bar progress_bar(remaining_size, *
this);
403 while (remaining_size > 0)
// The lock timeout is refreshed before every chunk is read.
405 refresh_lock_timeout();
// Each chunk asks for at most buffer.size bytes, and never more than remain.
407 const size_t size =
co_await read_buffer
410 size_t(std::min(remaining_size, int64_t(buffer.size)))
414 push_writer->write(buffer.data, size);
416 remaining_size -= int64_t(size);
// Progress output is only printed when log_level is above 3.
417 if (get_server().log_level > 3)
418 progress_bar.print_remaining(remaining_size);
// With a client_lock held: soft-checkpoint the journal at the writer's
// position. For a shared client with unlock_after set, checkpoint, push and
// unlock in one call and drop the client_lock; checkpoint_and_push() is
// presumably the alternative branch (the `else` is not visible here).
425 if (get_server().client_lock)
427 get_server().client_lock->get_journal().soft_checkpoint_at
429 push_writer->get_position()
432 if (get_server().client.is_shared() && unlock_after)
434 get_server().client_lock->checkpoint_and_push_unlock();
435 get_server().client_lock.reset();
438 get_server().client_lock->checkpoint_and_push();
443 if (get_server().log_level > 2)
444 log(std::string(
"done pushing, status = ") + push_status);
// Cancel the timer of every session waiting in pull(), then empty the queue.
449 for (
auto *waiter: get_server().pull_waiters)
450 waiter->timer.cancel();
451 get_server().pull_waiters.clear();
453 buffer.data[0] = push_status;
455 co_await write_buffer();
// Main coroutine of a session: performs the handshake, then reads one-byte
// request codes and dispatches each to the matching handler.
// NOTE(review): the loop and most of the switch skeleton are not visible in
// this listing; only some of the dispatch targets can be seen.
459 boost::asio::awaitable<void> Server::Session::run()
462 co_await handshake();
466 co_await read_buffer(0, 1);
468 const char code = buffer.read<
char>();
// When the log level is above 2, log the request together with its
// description from request_description, or report an unknown code.
470 if (server.get_log_level() > 2)
472 const auto i = request_description.find(code);
473 if (i != request_description.end())
474 log(std::string(
"received request ") + code +
": " + i->second);
476 log(std::string(
"unknown code: ") + code);
482 co_await check_hash();
// 'D'..'G' share one handler: the two lowest bits of the code become the two
// arguments of pull() ('D' = neither, 'E' = first, 'F' = second, 'G' = both).
489 case 'D':
case 'E':
case 'F':
case 'G':
490 co_await pull(code & 1, code & 2);
495 co_await write_buffer();
// push() receives true only for code 'O'.
499 co_await push(code ==
'O');
// Fragment of the Server constructor (its first line is not visible in this
// listing).
// Forwards logger, log_level, thread_count and endpoint_path to the
// joedb::asio::Server base and stores lock_timeout.
// When a writable_journal_client exists and the client is not shared, a
// client_lock is created immediately.
516 std::string endpoint_path,
518 std::chrono::milliseconds lock_timeout
520 joedb::asio::
Server(logger, log_level, thread_count, std::move(endpoint_path)),
523 lock_timeout(lock_timeout),
528 if (writable_journal_client)
530 if (!client.is_shared())
531 client_lock.emplace(*writable_journal_client);
// NOTE(review): this unlock may belong to a different function (e.g. a
// destructor or cleanup routine); the enclosing definition is not visible.
541 client_lock->unlock();
Handle concurrent access to a file with a joedb::Connection.
void cleanup_after_join() override
int64_t push_if_ahead(int64_t until) override
constexpr int protocol_version
#define JOEDB_RELEASE_ASSERT(x)
always-tested assertion (release and debug mode)