23 out << get_name() <<
'(';
25 const int64_t journal_checkpoint = client.get_journal_checkpoint();
26 const int64_t connection_checkpoint = client.get_connection_checkpoint();
27 const int64_t diff = connection_checkpoint - journal_checkpoint;
29 out << journal_checkpoint;
31 out <<
'+' << diff <<
")(pull to sync)";
36 if (client.is_pullonly())
37 out <<
"(cannot push)";
39 out <<
"(push to sync)";
44 if (client.get_journal().get_position() > journal_checkpoint)
54 std::chrono::milliseconds wait
57 const int64_t byte_count = client.pull(wait);
60 out <<
"pulled " << byte_count <<
" bytes, checkpoint = ";
61 out << client.get_journal_checkpoint() <<
'\n';
68 int64_t Writable_Client_Command_Processor::pull
72 std::chrono::milliseconds wait
78 get_writable_client().touch();
90 out <<
". Sleeping for " << seconds <<
" seconds...\n";
93 std::this_thread::sleep_for(std::chrono::seconds(1));
101 const std::string &command,
102 std::istream &parameters,
107 if (command ==
"help")
114 pull [<wait_seconds>]
115 pull_every [<wait_seconds>] [<sleep_seconds>]
117 if (!client.is_pullonly())
118 out <<
" push\n push_every [<sleep_seconds>]\n";
123 else if (command ==
"db")
132 database = &rdc->get_database();
134 database = &wdc->get_database();
141 run_interpreter(interpreter, in, out);
147 run_interpreter(interpreter, in, out);
150 else if (command ==
"pull")
152 float wait_seconds = 0;
153 parameters >> wait_seconds;
154 pull(out, std::chrono::milliseconds(std::lround(wait_seconds * 1000)));
156 else if (command ==
"pull_every")
158 float wait_seconds = 1;
159 int sleep_seconds = 0;
160 parameters >> wait_seconds >> sleep_seconds;
167 pull(out, std::chrono::milliseconds(std::lround(wait_seconds * 1000)));
168 sleep(sleep_seconds, out);
171 else if (command ==
"push" && !client.is_pullonly())
173 client.push_if_ahead();
175 else if (command ==
"push_every" && !client.is_pullonly())
177 int sleep_seconds = 1;
178 parameters >> sleep_seconds;
185 client.push_if_ahead();
186 sleep(sleep_seconds, out);
199 const std::string &command,
200 std::istream &parameters,
205 if (command ==
"help")
209 out << R
"RRR(Writable Client
212 set_valid_data <true|false>
213 set_timestamp <true|false>
214 set_hard_checkpoint <true|false>
220 else if (command ==
"transaction")
222 auto *
const wdc =
dynamic_cast<Writable_Database_Client *
>(&client);
223 auto *
const wjc =
dynamic_cast<Writable_Journal_Client *
>(&client);
227 wdc->transaction([&](
const Readable &readable, Writable &writable)
230 interpreter.set_prompt_string(
"transaction");
231 run_interpreter(interpreter, in, out);
236 wjc->transaction([&](Writable_Journal &journal)
238 Writable_Interpreter interpreter(journal);
239 interpreter.set_prompt_string(
"transaction(journal)");
240 run_interpreter(interpreter, in, out);
244 out <<
"Client is not writable, cannot run transaction\n";
246 else if (command ==
"set_valid_data")
248 get_writable_client().set_valid_data(
read_boolean(parameters));
250 else if (command ==
"set_timestamp")
252 get_writable_client().set_timestamp(
read_boolean(parameters));
254 else if (command ==
"set_hard_checkpoint")
256 get_writable_client().set_hard_checkpoint(
read_boolean(parameters));
virtual int64_t pull(std::ostream &out, std::chrono::milliseconds wait)
Status process_command(const std::string &command, std::istream &parameters, std::istream &in, std::ostream &out) override
void write_prompt(std::ostream &out) const override
static void sleep(int seconds, std::ostream &out)
void set_prompt_string(std::string s)
Status process_command(const std::string &command, std::istream &parameters, std::istream &in, std::ostream &out) override
static const Record_Id null
static constexpr int no_signal
static void set_signal(int status)
Status process_command(const std::string &command, std::istream &parameters, std::istream &in, std::ostream &out) override
std::string get_time_string_of_now()
bool read_boolean(std::istream &in)