Joedb 9.5.0
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server.cpp
Go to the documentation of this file.
3#include "joedb/get_pid.h"
7
8#define LOG(x) log([&](std::ostream &out){out << x;})
9#define LOGID(x) log([&](std::ostream &out){session->write_id(out) << x;})
10
11#include <asio/read.hpp>
12#include <asio/write.hpp>
13#include <cstdio>
14
15namespace joedb
16{
17 ////////////////////////////////////////////////////////////////////////////
18 std::chrono::milliseconds Server::get_time_stamp() const
19 ////////////////////////////////////////////////////////////////////////////
20 {
21 return std::chrono::duration_cast<std::chrono::milliseconds>
22 (
23 std::chrono::steady_clock::now() - start_time
24 );
25 }
26
27 ////////////////////////////////////////////////////////////////////////////
28 std::ostream &Server::Session::write_id(std::ostream &out) const
29 ////////////////////////////////////////////////////////////////////////////
30 {
31#if 0
32 out << server.get_time_stamp().count() << ' ';
33#endif
34
35 out << server.endpoint_path << '(' << id << "): ";
36
37 return out;
38 }
39
40 ////////////////////////////////////////////////////////////////////////////
41 Server::Session::Session
42 ////////////////////////////////////////////////////////////////////////////
43 (
44 Server &server,
45 asio::local::stream_protocol::socket &&socket
46 ):
47 id(++server.session_id),
48 server(server),
49 socket(std::move(socket)),
50 state(State::not_locking)
51 {
52 server.log([this](std::ostream &out)
53 {
54 write_id(out) << "new session created\n";}
55 );
56 server.sessions.insert(this);
57 server.write_status();
58 }
59
60 ////////////////////////////////////////////////////////////////////////////
61 Server::Session::~Session()
62 ////////////////////////////////////////////////////////////////////////////
63 {
64 try
65 {
66 server.sessions.erase(this);
67
68 if (state == State::locking)
69 {
70 server.log([this](std::ostream &out)
71 {
72 write_id(out) << "removing lock held by dying session.\n";
73 });
74
75 server.unlock(*this);
76 }
77
78 server.log([this](std::ostream &out)
79 {
80 write_id(out) << "deleted\n";
81 });
82
83 server.write_status();
84 }
85 catch (...)
86 {
87 // What should we do? maybe post the exception?
88 // This may not be a problem any more with coroutines.
89 }
90 }
91
92 ////////////////////////////////////////////////////////////////////////////
93 void Server::async_read
94 ////////////////////////////////////////////////////////////////////////////
95 (
96 std::shared_ptr<Session> session,
97 size_t offset,
98 size_t size,
99 Transfer_Handler handler
100 )
101 {
102 asio::async_read
103 (
104 session->socket,
105 asio::buffer(session->buffer.data + offset, size),
106 [this, handler, session](std::error_code e, size_t s)
107 {
108 (this->*handler)(session, e, s);
109 }
110 );
111 }
112
113 ////////////////////////////////////////////////////////////////////////////
114 void Server::write_status()
115 ////////////////////////////////////////////////////////////////////////////
116 {
117 log([this](std::ostream &out)
118 {
119 out << endpoint_path;
120 out << ": pid = " << joedb::get_pid();
121 out << "; " << get_time_string_of_now();
122 out << "; sessions = " << sessions.size();
123 out << "; checkpoint = ";
124 out << client.get_journal_checkpoint() << '\n';
125 });
126 }
127
128 ////////////////////////////////////////////////////////////////////////////
129 void Server::lock_dequeue()
130 ////////////////////////////////////////////////////////////////////////////
131 {
132 if (!locked)
133 {
134 if (lock_queue.empty())
135 {
136 if (client.is_shared() && client_lock)
137 {
138 client_lock->unlock();
139 client_lock.reset();
140 }
141 }
142 else
143 {
144 locked = true;
145 const std::shared_ptr<Session> session = lock_queue.front();
146 lock_queue.pop();
147 LOGID("locking\n");
148
149 if (!client_lock)
150 {
151 if (!writable_journal_client)
152 {
153 LOGID("Error: locking pull-only server\n");
154 session->buffer.data[0] = 'R';
155 }
156 else
157 client_lock.emplace(*writable_journal_client); // ??? takes_time
158 }
159
160 if (session->state == Session::State::waiting_for_lock_to_pull)
161 start_pulling(session);
162
163 session->state = Session::State::locking;
164 refresh_lock_timeout(session);
165 }
166 }
167 }
168
169 ////////////////////////////////////////////////////////////////////////////
170 void Server::lock
171 ////////////////////////////////////////////////////////////////////////////
172 (
173 const std::shared_ptr<Session> session,
174 const Session::State state
175 )
176 {
177 if (session->state != Session::State::locking)
178 {
179 session->state = state;
180 lock_queue.emplace(session);
181 lock_dequeue();
182 }
183 else
184 LOGID("Error: locking an already locked session\n");
185 }
186
187 ////////////////////////////////////////////////////////////////////////////
188 void Server::unlock(Session &session)
189 ////////////////////////////////////////////////////////////////////////////
190 {
191 if (session.state == Session::State::locking)
192 {
193 log([&session](std::ostream &out)
194 {
195 session.write_id(out) << "unlocking\n";
196 });
197 session.state = Session::State::not_locking;
198 locked = false;
199 lock_timeout_timer.cancel();
200
201 lock_dequeue();
202 }
203 }
204
205 ////////////////////////////////////////////////////////////////////////////
206 void Server::lock_timeout_handler
207 ////////////////////////////////////////////////////////////////////////////
208 (
209 const std::shared_ptr<Session> session,
210 const std::error_code error
211 )
212 {
213 if (!error)
214 {
215 LOGID("timeout\n");
216
217 if (session->push_writer)
218 {
219 session->push_writer.reset();
220 session->push_status = 't';
221 }
222
223 unlock(*session);
224 }
225 }
226
227 ////////////////////////////////////////////////////////////////////////////
228 void Server::refresh_lock_timeout(const std::shared_ptr<Session> session)
229 ////////////////////////////////////////////////////////////////////////////
230 {
231 if (lock_timeout.count() > 0 && session->state == Session::State::locking)
232 {
233 lock_timeout_timer.expires_after(lock_timeout);
234 lock_timeout_timer.async_wait
235 (
236 [this, session](std::error_code e){lock_timeout_handler(session, e);}
237 );
238 }
239 }
240
241 ////////////////////////////////////////////////////////////////////////////
242 void Server::push_transfer_handler
243 ////////////////////////////////////////////////////////////////////////////
244 (
245 const std::shared_ptr<Session> session,
246 const std::error_code error,
247 const size_t bytes_transferred
248 )
249 {
250 if (!error)
251 {
252 if (session->push_writer)
253 session->push_writer->write(session->buffer.data, bytes_transferred); // ??? takes_time
254
255 session->push_remaining_size -= int64_t(bytes_transferred);
256 session->progress_bar->print_remaining(session->push_remaining_size);
257
258 push_transfer(session);
259 }
260 }
261
262 ////////////////////////////////////////////////////////////////////////////
263 void Server::push_transfer(const std::shared_ptr<Session> session)
264 ////////////////////////////////////////////////////////////////////////////
265 {
266 if (session->push_remaining_size > 0)
267 {
268 refresh_lock_timeout(session);
269
270 async_read
271 (
272 session,
273 0,
274 size_t(std::min(session->push_remaining_size, session->buffer.ssize)),
275 &Server::push_transfer_handler
276 );
277 }
278 else
279 {
280 if (session->push_writer)
281 {
282 if (client_lock)
283 {
284 client_lock->get_journal().soft_checkpoint_at
285 (
286 session->push_writer->get_position()
287 );
288
289 // ??? takes_time
290 if (client.is_shared() && session->unlock_after_push)
291 {
292 client_lock->checkpoint_and_push_unlock();
293 client_lock.reset();
294 }
295 else
296 client_lock->checkpoint_and_push();
297 }
298
299 session->push_writer.reset();
300 }
301
302 session->buffer.data[0] = session->push_status;
303
304 session->progress_bar.reset();
305 LOGID("Returning '" << session->push_status << "'\n");
306
307 write_buffer_and_next_command(session, 1);
308
309 if (session->unlock_after_push)
310 unlock(*session);
311
312 for (auto *other_session: sessions)
313 {
314 if
315 (
316 other_session->state == Session::State::waiting_for_push_to_pull &&
317 other_session->pull_checkpoint < client.get_journal_checkpoint()
318 )
319 {
320 other_session->state = Session::State::not_locking;
321 start_pulling(other_session->shared_from_this());
322 }
323 }
324 }
325 }
326
327 ////////////////////////////////////////////////////////////////////////////
328 void Server::push_handler
329 ////////////////////////////////////////////////////////////////////////////
330 (
331 const std::shared_ptr<Session> session,
332 const std::error_code error,
333 const size_t bytes_transferred
334 )
335 {
336 if (!error)
337 {
338 session->buffer.index = 0;
339 const int64_t from = session->buffer.read<int64_t>();
340 const int64_t until = session->buffer.read<int64_t>();
341
342 if (locked && session->state != Session::State::locking)
343 {
344 LOGID("trying to push while someone else is locking\n");
345 }
346 else if (!locked)
347 {
348 LOGID("Taking the lock for push attempt.\n");
349 lock(session, Session::State::waiting_for_lock_to_push);
350 }
351
352 const bool conflict =
353 (
354 session->state != Session::State::locking ||
355 from != client.get_journal().get_checkpoint()
356 );
357
358 LOGID("pushing, from = " << from << ", until = " << until);
359 session->progress_bar.emplace(until - from, log_pointer);
360
361 if (!writable_journal_client)
362 session->push_status = 'R';
363 else if (conflict)
364 session->push_status = 'C';
365 else
366 {
367 if (client_lock)
368 {
369 session->push_writer.emplace
370 (
371 client_lock->get_journal().get_async_tail_writer()
372 );
373 }
374 }
375
376 session->push_remaining_size = until - from;
377
378 push_transfer(session);
379 }
380 }
381
382 ////////////////////////////////////////////////////////////////////////////
383 void Server::read_transfer_handler
384 ////////////////////////////////////////////////////////////////////////////
385 (
386 const std::shared_ptr<Session> session,
387 Async_Reader reader,
388 const std::error_code error,
389 const size_t bytes_transferred,
390 const size_t offset
391 )
392 {
393 if (!error)
394 {
395 session->progress_bar->print_remaining(reader.get_remaining());
396
397 if (offset + reader.get_remaining() > 0)
398 {
399 const size_t size = reader.read // ??? takes_time
400 (
401 session->buffer.data + offset,
402 session->buffer.size - offset
403 );
404
405 if (reader.is_end_of_file())
406 LOG("error: unexpected end of file\n");
407 else
408 {
409 refresh_lock_timeout(session);
410
411 asio::async_write
412 (
413 session->socket,
414 asio::buffer(session->buffer.data, size + offset),
415 [this, session, reader](std::error_code e, size_t s)
416 {
417 read_transfer_handler(session, reader, e, s, 0);
418 }
419 );
420 }
421 }
422 else
423 {
424 session->pull_timer.reset();
425 session->progress_bar.reset();
426 read_command(session);
427 }
428 }
429 }
430
431 ///////////////////////////////////////////////////////////////////////////
432 void Server::start_reading
433 ///////////////////////////////////////////////////////////////////////////
434 (
435 std::shared_ptr<Session> session,
436 Async_Reader reader
437 )
438 {
439 LOGID("reading from = " << reader.get_current() << ", until = " << reader.get_end());
440 session->progress_bar.emplace(reader.get_remaining(), log_pointer);
441
442 session->buffer.index = 1;
443 session->buffer.write<int64_t>(reader.get_end());
444
445 read_transfer_handler
446 (
447 session,
448 reader,
449 std::error_code(),
450 0,
451 session->buffer.index
452 );
453 }
454
455 ///////////////////////////////////////////////////////////////////////////
456 void Server::start_pulling(std::shared_ptr<Session> session)
457 ///////////////////////////////////////////////////////////////////////////
458 {
459 if
460 (
461 session->lock_before_pulling &&
462 session->state != Session::State::waiting_for_lock_to_pull
463 )
464 {
465 lock(session, Session::State::waiting_for_lock_to_pull);
466 return;
467 }
468
469 if (!session->send_pull_data)
470 session->pull_checkpoint = client.get_journal_checkpoint();
471
472 start_reading
473 (
474 session,
475 client.get_journal().get_async_tail_reader(session->pull_checkpoint)
476 );
477 }
478
479 ///////////////////////////////////////////////////////////////////////////
480 void Server::pull_handler
481 ///////////////////////////////////////////////////////////////////////////
482 (
483 const std::shared_ptr<Session> session,
484 const std::error_code error,
485 const size_t bytes_transferred
486 )
487 {
488 if (!error)
489 {
490 session->buffer.index = 1;
491 const std::chrono::milliseconds wait{session->buffer.read<int64_t>()};
492 session->pull_checkpoint = session->buffer.read<int64_t>();
493
494 if (!client_lock)
495 client.pull(); // ??? takes_time
496
497 if
498 (
499 wait.count() > 0 &&
500 session->pull_checkpoint == client.get_journal_checkpoint()
501 )
502 {
503 LOGID
504 (
505 "waiting at checkpoint = " << session->pull_checkpoint <<
506 " for " << wait.count() << " milliseconds\n"
507 );
508
509 session->state = Session::State::waiting_for_push_to_pull;
510 session->pull_timer.emplace(io_context);
511 session->pull_timer->expires_after(wait);
512 session->pull_timer->async_wait
513 (
514 [this, session](std::error_code timer_error)
515 {
516 if (!timer_error)
517 {
518 if (session->state == Session::State::waiting_for_push_to_pull)
519 {
520 session->state = Session::State::not_locking;
521 start_pulling(session);
522 }
523 }
524 }
525 );
526 }
527 else
528 start_pulling(session);
529 }
530 }
531
532 ///////////////////////////////////////////////////////////////////////////
533 void Server::read_handler
534 ///////////////////////////////////////////////////////////////////////////
535 (
536 std::shared_ptr<Session> session,
537 std::error_code error,
538 size_t bytes_transferred
539 )
540 {
541 if (!error)
542 {
543 session->buffer.index = 1;
544 int64_t from = session->buffer.read<int64_t>();
545 int64_t until = session->buffer.read<int64_t>();
546
547 if (until > client.get_journal_checkpoint())
548 until = client.get_journal_checkpoint();
549 if (from > until)
550 from = until;
551
552 start_reading
553 (
554 session,
555 client.get_journal().get_async_reader(from, until)
556 );
557 }
558 }
559
560 ///////////////////////////////////////////////////////////////////////////
561 void Server::check_hash_handler
562 ///////////////////////////////////////////////////////////////////////////
563 (
564 const std::shared_ptr<Session> session,
565 const std::error_code error,
566 const size_t bytes_transferred
567 )
568 {
569 if (!error)
570 {
571 session->buffer.index = 1;
572 const auto checkpoint = session->buffer.read<int64_t>();
573 const auto hash = session->buffer.read<SHA_256::Hash>();
574
575 const Readonly_Journal &readonly_journal = client.get_journal();
576
577 if
578 (
579 checkpoint > readonly_journal.get_checkpoint() ||
580 Journal_Hasher::get_hash(readonly_journal, checkpoint) != hash // ??? takes_time
581 )
582 {
583 session->buffer.data[0] = 'h';
584 }
585
586 LOGID("hash for checkpoint = " << checkpoint << ", result = "
587 << session->buffer.data[0] << '\n');
588
589 write_buffer_and_next_command(session, 1);
590 }
591 }
592
593 ///////////////////////////////////////////////////////////////////////////
594 void Server::read_command_handler
595 ///////////////////////////////////////////////////////////////////////////
596 (
597 const std::shared_ptr<Session> session,
598 const std::error_code error,
599 const size_t bytes_transferred
600 )
601 {
602 if (!error)
603 {
604 LOGID(session->buffer.data[0] << '\n');
605
606 switch (session->buffer.data[0])
607 {
608 case 'H':
609 async_read(session, 1, 40, &Server::check_hash_handler);
610 break;
611
612 case 'r':
613 async_read(session, 1, 16, &Server::read_handler);
614 break;
615
616 case 'D': case 'E': case 'F': case 'G':
617 session->lock_before_pulling = session->buffer.data[0] & 1;
618 session->send_pull_data = session->buffer.data[0] & 2;
619 async_read(session, 1, 16, &Server::pull_handler);
620 break;
621
622 case 'M':
623 unlock(*session);
624 write_buffer_and_next_command(session, 1);
625 break;
626
627 case 'N': case 'O':
628 session->unlock_after_push = (session->buffer.data[0] == 'O');
629 session->push_status = session->buffer.data[0];
630 async_read(session, 0, 16, &Server::push_handler);
631 break;
632
633 default:
634 LOGID("unexpected command\n");
635 break;
636 }
637 }
638 }
639
640 ///////////////////////////////////////////////////////////////////////////
641 void Server::read_command(const std::shared_ptr<Session> session)
642 ///////////////////////////////////////////////////////////////////////////
643 {
644 async_read(session, 0, 1, &Server::read_command_handler);
645 }
646
647 ////////////////////////////////////////////////////////////////////////////
648 void Server::write_buffer_and_next_command
649 ////////////////////////////////////////////////////////////////////////////
650 (
651 const std::shared_ptr<Session> session,
652 const size_t size
653 )
654 {
655 asio::async_write
656 (
657 session->socket,
658 asio::buffer(session->buffer.data, size),
659 [this, session](std::error_code e, size_t s)
660 {
661 if (!e)
662 read_command(session);
663 }
664 );
665 }
666
667 ///////////////////////////////////////////////////////////////////////////
668 void Server::handshake_handler
669 ///////////////////////////////////////////////////////////////////////////
670 (
671 const std::shared_ptr<Session> session,
672 const std::error_code error,
673 const size_t bytes_transferred
674 )
675 {
676 if (!error)
677 {
678 session->buffer.index = 0;
679
680 if (session->buffer.read<std::array<char, 5>>() == Header::joedb)
681 {
682 const int64_t client_version = session->buffer.read<int64_t>();
683 LOGID("client_version = " << client_version << '\n');
684
685 session->buffer.index = 5;
686 session->buffer.write<int64_t>(client_version < protocol_version ? 0 : protocol_version);
687 session->buffer.write<int64_t>(session->id);
688 session->buffer.write<int64_t>(client.get_journal_checkpoint());
689 session->buffer.write<char>
690 (
691 (writable_journal_client && !client.is_pullonly()) ? 'W' : 'R'
692 );
693
694 write_buffer_and_next_command(session, session->buffer.index);
695 return;
696 }
697
698 LOGID("bad handshake\n");
699 }
700 }
701
702 ////////////////////////////////////////////////////////////////////////////
703 void Server::start_accept()
704 ////////////////////////////////////////////////////////////////////////////
705 {
706 if (!stopped)
707 {
708 acceptor.async_accept
709 (
710 io_context,
711 [this](std::error_code error, asio::local::stream_protocol::socket socket)
712 {
713 if (!error && !stopped)
714 {
715 std::shared_ptr<Session> session(new Session(*this, std::move(socket)));
716 async_read(session, 0, 13, &Server::handshake_handler);
717
718 start_accept();
719 }
720 }
721 );
722 }
723 }
724
725 ////////////////////////////////////////////////////////////////////////////
727 ////////////////////////////////////////////////////////////////////////////
728 (
729 Client &client,
730 asio::io_context &io_context,
731 std::string endpoint_path,
732 const std::chrono::milliseconds lock_timeout,
733 std::ostream * const log_pointer
734 ):
735 start_time(std::chrono::steady_clock::now()),
736 client(client),
737 writable_journal_client(dynamic_cast<Writable_Journal_Client*>(&client)),
738 io_context(io_context),
739 endpoint_path(std::move(endpoint_path)),
740 endpoint(this->endpoint_path),
741 acceptor(io_context, endpoint, false),
742 stopped(true),
743 interrupt_signals(io_context, SIGINT, SIGTERM),
744 session_id(0),
745 lock_timeout(lock_timeout),
746 lock_timeout_timer(io_context),
747 locked(false),
748 log_pointer(log_pointer)
749 {
750 if (writable_journal_client)
751 writable_journal_client->push_if_ahead();
752
753 start();
754 }
755
756 ////////////////////////////////////////////////////////////////////////////
758 ////////////////////////////////////////////////////////////////////////////
759 {
760 if (stopped)
761 {
762 stopped = false;
763
764 if (!client.is_shared() && writable_journal_client)
765 client_lock.emplace(*writable_journal_client);
766
767 interrupt_signals.async_wait([this](const asio::error_code &error, int)
768 {
769 if (!error)
770 stop();
771 });
772
773 start_accept();
774
775 // Note: C++20 has operator<< for durations
776 LOG
777 (
779 ": start. lock_timeout = " << lock_timeout.count() <<
780 "; protocol_version = " << protocol_version <<
781 "; shared = " << client.is_shared() << '\n'
782 );
783 write_status();
784 }
785 }
786
787 ////////////////////////////////////////////////////////////////////////////
789 ////////////////////////////////////////////////////////////////////////////
790 {
791 acceptor.cancel();
792 interrupt_signals.cancel();
793 stopped = true;
794 }
795
796 ////////////////////////////////////////////////////////////////////////////
798 ////////////////////////////////////////////////////////////////////////////
799 {
800 if (!stopped)
801 {
802 LOG(get_endpoint_path() << ": stop\n");
803
804 for (Session *session: sessions)
805 {
806 session->socket.close();
807 session->pull_timer.reset();
808 }
809
810 if (client_lock)
811 {
812 client_lock->unlock();
813 client_lock.reset();
814 }
815
817 }
818 }
819
820 ////////////////////////////////////////////////////////////////////////////
822 ////////////////////////////////////////////////////////////////////////////
823 {
824 try
825 {
826 if (!sessions.empty())
827 LOG("Destroying server before sessions.\n");
828 std::remove(endpoint_path.c_str());
829 }
830 catch (...)
831 {
832 }
833 }
834}
835
836#undef LOGID
837#undef LOG
#define LOG(x)
Definition Server.cpp:8
#define LOGID(x)
Definition Server.cpp:9
Handle concurrent access to a file with a joedb::Connection.
Definition Client.h:12
int64_t get_journal_checkpoint() const
Definition Client.h:62
const Readonly_Journal & get_journal() const
Definition Client.h:47
bool is_shared() const
Definition Client.h:52
static SHA_256::Hash get_hash(const Readonly_Journal &journal, int64_t checkpoint)
Async_Reader get_async_tail_reader(int64_t start_position) const
std::array< uint32_t, 8 > Hash
Definition SHA_256.h:60
void stop()
Definition Server.cpp:797
void start()
Definition Server.cpp:757
const std::string & get_endpoint_path() const
Definition Server.h:203
void stop_after_sessions()
Definition Server.cpp:788
std::chrono::milliseconds get_time_stamp() const
Definition Server.cpp:18
Server(Client &client, asio::io_context &io_context, std::string endpoint_path, std::chrono::milliseconds lock_timeout, std::ostream *log_pointer)
Definition Server.cpp:728
int64_t push_if_ahead() override
constexpr int protocol_version
std::string get_time_string_of_now()
Definition Blob.h:7
static constexpr std::array< char, 5 > joedb
Definition Header.h:17