Joedb 10.3.2
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Client.h
Go to the documentation of this file.
1#ifndef joedb_rpc_Client_declared
2#define joedb_rpc_Client_declared
3
8#include "joedb/Thread_Safe.h"
12
13#include <vector>
14
15namespace joedb::rpc
16{
17 class Client: public Ping_Client
18 {
19 private:
20 Buffer<13> buffer;
22 const std::vector<Signature> &signatures;
23
24 int64_t session_id;
25
26 void handshake()
27 {
28 const SHA_256::Hash h = get_hash(signatures);
29
30 {
31 Lock<Channel&> lock(channel);
32 lock->write((const char *)h.data(), h.size() * sizeof h[0]);
33 lock->read(buffer.data, 9);
34 }
35
36 buffer.index = 0;
37 if (buffer.read<char>() != 'H')
38 throw Exception("failed handshake");
39 session_id = buffer.read<int64_t>();
40 }
41
42 Thread_Safe<Channel&> &get_channel() override {return channel;}
43 void locked_ping(Lock<Channel&> &lock) override
44 {
45 char c = 'P';
46 lock->write(&c, 1);
47 lock->read(&c, 1);
48 }
49 Keep_Alive_Thread keep_alive;
50
51 public:
53 (
54 Channel &channel,
55 const std::vector<Signature> &signatures,
56 std::chrono::milliseconds keep_alive_interval = std::chrono::milliseconds(0)
57 ):
58 channel(channel),
59 signatures(signatures),
60 keep_alive(*this, keep_alive_interval)
61 {
62 handshake();
63 }
64
65 void call(int64_t procedure_id, Memory_File &file)
66 {
67 Lock<Channel&> lock(channel);
68
69 auto &signature = signatures[procedure_id];
70
71 {
72 // Check that the prolog is matching?
73 const int64_t from = int64_t(signature.prolog.size());
74 const int64_t until = file.get_size();
75
76 buffer.index = 0;
77 buffer.write<char>('C');
78 buffer.write<int64_t>(procedure_id);
79 buffer.write<int64_t>(until);
80
81 // Could be optimized into one single write?
82 lock->write(buffer.data, buffer.index);
83 lock->write(file.get_data().data() + from, until - from);
84 lock->read(buffer.data, 9);
85 }
86
87 buffer.index = 0;
88 const char reply = buffer.read<char>();
89
90 if (reply == 'C')
91 {
92 const size_t from = file.get_data().size();
93 const int64_t until = buffer.read<int64_t>();
94 file.get_data().resize(size_t(until));
95 lock->read(file.get_data().data() + from, size_t(until) - from);
96 file.pwrite((const char *)&until, 8, 0);
97 file.pwrite((const char *)&until, 8, 8);
98 }
99 else
100 {
101 const int64_t n = buffer.read<int64_t>();
102 std::string error_message;
103 error_message.resize(n);
104 lock->read(error_message.data(), n);
105 throw Exception(error_message);
106 }
107 }
108
110 {
111 try
112 {
113 Lock<Channel&> lock(channel);
114 lock->write("Q", 1);
115 }
116 catch (...)
117 {
118 }
119 }
120 };
121}
122
123#endif
size_t index
Definition Buffer.h:20
void write(T x)
Definition Buffer.h:23
char data[size+extra_size]
Definition Buffer.h:19
void pwrite(const char *buffer, size_t size, int64_t offset) override
Write a range of bytes. Extend file size if necessary.
std::string & get_data()
Definition Memory_File.h:23
int64_t get_size() const override
Get the size of the file, or -1 if it is unknown.
Definition Memory_File.h:27
std::array< uint32_t, 8 > Hash
Definition SHA_256.h:59
Client(Channel &channel, const std::vector< Signature > &signatures, std::chrono::milliseconds keep_alive_interval=std::chrono::milliseconds(0))
Definition Client.h:53
void call(int64_t procedure_id, Memory_File &file)
Definition Client.h:65
SHA_256::Hash get_hash(const std::vector< Signature > &signatures)
Compute hash code for a collection of procedure signatures.
Definition get_hash.cpp:9