Joedb 10.0.1
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server.cpp
Go to the documentation of this file.
3#include "joedb/get_pid.h"
7
// LOG(x): call log() with a lambda that streams the expression `x` into the
// std::ostream it receives. Captures are by reference, so `x` may mention
// locals of the function where the macro is expanded.
#define LOG(x) \
 log([&](std::ostream &out) { out << x; })

// LOGID(x): like LOG, but the text is preceded by session->write_id(out).
// A variable named `session` must be in scope at the expansion site.
#define LOGID(x) \
 log([&](std::ostream &out) { session->write_id(out) << x; })
10
11#include <asio/read.hpp>
12#include <asio/write.hpp>
13#include <cstdio>
14
15namespace joedb
16{
17 ////////////////////////////////////////////////////////////////////////////
18 const std::map<char, const char *> Server::request_description
19 ////////////////////////////////////////////////////////////////////////////
20 {
21 {'H', "get fast SHA-256 hash code"},
22 {'I', "get full SHA-256 hash code"},
23 {'r', "read a range of bytes"},
24 {'D', "get checkpoint"},
25 {'E', "lock"},
26 {'F', "pull"},
27 {'G', "lock_pull"},
28 {'L', "lock"},
29 {'M', "unlock"},
30 {'N', "lock_push"},
31 {'O', "push_unlock"}
32 };
33
34 ////////////////////////////////////////////////////////////////////////////
35 std::chrono::milliseconds Server::get_time_stamp() const
36 ////////////////////////////////////////////////////////////////////////////
37 {
38 return std::chrono::duration_cast<std::chrono::milliseconds>
39 (
40 std::chrono::steady_clock::now() - start_time
41 );
42 }
43
44 ////////////////////////////////////////////////////////////////////////////
45 std::ostream &Server::Session::write_id(std::ostream &out) const
46 ////////////////////////////////////////////////////////////////////////////
47 {
48#if 0
49 out << server.get_time_stamp().count() << ' ';
50#endif
51
52 out << server.endpoint_path << '('
53 << server.client.get_journal_checkpoint() << "): " << id << ": ";
54
55 return out;
56 }
57
58 ////////////////////////////////////////////////////////////////////////////
59 Server::Session::Session
60 ////////////////////////////////////////////////////////////////////////////
61 (
62 Server &server,
63 asio::local::stream_protocol::socket &&socket
64 ):
65 id(++server.session_id),
66 server(server),
67 socket(std::move(socket)),
68 state(State::not_locking)
69 {
70 server.log([this](std::ostream &out)
71 {
72 write_id(out) << "new session created\n";}
73 );
74 server.sessions.insert(this);
75 server.write_status();
76 }
77
78 ////////////////////////////////////////////////////////////////////////////
79 Server::Session::~Session()
80 ////////////////////////////////////////////////////////////////////////////
81 {
82 try
83 {
84 server.sessions.erase(this);
85
86 if (state == State::locking)
87 {
88 server.log([this](std::ostream &out)
89 {
90 write_id(out) << "removing lock held by dying session.\n";
91 });
92
93 server.unlock(*this);
94 }
95
96 server.log([this](std::ostream &out)
97 {
98 write_id(out) << "deleted\n";
99 });
100
101 server.write_status();
102 }
103 catch (...)
104 {
105 // What should we do? maybe post the exception?
106 // This may not be a problem any more with coroutines.
107 }
108 }
109
110 ////////////////////////////////////////////////////////////////////////////
111 void Server::async_read
112 ////////////////////////////////////////////////////////////////////////////
113 (
114 std::shared_ptr<Session> session,
115 size_t offset,
116 size_t size,
117 Transfer_Handler handler
118 )
119 {
120 asio::async_read
121 (
122 session->socket,
123 asio::buffer(session->buffer.data + offset, size),
124 [this, handler, session](std::error_code e, size_t s)
125 {
126 (this->*handler)(session, e, s);
127 }
128 );
129 }
130
131 ////////////////////////////////////////////////////////////////////////////
132 void Server::write_status()
133 ////////////////////////////////////////////////////////////////////////////
134 {
135 log([this](std::ostream &out)
136 {
137 out << endpoint_path << '(' << client.get_journal_checkpoint();
138 out << "): pid = " << joedb::get_pid();
139 out << "; " << get_time_string_of_now();
140 out << "; sessions = " << sessions.size() << '\n';
141 });
142 }
143
144 ////////////////////////////////////////////////////////////////////////////
145 void Server::lock_dequeue()
146 ////////////////////////////////////////////////////////////////////////////
147 {
148 if (!locked)
149 {
150 if (lock_queue.empty())
151 {
152 if (client.is_shared() && client_lock)
153 {
154 client_lock->unlock();
155 client_lock.reset();
156 }
157 }
158 else
159 {
160 locked = true;
161 const std::shared_ptr<Session> session = lock_queue.front();
162 lock_queue.pop();
163 LOGID("locking\n");
164
165 if (!client_lock)
166 {
167 if (!writable_journal_client)
168 {
169 LOGID("error: locking pull-only server\n");
170 session->buffer.data[0] = 'R';
171 }
172 else
173 client_lock.emplace(*writable_journal_client); // ??? takes_time
174 }
175
176 if (session->state == Session::State::waiting_for_lock_to_pull)
177 start_pulling(session);
178
179 session->state = Session::State::locking;
180 refresh_lock_timeout(session);
181 }
182 }
183 }
184
185 ////////////////////////////////////////////////////////////////////////////
186 void Server::lock
187 ////////////////////////////////////////////////////////////////////////////
188 (
189 const std::shared_ptr<Session> session,
190 const Session::State state
191 )
192 {
193 if (session->state != Session::State::locking)
194 {
195 session->state = state;
196 lock_queue.emplace(session);
197 lock_dequeue();
198 }
199 else
200 LOGID("error: locking an already locked session\n");
201 }
202
203 ////////////////////////////////////////////////////////////////////////////
204 void Server::unlock(Session &session)
205 ////////////////////////////////////////////////////////////////////////////
206 {
207 if (session.state == Session::State::locking)
208 {
209 log([&session](std::ostream &out)
210 {
211 session.write_id(out) << "unlocking\n";
212 });
213 session.state = Session::State::not_locking;
214 locked = false;
215 lock_timeout_timer.cancel();
216
217 lock_dequeue();
218 }
219 }
220
221 ////////////////////////////////////////////////////////////////////////////
222 void Server::lock_timeout_handler
223 ////////////////////////////////////////////////////////////////////////////
224 (
225 const std::shared_ptr<Session> session,
226 const std::error_code error
227 )
228 {
229 if (!error)
230 {
231 LOGID("timeout\n");
232
233 if (session->push_writer)
234 {
235 session->push_writer.reset();
236 session->push_status = 't';
237 }
238
239 unlock(*session);
240 }
241 }
242
243 ////////////////////////////////////////////////////////////////////////////
244 void Server::refresh_lock_timeout(const std::shared_ptr<Session> session)
245 ////////////////////////////////////////////////////////////////////////////
246 {
247 if (lock_timeout.count() > 0 && session->state == Session::State::locking)
248 {
249 lock_timeout_timer.expires_after(lock_timeout);
250 lock_timeout_timer.async_wait
251 (
252 [this, session](std::error_code e){lock_timeout_handler(session, e);}
253 );
254 }
255 }
256
257 ////////////////////////////////////////////////////////////////////////////
258 void Server::push_transfer_handler
259 ////////////////////////////////////////////////////////////////////////////
260 (
261 const std::shared_ptr<Session> session,
262 const std::error_code error,
263 const size_t bytes_transferred
264 )
265 {
266 if (!error)
267 {
268 if (session->push_writer)
269 session->push_writer->write(session->buffer.data, bytes_transferred); // ??? takes_time
270
271 session->push_remaining_size -= int64_t(bytes_transferred);
272 session->progress_bar->print_remaining(session->push_remaining_size);
273
274 push_transfer(session);
275 }
276 }
277
278 ////////////////////////////////////////////////////////////////////////////
279 void Server::push_transfer(const std::shared_ptr<Session> session)
280 ////////////////////////////////////////////////////////////////////////////
281 {
282 if (session->push_remaining_size > 0)
283 {
284 refresh_lock_timeout(session);
285
286 async_read
287 (
288 session,
289 0,
290 size_t(std::min(session->push_remaining_size, int64_t(session->buffer.size))),
291 &Server::push_transfer_handler
292 );
293 }
294 else
295 {
296 if (session->push_writer)
297 {
298 if (client_lock)
299 {
300 client_lock->get_journal().soft_checkpoint_at
301 (
302 session->push_writer->get_position()
303 );
304
305 // ??? takes_time
306 if (client.is_shared() && session->unlock_after_push)
307 {
308 client_lock->checkpoint_and_push_unlock();
309 client_lock.reset();
310 }
311 else
312 client_lock->checkpoint_and_push();
313 }
314
315 session->push_writer.reset();
316 }
317
318 session->progress_bar.reset();
319
320 session->buffer.data[0] = session->push_status;
321 LOGID("returning " << session->push_status << '\n');
322 write_buffer_and_next_command(session, 1);
323
324 if (session->unlock_after_push)
325 unlock(*session);
326
327 for (auto *other_session: sessions)
328 {
329 if
330 (
331 other_session->state == Session::State::waiting_for_push_to_pull &&
332 other_session->pull_checkpoint < client.get_journal_checkpoint()
333 )
334 {
335 other_session->state = Session::State::not_locking;
336 start_pulling(other_session->shared_from_this());
337 }
338 }
339 }
340 }
341
342 ////////////////////////////////////////////////////////////////////////////
343 void Server::push_handler
344 ////////////////////////////////////////////////////////////////////////////
345 (
346 const std::shared_ptr<Session> session,
347 const std::error_code error,
348 const size_t bytes_transferred
349 )
350 {
351 if (!error)
352 {
353 session->buffer.index = 0;
354 const int64_t from = session->buffer.read<int64_t>();
355 const int64_t until = session->buffer.read<int64_t>();
356
357 if (locked && session->state != Session::State::locking)
358 {
359 LOGID("trying to push while someone else is locking\n");
360 }
361 else if (!locked)
362 {
363 LOGID("taking the lock for push attempt.\n");
364 lock(session, Session::State::waiting_for_lock_to_push);
365 }
366
367 const bool conflict =
368 (
369 session->state != Session::State::locking ||
370 from != client.get_journal().get_checkpoint()
371 );
372
373 LOGID("pushing, from = " << from << ", until = " << until);
374 session->progress_bar.emplace(until - from, log_pointer);
375
376 if (!writable_journal_client)
377 session->push_status = 'R';
378 else if (conflict)
379 session->push_status = 'C';
380 else
381 {
382 if (client_lock)
383 {
384 session->push_writer.emplace
385 (
386 client_lock->get_journal().get_async_tail_writer()
387 );
388 }
389 }
390
391 session->push_remaining_size = until - from;
392
393 push_transfer(session);
394 }
395 }
396
397 ////////////////////////////////////////////////////////////////////////////
398 void Server::read_transfer_handler
399 ////////////////////////////////////////////////////////////////////////////
400 (
401 const std::shared_ptr<Session> session,
402 Async_Reader reader,
403 const std::error_code error,
404 const size_t bytes_transferred,
405 const size_t offset
406 )
407 {
408 if (!error)
409 {
410 session->progress_bar->print_remaining(reader.get_remaining());
411
412 if (offset + reader.get_remaining() > 0)
413 {
414 const size_t size = reader.read // ??? takes_time
415 (
416 session->buffer.data + offset,
417 session->buffer.size - offset
418 );
419
420 if (reader.is_end_of_file())
421 LOG("error: unexpected end of file\n");
422 else
423 {
424 refresh_lock_timeout(session);
425
426 asio::async_write
427 (
428 session->socket,
429 asio::buffer(session->buffer.data, size + offset),
430 [this, session, reader](std::error_code e, size_t s)
431 {
432 read_transfer_handler(session, reader, e, s, 0);
433 }
434 );
435 }
436 }
437 else
438 {
439 session->pull_timer.reset();
440 session->progress_bar.reset();
441 read_command(session);
442 }
443 }
444 }
445
446 ///////////////////////////////////////////////////////////////////////////
447 void Server::start_reading
448 ///////////////////////////////////////////////////////////////////////////
449 (
450 std::shared_ptr<Session> session,
451 Async_Reader reader
452 )
453 {
454 LOGID("reading from = " << reader.get_current() << ", until = " << reader.get_end());
455 session->progress_bar.emplace(reader.get_remaining(), log_pointer);
456
457 session->buffer.index = 1;
458 session->buffer.write<int64_t>(reader.get_end());
459
460 read_transfer_handler
461 (
462 session,
463 reader,
464 std::error_code(),
465 0,
466 session->buffer.index
467 );
468 }
469
470 ///////////////////////////////////////////////////////////////////////////
471 void Server::start_pulling(std::shared_ptr<Session> session)
472 ///////////////////////////////////////////////////////////////////////////
473 {
474 if
475 (
476 session->lock_before_pulling &&
477 session->state != Session::State::waiting_for_lock_to_pull
478 )
479 {
480 lock(session, Session::State::waiting_for_lock_to_pull);
481 return;
482 }
483
484 if (!session->send_pull_data)
485 session->pull_checkpoint = client.get_journal_checkpoint();
486
487 start_reading
488 (
489 session,
490 client.get_journal().get_async_tail_reader(session->pull_checkpoint)
491 );
492 }
493
494 ///////////////////////////////////////////////////////////////////////////
495 void Server::pull_handler
496 ///////////////////////////////////////////////////////////////////////////
497 (
498 const std::shared_ptr<Session> session,
499 const std::error_code error,
500 const size_t bytes_transferred
501 )
502 {
503 if (!error)
504 {
505 session->buffer.index = 1;
506 const std::chrono::milliseconds wait{session->buffer.read<int64_t>()};
507 session->pull_checkpoint = session->buffer.read<int64_t>();
508
509 if (!client_lock)
510 client.pull(); // ??? takes_time
511
512 if
513 (
514 wait.count() > 0 &&
515 session->pull_checkpoint == client.get_journal_checkpoint()
516 )
517 {
518 LOGID
519 (
520 "waiting at checkpoint = " << session->pull_checkpoint <<
521 " for " << double(wait.count()) * 0.001 << "s\n"
522 );
523
524 session->state = Session::State::waiting_for_push_to_pull;
525 session->pull_timer.emplace(io_context);
526 session->pull_timer->expires_after(wait);
527 session->pull_timer->async_wait
528 (
529 [this, session](std::error_code timer_error)
530 {
531 if (!timer_error)
532 {
533 if (session->state == Session::State::waiting_for_push_to_pull)
534 {
535 session->state = Session::State::not_locking;
536 start_pulling(session);
537 }
538 }
539 }
540 );
541 }
542 else
543 start_pulling(session);
544 }
545 }
546
547 ///////////////////////////////////////////////////////////////////////////
548 void Server::read_handler
549 ///////////////////////////////////////////////////////////////////////////
550 (
551 std::shared_ptr<Session> session,
552 std::error_code error,
553 size_t bytes_transferred
554 )
555 {
556 if (!error)
557 {
558 session->buffer.index = 1;
559 int64_t from = session->buffer.read<int64_t>();
560 int64_t until = session->buffer.read<int64_t>();
561
562 if (until > client.get_journal_checkpoint())
563 until = client.get_journal_checkpoint();
564 if (from > until)
565 from = until;
566
567 start_reading
568 (
569 session,
570 client.get_journal().get_async_reader(from, until)
571 );
572 }
573 }
574
575 ///////////////////////////////////////////////////////////////////////////
576 void Server::check_hash_handler
577 ///////////////////////////////////////////////////////////////////////////
578 (
579 const std::shared_ptr<Session> session,
580 const std::error_code error,
581 const size_t bytes_transferred
582 )
583 {
584 if (!error)
585 {
586 session->buffer.index = 1;
587 const auto checkpoint = session->buffer.read<int64_t>();
588 const auto hash = session->buffer.read<SHA_256::Hash>();
589
590 const Readonly_Journal &journal = client.get_journal();
591
592 if
593 (
594 checkpoint > journal.get_checkpoint() ||
595 hash != (session->buffer.data[0] == 'H' // ??? takes_time
596 ? Journal_Hasher::get_fast_hash(journal, checkpoint)
597 : Journal_Hasher::get_full_hash(journal, checkpoint))
598 )
599 {
600 session->buffer.data[0] = 'h';
601 }
602
603 LOGID("hash for checkpoint = " << checkpoint << ", result = "
604 << session->buffer.data[0] << '\n');
605
606 write_buffer_and_next_command(session, 1);
607 }
608 }
609
610 ///////////////////////////////////////////////////////////////////////////
611 void Server::read_command_handler
612 ///////////////////////////////////////////////////////////////////////////
613 (
614 const std::shared_ptr<Session> session,
615 const std::error_code error,
616 const size_t bytes_transferred
617 )
618 {
619 if (!error)
620 {
621 const char code = session->buffer.data[0];
622
623 if (log_pointer)
624 {
625 const auto i = request_description.find(code);
626 if (i != request_description.end())
627 LOGID("received request " << code << ": " << i->second << '\n');
628 else
629 LOGID("unknown code: " << code << '\n');
630 }
631
632 switch (code)
633 {
634 case 'H': case 'I':
635 async_read(session, 1, 40, &Server::check_hash_handler);
636 break;
637
638 case 'r':
639 async_read(session, 1, 16, &Server::read_handler);
640 break;
641
642 case 'D': case 'E': case 'F': case 'G':
643 session->lock_before_pulling = code & 1;
644 session->send_pull_data = code & 2;
645 async_read(session, 1, 16, &Server::pull_handler);
646 break;
647
648 case 'M':
649 unlock(*session);
650 write_buffer_and_next_command(session, 1);
651 break;
652
653 case 'N': case 'O':
654 session->unlock_after_push = (code == 'O');
655 session->push_status = code;
656 async_read(session, 0, 16, &Server::push_handler);
657 break;
658
659 default:
660 break;
661 }
662 }
663 }
664
665 ///////////////////////////////////////////////////////////////////////////
666 void Server::read_command(const std::shared_ptr<Session> session)
667 ///////////////////////////////////////////////////////////////////////////
668 {
669 async_read(session, 0, 1, &Server::read_command_handler);
670 }
671
672 ////////////////////////////////////////////////////////////////////////////
673 void Server::write_buffer_and_next_command
674 ////////////////////////////////////////////////////////////////////////////
675 (
676 const std::shared_ptr<Session> session,
677 const size_t size
678 )
679 {
680 asio::async_write
681 (
682 session->socket,
683 asio::buffer(session->buffer.data, size),
684 [this, session](std::error_code e, size_t s)
685 {
686 if (!e)
687 read_command(session);
688 }
689 );
690 }
691
692 ///////////////////////////////////////////////////////////////////////////
693 void Server::handshake_handler
694 ///////////////////////////////////////////////////////////////////////////
695 (
696 const std::shared_ptr<Session> session,
697 const std::error_code error,
698 const size_t bytes_transferred
699 )
700 {
701 if (!error)
702 {
703 session->buffer.index = 0;
704
705 if (session->buffer.read<std::array<char, 5>>() == Header::joedb)
706 {
707 const int64_t client_version = session->buffer.read<int64_t>();
708 LOGID("client_version = " << client_version << '\n');
709
710 session->buffer.index = 5;
711 session->buffer.write<int64_t>(client_version < protocol_version ? 0 : protocol_version);
712 session->buffer.write<int64_t>(session->id);
713 session->buffer.write<int64_t>(client.get_journal_checkpoint());
714 session->buffer.write<char>
715 (
716 (writable_journal_client && !client.is_pullonly()) ? 'W' : 'R'
717 );
718
719 write_buffer_and_next_command(session, session->buffer.index);
720 return;
721 }
722
723 LOGID("bad handshake\n");
724 }
725 }
726
727 ////////////////////////////////////////////////////////////////////////////
728 void Server::start_accept()
729 ////////////////////////////////////////////////////////////////////////////
730 {
731 if (!stopped)
732 {
733 acceptor.async_accept
734 (
735 io_context,
736 [this](std::error_code error, asio::local::stream_protocol::socket socket)
737 {
738 if (!error && !stopped)
739 {
740 std::shared_ptr<Session> session(new Session(*this, std::move(socket)));
741 async_read(session, 0, 13, &Server::handshake_handler);
742
743 start_accept();
744 }
745 }
746 );
747 }
748 }
749
750 ////////////////////////////////////////////////////////////////////////////
752 ////////////////////////////////////////////////////////////////////////////
753 (
754 Client &client,
755 asio::io_context &io_context,
756 std::string endpoint_path,
757 const std::chrono::milliseconds lock_timeout,
758 std::ostream * const log_pointer
759 ):
760 start_time(std::chrono::steady_clock::now()),
761 client(client),
762 writable_journal_client(dynamic_cast<Writable_Journal_Client*>(&client)),
763 io_context(io_context),
764 endpoint_path(std::move(endpoint_path)),
765 endpoint(this->endpoint_path),
766 acceptor(io_context, endpoint, false),
767 stopped(true),
768 interrupt_signals(io_context, SIGINT, SIGTERM),
769 session_id(0),
770 lock_timeout(lock_timeout),
771 lock_timeout_timer(io_context),
772 locked(false),
773 log_pointer(log_pointer)
774 {
775 if (writable_journal_client)
776 writable_journal_client->push_if_ahead();
777
778 start();
779 }
780
781 ////////////////////////////////////////////////////////////////////////////
783 ////////////////////////////////////////////////////////////////////////////
784 {
785 if (stopped)
786 {
787 stopped = false;
788
789 if (!client.is_shared() && writable_journal_client)
790 client_lock.emplace(*writable_journal_client);
791
792 interrupt_signals.async_wait([this](const asio::error_code &error, int)
793 {
794 if (!error)
795 stop();
796 });
797
798 start_accept();
799
800 // Note: C++20 has operator<< for durations
801 LOG
802 (
804 ": start. lock_timeout = " << double(lock_timeout.count()) * 0.001 <<
805 "s; protocol_version = " << protocol_version <<
806 "; shared = " << client.is_shared() << '\n'
807 );
808 write_status();
809 }
810 }
811
812 ////////////////////////////////////////////////////////////////////////////
814 ////////////////////////////////////////////////////////////////////////////
815 {
816 acceptor.cancel();
817 interrupt_signals.cancel();
818 stopped = true;
819 }
820
821 ////////////////////////////////////////////////////////////////////////////
823 ////////////////////////////////////////////////////////////////////////////
824 {
825 if (!stopped)
826 {
827 LOG(get_endpoint_path() << ": stop\n");
828
829 for (Session *session: sessions)
830 {
831 session->socket.close();
832 session->pull_timer.reset();
833 }
834
835 if (client_lock)
836 {
837 client_lock->unlock();
838 client_lock.reset();
839 }
840
842 }
843 }
844
845 ////////////////////////////////////////////////////////////////////////////
847 ////////////////////////////////////////////////////////////////////////////
848 {
849 try
850 {
851 if (!sessions.empty())
852 LOG("Destroying server before sessions.\n");
853 std::remove(endpoint_path.c_str());
854 }
855 catch (...)
856 {
857 }
858 }
859}
860
// The logging helper macros are private to this file.
#undef LOG
#undef LOGID
#define LOG(x)
Definition Server.cpp:8
#define LOGID(x)
Definition Server.cpp:9
Handle concurrent access to a file with a joedb::Connection.
Definition Client.h:12
int64_t get_journal_checkpoint() const
Definition Client.h:62
const Readonly_Journal & get_journal() const
Definition Client.h:47
bool is_shared() const
Definition Client.h:52
static SHA_256::Hash get_fast_hash(const Readonly_Journal &journal, int64_t checkpoint)
Async_Reader get_async_tail_reader(int64_t start_position) const
std::array< uint32_t, 8 > Hash
Definition SHA_256.h:60
void stop()
Definition Server.cpp:822
void start()
Definition Server.cpp:782
const std::string & get_endpoint_path() const
Definition Server.h:206
void stop_after_sessions()
Definition Server.cpp:813
std::chrono::milliseconds get_time_stamp() const
Definition Server.cpp:35
Server(Client &client, asio::io_context &io_context, std::string endpoint_path, std::chrono::milliseconds lock_timeout, std::ostream *log_pointer)
Definition Server.cpp:753
int64_t push_if_ahead() override
constexpr int protocol_version
std::string get_time_string_of_now()
Definition Blob.h:7
static constexpr std::array< char, 5 > joedb
Definition Header.h:17