Joedb 10.3.0
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server.cpp
Go to the documentation of this file.
6
7#include <boost/asio/redirect_error.hpp>
8#include <boost/asio/use_awaitable.hpp>
9
10#include <cstdio>
11#include <algorithm>
12
13namespace joedb
14{
15 ////////////////////////////////////////////////////////////////////////////
16 const std::map<char, const char *> Server::request_description
17 ////////////////////////////////////////////////////////////////////////////
18 {
19 {'H', "get fast SHA-256 hash code"},
20 {'I', "get full SHA-256 hash code"},
21 {'r', "read a range of bytes"},
22 {'D', "get checkpoint"},
23 {'E', "lock"},
24 {'F', "pull"},
25 {'G', "lock_pull"},
26 {'L', "lock"},
27 {'M', "unlock"},
28 {'N', "lock_push"},
29 {'O', "push_unlock"},
30 {'Q', "quit"}
31 };
32
33 ////////////////////////////////////////////////////////////////////////////
34 Server::Session::Session
35 ////////////////////////////////////////////////////////////////////////////
36 (
37 Server &server,
38 boost::asio::local::stream_protocol::socket &&socket
39 ):
40 joedb::asio::Server::Session(server, std::move(socket)),
41 timer(strand),
42 lock_timeout_timer(strand)
43 {
44 }
45
46 ////////////////////////////////////////////////////////////////////////////
47 void Server::Session::remove_from_queue(std::deque<Session *> &queue)
48 ////////////////////////////////////////////////////////////////////////////
49 {
50 const auto it = std::find(queue.begin(), queue.end(), this);
51 if (it != queue.end())
52 queue.erase(it);
53 }
54
55 ////////////////////////////////////////////////////////////////////////////
56 void Server::Session::cleanup()
57 ////////////////////////////////////////////////////////////////////////////
58 {
59 if (locking)
60 unlock();
61
62 remove_from_queue(get_server().lock_waiters);
63 remove_from_queue(get_server().pull_waiters);
64 }
65
66 ////////////////////////////////////////////////////////////////////////////
67 boost::asio::awaitable<void> Server::Session::lock()
68 ////////////////////////////////////////////////////////////////////////////
69 {
70 if (locking)
71 throw Exception("error: locking an already locked session");
72 else
73 {
74 if (!get_server().locked)
75 {
76 if (get_server().log_level > 2)
77 log("obtained lock immediately");
78 }
79 else
80 {
81 if (get_server().log_level > 2)
82 log("waiting for lock");
83
84 // dirty, using asio::experimental::channel is probably better
85 timer.expires_after(boost::asio::steady_timer::duration::max());
86
87 get_server().lock_waiters.emplace_back(this);
88 {
89 boost::system::error_code ec;
90 co_await timer.async_wait
91 (
92 boost::asio::redirect_error(boost::asio::use_awaitable, ec)
93 );
94 }
95 get_server().lock_waiters.pop_front();
96
97 if (get_server().log_level > 2)
98 log("obtained lock after waiting");
99 }
100
101 get_server().locked = true;
102 locking = true;
103 refresh_lock_timeout();
104
105 if (!get_server().client_lock)
106 {
107 if (!get_server().writable_journal_client)
108 {
109 log("error: locking pull-only server");
110 buffer.data[0] = 'R';
111 }
112 else
113 get_server().client_lock.emplace(*get_server().writable_journal_client);
114 }
115 }
116 }
117
118 ////////////////////////////////////////////////////////////////////////////
119 void Server::Session::unlock()
120 ////////////////////////////////////////////////////////////////////////////
121 {
122 if (locking)
123 {
124 if (get_server().log_level > 2)
125 log("unlock");
126
127 locking = false;
128 lock_timeout_timer.cancel();
129
130 if (get_server().lock_waiters.empty())
131 {
132 get_server().locked = false;
133 if (get_server().client.is_shared() && get_server().client_lock)
134 {
135 get_server().client_lock->unlock();
136 get_server().client_lock.reset();
137 }
138 }
139 else
140 get_server().lock_waiters.front()->timer.cancel();
141 }
142 }
143
144 ///////////////////////////////////////////////////////////////////////////
145 void Server::Session::refresh_lock_timeout()
146 ///////////////////////////////////////////////////////////////////////////
147 {
148 if (locking && get_server().lock_timeout.count() > 0)
149 {
150 lock_timeout_timer.expires_after(get_server().lock_timeout);
151 lock_timeout_timer.async_wait
152 (
153 [this](std::error_code error)
154 {
155 if (!error)
156 {
157 if (get_server().log_level > 2)
158 log("timeout");
159
160 if (push_writer)
161 {
162 push_writer.reset();
163 push_status = 't';
164 }
165
166 unlock();
167 }
168 }
169 );
170 }
171 }
172
173 ///////////////////////////////////////////////////////////////////////////
174 boost::asio::awaitable<void> Server::Session::send(Async_Reader reader)
175 ///////////////////////////////////////////////////////////////////////////
176 {
177 log
178 (
179 "sending to client, from = " + std::to_string(reader.get_current()) +
180 ", until = " + std::to_string(reader.get_end())
181 );
182
183 buffer.index = 1;
184 buffer.write<int64_t>(reader.get_end());
185 Progress_Bar progress_bar(reader.get_remaining(), *this);
186
187 while (buffer.index + reader.get_remaining() > 0)
188 {
189 buffer.index += reader.read
190 (
191 buffer.data + buffer.index,
192 buffer.size - buffer.index
193 );
194
195 if (reader.is_end_of_file())
196 throw Exception("unexpected end of file");
197
198 refresh_lock_timeout();
199 co_await write_buffer();
200 buffer.index = 0;
201
202 if (get_server().log_level > 3)
203 progress_bar.print_remaining(reader.get_remaining());
204 }
205 }
206
207 ///////////////////////////////////////////////////////////////////////////
208 boost::asio::awaitable<void> Server::Session::check_hash()
209 ///////////////////////////////////////////////////////////////////////////
210 {
211 co_await read_buffer(1, 40);
212
213 const auto checkpoint = buffer.read<int64_t>();
214 const auto hash = buffer.read<SHA_256::Hash>();
215
216 const Readonly_Journal &journal = get_server().client.get_journal();
217
218 if
219 (
220 checkpoint > journal.get_checkpoint() ||
221 hash != (buffer.data[0] == 'H' // ??? takes_time
222 ? Journal_Hasher::get_fast_hash(journal, checkpoint)
223 : Journal_Hasher::get_full_hash(journal, checkpoint))
224 )
225 {
226 buffer.data[0] = 'h';
227 }
228
229 if (get_server().log_level > 2)
230 {
231 log
232 (
233 "hash for checkpoint = " + std::to_string(checkpoint) +
234 ", result = " + buffer.data[0]
235 );
236 }
237
238 buffer.index = 1;
239 co_await write_buffer();
240 }
241
242 ///////////////////////////////////////////////////////////////////////////
243 boost::asio::awaitable<void> Server::Session::handshake()
244 ///////////////////////////////////////////////////////////////////////////
245 {
246 co_await read_buffer(0, 13);
247
248 if (buffer.read<std::array<char, 5>>() != Header::joedb)
249 throw Exception("handshake does not start by joedb");
250
251 const int64_t client_version = buffer.read<int64_t>();
252 if (get_server().log_level > 0)
253 log("client_version = " + std::to_string(client_version));
254
255 const int64_t version =
256 client_version < protocol_version ? 0 : protocol_version;
257
258 const bool writable =
259 get_server().writable_journal_client &&
260 !get_server().client.is_pullonly();
261
262 buffer.index = 5;
263 buffer.write<int64_t>(version);
264 buffer.write<int64_t>(id);
265 buffer.write<int64_t>(get_server().client.get_journal_checkpoint());
266 buffer.write<char>(writable ? 'W' : 'R');
267
268 co_await write_buffer();
269 }
270
271 ///////////////////////////////////////////////////////////////////////////
272 boost::asio::awaitable<void> Server::Session::read()
273 ///////////////////////////////////////////////////////////////////////////
274 {
275 co_await read_buffer(1, 16);
276
277 int64_t from = buffer.read<int64_t>();
278 int64_t until = buffer.read<int64_t>();
279
280 if (until > get_server().client.get_journal_checkpoint())
281 until = get_server().client.get_journal_checkpoint();
282 if (from > until)
283 from = until;
284
285 co_await send
286 (
287 get_server().client.get_journal().get_async_reader(from, until)
288 );
289 }
290
291 ///////////////////////////////////////////////////////////////////////////
292 boost::asio::awaitable<void> Server::Session::pull
293 ///////////////////////////////////////////////////////////////////////////
294 (
295 bool lock_before,
296 bool send_data
297 )
298 {
299 co_await read_buffer(1, 16);
300
301 const std::chrono::milliseconds wait{buffer.read<int64_t>()};
302 int64_t pull_checkpoint = buffer.read<int64_t>();
303
304 if (!get_server().client_lock)
305 get_server().client.pull(); // ??? takes_time
306
307 if
308 (
309 wait.count() > 0 &&
310 pull_checkpoint == get_server().client.get_journal_checkpoint()
311 )
312 {
313 if (get_server().log_level > 2)
314 {
315 log
316 (
317 "waiting at checkpoint = " + std::to_string(pull_checkpoint) +
318 " for " + std::to_string(double(wait.count()) * 0.001) + 's'
319 );
320 }
321
322 timer.expires_after(wait);
323
324 get_server().pull_waiters.emplace_back(this);
325 {
326 boost::system::error_code ec;
327 co_await timer.async_wait
328 (
329 boost::asio::redirect_error(boost::asio::use_awaitable, ec)
330 );
331 }
332 remove_from_queue(get_server().pull_waiters);
333 }
334
335 if (lock_before)
336 co_await lock();
337
338 if (!send_data)
339 pull_checkpoint = get_server().client.get_journal_checkpoint();
340
341 co_await send
342 (
343 get_server().client.get_journal().get_async_tail_reader(pull_checkpoint)
344 );
345 }
346
347 ///////////////////////////////////////////////////////////////////////////
348 boost::asio::awaitable<void> Server::Session::push
349 ///////////////////////////////////////////////////////////////////////////
350 (
351 bool unlock_after
352 )
353 {
354 co_await read_buffer(1, 16);
355
356 const int64_t from = buffer.read<int64_t>();
357 const int64_t until = buffer.read<int64_t>();
358
359 if (get_server().locked && !locking)
360 throw Exception("trying to push while someone else is locking");
361 else if (!get_server().locked)
362 {
363 if (get_server().log_level > 2)
364 log("taking the lock for push attempt.");
365 co_await lock();
366 }
367
368 const bool conflict =
369 (
370 !locking ||
371 from != get_server().client.get_journal().get_checkpoint()
372 );
373
374 push_status = buffer.data[0];
375
376 if (!get_server().writable_journal_client)
377 push_status = 'R';
378 else if (conflict)
379 push_status = 'C';
380 else
381 {
382 if (get_server().client_lock)
383 {
384 push_writer.emplace
385 (
386 get_server().client_lock->get_journal().get_async_tail_writer()
387 );
388 }
389 }
390
391 if (get_server().log_level > 2)
392 {
393 log
394 (
395 "receiving from client, from = " + std::to_string(from) +
396 ", until = " + std::to_string(until)
397 );
398 }
399
400 int64_t remaining_size = until - from;
401 Progress_Bar progress_bar(remaining_size, *this);
402
403 while (remaining_size > 0)
404 {
405 refresh_lock_timeout();
406
407 const size_t size = co_await read_buffer
408 (
409 0,
410 size_t(std::min(remaining_size, int64_t(buffer.size)))
411 );
412
413 if (push_writer)
414 push_writer->write(buffer.data, size); // ??? takes_time
415
416 remaining_size -= int64_t(size);
417 if (get_server().log_level > 3)
418 progress_bar.print_remaining(remaining_size);
419 }
420
421 if (until > from)
422 {
423 if (push_writer)
424 {
425 if (get_server().client_lock)
426 {
427 get_server().client_lock->get_journal().soft_checkpoint_at
428 (
429 push_writer->get_position()
430 );
431
432 if (get_server().client.is_shared() && unlock_after)
433 {
434 get_server().client_lock->checkpoint_and_push_unlock();
435 get_server().client_lock.reset();
436 }
437 else
438 get_server().client_lock->checkpoint_and_push();
439 }
440 }
441 }
442
443 if (get_server().log_level > 2)
444 log(std::string("done pushing, status = ") + push_status);
445
446 if (unlock_after)
447 unlock();
448
449 for (auto *waiter: get_server().pull_waiters)
450 waiter->timer.cancel();
451 get_server().pull_waiters.clear();
452
453 buffer.data[0] = push_status;
454 buffer.index = 1;
455 co_await write_buffer();
456 }
457
458 ///////////////////////////////////////////////////////////////////////////
459 boost::asio::awaitable<void> Server::Session::run()
460 ///////////////////////////////////////////////////////////////////////////
461 {
462 co_await handshake();
463
464 while (true)
465 {
466 co_await read_buffer(0, 1);
467
468 const char code = buffer.read<char>();
469
470 if (server.get_log_level() > 2)
471 {
472 const auto i = request_description.find(code);
473 if (i != request_description.end())
474 log(std::string("received request ") + code + ": " + i->second);
475 else
476 log(std::string("unknown code: ") + code);
477 }
478
479 switch (code)
480 {
481 case 'H': case 'I':
482 co_await check_hash();
483 break;
484
485 case 'r':
486 co_await read();
487 break;
488
489 case 'D': case 'E': case 'F': case 'G':
490 co_await pull(code & 1, code & 2);
491 break;
492
493 case 'M':
494 unlock();
495 co_await write_buffer();
496 break;
497
498 case 'N': case 'O':
499 co_await push(code == 'O');
500 break;
501
502 case 'Q': default:
503 co_return;
504 break;
505 }
506 }
507 }
508
509 ////////////////////////////////////////////////////////////////////////////
510 Server::Server
511 ////////////////////////////////////////////////////////////////////////////
512 (
513 Logger &logger,
514 int log_level,
515 int thread_count,
516 std::string endpoint_path,
517 Client &client,
518 std::chrono::milliseconds lock_timeout
519 ):
520 joedb::asio::Server(logger, log_level, thread_count, std::move(endpoint_path)),
521 client(client),
522 writable_journal_client(dynamic_cast<Writable_Journal_Client*>(&client)),
523 lock_timeout(lock_timeout),
524 locked(false)
525 {
527
528 if (writable_journal_client)
529 {
530 if (!client.is_shared())
531 client_lock.emplace(*writable_journal_client);
532 writable_journal_client->push_if_ahead();
533 }
534 }
535
536 ////////////////////////////////////////////////////////////////////////////
538 ////////////////////////////////////////////////////////////////////////////
539 {
540 if (client_lock)
541 client_lock->unlock();
542 }
543
544 ////////////////////////////////////////////////////////////////////////////
545 Server::~Server() = default;
546 ////////////////////////////////////////////////////////////////////////////
547}
Handle concurrent access to a file with a joedb::Connection.
Definition Client.h:12
void cleanup_after_join() override
Definition Server.cpp:537
int64_t push_if_ahead(int64_t until) override
const int thread_count
Definition Server.h:25
constexpr int protocol_version
#define JOEDB_RELEASE_ASSERT(x)
always-tested assertion (release and debug mode)
Definition assert.h:24