Joedb 10.2.1
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server.h
Go to the documentation of this file.
1#ifndef joedb_rpc_Server_declared
2#define joedb_rpc_Server_declared
3
7#include "joedb/asio/Server.h"
9
10#include <algorithm>
11
12namespace joedb::rpc
13{
14 /// RPC Server
15 ///
16 /// @ingroup rpc
18 {
19 private:
20 const std::vector<Signature> &signatures;
21 const std::vector<Procedure *> &procedures;
22
23 protected:
25 {
26 private:
27 Server &get_server() {return *(Server *)&server;}
28
29 ////////////////////////////////////////////////////////////////////////
30 boost::asio::awaitable<bool> handshake()
31 ////////////////////////////////////////////////////////////////////////
32 {
33 co_await read_buffer(0, 32);
34
35 const SHA_256::Hash hash = buffer.read<SHA_256::Hash>();
36 const bool correct_hash = hash == get_hash(get_server().signatures);
37
38 buffer.index = 0;
39 buffer.write<char>(correct_hash ? 'H': 'h');
40 buffer.write<int64_t>(id);
41
42 if (get_server().log_level > 2)
43 log(correct_hash ? "correct hash" : "incorrect hash");
44
45 co_await write_buffer();
46 co_return correct_hash;
47 }
48
49 ////////////////////////////////////////////////////////////////////////
50 boost::asio::awaitable<void> call()
51 ////////////////////////////////////////////////////////////////////////
52 {
53 co_await read_buffer(1, 16);
54
55 const size_t id = static_cast<size_t>(buffer.read<int64_t>());
56 const int64_t until = buffer.read<int64_t>();
57
58 //
59 // Get procedure from id
60 //
61 if (id >= get_server().procedures.size())
62 throw Exception("bad procedure id");
63
64 auto &signature = get_server().signatures[id];
65 auto &procedure = *get_server().procedures[id];
66
67 if (get_server().log_level > 2)
68 log("procedure[" + std::to_string(id) + "]: " + signature.name);
69
70 //
71 // Read input message into a Memory_File
72 //
73 Memory_File file;
74
75 {
76 std::string &data = file.get_data();
77 data.reserve(size_t(until));
78 data = signature.prolog;
79
80 int64_t remaining = until - file.get_size();
81 while (remaining > 0)
82 {
83 const size_t n = co_await read_buffer
84 (
85 0,
86 std::min(remaining, int64_t(buffer.size))
87 );
88 data.append(buffer.data, n);
89 remaining -= n;
90 }
91 }
92
93 //
94 // Execute procedure
95 //
96 buffer.index = 0;
97
98 try
99 {
100 procedure.execute(file);
101 }
102 catch (const std::exception &e)
103 {
104 const std::string_view message(e.what());
105
106 if (get_server().log_level > 2)
107 log("error: " + std::string(message));
108
109 const size_t n = std::min(message.size(), buffer.size - 9);
110 buffer.write<char>('c');
111 buffer.write<int64_t>(int64_t(n));
112 std::strncpy(buffer.data + buffer.index, message.data(), n);
113 buffer.index += n;
114 }
115
116 //
117 // Return either an error message or the procedure output
118 //
119 if (buffer.index > 0)
120 co_await write_buffer();
121 else
122 {
123 buffer.index = 0;
124 buffer.write<char>('C');
125 buffer.write<int64_t>(file.get_size());
126
127 size_t offset = size_t(until);
128 size_t remaining = file.get_data().size() - offset;
129 while (remaining + buffer.index > 0)
130 {
131 const size_t n = std::min(remaining, buffer.size - buffer.index);
132 file.pread(buffer.data + buffer.index, n, offset);
133 buffer.index += n;
134 offset += n;
135 remaining -= n;
136 co_await write_buffer();
137 buffer.index = 0;
138 }
139 }
140 }
141
142 public:
143 ////////////////////////////////////////////////////////////////////////
145 ////////////////////////////////////////////////////////////////////////
146 (
147 Server &server,
148 boost::asio::local::stream_protocol::socket &&socket
149 ):
150 joedb::asio::Server::Session(server, std::move(socket))
151 {
152 }
153
154 ////////////////////////////////////////////////////////////////////////
155 boost::asio::awaitable<void> run() override
156 ////////////////////////////////////////////////////////////////////////
157 {
158 if (!co_await handshake())
159 co_return;
160
161 while (true)
162 {
163 co_await read_buffer(0, 1);
164
165 if (server.get_log_level() > 2)
166 log(std::string("received command: ") + buffer.data[0]);
167
168 switch (buffer.data[0])
169 {
170 case 'C':
171 co_await call();
172 break;
173
174 case 'P':
175 buffer.index = 1;
176 co_await write_buffer();
177 break;
178
179 default:
180 co_return;
181 break;
182 }
183 }
184 }
185 };
186
187 //////////////////////////////////////////////////////////////////////////
188 std::unique_ptr<joedb::asio::Server::Session> new_session
189 //////////////////////////////////////////////////////////////////////////
190 (
191 boost::asio::local::stream_protocol::socket &&socket
192 ) override
193 {
194 return std::make_unique<Session>(*this, std::move(socket));
195 }
196
197 public:
198 //////////////////////////////////////////////////////////////////////////
200 //////////////////////////////////////////////////////////////////////////
201 (
202 Logger &logger,
203 int log_level,
204 int thread_count,
205 std::string endpoint_path,
206 const std::vector<Signature> &signatures,
207 const std::vector<Procedure *> &procedures
208 ):
209 joedb::asio::Server
210 (
211 logger,
212 log_level,
214 std::move(endpoint_path)
215 ),
216 signatures(signatures),
217 procedures(procedures)
218 {
219 }
220 };
221}
222
223#endif
size_t index
Definition Buffer.h:20
void write(T x)
Definition Buffer.h:23
char data[size+extra_size]
Definition Buffer.h:19
static constexpr size_t size
Definition Buffer.h:15
std::string & get_data()
Definition Memory_File.h:23
size_t pread(char *buffer, size_t size, int64_t offset) const override
Read a range of bytes.
int64_t get_size() const override
Get the size of the file, or -1 if it is unknown.
Definition Memory_File.h:27
std::array< uint32_t, 8 > Hash
Definition SHA_256.h:59
void log(std::string_view s)
Definition Server.cpp:64
boost::asio::awaitable< void > write_buffer()
Definition Server.cpp:54
boost::asio::awaitable< size_t > read_buffer(size_t offset, size_t size)
Definition Server.cpp:37
boost::asio::local::stream_protocol::socket socket
Definition Server.h:39
Superclass for asio servers.
Definition Server.h:19
const std::string endpoint_path
Definition Server.h:29
Logger & logger
Definition Server.h:21
int get_log_level() const
Definition Server.h:80
const int thread_count
Definition Server.h:25
const int log_level
Definition Server.h:22
boost::asio::awaitable< void > run() override
Definition Server.h:155
RPC Server.
Definition Server.h:18
std::unique_ptr< joedb::asio::Server::Session > new_session(boost::asio::local::stream_protocol::socket &&socket) override
Definition Server.h:190
SHA_256::Hash get_hash(const std::vector< Signature > &signatures)
Compute hash code for a collection of procedure signatures.
Definition get_hash.cpp:9