Joedb 10.0.1
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server.h
Go to the documentation of this file.
1#ifndef joedb_Server_declared
2#define joedb_Server_declared
3
7
8#include <queue>
9#include <iosfwd>
10#include <set>
11#include <chrono>
12#include <optional>
13#include <map>
14
15#include <asio/local/stream_protocol.hpp>
16#include <asio/steady_timer.hpp>
17#include <asio/signal_set.hpp>
18
19namespace joedb
20{
21 /// @ingroup concurrency
22 class Server
23 {
24 private:
25 const std::chrono::time_point<std::chrono::steady_clock> start_time;
26 Client &client;
27 Writable_Journal_Client *writable_journal_client;
28 std::optional<Writable_Journal_Client_Lock> client_lock;
29 asio::io_context &io_context;
30 std::string endpoint_path;
31 asio::local::stream_protocol::endpoint endpoint;
32 asio::local::stream_protocol::acceptor acceptor;
33 bool stopped;
34 asio::signal_set interrupt_signals;
35
36 int64_t session_id;
37
38 struct Session: public std::enable_shared_from_this<Session>
39 {
40 const int64_t id;
41 Server &server;
42 asio::local::stream_protocol::socket socket;
43 Buffer<13> buffer;
44 enum class State
45 {
46 not_locking,
47 waiting_for_push_to_pull,
48 waiting_for_lock_to_pull,
49 waiting_for_lock_to_push,
50 locking
51 };
52 State state;
53
54 char push_status;
55 int64_t push_remaining_size;
56 std::optional<Async_Writer> push_writer;
57 bool unlock_after_push;
58
59 std::optional<asio::steady_timer> pull_timer;
60 bool lock_before_pulling;
61 bool send_pull_data;
62 int64_t pull_checkpoint;
63
64 std::ostream &write_id(std::ostream &out) const;
65 std::optional<Progress_Bar> progress_bar;
66
67 Session(Server &server, asio::local::stream_protocol::socket &&socket);
68 ~Session();
69 };
70
71 typedef void (Server::*Transfer_Handler)
72 (
73 std::shared_ptr<Session> session,
74 std::error_code error,
75 size_t bytes_transferred
76 );
77
78 void async_read
79 (
80 std::shared_ptr<Session> session,
81 size_t offset,
82 size_t size,
83 Transfer_Handler handler
84 );
85
86 std::set<Session *> sessions;
87
88 void write_status();
89
90 const std::chrono::milliseconds lock_timeout;
91 asio::steady_timer lock_timeout_timer;
92 bool locked;
93 std::queue<std::shared_ptr<Session>> lock_queue;
94 void lock_dequeue();
95 void lock(std::shared_ptr<Session> session, Session::State state);
96 void unlock(Session &session);
97
98 void lock_timeout_handler
99 (
100 std::shared_ptr<Session> session,
101 std::error_code error
102 );
103
104 void refresh_lock_timeout(std::shared_ptr<Session> session);
105
106 void push_transfer_handler
107 (
108 std::shared_ptr<Session> session,
109 std::error_code error,
110 size_t bytes_transferred
111 );
112
113 void push_transfer
114 (
115 std::shared_ptr<Session> session
116 );
117
118 void push_handler
119 (
120 std::shared_ptr<Session> session,
121 std::error_code error,
122 size_t bytes_transferred
123 );
124
125 void read_transfer_handler
126 (
127 std::shared_ptr<Session> session,
128 Async_Reader reader,
129 std::error_code error,
130 size_t bytes_transferred,
131 size_t offset
132 );
133
134 void start_reading(std::shared_ptr<Session> session, Async_Reader reader);
135
136 void start_pulling(std::shared_ptr<Session> session);
137
138 void pull_handler
139 (
140 std::shared_ptr<Session> session,
141 std::error_code error,
142 size_t bytes_transferred
143 );
144
145 void read_handler
146 (
147 std::shared_ptr<Session> session,
148 std::error_code error,
149 size_t bytes_transferred
150 );
151
152 void check_hash_handler
153 (
154 std::shared_ptr<Session> session,
155 std::error_code error,
156 size_t bytes_transferred
157 );
158
159 void read_command_handler
160 (
161 std::shared_ptr<Session> session,
162 std::error_code error,
163 size_t bytes_transferred
164 );
165
166 void read_command(std::shared_ptr<Session> session);
167
168 void write_buffer_and_next_command
169 (
170 std::shared_ptr<Session> session,
171 size_t size
172 );
173
174 void handshake_handler
175 (
176 std::shared_ptr<Session> session,
177 std::error_code error,
178 size_t bytes_transferred
179 );
180
181 void start_accept();
182
183 std::ostream *log_pointer;
184
185 static const std::map<char, const char *> request_description;
186
187 template<typename F> void log(F f)
188 {
189 if (log_pointer)
190 {
191 f(*log_pointer);
192 log_pointer->flush();
193 }
194 }
195
196 public:
197 Server
198 (
199 Client &client,
200 asio::io_context &io_context,
201 std::string endpoint_path,
202 std::chrono::milliseconds lock_timeout,
203 std::ostream *log_pointer
204 );
205
206 const std::string &get_endpoint_path() const {return endpoint_path;}
207 bool has_client_lock() const {return bool(client_lock);}
208 std::chrono::milliseconds get_time_stamp() const;
209
210 // Note: run on io_context if on another thread: io_context.post([&](){server.stop();});
211 void start();
212 void stop_after_sessions();
213 void stop();
214
215 ~Server();
216 };
217}
218
219#endif
Handle concurrent access to a file with a joedb::Connection.
Definition Client.h:12
void stop()
Definition Server.cpp:822
void start()
Definition Server.cpp:782
bool has_client_lock() const
Definition Server.h:207
const std::string & get_endpoint_path() const
Definition Server.h:206
void stop_after_sessions()
Definition Server.cpp:813
std::chrono::milliseconds get_time_stamp() const
Definition Server.cpp:35
Definition Blob.h:7