My Project 3.7.1
C++ Distributed Hash Table
Loading...
Searching...
No Matches
dhtrunner.h
1// Copyright (c) 2014-2026 Savoir-faire Linux Inc.
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "def.h"
6#include "infohash.h"
7#include "value.h"
8#include "callbacks.h"
9#include "sockaddr.h"
10#include "logger.h"
11#include "network_utils.h"
12#include "node_export.h"
13
14#include <thread>
15#include <mutex>
16#include <atomic>
17#include <condition_variable>
18#include <future>
19#include <exception>
20#include <queue>
21#include <chrono>
22
23namespace dht {
24
25struct Node;
26class SecureDht;
27class PeerDiscovery;
28struct SecureDhtConfig;
29
36class OPENDHT_PUBLIC DhtRunner
37{
38public:
39 using StatusCallback = std::function<void(NodeStatus, NodeStatus)>;
40
41 struct Config
42 {
43 SecureDhtConfig dht_config {};
44 bool threaded {true};
45 std::string proxy_server {};
46 std::string push_node_id {};
47 std::string push_token {};
48 std::string push_topic {};
49 std::string push_platform {};
50 std::string proxy_user_agent {};
51 bool peer_discovery {false};
52 bool peer_publish {false};
53 std::shared_ptr<dht::crypto::Certificate> server_ca;
54 dht::crypto::Identity client_identity;
55 SockAddr bind4 {}, bind6 {};
56 };
57
58 struct Context
59 {
60 std::shared_ptr<Logger> logger {};
61 std::unique_ptr<net::DatagramSocket> sock;
62 std::shared_ptr<PeerDiscovery> peerDiscovery {};
63 StatusCallback statusChangedCallback {};
64 CertificateStoreQueryLegacy certificateStore {};
65 CertificateStoreQuery certificateStorePkId {};
66 IdentityAnnouncedCb identityAnnouncedCb {};
67 PublicAddressChangedCb publicAddressChangedCb {};
68 std::unique_ptr<std::mt19937_64> rng {};
69 Context() {}
70 };
71
72 DhtRunner();
73 virtual ~DhtRunner();
74
75 void get(InfoHash id, GetCallbackSimple cb, DoneCallback donecb = {}, Value::Filter f = {}, Where w = {})
76 {
77 get(id, bindGetCb(cb), donecb, f, w);
78 }
79
80 void get(InfoHash id, GetCallbackSimple cb, DoneCallbackSimple donecb = {}, Value::Filter f = {}, Where w = {})
81 {
82 get(id, bindGetCb(cb), donecb, f, w);
83 }
84
85 void get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f = {}, Where w = {});
86
87 void get(InfoHash id, GetCallback cb, DoneCallbackSimple donecb = {}, Value::Filter f = {}, Where w = {})
88 {
89 get(id, cb, bindDoneCb(donecb), f, w);
90 }
91 void get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb = {}, Value::Filter f = {}, Where w = {});
92
93 template<class T>
94 void get(InfoHash hash, std::function<bool(std::vector<T>&&)> cb, DoneCallbackSimple dcb = {})
95 {
96 get(
97 hash,
98 [cb = std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) { return cb(unpackVector<T>(vals)); },
99 dcb,
100 getFilterSet<T>());
101 }
102 template<class T>
103 void get(InfoHash hash, std::function<bool(T&&)> cb, DoneCallbackSimple dcb = {})
104 {
105 get(
106 hash,
107 [cb = std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
108 for (const auto& v : vals) {
109 try {
110 if (not cb(Value::unpack<T>(*v)))
111 return false;
112 } catch (const std::exception&) {
113 continue;
114 }
115 }
116 return true;
117 },
118 dcb,
119 getFilterSet<T>());
120 }
121
122 std::future<std::vector<std::shared_ptr<dht::Value>>> get(InfoHash key, Value::Filter f = {}, Where w = {})
123 {
124 auto p = std::make_shared<std::promise<std::vector<std::shared_ptr<dht::Value>>>>();
125 auto values = std::make_shared<std::vector<std::shared_ptr<dht::Value>>>();
126 get(
127 key,
128 [=](const std::vector<std::shared_ptr<dht::Value>>& vlist) {
129 values->insert(values->end(), vlist.begin(), vlist.end());
130 return true;
131 },
132 [=](bool) { p->set_value(std::move(*values)); },
133 f,
134 w);
135 return p->get_future();
136 }
137
138 template<class T>
139 std::future<std::vector<T>> get(InfoHash key)
140 {
141 auto p = std::make_shared<std::promise<std::vector<T>>>();
142 auto values = std::make_shared<std::vector<T>>();
143 get<T>(
144 key,
145 [=](T&& v) {
146 values->emplace_back(std::move(v));
147 return true;
148 },
149 [=](bool) { p->set_value(std::move(*values)); });
150 return p->get_future();
151 }
152
153 void query(const InfoHash& hash, QueryCallback cb, DoneCallback done_cb = {}, Query q = {});
154 void query(const InfoHash& hash, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query q = {})
155 {
156 query(hash, cb, bindDoneCb(done_cb), q);
157 }
158
159 std::future<size_t> listen(InfoHash key, ValueCallback vcb, Value::Filter f = {}, Where w = {});
160
161 std::future<size_t> listen(InfoHash key, GetCallback cb, Value::Filter f = {}, Where w = {})
162 {
163 return listen(
164 key,
165 [cb = std::move(cb)](const std::vector<Sp<Value>>& vals, bool expired) {
166 if (not expired)
167 return cb(vals);
168 return true;
169 },
170 std::forward<Value::Filter>(f),
171 std::forward<Where>(w));
172 }
173 std::future<size_t> listen(const std::string& key, GetCallback vcb, Value::Filter f = {}, Where w = {});
174 std::future<size_t> listen(InfoHash key, GetCallbackSimple cb, Value::Filter f = {}, Where w = {})
175 {
176 return listen(key, bindGetCb(cb), f, w);
177 }
178
179 template<class T>
180 std::future<size_t> listen(InfoHash hash, std::function<bool(std::vector<T>&&)> cb)
181 {
182 return listen(
183 hash,
184 [cb = std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) { return cb(unpackVector<T>(vals)); },
185 getFilterSet<T>());
186 }
187 template<class T>
188 std::future<size_t> listen(InfoHash hash, std::function<bool(std::vector<T>&&, bool)> cb)
189 {
190 return listen(
191 hash,
192 [cb = std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals, bool expired) {
193 return cb(unpackVector<T>(vals), expired);
194 },
195 getFilterSet<T>());
196 }
197
198 template<typename T>
199 std::future<size_t> listen(InfoHash hash, std::function<bool(T&&)> cb, Value::Filter f = {}, Where w = {})
200 {
201 return listen(
202 hash,
203 [cb = std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
204 for (const auto& v : vals) {
205 try {
206 if (not cb(Value::unpack<T>(*v)))
207 return false;
208 } catch (const std::exception&) {
209 continue;
210 }
211 }
212 return true;
213 },
214 getFilterSet<T>(f),
215 w);
216 }
217 template<typename T>
218 std::future<size_t> listen(InfoHash hash, std::function<bool(T&&, bool)> cb, Value::Filter f = {}, Where w = {})
219 {
220 return listen(
221 hash,
222 [cb = std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals, bool expired) {
223 for (const auto& v : vals) {
224 try {
225 if (not cb(Value::unpack<T>(*v), expired))
226 return false;
227 } catch (const std::exception&) {
228 continue;
229 }
230 }
231 return true;
232 },
233 getFilterSet<T>(f),
234 w);
235 }
236
237 void cancelListen(InfoHash h, size_t token);
238 void cancelListen(InfoHash h, std::shared_future<size_t> token);
239
240 void put(InfoHash hash,
241 std::shared_ptr<Value> value,
242 DoneCallback cb = {},
243 time_point created = time_point::max(),
244 bool permanent = false);
245 void put(InfoHash hash,
246 std::shared_ptr<Value> value,
247 DoneCallbackSimple cb,
248 time_point created = time_point::max(),
249 bool permanent = false)
250 {
251 put(hash, value, bindDoneCb(cb), created, permanent);
252 }
253
254 void put(InfoHash hash,
255 Value&& value,
256 DoneCallback cb = {},
257 time_point created = time_point::max(),
258 bool permanent = false);
259 void put(InfoHash hash,
260 Value&& value,
261 DoneCallbackSimple cb,
262 time_point created = time_point::max(),
263 bool permanent = false)
264 {
265 put(hash, std::forward<Value>(value), bindDoneCb(cb), created, permanent);
266 }
267 void put(const std::string& key,
268 Value&& value,
269 DoneCallbackSimple cb = {},
270 time_point created = time_point::max(),
271 bool permanent = false);
272
273 void cancelPut(const InfoHash& h, Value::Id id);
274 void cancelPut(const InfoHash& h, const std::shared_ptr<Value>& value);
275
276 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb = {}, bool permanent = false);
277 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, bool permanent = false)
278 {
279 putSigned(hash, value, bindDoneCb(cb), permanent);
280 }
281
282 void putSigned(InfoHash hash, Value&& value, DoneCallback cb = {}, bool permanent = false);
283 void putSigned(InfoHash hash, Value&& value, DoneCallbackSimple cb, bool permanent = false)
284 {
285 putSigned(hash, std::forward<Value>(value), bindDoneCb(cb), permanent);
286 }
287 void putSigned(const std::string& key, Value&& value, DoneCallbackSimple cb = {}, bool permanent = false);
288
289 void putEncrypted(
290 InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb = {}, bool permanent = false);
291 void putEncrypted(
292 InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallbackSimple cb, bool permanent = false)
293 {
294 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
295 }
296
297 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb = {}, bool permanent = false);
298 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallbackSimple cb, bool permanent = false)
299 {
300 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
301 }
302 void putEncrypted(const std::string& key, InfoHash to, Value&& value, DoneCallback cb = {}, bool permanent = false);
303
304 void putEncrypted(InfoHash hash,
305 const std::shared_ptr<crypto::PublicKey>& to,
306 std::shared_ptr<Value> value,
307 DoneCallback cb = {},
308 bool permanent = false);
309 void putEncrypted(InfoHash hash,
310 const std::shared_ptr<crypto::PublicKey>& to,
311 std::shared_ptr<Value> value,
312 DoneCallbackSimple cb,
313 bool permanent = false)
314 {
315 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
316 }
317
318 [[deprecated("Use the shared_ptr version instead")]]
319 void putEncrypted(InfoHash hash,
320 const std::shared_ptr<crypto::PublicKey>& to,
321 Value&& value,
322 DoneCallback cb = {},
323 bool permanent = false);
324 [[deprecated("Use the shared_ptr version instead")]]
325 void putEncrypted(InfoHash hash,
326 const std::shared_ptr<crypto::PublicKey>& to,
327 Value&& value,
328 DoneCallbackSimple cb,
329 bool permanent = false)
330 {
331 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
332 }
333
334 void putEncrypted(
335 InfoHash hash, const PkId& to, std::shared_ptr<Value> value, DoneCallback cb = {}, bool permanent = false);
336 void putEncrypted(
337 InfoHash hash, const PkId& to, std::shared_ptr<Value> value, DoneCallbackSimple cb, bool permanent = false)
338 {
339 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
340 }
341
346 void bootstrap(std::vector<SockAddr> nodes, DoneCallbackSimple cb = {});
347 void bootstrap(SockAddr addr, DoneCallbackSimple cb = {});
348
353 void bootstrap(std::vector<NodeExport> nodes);
354
361 void bootstrap(const std::string& host, const std::string& service);
362 void bootstrap(const std::string& hostService);
363
368 void bootstrap(const InfoHash& id, const SockAddr& address);
369
374
381
382 void dumpTables() const;
383
387 [[deprecated("Use getPublicKey()->getLongId() instead")]] InfoHash getId() const;
388 std::shared_ptr<crypto::PublicKey> getPublicKey() const;
389
393 InfoHash getNodeId() const;
394
399 SockAddr getBound(sa_family_t f = AF_INET) const;
400
405 in_port_t getBoundPort(sa_family_t f = AF_INET) const;
406
407 std::pair<size_t, size_t> getStoreSize() const;
408
409 size_t getStorageLimit() const;
410 void setStorageLimit(size_t limit = 0);
411
412 size_t getLocalStorageLimit() const;
413 void setLocalStorageLimit(size_t limit = 0);
414
415 std::vector<NodeExport> exportNodes() const;
416
417 std::vector<ValuesExport> exportValues() const;
418
419 void setLogger(const Sp<Logger>& logger = {});
420
424 void setLogFilter(const InfoHash& f = {});
425
426 void registerType(const ValueType& type);
427 void registerInsecureType(const ValueType& type);
428
429 void importValues(const std::vector<ValuesExport>& values);
430
431 bool isRunning() const { return running != State::Idle; }
432
433 NodeStats getNodesStats(sa_family_t af) const;
434 unsigned getNodesStats(sa_family_t af,
435 unsigned* good_return,
436 unsigned* dubious_return,
437 unsigned* cached_return,
438 unsigned* incoming_return) const;
439 NodeInfo getNodeInfo() const;
440 void getNodeInfo(std::function<void(std::shared_ptr<NodeInfo>)>);
441
442 std::vector<unsigned> getNodeMessageStats(bool in = false) const;
443 std::string getStorageLog() const;
444 std::string getStorageLog(const InfoHash&) const;
445 std::string getRoutingTablesLog(sa_family_t af) const;
446 std::string getSearchesLog(sa_family_t af = AF_UNSPEC) const;
447 std::string getSearchLog(const InfoHash&, sa_family_t af = AF_UNSPEC) const;
448 std::vector<SockAddr> getPublicAddress(sa_family_t af = AF_UNSPEC) const;
449 std::vector<std::string> getPublicAddressStr(sa_family_t af = AF_UNSPEC) const;
450 void getPublicAddress(std::function<void(std::vector<SockAddr>&&)>, sa_family_t af = AF_UNSPEC);
451
452 // securedht methods
453
454 void findCertificate(InfoHash hash, std::function<void(const std::shared_ptr<crypto::Certificate>&)>);
455 void findCertificate(PkId hash, std::function<void(const std::shared_ptr<crypto::Certificate>&)>);
456
457 void registerCertificate(const std::shared_ptr<crypto::Certificate>& cert);
458 void setLocalCertificateStore(CertificateStoreQueryLegacy&& query_method);
459 void setLocalCertificateStore(CertificateStoreQuery&& query_method);
460
467 void run(in_port_t port = dht::net::DHT_DEFAULT_PORT,
468 const crypto::Identity& identity = {},
469 bool threaded = true,
470 NetId network = 0)
471 {
472 Config config;
473 config.dht_config.node_config.network = network;
474 config.dht_config.id = identity;
475 config.threaded = threaded;
476 run(port, config);
477 }
478 void run(in_port_t port, Config& config, Context&& context = {});
479
484 void run(const char* ip4, const char* ip6, const char* service, Config& config, Context&& context = {});
485
486 void run(const Config& config, Context&& context);
487
488 void setOnStatusChanged(StatusCallback&& cb)
489 {
490 if (cb)
491 statusCbs.emplace_back(std::move(cb));
492 }
493
499 time_point loop()
500 {
501 std::lock_guard<std::mutex> lck(dht_mtx);
502 return loop_();
503 }
504
508 void shutdown(ShutdownCallback cb = {}, bool stop = false);
509
517 void join();
518
519 std::shared_ptr<PeerDiscovery> getPeerDiscovery() const { return peerDiscovery_; };
520
521 void setProxyServer(const std::string& proxy, const std::string& pushNodeId = "");
522
527 void enableProxy(bool proxify);
528
529 /* Push notification methods */
530
534 void setPushNotificationToken(const std::string& token);
535
539 void setPushNotificationTopic(const std::string& topic);
540
544 void setPushNotificationPlatform(const std::string& platform);
545
549 std::future<PushNotificationResult> pushNotificationReceived(const std::map<std::string, std::string>& data);
550
551 /* Proxy server mothods */
552 void forwardAllMessages(bool forward);
553
554private:
555 enum class State { Idle, Running, Stopping };
556
557 time_point loop_();
558
559 NodeStatus getStatus() const { return std::max(status4, status6); }
560
561 bool checkShutdown();
562 void opEnded();
563 DoneCallback bindOpDoneCallback(DoneCallback&& cb);
564 DoneCallbackSimple bindOpDoneCallback(DoneCallbackSimple&& cb);
565
567 std::unique_ptr<SecureDht> dht_;
568
570 std::atomic_bool use_proxy {false};
571
573 Config config_;
574 IdentityAnnouncedCb identityAnnouncedCb_;
575
579 void resetDht();
580
581 mutable std::mutex dht_mtx {};
582 std::thread dht_thread {};
583 std::condition_variable cv {};
584 std::mutex sock_mtx {};
585 net::PacketList rcv {};
586 decltype(rcv) rcv_free {};
587
588 std::queue<std::function<void(SecureDht&)>> pending_ops_prio {};
589 std::queue<std::function<void(SecureDht&)>> pending_ops {};
590 std::mutex storage_mtx {};
591
592 std::atomic<State> running {State::Idle};
593 std::atomic_size_t ongoing_ops {0};
594 std::vector<ShutdownCallback> shutdownCallbacks_;
595
596 NodeStatus status4 {NodeStatus::Disconnected}, status6 {NodeStatus::Disconnected};
597
598 std::vector<StatusCallback> statusCbs {};
599
601 std::shared_ptr<PeerDiscovery> peerDiscovery_;
602
607 std::shared_ptr<dht::Logger> logger_;
608};
609
610} // namespace dht
InfoHash getId() const
in_port_t getBoundPort(sa_family_t f=AF_INET) const
void clearBootstrap()
void shutdown(ShutdownCallback cb={}, bool stop=false)
void bootstrap(const std::string &host, const std::string &service)
void setPushNotificationToken(const std::string &token)
void connectivityChanged()
time_point loop()
Definition dhtrunner.h:499
InfoHash getNodeId() const
void run(in_port_t port=dht::net::DHT_DEFAULT_PORT, const crypto::Identity &identity={}, bool threaded=true, NetId network=0)
Definition dhtrunner.h:467
void bootstrap(std::vector< SockAddr > nodes, DoneCallbackSimple cb={})
void setLogFilter(const InfoHash &f={})
SockAddr getBound(sa_family_t f=AF_INET) const
void run(const char *ip4, const char *ip6, const char *service, Config &config, Context &&context={})
void enableProxy(bool proxify)
void setPushNotificationPlatform(const std::string &platform)
void bootstrap(const InfoHash &id, const SockAddr &address)
std::future< PushNotificationResult > pushNotificationReceived(const std::map< std::string, std::string > &data)
void setPushNotificationTopic(const std::string &topic)
void bootstrap(std::vector< NodeExport > nodes)
NodeStatus
Definition callbacks.h:24
NetId network
Definition callbacks.h:108