My Project 3.7.1
C++ Distributed Hash Table
Loading...
Searching...
No Matches
dht_proxy_client.h
1// Copyright (c) 2014-2026 Savoir-faire Linux Inc.
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include <functional>
6#include <mutex>
7
8#include "callbacks.h"
9#include "def.h"
10#include "dht_interface.h"
11#include "proxy.h"
12#include "http.h"
13
14#include <restinio/all.hpp>
15#include <json/json.h>
16
17#include <chrono>
18#include <vector>
19#include <functional>
20
// Forward declaration of Json::Value. In this header it only appears as a
// reference parameter (onProxyInfos, parsePublicAddress, getPushRequest).
21namespace Json {
22class Value;
23}
24
// Forward declarations of the HTTP helper types. This header only uses them
// by reference (http::Request&) or through std::shared_ptr.
25namespace http {
26class Resolver;
27class Request;
28} // namespace http
29
30namespace dht {
31
32class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface
33{
34public:
/**
 * Default constructor. Only declared here; the definition is not part of
 * this header.
 * NOTE(review): loopSignal_ is const and has no in-class initializer, so a
 * default-constructed client presumably holds an empty loop signal — confirm.
 */
35 DhtProxyClient();
36
/**
 * Construct a client bound to a proxy server.
 *
 * The mapping from arguments to members is not visible in this header; every
 * "presumably" below is inferred from member names and must be confirmed
 * against the out-of-line definition.
 *
 * @param serverCA       certificate for the server (presumably kept in serverCertificate_).
 * @param clientIdentity identity of this client (presumably kept in clientIdentity_).
 * @param loopSignal     callable (presumably kept in loopSignal_, which connectivityChanged() invokes).
 * @param serverHost     proxy host (presumably kept in proxyUrl_).
 * @param userAgent      defaults to an empty string; note that userAgent_ itself defaults to "OpenDHT".
 * @param pushClientId   defaults to an empty string (presumably kept in pushClientId_).
 * @param pushToken      defaults to an empty string.
 * @param pushTopic      defaults to an empty string.
 * @param pushPlatform   defaults to an empty string.
 * @param logger         optional logger; defaults to an empty shared_ptr.
 */
37 explicit DhtProxyClient(std::shared_ptr<crypto::Certificate> serverCA,
38 crypto::Identity clientIdentity,
39 std::function<void()> loopSignal,
40 const std::string& serverHost,
41 const std::string& userAgent = "",
42 const std::string& pushClientId = "",
43 const std::string& pushToken = "",
44 const std::string& pushTopic = "",
45 const std::string& pushPlatform = "",
46 std::shared_ptr<Logger> logger = {});
47
/**
 * Takes an http::Request by mutable reference. Only declared here.
 * NOTE(review): per its name this presumably fills in the HTTP header fields
 * of the request (user agent etc.) — confirm in the implementation.
 */
48 void setHeaderFields(http::Request& request);
49
/**
 * Override of the push-token setter. Only declared in this header; the
 * definition lives elsewhere.
 * NOTE(review): presumably stores the token in deviceKey_ — confirm.
 */
50 virtual void setPushNotificationToken(const std::string& token) override;
51
52 virtual void setPushNotificationTopic(const std::string& topic) override
53 {
54#ifdef OPENDHT_PUSH_NOTIFICATIONS
55 notificationTopic_ = topic;
56#else
57 (void) topic;
58#endif
59 }
60
61 virtual void setPushNotificationPlatform(const std::string& platform) override
62 {
63#ifdef OPENDHT_PUSH_NOTIFICATIONS
64 platform_ = platform;
65#else
66 (void) platform;
67#endif
68 }
69
/**
 * Destructor. Only declared here.
 * NOTE(review): the class owns httpClientThread_ and two steady timers;
 * presumably they are stopped/joined in the definition — confirm.
 */
70 virtual ~DhtProxyClient();
71
/** @return a reference to this client's node id (the myid member). */
75 inline const InfoHash& getNodeId() const override { return myid; }
/**
 * Replace the callback held in publicAddressChangedCb_ with cb (moved in).
 * NOTE(review): presumably invoked when the reported public address
 * changes — the call site is not in this header.
 */
76 void setOnPublicAddressChanged(PublicAddressChangedCb cb) override { publicAddressChangedCb_ = std::move(cb); }
77
/**
 * Status for a single address family. Only declared here.
 * NOTE(review): presumably reports statusIpv4_ / statusIpv6_ for
 * AF_INET / AF_INET6 — confirm in the implementation.
 */
81 NodeStatus getStatus(sa_family_t af) const override;
82 NodeStatus getStatus() const override { return std::max(getStatus(AF_INET), getStatus(AF_INET6)); }
83
/**
 * Shut the client down. Only declared here.
 * @param cb shutdown callback (NOTE(review): presumably called once shutdown
 *           has completed — confirm).
 * The second, unnamed bool parameter is part of the overridden signature.
 */
87 void shutdown(ShutdownCallback cb, bool) override;
88
/**
 * Whether the client is running. Only declared here.
 * @param af address family; defaults to 0 (NOTE(review): presumably meaning
 *           "any family" — confirm).
 */
95 bool isRunning(sa_family_t af = 0) const override;
96
/**
 * Primary get() overload, taking a GetCallback and a DoneCallback. The three
 * inline overloads that follow adapt their callbacks and forward to a get()
 * with this argument shape. Only declared here.
 *
 * @param key    hash to look up.
 * @param cb     value callback.
 * @param donecb completion callback; defaults to empty.
 * @param f      value filter; defaults to empty.
 * @param w      "where" clause; defaults to empty.
 */
107 virtual void get(
108 const InfoHash& key, GetCallback cb, DoneCallback donecb = {}, Value::Filter&& f = {}, Where&& w = {}) override;
/**
 * get() adapter for a simple completion callback: donecb is wrapped with
 * bindDoneCb() and the request is forwarded to another get() overload.
 * cb is passed on by copy; f and w are moved.
 */
109 virtual void get(const InfoHash& key,
110 GetCallback cb,
111 DoneCallbackSimple donecb = {},
112 Value::Filter&& f = {},
113 Where&& w = {}) override
114 {
    auto wrappedDone = bindDoneCb(std::move(donecb));
    get(key, cb, std::move(wrappedDone), std::move(f), std::move(w));
116 }
/**
 * get() adapter for a simple value callback: cb is wrapped with bindGetCb()
 * and the request is forwarded to another get() overload. donecb, f and w
 * are moved.
 */
117 virtual void get(const InfoHash& key,
118 GetCallbackSimple cb,
119 DoneCallback donecb = {},
120 Value::Filter&& f = {},
121 Where&& w = {}) override
122 {
    auto wrappedGet = bindGetCb(cb);
    get(key, std::move(wrappedGet), std::move(donecb), std::move(f), std::move(w));
124 }
/**
 * get() adapter for simple value and completion callbacks: cb goes through
 * bindGetCb(), donecb through bindDoneCb(), then the request is forwarded to
 * another get() overload. f and w are moved.
 */
125 virtual void get(const InfoHash& key,
126 GetCallbackSimple cb,
127 DoneCallbackSimple donecb,
128 Value::Filter&& f = {},
129 Where&& w = {}) override
130 {
    auto wrappedGet = bindGetCb(cb);
    auto wrappedDone = bindDoneCb(std::move(donecb));
    get(key, std::move(wrappedGet), std::move(wrappedDone), std::move(f), std::move(w));
132 }
133
/**
 * Primary put() overload, taking a shared Value and a DoneCallback. The
 * inline overloads that follow adapt their arguments and forward to a put()
 * with this argument shape. Only declared here.
 *
 * @param key       hash to publish under.
 * @param cb        completion callback; defaults to nullptr.
 * @param created   defaults to time_point::max().
 * @param permanent defaults to false.
 */
141 void put(const InfoHash& key,
142 Sp<Value>,
143 DoneCallback cb = nullptr,
144 time_point created = time_point::max(),
145 bool permanent = false) override;
/**
 * put() adapter for a simple completion callback: cb is wrapped with
 * bindDoneCb() and the call is forwarded to another put() overload with the
 * same value pointer, creation time and permanence flag.
 */
146 void put(const InfoHash& key,
147 const Sp<Value>& v,
148 DoneCallbackSimple cb,
149 time_point created = time_point::max(),
150 bool permanent = false) override
151 {
    auto wrappedDone = bindDoneCb(std::move(cb));
    put(key, v, std::move(wrappedDone), created, permanent);
153 }
154
/**
 * put() adapter for an rvalue Value: the value is moved into a freshly
 * allocated shared Value, which is then handed to another put() overload
 * together with cb (moved), created and permanent.
 */
155 void put(const InfoHash& key,
156 Value&& v,
157 DoneCallback cb = nullptr,
158 time_point created = time_point::max(),
159 bool permanent = false) override
160 {
    auto sharedValue = std::make_shared<Value>(std::move(v));
    put(key, std::move(sharedValue), std::move(cb), created, permanent);
162 }
/**
 * put() adapter for an rvalue Value with a simple completion callback: cb is
 * wrapped with bindDoneCb() and v is passed on as an rvalue to another put()
 * overload.
 */
163 void put(const InfoHash& key,
164 Value&& v,
165 DoneCallbackSimple cb,
166 time_point created = time_point::max(),
167 bool permanent = false) override
168 {
    auto wrappedDone = bindDoneCb(std::move(cb));
    put(key, std::move(v), std::move(wrappedDone), created, permanent);
170 }
171
/**
 * Node statistics for one address family. Only declared here.
 * NOTE(review): presumably returns stats4_ / stats6_ — confirm.
 */
176 NodeStats getNodesStats(sa_family_t af) const override;
177
/**
 * Public addresses known to the client. Only declared here.
 * @param family address family; defaults to 0 (NOTE(review): presumably
 *               meaning "all families" — confirm).
 */
182 std::vector<SockAddr> getPublicAddress(sa_family_t family = 0) override;
183
/**
 * Primary listen() overload, taking a ValueCallback. The inline overloads
 * that follow adapt their callbacks and forward to a listen() with this
 * argument shape. Only declared here.
 * @return a size_t (NOTE(review): presumably the listener token accepted by
 *         cancelListen() — confirm).
 */
191 virtual size_t listen(const InfoHash&, ValueCallback, Value::Filter = {}, Where = {}) override;
192
/**
 * listen() adapter for a GetCallback.
 *
 * The callback is wrapped in a two-argument functor that swallows
 * expiration events (returning true without calling cb) and otherwise
 * returns whatever cb returns for the received values. The wrapper, f and w
 * are then passed to another listen() overload, whose result is returned.
 */
193 virtual size_t listen(const InfoHash& key, GetCallback cb, Value::Filter f = {}, Where w = {}) override
194 {
    auto onValues = [getCb = std::move(cb)](const std::vector<Sp<Value>>& values, bool isExpired) {
        // Expired notifications are not forwarded; true is returned for them.
        return isExpired or getCb(values);
    };
    return listen(key, std::move(onValues), std::move(f), std::move(w));
204 }
/**
 * listen() adapter for a simple value callback: cb is wrapped with
 * bindGetCb() and passed, with f and w, to another listen() overload whose
 * result is returned.
 */
205 virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f = {}, Where w = {}) override
206 {
    auto wrappedGet = bindGetCb(std::move(cb));
    return listen(key, std::move(wrappedGet), std::move(f), std::move(w));
208 }
209 /*
210 * This function relies on the cache implementation.
211 * This means there is no true cancel here: keeping the cache takes priority.
212 */
213 virtual bool cancelListen(const InfoHash& key, size_t token) override;
214
/**
 * Handle a received push notification. Only declared here.
 * @param notification string-to-string map carrying the notification fields.
 * @return a PushNotificationResult (values defined outside this header).
 */
219 PushNotificationResult pushNotificationReceived(const std::map<std::string, std::string>& notification) override;
220
/**
 * Primary periodic() overload, taking the sender as a SockAddr. Only declared
 * here.
 * @return a time_point (NOTE(review): presumably the next time periodic()
 *         should be called — confirm).
 */
221 time_point periodic(const uint8_t*, size_t, SockAddr, const time_point& now) override;
/**
 * periodic() adapter for a raw sockaddr: builds a SockAddr from
 * (from, fromlen) and delegates to the SockAddr overload, returning its
 * result.
 */
222 time_point periodic(
223 const uint8_t* buf, size_t buflen, const sockaddr* from, socklen_t fromlen, const time_point& now) override
224 {
    SockAddr sender(from, fromlen);
    return periodic(buf, buflen, std::move(sender), now);
226 }
227
/**
 * No-op override: the body is empty and every argument is ignored.
 * NOTE(review): neither the query callback nor done_cb is ever invoked, so a
 * caller waiting on completion is never notified — confirm this is intended.
 */
238 virtual void query(const InfoHash& /*key*/,
239 QueryCallback /*cb*/,
240 DoneCallback /*done_cb*/ = {},
241 Query&& /*q*/ = {}) override
242 {}
/**
 * query() adapter for a simple completion callback: done_cb is wrapped with
 * bindDoneCb() and the call is forwarded to another query() overload. cb is
 * passed on by copy; q is moved.
 */
243 virtual void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) override
244 {
    auto wrappedDone = bindDoneCb(std::move(done_cb));
    query(key, cb, std::move(wrappedDone), std::move(q));
246 }
247
/**
 * Values associated with a hash. Only declared here.
 * NOTE(review): presumably the values this client has put and is still
 * announcing — confirm.
 */
251 std::vector<Sp<Value>> getPut(const InfoHash&) const override;

/** Single-value variant of getPut(): selects by hash and value id. Only declared here. */
256 Sp<Value> getPut(const InfoHash&, const Value::Id&) const override;

/**
 * Cancel a put identified by hash and value id. Only declared here.
 * @return a bool (NOTE(review): presumably whether a matching put was found — confirm).
 */
262 bool cancelPut(const InfoHash&, const Value::Id&) override;
263
/**
 * No-op override: the body is empty, the address is ignored and the callback
 * is never invoked.
 */
264 void pingNode(SockAddr, DoneCallbackSimple&& /*cb*/ = {}) override {}
265
/** Registers a value type by delegating to the types member (TypeStore). */
266 virtual void registerType(const ValueType& type) override { types.registerType(type); }
/** Looks a value type up by id by delegating to the types member (TypeStore). */
267 const ValueType& getType(ValueType::Id type_id) const override { return types.getType(type_id); }
268
/**
 * Local lookups by hash, with a filter or a value id. Only declared here.
 * NOTE(review): what "local" covers for this class (presumably the listener
 * cache) is defined in the implementation — confirm.
 */
269 std::vector<Sp<Value>> getLocal(const InfoHash& k, const Value::Filter& filter) const override;
270 Sp<Value> getLocalById(const InfoHash& k, Value::Id id) const override;
271
/*
 * Inline stub overrides. Each one ignores its arguments and either does
 * nothing or returns a value-initialized result ({}), with two exceptions:
 * getStorageLimit() returns 0 and getLocalStorageLimit() returns
 * STORAGE_LIMIT_UNLIMITED.
 */
276 void insertNode(const InfoHash&, const SockAddr&) override {}
277 void insertNode(const NodeExport&) override {}
278 std::pair<size_t, size_t> getStoreSize() const override { return {}; }
279 std::pair<size_t, size_t> getLocalStoreSize() const override { return {}; }
280 std::vector<NodeExport> exportNodes() const override { return {}; }
281 std::vector<ValuesExport> exportValues() const override { return {}; }
282 void importValues(const std::vector<ValuesExport>&) override {}
283 std::string getStorageLog() const override { return {}; }
284 std::string getStorageLog(const InfoHash&) const override { return {}; }
285 std::string getRoutingTablesLog(sa_family_t) const override { return {}; }
286 std::string getSearchesLog(sa_family_t) const override { return {}; }
287 std::string getSearchLog(const InfoHash&, sa_family_t) const override { return {}; }
288 void dumpTables() const override {}
289 std::vector<unsigned> getNodeMessageStats(bool) override { return {}; }
// The setters below discard the requested limit.
290 void setStorageLimit(size_t) override {}
291 virtual size_t getStorageLimit() const override { return 0; }
292 void setLocalStorageLimit(size_t) override {}
293 virtual size_t getLocalStorageLimit() const override { return STORAGE_LIMIT_UNLIMITED; }
/**
 * Per-family connectivity change: the family argument is ignored and
 * getProxyInfos() is called. Unlike the no-argument overload, this one does
 * not invoke loopSignal_.
 */
294 void connectivityChanged(sa_family_t) override { getProxyInfos(); }
/**
 * Connectivity change: calls getProxyInfos(), then invokes the loop signal.
 *
 * loopSignal_ is a std::function and nothing in this header guarantees it is
 * non-empty. Calling an empty std::function throws std::bad_function_call,
 * so the signal is only invoked when a target is set.
 */
295 void connectivityChanged() override
296 {
297 getProxyInfos();
    if (loopSignal_)
        loopSignal_();
299 }
300
/**
 * Set the listen keep-idle value.
 * @param seconds new value, stored as-is in listenKeepIdle_.
 */
301 void listenKeepIdle(uint32_t seconds) { listenKeepIdle_ = seconds; }
302 inline uint32_t listenKeepIdle() { return listenKeepIdle_; }
303
304private:
// Private helpers; all are only declared in this header.
// NOTE(review): the one-line descriptions below are inferred from names and
// signatures, not from the definitions — confirm before relying on them.

// Start / stop the proxy client machinery.
308 void startProxy();
309 void stop();

// Opaque state type shared (via shared_ptr) between proxy-info queries.
315 struct InfoState;
// Called from both connectivityChanged() overloads.
316 void getProxyInfos();
// Takes the shared info state, a resolver and the address family to query.
317 void queryProxyInfo(const std::shared_ptr<InfoState>& infoState,
318 const std::shared_ptr<http::Resolver>& resolver,
319 sa_family_t family);
// Receives a JSON value and the address family it relates to.
320 void onProxyInfos(const Json::Value& val, const sa_family_t family);
// Produces a SockAddr from a JSON value.
321 SockAddr parsePublicAddress(const Json::Value& val);

323 void opFailed();

// Timer-style handler: receives an asio error code and the listened key.
325 void handleExpireListener(const asio::error_code& ec, const InfoHash& key);
326
// Opaque per-listener and per-operation state types, defined elsewhere.
327 struct Listener;
328 struct OperationState;
// Kind of listen request; sendListen() takes one and defaults to LISTEN.
// NOTE(review): the wire meaning of each enumerator is not visible here.
329 enum class ListenMethod {
330 LISTEN,
331 SUBSCRIBE,
332 RESUBSCRIBE,
333 };
// Value callback used by sendListen(): receives the values, an "expired"
// flag and a system_clock time point, and returns a bool.
334 using CacheValueCallback
335 = std::function<bool(const std::vector<std::shared_ptr<Value>>& values, bool expired, system_clock::time_point)>;
336
// Private helpers; all are only declared in this header.
// NOTE(review): descriptions are inferred from signatures — confirm.

// Issues a listen-type request built from the given HTTP header; method
// defaults to ListenMethod::LISTEN.
340 void sendListen(const restinio::http_request_header_t& header,
341 const CacheValueCallback& cb,
342 const Sp<OperationState>& opstate,
343 Listener& listener,
344 ListenMethod method = ListenMethod::LISTEN);
// Timer-style handler: receives an asio error code plus the key/token of the
// listener and its operation state.
345 void handleResubscribe(const asio::error_code& ec,
346 const InfoHash& key,
347 const size_t token,
348 std::shared_ptr<OperationState> opstate);

// Same argument shape as the public put(), with a simple completion callback.
350 void doPut(const InfoHash&, Sp<Value>, DoneCallbackSimple, time_point created, bool permanent);
// Timer-style handler: receives an asio error code plus the key and value id.
351 void handleRefreshPut(const asio::error_code& ec, InfoHash key, Value::Id id);

356 void getConnectivityStatus();
360 void cancelAllListeners();
361
// Atomic flag, initially false.
// NOTE(review): presumably set while the object is being torn down — confirm.
362 std::atomic_bool isDestroying_ {false};

// Connection / identity configuration.
// NOTE(review): presumably filled from the constructor arguments — confirm.
364 std::string proxyUrl_;
365 dht::crypto::Identity clientIdentity_;
366 std::shared_ptr<dht::crypto::Certificate> serverCertificate_;
// Defaults to "OpenDHT".
367 std::string userAgent_ {"OpenDHT"};
368 std::string pushClientId_;
369 std::string pushSessionId_;
370
// Mutex declared mutable, so it can be locked from const member functions.
// NOTE(review): presumably guards the status/stats/address members that
// follow — confirm against the implementation.
371 mutable std::mutex lockCurrentProxyInfos_;
// Per-family status; both start as NodeStatus::Disconnected.
372 NodeStatus statusIpv4_ {NodeStatus::Disconnected};
373 NodeStatus statusIpv6_ {NodeStatus::Disconnected};
// Per-family node statistics, value-initialized.
374 NodeStats stats4_ {};
375 NodeStats stats6_ {};
// Per-family local and public addresses.
376 SockAddr localAddrv4_;
377 SockAddr localAddrv6_;
378 SockAddr publicAddressV4_;
379 SockAddr publicAddressV6_;
// Atomic flag, initially false.
380 std::atomic_bool launchConnectedCbs_ {false};
// Callback installed through setOnPublicAddressChanged().
381 PublicAddressChangedCb publicAddressChangedCb_ {};
382
// Node id returned by getNodeId(); value-initialized.
383 InfoHash myid {};

385 // registered types (used by registerType() and getType())
386 TypeStore types;
387
388 /*
389 * ASIO I/O Context for sockets in httpClient_
390 * Note: Each context is used in one thread only
391 */
392 asio::io_context httpContext_;
// NOTE(review): presumably guards resolver_ — confirm.
393 mutable std::mutex resolverLock_;
394 std::shared_ptr<http::Resolver> resolver_;

// NOTE(review): presumably guards requests_ — confirm.
396 mutable std::mutex requestLock_;
// HTTP requests indexed by an unsigned key.
397 std::map<unsigned, std::shared_ptr<http::Request>> requests_;
398 /*
399 * Thread for executing the http io_context.run() blocking call
400 */
401 std::thread httpClientThread_;
402
// Opaque per-key search state, defined elsewhere.
406 struct ProxySearch;

// NOTE(review): presumably guards listenerToken_ and searches_ — confirm.
408 mutable std::mutex searchLock_;
// Counter starting at 0.
// NOTE(review): presumably the source of the tokens returned by listen() — confirm.
409 size_t listenerToken_ {0};
// Search state per key.
410 std::map<InfoHash, ProxySearch> searches_;

// Value read and written by the listenKeepIdle() accessors; defaults to 120
// (seconds, going by the setter's parameter name).
415 uint32_t listenKeepIdle_ {120};

// NOTE(review): presumably guards callbacks_ — confirm.
420 std::mutex lockCallbacks_;
// Queue of parameterless callables.
421 std::vector<std::function<void()>> callbacks_;

423 Sp<InfoState> infoState_;
424
// Timer-style handler taking an asio error code; only declared here.
428 void handleProxyConfirm(const asio::error_code& ec);
// Owned steady timers.
// NOTE(review): presumably drive handleProxyConfirm() and restartListeners() — confirm.
429 std::unique_ptr<asio::steady_timer> nextProxyConfirmationTimer_;
430 std::unique_ptr<asio::steady_timer> listenerRestartTimer_;

// Timer-style handler taking an asio error code; only declared here.
435 void restartListeners(const asio::error_code& ec);

// Takes the key and token of a listener plus its state; only declared here.
441 void resubscribe(const InfoHash& key, const size_t token, Listener& listener);
442
// NOTE(review): presumably the push token set through
// setPushNotificationToken() — confirm.
447 std::string deviceKey_ {};

// Written by setPushNotificationTopic() when OPENDHT_PUSH_NOTIFICATIONS is defined.
452 std::string notificationTopic_ {};

// Written by setPushNotificationPlatform() when OPENDHT_PUSH_NOTIFICATIONS is
// defined. Compile-time default: "android" when __ANDROID__ is defined,
// otherwise "ios" when __APPLE__ is defined, otherwise empty.
457 std::string platform_
458#ifdef __ANDROID__
459 {"android"};
460#else
461#ifdef __APPLE__
462 {"ios"};
463#else
464 {};
465#endif
466#endif
467
// Callable invoked by connectivityChanged(). It is const, so it can only be
// set during construction.
468 const std::function<void()> loopSignal_;

// Push-notification helpers, only declared when push support is compiled in.
470#ifdef OPENDHT_PUSH_NOTIFICATIONS
471 std::string fillBody(bool resubscribe);
472 void getPushRequest(Json::Value&) const;
473#endif // OPENDHT_PUSH_NOTIFICATIONS

// JSON writer configuration and reader instance.
475 Json::StreamWriterBuilder jsonBuilder_;
476 std::unique_ptr<Json::CharReader> jsonReader_;

// Builds an HTTP request; target defaults to an empty string. Only declared here.
478 std::shared_ptr<http::Request> buildRequest(const std::string& target = {});
479};
480
481} // namespace dht
bool cancelPut(const InfoHash &, const Value::Id &) override
void insertNode(const InfoHash &, const SockAddr &) override
NodeStatus getStatus(sa_family_t af) const override
Sp< Value > getPut(const InfoHash &, const Value::Id &) const override
virtual void query(const InfoHash &, QueryCallback, DoneCallback={}, Query &&={}) override
std::vector< Sp< Value > > getPut(const InfoHash &) const override
const InfoHash & getNodeId() const override
virtual void get(const InfoHash &key, GetCallback cb, DoneCallback donecb={}, Value::Filter &&f={}, Where &&w={}) override
virtual size_t listen(const InfoHash &, ValueCallback, Value::Filter={}, Where={}) override
bool isRunning(sa_family_t af=0) const override
Sp< Value > getLocalById(const InfoHash &k, Value::Id id) const override
PushNotificationResult pushNotificationReceived(const std::map< std::string, std::string > &notification) override
std::pair< size_t, size_t > getStoreSize() const override
void setStorageLimit(size_t) override
std::vector< NodeExport > exportNodes() const override
std::vector< Sp< Value > > getLocal(const InfoHash &k, const Value::Filter &filter) const override
std::vector< SockAddr > getPublicAddress(sa_family_t family=0) override
void connectivityChanged(sa_family_t) override
void put(const InfoHash &key, Sp< Value >, DoneCallback cb=nullptr, time_point created=time_point::max(), bool permanent=false) override
virtual size_t listen(const InfoHash &key, GetCallback cb, Value::Filter f={}, Where w={}) override
NodeStats getNodesStats(sa_family_t af) const override
void shutdown(ShutdownCallback cb, bool) override
void setLocalStorageLimit(size_t) override
NodeStatus
Definition callbacks.h:24
Describes a query destined to another peer.
Definition value.h:988
Serializable dht::Value filter.
Definition value.h:850