Joedb 10.3.0
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server_Connection.cpp
Go to the documentation of this file.
5
6namespace joedb
7{
8 ////////////////////////////////////////////////////////////////////////////
9 size_t Server_Connection::pread(char *data, size_t size, int64_t offset) const
10 ////////////////////////////////////////////////////////////////////////////
11 {
12 buffer.index = 0;
13 buffer.write<char>('r');
14 buffer.write<int64_t>(offset);
15 buffer.write<int64_t>(offset + int64_t(size));
16
17 Lock<Channel&> lock(channel);
18 lock->write(buffer.data, buffer.index);
19 lock->read(buffer.data, 9);
20
21 buffer.index = 1;
22 const int64_t until = buffer.read<int64_t>();
23
24 if (until < offset || until > offset + int64_t(size))
25 throw Disconnection("bad pread size from server");
26
27 const size_t returned_size = size_t(until - offset);
28
29 for (size_t read = 0; read < returned_size;)
30 read += lock->read_some(data + read, returned_size - read);
31
32 return returned_size;
33 }
34
35 ////////////////////////////////////////////////////////////////////////////
37 ////////////////////////////////////////////////////////////////////////////
38 (
39 const Readonly_Journal &client_journal,
40 Content_Check content_check
41 )
42 {
43 if (content_check == Content_Check::none)
44 return server_checkpoint;
45
46 log("checking hash... ");
47
48 const int64_t checkpoint = std::min
49 (
50 server_checkpoint,
51 client_journal.get_checkpoint()
52 );
53
54 buffer.index = 0;
55 buffer.write<char>(content_check == Content_Check::fast ? 'H' : 'I');
56 buffer.write<int64_t>(checkpoint);
57 buffer.write
58 (
59 content_check == Content_Check::fast
60 ? Journal_Hasher::get_fast_hash(client_journal, checkpoint)
61 : Journal_Hasher::get_full_hash(client_journal, checkpoint)
62 );
63
64 {
65 Lock<Channel&> lock(channel);
66 lock->write(buffer.data, buffer.index);
67 lock->read(buffer.data, 1);
68 }
69
70 log(std::string("hash result: ") + buffer.data[0]);
71
72 if (buffer.data[0] == 'h')
73 content_mismatch();
74
75 return server_checkpoint;
76 }
77
78 ////////////////////////////////////////////////////////////////////////////
80 ////////////////////////////////////////////////////////////////////////////
81 (
82 Lock_Action lock_action,
83 Data_Transfer data_transfer,
84 Writable_Journal &client_journal,
85 std::chrono::milliseconds wait
86 )
87 {
88 Lock<Channel&> lock(channel);
89
90 const char pull_type = char('D' + int(lock_action) + 2*int(data_transfer));
91
92 log("pulling, lock = " + std::to_string(int(lock_action)) +
93 ", data = " + std::to_string(int(data_transfer)) +
94 ", wait = " + std::to_string(double(wait.count()) * 0.001) + 's');
95
96 buffer.index = 0;
97 buffer.write<char>(pull_type);
98 buffer.write<int64_t>(wait.count());
99 buffer.write<int64_t>(client_journal.get_checkpoint());
100 lock->write(buffer.data, buffer.index);
101
102 buffer.index = 0;
103 lock->read(buffer.data, 9);
104 {
105 const char reply = buffer.read<char>();
106 if (reply == 'R')
107 throw Disconnection("pull error: server is pull-only, cannot lock");
108 else if (reply != pull_type)
109 throw Disconnection("pull error: unexpected server reply");
110 }
111 server_checkpoint = buffer.read<int64_t>();
112
113 if (bool(data_transfer))
114 {
115 buffer.index = 0;
116 const int64_t size = server_checkpoint - client_journal.get_checkpoint();
117 Async_Writer writer = client_journal.get_async_tail_writer();
118 download(writer, lock, size);
119 client_journal.soft_checkpoint_at(writer.get_position());
120 }
121 else
122 log("!data_transfer");
123
124 return server_checkpoint;
125 }
126
127 ////////////////////////////////////////////////////////////////////////////
129 ////////////////////////////////////////////////////////////////////////////
130 (
131 const Readonly_Journal &client_journal,
132 int64_t from,
133 int64_t until,
134 Unlock_Action unlock_action
135 )
136 {
137 Lock<Channel&> lock(channel);
138
139 const char push_type = char('N' + int(unlock_action));
140 buffer.index = 0;
141 buffer.write<char>(push_type);
142 buffer.write<int64_t>(from);
143 buffer.write<int64_t>(until);
144
145 {
146 Async_Reader reader = client_journal.get_async_reader(from, until);
147
148 log
149 (
150 std::string("pushing(") + push_type +
151 ")... from = " + std::to_string(from) +
152 ", until = " + std::to_string(until)
153 );
154
155 Progress_Bar progress_bar(reader.get_remaining(), *this);
156
157 size_t offset = buffer.index;
158
159 while (offset + reader.get_remaining() > 0)
160 {
161 const size_t size = reader.read(buffer.data + offset, buffer.size - offset);
162 if (reader.is_end_of_file())
163 throw Disconnection("push error: unexpected end of file");
164 lock->write(buffer.data, size + offset);
165 offset = 0;
166 progress_bar.print_remaining(reader.get_remaining());
167 }
168 }
169
170 lock->read(buffer.data, 1);
171
172 if (buffer.data[0] == 'R')
173 throw Disconnection("push error: server is pull-only");
174 else if (buffer.data[0] == 'C')
175 throw Disconnection("push error: conflict");
176 else if (buffer.data[0] == 't')
177 throw Exception("push error: time out");
178 else if (buffer.data[0] != push_type)
179 throw Disconnection("push error: unexpected reply");
180
181 server_checkpoint = until;
182
183 return server_checkpoint;
184 }
185
186 ////////////////////////////////////////////////////////////////////////////
188 ////////////////////////////////////////////////////////////////////////////
189 {
190 log("joedb::Server_Connection::unlock");
191
192 {
194 buffer.data[0] = 'M';
195 lock->write(buffer.data, 1);
196 lock->read(buffer.data, 1);
197 }
198
199 log(std::string() + buffer.data[0]);
200
201 if (buffer.data[0] != 'M')
202 throw Disconnection("unlock error: unexpected reply");
203 }
204}
int64_t get_remaining() const
size_t read(char *buffer, size_t capacity)
bool is_end_of_file() const
int64_t get_position() const
char data[size+extra_size]
Definition Buffer.h:19
joedb::Robust_Connection does not try to reconnect when thrown
static SHA_256::Hash get_full_hash(const Readonly_Journal &journal, int64_t checkpoint)
static SHA_256::Hash get_fast_hash(const Readonly_Journal &journal, int64_t checkpoint)
void print_remaining(int64_t remaining)
void log(const std::string &message) noexcept override
Thread_Safe< Channel & > channel
int64_t pull(Lock_Action lock_action, Data_Transfer data_transfer, Writable_Journal &client_journal, std::chrono::milliseconds wait=std::chrono::milliseconds(0)) override
Pull from the connection.
void unlock() override
Unlock the connection.
int64_t handshake(const Readonly_Journal &client_journal, Content_Check content_check) override
Called during Client construction.
size_t pread(char *data, size_t size, int64_t offset) const
int64_t push(const Readonly_Journal &client_journal, int64_t from, int64_t until, Unlock_Action unlock_action) override
Push new data to the connection.
Data_Transfer
Definition Connection.h:28
Lock_Action
Definition Connection.h:35
Content_Check
Definition Connection.h:20
Unlock_Action
Definition Connection.h:42