My Project 3.7.1
C++ Distributed Hash Table
Loading...
Searching...
No Matches
network_engine.h
1// Copyright (c) 2014-2026 Savoir-faire Linux Inc.
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "node_cache.h"
6#include "value.h"
7#include "infohash.h"
8#include "node.h"
9#include "scheduler.h"
10#include "utils.h"
11#include "rng.h"
12#include "rate_limiter.h"
13#include "logger.h"
14#include "network_utils.h"
15
16#include <vector>
17#include <string>
18#include <functional>
19#include <algorithm>
20#include <memory>
21#include <queue>
22
23namespace dht {
24namespace net {
25
26struct Request;
27struct Socket;
28struct TransId;
29
30#ifndef MSG_CONFIRM
31#define MSG_CONFIRM 0
32#endif
33
35{
36 NetId network {0};
37 ssize_t max_req_per_sec {0};
38 ssize_t max_peer_req_per_sec {0};
39 bool is_client {false};
40};
41
42class DhtProtocolException : public DhtException
43{
44public:
45 // sent to another peer (http-like).
46 static const constexpr uint16_t NON_AUTHORITATIVE_INFORMATION {203}; /* incomplete request packet. */
47 static const constexpr uint16_t UNAUTHORIZED {401}; /* incorrect tokens. */
48 static const constexpr uint16_t NOT_FOUND {404}; /* storage not found */
49 // for internal use (custom).
50 static const constexpr uint16_t INVALID_TID_SIZE {421}; /* id was truncated. */
51 static const constexpr uint16_t UNKNOWN_TID {422}; /* unknown tid */
52 static const constexpr uint16_t WRONG_NODE_INFO_BUF_LEN {423}; /* node info length is incorrect */
53
54 static const std::string GET_NO_INFOHASH; /* received "get" request with no infohash */
55 static const std::string LISTEN_NO_INFOHASH; /* got "listen" request without infohash */
56 static const std::string LISTEN_WRONG_TOKEN; /* incorrect token in "listen" request */
57 static const std::string PUT_NO_INFOHASH; /* no infohash in "put" request */
58 static const std::string PUT_WRONG_TOKEN; /* got "put" request with incorrect token */
59 static const std::string STORAGE_NOT_FOUND; /* got access request for an unknown storage */
60 static const std::string PUT_INVALID_ID; /* invalid id in "put" request */
61
62 DhtProtocolException(uint16_t code, const std::string& msg = "", InfoHash failing_node_id = {})
63 : DhtException(msg)
64 , msg(msg)
65 , code(code)
66 , failing_node_id(failing_node_id)
67 {}
68
69 const std::string& getMsg() const { return msg; }
70 uint16_t getCode() const { return code; }
71 const InfoHash& getNodeId() const { return failing_node_id; }
72
73private:
74 std::string msg;
75 uint16_t code;
76 InfoHash failing_node_id;
77};
78
79struct ParsedMessage;
80
84struct RequestAnswer
85{
86 Blob ntoken {};
87 Value::Id vid {};
88 std::vector<Sp<Value>> values {};
89 std::vector<Value::Id> refreshed_values {};
90 std::vector<Value::Id> expired_values {};
91 std::vector<Sp<FieldValueIndex>> fields {};
92 std::vector<Sp<Node>> nodes4 {};
93 std::vector<Sp<Node>> nodes6 {};
94 RequestAnswer() {}
95 RequestAnswer(ParsedMessage&& msg);
96};
97
116class NetworkEngine final
117{
118private:
122 std::function<void(Sp<Request>, DhtProtocolException)> onError;
123
130 std::function<void(const Sp<Node>&, int)> onNewNode;
137 std::function<void(const InfoHash&, const SockAddr&)> onReportedAddr;
143 std::function<RequestAnswer(Sp<Node>)> onPing {};
152 std::function<RequestAnswer(Sp<Node>, const InfoHash&, want_t)> onFindNode {};
161 std::function<RequestAnswer(Sp<Node>, const InfoHash&, want_t, const Query&)> onGetValues {};
170 std::function<RequestAnswer(Sp<Node>, const InfoHash&, const Blob&, Tid, const Query&, int)> onListen {};
180 std::function<RequestAnswer(Sp<Node>, const InfoHash&, const Blob&, const std::vector<Sp<Value>>&, const time_point&)>
181 onAnnounce {};
190 std::function<RequestAnswer(Sp<Node>, const InfoHash&, const Blob&, const Value::Id&)> onRefresh {};
191
192public:
193 using RequestCb = std::function<void(const Request&, RequestAnswer&&)>;
194 using RequestErrorCb = std::function<bool(const Request&, DhtProtocolException&&)>;
195 using RequestExpiredCb = std::function<void(const Request&, bool)>;
196
197 NetworkEngine(InfoHash& myid,
198 NetworkConfig config,
199 std::unique_ptr<DatagramSocket>&& sock,
200 const Sp<Logger>& log,
201 std::mt19937_64& rd,
202 Scheduler& scheduler,
203 decltype(NetworkEngine::onError)&& onError,
204 decltype(NetworkEngine::onNewNode)&& onNewNode,
205 decltype(NetworkEngine::onReportedAddr)&& onReportedAddr,
206 decltype(NetworkEngine::onPing)&& onPing,
207 decltype(NetworkEngine::onFindNode)&& onFindNode,
208 decltype(NetworkEngine::onGetValues)&& onGetValues,
209 decltype(NetworkEngine::onListen)&& onListen,
210 decltype(NetworkEngine::onAnnounce)&& onAnnounce,
211 decltype(NetworkEngine::onRefresh)&& onRefresh);
212
213 ~NetworkEngine();
214
215 net::DatagramSocket* getSocket() const { return dht_socket.get(); };
216
217 void clear();
218
234 void tellListener(const Sp<Node>& n,
235 Tid socket_id,
236 const InfoHash& hash,
237 want_t want,
238 const Blob& ntoken,
239 std::vector<Sp<Node>>&& nodes,
240 std::vector<Sp<Node>>&& nodes6,
241 std::vector<Sp<Value>>&& values,
242 const Query& q,
243 int version);
244
245 void tellListenerRefreshed(const Sp<Node>& n,
246 Tid socket_id,
247 const InfoHash& hash,
248 const Blob& ntoken,
249 const std::vector<Value::Id>& values,
250 int version);
251 void tellListenerExpired(const Sp<Node>& n,
252 Tid socket_id,
253 const InfoHash& hash,
254 const Blob& ntoken,
255 const std::vector<Value::Id>& values,
256 int version);
257
258 bool isRunning(sa_family_t af) const;
259 inline want_t want() const { return dht_socket->hasIPv4() and dht_socket->hasIPv6() ? (WANT4 | WANT6) : -1; }
260
261 void connectivityChanged(sa_family_t);
262
263 /**************
264 * Requests *
265 **************/
266
276 Sp<Request> sendPing(const Sp<Node>& n, RequestCb&& on_done, RequestExpiredCb&& on_expired);
277
288 Sp<Request> sendPing(SockAddr&& sa, RequestCb&& on_done, RequestExpiredCb&& on_expired)
289 {
290 return sendPing(std::make_shared<Node>(InfoHash::zero(), std::move(sa), rd),
291 std::forward<RequestCb>(on_done),
292 std::forward<RequestExpiredCb>(on_expired));
293 }
294
307 Sp<Request> sendFindNode(const Sp<Node>& n,
308 const InfoHash& hash,
309 want_t want = -1,
310 RequestCb&& on_done = {},
311 RequestExpiredCb&& on_expired = {});
326 Sp<Request> sendGetValues(const Sp<Node>& n,
327 const InfoHash& hash,
328 const Query& query,
329 want_t want,
330 RequestCb&& on_done,
331 RequestExpiredCb&& on_expired);
355 Sp<Request> sendListen(const Sp<Node>& n,
356 const InfoHash& hash,
357 const Query& query,
358 const Blob& token,
359 Tid socketId,
360 RequestCb&& on_done,
361 RequestExpiredCb&& on_expired);
375 Sp<Request> sendAnnounceValue(const Sp<Node>& n,
376 const InfoHash& hash,
377 const Sp<Value>& v,
378 time_point created,
379 const Blob& token,
380 RequestCb&& on_done,
381 RequestExpiredCb&& on_expired);
395 Sp<Request> sendRefreshValue(const Sp<Node>& n,
396 const InfoHash& hash,
397 const Value::Id& vid,
398 const Blob& token,
399 RequestCb&& on_done,
400 RequestErrorCb&& on_error,
401 RequestExpiredCb&& on_expired);
414 void sendUpdateValues(const Sp<Node>& n,
415 const InfoHash& infohash,
416 std::vector<Sp<Value>>&& values,
417 time_point created,
418 const Blob& token,
419 size_t sid);
420 Sp<Request> sendUpdateValues(const Sp<Node>& n,
421 const InfoHash& infohash,
422 std::vector<Sp<Value>>::iterator begin,
423 std::vector<Sp<Value>>::iterator end,
424 time_point created,
425 const Blob& token,
426 size_t sid);
427
437 void processMessage(const uint8_t* buf, size_t buflen, SockAddr addr);
438
439 Sp<Node> insertNode(const InfoHash& id, const SockAddr& addr)
440 {
441 auto n = cache.getNode(id, addr, scheduler.time(), 0);
442 onNewNode(n, 0);
443 return n;
444 }
445
446 std::vector<unsigned> getNodeMessageStats(bool in)
447 {
448 auto& st = in ? in_stats : out_stats;
449 std::vector<unsigned> stats {st.ping, st.find, st.get, st.listen, st.put};
450 st = {};
451 return stats;
452 }
453
454 void blacklistNode(const Sp<Node>& n);
455
456 std::vector<Sp<Node>> getCachedNodes(const InfoHash& id, sa_family_t sa_f, size_t count)
457 {
458 return cache.getCachedNodes(id, sa_f, count);
459 }
460
461 size_t getNodeCacheSize() const { return cache.size(); }
462 size_t getNodeCacheSize(sa_family_t af) const { return cache.size(af); }
463
464 size_t getRateLimiterSize() const { return address_rate_limiter.size(); }
465
466 size_t getPartialCount() const { return partial_messages.size(); }
467
468private:
469 struct PartialMessage;
470
471 /***************
472 * Constants *
473 ***************/
474 /* the length of a node info buffer in ipv4 format */
475 static const constexpr size_t NODE4_INFO_BUF_LEN {HASH_LEN + sizeof(in_addr) + sizeof(in_port_t)};
476 /* the length of a node info buffer in ipv6 format */
477 static const constexpr size_t NODE6_INFO_BUF_LEN {HASH_LEN + sizeof(in6_addr) + sizeof(in_port_t)};
478 /* after a UDP reply, the period during which we tell the link layer about it */
479 static constexpr std::chrono::seconds UDP_REPLY_TIME {15};
480
481 /* Max. time to receive a full fragmented packet */
482 static constexpr std::chrono::seconds RX_MAX_PACKET_TIME {10};
483 /* Max. time between packet fragments */
484 static constexpr std::chrono::seconds RX_TIMEOUT {3};
485 /* The maximum number of nodes that we snub. There is probably little
486 reason to increase this value. */
487 static constexpr unsigned BLACKLISTED_MAX {10};
488
489 static constexpr size_t MTU {1280};
490 static constexpr size_t MAX_PACKET_VALUE_SIZE {600};
491 static constexpr size_t MAX_MESSAGE_VALUE_SIZE {56 * 1024};
492
493 void process(std::unique_ptr<ParsedMessage>&&, const SockAddr& from);
494
495 bool rateLimit(const SockAddr& addr);
496
497 static bool isMartian(const SockAddr& addr);
498 bool isNodeBlacklisted(const SockAddr& addr) const;
499
500 void requestStep(Sp<Request> req);
501
506 void sendRequest(const Sp<Request>& request);
507
508 struct MessageStats
509 {
510 unsigned ping {0};
511 unsigned find {0};
512 unsigned get {0};
513 unsigned put {0};
514 unsigned listen {0};
515 unsigned refresh {0};
516 unsigned updateValue {0};
517 };
518
519 // basic wrapper for socket sendto function
520 int send(const SockAddr& addr, const char* buf, size_t len, bool confirmed = false);
521
522 void sendValueParts(Tid tid, const std::vector<Blob>& svals, const SockAddr& addr);
523 std::vector<Blob> packValueHeader(msgpack::sbuffer&,
524 std::vector<Sp<Value>>::const_iterator,
525 std::vector<Sp<Value>>::const_iterator) const;
526 std::vector<Blob> packValueHeader(msgpack::sbuffer& buf, const std::vector<Sp<Value>>& values) const
527 {
528 return packValueHeader(buf, values.begin(), values.end());
529 }
530 void maintainRxBuffer(Tid tid);
531
532 /*************
533 * Answers *
534 *************/
535 /* answer to a ping request */
536 void sendPong(const SockAddr& addr, Tid tid);
537 /* answer to findnodes/getvalues request */
538 void sendNodesValues(const SockAddr& addr,
539 Tid tid,
540 const Blob& nodes,
541 const Blob& nodes6,
542 const std::vector<Sp<Value>>& st,
543 const Query& query,
544 const Blob& token);
545 Blob bufferNodes(sa_family_t af, const InfoHash& id, std::vector<Sp<Node>>& nodes);
546
547 std::pair<Blob, Blob> bufferNodes(
548 sa_family_t af, const InfoHash& id, want_t want, std::vector<Sp<Node>>& nodes, std::vector<Sp<Node>>& nodes6);
549 /* answer to a listen request */
550 void sendListenConfirmation(const SockAddr& addr, Tid tid);
551 /* answer to put request */
552 void sendValueAnnounced(const SockAddr& addr, Tid, Value::Id);
553 /* answer in case of error */
554 void sendError(const SockAddr& addr, Tid tid, uint16_t code, const std::string& message, bool include_id = false);
555
556 void deserializeNodes(ParsedMessage& msg, const SockAddr& from);
557
558 /* DHT info */
559 const InfoHash& myid;
560 const NetworkConfig config {};
561 const std::unique_ptr<DatagramSocket> dht_socket;
562 Sp<Logger> logger_;
563 std::mt19937_64& rd;
564
565 NodeCache cache;
566
567 // global limiting should be triggered by at least 8 different IPs
568 using IpLimiter = RateLimiter;
569 using IpLimiterMap = std::map<SockAddr, IpLimiter, SockAddr::ipCmp>;
570 IpLimiterMap address_rate_limiter;
571 RateLimiter rate_limiter;
572 ssize_t limiter_maintenance {0};
573
574 // requests handling
575 std::map<Tid, Sp<Request>> requests {};
576 std::map<Tid, PartialMessage> partial_messages;
577
578 MessageStats in_stats {}, out_stats {};
579 std::set<SockAddr> blacklist {};
580
581 Scheduler& scheduler;
582
583 bool logIncoming_ {false};
584};
585
586} /* namespace net */
587} /* namespace dht */
Job scheduler.
Definition scheduler.h:19
void processMessage(const uint8_t *buf, size_t buflen, SockAddr addr)
void tellListener(const Sp< Node > &n, Tid socket_id, const InfoHash &hash, want_t want, const Blob &ntoken, std::vector< Sp< Node > > &&nodes, std::vector< Sp< Node > > &&nodes6, std::vector< Sp< Value > > &&values, const Query &q, int version)
Sp< Request > sendPing(SockAddr &&sa, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendFindNode(const Sp< Node > &n, const InfoHash &hash, want_t want=-1, RequestCb &&on_done={}, RequestExpiredCb &&on_expired={})
void sendUpdateValues(const Sp< Node > &n, const InfoHash &infohash, std::vector< Sp< Value > > &&values, time_point created, const Blob &token, size_t sid)
Sp< Request > sendRefreshValue(const Sp< Node > &n, const InfoHash &hash, const Value::Id &vid, const Blob &token, RequestCb &&on_done, RequestErrorCb &&on_error, RequestExpiredCb &&on_expired)
Sp< Request > sendGetValues(const Sp< Node > &n, const InfoHash &hash, const Query &query, want_t want, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendPing(const Sp< Node > &n, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendAnnounceValue(const Sp< Node > &n, const InfoHash &hash, const Sp< Value > &v, time_point created, const Blob &token, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendListen(const Sp< Node > &n, const InfoHash &hash, const Query &query, const Blob &token, Tid socketId, RequestCb &&on_done, RequestExpiredCb &&on_expired)
std::vector< uint8_t > Blob
Definition utils.h:156
Describes a query destined to another peer.
Definition value.h:988