Joedb 10.2.1
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server.cpp
Go to the documentation of this file.
5
6#include <boost/asio/redirect_error.hpp>
7#include <boost/asio/use_awaitable.hpp>
8
9#include <cstdio>
10#include <algorithm>
11
12namespace joedb
13{
14 ////////////////////////////////////////////////////////////////////////////
15 const std::map<char, const char *> Server::request_description
16 ////////////////////////////////////////////////////////////////////////////
17 {
18 {'H', "get fast SHA-256 hash code"},
19 {'I', "get full SHA-256 hash code"},
20 {'r', "read a range of bytes"},
21 {'D', "get checkpoint"},
22 {'E', "lock"},
23 {'F', "pull"},
24 {'G', "lock_pull"},
25 {'L', "lock"},
26 {'M', "unlock"},
27 {'N', "lock_push"},
28 {'O', "push_unlock"},
29 {'Q', "quit"}
30 };
31
32 ////////////////////////////////////////////////////////////////////////////
33 Server::Session::Session
34 ////////////////////////////////////////////////////////////////////////////
35 (
36 Server &server,
37 boost::asio::local::stream_protocol::socket &&socket
38 ):
39 joedb::asio::Server::Session(server, std::move(socket)),
40 timer(strand),
41 lock_timeout_timer(strand)
42 {
43 }
44
45 ////////////////////////////////////////////////////////////////////////////
46 void Server::Session::remove_from_queue(std::deque<Session *> &queue)
47 ////////////////////////////////////////////////////////////////////////////
48 {
49 const auto it = std::find(queue.begin(), queue.end(), this);
50 if (it != queue.end())
51 queue.erase(it);
52 }
53
54 ////////////////////////////////////////////////////////////////////////////
55 void Server::Session::cleanup()
56 ////////////////////////////////////////////////////////////////////////////
57 {
58 if (locking)
59 unlock();
60
61 remove_from_queue(get_server().lock_waiters);
62 remove_from_queue(get_server().pull_waiters);
63 }
64
65 ////////////////////////////////////////////////////////////////////////////
66 boost::asio::awaitable<void> Server::Session::lock()
67 ////////////////////////////////////////////////////////////////////////////
68 {
69 if (locking)
70 throw Exception("error: locking an already locked session");
71 else
72 {
73 if (!get_server().locked)
74 {
75 if (get_server().log_level > 2)
76 log("obtained lock immediately");
77 }
78 else
79 {
80 if (get_server().log_level > 2)
81 log("waiting for lock");
82
83 // dirty, using asio::experimental::channel is probably better
84 timer.expires_after(boost::asio::steady_timer::duration::max());
85
86 get_server().lock_waiters.emplace_back(this);
87 {
88 boost::system::error_code ec;
89 co_await timer.async_wait
90 (
91 boost::asio::redirect_error(boost::asio::use_awaitable, ec)
92 );
93 }
94 get_server().lock_waiters.pop_front();
95
96 if (get_server().log_level > 2)
97 log("obtained lock after waiting");
98 }
99
100 get_server().locked = true;
101 locking = true;
102 refresh_lock_timeout();
103
104 if (!get_server().client_lock)
105 {
106 if (!get_server().writable_journal_client)
107 {
108 log("error: locking pull-only server");
109 buffer.data[0] = 'R';
110 }
111 else
112 get_server().client_lock.emplace(*get_server().writable_journal_client);
113 }
114 }
115 }
116
117 ////////////////////////////////////////////////////////////////////////////
118 void Server::Session::unlock()
119 ////////////////////////////////////////////////////////////////////////////
120 {
121 if (locking)
122 {
123 if (get_server().log_level > 2)
124 log("unlock");
125
126 locking = false;
127 lock_timeout_timer.cancel();
128
129 if (get_server().lock_waiters.empty())
130 {
131 get_server().locked = false;
132 if (get_server().client.is_shared() && get_server().client_lock)
133 {
134 get_server().client_lock->unlock();
135 get_server().client_lock.reset();
136 }
137 }
138 else
139 get_server().lock_waiters.front()->timer.cancel();
140 }
141 }
142
143 ///////////////////////////////////////////////////////////////////////////
144 void Server::Session::refresh_lock_timeout()
145 ///////////////////////////////////////////////////////////////////////////
146 {
147 if (locking && get_server().lock_timeout.count() > 0)
148 {
149 lock_timeout_timer.expires_after(get_server().lock_timeout);
150 lock_timeout_timer.async_wait
151 (
152 [this](std::error_code error)
153 {
154 if (!error)
155 {
156 if (get_server().log_level > 2)
157 log("timeout");
158
159 if (push_writer)
160 {
161 push_writer.reset();
162 push_status = 't';
163 }
164
165 unlock();
166 }
167 }
168 );
169 }
170 }
171
172 ///////////////////////////////////////////////////////////////////////////
173 boost::asio::awaitable<void> Server::Session::send(Async_Reader reader)
174 ///////////////////////////////////////////////////////////////////////////
175 {
176 log
177 (
178 "sending to client, from = " + std::to_string(reader.get_current()) +
179 ", until = " + std::to_string(reader.get_end())
180 );
181
182 buffer.index = 1;
183 buffer.write<int64_t>(reader.get_end());
184
185 const int64_t display_step = std::max
186 (
187 int64_t(1000000),
188 reader.get_remaining() >> 6
189 );
190 int64_t next_display = reader.get_remaining() - display_step;
191
192 while (buffer.index + reader.get_remaining() > 0)
193 {
194 if
195 (
196 get_server().log_level > 3 &&
197 reader.get_remaining() > 0 &&
198 reader.get_remaining() <= next_display
199 )
200 {
201 next_display -= display_step;
202 log("remaining_size = " + std::to_string(reader.get_remaining()));
203 }
204
205 buffer.index += reader.read
206 (
207 buffer.data + buffer.index,
208 buffer.size - buffer.index
209 );
210
211 if (reader.is_end_of_file())
212 throw Exception("unexpected end of file");
213
214 refresh_lock_timeout();
215 co_await write_buffer();
216 buffer.index = 0;
217 }
218 }
219
220 ///////////////////////////////////////////////////////////////////////////
221 boost::asio::awaitable<void> Server::Session::check_hash()
222 ///////////////////////////////////////////////////////////////////////////
223 {
224 co_await read_buffer(1, 40);
225
226 const auto checkpoint = buffer.read<int64_t>();
227 const auto hash = buffer.read<SHA_256::Hash>();
228
229 const Readonly_Journal &journal = get_server().client.get_journal();
230
231 if
232 (
233 checkpoint > journal.get_checkpoint() ||
234 hash != (buffer.data[0] == 'H' // ??? takes_time
235 ? Journal_Hasher::get_fast_hash(journal, checkpoint)
236 : Journal_Hasher::get_full_hash(journal, checkpoint))
237 )
238 {
239 buffer.data[0] = 'h';
240 }
241
242 if (get_server().log_level > 2)
243 {
244 log
245 (
246 "hash for checkpoint = " + std::to_string(checkpoint) +
247 ", result = " + buffer.data[0]
248 );
249 }
250
251 buffer.index = 1;
252 co_await write_buffer();
253 }
254
255 ///////////////////////////////////////////////////////////////////////////
256 boost::asio::awaitable<void> Server::Session::handshake()
257 ///////////////////////////////////////////////////////////////////////////
258 {
259 co_await read_buffer(0, 13);
260
261 if (buffer.read<std::array<char, 5>>() != Header::joedb)
262 throw Exception("handshake does not start by joedb");
263
264 const int64_t client_version = buffer.read<int64_t>();
265 if (get_server().log_level > 0)
266 log("client_version = " + std::to_string(client_version));
267
268 const int64_t version =
269 client_version < protocol_version ? 0 : protocol_version;
270
271 const bool writable =
272 get_server().writable_journal_client &&
273 !get_server().client.is_pullonly();
274
275 buffer.index = 5;
276 buffer.write<int64_t>(version);
277 buffer.write<int64_t>(id);
278 buffer.write<int64_t>(get_server().client.get_journal_checkpoint());
279 buffer.write<char>(writable ? 'W' : 'R');
280
281 co_await write_buffer();
282 }
283
284 ///////////////////////////////////////////////////////////////////////////
285 boost::asio::awaitable<void> Server::Session::read()
286 ///////////////////////////////////////////////////////////////////////////
287 {
288 co_await read_buffer(1, 16);
289
290 int64_t from = buffer.read<int64_t>();
291 int64_t until = buffer.read<int64_t>();
292
293 if (until > get_server().client.get_journal_checkpoint())
294 until = get_server().client.get_journal_checkpoint();
295 if (from > until)
296 from = until;
297
298 co_await send
299 (
300 get_server().client.get_journal().get_async_reader(from, until)
301 );
302 }
303
304 ///////////////////////////////////////////////////////////////////////////
305 boost::asio::awaitable<void> Server::Session::pull
306 ///////////////////////////////////////////////////////////////////////////
307 (
308 bool lock_before,
309 bool send_data
310 )
311 {
312 co_await read_buffer(1, 16);
313
314 const std::chrono::milliseconds wait{buffer.read<int64_t>()};
315 int64_t pull_checkpoint = buffer.read<int64_t>();
316
317 if (!get_server().client_lock)
318 get_server().client.pull(); // ??? takes_time
319
320 if
321 (
322 wait.count() > 0 &&
323 pull_checkpoint == get_server().client.get_journal_checkpoint()
324 )
325 {
326 if (get_server().log_level > 2)
327 {
328 log
329 (
330 "waiting at checkpoint = " + std::to_string(pull_checkpoint) +
331 " for " + std::to_string(double(wait.count()) * 0.001) + 's'
332 );
333 }
334
335 timer.expires_after(wait);
336
337 get_server().pull_waiters.emplace_back(this);
338 {
339 boost::system::error_code ec;
340 co_await timer.async_wait
341 (
342 boost::asio::redirect_error(boost::asio::use_awaitable, ec)
343 );
344 }
345 remove_from_queue(get_server().pull_waiters);
346 }
347
348 if (lock_before)
349 co_await lock();
350
351 if (!send_data)
352 pull_checkpoint = get_server().client.get_journal_checkpoint();
353
354 co_await send
355 (
356 get_server().client.get_journal().get_async_tail_reader(pull_checkpoint)
357 );
358 }
359
360 ///////////////////////////////////////////////////////////////////////////
361 boost::asio::awaitable<void> Server::Session::push
362 ///////////////////////////////////////////////////////////////////////////
363 (
364 bool unlock_after
365 )
366 {
367 co_await read_buffer(1, 16);
368
369 const int64_t from = buffer.read<int64_t>();
370 const int64_t until = buffer.read<int64_t>();
371
372 if (get_server().locked && !locking)
373 throw Exception("trying to push while someone else is locking");
374 else if (!get_server().locked)
375 {
376 if (get_server().log_level > 2)
377 log("taking the lock for push attempt.");
378 co_await lock();
379 }
380
381 const bool conflict =
382 (
383 !locking ||
384 from != get_server().client.get_journal().get_checkpoint()
385 );
386
387 push_status = buffer.data[0];
388
389 if (!get_server().writable_journal_client)
390 push_status = 'R';
391 else if (conflict)
392 push_status = 'C';
393 else
394 {
395 if (get_server().client_lock)
396 {
397 push_writer.emplace
398 (
399 get_server().client_lock->get_journal().get_async_tail_writer()
400 );
401 }
402 }
403
404 if (get_server().log_level > 2)
405 {
406 log
407 (
408 "receiving from client, from = " + std::to_string(from) +
409 ", until = " + std::to_string(until)
410 );
411 }
412
413 int64_t remaining_size = until - from;
414
415 while (remaining_size > 0)
416 {
417 refresh_lock_timeout();
418
419 const size_t size = co_await read_buffer
420 (
421 0,
422 size_t(std::min(remaining_size, int64_t(buffer.size)))
423 );
424
425 if (push_writer)
426 push_writer->write(buffer.data, size); // ??? takes_time
427
428 remaining_size -= int64_t(size);
429 if (get_server().log_level > 3 && remaining_size > 0)
430 log("remaining_size = " + std::to_string(remaining_size));
431 }
432
433 if (until > from)
434 {
435 if (push_writer)
436 {
437 if (get_server().client_lock)
438 {
439 get_server().client_lock->get_journal().soft_checkpoint_at
440 (
441 push_writer->get_position()
442 );
443
444 if (get_server().client.is_shared() && unlock_after)
445 {
446 get_server().client_lock->checkpoint_and_push_unlock();
447 get_server().client_lock.reset();
448 }
449 else
450 get_server().client_lock->checkpoint_and_push();
451 }
452 }
453 }
454
455 if (get_server().log_level > 2)
456 log(std::string("done pushing, status = ") + push_status);
457
458 if (unlock_after)
459 unlock();
460
461 for (auto *waiter: get_server().pull_waiters)
462 waiter->timer.cancel();
463 get_server().pull_waiters.clear();
464
465 buffer.data[0] = push_status;
466 buffer.index = 1;
467 co_await write_buffer();
468 }
469
470 ///////////////////////////////////////////////////////////////////////////
471 boost::asio::awaitable<void> Server::Session::run()
472 ///////////////////////////////////////////////////////////////////////////
473 {
474 co_await handshake();
475
476 while (true)
477 {
478 co_await read_buffer(0, 1);
479
480 const char code = buffer.read<char>();
481
482 if (server.get_log_level() > 2)
483 {
484 const auto i = request_description.find(code);
485 if (i != request_description.end())
486 log(std::string("received request ") + code + ": " + i->second);
487 else
488 log(std::string("unknown code: ") + code);
489 }
490
491 switch (code)
492 {
493 case 'H': case 'I':
494 co_await check_hash();
495 break;
496
497 case 'r':
498 co_await read();
499 break;
500
501 case 'D': case 'E': case 'F': case 'G':
502 co_await pull(code & 1, code & 2);
503 break;
504
505 case 'M':
506 unlock();
507 co_await write_buffer();
508 break;
509
510 case 'N': case 'O':
511 co_await push(code == 'O');
512 break;
513
514 case 'Q': default:
515 co_return;
516 break;
517 }
518 }
519 }
520
521 ////////////////////////////////////////////////////////////////////////////
522 Server::Server
523 ////////////////////////////////////////////////////////////////////////////
524 (
525 Logger &logger,
526 int log_level,
527 int thread_count,
528 std::string endpoint_path,
529 Client &client,
530 std::chrono::milliseconds lock_timeout
531 ):
532 joedb::asio::Server(logger, log_level, thread_count, std::move(endpoint_path)),
533 client(client),
534 writable_journal_client(dynamic_cast<Writable_Journal_Client*>(&client)),
535 lock_timeout(lock_timeout),
536 locked(false)
537 {
539
540 if (writable_journal_client)
541 {
542 if (!client.is_shared())
543 client_lock.emplace(*writable_journal_client);
544 writable_journal_client->push_if_ahead();
545 }
546 }
547
548 ////////////////////////////////////////////////////////////////////////////
550 ////////////////////////////////////////////////////////////////////////////
551 {
552 if (client_lock)
553 client_lock->unlock();
554 }
555
556 ////////////////////////////////////////////////////////////////////////////
557 Server::~Server() = default;
558 ////////////////////////////////////////////////////////////////////////////
559}
Handle concurrent access to a file with a joedb::Connection.
Definition Client.h:12
void cleanup_after_join() override
Definition Server.cpp:549
int64_t push_if_ahead(int64_t until) override
const int thread_count
Definition Server.h:25
constexpr int protocol_version
#define JOEDB_RELEASE_ASSERT(x)
always-tested assertion (release and debug mode)
Definition assert.h:24