// LOG(x): passes log() a lambda that streams the expression x into the
// std::ostream it receives. The lambda captures by reference ([&]).
// LOGID(x): same, but the text is chained after session->write_id(out),
// so a `session` must be in scope at the point of use.
8#define LOG(x) log([&](std::ostream &out){out << x;})
9#define LOGID(x) log([&](std::ostream &out){session->write_id(out) << x;})
11#include <asio/read.hpp>
12#include <asio/write.hpp>
// NOTE(review): the enclosing function header is not visible in this listing;
// presumably this is the body of Server::get_time_stamp() - confirm.
// Converts the steady-clock time elapsed since start_time to milliseconds.
21 return std::chrono::duration_cast<std::chrono::milliseconds>
23 std::chrono::steady_clock::now() - start_time
// Writes this session's log prefix to `out`: the server time stamp count,
// a space, then "<endpoint_path>(<id>): ".
// NOTE(review): the return statement is not visible here; callers chain
// `write_id(out) << ...`, so it presumably returns `out` - confirm.
28 std::ostream &Server::Session::write_id(std::ostream &out)
const
// Time stamp as returned by server.get_time_stamp(), printed via count().
32 out << server.get_time_stamp().count() <<
' ';
// Identify the session by the server endpoint path and the session id.
35 out << server.endpoint_path <<
'(' <<
id <<
"): ";
// Session constructor. Takes ownership of the accepted socket and registers
// the new session with its server.
// NOTE(review): only the `socket` parameter is visible; the initializer list
// also uses `server`, whose declaration is not shown here.
41 Server::Session::Session
45 asio::local::stream_protocol::socket &&socket
// The id comes from pre-incrementing the server's session counter.
47 id(++server.session_id),
49 socket(std::move(socket)),
// A new session starts in the not_locking state.
50 state(State::not_locking)
52 server.log([
this](std::ostream &out)
54 write_id(out) <<
"new session created\n";}
// Register this session in the server's set of raw Session pointers, then
// log the updated server status.
56 server.sessions.insert(
this);
57 server.write_status();
// Session destructor. Unregisters the session from the server and logs it.
61 Server::Session::~Session()
// Remove this session from the server's session set.
66 server.sessions.erase(
this);
// A session that dies while holding the lock gets a dedicated log message.
// NOTE(review): the statements that actually release the lock are not
// visible in this listing - confirm against the full source.
68 if (state == State::locking)
70 server.log([
this](std::ostream &out)
72 write_id(out) <<
"removing lock held by dying session.\n";
78 server.log([
this](std::ostream &out)
80 write_id(out) <<
"deleted\n";
// Log the server status after the session has been removed.
83 server.write_status();
// Starts an asynchronous read into the session buffer and dispatches the
// result to a Server member function.
// NOTE(review): the `offset` and `size` parameters and the call that takes
// the buffer are on lines not visible in this listing.
93 void Server::async_read
96 std::shared_ptr<Session> session,
99 Transfer_Handler handler
// Destination: `size` bytes starting at `offset` inside the session buffer.
105 asio::buffer(session->buffer.data + offset, size),
// The completion lambda holds a copy of the shared_ptr to the session and
// forwards the error code and byte count to the member-function pointer.
106 [
this, handler, session](std::error_code e,
size_t s)
108 (this->*handler)(session, e, s);
// Logs a one-line status: endpoint path, process id, number of sessions,
// and a checkpoint value.
114 void Server::write_status()
117 log([
this](std::ostream &out)
119 out << endpoint_path;
120 out <<
": pid = " << joedb::get_pid();
122 out <<
"; sessions = " << sessions.size();
// NOTE(review): the expression printed after "checkpoint = " is not visible
// in this listing.
123 out <<
"; checkpoint = ";
// Serves the front of lock_queue.
// NOTE(review): several control-flow lines (else branches, returns, queue
// pop) are missing from this listing; the comments below describe only the
// visible statements - confirm the exact branching against the full source.
129 void Server::lock_dequeue()
// When the queue is empty, the visible statement is a call to
// client_lock->unlock().
134 if (lock_queue.empty())
138 client_lock->unlock();
// Otherwise the session at the front of the queue is considered.
145 const std::shared_ptr<Session> session = lock_queue.front();
// Without a writable journal client, an error is logged and 'R' is stored in
// the first byte of the session buffer.
151 if (!writable_journal_client)
153 LOGID(
"Error: locking pull-only server\n");
154 session->buffer.data[0] =
'R';
// Constructs client_lock from the writable journal client.
157 client_lock.emplace(*writable_journal_client);
// A session that was waiting for the lock in order to pull starts pulling.
160 if (session->state == Session::State::waiting_for_lock_to_pull)
161 start_pulling(session);
// The session is marked as locking and the lock timeout is (re)armed.
163 session->state = Session::State::locking;
164 refresh_lock_timeout(session);
// NOTE(review): the function header is not visible in this listing. The
// parameters match the calls `lock(session, Session::State::...)` elsewhere
// in this file, so this is presumably Server::lock - confirm.
173 const std::shared_ptr<Session> session,
174 const Session::State state
// A session that is not already locking takes the requested waiting state
// and is appended to lock_queue.
177 if (session->state != Session::State::locking)
179 session->state = state;
180 lock_queue.emplace(session);
// Presumably the else branch (the `else` line is not visible): asking to
// lock a session that is already locking is only logged.
184 LOGID(
"Error: locking an already locked session\n");
// Releases the lock state of `session`, if it is currently locking.
// NOTE(review): further statements (e.g. serving the next queued session)
// may exist on lines not visible in this listing.
188 void Server::unlock(Session &session)
191 if (session.state == Session::State::locking)
193 log([&session](std::ostream &out)
195 session.write_id(out) <<
"unlocking\n";
// Back to the not_locking state; the pending lock timeout is cancelled.
197 session.state = Session::State::not_locking;
199 lock_timeout_timer.cancel();
// Completion handler for lock_timeout_timer (see refresh_lock_timeout).
// NOTE(review): the check of `error` and any logging/unlock statements are
// on lines not visible in this listing.
206 void Server::lock_timeout_handler
209 const std::shared_ptr<Session> session,
210 const std::error_code error
// If a push is in progress, its writer is discarded and the push status is
// set to 't'; push_transfer later copies push_status into the reply byte.
217 if (session->push_writer)
219 session->push_writer.reset();
220 session->push_status =
't';
// (Re)arms the lock timeout timer for `session`.
228 void Server::refresh_lock_timeout(
const std::shared_ptr<Session> session)
// Only done when a positive lock_timeout is configured and the session is in
// the locking state.
231 if (lock_timeout.count() > 0 && session->state == Session::State::locking)
// Set the new expiry and wait; the wait handler forwards to
// lock_timeout_handler with the captured session.
233 lock_timeout_timer.expires_after(lock_timeout);
234 lock_timeout_timer.async_wait
236 [
this, session](std::error_code e){lock_timeout_handler(session, e);}
// Completion handler for one chunk of pushed data.
// NOTE(review): the check of `error` is on a line not visible here.
242 void Server::push_transfer_handler
245 const std::shared_ptr<Session> session,
246 const std::error_code error,
247 const size_t bytes_transferred
// If a push writer is set, the received bytes are written through it.
252 if (session->push_writer)
253 session->push_writer->write(session->buffer.data, bytes_transferred);
// Update the remaining byte count and the progress display, then continue
// the transfer.
255 session->push_remaining_size -= int64_t(bytes_transferred);
256 session->progress_bar->print_remaining(session->push_remaining_size);
258 push_transfer(session);
// Drives a push: reads the next chunk while data remains, otherwise
// finalizes the push and replies to the client.
// NOTE(review): else/brace lines and parts of some conditions are missing
// from this listing; the comments describe only the visible statements.
263 void Server::push_transfer(
const std::shared_ptr<Session> session)
// Data still expected: keep the lock timeout fresh and request the next
// chunk, sized as the smaller of the remaining size and buffer.ssize, with
// push_transfer_handler as the completion handler.
266 if (session->push_remaining_size > 0)
268 refresh_lock_timeout(session);
274 size_t(std::min(session->push_remaining_size, session->buffer.ssize)),
275 &Server::push_transfer_handler
// If a push writer is still set, soft-checkpoint the journal at the writer's
// position.
280 if (session->push_writer)
284 client_lock->get_journal().soft_checkpoint_at
286 session->push_writer->get_position()
// With a shared client and unlock_after_push set,
// checkpoint_and_push_unlock() is called; the other visible call is plain
// checkpoint_and_push().
290 if (client.
is_shared() && session->unlock_after_push)
292 client_lock->checkpoint_and_push_unlock();
296 client_lock->checkpoint_and_push();
299 session->push_writer.reset();
// The reply is one byte: the push status.
302 session->buffer.data[0] = session->push_status;
304 session->progress_bar.reset();
305 LOGID(
"Returning '" << session->push_status <<
"'\n");
307 write_buffer_and_next_command(session, 1);
// NOTE(review): the statement guarded directly by this condition is not
// visible in this listing.
309 if (session->unlock_after_push)
// Sessions in the waiting_for_push_to_pull state are set back to not_locking
// and start pulling.
// NOTE(review): the second half of the condition below is not visible.
312 for (
auto *other_session: sessions)
316 other_session->state == Session::State::waiting_for_push_to_pull &&
320 other_session->state = Session::State::not_locking;
321 start_pulling(other_session->shared_from_this());
// Handles the header of a push command: reads the `from` and `until`
// positions and prepares the data transfer.
// NOTE(review): else/brace lines are missing from this listing, so the exact
// branching between the statements below must be confirmed.
328 void Server::push_handler
331 const std::shared_ptr<Session> session,
332 const std::error_code error,
333 const size_t bytes_transferred
// Two int64 values are read from the start of the session buffer.
338 session->buffer.index = 0;
339 const int64_t from = session->buffer.read<int64_t>();
340 const int64_t until = session->buffer.read<int64_t>();
// Pushing while `locked` is set and this session is not the locking one is
// logged.
// NOTE(review): `locked` is declared on a line not visible here.
342 if (locked && session->state != Session::State::locking)
344 LOGID(
"trying to push while someone else is locking\n");
// The visible lock() call requests the lock with state
// waiting_for_lock_to_push.
348 LOGID(
"Taking the lock for push attempt.\n");
349 lock(session, Session::State::waiting_for_lock_to_push);
// Conflict when the session does not hold the lock or `from` differs from
// the journal checkpoint.
352 const bool conflict =
354 session->state != Session::State::locking ||
355 from != client.get_journal().get_checkpoint()
358 LOGID(
"pushing, from = " << from <<
", until = " << until);
359 session->progress_bar.emplace(until - from, log_pointer);
// Status 'R' is assigned when there is no writable journal client; 'C' is
// the other status assigned here.
// NOTE(review): the condition guarding 'C' is not visible (presumably the
// `conflict` flag) - confirm.
361 if (!writable_journal_client)
362 session->push_status =
'R';
364 session->push_status =
'C';
// The push writer is created from the journal's async tail writer.
369 session->push_writer.emplace
371 client_lock->get_journal().get_async_tail_writer()
// Number of bytes to receive, then start the transfer loop.
376 session->push_remaining_size = until - from;
378 push_transfer(session);
// Sends data from an async reader to the client, one buffer at a time.
// NOTE(review): the `reader` and `offset` parameters and several
// else/brace lines are not visible in this listing.
383 void Server::read_transfer_handler
386 const std::shared_ptr<Session> session,
388 const std::error_code error,
389 const size_t bytes_transferred,
395 session->progress_bar->print_remaining(reader.get_remaining());
// Something is left to send: either bytes already in the buffer (`offset`)
// or bytes the reader has not delivered yet.
397 if (offset + reader.get_remaining() > 0)
// Fill the buffer after the first `offset` bytes.
399 const size_t size = reader.read
401 session->buffer.data + offset,
402 session->buffer.size - offset
405 if (reader.is_end_of_file())
406 LOG(
"error: unexpected end of file\n");
409 refresh_lock_timeout(session);
// Write the `offset` prefix bytes plus the `size` bytes just read; the
// completion lambda re-enters this handler with offset 0.
414 asio::buffer(session->buffer.data, size + offset),
415 [
this, session, reader](std::error_code e,
size_t s)
417 read_transfer_handler(session, reader, e, s, 0);
// Presumably the branch taken when nothing is left (the `else` line is not
// visible): reset the pull timer and progress bar, then wait for the next
// command.
424 session->pull_timer.reset();
425 session->progress_bar.reset();
426 read_command(session);
// Starts sending the content of `reader` to the client.
// NOTE(review): the `reader` parameter declaration is not visible here.
432 void Server::start_reading
435 std::shared_ptr<Session> session,
439 LOGID(
"reading from = " << reader.get_current() <<
", until = " << reader.get_end());
440 session->progress_bar.emplace(reader.get_remaining(), log_pointer);
// The reader's end position is written as an int64 starting at buffer
// index 1; byte 0 is not written here.
442 session->buffer.index = 1;
443 session->buffer.write<int64_t>(reader.get_end());
// The buffer index after that write is passed to read_transfer_handler.
// NOTE(review): the other arguments of this call are not visible.
445 read_transfer_handler
451 session->buffer.index
// Starts a pull for `session`.
// NOTE(review): most of this function's body is missing from this listing.
456 void Server::start_pulling(std::shared_ptr<Session> session)
// A session that asked to lock before pulling and is not already waiting
// for that lock requests it with state waiting_for_lock_to_pull.
461 session->lock_before_pulling &&
462 session->state != Session::State::waiting_for_lock_to_pull
465 lock(session, Session::State::waiting_for_lock_to_pull);
// NOTE(review): the statements guarded by this condition are not visible.
469 if (!session->send_pull_data)
// Handles the arguments of a pull command: a wait duration and the
// client's checkpoint.
// NOTE(review): parts of the conditions and else/brace lines are missing
// from this listing.
480 void Server::pull_handler
483 const std::shared_ptr<Session> session,
484 const std::error_code error,
485 const size_t bytes_transferred
// Both arguments are int64 values read starting at buffer index 1; the
// first one is interpreted as milliseconds.
490 session->buffer.index = 1;
491 const std::chrono::milliseconds wait{session->buffer.read<int64_t>()};
492 session->pull_checkpoint = session->buffer.read<int64_t>();
// Part of the condition for waiting: the client's checkpoint equals the
// server journal checkpoint.
500 session->pull_checkpoint == client.get_journal_checkpoint()
505 "waiting at checkpoint = " << session->pull_checkpoint <<
506 " for " << wait.count() <<
" milliseconds\n"
// The session enters waiting_for_push_to_pull and a per-session timer is
// armed for the requested duration.
509 session->state = Session::State::waiting_for_push_to_pull;
510 session->pull_timer.emplace(io_context);
511 session->pull_timer->expires_after(wait);
512 session->pull_timer->async_wait
514 [
this, session](std::error_code timer_error)
// In the timer handler: if the session is still waiting, it goes back to
// not_locking and starts pulling.
518 if (session->state == Session::State::waiting_for_push_to_pull)
520 session->state = Session::State::not_locking;
521 start_pulling(session);
// Presumably the no-wait branch (the `else` line is not visible): pull
// immediately.
528 start_pulling(session);
// Handles the arguments of a read command: a `from` and an `until` position.
533 void Server::read_handler
536 std::shared_ptr<Session> session,
537 std::error_code error,
538 size_t bytes_transferred
// Both positions are int64 values read starting at buffer index 1.
543 session->buffer.index = 1;
544 int64_t from = session->buffer.read<int64_t>();
545 int64_t until = session->buffer.read<int64_t>();
// `until` is clamped to the current journal checkpoint.
547 if (until > client.get_journal_checkpoint())
548 until = client.get_journal_checkpoint();
// An async reader over [from, until) is created from the journal.
// NOTE(review): the call that consumes it is not visible (presumably
// start_reading) - confirm.
555 client.get_journal().get_async_reader(from, until)
// Handles the arguments of a hash-check command and replies with one byte.
561 void Server::check_hash_handler
564 const std::shared_ptr<Session> session,
565 const std::error_code error,
566 const size_t bytes_transferred
// The checkpoint is an int64 read starting at buffer index 1.
571 session->buffer.index = 1;
572 const auto checkpoint = session->buffer.read<int64_t>();
575 const Readonly_Journal &readonly_journal = client.get_journal();
// The reply byte is set to 'h' when the condition below holds. Its first
// visible term is a checkpoint beyond the journal's checkpoint.
// NOTE(review): the rest of the condition (presumably a hash comparison)
// is not visible in this listing.
579 checkpoint > readonly_journal.get_checkpoint() ||
583 session->buffer.data[0] =
'h';
586 LOGID(
"hash for checkpoint = " << checkpoint <<
", result = "
587 << session->buffer.data[0] <<
'\n');
// Send the single reply byte and wait for the next command.
589 write_buffer_and_next_command(session, 1);
// Dispatches on the one-byte command stored in session->buffer.data[0].
// NOTE(review): most `case` labels and `break` lines are missing from this
// listing; only the 'D'..'G' labels are visible.
594 void Server::read_command_handler
597 const std::shared_ptr<Session> session,
598 const std::error_code error,
599 const size_t bytes_transferred
// The command byte is logged as-is.
604 LOGID(session->buffer.data[0] <<
'\n');
606 switch (session->buffer.data[0])
// Hash check: reads 40 more bytes at offset 1.
// NOTE(review): presumably an 8-byte checkpoint plus a 32-byte hash.
609 async_read(session, 1, 40, &Server::check_hash_handler);
// Read: 16 more bytes at offset 1 (read_handler reads two int64 values).
613 async_read(session, 1, 16, &Server::read_handler);
// Pull family: the two low bits of the command byte select the options.
// Bit 0 (set for 'E' and 'G') requests locking before pulling; bit 1 (set
// for 'F' and 'G') sets send_pull_data.
616 case 'D':
case 'E':
case 'F':
case 'G':
617 session->lock_before_pulling = session->buffer.data[0] & 1;
618 session->send_pull_data = session->buffer.data[0] & 2;
619 async_read(session, 1, 16, &Server::pull_handler);
// NOTE(review): the command that leads to this one-byte reply is not
// visible.
624 write_buffer_and_next_command(session, 1);
// Push: command 'O' additionally requests unlocking after the push. The
// command byte becomes the initial push status, and 16 bytes are read at
// offset 0 (push_handler reads two int64 values from index 0).
628 session->unlock_after_push = (session->buffer.data[0] ==
'O');
629 session->push_status = session->buffer.data[0];
630 async_read(session, 0, 16, &Server::push_handler);
// Presumably the default branch: unknown commands are only logged.
634 LOGID(
"unexpected command\n");
// Waits for the next command from `session`: reads one byte at buffer
// offset 0 and hands it to read_command_handler.
641 void Server::read_command(
const std::shared_ptr<Session> session)
644 async_read(session, 0, 1, &Server::read_command_handler);
// Sends the first `size` bytes of the session buffer, then waits for the
// next command.
// NOTE(review): the `size` parameter, the write call itself, and any error
// check in the completion handler are on lines not visible here.
648 void Server::write_buffer_and_next_command
651 const std::shared_ptr<Session> session,
658 asio::buffer(session->buffer.data, size),
// The completion lambda holds a copy of the shared_ptr to the session.
659 [
this, session](std::error_code e,
size_t s)
662 read_command(session);
// Validates the client handshake and sends the server's reply.
668 void Server::handshake_handler
671 const std::shared_ptr<Session> session,
672 const std::error_code error,
673 const size_t bytes_transferred
678 session->buffer.index = 0;
// The handshake starts with 5 characters that must equal Header::joedb,
// followed by the client version as an int64.
680 if (session->buffer.read<std::array<char, 5>>() ==
Header::joedb)
682 const int64_t client_version = session->buffer.read<int64_t>();
683 LOGID(
"client_version = " << client_version <<
'\n');
// The reply is written starting at index 5, i.e. right after the 5 header
// characters.
// NOTE(review): a write at listing line 686 is not visible (possibly the
// server's protocol version) - confirm.
685 session->buffer.index = 5;
// Session id and current journal checkpoint, both as int64.
687 session->buffer.write<int64_t>(session->id);
688 session->buffer.write<int64_t>(client.get_journal_checkpoint());
// One character: 'W' when a writable journal client exists and the client
// is not pull-only, otherwise 'R'.
689 session->buffer.write<
char>
691 (writable_journal_client && !client.is_pullonly()) ?
'W' :
'R'
// Send everything written so far and wait for the first command.
694 write_buffer_and_next_command(session, session->buffer.index);
// Presumably the else branch of the header check (the `else` line is not
// visible).
698 LOGID(
"bad handshake\n");
// Asynchronously accepts one incoming connection.
// NOTE(review): any re-arming of the accept loop is on lines not visible in
// this listing.
703 void Server::start_accept()
708 acceptor.async_accept
711 [
this](std::error_code error, asio::local::stream_protocol::socket socket)
// Connections are only served when accept succeeded and the server is not
// stopped.
713 if (!error && !stopped)
// A Session is created for the socket and owned through a shared_ptr.
// Its first read is the 13-byte handshake (handshake_handler reads 5 header
// characters plus an int64).
715 std::shared_ptr<Session> session(
new Session(*
this, std::move(socket)));
716 async_read(session, 0, 13, &Server::handshake_handler);
// NOTE(review): the header of this definition is not visible in this
// listing. The parameters and member initializers match the Server
// constructor, whose first parameter (`Client &client`) is missing here -
// confirm.
730 asio::io_context &io_context,
731 std::string endpoint_path,
732 const std::chrono::milliseconds lock_timeout,
733 std::ostream *
const log_pointer
// start_time is the steady-clock time at construction.
735 start_time(std::chrono::steady_clock::now()),
738 io_context(io_context),
// The endpoint is built from the member copy (this->endpoint_path), because
// the parameter of the same name has just been moved from.
739 endpoint_path(std::move(endpoint_path)),
740 endpoint(this->endpoint_path),
741 acceptor(io_context, endpoint, false),
// The server waits for SIGINT and SIGTERM.
743 interrupt_signals(io_context, SIGINT, SIGTERM),
745 lock_timeout(lock_timeout),
746 lock_timeout_timer(io_context),
748 log_pointer(log_pointer)
// NOTE(review): the statements guarded by this condition are not visible.
750 if (writable_journal_client)
// For a non-shared client with a writable journal, the client lock is
// constructed right away, in the constructor.
764 if (!client.
is_shared() && writable_journal_client)
765 client_lock.emplace(*writable_journal_client);
// NOTE(review): the body of the signal handler is not visible.
767 interrupt_signals.async_wait([
this](
const asio::error_code &error,
int)
// Part of the start-up log message: lock timeout count and shared flag.
779 ": start. lock_timeout = " << lock_timeout.count() <<
781 "; shared = " << client.
is_shared() <<
'\n'
792 interrupt_signals.cancel();
804 for (Session *session: sessions)
806 session->socket.close();
807 session->pull_timer.reset();
812 client_lock->unlock();
826 if (!sessions.empty())
827 LOG(
"Destroying server before sessions.\n");
828 std::remove(endpoint_path.c_str());
Handle concurrent access to a file with a joedb::Connection.
int64_t get_journal_checkpoint() const
const Readonly_Journal & get_journal() const
static SHA_256::Hash get_hash(const Readonly_Journal &journal, int64_t checkpoint)
Async_Reader get_async_tail_reader(int64_t start_position) const
std::array< uint32_t, 8 > Hash
const std::string & get_endpoint_path() const
void stop_after_sessions()
std::chrono::milliseconds get_time_stamp() const
Server(Client &client, asio::io_context &io_context, std::string endpoint_path, std::chrono::milliseconds lock_timeout, std::ostream *log_pointer)
int64_t push_if_ahead() override
constexpr int protocol_version
std::string get_time_string_of_now()