24 #include <ndn-cxx/lp/tags.hpp>
25 #include <ndn-cxx/security/validator-null.hpp>
26 #include <ndn-cxx/util/logger.hpp>
35 ndn::KeyChain& keyChain,
36 size_t expectedNumEntries,
37 const ndn::Name& syncPrefix,
38 const ndn::Name& userPrefix,
40 ndn::time::milliseconds syncInterestLifetime,
41 ndn::time::milliseconds syncReplyFreshness,
44 :
ProducerBase(face, keyChain, expectedNumEntries, syncPrefix, userPrefix,
45 syncReplyFreshness, ibltCompression, contentCompression)
46 , m_syncInterestLifetime(syncInterestLifetime)
47 , m_onUpdate(std::move(onUpdateCb))
49 m_registeredPrefix =
m_face.setInterestFilter(ndn::InterestFilter(
m_syncPrefix).allowLoopback(
false),
50 [
this] (
auto&&... args) { onSyncInterest(std::forward<decltype(args)>(args)...); },
51 [] (
auto&&... args) {
onRegisterFailed(std::forward<decltype(args)>(args)...); });
69 NDN_LOG_WARN(
"Prefix not added: " << prefix);
73 uint64_t newSeq = seq.value_or(
m_prefixes[prefix] + 1);
74 NDN_LOG_INFO(
"Publish: " << prefix <<
"/" << newSeq);
76 satisfyPendingInterests();
80 FullProducer::sendSyncInterest()
95 m_outstandingInterestName = syncInterestName;
97 m_scheduledSyncInterestId =
98 m_scheduler.schedule(m_syncInterestLifetime / 2 + ndn::time::milliseconds(m_jitter(
m_rng)),
99 [
this] { sendSyncInterest(); });
101 ndn::Interest syncInterest(syncInterestName);
103 using ndn::util::SegmentFetcher;
104 SegmentFetcher::Options options;
105 options.interestLifetime = m_syncInterestLifetime;
106 options.maxTimeout = m_syncInterestLifetime;
107 options.rttOptions.initialRto = m_syncInterestLifetime;
109 m_fetcher = SegmentFetcher::start(
m_face, syncInterest,
110 ndn::security::getAcceptAllValidator(), options);
112 m_fetcher->onComplete.connect([
this, syncInterest] (
const ndn::ConstBufferPtr& bufferPtr) {
113 onSyncData(syncInterest, bufferPtr);
116 m_fetcher->afterSegmentValidated.connect([
this] (
const ndn::Data& data) {
117 auto tag = data.getTag<ndn::lp::IncomingFaceIdTag>();
119 m_incomingFace = *tag;
126 m_fetcher->onError.connect([
this] (uint32_t errorCode,
const std::string& msg) {
127 NDN_LOG_ERROR(
"Cannot fetch sync data, error: " << errorCode <<
", message: " << msg);
132 if (errorCode != SegmentFetcher::ErrorCode::INTEREST_TIMEOUT) {
133 auto after = ndn::time::milliseconds(m_jitter(
m_rng));
134 NDN_LOG_DEBUG(
"Schedule sync interest after: " << after);
135 m_scheduledSyncInterestId =
m_scheduler.schedule(after, [
this] { sendSyncInterest(); });
139 NDN_LOG_DEBUG(
"sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
140 ", hash: " << std::hash<ndn::Name>{}(syncInterestName));
144 FullProducer::onSyncInterest(
const ndn::Name& prefixName,
const ndn::Interest& interest)
151 ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefixName.size());
152 ndn::Name interestName;
154 if (nameWithoutSyncPrefix.size() == 1) {
156 interestName = interest.getName();
158 else if (nameWithoutSyncPrefix.size() == 3) {
160 interestName = interest.getName().getPrefix(-2);
166 ndn::name::Component ibltName = interestName.get(interestName.size() - 1);
168 NDN_LOG_DEBUG(
"Full sync Interest received, nonce: " << interest.getNonce() <<
169 ", hash: " << std::hash<ndn::Name>{}(interestName));
173 iblt.initialize(ibltName);
175 catch (
const std::exception& e) {
176 NDN_LOG_WARN(e.what());
180 auto diff =
m_iblt - iblt;
182 std::set<uint32_t> positive;
183 std::set<uint32_t> negative;
185 if (!diff.listEntries(positive, negative)) {
186 NDN_LOG_TRACE(
"Cannot decode differences, positive: " << positive.size()
187 <<
" negative: " << negative.size() <<
" m_threshold: "
192 if (positive.size() + negative.size() >=
m_threshold ||
193 (positive.empty() && negative.empty())) {
196 if (content.second != 0) {
197 state.addContent(ndn::Name(content.first).appendNumber(content.second));
201 if (!state.getContent().empty()) {
202 sendSyncData(interest.getName(), state.wireEncode());
205 #ifdef PSYNC_WITH_TESTS
206 ++nIbfDecodeFailuresAboveThreshold;
211 #ifdef PSYNC_WITH_TESTS
212 ++nIbfDecodeFailuresBelowThreshold;
217 for (
const auto& hash : positive) {
218 auto nameIt =
m_biMap.left.find(hash);
219 if (nameIt !=
m_biMap.left.end()) {
220 ndn::Name nameWithoutSeq = nameIt->second.getPrefix(-1);
223 !isFutureHash(nameWithoutSeq, negative)) {
224 state.addContent(nameIt->second);
229 if (!state.getContent().empty()) {
230 NDN_LOG_DEBUG(
"Sending sync content: " << state);
231 sendSyncData(interestName, state.wireEncode());
235 auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{iblt, {}}).first->second;
236 entry.expirationEvent =
m_scheduler.schedule(interest.getInterestLifetime(),
238 NDN_LOG_TRACE(
"Erase pending Interest " << interest.getNonce());
239 m_pendingEntries.erase(interest.getName());
244 FullProducer::sendSyncData(
const ndn::Name& name,
const ndn::Block& block)
246 NDN_LOG_DEBUG(
"Checking if data will satisfy our own pending interest");
248 ndn::Name nameWithIblt;
252 ndn::Name dataName(ndn::Name(name).appendNumber(std::hash<ndn::Name>{}(nameWithIblt)));
257 if (m_outstandingInterestName == name) {
258 NDN_LOG_DEBUG(
"Satisfied our own pending interest");
261 NDN_LOG_DEBUG(
"Removing our pending interest from face (stop fetcher)");
263 m_outstandingInterestName = ndn::Name(
"");
266 NDN_LOG_DEBUG(
"Sending sync Data");
271 NDN_LOG_TRACE(
"Renewing sync interest");
275 NDN_LOG_DEBUG(
"Sending sync Data");
281 FullProducer::onSyncData(
const ndn::Interest& interest,
const ndn::ConstBufferPtr& bufferPtr)
283 deletePendingInterests(interest.getName());
288 state.wireDecode(ndn::Block(std::move(decompressed)));
290 catch (
const std::exception& e) {
291 NDN_LOG_ERROR(
"Cannot parse received sync Data: " << e.what());
294 NDN_LOG_DEBUG(
"Sync Data received: " << state);
296 std::vector<MissingDataInfo> updates;
298 for (
const auto& content : state) {
299 ndn::Name prefix = content.getPrefix(-1);
300 uint64_t seq = content.get(content.size() - 1).toNumber();
303 updates.push_back({prefix,
m_prefixes[prefix] + 1, seq, m_incomingFace});
313 if (!updates.empty()) {
315 NDN_LOG_TRACE(
"Renewing sync interest");
319 NDN_LOG_TRACE(
"No new update, interest nonce: " << interest.getNonce() <<
320 " , hash: " << std::hash<ndn::Name>{}(interest.getName()));
325 FullProducer::satisfyPendingInterests()
327 NDN_LOG_DEBUG(
"Satisfying full sync interest: " << m_pendingEntries.size());
329 for (
auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
330 const auto& entry = it->second;
331 auto diff =
m_iblt - entry.iblt;
332 std::set<uint32_t> positive;
333 std::set<uint32_t> negative;
335 if (!diff.listEntries(positive, negative)) {
336 NDN_LOG_TRACE(
"Decode failed for pending interest");
337 if (positive.size() + negative.size() >=
m_threshold ||
338 (positive.empty() && negative.empty())) {
339 NDN_LOG_TRACE(
"pos + neg > threshold or no diff can be found, erase pending interest");
340 it = m_pendingEntries.erase(it);
346 for (
const auto& hash : positive) {
347 auto nameIt =
m_biMap.left.find(hash);
348 if (nameIt !=
m_biMap.left.end()) {
349 if (
m_prefixes[nameIt->second.getPrefix(-1)] != 0) {
350 state.addContent(nameIt->second);
355 if (!state.getContent().empty()) {
356 NDN_LOG_DEBUG(
"Satisfying sync content: " << state);
357 sendSyncData(it->first, state.wireEncode());
358 it = m_pendingEntries.erase(it);
367 FullProducer::isFutureHash(
const ndn::Name& prefix,
const std::set<uint32_t>& negative)
370 ndn::Name(prefix).appendNumber(
m_prefixes[prefix] + 1));
371 return negative.find(nextHash) != negative.end();
375 FullProducer::deletePendingInterests(
const ndn::Name& interestName)
377 auto it = m_pendingEntries.find(interestName);
378 if (it != m_pendingEntries.end()) {
379 NDN_LOG_TRACE(
"Delete pending interest: " << interestName);
380 it = m_pendingEntries.erase(it);
Full sync logic to synchronize with other nodes, where all nodes want to get all name prefixes synced.
FullProducer(ndn::Face &face, ndn::KeyChain &keyChain, size_t expectedNumEntries, const ndn::Name &syncPrefix, const ndn::Name &userPrefix, UpdateCallback onUpdateCallBack, ndn::time::milliseconds syncInterestLifetime=SYNC_INTEREST_LIFETIME, ndn::time::milliseconds syncReplyFreshness=SYNC_REPLY_FRESHNESS, CompressionScheme ibltCompression=CompressionScheme::DEFAULT, CompressionScheme contentCompression=CompressionScheme::DEFAULT)
Constructor.
void publishName(const ndn::Name &prefix, std::optional< uint64_t > seq=std::nullopt)
Publish name to let others know.
Base class for PartialProducer and FullProducer.
const CompressionScheme m_contentCompression
const ndn::Name m_syncPrefix
const ndn::time::milliseconds m_syncReplyFreshness
const size_t m_expectedNumEntries
std::map< ndn::Name, uint64_t > m_prefixes
ndn::Scheduler m_scheduler
static void onRegisterFailed(const ndn::Name &prefix, const std::string &msg)
Logs a message and throws if setting an interest filter fails.
const CompressionScheme m_ibltCompression
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
SegmentPublisher m_segmentPublisher
ndn::random::RandomNumberEngine & m_rng
bool replyFromStore(const ndn::Name &interestName)
Try to reply from memory, return false if we cannot find the segment.
void publish(const ndn::Name &interestName, const ndn::Name &dataName, ndn::span< const uint8_t > buffer, ndn::time::milliseconds freshness)
Put all the segments in memory.
void appendToName(ndn::Name &name) const
Appends self to name.
uint32_t murmurHash3(const void *key, size_t len, uint32_t seed)
constexpr size_t N_HASHCHECK
std::shared_ptr< ndn::Buffer > compress(CompressionScheme scheme, ndn::span< const uint8_t > buffer)
std::shared_ptr< ndn::Buffer > decompress(CompressionScheme scheme, ndn::span< const uint8_t > buffer)
std::function< void(const std::vector< MissingDataInfo > &)> UpdateCallback