17 static std::chrono::milliseconds to_milliseconds(
float seconds)
20 return std::chrono::milliseconds(int64_t(seconds * 1000.0f));
27 out << get_name() <<
'(';
29 const int64_t journal_checkpoint = client.get_journal_checkpoint();
30 const int64_t connection_checkpoint = client.get_connection_checkpoint();
31 const int64_t diff = connection_checkpoint - journal_checkpoint;
33 out << journal_checkpoint;
35 out <<
'+' << diff <<
")(pull to sync)";
40 if (client.is_pullonly())
41 out <<
"(cannot push)";
43 out <<
"(push to sync)";
48 if (client.get_journal().get_position() > journal_checkpoint)
58 std::chrono::milliseconds wait
61 const int64_t byte_count = client.pull(wait);
64 out <<
"pulled " << byte_count <<
" bytes, checkpoint = ";
65 out << client.get_journal_checkpoint() <<
'\n';
72 int64_t Writable_Client_Command_Processor::pull
76 std::chrono::milliseconds wait
82 get_writable_client().touch();
94 out <<
". Sleeping for " << seconds <<
" seconds...\n";
106 const std::string_view command,
107 std::istream ¶meters,
112 if (command ==
"help")
119 pull [<wait_seconds>]
120 pull_every [<wait_seconds>] [<sleep_seconds>]
122 if (!client.is_pullonly())
123 out <<
" push\n push_every [<sleep_seconds>]\n";
128 else if (command ==
"db")
137 database = &rdc->get_database();
139 database = &wdc->get_database();
146 run_interpreter(interpreter, in, out);
152 run_interpreter(interpreter, in, out);
155 else if (command ==
"pull")
157 float wait_seconds = 0;
158 parameters >> wait_seconds;
159 pull(out, to_milliseconds(wait_seconds));
161 else if (command ==
"pull_every")
163 float wait_seconds = 1;
164 float sleep_seconds = 0;
165 parameters >> wait_seconds >> sleep_seconds;
170 pull(out, to_milliseconds(wait_seconds));
171 while (sleep(sleep_seconds, out));
173 else if (command ==
"push" && !client.is_pullonly())
175 client.push_if_ahead();
177 else if (command ==
"push_every" && !client.is_pullonly())
179 float sleep_seconds = 1.0f;
180 parameters >> sleep_seconds;
185 client.push_if_ahead();
186 while (sleep(sleep_seconds, out));
198 const std::string_view command,
199 std::istream ¶meters,
204 if (command ==
"help")
208 out << R
"RRR(Writable Client
211 set_valid_data <true|false>
212 set_timestamp <true|false>
213 set_hard_checkpoint <true|false>
219 else if (command ==
"transaction")
221 auto *
const wdc =
dynamic_cast<Writable_Database_Client *
>(&client);
222 auto *
const wjc =
dynamic_cast<Writable_Journal_Client *
>(&client);
226 wdc->transaction([&](
const Readable &readable, Writable &writable)
229 interpreter.set_prompt_string(
"transaction");
230 run_interpreter(interpreter, in, out);
235 wjc->transaction([&](Writable_Journal &journal)
237 Writable_Interpreter interpreter(journal);
238 interpreter.set_prompt_string(
"transaction(journal)");
239 run_interpreter(interpreter, in, out);
243 out <<
"Client is not writable, cannot run transaction\n";
245 else if (command ==
"set_valid_data")
247 get_writable_client().set_valid_data(
read_boolean(parameters));
249 else if (command ==
"set_timestamp")
251 get_writable_client().set_timestamp(
read_boolean(parameters));
253 else if (command ==
"set_hard_checkpoint")
255 get_writable_client().set_hard_checkpoint(
read_boolean(parameters));
virtual int64_t pull(std::ostream &out, std::chrono::milliseconds wait)
static bool sleep(float seconds, std::ostream &out)
void write_prompt(std::ostream &out) const override
Status process_command(std::string_view command, std::istream ¶meters, std::istream &in, std::ostream &out) override
void set_prompt_string(std::string s)
Status process_command(std::string_view command, std::istream ¶meters, std::istream &in, std::ostream &out) override
static const Record_Id null
Status process_command(std::string_view command, std::istream ¶meters, std::istream &in, std::ostream &out) override
bool interruptible_sleep(std::chrono::milliseconds duration)
std::string get_time_string_of_now()
bool read_boolean(std::istream &in)