Joedb 10.3.0
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server_Client.cpp
Go to the documentation of this file.
6
7namespace joedb
8{
9 ////////////////////////////////////////////////////////////////////////////
10 void Server_Client::log(const std::string &message) noexcept
11 ////////////////////////////////////////////////////////////////////////////
12 {
13 try
14 {
15 logger.log(std::to_string(get_session_id()) + ": " + message);
16 }
17 catch (...)
18 {
19 }
20 }
21
22 ////////////////////////////////////////////////////////////////////////////
24 ////////////////////////////////////////////////////////////////////////////
25 {
26 buffer.index = 0;
27 buffer.write<char>('D');
28 buffer.write<int64_t>(0);
29 buffer.write<int64_t>(0);
30 lock->write(buffer.data, buffer.index);
31 lock->read(buffer.data, 9);
32 }
33
34 ////////////////////////////////////////////////////////////////////////////
36 ////////////////////////////////////////////////////////////////////////////
37 {
39 ping(lock);
40 }
41
42 ////////////////////////////////////////////////////////////////////////////
43 void Server_Client::keep_alive()
44 ////////////////////////////////////////////////////////////////////////////
45 {
46 try
47 {
49
50 while (!keep_alive_thread_must_stop)
51 {
52 condition.wait_for(lock, keep_alive_interval);
53
54 if (keep_alive_thread_must_stop)
55 break;
56
57 ping(lock);
58 }
59 }
60 catch(...)
61 {
62 }
63 }
64
65 ////////////////////////////////////////////////////////////////////////////
66 void Server_Client::connect()
67 ////////////////////////////////////////////////////////////////////////////
68 {
69 logger.log("joedb::Server_Client::connect");
70
71 buffer.index = 0;
72 buffer.write<std::array<char, 5>>(Header::joedb);
74
75 {
76 Lock<Channel&> lock(channel);
77 lock->write(buffer.data, buffer.index);
78 lock->read(buffer.data, 5 + 8 + 8 + 8 + 1);
79 }
80
81 buffer.index = 0;
82
83 if (buffer.read<std::array<char, 5>>() != Header::joedb)
84 throw Exception("Did not receive \"joedb\" from server");
85
86 const int64_t server_version = buffer.read<int64_t>();
87
88 if (server_version == 0)
89 throw Exception("Client version rejected by server");
90
91 logger.log("server_version = " + std::to_string(server_version));
92
93 if (server_version < protocol_version)
94 throw Exception("Unsupported server version");
95
96 session_id = buffer.read<int64_t>();
97 server_checkpoint = buffer.read<int64_t>();
98 const char mode = buffer.read<char>();
99
100 if (mode == 'R')
101 pullonly_server = true;
102 else if (mode == 'W')
103 pullonly_server = false;
104 else
105 throw Exception("Unexpected server mode");
106
107 log
108 (
109 "server_checkpoint = " + std::to_string(server_checkpoint) +
110 "; mode = " + mode
111 );
112
113 if (keep_alive_interval.count() > 0)
114 {
115 keep_alive_thread_must_stop = false;
116 keep_alive_thread = std::thread([this](){keep_alive();});
117 }
118 }
119
120 ////////////////////////////////////////////////////////////////////////////
122 ////////////////////////////////////////////////////////////////////////////
123 (
124 Async_Writer &writer,
125 Lock<Channel&> &lock,
126 int64_t size
127 ) const
128 {
129 Progress_Bar progress_bar
130 (
131 size,
132 *const_cast<Logger *>(static_cast<const Logger *>(this))
133 );
134
135 for (int64_t read = 0; read < size;)
136 {
137 const int64_t remaining = size - read;
138 const size_t read_size = size_t(std::min(int64_t(buffer.size), remaining));
139 const size_t n = lock->read_some(buffer.data, read_size);
140 writer.write(buffer.data, n);
141 read += int64_t(n);
142 progress_bar.print(read);
143 }
144 }
145
146 ////////////////////////////////////////////////////////////////////////////
148 ////////////////////////////////////////////////////////////////////////////
149 (
150 Channel &channel,
151 Logger &logger,
152 std::chrono::milliseconds keep_alive_interval
153 ):
154 keep_alive_interval(keep_alive_interval),
155 channel(channel),
156 logger(logger),
157 session_id(-1),
158 pullonly_server(false)
159 {
160 connect();
161 }
162
163 ////////////////////////////////////////////////////////////////////////////
164 void Server_Client::disconnect() noexcept
165 ////////////////////////////////////////////////////////////////////////////
166 {
167 try
168 {
170 keep_alive_thread_must_stop = true;
171 buffer.data[0] = 'Q';
172 lock->write(buffer.data, 1);
173 }
174 catch (...)
175 {
176 }
177
178 try
179 {
180 condition.notify_one();
181 if (keep_alive_thread.joinable())
182 keep_alive_thread.join();
183 }
184 catch (...)
185 {
186 }
187 }
188
189 ////////////////////////////////////////////////////////////////////////////
191 ////////////////////////////////////////////////////////////////////////////
192 {
193 disconnect();
194 }
195}
size_t index
Definition Buffer.h:20
void write(T x)
Definition Buffer.h:23
char data[size+extra_size]
Definition Buffer.h:19
virtual void log(const std::string &message) noexcept
Definition Logger.h:12
void print(int64_t current)
Server_Client(Channel &channel, Logger &logger=Logger::dummy_logger, std::chrono::milliseconds keep_alive_interval=std::chrono::seconds(0))
void log(const std::string &message) noexcept override
Thread_Safe< Channel & > channel
void download(Async_Writer &writer, Lock< Channel & > &lock, int64_t size) const
constexpr int protocol_version
static constexpr std::array< char, 5 > joedb
Definition Header.h:17