/// DhtRunner: class whose visible members expose get/query/listen/put,
/// bootstrap, run and shutdown operations together with node statistics
/// and logging accessors.
/// NOTE(review): this listing is lossy — brace-only lines, template headers
/// and several statements are missing, and every kept line starts with its
/// original line number.
36class OPENDHT_PUBLIC DhtRunner
/// Configuration-style fields.
/// NOTE(review): the enclosing scope is not visible in this excerpt
/// (presumably a nested configuration struct — confirm).
/// Every string is value-initialised to empty; both peer_* flags default to false.
/// The push_* fields are presumably push-notification parameters — confirm.
45 std::string proxy_server {};
46 std::string push_node_id {};
47 std::string push_token {};
48 std::string push_topic {};
49 std::string push_platform {};
50 std::string proxy_user_agent {};
51 bool peer_discovery {
false};
52 bool peer_publish {
false};
/// Shared certificate and identity; no initialiser is given here, so both are
/// default-constructed.
53 std::shared_ptr<dht::crypto::Certificate> server_ca;
54 dht::crypto::Identity client_identity;
/// Runtime resources and callbacks, all value-initialised (empty / null) except
/// `sock`, which is default-constructed (also null for a unique_ptr).
/// NOTE(review): the enclosing scope is not visible in this excerpt
/// (presumably a nested context struct — confirm).
60 std::shared_ptr<Logger> logger {};
/// Uniquely owned datagram socket.
61 std::unique_ptr<net::DatagramSocket> sock;
62 std::shared_ptr<PeerDiscovery> peerDiscovery {};
63 StatusCallback statusChangedCallback {};
/// Two certificate-store query callbacks: a legacy type and the current type.
64 CertificateStoreQueryLegacy certificateStore {};
65 CertificateStoreQuery certificateStorePkId {};
66 IdentityAnnouncedCb identityAnnouncedCb {};
67 PublicAddressChangedCb publicAddressChangedCb {};
/// Optional 64-bit Mersenne Twister engine.
68 std::unique_ptr<std::mt19937_64> rng {};
/// Convenience overload of get(): adapts the simple value callback with
/// bindGetCb() and forwards to another get() overload, passing donecb, f and w
/// unchanged. donecb, f and w default to empty.
75 void get(InfoHash
id, GetCallbackSimple cb, DoneCallback donecb = {},
Value::Filter f = {}, Where w = {})
77 get(
id, bindGetCb(cb), donecb, f, w);
/// Convenience overload of get() taking both callbacks in their "simple" form:
/// the value callback is adapted with bindGetCb(); donecb is forwarded as-is
/// to another get() overload.
80 void get(InfoHash
id, GetCallbackSimple cb, DoneCallbackSimple donecb = {}, Value::Filter f = {}, Where w = {})
82 get(
id, bindGetCb(cb), donecb, f, w);
/// get() overload taking a GetCallback and a DoneCallback. Declaration only —
/// the definition is outside this excerpt. dcb has no default here; f and w
/// default to empty.
85 void get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f = {}, Where w = {});
/// Convenience overload of get(): adapts the simple done callback with
/// bindDoneCb() and forwards to another get() overload with cb, f and w unchanged.
87 void get(InfoHash
id, GetCallback cb, DoneCallbackSimple donecb = {}, Value::Filter f = {}, Where w = {})
89 get(
id, cb, bindDoneCb(donecb), f, w);
/// get() overload addressed by a string key instead of an InfoHash.
/// Declaration only; how the key is mapped to a hash is not visible here.
91 void get(
const std::string& key, GetCallback vcb, DoneCallbackSimple dcb = {}, Value::Filter f = {}, Where w = {});
/// Typed get(): the user callback receives each batch of values converted with
/// unpackVector<T>() and its bool result is returned from the wrapping lambda.
/// NOTE(review): the template header and the call this lambda is passed to are
/// missing from this excerpt.
94 void get(InfoHash hash, std::function<
bool(std::vector<T>&&)> cb, DoneCallbackSimple dcb = {})
98 [cb = std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals) {
return cb(unpackVector<T>(vals)); },
/// Typed per-value get(): the wrapping lambda walks the received values,
/// unpacks each one with Value::unpack<T>() and hands it to cb.
/// A std::exception raised in the loop body is caught.
/// NOTE(review): the statement taken when cb returns false and the catch
/// handler body are missing from this excerpt (presumably "stop" and "skip the
/// value" respectively — confirm against the full file).
103 void get(InfoHash hash, std::function<
bool(T&&)> cb, DoneCallbackSimple dcb = {})
107 [cb = std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals) {
108 for (
const auto& v : vals) {
110 if (not cb(Value::unpack<T>(*v)))
112 }
catch (
const std::exception&) {
/// Future-returning get(): accumulates every received batch into a shared
/// vector and fulfils the promise with the accumulated values when the done
/// callback fires. The bool argument of the done callback is ignored.
/// Promise and vector are held through shared_ptr so the copy-capturing
/// lambdas ([=]) share the same objects.
/// NOTE(review): the call that receives the two lambdas is missing from this excerpt.
122 std::future<std::vector<std::shared_ptr<dht::Value>>> get(InfoHash key, Value::Filter f = {}, Where w = {})
124 auto p = std::make_shared<std::promise<std::vector<std::shared_ptr<dht::Value>>>>();
125 auto values = std::make_shared<std::vector<std::shared_ptr<dht::Value>>>();
128 [=](
const std::vector<std::shared_ptr<dht::Value>>& vlist) {
129 values->insert(values->end(), vlist.begin(), vlist.end());
132 [=](bool) { p->set_value(std::move(*values)); },
135 return p->get_future();
/// Typed future-returning get(): values of type T are appended (moved) into a
/// shared vector, and the promise is fulfilled with that vector when the done
/// callback fires; the done callback's bool argument is ignored.
/// NOTE(review): the template header, the value lambda's signature and the call
/// receiving the lambdas are missing from this excerpt.
139 std::future<std::vector<T>> get(InfoHash key)
141 auto p = std::make_shared<std::promise<std::vector<T>>>();
142 auto values = std::make_shared<std::vector<T>>();
146 values->emplace_back(std::move(v));
149 [=](bool) { p->set_value(std::move(*values)); });
150 return p->get_future();
/// query() taking a DoneCallback. Declaration only; done_cb and q default to empty.
153 void query(
const InfoHash& hash, QueryCallback cb, DoneCallback done_cb = {}, Query q = {});
/// Convenience overload: adapts the simple done callback with bindDoneCb() and
/// forwards to the overload above.
154 void query(
const InfoHash& hash, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query q = {})
156 query(hash, cb, bindDoneCb(done_cb), q);
/// listen() taking a ValueCallback. Declaration only.
/// Returns a future of size_t — presumably the token accepted by
/// cancelListen(); confirm against the definition.
159 std::future<size_t> listen(InfoHash key, ValueCallback vcb, Value::Filter f = {}, Where w = {});
/// listen() overload taking a GetCallback: wraps cb in a lambda that also
/// receives an `expired` flag, and passes f and w on to another overload.
/// NOTE(review): the lambda body and the call line are missing from this
/// excerpt, so how `expired` is handled cannot be stated here.
/// NOTE(review): f and w are by-value parameters, so std::forward<T> on them
/// yields an rvalue (it behaves like std::move here).
161 std::future<size_t> listen(InfoHash key, GetCallback cb, Value::Filter f = {}, Where w = {})
165 [cb = std::move(cb)](
const std::vector<Sp<Value>>& vals,
bool expired) {
170 std::forward<Value::Filter>(f),
171 std::forward<Where>(w));
/// listen() addressed by a string key. Declaration only.
173 std::future<size_t> listen(
const std::string& key, GetCallback vcb, Value::Filter f = {}, Where w = {});
/// Convenience overload: adapts the simple value callback with bindGetCb() and
/// returns the result of another listen() overload.
174 std::future<size_t> listen(InfoHash key, GetCallbackSimple cb, Value::Filter f = {}, Where w = {})
176 return listen(key, bindGetCb(cb), f, w);
/// Typed listen(): the user callback receives each batch converted with
/// unpackVector<T>(); its bool result is returned from the wrapping lambda.
/// NOTE(review): the template header and the call receiving this lambda are
/// missing from this excerpt.
180 std::future<size_t> listen(InfoHash hash, std::function<
bool(std::vector<T>&&)> cb)
184 [cb = std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals) {
return cb(unpackVector<T>(vals)); },
/// Typed listen() whose callback also receives the `expired` flag: each batch is
/// converted with unpackVector<T>() and passed to cb together with `expired`.
/// NOTE(review): the template header and the call receiving this lambda are
/// missing from this excerpt.
188 std::future<size_t> listen(InfoHash hash, std::function<
bool(std::vector<T>&&,
bool)> cb)
192 [cb = std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals,
bool expired) {
193 return cb(unpackVector<T>(vals), expired);
/// Typed per-value listen(): the wrapping lambda walks the received values,
/// unpacks each with Value::unpack<T>() and hands it to cb.
/// A std::exception raised in the loop body is caught.
/// NOTE(review): the statement taken when cb returns false and the catch
/// handler body are missing from this excerpt — confirm against the full file.
199 std::future<size_t> listen(InfoHash hash, std::function<
bool(T&&)> cb, Value::Filter f = {}, Where w = {})
203 [cb = std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals) {
204 for (
const auto& v : vals) {
206 if (not cb(Value::unpack<T>(*v)))
208 }
catch (
const std::exception&) {
/// Typed per-value listen() with expiry flag: each received value is unpacked
/// with Value::unpack<T>() and passed to cb along with `expired`.
/// A std::exception raised in the loop body is caught.
/// NOTE(review): the statement taken when cb returns false and the catch
/// handler body are missing from this excerpt — confirm against the full file.
218 std::future<size_t> listen(InfoHash hash, std::function<
bool(T&&,
bool)> cb, Value::Filter f = {}, Where w = {})
222 [cb = std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals,
bool expired) {
223 for (
const auto& v : vals) {
225 if (not cb(Value::unpack<T>(*v), expired))
227 }
catch (
const std::exception&) {
/// cancelListen(): two overloads, one taking the token by value and one taking
/// a shared_future that yields the token. Declarations only.
237 void cancelListen(InfoHash h,
size_t token);
238 void cancelListen(InfoHash h, std::shared_future<size_t> token);
/// put() of a shared Value. Declaration only.
/// Defaults: empty cb, created = time_point::max(), permanent = false.
240 void put(InfoHash hash,
241 std::shared_ptr<Value> value,
242 DoneCallback cb = {},
243 time_point created = time_point::max(),
244 bool permanent =
false);
/// Convenience overload: adapts the simple done callback with bindDoneCb() and
/// forwards to the overload above. cb has no default in this overload.
245 void put(InfoHash hash,
246 std::shared_ptr<Value> value,
247 DoneCallbackSimple cb,
248 time_point created = time_point::max(),
249 bool permanent =
false)
251 put(hash, value, bindDoneCb(cb), created, permanent);
/// put() overloads for a Value passed by rvalue.
/// NOTE(review): the `value` parameter line of both overloads is missing from
/// this excerpt; the inline overload forwards it with std::forward<Value>, which
/// implies an rvalue Value parameter.
/// Defaults on the declaration: empty cb, created = time_point::max(), permanent = false.
254 void put(InfoHash hash,
256 DoneCallback cb = {},
257 time_point created = time_point::max(),
258 bool permanent =
false);
/// Convenience overload: adapts cb with bindDoneCb() and forwards. cb has no default here.
259 void put(InfoHash hash,
261 DoneCallbackSimple cb,
262 time_point created = time_point::max(),
263 bool permanent =
false)
265 put(hash, std::forward<Value>(value), bindDoneCb(cb), created, permanent);
/// put() addressed by a string key. Declaration only.
/// NOTE(review): the value parameter line is missing from this excerpt.
/// Defaults: empty cb, created = time_point::max(), permanent = false.
267 void put(
const std::string& key,
269 DoneCallbackSimple cb = {},
270 time_point created = time_point::max(),
271 bool permanent =
false);
/// cancelPut(): two overloads identifying the value either by its Value::Id or
/// by a shared pointer to the Value. Declarations only.
273 void cancelPut(
const InfoHash& h, Value::Id
id);
274 void cancelPut(
const InfoHash& h,
const std::shared_ptr<Value>& value);
/// putSigned() of a shared Value. Declaration only; cb defaults to empty and
/// permanent to false.
276 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb = {},
bool permanent =
false);
/// Convenience overload: adapts the simple done callback with bindDoneCb() and
/// forwards to the overload above. cb has no default here.
277 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb,
bool permanent =
false)
279 putSigned(hash, value, bindDoneCb(cb), permanent);
/// putSigned() of a Value passed by rvalue. Declaration only; cb defaults to
/// empty and permanent to false.
282 void putSigned(InfoHash hash, Value&& value, DoneCallback cb = {},
bool permanent =
false);
/// Convenience overload: passes the rvalue on with std::forward<Value>, adapts
/// cb with bindDoneCb() and forwards to the overload above.
283 void putSigned(InfoHash hash, Value&& value, DoneCallbackSimple cb,
bool permanent =
false)
285 putSigned(hash, std::forward<Value>(value), bindDoneCb(cb), permanent);
/// putSigned() addressed by a string key. Declaration only; cb defaults to
/// empty and permanent to false.
287 void putSigned(
const std::string& key, Value&& value, DoneCallbackSimple cb = {},
bool permanent =
false);
/// Parameter lists of two overloads taking a recipient InfoHash `to` and a
/// shared Value.
/// NOTE(review): the declarator lines (return type and function name) are
/// missing from this excerpt; the forwarding call below names putEncrypted, so
/// these are presumably putEncrypted overloads — confirm.
290 InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb = {},
bool permanent =
false);
/// Inline variant: adapts the simple done callback with bindDoneCb() and
/// forwards to putEncrypted. cb has no default here.
292 InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallbackSimple cb,
bool permanent =
false)
294 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
/// putEncrypted() for a recipient InfoHash and a Value passed by rvalue.
/// Declaration only; cb defaults to empty and permanent to false.
297 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb = {},
bool permanent =
false);
/// Convenience overload: passes the rvalue on with std::forward<Value>, adapts
/// cb with bindDoneCb() and forwards to the overload above.
298 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallbackSimple cb,
bool permanent =
false)
300 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
/// putEncrypted() addressed by a string key, with a recipient InfoHash.
/// Declaration only; cb defaults to empty and permanent to false.
302 void putEncrypted(
const std::string& key, InfoHash to, Value&& value, DoneCallback cb = {},
bool permanent =
false);
/// putEncrypted() for a recipient given as a shared PublicKey and a shared
/// Value. Declaration only; cb defaults to empty and permanent to false.
304 void putEncrypted(InfoHash hash,
305 const std::shared_ptr<crypto::PublicKey>& to,
306 std::shared_ptr<Value> value,
307 DoneCallback cb = {},
308 bool permanent =
false);
/// Convenience overload: adapts the simple done callback with bindDoneCb() and
/// forwards to the overload above. cb has no default here.
309 void putEncrypted(InfoHash hash,
310 const std::shared_ptr<crypto::PublicKey>& to,
311 std::shared_ptr<Value> value,
312 DoneCallbackSimple cb,
313 bool permanent =
false)
315 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
/// Deprecated putEncrypted() overloads for a shared PublicKey recipient; the
/// attribute message points callers to the shared_ptr version.
/// NOTE(review): the `value` parameter line of both overloads is missing from
/// this excerpt; the inline overload forwards it with std::forward<Value>, which
/// implies an rvalue Value parameter.
318 [[deprecated(
"Use the shared_ptr version instead")]]
319 void putEncrypted(InfoHash hash,
320 const std::shared_ptr<crypto::PublicKey>& to,
322 DoneCallback cb = {},
323 bool permanent =
false);
/// Inline deprecated variant: adapts cb with bindDoneCb() and forwards.
324 [[deprecated(
"Use the shared_ptr version instead")]]
325 void putEncrypted(InfoHash hash,
326 const std::shared_ptr<crypto::PublicKey>& to,
328 DoneCallbackSimple cb,
329 bool permanent =
false)
331 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
/// Parameter lists of two overloads taking a recipient identified by PkId and a
/// shared Value.
/// NOTE(review): the declarator lines (return type and function name) are
/// missing from this excerpt; the forwarding call below names putEncrypted, so
/// these are presumably putEncrypted overloads — confirm.
335 InfoHash hash,
const PkId& to, std::shared_ptr<Value> value, DoneCallback cb = {},
bool permanent =
false);
/// Inline variant: adapts the simple done callback with bindDoneCb() and
/// forwards to putEncrypted. cb has no default here.
337 InfoHash hash,
const PkId& to, std::shared_ptr<Value> value, DoneCallbackSimple cb,
bool permanent =
false)
339 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
/// bootstrap() overloads (declarations only):
///  - from a list of socket addresses, with an optional done callback;
///  - from a single socket address, with an optional done callback;
///  - from separate host and service strings;
///  - from a single combined string (format not visible here — presumably
///    "host:service"; confirm against the definition).
346 void bootstrap(std::vector<SockAddr> nodes, DoneCallbackSimple cb = {});
347 void bootstrap(
SockAddr addr, DoneCallbackSimple cb = {});
361 void bootstrap(
const std::string& host,
const std::string& service);
362 void bootstrap(
const std::string& hostService);
/// Const member; declaration only (output destination not visible here).
382 void dumpTables()
const;
/// Deprecated accessor; the attribute message names getPublicKey()->getLongId()
/// as the replacement.
387 [[deprecated(
"Use getPublicKey()->getLongId() instead")]] InfoHash
getId()
const;
/// Returns a shared pointer to a crypto::PublicKey. Declaration only.
388 std::shared_ptr<crypto::PublicKey> getPublicKey()
const;
/// Storage accessors (declarations only).
/// getStoreSize() returns a pair of size_t; the meaning of each element is not
/// visible in this excerpt — TODO confirm.
407 std::pair<size_t, size_t> getStoreSize()
const;
/// Storage limit getter/setter. The setter's argument defaults to 0; what 0
/// means is not visible here — TODO confirm.
409 size_t getStorageLimit()
const;
410 void setStorageLimit(
size_t limit = 0);
/// Same getter/setter pair for the "local" storage limit.
412 size_t getLocalStorageLimit()
const;
413 void setLocalStorageLimit(
size_t limit = 0);
/// Export nodes / values as vectors of NodeExport / ValuesExport.
415 std::vector<NodeExport> exportNodes()
const;
417 std::vector<ValuesExport> exportValues()
const;
/// Sets the logger; the argument defaults to an empty (null) shared pointer.
/// Declaration only.
419 void setLogger(
const Sp<Logger>& logger = {});
/// Register a value type; a second entry point exists for "insecure" types
/// (the difference is not visible in this excerpt). Declarations only.
426 void registerType(
const ValueType& type);
427 void registerInsecureType(
const ValueType& type);
/// Takes a vector of ValuesExport, the element type returned by exportValues().
429 void importValues(
const std::vector<ValuesExport>& values);
/// Returns true whenever the state is anything other than State::Idle.
431 bool isRunning()
const {
return running != State::Idle; }
/// Node statistics for one address family, returned as a NodeStats object.
/// Declaration only.
433 NodeStats getNodesStats(sa_family_t af)
const;
/// Variant reporting counters through four output pointers and returning an
/// unsigned value (its meaning is not visible here — TODO confirm).
434 unsigned getNodesStats(sa_family_t af,
435 unsigned* good_return,
436 unsigned* dubious_return,
437 unsigned* cached_return,
438 unsigned* incoming_return)
const;
/// Node info, available as a direct return value or delivered to a callback
/// receiving a shared_ptr<NodeInfo>. Declarations only.
439 NodeInfo getNodeInfo()
const;
440 void getNodeInfo(std::function<
void(std::shared_ptr<NodeInfo>)>);
/// Message statistics as a vector of unsigned; `in` defaults to false
/// (presumably selects incoming vs outgoing — confirm). Declaration only.
442 std::vector<unsigned> getNodeMessageStats(
bool in =
false)
const;
/// Log getters returning std::string (declarations only): storage log for
/// everything or for one hash, routing tables for a family, all searches, or
/// one search. Where `af` has a default, it is AF_UNSPEC.
443 std::string getStorageLog()
const;
444 std::string getStorageLog(
const InfoHash&)
const;
445 std::string getRoutingTablesLog(sa_family_t af)
const;
446 std::string getSearchesLog(sa_family_t af = AF_UNSPEC)
const;
447 std::string getSearchLog(
const InfoHash&, sa_family_t af = AF_UNSPEC)
const;
/// Public address accessors (declarations only); `af` defaults to AF_UNSPEC.
/// Available as SockAddr values, as strings, or delivered to a callback that
/// receives the address vector by rvalue.
448 std::vector<SockAddr> getPublicAddress(sa_family_t af = AF_UNSPEC)
const;
449 std::vector<std::string> getPublicAddressStr(sa_family_t af = AF_UNSPEC)
const;
450 void getPublicAddress(std::function<
void(std::vector<SockAddr>&&)>, sa_family_t af = AF_UNSPEC);
/// Certificate lookup by InfoHash or by PkId; the result is delivered to a
/// callback receiving a shared_ptr<crypto::Certificate>. Declarations only.
454 void findCertificate(InfoHash hash, std::function<
void(
const std::shared_ptr<crypto::Certificate>&)>);
455 void findCertificate(PkId hash, std::function<
void(
const std::shared_ptr<crypto::Certificate>&)>);
/// Registers a certificate. Declaration only.
457 void registerCertificate(
const std::shared_ptr<crypto::Certificate>& cert);
/// Installs a local certificate-store query method; overloads exist for the
/// legacy and the current query types, both taken by rvalue.
458 void setLocalCertificateStore(CertificateStoreQueryLegacy&& query_method);
459 void setLocalCertificateStore(CertificateStoreQuery&& query_method);
/// run() convenience overload: copies `network`, `identity` and `threaded` into
/// a `config` object. Defaults visible here: port = dht::net::DHT_DEFAULT_PORT,
/// identity = {}, threaded = true.
/// NOTE(review): the remaining parameters (including `network`), the
/// declaration of `config` and the call that consumes it are missing from this
/// excerpt.
467 void run(in_port_t port = dht::net::DHT_DEFAULT_PORT,
468 const crypto::Identity& identity = {},
469 bool threaded =
true,
473 config.dht_config.node_config.
network = network;
474 config.dht_config.id = identity;
475 config.threaded = threaded;
/// run() overloads taking an explicit Config (declarations only):
///  - by port, with a mutable Config and an optional Context;
///  - by IPv4/IPv6 address strings and a service string, with a mutable Config
///    and an optional Context;
///  - by const Config and a Context that has no default.
478 void run(in_port_t port, Config& config, Context&& context = {});
484 void run(
const char* ip4,
const char* ip6,
const char* service,
Config& config,
Context&& context = {});
486 void run(
const Config& config, Context&& context);
/// Appends the given callback (moved) to statusCbs, so several status callbacks
/// can be registered.
/// NOTE(review): any other statements of this function are missing from this excerpt.
488 void setOnStatusChanged(StatusCallback&& cb)
491 statusCbs.emplace_back(std::move(cb));
/// NOTE(review): orphan statement — the function it belongs to is missing from
/// this excerpt. It holds dht_mtx until the enclosing scope ends.
501 std::lock_guard<std::mutex> lck(dht_mtx);
/// shutdown(): takes an optional ShutdownCallback and a `stop` flag defaulting
/// to false. Declaration only; the effect of `stop` is not visible here.
508 void shutdown(ShutdownCallback cb = {},
bool stop =
false);
/// Returns a copy of the peerDiscovery_ shared pointer.
/// The semicolon after the closing brace is redundant but harmless.
519 std::shared_ptr<PeerDiscovery> getPeerDiscovery()
const {
return peerDiscovery_; };
/// Sets the proxy server; pushNodeId defaults to an empty string. Declaration only.
521 void setProxyServer(
const std::string& proxy,
const std::string& pushNodeId =
"");
/// Toggles forwarding of all messages. Declaration only; what forwarding means
/// is not visible in this excerpt.
552 void forwardAllMessages(
bool forward);
/// Runner lifecycle state, used as the type of the atomic `running` member.
555 enum class State { Idle, Running, Stopping };
/// Returns the greater of status4 and status6 under NodeStatus's ordering.
559 NodeStatus getStatus()
const {
return std::max(status4, status6); }
/// Declaration only; the meaning of the bool result is not visible here.
561 bool checkShutdown();
/// Wrap a done callback (full or simple form) and return the wrapped callback.
/// Declarations only; presumably used to track ongoing operations — confirm.
563 DoneCallback bindOpDoneCallback(DoneCallback&& cb);
564 DoneCallbackSimple bindOpDoneCallback(DoneCallbackSimple&& cb);
/// Uniquely owned SecureDht instance.
567 std::unique_ptr<SecureDht> dht_;
/// Atomic flag, initially false.
570 std::atomic_bool use_proxy {
false};
574 IdentityAnnouncedCb identityAnnouncedCb_;
/// Mutex declared `mutable` so it can be locked from const member functions.
581 mutable std::mutex dht_mtx {};
582 std::thread dht_thread {};
583 std::condition_variable cv {};
584 std::mutex sock_mtx {};
/// Two packet lists of the same type (rcv_free is declared via decltype(rcv)).
/// Presumably received packets and a free list — confirm.
585 net::PacketList rcv {};
586 decltype(rcv) rcv_free {};
/// Two queues of operations, each callable with a SecureDht&; the "_prio"
/// queue is presumably processed first — confirm.
588 std::queue<std::function<void(SecureDht&)>> pending_ops_prio {};
589 std::queue<std::function<void(SecureDht&)>> pending_ops {};
590 std::mutex storage_mtx {};
/// Lifecycle state, initially Idle; read by isRunning().
592 std::atomic<State> running {State::Idle};
/// Atomic counter, initially 0.
593 std::atomic_size_t ongoing_ops {0};
594 std::vector<ShutdownCallback> shutdownCallbacks_;
/// Two statuses, both starting Disconnected; combined by getStatus().
596 NodeStatus status4 {NodeStatus::Disconnected}, status6 {NodeStatus::Disconnected};
/// Callbacks appended by setOnStatusChanged().
598 std::vector<StatusCallback> statusCbs {};
/// Returned by getPeerDiscovery().
601 std::shared_ptr<PeerDiscovery> peerDiscovery_;
607 std::shared_ptr<dht::Logger> logger_;