9#define LOG(x) do {if (log) *log << x;} while (false)
10#define LOGID(x) do {if (log) *log << get_time_string_of_now() << ' ' << get_session_id() << ": " << x;} while (false)
19 int64_t server_checkpoint
22 LOGID(
"checking_hash... ");
24 const int64_t checkpoint = std::min
27 client_journal.get_checkpoint()
31 buffer.write<
char>(
'H');
32 buffer.write<int64_t>(checkpoint);
37 lock.
write(buffer.data, buffer.index);
38 lock.
read(buffer.data, 1);
41 LOG(buffer.data[0] <<
'\n');
43 return (buffer.data[0] ==
'H');
51 buffer.write<
char>(
'r');
52 buffer.write<int64_t>(offset);
53 buffer.write<int64_t>(offset + int64_t(size));
56 lock.write(buffer.data, buffer.index);
57 lock.read(buffer.data, 9);
60 const int64_t until = buffer.read<int64_t>();
62 if (until < offset || until > offset + int64_t(size))
63 throw Exception(
"bad pread size from server");
65 const size_t returned_size = size_t(until - offset);
67 for (
size_t read = 0; read < returned_size;)
68 read += lock.read_some(data + read, returned_size - read);
82 if (!check_matching_content(client_journal, server_checkpoint))
85 return server_checkpoint;
95 std::chrono::milliseconds wait
100 const char pull_type = char(
'D' +
int(lock_action) + 2*
int(data_transfer));
102 LOGID(
"pulling, lock = " <<
int(lock_action) <<
", data = " <<
103 int(data_transfer) <<
", wait = " <<
double(wait.count()) * 0.001 <<
"s\n");
106 buffer.write<
char>(pull_type);
107 buffer.write<int64_t>(wait.count());
108 buffer.write<int64_t>(client_journal.get_checkpoint());
109 lock.
write(buffer.data, buffer.index);
112 lock.
read(buffer.data, 9);
114 const char reply = buffer.read<
char>();
116 throw Exception(
"pull error: server is pull-only, cannot lock");
117 else if (reply != pull_type)
118 throw Exception(
"pull error: unexpected server reply");
120 server_checkpoint = buffer.read<int64_t>();
122 if (
bool(data_transfer))
125 const int64_t size = server_checkpoint - client_journal.get_checkpoint();
126 Async_Writer writer = client_journal.get_async_tail_writer();
127 LOGID(
"data transfer: ");
128 download(writer, lock, size);
129 client_journal.soft_checkpoint_at(writer.
get_position());
134 return server_checkpoint;
149 const char push_type = char(
'N' +
int(unlock_action));
151 buffer.write<
char>(push_type);
152 buffer.write<int64_t>(from);
153 buffer.write<int64_t>(until);
156 Async_Reader reader = client_journal.get_async_reader(from, until);
158 LOGID(
"pushing(" << push_type <<
")... from = " << from <<
", until = " << until);
161 size_t offset = buffer.index;
165 const size_t size = reader.
read(buffer.data + offset, buffer.size - offset);
167 throw Exception(
"push error: unexpected end of file");
168 lock.
write(buffer.data, size + offset);
174 lock.
read(buffer.data, 1);
176 if (buffer.data[0] ==
'R')
177 throw Exception(
"push error: server is pull-only");
178 else if (buffer.data[0] ==
'C')
180 else if (buffer.data[0] ==
't')
182 else if (buffer.data[0] != push_type)
183 throw Exception(
"push error: unexpected reply");
185 server_checkpoint = until;
187 return server_checkpoint;
194 LOGID(
"releasing lock... ");
206 throw Exception(
"unlock error: unexpected reply");
int64_t get_remaining() const
size_t read(char *buffer, size_t capacity)
bool is_end_of_file() const
int64_t get_position() const
char data[size+extra_size]
void write(const char *data, size_t size)
void read(char *data, size_t size)
static SHA_256::Hash get_hash(const Readonly_Journal &journal, int64_t checkpoint)
void print_remaining(int64_t remaining)
Thread_Safe_Channel channel
int64_t pull(Lock_Action lock_action, Data_Transfer data_transfer, Writable_Journal &client_journal, std::chrono::milliseconds wait=std::chrono::milliseconds(0)) override
Pull from the connection.
void unlock() override
Unlock the connection.
int64_t handshake(const Readonly_Journal &client_journal, Content_Check content_check) override
Called during Client construction.
size_t pread(char *data, size_t size, int64_t offset) const
int64_t push(const Readonly_Journal &client_journal, int64_t from, int64_t until, Unlock_Action unlock_action) override
Push new data to the connection.
bool check_matching_content(const Readonly_Journal &client_journal, int64_t server_checkpoint)