My Project 3.7.1
C++ Distributed Hash Table
Loading...
Searching...
No Matches
dht_proxy_server.h
1// Copyright (c) 2014-2026 Savoir-faire Linux Inc.
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "callbacks.h"
6#include "def.h"
7#include "infohash.h"
8#include "proxy.h"
9#include "scheduler.h"
10#include "sockaddr.h"
11#include "value.h"
12#include "http.h"
13
14#include <restinio/all.hpp>
15#include <restinio/tls.hpp>
16#include <json/json.h>
17
18#include <memory>
19#include <mutex>
20
21namespace dht {
22enum class PushType { None = 0, Android, iOS, UnifiedPush };
23}
24MSGPACK_ADD_ENUM(dht::PushType)
25
26namespace Json {
27class Value;
28}
29
30namespace dht {
31
32namespace http {
33class Request;
34struct ListenerSession;
35} // namespace http
36
37class DhtRunner;
38
39using RestRouter = restinio::router::express_router_t<>;
40using RequestStatus = restinio::request_handling_status_t;
41
42struct OPENDHT_PUBLIC ProxyServerConfig
43{
44 std::string address {};
45 in_port_t port {8000};
46 std::string pushServer {};
47 std::string persistStatePath {};
48 dht::crypto::Identity identity {};
49 std::string bundleId {};
50};
51
55class OPENDHT_PUBLIC DhtProxyServer
56{
57public:
66 DhtProxyServer(const std::shared_ptr<DhtRunner>& dht,
67 const ProxyServerConfig& config = {},
68 const std::shared_ptr<log::Logger>& logger = {});
69
70 virtual ~DhtProxyServer();
71
72 DhtProxyServer(const DhtProxyServer& other) = delete;
73 DhtProxyServer(DhtProxyServer&& other) = delete;
74 DhtProxyServer& operator=(const DhtProxyServer& other) = delete;
75 DhtProxyServer& operator=(DhtProxyServer&& other) = delete;
76
77 asio::io_context& io_context() const;
78
79 using clock = std::chrono::steady_clock;
80 using time_point = clock::time_point;
81
82 struct PushStats
83 {
84 uint64_t highPriorityCount {0};
85 uint64_t normalPriorityCount {0};
86
87 void increment(bool highPriority)
88 {
89 if (highPriority)
90 highPriorityCount++;
91 else
92 normalPriorityCount++;
93 }
94
95 Json::Value toJson() const
96 {
97 Json::Value val;
98 val["highPriorityCount"] = static_cast<Json::UInt64>(highPriorityCount);
99 val["normalPriorityCount"] = static_cast<Json::UInt64>(normalPriorityCount);
100 return val;
101 }
102
103 std::string toString() const
104 {
105 return fmt::format("{} high priority, {} normal priority", highPriorityCount, normalPriorityCount);
106 }
107 };
108
109 struct OPENDHT_PUBLIC ServerStats
110 {
112 size_t listenCount {0};
114 size_t putCount {0};
119
121 time_point serverStartTime;
123 time_point lastUpdated;
127 PushStats iosPush;
128 PushStats unifiedPush;
129
131 double requestRate {0};
133 std::shared_ptr<NodeInfo> nodeInfo {};
134
135 std::string toString() const;
136
140 Json::Value toJson() const;
141 };
142
143 std::shared_ptr<ServerStats> stats() const { return stats_; }
144
145 std::shared_ptr<ServerStats> updateStats(std::shared_ptr<NodeInfo> info) const;
146
147 std::shared_ptr<DhtRunner> getNode() const { return dht_; }
148
149private:
150 class ConnectionListener;
151 struct RestRouterTraitsTls;
152 struct RestRouterTraits;
153
154 template<typename HttpResponse>
155 static HttpResponse initHttpResponse(HttpResponse response);
156 static restinio::request_handling_status_t serverError(restinio::request_t& request);
157
158 template<typename ServerSettings>
159 void addServerSettings(ServerSettings& serverSettings, const unsigned int max_pipelined_requests = 16);
160
161 std::unique_ptr<RestRouter> createRestRouter();
162
163 void onConnectionClosed(restinio::connection_id_t);
164
172 RequestStatus getNodeInfo(restinio::request_handle_t request, restinio::router::route_params_t params) const;
173
180 RequestStatus getStats(restinio::request_handle_t request, restinio::router::route_params_t params);
181
192 RequestStatus get(restinio::request_handle_t request, restinio::router::route_params_t params);
193
204 RequestStatus listen(restinio::request_handle_t request, restinio::router::route_params_t params);
205
215 RequestStatus put(restinio::request_handle_t request, restinio::router::route_params_t params);
216
217 void handleCancelPermamentPut(const asio::error_code& ec, const InfoHash& key, Value::Id vid);
218
219#ifdef OPENDHT_PROXY_SERVER_IDENTITY
229 RequestStatus putSigned(restinio::request_handle_t request, restinio::router::route_params_t params) const;
230
240 RequestStatus putEncrypted(restinio::request_handle_t request, restinio::router::route_params_t params);
241
242#endif // OPENDHT_PROXY_SERVER_IDENTITY
243
254 RequestStatus getFiltered(restinio::request_handle_t request, restinio::router::route_params_t params);
255
263 RequestStatus options(restinio::request_handle_t request, restinio::router::route_params_t params);
264
/// Session identifier bundled with the mutex that is locked while the id is read
/// (see the msgpack_pack members of the persisted records).
struct PushSessionContext
{
    std::mutex lock;
    std::string sessionId;

    /// Stores a copy of @p id as the session identifier.
    /// Marked explicit so a plain string never converts silently into a
    /// mutex-carrying context object.
    explicit PushSessionContext(const std::string& id)
        : sessionId(id)
    {}
};
273
#ifdef OPENDHT_PUSH_NOTIFICATIONS
// Declarations below exist only when push-notification support is compiled in.

/// Maps a textual push type to the PushType enum.
PushType getTypeFromString(const std::string& type);
/// Returns the default topic string for the given push type.
std::string getDefaultTopic(PushType type);

// Push-related route handlers (same calling convention as the other REST handlers).
RequestStatus pingPush(restinio::request_handle_t request,
                       restinio::router::route_params_t /*params*/);
RequestStatus subscribe(restinio::request_handle_t request,
                        restinio::router::route_params_t params);

RequestStatus unsubscribe(restinio::request_handle_t request,
                          restinio::router::route_params_t params);

/// Sends a push notification carrying @p json.
/// @param key           presumably the push token / device key — confirm in the .cpp
/// @param json          payload, taken by rvalue reference
/// @param type          push backend to use
/// @param highPriority  priority flag of the notification
/// @param topic         topic string passed along with the notification
void sendPushNotification(const std::string& key,
                          Json::Value&& json,
                          PushType type,
                          bool highPriority,
                          const std::string& topic);

// Handler taking an asio error code; `json` is a callable producing the payload.
void handleNotifyPushListenExpire(const asio::error_code& ec,
                                  std::string pushToken,
                                  std::function<Json::Value()> json,
                                  PushType type,
                                  const std::string& topic);

// Handler taking an asio error code plus the push token, key and client id,
// all received by value.
void handleCancelPushListen(const asio::error_code& ec,
                            std::string pushToken,
                            InfoHash key,
                            std::string clientId);

/// Processes values received for a push listener; returns a bool whose meaning
/// is defined by the implementation.
bool handlePushListen(const InfoHash& infoHash,
                      const std::string& pushToken,
                      PushType type,
                      const std::string& clientId,
                      const std::shared_ptr<PushSessionContext>& sessionCtx,
                      const std::string& topic,
                      const std::vector<std::shared_ptr<Value>>& values,
                      bool expired);

#endif // OPENDHT_PUSH_NOTIFICATIONS
356
357 void handlePrintStats(const asio::error_code& ec);
358 void updateStats();
359
360 template<typename Os>
361 void saveState(Os& stream);
362
363 template<typename Is>
364 void loadState(Is& is, size_t size);
365
366 std::shared_ptr<asio::io_context> ioContext_;
367 std::shared_ptr<DhtRunner> dht_;
368 Json::StreamWriterBuilder jsonBuilder_;
369 Json::CharReaderBuilder jsonReaderBuilder_;
370 std::mt19937_64 rd {crypto::getSeededRandomEngine<std::mt19937_64>()};
371
372 std::string persistPath_;
373
374 // http server
375 std::thread serverThread_;
376 std::unique_ptr<restinio::http_server_t<RestRouterTraitsTls>> httpsServer_;
377 std::unique_ptr<restinio::http_server_t<RestRouterTraits>> httpServer_;
378
379 // http client
380 std::pair<std::string, std::string> pushHostPort_;
381
382 mutable std::mutex requestLock_;
383 std::map<unsigned int /*id*/, std::shared_ptr<http::Request>> requests_;
384
385 std::shared_ptr<log::Logger> logger_;
386
387 std::shared_ptr<ServerStats> stats_;
388 std::shared_ptr<NodeInfo> nodeInfo_ {};
389 std::unique_ptr<asio::steady_timer> printStatsTimer_;
390 const time_point serverStartTime_;
391 mutable std::mutex pushStatsMutex_;
392 PushStats androidPush_;
393 PushStats iosPush_;
394 PushStats unifiedPush_;
395
396 // Thread-safe access to listeners map.
397 std::mutex lockListener_;
398 // Shared with connection listener.
399 std::map<restinio::connection_id_t, http::ListenerSession> listeners_;
400 // Connection Listener observing conn state changes.
401 std::shared_ptr<ConnectionListener> connListener_;
402 struct PermanentPut
403 {
404 time_point expiration;
405 std::string pushToken;
406 std::string clientId;
407 std::shared_ptr<PushSessionContext> sessionCtx;
408 std::unique_ptr<asio::steady_timer> expireTimer;
409 std::unique_ptr<asio::steady_timer> expireNotifyTimer;
410 Sp<Value> value;
411 PushType type;
412 std::string topic;
413
414 template<typename Packer>
415 void msgpack_pack(Packer& p) const
416 {
417 p.pack_map(2 + (sessionCtx ? 1 : 0) + (clientId.empty() ? 0 : 1) + (type == PushType::None ? 0 : 2)
418 + (topic.empty() ? 0 : 1));
419 p.pack("value");
420 p.pack(value);
421 p.pack("exp");
422 p.pack(to_time_t(expiration));
423 if (not clientId.empty()) {
424 p.pack("cid");
425 p.pack(clientId);
426 }
427 if (sessionCtx) {
428 std::lock_guard<std::mutex> l(sessionCtx->lock);
429 p.pack("sid");
430 p.pack(sessionCtx->sessionId);
431 }
432 if (type != PushType::None) {
433 p.pack("t");
434 p.pack(type);
435 p.pack("token");
436 p.pack(pushToken);
437 }
438 if (not topic.empty()) {
439 p.pack("top");
440 p.pack(topic);
441 }
442 }
443
444 void msgpack_unpack(const msgpack::object& o);
445 };
446 struct SearchPuts
447 {
448 std::map<dht::Value::Id, PermanentPut> puts;
449 MSGPACK_DEFINE_ARRAY(puts)
450 };
451 std::mutex lockSearchPuts_;
452 std::map<InfoHash, SearchPuts> puts_;
453
454 mutable std::atomic<size_t> requestNum_ {0};
455 mutable std::atomic<time_point> lastStatsReset_ {time_point::min()};
456
457 std::string pushServer_;
458 std::string bundleId_;
459
460#ifdef OPENDHT_PUSH_NOTIFICATIONS
461 struct Listener
462 {
463 time_point expiration;
464 std::string clientId;
465 std::shared_ptr<PushSessionContext> sessionCtx;
466 std::future<size_t> internalToken;
467 std::unique_ptr<asio::steady_timer> expireTimer;
468 std::unique_ptr<asio::steady_timer> expireNotifyTimer;
469 PushType type;
470 std::string topic;
471
472 template<typename Packer>
473 void msgpack_pack(Packer& p) const
474 {
475 p.pack_map(3 + (sessionCtx ? 1 : 0) + (topic.empty() ? 0 : 1));
476 p.pack("cid");
477 p.pack(clientId);
478 p.pack("exp");
479 p.pack(to_time_t(expiration));
480 if (sessionCtx) {
481 std::lock_guard<std::mutex> l(sessionCtx->lock);
482 p.pack("sid");
483 p.pack(sessionCtx->sessionId);
484 }
485 p.pack("t");
486 p.pack(type);
487 if (!topic.empty()) {
488 p.pack("top");
489 p.pack(topic);
490 }
491 }
492
493 void msgpack_unpack(const msgpack::object& o);
494 };
495 struct PushListener
496 {
497 std::map<InfoHash, std::vector<Listener>> listeners;
498 MSGPACK_DEFINE_ARRAY(listeners)
499 };
500 std::map<std::string, PushListener> pushListeners_;
501#endif // OPENDHT_PUSH_NOTIFICATIONS
502};
503
504} // namespace dht
DhtProxyServer(const std::shared_ptr< DhtRunner > &dht, const ProxyServerConfig &config={}, const std::shared_ptr< log::Logger > &logger={})
std::shared_ptr< NodeInfo > nodeInfo