Joedb 10.2.1
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Client_Command_Processor.cpp
Go to the documentation of this file.
6#include "joedb/ui/type_io.h"
11#include "joedb/Signal.h"
12
13#include <thread>
14#include <chrono>
15#include <cmath>
16
17namespace joedb
18{
19 ////////////////////////////////////////////////////////////////////////////
20 void Client_Command_Processor::write_prompt(std::ostream &out) const
21 ////////////////////////////////////////////////////////////////////////////
22 {
23 out << get_name() << '(';
24
25 const int64_t journal_checkpoint = client.get_journal_checkpoint();
26 const int64_t connection_checkpoint = client.get_connection_checkpoint();
27 const int64_t diff = connection_checkpoint - journal_checkpoint;
28
29 out << journal_checkpoint;
30 if (diff > 0)
31 out << '+' << diff << ")(pull to sync)";
32 else if (diff < 0)
33 {
34 out << diff << ')';
35
36 if (client.is_pullonly())
37 out << "(cannot push)";
38 else
39 out << "(push to sync)";
40 }
41 else
42 {
43 out << ')';
44 if (client.get_journal().get_position() > journal_checkpoint)
45 out << '*';
46 }
47 }
48
49 ////////////////////////////////////////////////////////////////////////////
50 void Client_Command_Processor::pull
51 ////////////////////////////////////////////////////////////////////////////
52 (
53 std::ostream &out,
54 std::chrono::milliseconds wait
55 )
56 {
57 const int64_t byte_count = client.pull(wait);
58 if (byte_count > 0)
59 {
60 out << "pulled " << byte_count << " bytes, checkpoint = ";
61 out << client.get_journal_checkpoint() << '\n';
62 }
63 }
64
65 ////////////////////////////////////////////////////////////////////////////
66 void Client_Command_Processor::sleep(int seconds, std::ostream &out)
67 ////////////////////////////////////////////////////////////////////////////
68 {
69 if (seconds > 0)
70 {
72 out << ". Sleeping for " << seconds << " seconds...\n";
73 out.flush();
74 for (int i = seconds; Signal::get_signal() != SIGINT && --i >= 0;)
75 std::this_thread::sleep_for(std::chrono::seconds(1));
76 }
77 }
78
79 ////////////////////////////////////////////////////////////////////////////
81 ////////////////////////////////////////////////////////////////////////////
82 (
83 const std::string &command,
84 std::istream &parameters,
85 std::istream &in,
86 std::ostream &out
87 )
88 {
89 if (command == "help") ////////////////////////////////////////////////////
90 {
91 Command_Interpreter::process_command(command, parameters, in, out);
92
93 out << R"RRR(Client
94~~~~~~
95 db
96 pull [<wait_seconds>]
97 pull_every [<wait_seconds>] [<sleep_seconds>]
98)RRR";
99 if (!client.is_pullonly())
100 out << " push\n push_every [<sleep_seconds>]\n";
101 out << '\n';
102
103 return Status::ok;
104 }
105 else if (command == "db") /////////////////////////////////////////////////
106 {
107 const Database *database = nullptr;
108
109 {
110 auto * const rdc = dynamic_cast<Readonly_Database_Client *>(&client);
111 auto * const wdc = dynamic_cast<Writable_Database_Client *>(&client);
112
113 if (rdc)
114 database = &rdc->get_database();
115 else if (wdc)
116 database = &wdc->get_database();
117 }
118
119 if (database)
120 {
121 Readable_Interpreter interpreter(*database);
122 interpreter.set_prompt_string("db");
123 run_interpreter(interpreter, in, out);
124 }
125 else
126 {
127 Command_Interpreter interpreter;
128 interpreter.set_prompt_string("db(blobs)");
129 run_interpreter(interpreter, in, out);
130 }
131 }
132 else if (command == "pull") ///////////////////////////////////////////////
133 {
134 float wait_seconds = 0;
135 parameters >> wait_seconds;
136 pull(out, std::chrono::milliseconds(std::lround(wait_seconds * 1000)));
137 }
138 else if (command == "pull_every") /////////////////////////////////////////
139 {
140 float wait_seconds = 1;
141 int sleep_seconds = 0;
142 parameters >> wait_seconds >> sleep_seconds;
143
146
147 while (Signal::get_signal() != SIGINT)
148 {
149 pull(out, std::chrono::milliseconds(std::lround(wait_seconds * 1000)));
150 sleep(sleep_seconds, out);
151 }
152 }
153 else if (command == "push" && !client.is_pullonly()) //////////////////////
154 {
155 client.push_if_ahead();
156 }
157 else if (command == "push_every" && !client.is_pullonly()) ////////////////
158 {
159 int sleep_seconds = 1;
160 parameters >> sleep_seconds;
161
164
165 while (Signal::get_signal() != SIGINT)
166 {
167 client.push_if_ahead();
168 sleep(sleep_seconds, out);
169 }
170 }
171 else //////////////////////////////////////////////////////////////////////
172 return Command_Interpreter::process_command(command, parameters, in, out);
174 return Status::done;
175 }
176
177 ////////////////////////////////////////////////////////////////////////////
179 ////////////////////////////////////////////////////////////////////////////
180 (
181 const std::string &command,
182 std::istream &parameters,
183 std::istream &in,
184 std::ostream &out
185 )
186 {
187 if (command == "help") ////////////////////////////////////////////////////
188 {
189 Client_Command_Processor::process_command(command, parameters, in, out);
190
191 out << R"RRR(Writable Client
192~~~~~~~~~~~~~~~
193 transaction
194 set_valid_data <true|false>
195 set_timestamp <true|false>
196 set_hard_checkpoint <true|false>
197
198)RRR";
199
200 return Status::ok;
201 }
202 else if (command == "transaction") ////////////////////////////////////////
203 {
204 auto * const wdc = dynamic_cast<Writable_Database_Client *>(&client);
205 auto * const wjc = dynamic_cast<Writable_Journal_Client *>(&client);
206
207 if (wdc)
208 {
209 wdc->transaction([&](const Readable &readable, Writable &writable)
210 {
211 Interpreter interpreter(readable, writable, Record_Id::null);
212 interpreter.set_prompt_string("transaction");
213 run_interpreter(interpreter, in, out);
214 });
215 }
216 else if (wjc)
217 {
218 wjc->transaction([&](Writable_Journal &journal)
219 {
220 Writable_Interpreter interpreter(journal);
221 interpreter.set_prompt_string("transaction(journal)");
222 run_interpreter(interpreter, in, out);
223 });
224 }
225 else
226 out << "Client is not writable, cannot run transaction\n";
227 }
228 else if (command == "set_valid_data") /////////////////////////////////////
229 {
230 get_writable_client().set_valid_data(read_boolean(parameters));
231 }
232 else if (command == "set_timestamp") //////////////////////////////////////
233 {
234 get_writable_client().set_timestamp(read_boolean(parameters));
235 }
236 else if (command == "set_hard_checkpoint") ////////////////////////////////
237 {
238 get_writable_client().set_hard_checkpoint(read_boolean(parameters));
239 }
240 else //////////////////////////////////////////////////////////////////////
241 return Client_Command_Processor::process_command(command, parameters, in, out);
242
243 return Status::done;
244 }
245}
Status process_command(const std::string &command, std::istream &parameters, std::istream &in, std::ostream &out) override
void write_prompt(std::ostream &out) const override
static void sleep(int seconds, std::ostream &out)
void set_prompt_string(std::string s)
Status process_command(const std::string &command, std::istream &parameters, std::istream &in, std::ostream &out) override
static const Record_Id null
Definition index_types.h:44
static int get_signal()
Definition Signal.cpp:27
static void start()
Definition Signal.cpp:34
static constexpr int no_signal
Definition Signal.h:16
static void set_signal(int status)
Definition Signal.cpp:20
Status process_command(const std::string &command, std::istream &parameters, std::istream &in, std::ostream &out) override
std::string get_time_string_of_now()
bool read_boolean(std::istream &in)
Definition type_io.cpp:215