13 buffer.write<
char>(
'r');
14 buffer.write<int64_t>(offset);
15 buffer.write<int64_t>(offset + int64_t(size));
18 lock->write(buffer.data, buffer.index);
19 lock->read(buffer.data, 9);
22 const int64_t until = buffer.read<int64_t>();
24 if (until < offset || until > offset + int64_t(size))
27 const size_t returned_size = size_t(until - offset);
29 for (
size_t read = 0; read < returned_size;)
30 read += lock->read_some(data + read, returned_size - read);
44 return server_checkpoint;
46 log(
"checking hash... ");
48 const int64_t checkpoint = std::min
51 client_journal.get_checkpoint()
56 buffer.write<int64_t>(checkpoint);
66 lock->write(buffer.data, buffer.index);
67 lock->read(buffer.data, 1);
70 log(std::string(
"hash result: ") + buffer.data[0]);
72 if (buffer.data[0] ==
'h')
75 return server_checkpoint;
85 std::chrono::milliseconds wait
90 const char pull_type = char(
'D' +
int(lock_action) + 2*
int(data_transfer));
92 log(
"pulling, lock = " + std::to_string(
int(lock_action)) +
93 ", data = " + std::to_string(
int(data_transfer)) +
94 ", wait = " + std::to_string(
double(wait.count()) * 0.001) +
's');
97 buffer.write<
char>(pull_type);
98 buffer.write<int64_t>(wait.count());
99 buffer.write<int64_t>(client_journal.get_checkpoint());
100 lock->write(buffer.data, buffer.index);
103 lock->read(buffer.data, 9);
105 const char reply = buffer.read<
char>();
107 throw Disconnection(
"pull error: server is pull-only, cannot lock");
108 else if (reply != pull_type)
111 server_checkpoint = buffer.read<int64_t>();
113 if (
bool(data_transfer))
116 const int64_t size = server_checkpoint - client_journal.get_checkpoint();
117 Async_Writer writer = client_journal.get_async_tail_writer();
118 download(writer, lock, size);
119 client_journal.soft_checkpoint_at(writer.
get_position());
122 log(
"!data_transfer");
124 return server_checkpoint;
139 const char push_type = char(
'N' +
int(unlock_action));
141 buffer.write<
char>(push_type);
142 buffer.write<int64_t>(from);
143 buffer.write<int64_t>(until);
146 Async_Reader reader = client_journal.get_async_reader(from, until);
150 std::string(
"pushing(") + push_type +
151 ")... from = " + std::to_string(from) +
152 ", until = " + std::to_string(until)
157 size_t offset = buffer.index;
161 const size_t size = reader.
read(buffer.data + offset, buffer.size - offset);
164 lock->write(buffer.data, size + offset);
170 lock->read(buffer.data, 1);
172 if (buffer.data[0] ==
'R')
174 else if (buffer.data[0] ==
'C')
176 else if (buffer.data[0] ==
't')
178 else if (buffer.data[0] != push_type)
181 server_checkpoint = until;
183 return server_checkpoint;
190 log(
"joedb::Server_Connection::unlock");
int64_t get_remaining() const
size_t read(char *buffer, size_t capacity)
bool is_end_of_file() const
int64_t get_position() const
char data[size+extra_size]
joedb::Robust_Connection does not try to reconnect when thrown
static SHA_256::Hash get_full_hash(const Readonly_Journal &journal, int64_t checkpoint)
static SHA_256::Hash get_fast_hash(const Readonly_Journal &journal, int64_t checkpoint)
void print_remaining(int64_t remaining)
void log(const std::string &message) noexcept override
Thread_Safe< Channel & > channel
int64_t pull(Lock_Action lock_action, Data_Transfer data_transfer, Writable_Journal &client_journal, std::chrono::milliseconds wait=std::chrono::milliseconds(0)) override
Pull from the connection.
void unlock() override
Unlock the connection.
int64_t handshake(const Readonly_Journal &client_journal, Content_Check content_check) override
Called during Client construction.
size_t pread(char *data, size_t size, int64_t offset) const
int64_t push(const Readonly_Journal &client_journal, int64_t from, int64_t until, Unlock_Action unlock_action) override
Push new data to the connection.