Joedb 10.3.0
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Client_Command_Processor.cpp
Go to the documentation of this file.
6#include "joedb/ui/type_io.h"
11#include "joedb/Signal.h"
12
13#include <thread>
14#include <chrono>
15#include <cmath>
16
17namespace joedb
18{
19 ////////////////////////////////////////////////////////////////////////////
20 void Client_Command_Processor::write_prompt(std::ostream &out) const
21 ////////////////////////////////////////////////////////////////////////////
22 {
23 out << get_name() << '(';
24
25 const int64_t journal_checkpoint = client.get_journal_checkpoint();
26 const int64_t connection_checkpoint = client.get_connection_checkpoint();
27 const int64_t diff = connection_checkpoint - journal_checkpoint;
28
29 out << journal_checkpoint;
30 if (diff > 0)
31 out << '+' << diff << ")(pull to sync)";
32 else if (diff < 0)
33 {
34 out << diff << ')';
35
36 if (client.is_pullonly())
37 out << "(cannot push)";
38 else
39 out << "(push to sync)";
40 }
41 else
42 {
43 out << ')';
44 if (client.get_journal().get_position() > journal_checkpoint)
45 out << '*';
46 }
47 }
48
49 ////////////////////////////////////////////////////////////////////////////
51 ////////////////////////////////////////////////////////////////////////////
52 (
53 std::ostream &out,
54 std::chrono::milliseconds wait
55 )
56 {
57 const int64_t byte_count = client.pull(wait);
58 if (byte_count > 0)
59 {
60 out << "pulled " << byte_count << " bytes, checkpoint = ";
61 out << client.get_journal_checkpoint() << '\n';
62 }
63
64 return byte_count;
65 }
66
67 ////////////////////////////////////////////////////////////////////////////
68 int64_t Writable_Client_Command_Processor::pull
69 ////////////////////////////////////////////////////////////////////////////
70 (
71 std::ostream &out,
72 std::chrono::milliseconds wait
73 )
74 {
75 const int64_t byte_count = Client_Command_Processor::pull(out, wait);
76
77 if (byte_count == 0)
78 get_writable_client().touch();
79
80 return byte_count;
81 }
82
83 ////////////////////////////////////////////////////////////////////////////
84 void Client_Command_Processor::sleep(int seconds, std::ostream &out)
85 ////////////////////////////////////////////////////////////////////////////
86 {
87 if (seconds > 0)
88 {
90 out << ". Sleeping for " << seconds << " seconds...\n";
91 out.flush();
92 for (int i = seconds; Signal::get_signal() != SIGINT && --i >= 0;)
93 std::this_thread::sleep_for(std::chrono::seconds(1));
94 }
95 }
96
97 ////////////////////////////////////////////////////////////////////////////
99 ////////////////////////////////////////////////////////////////////////////
100 (
101 const std::string &command,
102 std::istream &parameters,
103 std::istream &in,
104 std::ostream &out
105 )
106 {
107 if (command == "help") ////////////////////////////////////////////////////
108 {
109 Command_Interpreter::process_command(command, parameters, in, out);
110
111 out << R"RRR(Client
112~~~~~~
113 db
114 pull [<wait_seconds>]
115 pull_every [<wait_seconds>] [<sleep_seconds>]
116)RRR";
117 if (!client.is_pullonly())
118 out << " push\n push_every [<sleep_seconds>]\n";
119 out << '\n';
120
121 return Status::ok;
122 }
123 else if (command == "db") /////////////////////////////////////////////////
124 {
125 const Database *database = nullptr;
126
127 {
128 auto * const rdc = dynamic_cast<Readonly_Database_Client *>(&client);
129 auto * const wdc = dynamic_cast<Writable_Database_Client *>(&client);
130
131 if (rdc)
132 database = &rdc->get_database();
133 else if (wdc)
134 database = &wdc->get_database();
135 }
136
137 if (database)
138 {
139 Readable_Interpreter interpreter(*database);
140 interpreter.set_prompt_string("db");
141 run_interpreter(interpreter, in, out);
142 }
143 else
144 {
145 Command_Interpreter interpreter;
146 interpreter.set_prompt_string("db(blobs)");
147 run_interpreter(interpreter, in, out);
148 }
149 }
150 else if (command == "pull") ///////////////////////////////////////////////
151 {
152 float wait_seconds = 0;
153 parameters >> wait_seconds;
154 pull(out, std::chrono::milliseconds(std::lround(wait_seconds * 1000)));
155 }
156 else if (command == "pull_every") /////////////////////////////////////////
157 {
158 float wait_seconds = 1;
159 int sleep_seconds = 0;
160 parameters >> wait_seconds >> sleep_seconds;
161
164
165 while (Signal::get_signal() != SIGINT)
166 {
167 pull(out, std::chrono::milliseconds(std::lround(wait_seconds * 1000)));
168 sleep(sleep_seconds, out);
169 }
170 }
171 else if (command == "push" && !client.is_pullonly()) //////////////////////
172 {
173 client.push_if_ahead();
174 }
175 else if (command == "push_every" && !client.is_pullonly()) ////////////////
176 {
177 int sleep_seconds = 1;
178 parameters >> sleep_seconds;
179
182
183 while (Signal::get_signal() != SIGINT)
184 {
185 client.push_if_ahead();
186 sleep(sleep_seconds, out);
187 }
188 }
189 else //////////////////////////////////////////////////////////////////////
190 return Command_Interpreter::process_command(command, parameters, in, out);
192 return Status::done;
193 }
194
195 ////////////////////////////////////////////////////////////////////////////
197 ////////////////////////////////////////////////////////////////////////////
198 (
199 const std::string &command,
200 std::istream &parameters,
201 std::istream &in,
202 std::ostream &out
203 )
204 {
205 if (command == "help") ////////////////////////////////////////////////////
206 {
207 Client_Command_Processor::process_command(command, parameters, in, out);
208
209 out << R"RRR(Writable Client
210~~~~~~~~~~~~~~~
211 transaction
212 set_valid_data <true|false>
213 set_timestamp <true|false>
214 set_hard_checkpoint <true|false>
215
216)RRR";
217
218 return Status::ok;
219 }
220 else if (command == "transaction") ////////////////////////////////////////
221 {
222 auto * const wdc = dynamic_cast<Writable_Database_Client *>(&client);
223 auto * const wjc = dynamic_cast<Writable_Journal_Client *>(&client);
224
225 if (wdc)
226 {
227 wdc->transaction([&](const Readable &readable, Writable &writable)
228 {
229 Interpreter interpreter(readable, writable, Record_Id::null);
230 interpreter.set_prompt_string("transaction");
231 run_interpreter(interpreter, in, out);
232 });
233 }
234 else if (wjc)
235 {
236 wjc->transaction([&](Writable_Journal &journal)
237 {
238 Writable_Interpreter interpreter(journal);
239 interpreter.set_prompt_string("transaction(journal)");
240 run_interpreter(interpreter, in, out);
241 });
242 }
243 else
244 out << "Client is not writable, cannot run transaction\n";
245 }
246 else if (command == "set_valid_data") /////////////////////////////////////
247 {
248 get_writable_client().set_valid_data(read_boolean(parameters));
249 }
250 else if (command == "set_timestamp") //////////////////////////////////////
251 {
252 get_writable_client().set_timestamp(read_boolean(parameters));
253 }
254 else if (command == "set_hard_checkpoint") ////////////////////////////////
255 {
256 get_writable_client().set_hard_checkpoint(read_boolean(parameters));
257 }
258 else //////////////////////////////////////////////////////////////////////
259 return Client_Command_Processor::process_command(command, parameters, in, out);
260
261 return Status::done;
262 }
263}
virtual int64_t pull(std::ostream &out, std::chrono::milliseconds wait)
Status process_command(const std::string &command, std::istream &parameters, std::istream &in, std::ostream &out) override
void write_prompt(std::ostream &out) const override
static void sleep(int seconds, std::ostream &out)
void set_prompt_string(std::string s)
Status process_command(const std::string &command, std::istream &parameters, std::istream &in, std::ostream &out) override
static const Record_Id null
Definition index_types.h:44
static int get_signal()
Definition Signal.cpp:27
static void start()
Definition Signal.cpp:34
static constexpr int no_signal
Definition Signal.h:16
static void set_signal(int status)
Definition Signal.cpp:20
Status process_command(const std::string &command, std::istream &parameters, std::istream &in, std::ostream &out) override
std::string get_time_string_of_now()
bool read_boolean(std::istream &in)
Definition type_io.cpp:215