Joedb 9.5.0
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server.h
Go to the documentation of this file.
1#ifndef joedb_Server_declared
2#define joedb_Server_declared
3
7
8#include <queue>
9#include <iosfwd>
10#include <set>
11#include <chrono>
12#include <optional>
13
14#include <asio/local/stream_protocol.hpp>
15#include <asio/steady_timer.hpp>
16#include <asio/signal_set.hpp>
17
18namespace joedb
19{
20 /// @ingroup concurrency
21 class Server
22 {
23 private:
24 const std::chrono::time_point<std::chrono::steady_clock> start_time;
25 Client &client;
26 Writable_Journal_Client *writable_journal_client;
27 std::optional<Writable_Journal_Client_Lock> client_lock;
28 asio::io_context &io_context;
29 std::string endpoint_path;
30 asio::local::stream_protocol::endpoint endpoint;
31 asio::local::stream_protocol::acceptor acceptor;
32 bool stopped;
33 asio::signal_set interrupt_signals;
34
35 int64_t session_id;
36
37 struct Session: public std::enable_shared_from_this<Session>
38 {
39 const int64_t id;
40 Server &server;
41 asio::local::stream_protocol::socket socket;
42 Buffer<13> buffer;
43 enum class State
44 {
45 not_locking,
46 waiting_for_push_to_pull,
47 waiting_for_lock_to_pull,
48 waiting_for_lock_to_push,
49 locking
50 };
51 State state;
52
53 char push_status;
54 int64_t push_remaining_size;
55 std::optional<Async_Writer> push_writer;
56 bool unlock_after_push;
57
58 std::optional<asio::steady_timer> pull_timer;
59 bool lock_before_pulling;
60 bool send_pull_data;
61 int64_t pull_checkpoint;
62
63 std::ostream &write_id(std::ostream &out) const;
64 std::optional<Progress_Bar> progress_bar;
65
66 Session(Server &server, asio::local::stream_protocol::socket &&socket);
67 ~Session();
68 };
69
70 typedef void (Server::*Transfer_Handler)
71 (
72 std::shared_ptr<Session> session,
73 std::error_code error,
74 size_t bytes_transferred
75 );
76
77 void async_read
78 (
79 std::shared_ptr<Session> session,
80 size_t offset,
81 size_t size,
82 Transfer_Handler handler
83 );
84
85 std::set<Session *> sessions;
86
87 void write_status();
88
89 const std::chrono::milliseconds lock_timeout;
90 asio::steady_timer lock_timeout_timer;
91 bool locked;
92 std::queue<std::shared_ptr<Session>> lock_queue;
93 void lock_dequeue();
94 void lock(std::shared_ptr<Session> session, Session::State state);
95 void unlock(Session &session);
96
97 void lock_timeout_handler
98 (
99 std::shared_ptr<Session> session,
100 std::error_code error
101 );
102
103 void refresh_lock_timeout(std::shared_ptr<Session> session);
104
105 void push_transfer_handler
106 (
107 std::shared_ptr<Session> session,
108 std::error_code error,
109 size_t bytes_transferred
110 );
111
112 void push_transfer
113 (
114 std::shared_ptr<Session> session
115 );
116
117 void push_handler
118 (
119 std::shared_ptr<Session> session,
120 std::error_code error,
121 size_t bytes_transferred
122 );
123
124 void read_transfer_handler
125 (
126 std::shared_ptr<Session> session,
127 Async_Reader reader,
128 std::error_code error,
129 size_t bytes_transferred,
130 size_t offset
131 );
132
133 void start_reading(std::shared_ptr<Session> session, Async_Reader reader);
134
135 void start_pulling(std::shared_ptr<Session> session);
136
137 void pull_handler
138 (
139 std::shared_ptr<Session> session,
140 std::error_code error,
141 size_t bytes_transferred
142 );
143
144 void read_handler
145 (
146 std::shared_ptr<Session> session,
147 std::error_code error,
148 size_t bytes_transferred
149 );
150
151 void check_hash_handler
152 (
153 std::shared_ptr<Session> session,
154 std::error_code error,
155 size_t bytes_transferred
156 );
157
158 void read_command_handler
159 (
160 std::shared_ptr<Session> session,
161 std::error_code error,
162 size_t bytes_transferred
163 );
164
165 void read_command(std::shared_ptr<Session> session);
166
167 void write_buffer_and_next_command
168 (
169 std::shared_ptr<Session> session,
170 size_t size
171 );
172
173 void handshake_handler
174 (
175 std::shared_ptr<Session> session,
176 std::error_code error,
177 size_t bytes_transferred
178 );
179
180 void start_accept();
181
182 std::ostream *log_pointer;
183
184 template<typename F> void log(F f)
185 {
186 if (log_pointer)
187 {
188 f(*log_pointer);
189 log_pointer->flush();
190 }
191 }
192
193 public:
194 Server
195 (
196 Client &client,
197 asio::io_context &io_context,
198 std::string endpoint_path,
199 std::chrono::milliseconds lock_timeout,
200 std::ostream *log_pointer
201 );
202
203 const std::string &get_endpoint_path() const {return endpoint_path;}
204 bool has_client_lock() const {return bool(client_lock);}
205 std::chrono::milliseconds get_time_stamp() const;
206
207 // Note: run on io_context if on another thread: io_context.post([&](){server.stop();});
208 void start();
209 void stop_after_sessions();
210 void stop();
211
212 ~Server();
213 };
214}
215
216#endif
Handle concurrent access to a file with a joedb::Connection.
Definition Client.h:12
void stop()
Definition Server.cpp:797
void start()
Definition Server.cpp:757
bool has_client_lock() const
Definition Server.h:204
const std::string & get_endpoint_path() const
Definition Server.h:203
void stop_after_sessions()
Definition Server.cpp:788
std::chrono::milliseconds get_time_stamp() const
Definition Server.cpp:18
Definition Blob.h:7