// LOG(x): hands log() a lambda that streams the expression x into the log stream.
8#define LOG(x) log([&](std::ostream &out){out << x;})
// LOGID(x): like LOG, but the message is prefixed by session->write_id(out);
// a variable named `session` must be in scope where the macro is expanded.
9#define LOGID(x) log([&](std::ostream &out){session->write_id(out) << x;})
11#include <asio/read.hpp>
12#include <asio/write.hpp>
// Maps single-character request codes to a human-readable description.
// read_command_handler looks a received code up here when logging it.
18 const std::map<char, const char *> Server::request_description
21 {
'H',
"get fast SHA-256 hash code"},
22 {
'I',
"get full SHA-256 hash code"},
23 {
'r',
"read a range of bytes"},
24 {
'D',
"get checkpoint"},
// Time elapsed since start_time on std::chrono::steady_clock, converted to
// milliseconds.
// NOTE(review): the enclosing function header is not visible here; this
// appears to be the body of get_time_stamp() - confirm.
38 return std::chrono::duration_cast<std::chrono::milliseconds>
40 std::chrono::steady_clock::now() - start_time
// Writes this session's log prefix to `out`. Callers chain further output
// onto the returned stream (write_id(out) << ...).
45 std::ostream &Server::Session::write_id(std::ostream &out)
const
// Server time stamp as a millisecond count, followed by a space.
49 out << server.get_time_stamp().count() <<
' ';
// "<endpoint_path>(<journal checkpoint>): <session id>: "
52 out << server.endpoint_path <<
'('
53 << server.client.get_journal_checkpoint() <<
"): " <<
id <<
": ";
// Creates a session around a connected socket, logs its creation and
// registers it with the server.
59 Server::Session::Session
63 asio::local::stream_protocol::socket &&socket
// The id comes from pre-incrementing the server-wide session counter.
65 id(++server.session_id),
67 socket(std::move(socket)),
// Initial state: not_locking.
68 state(State::not_locking)
70 server.log([
this](std::ostream &out)
72 write_id(out) <<
"new session created\n";}
// Add this session to the server's collection, then log the new session count.
74 server.sessions.insert(
this);
75 server.write_status();
// Removes the session from the server's collection and logs its deletion.
79 Server::Session::~Session()
84 server.sessions.erase(
this);
// A session destroyed while in the locking state is reported in the log.
// NOTE(review): the statement that actually releases the lock is not visible
// in this excerpt - confirm it follows the log call.
86 if (state == State::locking)
88 server.log([
this](std::ostream &out)
90 write_id(out) <<
"removing lock held by dying session.\n";
96 server.log([
this](std::ostream &out)
98 write_id(out) <<
"deleted\n";
// Log the updated session count.
101 server.write_status();
// Starts an asynchronous read of `size` bytes into the session buffer at
// position `offset`. On completion, the pointer-to-member `handler` is invoked
// on this server with the session, the error code and the byte count.
// NOTE(review): the declarations of `offset` and `size` and the read call
// itself are not visible in this excerpt.
111 void Server::async_read
114 std::shared_ptr<Session> session,
117 Transfer_Handler handler
123 asio::buffer(session->buffer.data + offset, size),
// The shared_ptr is captured by value, so the completion lambda holds its own
// reference to the session.
124 [
this, handler, session](std::error_code e,
size_t s)
126 (this->*handler)(session, e, s);
// Logs one status line containing the process id and the number of sessions.
132 void Server::write_status()
135 log([
this](std::ostream &out)
138 out <<
"): pid = " << joedb::get_pid();
140 out <<
"; sessions = " << sessions.size() <<
'\n';
// Gives the lock to the session at the front of lock_queue, if there is one.
145 void Server::lock_dequeue()
// No session is waiting for the lock.
// NOTE(review): the conditions guarding the unlock() below are only partly
// visible in this excerpt.
150 if (lock_queue.empty())
154 client_lock->unlock();
// Next session in line.
161 const std::shared_ptr<Session> session = lock_queue.front();
// Without a writable journal client, the lock request is logged as an error
// and 'R' is stored in the first byte of the session buffer.
167 if (!writable_journal_client)
169 LOGID(
"error: locking pull-only server\n");
170 session->buffer.data[0] =
'R';
173 client_lock.emplace(*writable_journal_client);
// A session that was waiting for the lock in order to pull can start pulling now.
176 if (session->state == Session::State::waiting_for_lock_to_pull)
177 start_pulling(session);
// Mark the session as the lock holder and (re)arm the lock timeout.
179 session->state = Session::State::locking;
180 refresh_lock_timeout(session);
// Lock request: `session` asks for the lock, `state` is the waiting state to
// record for it.
// NOTE(review): the function header is not visible here; other code calls this
// as lock(session, state) - confirm.
189 const std::shared_ptr<Session> session,
190 const Session::State state
// A session that is not already holding the lock is put in the requested
// waiting state and appended to lock_queue.
193 if (session->state != Session::State::locking)
195 session->state = state;
196 lock_queue.emplace(session);
// Otherwise the request is reported as an error in the log.
200 LOGID(
"error: locking an already locked session\n");
// Releases the lock held by `session`; does nothing visible here when the
// session is not in the locking state.
204 void Server::unlock(Session &session)
207 if (session.state == Session::State::locking)
209 log([&session](std::ostream &out)
211 session.write_id(out) <<
"unlocking\n";
213 session.state = Session::State::not_locking;
// The pending lock timeout no longer applies.
215 lock_timeout_timer.cancel();
// Completion handler of lock_timeout_timer for `session`.
// NOTE(review): how `error` is tested (e.g. a cancelled wait) is not visible
// in this excerpt.
222 void Server::lock_timeout_handler
225 const std::shared_ptr<Session> session,
226 const std::error_code error
// A push in progress is abandoned: its writer is dropped and the push status
// becomes 't'.
233 if (session->push_writer)
235 session->push_writer.reset();
236 session->push_status =
't';
// (Re)arms the lock timeout for `session`.
244 void Server::refresh_lock_timeout(
const std::shared_ptr<Session> session)
// Only when a positive timeout is configured and the session holds the lock;
// a lock_timeout of zero or less disables the timer.
247 if (lock_timeout.count() > 0 && session->state == Session::State::locking)
249 lock_timeout_timer.expires_after(lock_timeout);
250 lock_timeout_timer.async_wait
252 [
this, session](std::error_code e){lock_timeout_handler(session, e);}
// Completion handler for one chunk of pushed data.
258 void Server::push_transfer_handler
261 const std::shared_ptr<Session> session,
262 const std::error_code error,
263 const size_t bytes_transferred
// The chunk is written only while a push writer exists; either way it is
// counted as received.
268 if (session->push_writer)
269 session->push_writer->write(session->buffer.data, bytes_transferred);
271 session->push_remaining_size -= int64_t(bytes_transferred);
272 session->progress_bar->print_remaining(session->push_remaining_size);
// Continue with the next chunk, or finish the push.
274 push_transfer(session);
// Drives a push: requests the next chunk while bytes remain, otherwise
// finalizes the push and replies to the client.
279 void Server::push_transfer(
const std::shared_ptr<Session> session)
282 if (session->push_remaining_size > 0)
284 refresh_lock_timeout(session);
// Read at most one buffer's worth of the remaining bytes.
290 size_t(std::min(session->push_remaining_size, int64_t(session->buffer.size))),
291 &Server::push_transfer_handler
// NOTE(review): the lines below presumably form the else branch (all bytes
// received); the `else` itself is not visible in this excerpt.
// With a writer still present, the journal is soft-checkpointed at the
// writer's position and then checkpointed and pushed.
296 if (session->push_writer)
300 client_lock->get_journal().soft_checkpoint_at
302 session->push_writer->get_position()
// For a shared client with unlock_after_push set, the push also unlocks.
306 if (client.
is_shared() && session->unlock_after_push)
308 client_lock->checkpoint_and_push_unlock();
312 client_lock->checkpoint_and_push();
315 session->push_writer.reset();
318 session->progress_bar.reset();
// The reply is a single byte: the push status.
320 session->buffer.data[0] = session->push_status;
321 LOGID(
"returning " << session->push_status <<
'\n');
322 write_buffer_and_next_command(session, 1);
324 if (session->unlock_after_push)
// Sessions waiting for a push before pulling are switched back to not_locking
// and start pulling.
// NOTE(review): the second half of the condition on other_session is not
// visible here.
327 for (
auto *other_session: sessions)
331 other_session->state == Session::State::waiting_for_push_to_pull &&
335 other_session->state = Session::State::not_locking;
336 start_pulling(other_session->shared_from_this());
// Handles the header of a push request: two int64 values, `from` and `until`,
// read from the start of the session buffer.
343 void Server::push_handler
346 const std::shared_ptr<Session> session,
347 const std::error_code error,
348 const size_t bytes_transferred
353 session->buffer.index = 0;
354 const int64_t from = session->buffer.read<int64_t>();
355 const int64_t until = session->buffer.read<int64_t>();
// NOTE(review): `locked` is defined on a line not visible in this excerpt.
357 if (locked && session->state != Session::State::locking)
359 LOGID(
"trying to push while someone else is locking\n");
363 LOGID(
"taking the lock for push attempt.\n");
364 lock(session, Session::State::waiting_for_lock_to_push);
// The push conflicts when this session does not hold the lock, or when `from`
// differs from the journal checkpoint.
367 const bool conflict =
369 session->state != Session::State::locking ||
370 from != client.get_journal().get_checkpoint()
373 LOGID(
"pushing, from = " << from <<
", until = " << until);
// The progress bar covers until - from bytes.
374 session->progress_bar.emplace(until - from, log_pointer);
// Status 'R' when there is no writable journal client.
// NOTE(review): 'C' is presumably set on conflict; the guarding condition is
// not visible here.
376 if (!writable_journal_client)
377 session->push_status =
'R';
379 session->push_status =
'C';
384 session->push_writer.emplace
386 client_lock->get_journal().get_async_tail_writer()
// Number of payload bytes still to receive.
391 session->push_remaining_size = until - from;
393 push_transfer(session);
// Sends journal data from `reader` to the client one buffer at a time,
// re-entering itself from the write completion until nothing is left.
// NOTE(review): the `reader` and `offset` parameter declarations are not
// visible in this excerpt.
398 void Server::read_transfer_handler
401 const std::shared_ptr<Session> session,
403 const std::error_code error,
404 const size_t bytes_transferred,
410 session->progress_bar->print_remaining(reader.get_remaining());
// Something is left to send: bytes already placed before `offset` and/or
// unread data in the reader.
412 if (offset + reader.get_remaining() > 0)
// Fill the rest of the buffer, after `offset`, from the reader.
414 const size_t size = reader.read
416 session->buffer.data + offset,
417 session->buffer.size - offset
420 if (reader.is_end_of_file())
421 LOG(
"error: unexpected end of file\n");
424 refresh_lock_timeout(session);
// The write covers the first `offset` bytes plus the bytes just read.
429 asio::buffer(session->buffer.data, size + offset),
430 [
this, session, reader](std::error_code e,
size_t s)
// Later chunks start at the beginning of the buffer (offset 0).
432 read_transfer_handler(session, reader, e, s, 0);
// NOTE(review): presumably the else branch (transfer finished): release the
// pull timer and progress bar, then wait for the next command.
439 session->pull_timer.reset();
440 session->progress_bar.reset();
441 read_command(session);
// Starts sending the range covered by `reader` to the client.
// NOTE(review): the `reader` parameter declaration is not visible in this
// excerpt.
447 void Server::start_reading
450 std::shared_ptr<Session> session,
454 LOGID(
"reading from = " << reader.get_current() <<
", until = " << reader.get_end());
455 session->progress_bar.emplace(reader.get_remaining(), log_pointer);
// The reader's end position is written as an int64 right after the first
// buffer byte.
457 session->buffer.index = 1;
458 session->buffer.write<int64_t>(reader.get_end());
// The buffer index after that write is passed on to read_transfer_handler
// (presumably as its offset argument - confirm).
460 read_transfer_handler
466 session->buffer.index
// Starts answering a pull request for `session`.
471 void Server::start_pulling(std::shared_ptr<Session> session)
// A session that asked to lock before pulling, and is not already waiting for
// that lock, requests the lock first.
476 session->lock_before_pulling &&
477 session->state != Session::State::waiting_for_lock_to_pull
480 lock(session, Session::State::waiting_for_lock_to_pull);
// NOTE(review): the handling of the case where no pull data is to be sent is
// not visible in this excerpt.
484 if (!session->send_pull_data)
// Handles the parameters of a pull request: a wait duration in milliseconds
// and the client's checkpoint, both int64 values read after the first buffer
// byte.
495 void Server::pull_handler
498 const std::shared_ptr<Session> session,
499 const std::error_code error,
500 const size_t bytes_transferred
505 session->buffer.index = 1;
506 const std::chrono::milliseconds wait{session->buffer.read<int64_t>()};
507 session->pull_checkpoint = session->buffer.read<int64_t>();
// The client is already at the server's checkpoint: wait for a push.
// NOTE(review): the remaining terms of this condition are not visible here.
515 session->pull_checkpoint == client.get_journal_checkpoint()
// The wait is logged in seconds (milliseconds * 0.001).
520 "waiting at checkpoint = " << session->pull_checkpoint <<
521 " for " <<
double(wait.count()) * 0.001 <<
"s\n"
524 session->state = Session::State::waiting_for_push_to_pull;
525 session->pull_timer.emplace(io_context);
526 session->pull_timer->expires_after(wait);
527 session->pull_timer->async_wait
529 [
this, session](std::error_code timer_error)
// If the session is still waiting when the timer handler runs, it stops
// waiting and pulls.
533 if (session->state == Session::State::waiting_for_push_to_pull)
535 session->state = Session::State::not_locking;
536 start_pulling(session);
// NOTE(review): presumably the else branch - no wait needed, pull right away.
543 start_pulling(session);
// Handles the parameters of a range-read request: int64 `from` and `until`,
// read after the first buffer byte.
548 void Server::read_handler
551 std::shared_ptr<Session> session,
552 std::error_code error,
553 size_t bytes_transferred
558 session->buffer.index = 1;
559 int64_t from = session->buffer.read<int64_t>();
560 int64_t until = session->buffer.read<int64_t>();
// The requested end is clamped to the current journal checkpoint.
562 if (until > client.get_journal_checkpoint())
563 until = client.get_journal_checkpoint();
// Reader over [from, until) of the journal.
// NOTE(review): interval semantics assumed - confirm.
570 client.get_journal().get_async_reader(from, until)
// Handles a hash-check request: compares the hash sent by the client with the
// hash of the server's journal at the requested checkpoint, and replies with
// one byte.
576 void Server::check_hash_handler
579 const std::shared_ptr<Session> session,
580 const std::error_code error,
581 const size_t bytes_transferred
586 session->buffer.index = 1;
587 const auto checkpoint = session->buffer.read<int64_t>();
// NOTE(review): the line that reads `hash` from the buffer is not visible in
// this excerpt.
590 const Readonly_Journal &journal = client.get_journal();
// The check fails when the checkpoint is beyond the server's, or when the
// hashes differ. Request code 'H' selects one hash function; otherwise the
// full hash is used.
594 checkpoint > journal.get_checkpoint() ||
595 hash != (session->buffer.data[0] ==
'H'
597 : Journal_Hasher::get_full_hash(journal, checkpoint))
// On failure the first byte is replaced by 'h'; otherwise it keeps the
// request code.
600 session->buffer.data[0] =
'h';
603 LOGID(
"hash for checkpoint = " << checkpoint <<
", result = "
604 << session->buffer.data[0] <<
'\n');
// One-byte reply, then wait for the next command.
606 write_buffer_and_next_command(session, 1);
// Dispatches on the one-byte command code received from the client.
// NOTE(review): most `case` labels of the dispatch are not visible in this
// excerpt.
611 void Server::read_command_handler
614 const std::shared_ptr<Session> session,
615 const std::error_code error,
616 const size_t bytes_transferred
621 const char code = session->buffer.data[0];
// Log the request with its description, or as unknown.
625 const auto i = request_description.find(code);
626 if (i != request_description.end())
627 LOGID(
"received request " << code <<
": " << i->second <<
'\n');
629 LOGID(
"unknown code: " << code <<
'\n');
// Hash check: 40 more bytes are read after the code byte.
635 async_read(session, 1, 40, &Server::check_hash_handler);
// Range read: 16 more bytes (two int64) after the code byte.
639 async_read(session, 1, 16, &Server::read_handler);
// Pull variants. In ASCII 'D'..'G' are 0x44..0x47, so bit 0 of the code
// selects lock_before_pulling and bit 1 selects send_pull_data.
642 case 'D':
case 'E':
case 'F':
case 'G':
643 session->lock_before_pulling = code & 1;
644 session->send_pull_data = code & 2;
645 async_read(session, 1, 16, &Server::pull_handler);
// One-byte reply.
650 write_buffer_and_next_command(session, 1);
// Push: code 'O' requests unlocking after the push. The code is the initial
// push status, and the 16-byte header is read from the start of the buffer.
654 session->unlock_after_push = (code ==
'O');
655 session->push_status = code;
656 async_read(session, 0, 16, &Server::push_handler);
// Waits for the next command from the client: reads one byte at the start of
// the session buffer and passes it to read_command_handler.
666 void Server::read_command(
const std::shared_ptr<Session> session)
669 async_read(session, 0, 1, &Server::read_command_handler);
// Sends the first `size` bytes of the session buffer to the client, then
// waits for the next command.
// NOTE(review): the `size` parameter declaration and the write call are not
// visible in this excerpt.
673 void Server::write_buffer_and_next_command
676 const std::shared_ptr<Session> session,
683 asio::buffer(session->buffer.data, size),
684 [
this, session](std::error_code e,
size_t s)
687 read_command(session);
// Validates the client's handshake and sends the server's reply.
693 void Server::handshake_handler
696 const std::shared_ptr<Session> session,
697 const std::error_code error,
698 const size_t bytes_transferred
703 session->buffer.index = 0;
// The handshake starts with 5 characters that must equal Header::joedb,
// followed by the client version as an int64.
705 if (session->buffer.read<std::array<char, 5>>() ==
Header::joedb)
707 const int64_t client_version = session->buffer.read<int64_t>();
708 LOGID(
"client_version = " << client_version <<
'\n');
// The reply keeps the first 5 bytes and writes the server's fields after them.
710 session->buffer.index = 5;
712 session->buffer.write<int64_t>(session->id);
713 session->buffer.write<int64_t>(client.get_journal_checkpoint());
// 'W' when there is a writable journal client and the client is not
// pull-only, 'R' otherwise.
714 session->buffer.write<
char>
716 (writable_journal_client && !client.is_pullonly()) ?
'W' :
'R'
// Send everything written so far, then wait for the first command.
719 write_buffer_and_next_command(session, session->buffer.index);
723 LOGID(
"bad handshake\n");
// Asynchronously accepts the next client connection.
728 void Server::start_accept()
733 acceptor.async_accept
736 [
this](std::error_code error, asio::local::stream_protocol::socket socket)
// Accepted connections are ignored on error or once the server is stopped.
738 if (!error && !stopped)
740 std::shared_ptr<Session> session(
new Session(*
this, std::move(socket)));
// The client speaks first: read its 13-byte handshake into the start of the
// buffer.
741 async_read(session, 0, 13, &Server::handshake_handler);
// Constructor parameters and member initializers of the server.
// NOTE(review): the constructor header and its first parameter are not
// visible in this excerpt.
755 asio::io_context &io_context,
756 std::string endpoint_path,
757 const std::chrono::milliseconds lock_timeout,
758 std::ostream *
const log_pointer
// Reference point for elapsed-time stamps.
760 start_time(std::chrono::steady_clock::now()),
763 io_context(io_context),
764 endpoint_path(std::move(endpoint_path)),
// Uses the member: the parameter of the same name was moved from just above.
765 endpoint(this->endpoint_path),
766 acceptor(io_context, endpoint, false),
// The signal set covers SIGINT and SIGTERM.
768 interrupt_signals(io_context, SIGINT, SIGTERM),
770 lock_timeout(lock_timeout),
771 lock_timeout_timer(io_context),
773 log_pointer(log_pointer)
775 if (writable_journal_client)
// A writable client that is not shared gets its client lock right here.
789 if (!client.
is_shared() && writable_journal_client)
790 client_lock.emplace(*writable_journal_client);
// Handler for the signals registered above.
// NOTE(review): the handler body is not visible in this excerpt.
792 interrupt_signals.async_wait([
this](
const asio::error_code &error,
int)
// Start-up log line; the timeout is printed in seconds (milliseconds * 0.001).
804 ": start. lock_timeout = " <<
double(lock_timeout.count()) * 0.001 <<
806 "; shared = " << client.
is_shared() <<
'\n'
// NOTE(review): shutdown-related statements whose enclosing function headers
// are not visible in this excerpt; they may belong to more than one function.
// Cancels the pending wait on the interrupt signal set.
817 interrupt_signals.cancel();
// Closes every session's socket and drops its pull timer.
829 for (Session *session: sessions)
831 session->socket.close();
832 session->pull_timer.reset();
// Releases the client lock.
// NOTE(review): the guard around this call is not visible here.
837 client_lock->unlock();
// NOTE(review): enclosing function header not visible; the log message
// suggests this is the server's teardown - confirm.
// Sessions still registered at this point are reported in the log.
851 if (!sessions.empty())
852 LOG(
"Destroying server before sessions.\n");
// Remove the endpoint file from the file system.
853 std::remove(endpoint_path.c_str());
Handle concurrent access to a file with a joedb::Connection.
int64_t get_journal_checkpoint() const
const Readonly_Journal & get_journal() const
static SHA_256::Hash get_fast_hash(const Readonly_Journal &journal, int64_t checkpoint)
Async_Reader get_async_tail_reader(int64_t start_position) const
std::array< uint32_t, 8 > Hash
const std::string & get_endpoint_path() const
void stop_after_sessions()
std::chrono::milliseconds get_time_stamp() const
Server(Client &client, asio::io_context &io_context, std::string endpoint_path, std::chrono::milliseconds lock_timeout, std::ostream *log_pointer)
int64_t push_if_ahead() override
constexpr int protocol_version
std::string get_time_string_of_now()