Joedb 9.5.0
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server_Client.cpp
Go to the documentation of this file.
6
7#include <iostream>
8
9#define LOG(x) do {if (log) *log << x;} while (false)
10
11namespace joedb
12{
13 ////////////////////////////////////////////////////////////////////////////
14 void Server_Client::ping(Channel_Lock &lock)
15 ////////////////////////////////////////////////////////////////////////////
16 {
17 buffer.index = 0;
18 buffer.write<char>('D');
19 buffer.write<int64_t>(0);
20 buffer.write<int64_t>(0);
21 lock.write(buffer.data, buffer.index);
22 lock.read(buffer.data, 9);
23 }
24
25 ////////////////////////////////////////////////////////////////////////////
27 ////////////////////////////////////////////////////////////////////////////
28 {
30 ping(lock);
31 }
32
33 ////////////////////////////////////////////////////////////////////////////
34 void Server_Client::keep_alive()
35 ////////////////////////////////////////////////////////////////////////////
36 {
37 try
38 {
40
41 while (!keep_alive_thread_must_stop)
42 {
43 condition.wait_for(lock, keep_alive_interval);
44
45 if (keep_alive_thread_must_stop)
46 break;
47
48 ping(lock);
49 }
50 }
51 catch(...)
52 {
53 }
54 }
55
56 ////////////////////////////////////////////////////////////////////////////
57 void Server_Client::connect()
58 ////////////////////////////////////////////////////////////////////////////
59 {
60 LOG("Connecting... ");
61
62 buffer.index = 0;
63 buffer.write<std::array<char, 5>>(Header::joedb);
65
66 {
67 Channel_Lock lock(channel);
68 lock.write(buffer.data, buffer.index);
69 lock.read(buffer.data, 5 + 8 + 8 + 8 + 1);
70 }
71
72 buffer.index = 0;
73
74 if (buffer.read<std::array<char, 5>>() != Header::joedb)
75 throw Exception("Did not receive \"joedb\" from server");
76
77 const int64_t server_version = buffer.read<int64_t>();
78
79 if (server_version == 0)
80 throw Exception("Client version rejected by server");
81
82 LOG("server_version = " << server_version << ". ");
83
84 if (server_version < protocol_version)
85 throw Exception("Unsupported server version");
86
87 session_id = buffer.read<int64_t>();
88 server_checkpoint = buffer.read<int64_t>();
89 const char mode = buffer.read<char>();
90
91 if (mode == 'R')
92 pullonly_server = true;
93 else if (mode == 'W')
94 pullonly_server = false;
95 else
96 throw Exception("Unexpected server mode");
97
98 LOG
99 (
100 "session_id = " << session_id <<
101 "; server_checkpoint = " << server_checkpoint <<
102 "; mode = " << mode <<
103 ". OK.\n"
104 );
105
106 if (keep_alive_interval.count() > 0)
107 {
108 keep_alive_thread_must_stop = false;
109 keep_alive_thread = std::thread([this](){keep_alive();});
110 }
111 }
112
113 ////////////////////////////////////////////////////////////////////////////
115 ////////////////////////////////////////////////////////////////////////////
116 (
117 Async_Writer &writer,
118 Channel_Lock &lock,
119 int64_t size
120 ) const
121 {
122 LOG("downloading");
123 Progress_Bar progress_bar(size, log);
124
125 for (int64_t read = 0; read < size;)
126 {
127 const int64_t remaining = size - read;
128 const size_t read_size = size_t(std::min(buffer.ssize, remaining));
129 const size_t n = lock.read_some(buffer.data, read_size);
130 writer.write(buffer.data, n);
131 read += int64_t(n);
132 progress_bar.print(read);
133 }
134 }
135
136 ////////////////////////////////////////////////////////////////////////////
137 Server_Client::Server_Client(Channel &channel, std::ostream *log):
138 ////////////////////////////////////////////////////////////////////////////
139 keep_alive_interval(std::chrono::seconds{240}),
140 channel(channel),
141 log(log),
142 session_id(-1),
143 pullonly_server(false)
144 {
145 connect();
146 }
147
148 ////////////////////////////////////////////////////////////////////////////
149 void Server_Client::disconnect()
150 ////////////////////////////////////////////////////////////////////////////
151 {
152 {
153 Channel_Lock lock(channel);
154 keep_alive_thread_must_stop = true;
155 }
156
157 condition.notify_one();
158 if (keep_alive_thread.joinable())
159 keep_alive_thread.join();
160 }
161
162 ////////////////////////////////////////////////////////////////////////////
164 ////////////////////////////////////////////////////////////////////////////
165 {
166 try { disconnect(); } catch (...) {}
167 }
168}
169
170#undef LOG
#define LOG(x)
Definition Server.cpp:8
size_t index
Definition Buffer.h:20
void write(T x)
Definition Buffer.h:23
char data[size+extra_size]
Definition Buffer.h:19
void print(int64_t current)
void download(Async_Writer &writer, Channel_Lock &lock, int64_t size) const
Server_Client(Channel &channel, std::ostream *log=nullptr)
Thread_Safe_Channel channel
constexpr int protocol_version
Definition Blob.h:7
static constexpr std::array< char, 5 > joedb
Definition Header.h:17