My Project 3.7.1
C++ Distributed Hash Table
Loading...
Searching...
No Matches
pht.h
1// Copyright (c) 2014-2026 Savoir-faire Linux Inc.
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include <string>
6#include <vector>
7#include <memory>
8#include <map>
9#include <functional>
10#include <stdexcept>
11#include <bitset>
12#include <iostream>
13#include <sstream>
14
15#include "../value.h"
16#include "../dhtrunner.h"
17#include "../infohash.h"
18
19namespace dht {
20namespace indexation {
21
31struct OPENDHT_PUBLIC Prefix
32{
33 Prefix() {}
34 Prefix(InfoHash h)
35 : size_(h.size() * 8)
36 , content_(h.begin(), h.end())
37 {}
38 Prefix(const Blob& d, const Blob& f = {})
39 : size_(d.size() * 8)
40 , flags_(f)
41 , content_(d)
42 {}
43
44 Prefix(const Prefix& p, size_t first)
45 : size_(std::min(first, p.content_.size() * 8))
46 , content_(Blob(p.content_.begin(), p.content_.begin() + size_ / 8))
47 {
48 auto rem = size_ % 8;
49 if (not p.flags_.empty()) {
50 flags_ = Blob(p.flags_.begin(), p.flags_.begin() + size_ / 8);
51 if (rem)
52 flags_.push_back(p.flags_[size_ / 8] & (0xFF << (8 - rem)));
53 }
54
55 if (rem)
56 content_.push_back(p.content_[size_ / 8] & (0xFF << (8 - rem)));
57 }
58
71 Prefix getPrefix(ssize_t len) const
72 {
73 if ((size_t) std::abs(len) >= content_.size() * 8)
74 throw std::out_of_range("len larger than prefix size.");
75 if (len < 0)
76 len += size_;
77
78 return Prefix(*this, len);
79 }
80
87 bool isFlagActive(size_t pos) const { return flags_.empty() or isActiveBit(flags_, pos); }
88
92 bool isContentBitActive(size_t pos) const { return isActiveBit(content_, pos); }
93
94 Prefix getFullSize() { return Prefix(*this, content_.size() * 8); }
95
101 Prefix getSibling() const
102 {
103 Prefix copy = *this;
104 if (size_)
105 copy.swapContentBit(size_ - 1);
106
107 return copy;
108 }
109
110 InfoHash hash() const
111 {
112 Blob copy(content_);
113 copy.push_back(size_);
114 return InfoHash::get(copy);
115 }
116
124 static inline unsigned commonBits(const Prefix& p1, const Prefix& p2)
125 {
126 unsigned i, j;
127 uint8_t x;
128 auto longest_prefix_size = std::min(p1.size_, p2.size_);
129
130 for (i = 0; i < longest_prefix_size; i++) {
131 if (p1.content_.data()[i] != p2.content_.data()[i] or not p1.isFlagActive(i) or not p2.isFlagActive(i)) {
132 break;
133 }
134 }
135
136 if (i == longest_prefix_size)
137 return 8 * longest_prefix_size;
138
139 x = p1.content_.data()[i] ^ p2.content_.data()[i];
140
141 j = 0;
142 while ((x & 0x80) == 0) {
143 x <<= 1;
144 j++;
145 }
146
147 return 8 * i + j;
148 }
149
153 void swapContentBit(size_t bit) { swapBit(content_, bit); }
154
158 void swapFlagBit(size_t bit) { swapBit(flags_, bit); }
159
163 void addPaddingContent(size_t size) { content_ = addPadding(content_, size); }
164
165 void updateFlags()
166 {
167 /* Fill first known bit */
168 auto csize = size_ - flags_.size() * 8;
169 while (csize >= 8) {
170 flags_.push_back(0xFF);
171 csize -= 8;
172 }
173
174 /* if needed fill remaining bit */
175 if (csize)
176 flags_.push_back(0xFF << (8 - csize));
177
178 /* Complet vector space missing */
179 for (auto i = flags_.size(); i < content_.size(); i++)
180 flags_.push_back(0xFF);
181 }
182
183 std::string toString() const;
184
185 size_t size_ {0};
186
187 /* Will contain flags according to content_.
188 If flags_[i] == 0, then content_[i] is unknown
189 else if flags_[i] == 1, then content_[i] is known */
190 Blob flags_ {};
191 Blob content_ {};
192
193private:
202 Blob addPadding(Blob toP, size_t size)
203 {
204 Blob copy = toP;
205 for (auto i = copy.size(); i < size; i++)
206 copy.push_back(0);
207
208 swapBit(copy, size_ + 1);
209 return copy;
210 }
211
222 bool isActiveBit(const Blob& b, size_t pos) const
223 {
224 if (pos >= content_.size() * 8)
225 throw std::out_of_range("Can't detect active bit at pos, pos larger than prefix size or empty prefix");
226
227 return ((b[pos / 8] >> (7 - (pos % 8))) & 1) == 1;
228 }
229
240 void swapBit(Blob& b, size_t bit)
241 {
242 if (bit >= b.size() * 8)
243 throw std::out_of_range("bit larger than prefix size.");
244
245 size_t offset_bit = (8 - bit) % 8;
246 b[bit / 8] ^= (1 << offset_bit);
247 }
248};
249
250using Value = std::pair<InfoHash, dht::Value::Id>;
251struct OPENDHT_PUBLIC IndexEntry : public dht::Value::Serializable<IndexEntry>
252{
253 static const ValueType& TYPE;
254
255 virtual void unpackValue(const dht::Value& v)
256 {
257 Serializable<IndexEntry>::unpackValue(v);
258 name = v.user_type;
259 }
260
261 virtual dht::Value packValue() const
262 {
263 auto pack = Serializable<IndexEntry>::packValue();
264 pack.user_type = name;
265 return pack;
266 }
267
268 Blob prefix;
269 Value value;
270 std::string name;
271 MSGPACK_DEFINE_MAP(prefix, value)
272};
273
274class OPENDHT_PUBLIC Pht
275{
276 static constexpr const char* INVALID_KEY = "Key does not match the PHT key spec.";
277
278 /* Prefixes the user_type for all dht values put on the DHT */
279 static constexpr const char* INDEX_PREFIX = "index.pht.";
280
281public:
282 /* This is the maximum number of entries per node. This parameter is
283 * critical and influences the traffic a lot during a lookup operation.
284 */
285 static constexpr const size_t MAX_NODE_ENTRY_COUNT {16};
286
287 /* A key for a an index entry */
288 using Key = std::map<std::string, Blob>;
289
290 /* Specifications of the keys. It defines the number, the length and the
291 * serialization order of fields. */
292 using KeySpec = std::map<std::string, size_t>;
293 using LookupCallback = std::function<void(std::vector<std::shared_ptr<Value>>& values, const Prefix& p)>;
294
295 typedef void (*LookupCallbackRaw)(std::vector<std::shared_ptr<Value>>* values, Prefix* p, void* user_data);
296 static LookupCallback bindLookupCb(LookupCallbackRaw raw_cb, void* user_data)
297 {
298 if (not raw_cb)
299 return {};
300 return [=](std::vector<std::shared_ptr<Value>>& values, const Prefix& p) {
301 raw_cb((std::vector<std::shared_ptr<Value>>*) &values, (Prefix*) &p, user_data);
302 };
303 }
304 using LookupCallbackSimple = std::function<void(std::vector<std::shared_ptr<Value>>& values)>;
305 typedef void (*LookupCallbackSimpleRaw)(std::vector<std::shared_ptr<Value>>* values, void* user_data);
306 static LookupCallbackSimple bindLookupCbSimple(LookupCallbackSimpleRaw raw_cb, void* user_data)
307 {
308 if (not raw_cb)
309 return {};
310 return [=](std::vector<std::shared_ptr<Value>>& values) {
311 raw_cb((std::vector<std::shared_ptr<Value>>*) &values, user_data);
312 };
313 }
314
315 Pht(std::string name, KeySpec k_spec, std::shared_ptr<DhtRunner> dht)
316 : name_(INDEX_PREFIX + name)
317 , canary_(name_ + ".canary")
318 , keySpec_(k_spec)
319 , dht_(dht)
320 {}
321
322 virtual ~Pht() {}
323
327 void lookup(Key k, LookupCallback cb = {}, DoneCallbackSimple done_cb = {}, bool exact_match = true);
328 void lookup(Key k, LookupCallbackSimple cb = {}, DoneCallbackSimple done_cb = {}, bool exact_match = true)
329 {
330 lookup(
331 k,
332 [cb = std::move(cb)](std::vector<std::shared_ptr<Value>>& values, Prefix) { cb(values); },
333 done_cb,
334 exact_match);
335 }
336
344 void insert(Key k, Value v, DoneCallbackSimple done_cb = {})
345 {
346 Prefix p = linearize(k);
347
348 auto lo = std::make_shared<int>(0);
349 auto hi = std::make_shared<int>(p.size_);
350
351 IndexEntry entry;
352 entry.value = v;
353 entry.prefix = p.content_;
354 entry.name = name_;
355
356 Pht::insert(p, entry, lo, hi, clock::now(), true, done_cb);
357 }
358
359private:
371
372 void insert(const Prefix& kp,
373 IndexEntry entry,
374 std::shared_ptr<int> lo,
375 std::shared_ptr<int> hi,
376 time_point time_p,
377 bool check_split,
378 DoneCallbackSimple done_cb = {});
379
380 class Cache
381 {
382 public:
387 void insert(const Prefix& p);
388
396
397 int lookup(const Prefix& p);
398
399 private:
400 static constexpr const size_t MAX_ELEMENT {1024};
401 static constexpr const std::chrono::minutes NODE_EXPIRE_TIME {5};
402
403 struct Node
404 {
405 time_point last_reply; /* Made the assocation between leaves and leaves multimap */
406 std::shared_ptr<Node> parent; /* Share_ptr to the parent, it allow the self destruction of tree */
407 std::weak_ptr<Node> left_child; /* Left child, for bit equal to 1 */
408 std::weak_ptr<Node> right_child; /* Right child, for bit equal to 0 */
409 };
410
411 std::weak_ptr<Node> root_; /* Root of the tree */
412
418 std::multimap<time_point, std::shared_ptr<Node>> leaves_;
419 };
420
421 /* Callback used for insert value by using the pht */
422 using RealInsertCallback = std::function<void(const Prefix& p, IndexEntry entry)>;
423 using LookupCallbackWrapper = std::function<void(std::vector<std::shared_ptr<IndexEntry>>& values, const Prefix& p)>;
424
440 void lookupStep(Prefix k,
441 std::shared_ptr<int> lo,
442 std::shared_ptr<int> hi,
443 std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals,
444 LookupCallbackWrapper cb,
445 DoneCallbackSimple done_cb,
446 std::shared_ptr<unsigned> max_common_prefix_len,
447 int start = -1,
448 bool all_values = false);
449
457 Prefix zcurve(const std::vector<Prefix>& all_prefix) const;
458
467 virtual Prefix linearize(Key k) const;
468
477 void getRealPrefix(const std::shared_ptr<Prefix>& p, IndexEntry entry, RealInsertCallback end_cb);
478
487 void checkPhtUpdate(Prefix p, IndexEntry entry, time_point time_p);
488
496 static size_t findSplitLocation(const Prefix& compared, const std::vector<std::shared_ptr<IndexEntry>>& vals)
497 {
498 for (size_t i = 0; i < compared.content_.size() * 8 - 1; i++)
499 for (auto const& v : vals)
500 if (Prefix(v->prefix).isContentBitActive(i) != compared.isContentBitActive(i))
501 return i + 1;
502 return compared.content_.size() * 8 - 1;
503 }
504
513 void split(const Prefix& insert,
514 const std::vector<std::shared_ptr<IndexEntry>>& vals,
515 IndexEntry entry,
516 RealInsertCallback end_cb);
517
521 bool validKey(const Key& k) const
522 {
523 return k.size() == keySpec_.size()
524 and std::equal(k.begin(),
525 k.end(),
526 keySpec_.begin(),
527 [&](const Key::value_type& key, const KeySpec::value_type& key_spec) {
528 return key.first == key_spec.first and key.second.size() <= key_spec.second;
529 });
530 }
531
536 void updateCanary(Prefix p);
537
538 const std::string name_;
539 const std::string canary_;
540 const KeySpec keySpec_;
541 Cache cache_;
542 std::shared_ptr<DhtRunner> dht_;
543};
544
545} // namespace indexation
546} // namespace dht
void insert(Key k, Value v, DoneCallbackSimple done_cb={})
Definition pht.h:344
void lookup(Key k, LookupCallback cb={}, DoneCallbackSimple done_cb={}, bool exact_match=true)
std::vector< uint8_t > Blob
Definition utils.h:156
std::string user_type
Definition value.h:650
A blob structure which prefixes a Key in the PHT.
Definition pht.h:32
static unsigned commonBits(const Prefix &p1, const Prefix &p2)
Definition pht.h:124
void swapFlagBit(size_t bit)
Definition pht.h:158
void swapContentBit(size_t bit)
Definition pht.h:153
void addPaddingContent(size_t size)
Definition pht.h:163
Prefix getSibling() const
Definition pht.h:101
bool isFlagActive(size_t pos) const
Definition pht.h:87
bool isContentBitActive(size_t pos) const
Definition pht.h:92
Prefix getPrefix(ssize_t len) const
Definition pht.h:71