68 const std::shared_ptr<log::Logger>& logger = {});
70 virtual ~DhtProxyServer();
72 DhtProxyServer(
const DhtProxyServer& other) =
delete;
73 DhtProxyServer(DhtProxyServer&& other) =
delete;
74 DhtProxyServer& operator=(
const DhtProxyServer& other) =
delete;
75 DhtProxyServer& operator=(DhtProxyServer&& other) =
delete;
77 asio::io_context& io_context()
const;
79 using clock = std::chrono::steady_clock;
80 using time_point = clock::time_point;
84 uint64_t highPriorityCount {0};
85 uint64_t normalPriorityCount {0};
87 void increment(
bool highPriority)
92 normalPriorityCount++;
95 Json::Value toJson()
const
98 val[
"highPriorityCount"] =
static_cast<Json::UInt64
>(highPriorityCount);
99 val[
"normalPriorityCount"] =
static_cast<Json::UInt64
>(normalPriorityCount);
103 std::string toString()
const
105 return fmt::format(
"{} high priority, {} normal priority", highPriorityCount, normalPriorityCount);
135 std::string toString()
const;
143 std::shared_ptr<ServerStats> stats()
const {
return stats_; }
145 std::shared_ptr<ServerStats> updateStats(std::shared_ptr<NodeInfo> info)
const;
147 std::shared_ptr<DhtRunner> getNode()
const {
return dht_; }
150 class ConnectionListener;
151 struct RestRouterTraitsTls;
152 struct RestRouterTraits;
154 template<
typename HttpResponse>
155 static HttpResponse initHttpResponse(HttpResponse response);
156 static restinio::request_handling_status_t serverError(restinio::request_t& request);
158 template<
typename ServerSettings>
159 void addServerSettings(ServerSettings& serverSettings,
const unsigned int max_pipelined_requests = 16);
161 std::unique_ptr<RestRouter> createRestRouter();
163 void onConnectionClosed(restinio::connection_id_t);
172 RequestStatus getNodeInfo(restinio::request_handle_t request, restinio::router::route_params_t params)
const;
180 RequestStatus getStats(restinio::request_handle_t request, restinio::router::route_params_t params);
192 RequestStatus get(restinio::request_handle_t request, restinio::router::route_params_t params);
204 RequestStatus listen(restinio::request_handle_t request, restinio::router::route_params_t params);
215 RequestStatus put(restinio::request_handle_t request, restinio::router::route_params_t params);
217 void handleCancelPermamentPut(
const asio::error_code& ec,
const InfoHash& key, Value::Id vid);
219#ifdef OPENDHT_PROXY_SERVER_IDENTITY
229 RequestStatus putSigned(restinio::request_handle_t request, restinio::router::route_params_t params)
const;
240 RequestStatus putEncrypted(restinio::request_handle_t request, restinio::router::route_params_t params);
254 RequestStatus getFiltered(restinio::request_handle_t request, restinio::router::route_params_t params);
263 RequestStatus options(restinio::request_handle_t request, restinio::router::route_params_t params);
265 struct PushSessionContext
268 std::string sessionId;
269 PushSessionContext(
const std::string&
id)
274#ifdef OPENDHT_PUSH_NOTIFICATIONS
275 PushType getTypeFromString(
const std::string& type);
276 std::string getDefaultTopic(PushType type);
278 RequestStatus pingPush(restinio::request_handle_t request, restinio::router::route_params_t );
288 RequestStatus subscribe(restinio::request_handle_t request, restinio::router::route_params_t params);
297 RequestStatus unsubscribe(restinio::request_handle_t request, restinio::router::route_params_t params);
304 void sendPushNotification(
305 const std::string& key, Json::Value&& json, PushType type,
bool highPriority,
const std::string& topic);
315 void handleNotifyPushListenExpire(
const asio::error_code& ec,
316 const std::string pushToken,
317 std::function<Json::Value()> json,
319 const std::string& topic);
328 void handleCancelPushListen(
const asio::error_code& ec,
329 const std::string pushToken,
331 const std::string clientId);
346 bool handlePushListen(
const InfoHash& infoHash,
347 const std::string& pushToken,
349 const std::string& clientId,
350 const std::shared_ptr<DhtProxyServer::PushSessionContext>& sessionCtx,
351 const std::string& topic,
352 const std::vector<std::shared_ptr<Value>>& values,
357 void handlePrintStats(
const asio::error_code& ec);
360 template<
typename Os>
361 void saveState(Os& stream);
363 template<
typename Is>
364 void loadState(Is& is,
size_t size);
366 std::shared_ptr<asio::io_context> ioContext_;
367 std::shared_ptr<DhtRunner> dht_;
368 Json::StreamWriterBuilder jsonBuilder_;
369 Json::CharReaderBuilder jsonReaderBuilder_;
370 std::mt19937_64 rd {crypto::getSeededRandomEngine<std::mt19937_64>()};
372 std::string persistPath_;
375 std::thread serverThread_;
376 std::unique_ptr<restinio::http_server_t<RestRouterTraitsTls>> httpsServer_;
377 std::unique_ptr<restinio::http_server_t<RestRouterTraits>> httpServer_;
380 std::pair<std::string, std::string> pushHostPort_;
382 mutable std::mutex requestLock_;
383 std::map<
unsigned int , std::shared_ptr<http::Request>> requests_;
385 std::shared_ptr<log::Logger> logger_;
387 std::shared_ptr<ServerStats> stats_;
388 std::shared_ptr<NodeInfo> nodeInfo_ {};
389 std::unique_ptr<asio::steady_timer> printStatsTimer_;
390 const time_point serverStartTime_;
391 mutable std::mutex pushStatsMutex_;
392 PushStats androidPush_;
394 PushStats unifiedPush_;
397 std::mutex lockListener_;
399 std::map<restinio::connection_id_t, http::ListenerSession> listeners_;
401 std::shared_ptr<ConnectionListener> connListener_;
404 time_point expiration;
405 std::string pushToken;
406 std::string clientId;
407 std::shared_ptr<PushSessionContext> sessionCtx;
408 std::unique_ptr<asio::steady_timer> expireTimer;
409 std::unique_ptr<asio::steady_timer> expireNotifyTimer;
414 template<
typename Packer>
415 void msgpack_pack(Packer& p)
const
417 p.pack_map(2 + (sessionCtx ? 1 : 0) + (clientId.empty() ? 0 : 1) + (type == PushType::None ? 0 : 2)
418 + (topic.empty() ? 0 : 1));
422 p.pack(to_time_t(expiration));
423 if (not clientId.empty()) {
428 std::lock_guard<std::mutex> l(sessionCtx->lock);
430 p.pack(sessionCtx->sessionId);
432 if (type != PushType::None) {
438 if (not topic.empty()) {
444 void msgpack_unpack(
const msgpack::object& o);
448 std::map<dht::Value::Id, PermanentPut> puts;
449 MSGPACK_DEFINE_ARRAY(puts)
451 std::mutex lockSearchPuts_;
452 std::map<InfoHash, SearchPuts> puts_;
454 mutable std::atomic<size_t> requestNum_ {0};
455 mutable std::atomic<time_point> lastStatsReset_ {time_point::min()};
457 std::string pushServer_;
458 std::string bundleId_;
460#ifdef OPENDHT_PUSH_NOTIFICATIONS
463 time_point expiration;
464 std::string clientId;
465 std::shared_ptr<PushSessionContext> sessionCtx;
466 std::future<size_t> internalToken;
467 std::unique_ptr<asio::steady_timer> expireTimer;
468 std::unique_ptr<asio::steady_timer> expireNotifyTimer;
472 template<
typename Packer>
473 void msgpack_pack(Packer& p)
const
475 p.pack_map(3 + (sessionCtx ? 1 : 0) + (topic.empty() ? 0 : 1));
479 p.pack(to_time_t(expiration));
481 std::lock_guard<std::mutex> l(sessionCtx->lock);
483 p.pack(sessionCtx->sessionId);
487 if (!topic.empty()) {
493 void msgpack_unpack(
const msgpack::object& o);
497 std::map<InfoHash, std::vector<Listener>> listeners;
498 MSGPACK_DEFINE_ARRAY(listeners)
500 std::map<std::string, PushListener> pushListeners_;