Joedb 9.6.2
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server_Connection.cpp
Go to the documentation of this file.
6
7#include <iostream>
8
9#define LOG(x) do {if (log) *log << x;} while (false)
10#define LOGID(x) do {if (log) *log << get_time_string_of_now() << ' ' << get_session_id() << ": " << x;} while (false)
11
12namespace joedb
13{
14 ////////////////////////////////////////////////////////////////////////////
15 size_t Server_Connection::pread(char *data, size_t size, int64_t offset) const
16 ////////////////////////////////////////////////////////////////////////////
17 {
18 buffer.index = 0;
19 buffer.write<char>('r');
20 buffer.write<int64_t>(offset);
21 buffer.write<int64_t>(offset + int64_t(size));
22
23 Channel_Lock lock(channel);
24 lock.write(buffer.data, buffer.index);
25 lock.read(buffer.data, 9);
26
27 buffer.index = 1;
28 const int64_t until = buffer.read<int64_t>();
29
30 if (until < offset || until > offset + int64_t(size))
31 throw Exception("bad pread size from server");
32
33 const size_t returned_size = size_t(until - offset);
34
35 for (size_t read = 0; read < returned_size;)
36 read += lock.read_some(data + read, returned_size - read);
37
38 return returned_size;
39 }
40
41 ////////////////////////////////////////////////////////////////////////////
43 ////////////////////////////////////////////////////////////////////////////
44 (
45 const Readonly_Journal &client_journal,
46 Content_Check content_check
47 )
48 {
49 if (content_check == Content_Check::none)
50 return server_checkpoint;
51
52 LOGID("checking_hash... ");
53
54 const int64_t checkpoint = std::min
55 (
56 server_checkpoint,
57 client_journal.get_checkpoint()
58 );
59
60 buffer.index = 0;
61 buffer.write<char>(content_check == Content_Check::fast ? 'H' : 'I');
62 buffer.write<int64_t>(checkpoint);
63 buffer.write
64 (
65 content_check == Content_Check::fast
66 ? Journal_Hasher::get_fast_hash(client_journal, checkpoint)
67 : Journal_Hasher::get_full_hash(client_journal, checkpoint)
68 );
69
70 {
71 Channel_Lock lock(channel);
72 lock.write(buffer.data, buffer.index);
73 lock.read(buffer.data, 1);
74 }
75
76 LOG(buffer.data[0] << '\n');
77
78 if (buffer.data[0] == 'h')
79 content_mismatch();
80
81 return server_checkpoint;
82 }
83
84 ////////////////////////////////////////////////////////////////////////////
86 ////////////////////////////////////////////////////////////////////////////
87 (
88 Lock_Action lock_action,
89 Data_Transfer data_transfer,
90 Writable_Journal &client_journal,
91 std::chrono::milliseconds wait
92 )
93 {
94 Channel_Lock lock(channel);
95
96 const char pull_type = char('D' + int(lock_action) + 2*int(data_transfer));
97
98 LOGID("pulling, lock = " << int(lock_action) << ", data = " <<
99 int(data_transfer) << ", wait = " << double(wait.count()) * 0.001 << "s\n");
100
101 buffer.index = 0;
102 buffer.write<char>(pull_type);
103 buffer.write<int64_t>(wait.count());
104 buffer.write<int64_t>(client_journal.get_checkpoint());
105 lock.write(buffer.data, buffer.index);
106
107 buffer.index = 0;
108 lock.read(buffer.data, 9);
109 {
110 const char reply = buffer.read<char>();
111 if (reply == 'R')
112 throw Exception("pull error: server is pull-only, cannot lock");
113 else if (reply != pull_type)
114 throw Exception("pull error: unexpected server reply");
115 }
116 server_checkpoint = buffer.read<int64_t>();
117
118 if (bool(data_transfer))
119 {
120 buffer.index = 0;
121 const int64_t size = server_checkpoint - client_journal.get_checkpoint();
122 Async_Writer writer = client_journal.get_async_tail_writer();
123 LOGID("data transfer: ");
124 download(writer, lock, size);
125 client_journal.soft_checkpoint_at(writer.get_position());
126 }
127 else
128 LOG("no data\n");
129
130 return server_checkpoint;
131 }
132
133 ////////////////////////////////////////////////////////////////////////////
135 ////////////////////////////////////////////////////////////////////////////
136 (
137 const Readonly_Journal &client_journal,
138 int64_t from,
139 int64_t until,
140 Unlock_Action unlock_action
141 )
142 {
143 Channel_Lock lock(channel);
144
145 const char push_type = char('N' + int(unlock_action));
146 buffer.index = 0;
147 buffer.write<char>(push_type);
148 buffer.write<int64_t>(from);
149 buffer.write<int64_t>(until);
150
151 {
152 Async_Reader reader = client_journal.get_async_reader(from, until);
153
154 LOGID("pushing(" << push_type << ")... from = " << from << ", until = " << until);
155 Progress_Bar progress_bar(reader.get_remaining(), log);
156
157 size_t offset = buffer.index;
158
159 while (offset + reader.get_remaining() > 0)
160 {
161 const size_t size = reader.read(buffer.data + offset, buffer.size - offset);
162 if (reader.is_end_of_file())
163 throw Exception("push error: unexpected end of file");
164 lock.write(buffer.data, size + offset);
165 offset = 0;
166 progress_bar.print_remaining(reader.get_remaining());
167 }
168 }
169
170 lock.read(buffer.data, 1);
171
172 if (buffer.data[0] == 'R')
173 throw Exception("push error: server is pull-only");
174 else if (buffer.data[0] == 'C')
175 throw Exception("push error: conflict");
176 else if (buffer.data[0] == 't')
177 throw Exception("push error: time out");
178 else if (buffer.data[0] != push_type)
179 throw Exception("push error: unexpected reply");
180
181 server_checkpoint = until;
182
183 return server_checkpoint;
184 }
185
186 ////////////////////////////////////////////////////////////////////////////
188 ////////////////////////////////////////////////////////////////////////////
189 {
190 LOGID("releasing lock... ");
191
192 {
193 Channel_Lock lock(channel);
194 buffer.data[0] = 'M';
195 lock.write(buffer.data, 1);
196 lock.read(buffer.data, 1);
197 }
198
199 LOG(buffer.data[0] << '\n');
200
201 if (buffer.data[0] != 'M')
202 throw Exception("unlock error: unexpected reply");
203 }
204}
205
206#undef LOGID
207#undef LOG
#define LOG(x)
Definition Server.cpp:8
#define LOGID(x)
Definition Server.cpp:9
int64_t get_remaining() const
size_t read(char *buffer, size_t capacity)
bool is_end_of_file() const
int64_t get_position() const
char data[size+extra_size]
Definition Buffer.h:19
void write(const char *data, size_t size)
void read(char *data, size_t size)
static SHA_256::Hash get_full_hash(const Readonly_Journal &journal, int64_t checkpoint)
static SHA_256::Hash get_fast_hash(const Readonly_Journal &journal, int64_t checkpoint)
void print_remaining(int64_t remaining)
Thread_Safe_Channel channel
int64_t pull(Lock_Action lock_action, Data_Transfer data_transfer, Writable_Journal &client_journal, std::chrono::milliseconds wait=std::chrono::milliseconds(0)) override
Pull from the connection.
void unlock() override
Unlock the connection.
int64_t handshake(const Readonly_Journal &client_journal, Content_Check content_check) override
Called during Client construction.
size_t pread(char *data, size_t size, int64_t offset) const
int64_t push(const Readonly_Journal &client_journal, int64_t from, int64_t until, Unlock_Action unlock_action) override
Push new data to the connection.
Data_Transfer
Definition Connection.h:27
Lock_Action
Definition Connection.h:34
Content_Check
Definition Connection.h:19
Unlock_Action
Definition Connection.h:41
Definition Blob.h:7