Joedb 10.2.1
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server_Client.cpp
Go to the documentation of this file.
6
7#include <iostream>
8
// LOG(x): streams x into *log when the `log` pointer visible at the expansion
// site is non-null, and does nothing otherwise. The do/while(false) wrapper
// makes the expansion behave as a single statement.
9#define LOG(x) do {if (log) *log << x;} while (false)
10
11namespace joedb
12{
13 ////////////////////////////////////////////////////////////////////////////
14 void Server_Client::ping(Lock<Channel&> &lock)
15 ////////////////////////////////////////////////////////////////////////////
16 {
17 buffer.index = 0;
18 buffer.write<char>('D');
19 buffer.write<int64_t>(0);
20 buffer.write<int64_t>(0);
21 lock->write(buffer.data, buffer.index);
22 lock->read(buffer.data, 9);
23 }
24
// NOTE(review): this listing skips original lines 26 and 29, so neither the
// signature of this definition nor the declaration of `lock` is visible here.
// Presumably this is a convenience overload that acquires the channel lock
// itself; confirm against the upstream file before editing.
25 ////////////////////////////////////////////////////////////////////////////
27 ////////////////////////////////////////////////////////////////////////////
28 {
// Forwards to ping(Lock<Channel&>&), which performs the actual exchange.
30 ping(lock);
31 }
32
33 ////////////////////////////////////////////////////////////////////////////
34 void Server_Client::keep_alive()
35 ////////////////////////////////////////////////////////////////////////////
36 {
37 try
38 {
40
41 while (!keep_alive_thread_must_stop)
42 {
43 condition.wait_for(lock, keep_alive_interval);
44
45 if (keep_alive_thread_must_stop)
46 break;
47
48 ping(lock);
49 }
50 }
51 catch(...)
52 {
53 }
54 }
55
56 ////////////////////////////////////////////////////////////////////////////
// Performs the initial handshake with the server.
//
// Sends the "joedb" header, reads a 30-byte reply and decodes it as: the
// 5-byte header, the server version, the session id, the server checkpoint
// (three int64_t values) and a one-character mode. The decoded values are
// stored in session_id, server_checkpoint and pullonly_server. If
// keep_alive_interval is positive, the keep-alive thread is started last.
//
// Throws Exception when the reply does not start with the "joedb" header,
// when the server version is 0 or lower than protocol_version, or when the
// mode character is neither 'R' nor 'W'.
57 void Server_Client::connect()
58 ////////////////////////////////////////////////////////////////////////////
59 {
60 LOG("Connecting... ");
61
62 buffer.index = 0;
63 buffer.write<std::array<char, 5>>(Header::joedb);
// NOTE(review): original line 64 is absent from this listing. Presumably it
// writes the client's protocol version after the header (the server can
// answer "Client version rejected"); confirm against the upstream file.
65
// The lock is scoped to the request/reply exchange only.
66 {
67 Lock<Channel&> lock(channel);
68 lock->write(buffer.data, buffer.index);
// Reply layout: header (5) + version (8) + session id (8) + checkpoint (8) + mode (1).
69 lock->read(buffer.data, 5 + 8 + 8 + 8 + 1);
70 }
71
// Rewind so that the reads below decode the reply from its first byte.
72 buffer.index = 0;
73
74 if (buffer.read<std::array<char, 5>>() != Header::joedb)
75 throw Exception("Did not receive \"joedb\" from server");
76
77 const int64_t server_version = buffer.read<int64_t>();
78
// A version of 0 is treated as the server refusing this client.
79 if (server_version == 0)
80 throw Exception("Client version rejected by server");
81
82 LOG("server_version = " << server_version << ". ");
83
84 if (server_version < protocol_version)
85 throw Exception("Unsupported server version");
86
87 session_id = buffer.read<int64_t>();
88 server_checkpoint = buffer.read<int64_t>();
89 const char mode = buffer.read<char>();
90
// 'R' marks a pull-only server, 'W' a server that is not pull-only.
91 if (mode == 'R')
92 pullonly_server = true;
93 else if (mode == 'W')
94 pullonly_server = false;
95 else
96 throw Exception("Unexpected server mode");
97
98 LOG
99 (
100 "session_id = " << session_id <<
101 "; server_checkpoint = " << server_checkpoint <<
102 "; mode = " << mode <<
103 ". OK.\n"
104 );
105
// Keep-alive is optional: a non-positive interval starts no thread.
106 if (keep_alive_interval.count() > 0)
107 {
108 keep_alive_thread_must_stop = false;
109 keep_alive_thread = std::thread([this](){keep_alive();});
110 }
111 }
112
113 ////////////////////////////////////////////////////////////////////////////
115 ////////////////////////////////////////////////////////////////////////////
116 (
117 Async_Writer &writer,
118 Lock<Channel&> &lock,
119 int64_t size
120 ) const
121 {
122 LOG("downloading");
123 Progress_Bar progress_bar(size, log);
124
125 for (int64_t read = 0; read < size;)
126 {
127 const int64_t remaining = size - read;
128 const size_t read_size = size_t(std::min(int64_t(buffer.size), remaining));
129 const size_t n = lock->read_some(buffer.data, read_size);
130 writer.write(buffer.data, n);
131 read += int64_t(n);
132 progress_bar.print(read);
133 }
134 }
135
136 ////////////////////////////////////////////////////////////////////////////
138 ////////////////////////////////////////////////////////////////////////////
139 (
140 Channel &channel,
141 std::ostream *log,
142 std::chrono::milliseconds keep_alive_interval
143 ):
144 keep_alive_interval(keep_alive_interval),
145 channel(channel),
146 log(log),
147 session_id(-1),
148 pullonly_server(false)
149 {
150 connect();
151 }
152
153 ////////////////////////////////////////////////////////////////////////////
154 void Server_Client::disconnect() noexcept
155 ////////////////////////////////////////////////////////////////////////////
156 {
157 try
158 {
160 keep_alive_thread_must_stop = true;
161 buffer.data[0] = 'Q';
162 lock->write(buffer.data, 1);
163 }
164 catch (...)
165 {
166 }
167
168 try
169 {
170 condition.notify_one();
171 if (keep_alive_thread.joinable())
172 keep_alive_thread.join();
173 }
174 catch (...)
175 {
176 }
177 }
178
// NOTE(review): this listing skips original line 180, so the signature of this
// definition is not visible here. Presumably it is the Server_Client
// destructor; confirm against the upstream file before editing.
179 ////////////////////////////////////////////////////////////////////////////
181 ////////////////////////////////////////////////////////////////////////////
182 {
// Delegates all cleanup to disconnect(), which is declared noexcept.
183 disconnect();
184 }
185}
186
187#undef LOG
#define LOG(x)
size_t index
Definition Buffer.h:20
void write(T x)
Definition Buffer.h:23
char data[size+extra_size]
Definition Buffer.h:19
void print(int64_t current)
Thread_Safe< Channel & > channel
Server_Client(Channel &channel, std::ostream *log=nullptr, std::chrono::milliseconds keep_alive_interval=std::chrono::seconds(0))
void download(Async_Writer &writer, Lock< Channel & > &lock, int64_t size) const
constexpr int protocol_version
static constexpr std::array< char, 5 > joedb
Definition Header.h:17