32class OPENDHT_PUBLIC DhtProxyClient final :
public DhtInterface
37 explicit DhtProxyClient(std::shared_ptr<crypto::Certificate> serverCA,
38 crypto::Identity clientIdentity,
39 std::function<
void()> loopSignal,
40 const std::string& serverHost,
41 const std::string& userAgent =
"",
42 const std::string& pushClientId =
"",
43 const std::string& pushToken =
"",
44 const std::string& pushTopic =
"",
45 const std::string& pushPlatform =
"",
46 std::shared_ptr<Logger> logger = {});
50 virtual void setPushNotificationToken(
const std::string& token)
override;
52 virtual void setPushNotificationTopic(
const std::string& topic)
override
54#ifdef OPENDHT_PUSH_NOTIFICATIONS
55 notificationTopic_ = topic;
61 virtual void setPushNotificationPlatform(
const std::string& platform)
override
63#ifdef OPENDHT_PUSH_NOTIFICATIONS
70 virtual ~DhtProxyClient();
75 inline const InfoHash&
getNodeId()
const override {
return myid; }
76 void setOnPublicAddressChanged(PublicAddressChangedCb cb)
override { publicAddressChangedCb_ = std::move(cb); }
87 void shutdown(ShutdownCallback cb,
bool)
override;
108 const InfoHash& key, GetCallback cb, DoneCallback donecb = {},
Value::Filter&& f = {}, Where&& w = {})
override;
109 virtual void get(
const InfoHash& key,
111 DoneCallbackSimple donecb = {},
112 Value::Filter&& f = {},
113 Where&& w = {})
override
115 get(key, cb, bindDoneCb(std::move(donecb)), std::forward<Value::Filter>(f), std::forward<Where>(w));
117 virtual void get(
const InfoHash& key,
118 GetCallbackSimple cb,
119 DoneCallback donecb = {},
120 Value::Filter&& f = {},
121 Where&& w = {})
override
123 get(key, bindGetCb(cb), std::move(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
125 virtual void get(
const InfoHash& key,
126 GetCallbackSimple cb,
127 DoneCallbackSimple donecb,
128 Value::Filter&& f = {},
129 Where&& w = {})
override
131 get(key, bindGetCb(cb), bindDoneCb(std::move(donecb)), std::forward<Value::Filter>(f), std::forward<Where>(w));
141 void put(
const InfoHash& key,
143 DoneCallback cb =
nullptr,
144 time_point created = time_point::max(),
145 bool permanent =
false)
override;
146 void put(
const InfoHash& key,
148 DoneCallbackSimple cb,
149 time_point created = time_point::max(),
150 bool permanent =
false)
override
152 put(key, v, bindDoneCb(std::move(cb)), created, permanent);
155 void put(
const InfoHash& key,
157 DoneCallback cb =
nullptr,
158 time_point created = time_point::max(),
159 bool permanent =
false)
override
161 put(key, std::make_shared<Value>(std::move(v)), std::move(cb), created, permanent);
163 void put(
const InfoHash& key,
165 DoneCallbackSimple cb,
166 time_point created = time_point::max(),
167 bool permanent =
false)
override
169 put(key, std::forward<Value>(v), bindDoneCb(std::move(cb)), created, permanent);
197 [cb = std::move(cb)](
const std::vector<Sp<Value>>& vals,
bool expired) {
202 std::forward<Value::Filter>(f),
203 std::forward<Where>(w));
205 virtual size_t listen(
const InfoHash& key, GetCallbackSimple cb, Value::Filter f = {}, Where w = {})
override
207 return listen(key, bindGetCb(std::move(cb)), std::forward<Value::Filter>(f), std::forward<Where>(w));
213 virtual bool cancelListen(
const InfoHash& key,
size_t token)
override;
221 time_point periodic(
const uint8_t*,
size_t,
SockAddr,
const time_point& now)
override;
223 const uint8_t* buf,
size_t buflen,
const sockaddr* from, socklen_t fromlen,
const time_point& now)
override
225 return periodic(buf, buflen,
SockAddr(from, fromlen), now);
238 virtual void query(
const InfoHash& ,
241 Query&& = {})
override
243 virtual void query(
const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {})
override
245 query(key, cb, bindDoneCb(std::move(done_cb)), std::forward<Query>(q));
251 std::vector<Sp<Value>>
getPut(
const InfoHash&)
const override;
256 Sp<Value>
getPut(
const InfoHash&,
const Value::Id&)
const override;
262 bool cancelPut(
const InfoHash&,
const Value::Id&)
override;
264 void pingNode(
SockAddr, DoneCallbackSimple&& = {})
override {}
266 virtual void registerType(
const ValueType& type)
override { types.registerType(type); }
267 const ValueType& getType(ValueType::Id type_id)
const override {
return types.getType(type_id); }
270 Sp<Value>
getLocalById(
const InfoHash& k, Value::Id
id)
const override;
277 void insertNode(
const NodeExport&)
override {}
279 std::pair<size_t, size_t> getLocalStoreSize()
const override {
return {}; }
280 std::vector<NodeExport>
exportNodes()
const override {
return {}; }
281 std::vector<ValuesExport> exportValues()
const override {
return {}; }
282 void importValues(
const std::vector<ValuesExport>&)
override {}
283 std::string getStorageLog()
const override {
return {}; }
284 std::string getStorageLog(
const InfoHash&)
const override {
return {}; }
285 std::string getRoutingTablesLog(sa_family_t)
const override {
return {}; }
286 std::string getSearchesLog(sa_family_t)
const override {
return {}; }
287 std::string getSearchLog(
const InfoHash&, sa_family_t)
const override {
return {}; }
288 void dumpTables()
const override {}
289 std::vector<unsigned> getNodeMessageStats(
bool)
override {
return {}; }
291 virtual size_t getStorageLimit()
const override {
return 0; }
293 virtual size_t getLocalStorageLimit()
const override {
return STORAGE_LIMIT_UNLIMITED; }
295 void connectivityChanged()
override
301 void listenKeepIdle(uint32_t seconds) { listenKeepIdle_ = seconds; }
302 inline uint32_t listenKeepIdle() {
return listenKeepIdle_; }
316 void getProxyInfos();
317 void queryProxyInfo(
const std::shared_ptr<InfoState>& infoState,
318 const std::shared_ptr<http::Resolver>& resolver,
320 void onProxyInfos(
const Json::Value& val,
const sa_family_t family);
321 SockAddr parsePublicAddress(
const Json::Value& val);
325 void handleExpireListener(
const asio::error_code& ec,
const InfoHash& key);
328 struct OperationState;
329 enum class ListenMethod {
334 using CacheValueCallback
335 = std::function<bool(
const std::vector<std::shared_ptr<Value>>& values,
bool expired, system_clock::time_point)>;
340 void sendListen(
const restinio::http_request_header_t& header,
341 const CacheValueCallback& cb,
342 const Sp<OperationState>& opstate,
344 ListenMethod method = ListenMethod::LISTEN);
345 void handleResubscribe(
const asio::error_code& ec,
348 std::shared_ptr<OperationState> opstate);
350 void doPut(
const InfoHash&, Sp<Value>, DoneCallbackSimple, time_point created,
bool permanent);
351 void handleRefreshPut(
const asio::error_code& ec, InfoHash key, Value::Id
id);
356 void getConnectivityStatus();
360 void cancelAllListeners();
362 std::atomic_bool isDestroying_ {
false};
364 std::string proxyUrl_;
365 dht::crypto::Identity clientIdentity_;
366 std::shared_ptr<dht::crypto::Certificate> serverCertificate_;
367 std::string userAgent_ {
"OpenDHT"};
368 std::string pushClientId_;
369 std::string pushSessionId_;
371 mutable std::mutex lockCurrentProxyInfos_;
372 NodeStatus statusIpv4_ {NodeStatus::Disconnected};
373 NodeStatus statusIpv6_ {NodeStatus::Disconnected};
374 NodeStats stats4_ {};
375 NodeStats stats6_ {};
376 SockAddr localAddrv4_;
377 SockAddr localAddrv6_;
378 SockAddr publicAddressV4_;
379 SockAddr publicAddressV6_;
380 std::atomic_bool launchConnectedCbs_ {
false};
381 PublicAddressChangedCb publicAddressChangedCb_ {};
392 asio::io_context httpContext_;
393 mutable std::mutex resolverLock_;
394 std::shared_ptr<http::Resolver> resolver_;
396 mutable std::mutex requestLock_;
397 std::map<unsigned, std::shared_ptr<http::Request>> requests_;
401 std::thread httpClientThread_;
408 mutable std::mutex searchLock_;
409 size_t listenerToken_ {0};
410 std::map<InfoHash, ProxySearch> searches_;
415 uint32_t listenKeepIdle_ {120};
420 std::mutex lockCallbacks_;
421 std::vector<std::function<void()>> callbacks_;
423 Sp<InfoState> infoState_;
428 void handleProxyConfirm(
const asio::error_code& ec);
429 std::unique_ptr<asio::steady_timer> nextProxyConfirmationTimer_;
430 std::unique_ptr<asio::steady_timer> listenerRestartTimer_;
435 void restartListeners(
const asio::error_code& ec);
441 void resubscribe(
const InfoHash& key,
const size_t token, Listener& listener);
447 std::string deviceKey_ {};
452 std::string notificationTopic_ {};
457 std::string platform_
468 const std::function<void()> loopSignal_;
470#ifdef OPENDHT_PUSH_NOTIFICATIONS
471 std::string fillBody(
bool resubscribe);
472 void getPushRequest(Json::Value&)
const;
475 Json::StreamWriterBuilder jsonBuilder_;
476 std::unique_ptr<Json::CharReader> jsonReader_;
478 std::shared_ptr<http::Request> buildRequest(
const std::string& target = {});