Joedb 9.5.0
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server_Connection.cpp
Go to the documentation of this file.
6
7#include <iostream>
8
// LOG(x): streams x into *log, but only when `log` tests true.
// LOGID(x): same guard as LOG, but streams get_time_string_of_now(), a space,
// get_session_id() and ": " ahead of x.
// Both bodies are wrapped in do { ... } while (false) so that an invocation
// followed by ';' behaves as one statement (e.g. as the body of an if/else).
9#define LOG(x) do {if (log) *log << x;} while (false)
10#define LOGID(x) do {if (log) *log << get_time_string_of_now() << ' ' << get_session_id() << ": " << x;} while (false)
11
12namespace joedb
13{
// NOTE(review): the listing line that carried this function's return type and
// name (listing line 15) is missing from this view. The parameter list matches
// the entry "bool check_matching_content(const Readonly_Journal &, int64_t)"
// in the symbol index at the end of this page -- confirm against the real file.
//
// Sends an 'H' request made of a checkpoint and the hash that
// Journal_Hasher::get_hash computes for client_journal at that checkpoint,
// then reads a one-byte reply. Returns true only when that byte is 'H'.
14 ////////////////////////////////////////////////////////////////////////////
16 ////////////////////////////////////////////////////////////////////////////
17 (
18 const Readonly_Journal &client_journal,
19 int64_t server_checkpoint
20 )
21 {
22 LOGID("checking_hash... ");
23
    // Hash up to the smaller of the two checkpoints.
24 const int64_t checkpoint = std::min
25 (
26 server_checkpoint,
27 client_journal.get_checkpoint()
28 );
29
    // Request layout: 'H', checkpoint (int64_t), hash.
30 buffer.index = 0;
31 buffer.write<char>('H');
32 buffer.write<int64_t>(checkpoint);
33 buffer.write(Journal_Hasher::get_hash(client_journal, checkpoint));
34
    // The Channel_Lock lives only for this write + 1-byte read exchange.
35 {
36 Channel_Lock lock(channel);
37 lock.write(buffer.data, buffer.index);
38 lock.read(buffer.data, 1);
39 }
40
41 LOG(buffer.data[0] << '\n');
42
    // Any reply byte other than 'H' yields false.
43 return (buffer.data[0] == 'H');
44 }
45
46 ////////////////////////////////////////////////////////////////////////////
47 size_t Server_Connection::pread(char *data, size_t size, int64_t offset) const
48 ////////////////////////////////////////////////////////////////////////////
49 {
50 buffer.index = 0;
51 buffer.write<char>('r');
52 buffer.write<int64_t>(offset);
53 buffer.write<int64_t>(offset + int64_t(size));
54
55 Channel_Lock lock(channel);
56 lock.write(buffer.data, buffer.index);
57 lock.read(buffer.data, 9);
58
59 buffer.index = 1;
60 const int64_t until = buffer.read<int64_t>();
61
62 if (until < offset || until > offset + int64_t(size))
63 throw Exception("bad pread size from server");
64
65 const size_t returned_size = size_t(until - offset);
66
67 for (size_t read = 0; read < returned_size;)
68 read += lock.read_some(data + read, returned_size - read);
69
70 return returned_size;
71 }
72
// NOTE(review): the listing line that carried this function's return type and
// name (listing line 74) is missing from this view. The parameter list matches
// the entry "int64_t handshake(const Readonly_Journal &, Content_Check)" in
// the symbol index at the end of this page -- confirm against the real file.
//
// Unless content_check is Content_Check::none, calls check_matching_content
// with client_journal and server_checkpoint, and calls content_mismatch()
// when that check returns false. Returns server_checkpoint.
// NOTE(review): server_checkpoint is neither a parameter nor a local here;
// presumably a member set elsewhere -- confirm.
73 ////////////////////////////////////////////////////////////////////////////
75 ////////////////////////////////////////////////////////////////////////////
76 (
77 const Readonly_Journal &client_journal,
78 Content_Check content_check
79 )
80 {
81 if (content_check != Content_Check::none)
82 if (!check_matching_content(client_journal, server_checkpoint))
83 content_mismatch();
84
85 return server_checkpoint;
86 }
87
// NOTE(review): the listing line that carried this function's return type and
// name (listing line 89) is missing from this view. The parameter list matches
// the entry "int64_t pull(Lock_Action, Data_Transfer, Writable_Journal &,
// std::chrono::milliseconds)" in the symbol index at the end of this page --
// confirm against the real file.
//
// Sends a pull request, stores the checkpoint found in the reply into
// server_checkpoint, and, when data_transfer converts to true, downloads
// the bytes between the client checkpoint and that server checkpoint into
// client_journal. Returns server_checkpoint.
// Throws Exception when the reply byte is 'R' or differs from the request byte.
88 ////////////////////////////////////////////////////////////////////////////
90 ////////////////////////////////////////////////////////////////////////////
91 (
92 Lock_Action lock_action,
93 Data_Transfer data_transfer,
94 Writable_Journal &client_journal,
95 std::chrono::milliseconds wait
96 )
97 {
    // The channel lock is held for the whole function, download included.
98 Channel_Lock lock(channel);
99
    // A single request byte encodes both options:
    // 'D' + lock_action + 2 * data_transfer.
100 const char pull_type = char('D' + int(lock_action) + 2*int(data_transfer));
101
    // wait is in milliseconds; the log line shows it in seconds.
102 LOGID("pulling, lock = " << int(lock_action) << ", data = " <<
103 int(data_transfer) << ", wait = " << double(wait.count()) * 0.001 << "s\n");
104
    // Request layout: pull_type, wait count (int64_t), client checkpoint (int64_t).
105 buffer.index = 0;
106 buffer.write<char>(pull_type);
107 buffer.write<int64_t>(wait.count());
108 buffer.write<int64_t>(client_journal.get_checkpoint());
109 lock.write(buffer.data, buffer.index);
110
    // 9-byte reply: one reply char followed by an int64_t checkpoint.
111 buffer.index = 0;
112 lock.read(buffer.data, 9);
113 {
114 const char reply = buffer.read<char>();
115 if (reply == 'R')
116 throw Exception("pull error: server is pull-only, cannot lock");
117 else if (reply != pull_type)
118 throw Exception("pull error: unexpected server reply");
119 }
120 server_checkpoint = buffer.read<int64_t>();
121
122 if (bool(data_transfer))
123 {
124 buffer.index = 0;
    // Number of bytes to fetch: the part the client journal does not have yet.
125 const int64_t size = server_checkpoint - client_journal.get_checkpoint();
126 Async_Writer writer = client_journal.get_async_tail_writer();
127 LOGID("data transfer: ");
128 download(writer, lock, size);
    // Checkpoint the journal at the position the writer reached.
129 client_journal.soft_checkpoint_at(writer.get_position());
130 }
131 else
132 LOG("no data\n");
133
134 return server_checkpoint;
135 }
136
// NOTE(review): the listing line that carried this function's return type and
// name (listing line 138) is missing from this view. The parameter list matches
// the entry "int64_t push(const Readonly_Journal &, int64_t, int64_t,
// Unlock_Action)" in the symbol index at the end of this page -- confirm
// against the real file.
//
// Sends a push request followed by the bytes of client_journal in
// [from, until), then reads a one-byte reply. On success sets
// server_checkpoint to until and returns it.
// Throws Exception when the journal reader reports end of file while
// sending, or when the reply is 'R', 'C', 't', or anything other than the
// request byte.
137 ////////////////////////////////////////////////////////////////////////////
139 ////////////////////////////////////////////////////////////////////////////
140 (
141 const Readonly_Journal &client_journal,
142 int64_t from,
143 int64_t until,
144 Unlock_Action unlock_action
145 )
146 {
    // The channel lock is held for the whole request/reply exchange.
147 Channel_Lock lock(channel);
148
    // Request byte: 'N' + unlock_action. Header: push_type, from, until.
149 const char push_type = char('N' + int(unlock_action));
150 buffer.index = 0;
151 buffer.write<char>(push_type);
152 buffer.write<int64_t>(from);
153 buffer.write<int64_t>(until);
154
155 {
156 Async_Reader reader = client_journal.get_async_reader(from, until);
157
158 LOGID("pushing(" << push_type << ")... from = " << from << ", until = " << until);
159 Progress_Bar progress_bar(reader.get_remaining(), log);
160
    // First pass: journal bytes are appended after the header already in
    // the buffer, so header and first chunk go out in one write.
161 size_t offset = buffer.index;
162
    // offset is non-zero on the first pass, so the header is written even
    // when the reader has nothing remaining.
163 while (offset + reader.get_remaining() > 0)
164 {
165 const size_t size = reader.read(buffer.data + offset, buffer.size - offset);
166 if (reader.is_end_of_file())
167 throw Exception("push error: unexpected end of file");
168 lock.write(buffer.data, size + offset);
    // Later passes fill the buffer from its start.
169 offset = 0;
170 progress_bar.print_remaining(reader.get_remaining());
171 }
172 }
173
    // One-byte reply: the request byte is echoed on success.
174 lock.read(buffer.data, 1);
175
176 if (buffer.data[0] == 'R')
177 throw Exception("push error: server is pull-only");
178 else if (buffer.data[0] == 'C')
179 throw Exception("push error: conflict");
180 else if (buffer.data[0] == 't')
181 throw Exception("push error: time out");
182 else if (buffer.data[0] != push_type)
183 throw Exception("push error: unexpected reply");
184
185 server_checkpoint = until;
186
187 return server_checkpoint;
188 }
189
// NOTE(review): the listing line that carried this function's return type and
// name (listing line 191) is missing from this view. The symbol index at the
// end of this page lists "void unlock() override", and the log text below says
// "releasing lock" -- confirm against the real file.
//
// Sends the single byte 'M' and reads a one-byte reply.
// Throws Exception("unlock error: unexpected reply") when the reply is not 'M'.
190 ////////////////////////////////////////////////////////////////////////////
192 ////////////////////////////////////////////////////////////////////////////
193 {
194 LOGID("releasing lock... ");
195
    // The Channel_Lock lives only for this write + 1-byte read exchange.
196 {
197 Channel_Lock lock(channel);
198 buffer.data[0] = 'M';
199 lock.write(buffer.data, 1);
200 lock.read(buffer.data, 1);
201 }
202
203 LOG(buffer.data[0] << '\n');
204
205 if (buffer.data[0] != 'M')
206 throw Exception("unlock error: unexpected reply");
207 }
208}
209
210#undef LOGID
211#undef LOG
#define LOG(x)
Definition Server.cpp:8
#define LOGID(x)
Definition Server.cpp:9
int64_t get_remaining() const
size_t read(char *buffer, size_t capacity)
bool is_end_of_file() const
int64_t get_position() const
char data[size+extra_size]
Definition Buffer.h:19
void write(const char *data, size_t size)
void read(char *data, size_t size)
static SHA_256::Hash get_hash(const Readonly_Journal &journal, int64_t checkpoint)
void print_remaining(int64_t remaining)
Thread_Safe_Channel channel
int64_t pull(Lock_Action lock_action, Data_Transfer data_transfer, Writable_Journal &client_journal, std::chrono::milliseconds wait=std::chrono::milliseconds(0)) override
Pull from the connection.
void unlock() override
Unlock the connection.
int64_t handshake(const Readonly_Journal &client_journal, Content_Check content_check) override
Called during Client construction.
size_t pread(char *data, size_t size, int64_t offset) const
int64_t push(const Readonly_Journal &client_journal, int64_t from, int64_t until, Unlock_Action unlock_action) override
Push new data to the connection.
bool check_matching_content(const Readonly_Journal &client_journal, int64_t server_checkpoint)
Data_Transfer
Definition Connection.h:27
Lock_Action
Definition Connection.h:34
Content_Check
Definition Connection.h:19
Unlock_Action
Definition Connection.h:41
Definition Blob.h:7