My Project 3.7.1
C++ Distributed Hash Table
Loading...
Searching...
No Matches
dht.h
1// Copyright (c) 2014-2026 Savoir-faire Linux Inc.
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "infohash.h"
6#include "value.h"
7#include "utils.h"
8#include "network_engine.h"
9#include "scheduler.h"
10#include "routing_table.h"
11#include "callbacks.h"
12#include "dht_interface.h"
13
14#include <string>
15#include <array>
16#include <vector>
17#include <map>
18#include <functional>
19#include <memory>
20
21#ifdef _WIN32
22#include <iso646.h>
23#endif
24
25namespace dht {
26
27namespace net {
28struct Request;
29} /* namespace net */
30
31struct Storage;
32struct ValueStorage;
33class StorageBucket;
34struct Listener;
35struct LocalListener;
36
44class OPENDHT_PUBLIC Dht final : public DhtInterface
45{
46public:
51 Dht(std::unique_ptr<net::DatagramSocket>&& sock,
52 const Config& config,
53 const Sp<Logger>& l = {},
54 std::unique_ptr<std::mt19937_64>&& rd = {});
55
56 virtual ~Dht();
57
61 inline const InfoHash& getNodeId() const override { return myid; }
62 void setOnPublicAddressChanged(PublicAddressChangedCb cb) override { publicAddressChangedCb_ = std::move(cb); }
63
64 NodeStatus updateStatus(sa_family_t af) override;
65
69 NodeStatus getStatus(sa_family_t af) const override { return dht(af).status; }
70
71 NodeStatus getStatus() const override { return std::max(getStatus(AF_INET), getStatus(AF_INET6)); }
72
73 net::DatagramSocket* getSocket() const override { return network_engine.getSocket(); };
74
78 void shutdown(ShutdownCallback cb, bool stop = false) override;
79
86 bool isRunning(sa_family_t af = 0) const override;
87
88 virtual void registerType(const ValueType& type) override { types.registerType(type); }
89 const ValueType& getType(ValueType::Id type_id) const override { return types.getType(type_id); }
90
91 void addBootstrap(const std::string& host, const std::string& service) override
92 {
93 bootstrap_nodes.emplace_back(host, service);
94 startBootstrap();
95 }
96
97 void clearBootstrap() override { bootstrap_nodes.clear(); }
98
104 void insertNode(const InfoHash& id, const SockAddr&) override;
105 void insertNode(const NodeExport& n) override { insertNode(n.id, n.addr); }
106
107 void pingNode(SockAddr, DoneCallbackSimple&& cb = {}) override;
108
109 time_point periodic(const uint8_t* buf, size_t buflen, SockAddr, const time_point& now) override;
110 time_point periodic(
111 const uint8_t* buf, size_t buflen, const sockaddr* from, socklen_t fromlen, const time_point& now) override
112 {
113 return periodic(buf, buflen, SockAddr(from, fromlen), now);
114 }
115
126 virtual void get(
127 const InfoHash& key, GetCallback cb, DoneCallback donecb = {}, Value::Filter&& f = {}, Where&& w = {}) override;
128 virtual void get(const InfoHash& key,
129 GetCallback cb,
130 DoneCallbackSimple donecb = {},
131 Value::Filter&& f = {},
132 Where&& w = {}) override
133 {
134 get(key, cb, bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
135 }
136 virtual void get(const InfoHash& key,
137 GetCallbackSimple cb,
138 DoneCallback donecb = {},
139 Value::Filter&& f = {},
140 Where&& w = {}) override
141 {
142 get(key, bindGetCb(cb), donecb, std::forward<Value::Filter>(f), std::forward<Where>(w));
143 }
144 virtual void get(const InfoHash& key,
145 GetCallbackSimple cb,
146 DoneCallbackSimple donecb,
147 Value::Filter&& f = {},
148 Where&& w = {}) override
149 {
150 get(key, bindGetCb(cb), bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
151 }
162 virtual void query(const InfoHash& key, QueryCallback cb, DoneCallback done_cb = {}, Query&& q = {}) override;
163 virtual void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) override
164 {
165 query(key, cb, bindDoneCb(done_cb), std::forward<Query>(q));
166 }
167
171 std::vector<Sp<Value>> getLocal(const InfoHash& key, const Value::Filter& f = {}) const override;
172
176 Sp<Value> getLocalById(const InfoHash& key, Value::Id vid) const override;
177
184 void put(const InfoHash& key,
185 Sp<Value>,
186 DoneCallback cb = nullptr,
187 time_point created = time_point::max(),
188 bool permanent = false) override;
189 void put(const InfoHash& key,
190 const Sp<Value>& v,
191 DoneCallbackSimple cb,
192 time_point created = time_point::max(),
193 bool permanent = false) override
194 {
195 put(key, v, bindDoneCb(cb), created, permanent);
196 }
197
198 void put(const InfoHash& key,
199 Value&& v,
200 DoneCallback cb = nullptr,
201 time_point created = time_point::max(),
202 bool permanent = false) override
203 {
204 put(key, std::make_shared<Value>(std::move(v)), cb, created, permanent);
205 }
206 void put(const InfoHash& key,
207 Value&& v,
208 DoneCallbackSimple cb,
209 time_point created = time_point::max(),
210 bool permanent = false) override
211 {
212 put(key, std::forward<Value>(v), bindDoneCb(cb), created, permanent);
213 }
214
218 std::vector<Sp<Value>> getPut(const InfoHash&) const override;
219
223 Sp<Value> getPut(const InfoHash&, const Value::Id&) const override;
224
229 bool cancelPut(const InfoHash&, const Value::Id&) override;
230
238 size_t listen(const InfoHash&, ValueCallback, Value::Filter = {}, Where = {}) override;
239
240 size_t listen(const InfoHash& key, GetCallback cb, Value::Filter f = {}, Where w = {}) override
241 {
242 return listen(
243 key,
244 [cb](const std::vector<Sp<Value>>& vals, bool expired) {
245 if (not expired)
246 return cb(vals);
247 return true;
248 },
249 std::forward<Value::Filter>(f),
250 std::forward<Where>(w));
251 }
252 size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f = {}, Where w = {}) override
253 {
254 return listen(key, bindGetCb(cb), std::forward<Value::Filter>(f), std::forward<Where>(w));
255 }
256
257 bool cancelListen(const InfoHash&, size_t token) override;
258
264 void connectivityChanged(sa_family_t) override;
265 void connectivityChanged() override
266 {
267 connectivityChanged(AF_INET);
268 connectivityChanged(AF_INET6);
269 }
270
275 std::vector<NodeExport> exportNodes() const override;
276
277 std::vector<ValuesExport> exportValues() const override;
278 void importValues(const std::vector<ValuesExport>&) override;
279
280 void saveState(const std::string& path) const;
281 void loadState(const std::string& path);
282
283 NodeStats getNodesStats(sa_family_t af) const override;
284
285 std::string getStorageLog() const override;
286 std::string getStorageLog(const InfoHash&) const override;
287
288 std::string getRoutingTablesLog(sa_family_t) const override;
289 std::string getSearchesLog(sa_family_t) const override;
290 std::string getSearchLog(const InfoHash&, sa_family_t af = AF_UNSPEC) const override;
291
292 void dumpTables() const override;
293 std::vector<unsigned> getNodeMessageStats(bool in = false) override
294 {
295 return network_engine.getNodeMessageStats(in);
296 }
297
301 void setStorageLimit(size_t limit = 0) override { max_store_size = limit == 0 ? STORAGE_LIMIT_DEFAULT : limit; }
302 size_t getStorageLimit() const override { return max_store_size; }
303
307 void setLocalStorageLimit(size_t limit = 0) override
308 {
309 max_local_store_size = limit == 0 ? STORAGE_LIMIT_UNLIMITED : limit;
310 }
311 size_t getLocalStorageLimit() const override { return max_local_store_size; }
312
317 std::pair<size_t, size_t> getStoreSize() const override { return {total_store_size, total_values}; }
318
319 std::pair<size_t, size_t> getLocalStoreSize() const override;
320
321 std::vector<SockAddr> getPublicAddress(sa_family_t family = 0) override;
322
323 PushNotificationResult pushNotificationReceived(const std::map<std::string, std::string>&) override
324 {
325 return PushNotificationResult::IgnoredDisabled;
326 }
327 void resubscribe(unsigned) {}
328
329private:
330 /* When performing a search, we search for up to SEARCH_NODES closest nodes
331 to the destination, and use the additional ones to backtrack if any of
332 the target 8 turn out to be dead. */
333 static constexpr unsigned SEARCH_NODES {14};
334
335 /* The number of bad nodes is limited in order to help determine
336 * presence of connectivity changes. See
337 * https://github.com/savoirfairelinux/opendht/issues/137 for details.
338 *
339 * According to the tables, 25 is a good average value for big networks. If
340 * the network is small, normal search expiration process will handle the
341 * situation.
342 * */
343 static constexpr unsigned SEARCH_MAX_BAD_NODES {25};
344
345 /* Concurrent search nodes requested count */
346 static constexpr unsigned MAX_REQUESTED_SEARCH_NODES {4};
347
348 /* Number of listening nodes */
349 static constexpr unsigned LISTEN_NODES {4};
350
351 /* The maximum number of hashes we're willing to track. */
352 static constexpr unsigned MAX_HASHES {1024 * 1024 * 1024};
353
354 /* The maximum number of searches we keep data about. */
355 static constexpr unsigned MAX_SEARCHES {1024 * 1024};
356
357 static constexpr std::chrono::minutes MAX_STORAGE_MAINTENANCE_EXPIRE_TIME {10};
358
359 /* The time after which we consider a search to be expirable. */
360 static constexpr std::chrono::minutes SEARCH_EXPIRE_TIME {62};
361
362 /* Timeout for listen */
363 static constexpr duration LISTEN_EXPIRE_TIME {std::chrono::seconds(30)};
364 static constexpr duration LISTEN_EXPIRE_TIME_PUBLIC {std::chrono::minutes(5)};
365
366 static constexpr duration REANNOUNCE_MARGIN {std::chrono::seconds(10)};
367
368 static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10};
369
370 static constexpr size_t TOKEN_SIZE {32};
371
372 // internal structures
373 struct SearchNode;
374 struct Get;
375 struct Announce;
376 struct Search;
377
378 // prevent copy
379 Dht(const Dht&) = delete;
380 Dht& operator=(const Dht&) = delete;
381
382 std::mt19937_64 rd;
383
384 InfoHash myid {};
385
386 uint64_t secret {};
387 uint64_t oldsecret {};
388
389 // registred types
390 TypeStore types;
391
392 using SearchMap = std::map<InfoHash, Sp<Search>>;
393 using ReportedAddr = std::pair<unsigned, SockAddr>;
394
395 struct Kad
396 {
397 RoutingTable buckets {};
398 SearchMap searches {};
399 unsigned pending_pings {0};
400 NodeStatus status;
401 std::vector<ReportedAddr> reported_addr;
402
403 NodeStatus getStatus(time_point now) const;
404 NodeStats getNodesStats(time_point now, const InfoHash& myid) const;
405 };
406
407 Kad dht4 {};
408 Kad dht6 {};
409 PublicAddressChangedCb publicAddressChangedCb_ {};
410
411 std::vector<std::pair<std::string, std::string>> bootstrap_nodes {};
412 std::chrono::steady_clock::duration bootstrap_period {BOOTSTRAP_PERIOD};
413 Sp<Scheduler::Job> bootstrapJob {};
414
415 std::map<InfoHash, Storage> store;
416 std::map<SockAddr, StorageBucket, SockAddr::ipCmp> store_quota;
417 std::unique_ptr<StorageBucket> local_store_quota;
418 size_t total_values {0};
419 size_t total_store_size {0};
420 size_t max_store_keys {MAX_HASHES};
421 size_t max_store_size {STORAGE_LIMIT_DEFAULT};
422 size_t max_local_store_size {STORAGE_LIMIT_UNLIMITED};
423
424 size_t max_searches {MAX_SEARCHES};
425 size_t search_id {0};
426
427 // map a global listen token to IPv4, IPv6 specific listen tokens.
428 // 0 is the invalid token.
429 std::map<size_t, std::tuple<size_t, size_t, size_t>> listeners {};
430 size_t listener_token {1};
431
432 // timing
433 Scheduler scheduler;
434 Sp<Scheduler::Job> nextNodesConfirmation {};
435 Sp<Scheduler::Job> nextStorageMaintenance {};
436
437 net::NetworkEngine network_engine;
438
439 std::string persistPath;
440
441 // are we a bootstrap node ?
442 // note: Any running node can be used as a bootstrap node.
443 // Only nodes running only as bootstrap nodes should
444 // be put in bootstrap mode.
445 const bool is_bootstrap {false};
446 const bool maintain_storage {false};
447 const bool public_stable {false};
448
449 inline const duration& getListenExpiration() const
450 {
451 return public_stable ? LISTEN_EXPIRE_TIME_PUBLIC : LISTEN_EXPIRE_TIME;
452 }
453
454 void rotateSecrets();
455
456 Blob makeToken(const SockAddr&, bool old) const;
457 bool tokenMatch(const Blob& token, const SockAddr&) const;
458
459 void reportedAddr(const SockAddr&);
460
461 // Storage
462 void storageAddListener(const InfoHash& id, const Sp<Node>& node, size_t tid, Query&& = {}, int version = 0);
463 bool storageStore(const InfoHash& id,
464 const Sp<Value>& value,
465 time_point created,
466 const SockAddr& sa = {},
467 bool permanent = false);
468 bool storageRefresh(const InfoHash& id, Value::Id vid);
469 void expireStore();
470 void expireStorage(InfoHash h);
471 void expireStore(decltype(store)::iterator);
472
473 void storageRemoved(const InfoHash& id, Storage& st, const std::vector<Sp<Value>>& values, size_t totalSize);
474 void storageChanged(const InfoHash& id, Storage& st, const Sp<Value>&, bool newValue);
475 std::string printStorageLog(const decltype(store)::value_type&) const;
476
482 void dataPersistence(InfoHash id);
483 size_t maintainStorage(decltype(store)::value_type&, bool force = false, const DoneCallback& donecb = {});
484
485 // Buckets
486 Kad& dht(sa_family_t af) { return af == AF_INET ? dht4 : dht6; }
487 const Kad& dht(sa_family_t af) const { return af == AF_INET ? dht4 : dht6; }
488 RoutingTable& buckets(sa_family_t af) { return dht(af).buckets; }
489 const RoutingTable& buckets(sa_family_t af) const { return dht(af).buckets; }
490 Bucket* findBucket(const InfoHash& id, sa_family_t af)
491 {
492 auto& b = buckets(af);
493 auto it = b.findBucket(id);
494 return it == b.end() ? nullptr : &(*it);
495 }
496 const Bucket* findBucket(const InfoHash& id, sa_family_t af) const
497 {
498 return const_cast<Dht*>(this)->findBucket(id, af);
499 }
500
501 void expireBuckets(RoutingTable&);
502 void sendCachedPing(Bucket& b);
503 bool bucketMaintenance(RoutingTable&);
504 void dumpBucket(const Bucket& b, std::ostream& out) const;
505 void bootstrap();
506 void startBootstrap();
507 void stopBootstrap();
508
509 // Nodes
510 void onNewNode(const Sp<Node>& node, int confirm);
511 const Sp<Node> findNode(const InfoHash& id, sa_family_t af) const;
512 bool trySearchInsert(const Sp<Node>& node);
513
514 // Searches
515 inline SearchMap& searches(sa_family_t af) { return dht(af).searches; }
516 inline const SearchMap& searches(sa_family_t af) const { return dht(af).searches; }
517
522 Sp<Search> search(const InfoHash& id,
523 sa_family_t af,
524 GetCallback = {},
525 QueryCallback = {},
526 DoneCallback = {},
527 Value::Filter = {},
528 const Sp<Query>& q = {});
529
530 void announce(const InfoHash& id,
531 sa_family_t af,
532 Sp<Value> value,
533 DoneCallback callback,
534 time_point created = time_point::max(),
535 bool permanent = false);
536 size_t listenTo(const InfoHash& id, sa_family_t af, ValueCallback cb, Value::Filter f = {}, const Sp<Query>& q = {});
537
545 unsigned refill(Search& sr);
546 void expireSearches();
547
548 void confirmNodes();
549 void expire();
550
551 void onConnected();
552 void onDisconnected();
553
562 void searchNodeGetDone(const net::Request& status,
563 net::RequestAnswer&& answer,
564 std::weak_ptr<Search> ws,
565 Sp<Query> query);
566
576 void searchNodeGetExpired(const net::Request& status, bool over, std::weak_ptr<Search> ws, Sp<Query> query);
577
585 void paginate(std::weak_ptr<Search> ws, Sp<Query> query, SearchNode* n);
586
590 SearchNode* searchSendGetValues(Sp<Search> sr, SearchNode* n = nullptr, bool update = true);
591
598 void searchSendAnnounceValue(const Sp<Search>& sr, unsigned syncLevel = TARGET_NODES);
599
607 void searchStep(std::weak_ptr<Search> ws);
608
609 void searchSynchedNodeListen(const Sp<Search>&, SearchNode&);
610
611 void dumpSearch(const Search& sr, std::ostream& out) const;
612
613 bool neighbourhoodMaintenance(RoutingTable&);
614
615 void onError(Sp<net::Request> node, net::DhtProtocolException e);
616 /* when our address is reported by a distant peer. */
617 void onReportedAddr(const InfoHash& id, const SockAddr&);
618 /* when we receive a ping request */
619 net::RequestAnswer onPing(Sp<Node> node);
620 /* when we receive a "find node" request */
621 net::RequestAnswer onFindNode(Sp<Node> node, const InfoHash& hash, want_t want);
622 void onFindNodeDone(const Sp<Node>& status, net::RequestAnswer& a, Sp<Search> sr);
623 /* when we receive a "get values" request */
624 net::RequestAnswer onGetValues(Sp<Node> node, const InfoHash& hash, want_t want, const Query& q);
625 void onGetValuesDone(const Sp<Node>& status, net::RequestAnswer& a, Sp<Search>& sr, const Sp<Query>& orig_query);
626 /* when we receive a listen request */
627 net::RequestAnswer onListen(
628 Sp<Node> node, const InfoHash& hash, const Blob& token, size_t socket_id, const Query& query, int version = 0);
629 void onListenDone(const Sp<Node>& status, net::RequestAnswer& a, Sp<Search>& sr);
630 /* when we receive an announce request */
631 net::RequestAnswer onAnnounce(Sp<Node> node,
632 const InfoHash& hash,
633 const Blob& token,
634 const std::vector<Sp<Value>>& v,
635 const time_point& created);
636 net::RequestAnswer onRefresh(Sp<Node> node, const InfoHash& hash, const Blob& token, const Value::Id& vid);
637 void onAnnounceDone(const Sp<Node>& status, net::RequestAnswer& a, Sp<Search>& sr);
638};
639
640} // namespace dht
Dht(std::unique_ptr< net::DatagramSocket > &&sock, const Config &config, const Sp< Logger > &l={}, std::unique_ptr< std::mt19937_64 > &&rd={})
void insertNode(const InfoHash &id, const SockAddr &) override
NodeStatus updateStatus(sa_family_t af) override
NodeStatus getStatus(sa_family_t af) const override
Definition dht.h:69
std::pair< size_t, size_t > getStoreSize() const override
Definition dht.h:317
Sp< Value > getLocalById(const InfoHash &key, Value::Id vid) const override
std::vector< Sp< Value > > getPut(const InfoHash &) const override
std::vector< NodeExport > exportNodes() const override
size_t listen(const InfoHash &, ValueCallback, Value::Filter={}, Where={}) override
bool cancelPut(const InfoHash &, const Value::Id &) override
void put(const InfoHash &key, Sp< Value >, DoneCallback cb=nullptr, time_point created=time_point::max(), bool permanent=false) override
void connectivityChanged(sa_family_t) override
size_t listen(const InfoHash &key, GetCallback cb, Value::Filter f={}, Where w={}) override
Definition dht.h:240
bool isRunning(sa_family_t af=0) const override
Sp< Value > getPut(const InfoHash &, const Value::Id &) const override
void shutdown(ShutdownCallback cb, bool stop=false) override
virtual void get(const InfoHash &key, GetCallback cb, DoneCallback donecb={}, Value::Filter &&f={}, Where &&w={}) override
std::vector< Sp< Value > > getLocal(const InfoHash &key, const Value::Filter &f={}) const override
PushNotificationResult pushNotificationReceived(const std::map< std::string, std::string > &) override
Definition dht.h:323
void setStorageLimit(size_t limit=0) override
Definition dht.h:301
virtual void query(const InfoHash &key, QueryCallback cb, DoneCallback done_cb={}, Query &&q={}) override
const InfoHash & getNodeId() const override
Definition dht.h:61
void setLocalStorageLimit(size_t limit=0) override
Definition dht.h:307
std::vector< uint8_t > Blob
Definition utils.h:156
NodeStatus
Definition callbacks.h:24
Describes a query destined to another peer.
Definition value.h:988
Serializable dht::Value filter.
Definition value.h:850