42class DhtProtocolException :
public DhtException
/**
 * Numeric codes exposed as compile-time uint16_t constants.
 * NOTE(review): presumably the values passed as `code` to the constructor and
 * returned by getCode() — confirm at the throw sites, which are not visible
 * here.
 * `constexpr` already implies `const` on these objects, so the extra `const`
 * keyword is redundant but harmless.
 */
46 static const constexpr uint16_t NON_AUTHORITATIVE_INFORMATION {203};
47 static const constexpr uint16_t UNAUTHORIZED {401};
48 static const constexpr uint16_t NOT_FOUND {404};
50 static const constexpr uint16_t INVALID_TID_SIZE {421};
51 static const constexpr uint16_t UNKNOWN_TID {422};
52 static const constexpr uint16_t WRONG_NODE_INFO_BUF_LEN {423};
/**
 * Static std::string members, declared only: their definitions (and therefore
 * their text) live outside this view.
 * NOTE(review): presumably the human-readable messages passed as `msg` when
 * the exception is built — confirm at the throw sites.
 */
54 static const std::string GET_NO_INFOHASH;
55 static const std::string LISTEN_NO_INFOHASH;
56 static const std::string LISTEN_WRONG_TOKEN;
57 static const std::string PUT_NO_INFOHASH;
58 static const std::string PUT_WRONG_TOKEN;
59 static const std::string STORAGE_NOT_FOUND;
60 static const std::string PUT_INVALID_ID;
62 DhtProtocolException(uint16_t code,
const std::string& msg =
"", InfoHash failing_node_id = {})
66 , failing_node_id(failing_node_id)
/** @return the `msg` member by const reference; the object is not modified. */
69 const std::string& getMsg()
const {
return msg; }
/** @return the `code` member by value; the object is not modified. */
70 uint16_t getCode()
const {
return code; }
/**
 * @return the `failing_node_id` member by const reference; the object is not
 *         modified.
 */
71 const InfoHash& getNodeId()
const {
return failing_node_id; }
/** Node id stored by the constructor's initializer list and returned by getNodeId(). */
76 InfoHash failing_node_id;
116class NetworkEngine final
/**
 * Callback slot: a callable taking a node handle (const Sp<Node>&) and an int,
 * returning nothing.
 * NOTE(review): the meaning of the int argument is not visible here — confirm
 * at the call sites.
 */
130 std::function<void(
const Sp<Node>&,
int)> onNewNode;
/**
 * Callback slot: a callable taking an InfoHash and a SockAddr (both by const
 * reference), returning nothing.
 * NOTE(review): presumably reports the address a remote node observed for us —
 * inferred from the name only; confirm at the call sites.
 */
137 std::function<void(
const InfoHash&,
const SockAddr&)> onReportedAddr;
/**
 * Handler slot producing a RequestAnswer from (node handle, InfoHash, want_t).
 * The `{}` initializer leaves the std::function empty until one is assigned;
 * invoking an empty std::function throws std::bad_function_call.
 */
152 std::function<
RequestAnswer(Sp<Node>,
const InfoHash&, want_t)> onFindNode {};
/**
 * Handler slot producing a RequestAnswer from (node handle, InfoHash, want_t,
 * Query). Same shape as onFindNode with an additional const Query& argument.
 * The `{}` initializer leaves the std::function empty until one is assigned.
 */
161 std::function<
RequestAnswer(Sp<Node>,
const InfoHash&, want_t,
const Query&)> onGetValues {};
180 std::function<
RequestAnswer(Sp<Node>,
const InfoHash&,
const Blob&,
const std::vector<Sp<Value>>&,
const time_point&)>
/**
 * Handler slot producing a RequestAnswer from (node handle, InfoHash, Blob,
 * Value::Id).
 * NOTE(review): the Blob is presumably an authorization token — not
 * established by anything visible here; confirm in the caller.
 * The `{}` initializer leaves the std::function empty until one is assigned.
 */
190 std::function<
RequestAnswer(Sp<Node>,
const InfoHash&,
const Blob&,
const Value::Id&)> onRefresh {};
/**
 * Callback type receiving the Request (const reference) and its RequestAnswer
 * (rvalue reference, so the callee may move from it). Used as the type of the
 * `on_done` parameters of the send* methods.
 */
193 using RequestCb = std::function<void(
const Request&,
RequestAnswer&&)>;
/**
 * Callback type receiving the Request (const reference) and a bool. Used as
 * the type of the `on_expired` parameters of the send* methods.
 * NOTE(review): the meaning of the bool is not visible here — confirm where
 * the callback is invoked.
 */
195 using RequestExpiredCb = std::function<void(
const Request&,
bool)>;
/**
 * Constructor declaration; the definition is outside this view.
 *
 * @param myid  taken by non-const lvalue reference.
 *              NOTE(review): presumably bound to the `myid` reference member,
 *              in which case the referenced InfoHash must outlive the engine —
 *              confirm in the definition.
 * @param sock  datagram socket, passed as an rvalue reference to a unique_ptr.
 * @param log   logger handle, by const reference.
 *
 * The on* parameters are rvalue references whose types are spelled with
 * decltype of the same-named members, so each parameter type follows its
 * member declaration automatically.
 * NOTE(review): presumably each is moved into the matching member — confirm.
 */
197 NetworkEngine(InfoHash& myid,
199 std::unique_ptr<DatagramSocket>&& sock,
200 const Sp<Logger>&
log,
203 decltype(NetworkEngine::onError)&& onError,
204 decltype(NetworkEngine::onNewNode)&& onNewNode,
205 decltype(NetworkEngine::onReportedAddr)&& onReportedAddr,
206 decltype(NetworkEngine::onPing)&& onPing,
207 decltype(NetworkEngine::onFindNode)&& onFindNode,
208 decltype(NetworkEngine::onGetValues)&& onGetValues,
209 decltype(NetworkEngine::onListen)&& onListen,
210 decltype(NetworkEngine::onAnnounce)&& onAnnounce,
211 decltype(NetworkEngine::onRefresh)&& onRefresh);
236 const InfoHash& hash,
239 std::vector<Sp<Node>>&& nodes,
240 std::vector<Sp<Node>>&& nodes6,
241 std::vector<Sp<Value>>&& values,
245 void tellListenerRefreshed(
const Sp<Node>& n,
247 const InfoHash& hash,
249 const std::vector<Value::Id>& values,
251 void tellListenerExpired(
const Sp<Node>& n,
253 const InfoHash& hash,
255 const std::vector<Value::Id>& values,
/**
 * Const query taking an address family and returning a bool.
 * Declaration only; the definition is outside this view.
 */
258 bool isRunning(sa_family_t af)
const;
/**
 * @return (WANT4 | WANT6) when dht_socket reports both IPv4 and IPv6,
 *         otherwise -1.
 *
 * `and` binds tighter than `?:`, so the condition is the conjunction of the
 * two has* calls; the second call is skipped when the first returns false.
 * NOTE(review): -1 presumably means "no explicit preference" — confirm how
 * callers interpret it.
 */
259 inline want_t want()
const {
return dht_socket->hasIPv4() and dht_socket->hasIPv6() ? (WANT4 | WANT6) : -1; }
/**
 * Takes an address family (unnamed parameter). Declaration only; what is done
 * in response is defined outside this view.
 */
261 void connectivityChanged(sa_family_t);
/**
 * Declaration only; the definition is outside this view.
 *
 * @param n           target node handle.
 * @param on_done     completion callback, taken as an rvalue reference.
 * @param on_expired  expiry callback, taken as an rvalue reference.
 * @return a shared handle to a Request.
 */
276 Sp<Request>
sendPing(
const Sp<Node>& n, RequestCb&& on_done, RequestExpiredCb&& on_expired);
290 return sendPing(std::make_shared<Node>(InfoHash::zero(), std::move(sa), rd),
291 std::forward<RequestCb>(on_done),
292 std::forward<RequestExpiredCb>(on_expired));
308 const InfoHash& hash,
310 RequestCb&& on_done = {},
311 RequestExpiredCb&& on_expired = {});
327 const InfoHash& hash,
331 RequestExpiredCb&& on_expired);
356 const InfoHash& hash,
361 RequestExpiredCb&& on_expired);
376 const InfoHash& hash,
381 RequestExpiredCb&& on_expired);
396 const InfoHash& hash,
397 const Value::Id& vid,
400 RequestErrorCb&& on_error,
401 RequestExpiredCb&& on_expired);
415 const InfoHash& infohash,
416 std::vector<Sp<Value>>&& values,
421 const InfoHash& infohash,
422 std::vector<Sp<Value>>::iterator begin,
423 std::vector<Sp<Value>>::iterator end,
439 Sp<Node> insertNode(
const InfoHash&
id,
const SockAddr& addr)
441 auto n = cache.getNode(
id, addr, scheduler.time(), 0);
446 std::vector<unsigned> getNodeMessageStats(
bool in)
448 auto& st = in ? in_stats : out_stats;
449 std::vector<unsigned> stats {st.ping, st.find, st.get, st.listen, st.put};
/**
 * Takes a node handle by const reference. Declaration only.
 * NOTE(review): presumably records the node's address in the `blacklist` set —
 * confirm in the definition.
 */
454 void blacklistNode(
const Sp<Node>& n);
456 std::vector<Sp<Node>> getCachedNodes(
const InfoHash&
id, sa_family_t sa_f,
size_t count)
458 return cache.getCachedNodes(
id, sa_f, count);
/** @return cache.size(), i.e. the size reported by the node cache with no family filter. */
461 size_t getNodeCacheSize()
const {
return cache.size(); }
/** @return cache.size(af), i.e. the size reported by the node cache for address family `af`. */
462 size_t getNodeCacheSize(sa_family_t af)
const {
return cache.size(af); }
/** @return the number of entries currently held in the address_rate_limiter map. */
464 size_t getRateLimiterSize()
const {
return address_rate_limiter.size(); }
/** @return the number of entries currently held in the partial_messages map. */
466 size_t getPartialCount()
const {
return partial_messages.size(); }
/** Forward declaration; used as the mapped type of the partial_messages map. The definition is outside this view. */
469 struct PartialMessage;
/**
 * Byte counts computed at compile time: HASH_LEN plus the size of a raw
 * address struct plus the size of a port, for IPv4 (in_addr) and IPv6
 * (in6_addr) respectively.
 * NOTE(review): presumably the length of one packed node entry on the wire —
 * inferred from the names; confirm against bufferNodes()/deserializeNodes().
 */
475 static const constexpr size_t NODE4_INFO_BUF_LEN {HASH_LEN +
sizeof(in_addr) +
sizeof(in_port_t)};
477 static const constexpr size_t NODE6_INFO_BUF_LEN {HASH_LEN +
sizeof(in6_addr) +
sizeof(in_port_t)};
/**
 * Durations expressed as std::chrono::seconds: 15 s, 10 s and 3 s.
 * NOTE(review): the exact events each one bounds are not visible here —
 * confirm where they are used before changing a value.
 */
479 static constexpr std::chrono::seconds UDP_REPLY_TIME {15};
482 static constexpr std::chrono::seconds RX_MAX_PACKET_TIME {10};
484 static constexpr std::chrono::seconds RX_TIMEOUT {3};
/**
 * Compile-time limits: a count of 10, and sizes of 1280, 600 and 57344
 * (56 * 1024).
 * NOTE(review): the sizes are presumably in bytes and BLACKLISTED_MAX
 * presumably caps the `blacklist` set — confirm at the use sites.
 */
487 static constexpr unsigned BLACKLISTED_MAX {10};
489 static constexpr size_t MTU {1280};
490 static constexpr size_t MAX_PACKET_VALUE_SIZE {600};
491 static constexpr size_t MAX_MESSAGE_VALUE_SIZE {56 * 1024};
/**
 * Takes ownership of a parsed message (rvalue reference to unique_ptr) together
 * with the sender address. Declaration only; the definition is outside this
 * view.
 */
493 void process(std::unique_ptr<ParsedMessage>&&,
const SockAddr& from);
/**
 * Non-const member taking an address and returning a bool. Declaration only.
 * NOTE(review): which outcome `true` denotes (allowed vs. limited) is not
 * visible here — confirm in the definition before relying on it.
 */
495 bool rateLimit(
const SockAddr& addr);
/**
 * Static predicate on an address; needs no engine instance. Declaration only —
 * the criteria it applies are defined outside this view.
 */
497 static bool isMartian(
const SockAddr& addr);
/**
 * Const predicate on an address. Declaration only.
 * NOTE(review): presumably a lookup in the `blacklist` set — confirm in the
 * definition.
 */
498 bool isNodeBlacklisted(
const SockAddr& addr)
const;
/**
 * Takes the request handle by value, so the callee holds its own shared
 * reference for the duration of the call. Declaration only.
 */
500 void requestStep(Sp<Request> req);
/** Takes the request handle by const reference. Declaration only; the definition is outside this view. */
506 void sendRequest(
const Sp<Request>& request);
515 unsigned refresh {0};
516 unsigned updateValue {0};
/**
 * Declaration only; the definition is outside this view.
 *
 * @param addr       destination address.
 * @param buf        pointer to the bytes to send.
 * @param len        number of bytes at `buf`.
 * @param confirmed  defaults to false.
 *                   NOTE(review): its effect is not visible here — confirm.
 * @return an int.
 *         NOTE(review): the success/failure convention is not visible here —
 *         confirm in the definition.
 */
520 int send(
const SockAddr& addr,
const char* buf,
size_t len,
bool confirmed =
false);
/**
 * Takes a transaction id, a list of blobs and a destination address.
 * Declaration only.
 * NOTE(review): presumably `svals` are serialized values sent in pieces —
 * inferred from the name; confirm in the definition.
 */
522 void sendValueParts(Tid tid,
const std::vector<Blob>& svals,
const SockAddr& addr);
/**
 * Const member taking a msgpack buffer by non-const reference and a
 * [begin, end) pair of const iterators over a vector of Sp<Value>; returns a
 * vector of Blob. Declaration only.
 * The vector-taking overload forwards values.begin()/values.end() to this one.
 */
523 std::vector<Blob> packValueHeader(msgpack::sbuffer&,
524 std::vector<Sp<Value>>::const_iterator,
525 std::vector<Sp<Value>>::const_iterator)
const;
526 std::vector<Blob> packValueHeader(msgpack::sbuffer& buf,
const std::vector<Sp<Value>>& values)
const
528 return packValueHeader(buf, values.begin(), values.end());
/**
 * Takes a transaction id. Declaration only.
 * NOTE(review): presumably services the partial_messages entry for `tid` —
 * confirm in the definition.
 */
530 void maintainRxBuffer(Tid tid);
/** Takes a destination address and a transaction id. Declaration only; the definition is outside this view. */
536 void sendPong(
const SockAddr& addr, Tid tid);
538 void sendNodesValues(
const SockAddr& addr,
542 const std::vector<Sp<Value>>& st,
/**
 * Returns a single Blob built from an address family, an id and a node list.
 * `nodes` is taken by non-const reference, so the callee is permitted to
 * modify the caller's vector. Declaration only.
 */
545 Blob bufferNodes(sa_family_t af,
const InfoHash&
id, std::vector<Sp<Node>>& nodes);
/**
 * Overload returning a pair of Blobs and taking two node lists plus a want_t.
 * Both vectors are taken by non-const reference, so the callee is permitted to
 * modify them. Declaration only.
 * NOTE(review): presumably the pair is (IPv4 buffer, IPv6 buffer) in that
 * order — confirm in the definition.
 */
547 std::pair<Blob, Blob> bufferNodes(
548 sa_family_t af,
const InfoHash&
id, want_t want, std::vector<Sp<Node>>& nodes, std::vector<Sp<Node>>& nodes6);
/** Takes a destination address and a transaction id. Declaration only; the definition is outside this view. */
550 void sendListenConfirmation(
const SockAddr& addr, Tid tid);
/**
 * Takes a destination address, a transaction id and a value id (the last two
 * parameters are unnamed). Declaration only.
 */
552 void sendValueAnnounced(
const SockAddr& addr, Tid, Value::Id);
/**
 * Declaration only; the definition is outside this view.
 *
 * @param addr        destination address.
 * @param tid         transaction id.
 * @param code        16-bit numeric code.
 * @param message     text accompanying the code.
 * @param include_id  defaults to false.
 *                    NOTE(review): presumably controls whether our own id is
 *                    added to the error message — confirm in the definition.
 */
554 void sendError(
const SockAddr& addr, Tid tid, uint16_t code,
const std::string& message,
bool include_id =
false);
/**
 * Takes the parsed message by non-const reference (so it may be modified in
 * place) and the sender address. Declaration only.
 */
556 void deserializeNodes(ParsedMessage& msg,
const SockAddr& from);
/**
 * `myid` is a reference member: the engine does not own the InfoHash, so the
 * referenced object must outlive the engine.
 * `config` is const and value-initialized by default.
 * `dht_socket` is a const unique_ptr: the pointer cannot be reseated after
 * construction, though the socket it points to can still be used through it.
 * The reference and const members also mean the implicitly-declared copy/move
 * assignment operators are deleted.
 */
559 const InfoHash& myid;
560 const NetworkConfig config {};
561 const std::unique_ptr<DatagramSocket> dht_socket;
/**
 * Rate-limiting state: a map from SockAddr to a RateLimiter, ordered by
 * SockAddr::ipCmp, plus one standalone RateLimiter and a signed counter that
 * starts at 0. getRateLimiterSize() reports the map's size.
 * NOTE(review): ipCmp presumably compares the IP only (ignoring the port), and
 * `rate_limiter` is presumably the global limiter — confirm in SockAddr and in
 * rateLimit().
 */
568 using IpLimiter = RateLimiter;
569 using IpLimiterMap = std::map<SockAddr, IpLimiter, SockAddr::ipCmp>;
570 IpLimiterMap address_rate_limiter;
571 RateLimiter rate_limiter;
572 ssize_t limiter_maintenance {0};
/**
 * `requests` maps a transaction id to a shared Request handle.
 * `partial_messages` maps a transaction id to a PartialMessage;
 * getPartialCount() reports its size.
 * `in_stats` / `out_stats` are the two counter sets getNodeMessageStats()
 * chooses between based on its `in` flag.
 * `blacklist` is an ordered set of addresses.
 */
575 std::map<Tid, Sp<Request>> requests {};
576 std::map<Tid, PartialMessage> partial_messages;
578 MessageStats in_stats {}, out_stats {};
579 std::set<SockAddr> blacklist {};
/**
 * Reference member: the engine does not own the Scheduler, so it must outlive
 * the engine. insertNode() reads the current time from it via scheduler.time().
 */
581 Scheduler& scheduler;
/**
 * Boolean flag, false by default.
 * NOTE(review): presumably enables logging of received messages — inferred
 * from the name; confirm where it is read.
 */
583 bool logIncoming_ {
false};