44class OPENDHT_PUBLIC
Dht final :
public DhtInterface
51 Dht(std::unique_ptr<net::DatagramSocket>&& sock,
53 const Sp<Logger>& l = {},
54 std::unique_ptr<std::mt19937_64>&& rd = {});
61 inline const InfoHash&
getNodeId()
const override {
return myid; }
62 void setOnPublicAddressChanged(PublicAddressChangedCb cb)
override { publicAddressChangedCb_ = std::move(cb); }
71 NodeStatus getStatus()
const override {
return std::max(getStatus(AF_INET), getStatus(AF_INET6)); }
73 net::DatagramSocket* getSocket()
const override {
return network_engine.getSocket(); };
78 void shutdown(ShutdownCallback cb,
bool stop =
false)
override;
88 virtual void registerType(
const ValueType& type)
override { types.registerType(type); }
89 const ValueType& getType(ValueType::Id type_id)
const override {
return types.getType(type_id); }
// Appends a (host, service) entry to bootstrap_nodes.
// NOTE(review): the braces delimiting this body are not visible here, so the body may
// contain further statements after emplace_back — confirm against the full header.
91 void addBootstrap(
const std::string& host,
const std::string& service)
override
93 bootstrap_nodes.emplace_back(host, service);
97 void clearBootstrap()
override { bootstrap_nodes.clear(); }
107 void pingNode(
SockAddr, DoneCallbackSimple&& cb = {})
override;
109 time_point periodic(
const uint8_t* buf,
size_t buflen, SockAddr,
const time_point& now)
override;
// Fragment: the return type and function name are not visible in this view.
// The visible statement wraps (from, fromlen) in a SockAddr and returns
// periodic(buf, buflen, SockAddr, now).
// NOTE(review): presumably the sockaddr-based periodic() overload, with its body braces
// missing from this view — confirm against the full header.
111 const uint8_t* buf,
size_t buflen,
const sockaddr* from, socklen_t fromlen,
const time_point& now)
override
113 return periodic(buf, buflen, SockAddr(from, fromlen), now);
// Fragment: the return type and function name are not visible in this view.
// NOTE(review): presumably the get() declaration taking GetCallback and DoneCallback that
// the inline get() overloads call — confirm against the full header.
127 const InfoHash& key, GetCallback cb, DoneCallback donecb = {},
Value::Filter&& f = {}, Where&& w = {})
override;
// get() overload taking a DoneCallbackSimple: wraps donecb with bindDoneCb() and calls get() again.
// NOTE(review): the call passes `cb`, but no parameter named cb is visible in this parameter list,
// and the body braces are not visible either — a parameter line appears to be missing; confirm
// against the full header.
128 virtual void get(
const InfoHash& key,
130 DoneCallbackSimple donecb = {},
131 Value::Filter&& f = {},
132 Where&& w = {})
override
134 get(key, cb, bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
136 virtual void get(
const InfoHash& key,
137 GetCallbackSimple cb,
138 DoneCallback donecb = {},
139 Value::Filter&& f = {},
140 Where&& w = {})
override
142 get(key, bindGetCb(cb), donecb, std::forward<Value::Filter>(f), std::forward<Where>(w));
144 virtual void get(
const InfoHash& key,
145 GetCallbackSimple cb,
146 DoneCallbackSimple donecb,
147 Value::Filter&& f = {},
148 Where&& w = {})
override
150 get(key, bindGetCb(cb), bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
162 virtual void query(
const InfoHash& key, QueryCallback cb, DoneCallback done_cb = {},
Query&& q = {})
override;
163 virtual void query(
const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {})
override
165 query(key, cb, bindDoneCb(done_cb), std::forward<Query>(q));
176 Sp<Value>
getLocalById(
const InfoHash& key, Value::Id vid)
const override;
// put() declaration; cb defaults to nullptr, created to time_point::max(), permanent to false.
// NOTE(review): the inline put() overloads in this class pass five arguments (key, value, callback,
// created, permanent), but only four parameters are visible here — a value parameter line
// appears to be missing between key and cb; confirm against the full header.
184 void put(
const InfoHash& key,
186 DoneCallback cb =
nullptr,
187 time_point created = time_point::max(),
188 bool permanent =
false)
override;
// put() overload taking a DoneCallbackSimple: wraps cb with bindDoneCb() and calls put() again.
// NOTE(review): the call passes `v`, but no parameter named v is visible in this parameter list,
// and the body braces are not visible either — confirm against the full header.
189 void put(
const InfoHash& key,
191 DoneCallbackSimple cb,
192 time_point created = time_point::max(),
193 bool permanent =
false)
override
195 put(key, v, bindDoneCb(cb), created, permanent);
// put() overload that moves `v` into a newly allocated Value via std::make_shared and calls
// put() again with the resulting pointer.
// NOTE(review): no parameter named v is visible in this parameter list, and the body braces are
// not visible either — confirm the parameter's type against the full header.
198 void put(
const InfoHash& key,
200 DoneCallback cb =
nullptr,
201 time_point created = time_point::max(),
202 bool permanent =
false)
override
204 put(key, std::make_shared<Value>(std::move(v)), cb, created, permanent);
// put() overload taking a DoneCallbackSimple: casts `v` with std::forward<Value>, wraps cb with
// bindDoneCb() and calls put() again.
// NOTE(review): no parameter named v is visible in this parameter list, and the body braces are
// not visible either — confirm the parameter's type against the full header.
206 void put(
const InfoHash& key,
208 DoneCallbackSimple cb,
209 time_point created = time_point::max(),
210 bool permanent =
false)
override
212 put(key, std::forward<Value>(v), bindDoneCb(cb), created, permanent);
218 std::vector<Sp<Value>>
getPut(
const InfoHash&)
const override;
223 Sp<Value>
getPut(
const InfoHash&,
const Value::Id&)
const override;
229 bool cancelPut(
const InfoHash&,
const Value::Id&)
override;
// Fragment: only the opening of a lambda capturing cb (taking a value vector and an `expired`
// flag) and the trailing std::forward arguments of a call are visible.
// NOTE(review): presumably part of the listen() overload that the GetCallbackSimple overload
// of listen() calls; the function head and lambda body are missing from this view —
// confirm against the full header.
244 [cb](
const std::vector<Sp<Value>>& vals,
bool expired) {
249 std::forward<Value::Filter>(f),
250 std::forward<Where>(w));
252 size_t listen(
const InfoHash& key, GetCallbackSimple cb, Value::Filter f = {}, Where w = {})
override
254 return listen(key, bindGetCb(cb), std::forward<Value::Filter>(f), std::forward<Where>(w));
257 bool cancelListen(
const InfoHash&,
size_t token)
override;
277 std::vector<ValuesExport> exportValues()
const override;
278 void importValues(
const std::vector<ValuesExport>&)
override;
280 void saveState(
const std::string& path)
const;
281 void loadState(
const std::string& path);
283 NodeStats getNodesStats(sa_family_t af)
const override;
285 std::string getStorageLog()
const override;
286 std::string getStorageLog(
const InfoHash&)
const override;
288 std::string getRoutingTablesLog(sa_family_t)
const override;
289 std::string getSearchesLog(sa_family_t)
const override;
290 std::string getSearchLog(
const InfoHash&, sa_family_t af = AF_UNSPEC)
const override;
292 void dumpTables()
const override;
293 std::vector<unsigned> getNodeMessageStats(
bool in =
false)
override
295 return network_engine.getNodeMessageStats(in);
301 void setStorageLimit(
size_t limit = 0)
override { max_store_size = limit == 0 ? STORAGE_LIMIT_DEFAULT : limit; }
302 size_t getStorageLimit()
const override {
return max_store_size; }
// Fragment: assigns max_local_store_size, substituting STORAGE_LIMIT_UNLIMITED when limit is 0.
// NOTE(review): the enclosing function head is not visible in this view — presumably the
// setter paired with getLocalStorageLimit(); confirm against the full header.
309 max_local_store_size = limit == 0 ? STORAGE_LIMIT_UNLIMITED : limit;
311 size_t getLocalStorageLimit()
const override {
return max_local_store_size; }
317 std::pair<size_t, size_t>
getStoreSize()
const override {
return {total_store_size, total_values}; }
319 std::pair<size_t, size_t> getLocalStoreSize()
const override;
321 std::vector<SockAddr> getPublicAddress(sa_family_t family = 0)
override;
// Fragment: returns the constant PushNotificationResult::IgnoredDisabled.
// NOTE(review): the enclosing function head is not visible in this view — confirm its name
// and signature against the full header.
325 return PushNotificationResult::IgnoredDisabled;
/** Empty body: the unnamed unsigned argument is not used. */
void resubscribe(unsigned)
{
}
333 static constexpr unsigned SEARCH_NODES {14};
343 static constexpr unsigned SEARCH_MAX_BAD_NODES {25};
346 static constexpr unsigned MAX_REQUESTED_SEARCH_NODES {4};
349 static constexpr unsigned LISTEN_NODES {4};
352 static constexpr unsigned MAX_HASHES {1024 * 1024 * 1024};
355 static constexpr unsigned MAX_SEARCHES {1024 * 1024};
357 static constexpr std::chrono::minutes MAX_STORAGE_MAINTENANCE_EXPIRE_TIME {10};
360 static constexpr std::chrono::minutes SEARCH_EXPIRE_TIME {62};
363 static constexpr duration LISTEN_EXPIRE_TIME {std::chrono::seconds(30)};
364 static constexpr duration LISTEN_EXPIRE_TIME_PUBLIC {std::chrono::minutes(5)};
366 static constexpr duration REANNOUNCE_MARGIN {std::chrono::seconds(10)};
368 static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10};
370 static constexpr size_t TOKEN_SIZE {32};
379 Dht(
const Dht&) =
delete;
380 Dht& operator=(
const Dht&) =
delete;
387 uint64_t oldsecret {};
392 using SearchMap = std::map<InfoHash, Sp<Search>>;
393 using ReportedAddr = std::pair<unsigned, SockAddr>;
397 RoutingTable buckets {};
398 SearchMap searches {};
399 unsigned pending_pings {0};
401 std::vector<ReportedAddr> reported_addr;
404 NodeStats getNodesStats(time_point now,
const InfoHash& myid)
const;
409 PublicAddressChangedCb publicAddressChangedCb_ {};
411 std::vector<std::pair<std::string, std::string>> bootstrap_nodes {};
412 std::chrono::steady_clock::duration bootstrap_period {BOOTSTRAP_PERIOD};
413 Sp<Scheduler::Job> bootstrapJob {};
415 std::map<InfoHash, Storage> store;
416 std::map<SockAddr, StorageBucket, SockAddr::ipCmp> store_quota;
417 std::unique_ptr<StorageBucket> local_store_quota;
418 size_t total_values {0};
419 size_t total_store_size {0};
420 size_t max_store_keys {MAX_HASHES};
421 size_t max_store_size {STORAGE_LIMIT_DEFAULT};
422 size_t max_local_store_size {STORAGE_LIMIT_UNLIMITED};
424 size_t max_searches {MAX_SEARCHES};
425 size_t search_id {0};
429 std::map<size_t, std::tuple<size_t, size_t, size_t>> listeners {};
430 size_t listener_token {1};
434 Sp<Scheduler::Job> nextNodesConfirmation {};
435 Sp<Scheduler::Job> nextStorageMaintenance {};
437 net::NetworkEngine network_engine;
439 std::string persistPath;
// Immutable flags, all false unless a constructor initializes them otherwise.
const bool is_bootstrap {false};
const bool maintain_storage {false};
// Selects which listen expiration getListenExpiration() returns.
const bool public_stable {false};
449 inline const duration& getListenExpiration()
const
451 return public_stable ? LISTEN_EXPIRE_TIME_PUBLIC : LISTEN_EXPIRE_TIME;
454 void rotateSecrets();
456 Blob makeToken(
const SockAddr&,
bool old)
const;
457 bool tokenMatch(
const Blob& token,
const SockAddr&)
const;
459 void reportedAddr(
const SockAddr&);
462 void storageAddListener(
const InfoHash&
id,
const Sp<Node>& node,
size_t tid, Query&& = {},
int version = 0);
463 bool storageStore(
const InfoHash&
id,
464 const Sp<Value>& value,
466 const SockAddr& sa = {},
467 bool permanent =
false);
468 bool storageRefresh(
const InfoHash&
id, Value::Id vid);
470 void expireStorage(InfoHash h);
471 void expireStore(
decltype(store)::iterator);
473 void storageRemoved(
const InfoHash&
id, Storage& st,
const std::vector<Sp<Value>>& values,
size_t totalSize);
474 void storageChanged(
const InfoHash&
id, Storage& st,
const Sp<Value>&,
bool newValue);
475 std::string printStorageLog(
const decltype(store)::value_type&)
const;
482 void dataPersistence(InfoHash
id);
483 size_t maintainStorage(
decltype(store)::value_type&,
bool force =
false,
const DoneCallback& donecb = {});
486 Kad& dht(sa_family_t af) {
return af == AF_INET ? dht4 : dht6; }
487 const Kad& dht(sa_family_t af)
const {
return af == AF_INET ? dht4 : dht6; }
488 RoutingTable& buckets(sa_family_t af) {
return dht(af).buckets; }
489 const RoutingTable& buckets(sa_family_t af)
const {
return dht(af).buckets; }
490 Bucket* findBucket(
const InfoHash&
id, sa_family_t af)
492 auto& b = buckets(af);
493 auto it = b.findBucket(
id);
494 return it == b.end() ? nullptr : &(*it);
496 const Bucket* findBucket(
const InfoHash&
id, sa_family_t af)
const
498 return const_cast<Dht*
>(
this)->findBucket(
id, af);
501 void expireBuckets(RoutingTable&);
502 void sendCachedPing(Bucket& b);
503 bool bucketMaintenance(RoutingTable&);
504 void dumpBucket(
const Bucket& b, std::ostream& out)
const;
506 void startBootstrap();
507 void stopBootstrap();
510 void onNewNode(
const Sp<Node>& node,
int confirm);
511 const Sp<Node> findNode(
const InfoHash&
id, sa_family_t af)
const;
512 bool trySearchInsert(
const Sp<Node>& node);
515 inline SearchMap& searches(sa_family_t af) {
return dht(af).searches; }
516 inline const SearchMap& searches(sa_family_t af)
const {
return dht(af).searches; }
522 Sp<Search> search(
const InfoHash&
id,
528 const Sp<Query>& q = {});
530 void announce(
const InfoHash&
id,
533 DoneCallback callback,
534 time_point created = time_point::max(),
535 bool permanent =
false);
536 size_t listenTo(
const InfoHash&
id, sa_family_t af, ValueCallback cb, Value::Filter f = {},
const Sp<Query>& q = {});
545 unsigned refill(Search& sr);
546 void expireSearches();
552 void onDisconnected();
// Declaration of searchNodeGetDone taking a request, an rvalue answer and a weak search pointer.
// NOTE(review): the parameter list ends on a trailing comma and is never closed in this view —
// at least one more parameter line appears to be missing; confirm against the full header.
562 void searchNodeGetDone(
const net::Request& status,
563 net::RequestAnswer&& answer,
564 std::weak_ptr<Search> ws,
576 void searchNodeGetExpired(
const net::Request& status,
bool over, std::weak_ptr<Search> ws, Sp<Query> query);
585 void paginate(std::weak_ptr<Search> ws, Sp<Query> query, SearchNode* n);
590 SearchNode* searchSendGetValues(Sp<Search> sr, SearchNode* n =
nullptr,
bool update =
true);
598 void searchSendAnnounceValue(
const Sp<Search>& sr,
unsigned syncLevel = TARGET_NODES);
607 void searchStep(std::weak_ptr<Search> ws);
609 void searchSynchedNodeListen(
const Sp<Search>&, SearchNode&);
611 void dumpSearch(
const Search& sr, std::ostream& out)
const;
613 bool neighbourhoodMaintenance(RoutingTable&);
615 void onError(Sp<net::Request> node, net::DhtProtocolException e);
617 void onReportedAddr(
const InfoHash&
id,
const SockAddr&);
619 net::RequestAnswer onPing(Sp<Node> node);
621 net::RequestAnswer onFindNode(Sp<Node> node,
const InfoHash& hash, want_t want);
622 void onFindNodeDone(
const Sp<Node>& status, net::RequestAnswer& a, Sp<Search> sr);
624 net::RequestAnswer onGetValues(Sp<Node> node,
const InfoHash& hash, want_t want,
const Query& q);
625 void onGetValuesDone(
const Sp<Node>& status, net::RequestAnswer& a, Sp<Search>& sr,
const Sp<Query>& orig_query);
627 net::RequestAnswer onListen(
628 Sp<Node> node,
const InfoHash& hash,
const Blob& token,
size_t socket_id,
const Query& query,
int version = 0);
629 void onListenDone(
const Sp<Node>& status, net::RequestAnswer& a, Sp<Search>& sr);
631 net::RequestAnswer onAnnounce(Sp<Node> node,
632 const InfoHash& hash,
634 const std::vector<Sp<Value>>& v,
635 const time_point& created);
636 net::RequestAnswer onRefresh(Sp<Node> node,
const InfoHash& hash,
const Blob& token,
const Value::Id& vid);
637 void onAnnounceDone(
const Sp<Node>& status, net::RequestAnswer& a, Sp<Search>& sr);