8#define LOG(x) log([&](std::ostream &out){out << x;}) 
    9#define LOGID(x) log([&](std::ostream &out){session->write_id(out) << x;}) 
   11#include <asio/read.hpp> 
   12#include <asio/write.hpp> 
   18 const std::map<char, const char *> Server::request_description
 
   21  {
'H', 
"get fast SHA-256 hash code"},
 
   22  {
'I', 
"get full SHA-256 hash code"},
 
   23  {
'r', 
"read a range of bytes"},
 
   24  {
'D', 
"get checkpoint"},
 
   38  return std::chrono::duration_cast<std::chrono::milliseconds>
 
   40   std::chrono::steady_clock::now() - start_time
 
 
   45 std::ostream &Server::Session::write_id(std::ostream &out) 
const 
   49  out << server.get_time_stamp().count() << 
' ';
 
   52  out << server.endpoint_path << 
'(' 
   53      << server.client.get_journal_checkpoint() << 
"): " << 
id << 
": ";
 
   59 Server::Session::Session
 
   63  asio::local::stream_protocol::socket &&socket
 
   65  id(++server.session_id),
 
   67  socket(std::move(socket)),
 
   68  state(State::not_locking)
 
   70  server.log([
this](std::ostream &out)
 
   72   write_id(out) << 
"new session created\n";}
 
   74  server.sessions.insert(
this);
 
   75  server.write_status();
 
   79 Server::Session::~Session()
 
   84   server.sessions.erase(
this);
 
   86   if (state == State::locking)
 
   88    server.log([
this](std::ostream &out)
 
   90     write_id(out) << 
"removing lock held by dying session.\n";
 
   96   server.log([
this](std::ostream &out)
 
   98    write_id(out) << 
"deleted\n";
 
  101   server.write_status();
 
  111 void Server::async_read
 
  114  std::shared_ptr<Session> session,
 
  117  Transfer_Handler handler
 
  123   asio::buffer(session->buffer.data + offset, size),
 
  124   [
this, handler, session](std::error_code e, 
size_t s)
 
  126    (this->*handler)(session, e, s);
 
  132 void Server::write_status()
 
  135  log([
this](std::ostream &out)
 
  138   out << 
"): pid = " << joedb::get_pid();
 
  140   out << 
"; sessions = " << sessions.size() << 
'\n';
 
  145 void Server::lock_dequeue()
 
  150   if (lock_queue.empty())
 
  154     client_lock->unlock();
 
  161    const std::shared_ptr<Session> session = lock_queue.front();
 
  167     if (!writable_journal_client)
 
  169      LOGID(
"error: locking pull-only server\n");
 
  170      session->buffer.data[0] = 
'R';
 
  173      client_lock.emplace(*writable_journal_client); 
 
  176    if (session->state == Session::State::waiting_for_lock_to_pull)
 
  177     start_pulling(session);
 
  179    session->state = Session::State::locking;
 
  180    refresh_lock_timeout(session);
 
  189  const std::shared_ptr<Session> session,
 
  190  const Session::State state
 
  193  if (session->state != Session::State::locking)
 
  195   session->state = state;
 
  196   lock_queue.emplace(session);
 
  200   LOGID(
"error: locking an already locked session\n");
 
  204 void Server::unlock(Session &session)
 
  207  if (session.state == Session::State::locking)
 
  209   log([&session](std::ostream &out)
 
  211    session.write_id(out) << 
"unlocking\n";
 
  213   session.state = Session::State::not_locking;
 
  215   lock_timeout_timer.cancel();
 
  222 void Server::lock_timeout_handler
 
  225  const std::shared_ptr<Session> session,
 
  226  const std::error_code error
 
  233   if (session->push_writer)
 
  235    session->push_writer.reset();
 
  236    session->push_status = 
't';
 
  244 void Server::refresh_lock_timeout(
const std::shared_ptr<Session> session)
 
  247  if (lock_timeout.count() > 0 && session->state == Session::State::locking)
 
  249   lock_timeout_timer.expires_after(lock_timeout);
 
  250   lock_timeout_timer.async_wait
 
  252    [
this, session](std::error_code e){lock_timeout_handler(session, e);}
 
  258 void Server::push_transfer_handler
 
  261  const std::shared_ptr<Session> session,
 
  262  const std::error_code error,
 
  263  const size_t bytes_transferred
 
  268   if (session->push_writer)
 
  269    session->push_writer->write(session->buffer.data, bytes_transferred); 
 
  271   session->push_remaining_size -= int64_t(bytes_transferred);
 
  272   session->progress_bar->print_remaining(session->push_remaining_size);
 
  274   push_transfer(session);
 
  279 void Server::push_transfer(
const std::shared_ptr<Session> session)
 
  282  if (session->push_remaining_size > 0)
 
  284   refresh_lock_timeout(session);
 
  290    size_t(std::min(session->push_remaining_size, int64_t(session->buffer.size))),
 
  291    &Server::push_transfer_handler
 
  296   if (session->push_writer)
 
  300     client_lock->get_journal().soft_checkpoint_at
 
  302      session->push_writer->get_position()
 
  306     if (client.
is_shared() && session->unlock_after_push)
 
  308      client_lock->checkpoint_and_push_unlock();
 
  312      client_lock->checkpoint_and_push();
 
  315    session->push_writer.reset();
 
  318   session->progress_bar.reset();
 
  320   session->buffer.data[0] = session->push_status;
 
  321   LOGID(
"returning " << session->push_status << 
'\n');
 
  322   write_buffer_and_next_command(session, 1);
 
  324   if (session->unlock_after_push)
 
  327   for (
auto *other_session: sessions)
 
  331     other_session->state == Session::State::waiting_for_push_to_pull &&
 
  335     other_session->state = Session::State::not_locking;
 
  336     start_pulling(other_session->shared_from_this());
 
  343 void Server::push_handler
 
  346  const std::shared_ptr<Session> session,
 
  347  const std::error_code error,
 
  348  const size_t bytes_transferred
 
  353   session->buffer.index = 0;
 
  354   const int64_t from = session->buffer.read<int64_t>();
 
  355   const int64_t until = session->buffer.read<int64_t>();
 
  357   if (locked && session->state != Session::State::locking)
 
  359    LOGID(
"trying to push while someone else is locking\n");
 
  363    LOGID(
"taking the lock for push attempt.\n");
 
  364    lock(session, Session::State::waiting_for_lock_to_push);
 
  367   const bool conflict =
 
  369    session->state != Session::State::locking ||
 
  370    from != client.get_journal().get_checkpoint()
 
  373   LOGID(
"pushing, from = " << from << 
", until = " << until);
 
  374   session->progress_bar.emplace(until - from, log_pointer);
 
  376   if (!writable_journal_client)
 
  377    session->push_status = 
'R';
 
  379    session->push_status = 
'C';
 
  384     session->push_writer.emplace
 
  386      client_lock->get_journal().get_async_tail_writer()
 
  391   session->push_remaining_size = until - from;
 
  393   push_transfer(session);
 
  398 void Server::read_transfer_handler
 
  401  const std::shared_ptr<Session> session,
 
  403  const std::error_code error,
 
  404  const size_t bytes_transferred,
 
  410   session->progress_bar->print_remaining(reader.get_remaining());
 
  412   if (offset + reader.get_remaining() > 0)
 
  414    const size_t size = reader.read 
 
  416     session->buffer.data + offset,
 
  417     session->buffer.size - offset
 
  420    if (reader.is_end_of_file())
 
  421     LOG(
"error: unexpected end of file\n");
 
  424     refresh_lock_timeout(session);
 
  429      asio::buffer(session->buffer.data, size + offset),
 
  430      [
this, session, reader](std::error_code e, 
size_t s)
 
  432       read_transfer_handler(session, reader, e, s, 0);
 
  439    session->pull_timer.reset();
 
  440    session->progress_bar.reset();
 
  441    read_command(session);
 
  447 void Server::start_reading
 
  450  std::shared_ptr<Session> session,
 
  454  LOGID(
"reading from = " << reader.get_current() << 
", until = " << reader.get_end());
 
  455  session->progress_bar.emplace(reader.get_remaining(), log_pointer);
 
  457  session->buffer.index = 1;
 
  458  session->buffer.write<int64_t>(reader.get_end());
 
  460  read_transfer_handler
 
  466   session->buffer.index
 
  471 void Server::start_pulling(std::shared_ptr<Session> session)
 
  476   session->lock_before_pulling &&
 
  477   session->state != Session::State::waiting_for_lock_to_pull
 
  480   lock(session, Session::State::waiting_for_lock_to_pull);
 
  484  if (!session->send_pull_data)
 
  495 void Server::pull_handler
 
  498  const std::shared_ptr<Session> session,
 
  499  const std::error_code error,
 
  500  const size_t bytes_transferred
 
  505   session->buffer.index = 1;
 
  506   const std::chrono::milliseconds wait{session->buffer.read<int64_t>()};
 
  507   session->pull_checkpoint = session->buffer.read<int64_t>();
 
  515    session->pull_checkpoint == client.get_journal_checkpoint()
 
  520     "waiting at checkpoint = " << session->pull_checkpoint <<
 
  521     " for " << 
double(wait.count()) * 0.001 << 
"s\n" 
  524    session->state = Session::State::waiting_for_push_to_pull;
 
  525    session->pull_timer.emplace(io_context);
 
  526    session->pull_timer->expires_after(wait);
 
  527    session->pull_timer->async_wait
 
  529     [
this, session](std::error_code timer_error)
 
  533       if (session->state == Session::State::waiting_for_push_to_pull)
 
  535        session->state = Session::State::not_locking;
 
  536        start_pulling(session);
 
  543    start_pulling(session);
 
  548 void Server::read_handler
 
  551  std::shared_ptr<Session> session,
 
  552  std::error_code error,
 
  553  size_t bytes_transferred
 
  558   session->buffer.index = 1;
 
  559   int64_t from = session->buffer.read<int64_t>();
 
  560   int64_t until = session->buffer.read<int64_t>();
 
  562   if (until > client.get_journal_checkpoint())
 
  563    until = client.get_journal_checkpoint();
 
  570    client.get_journal().get_async_reader(from, until)
 
  576 void Server::check_hash_handler
 
  579  const std::shared_ptr<Session> session,
 
  580  const std::error_code error,
 
  581  const size_t bytes_transferred
 
  586   session->buffer.index = 1;
 
  587   const auto checkpoint = session->buffer.read<int64_t>();
 
  590   const Readonly_Journal &journal = client.get_journal();
 
  594    checkpoint > journal.get_checkpoint() ||
 
  595    hash != (session->buffer.data[0] == 
'H'  
  597    : Journal_Hasher::get_full_hash(journal, checkpoint))
 
  600    session->buffer.data[0] = 
'h';
 
  603   LOGID(
"hash for checkpoint = " << checkpoint << 
", result = " 
  604    << session->buffer.data[0] << 
'\n');
 
  606   write_buffer_and_next_command(session, 1);
 
  611 void Server::read_command_handler
 
  614  const std::shared_ptr<Session> session,
 
  615  const std::error_code error,
 
  616  const size_t bytes_transferred
 
  621   const char code = session->buffer.data[0];
 
  625    const auto i = request_description.find(code);
 
  626    if (i != request_description.end())
 
  627     LOGID(
"received request " << code << 
": " << i->second << 
'\n');
 
  629     LOGID(
"unknown code: " << code << 
'\n');
 
  635     async_read(session, 1, 40, &Server::check_hash_handler);
 
  639     async_read(session, 1, 16, &Server::read_handler);
 
  642    case 'D': 
case 'E': 
case 'F': 
case 'G':
 
  643     session->lock_before_pulling = code & 1;
 
  644     session->send_pull_data = code & 2;
 
  645     async_read(session, 1, 16, &Server::pull_handler);
 
  650     write_buffer_and_next_command(session, 1);
 
  654     session->unlock_after_push = (code == 
'O');
 
  655     session->push_status = code;
 
  656     async_read(session, 0, 16, &Server::push_handler);
 
  666 void Server::read_command(
const std::shared_ptr<Session> session)
 
  669  async_read(session, 0, 1, &Server::read_command_handler);
 
  673 void Server::write_buffer_and_next_command
 
  676  const std::shared_ptr<Session> session,
 
  683   asio::buffer(session->buffer.data, size),
 
  684   [
this, session](std::error_code e, 
size_t s)
 
  687     read_command(session);
 
  693 void Server::handshake_handler
 
  696  const std::shared_ptr<Session> session,
 
  697  const std::error_code error,
 
  698  const size_t bytes_transferred
 
  703   session->buffer.index = 0;
 
  705   if (session->buffer.read<std::array<char, 5>>() == 
Header::joedb)
 
  707    const int64_t client_version = session->buffer.read<int64_t>();
 
  708    LOGID(
"client_version = " << client_version << 
'\n');
 
  710    session->buffer.index = 5;
 
  712    session->buffer.write<int64_t>(session->id);
 
  713    session->buffer.write<int64_t>(client.get_journal_checkpoint());
 
  714    session->buffer.write<
char>
 
  716     (writable_journal_client && !client.is_pullonly()) ? 
'W' : 
'R' 
  719    write_buffer_and_next_command(session, session->buffer.index);
 
  723   LOGID(
"bad handshake\n");
 
  728 void Server::start_accept()
 
  733   acceptor.async_accept
 
  736    [
this](std::error_code error, asio::local::stream_protocol::socket socket)
 
  738     if (!error && !stopped)
 
  740      std::shared_ptr<Session> session(
new Session(*
this, std::move(socket)));
 
  741      async_read(session, 0, 13, &Server::handshake_handler);
 
  755  asio::io_context &io_context,
 
  756  std::string endpoint_path,
 
  757  const std::chrono::milliseconds lock_timeout,
 
  758  std::ostream * 
const log_pointer
 
  760  start_time(std::chrono::steady_clock::now()),
 
  763  io_context(io_context),
 
  764  endpoint_path(std::move(endpoint_path)),
 
  765  endpoint(this->endpoint_path),
 
  766  acceptor(io_context, endpoint, false),
 
  768  interrupt_signals(io_context, SIGINT, SIGTERM),
 
  770  lock_timeout(lock_timeout),
 
  771  lock_timeout_timer(io_context),
 
  773  log_pointer(log_pointer)
 
  775  if (writable_journal_client)
 
 
  789   if (!client.
is_shared() && writable_journal_client)
 
  790    client_lock.emplace(*writable_journal_client);
 
  792   interrupt_signals.async_wait([
this](
const asio::error_code &error, 
int)
 
  804    ": start. lock_timeout = " << 
double(lock_timeout.count()) * 0.001 <<
 
  806    "; shared = " << client.
is_shared() << 
'\n' 
 
  817  interrupt_signals.cancel();
 
 
  829   for (Session *session: sessions)
 
  831    session->socket.close();
 
  832    session->pull_timer.reset();
 
  837    client_lock->unlock();
 
 
  851   if (!sessions.empty())
 
  852    LOG(
"Destroying server before sessions.\n");
 
  853   std::remove(endpoint_path.c_str());
 
 
Handle concurrent access to a file with a joedb::Connection.
int64_t get_journal_checkpoint() const
const Readonly_Journal & get_journal() const
static SHA_256::Hash get_fast_hash(const Readonly_Journal &journal, int64_t checkpoint)
Async_Reader get_async_tail_reader(int64_t start_position) const
std::array< uint32_t, 8 > Hash
const std::string & get_endpoint_path() const
void stop_after_sessions()
std::chrono::milliseconds get_time_stamp() const
Server(Client &client, asio::io_context &io_context, std::string endpoint_path, std::chrono::milliseconds lock_timeout, std::ostream *log_pointer)
int64_t push_if_ahead() override
constexpr int protocol_version
std::string get_time_string_of_now()