24 #include <ndn-cxx/lp/tags.hpp>
25 #include <ndn-cxx/security/validator-null.hpp>
26 #include <ndn-cxx/util/logger.hpp>
35 ndn::KeyChain& keyChain,
36 const ndn::Name& syncPrefix,
38 :
ProducerBase(face, keyChain, opts.ibfCount, syncPrefix, opts.syncDataFreshness,
39 opts.ibfCompression, opts.contentCompression)
40 , m_syncInterestLifetime(opts.syncInterestLifetime)
41 , m_onUpdate(opts.onUpdate)
43 m_registeredPrefix =
m_face.setInterestFilter(ndn::InterestFilter(
m_syncPrefix).allowLoopback(
false),
44 [
this] (
auto&&... args) { onSyncInterest(std::forward<decltype(args)>(args)...); },
45 [] (
auto&&... args) {
onRegisterFailed(std::forward<decltype(args)>(args)...); });
53 ndn::KeyChain& keyChain,
54 size_t expectedNumEntries,
55 const ndn::Name& syncPrefix,
56 const ndn::Name& userPrefix,
58 ndn::time::milliseconds syncInterestLifetime,
59 ndn::time::milliseconds syncReplyFreshness,
63 Options{std::move(onUpdateCb), static_cast<uint32_t>(expectedNumEntries), ibltCompression,
64 syncInterestLifetime, syncReplyFreshness, contentCompression})
80 NDN_LOG_WARN(
"Prefix not added: " << prefix);
84 uint64_t newSeq = seq.value_or(
m_prefixes[prefix] + 1);
85 NDN_LOG_INFO(
"Publish: " << prefix <<
"/" << newSeq);
88 m_inNoNewDataWaitOutPeriod =
false;
90 satisfyPendingInterests(ndn::Name(prefix).appendNumber(newSeq));
94 FullProducer::sendSyncInterest()
96 if (m_inNoNewDataWaitOutPeriod) {
97 NDN_LOG_TRACE(
"Cannot send sync Interest as Data is expected from CS");
116 auto currentTime = ndn::time::system_clock::now();
117 if ((currentTime - m_lastInterestSentTime < ndn::time::milliseconds(MIN_JITTER)) &&
118 (m_outstandingInterestName == syncInterestName)) {
119 NDN_LOG_TRACE(
"Suppressing Interest: " << std::hash<ndn::Name>{}(syncInterestName));
123 m_outstandingInterestName = syncInterestName;
125 m_scheduledSyncInterestId =
126 m_scheduler.schedule(m_syncInterestLifetime / 2 + ndn::time::milliseconds(m_jitter(
m_rng)),
127 [
this] { sendSyncInterest(); });
129 ndn::Interest syncInterest(syncInterestName);
131 using ndn::SegmentFetcher;
132 SegmentFetcher::Options options;
133 options.interestLifetime = m_syncInterestLifetime;
134 options.maxTimeout = m_syncInterestLifetime;
135 options.rttOptions.initialRto = m_syncInterestLifetime;
141 NDN_LOG_DEBUG(
"sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
142 ", hash: " << std::hash<ndn::Name>{}(syncInterestName));
144 m_lastInterestSentTime = currentTime;
145 m_fetcher = SegmentFetcher::start(
m_face, syncInterest,
146 ndn::security::getAcceptAllValidator(), options);
148 m_fetcher->onComplete.connect([
this, syncInterest] (
const ndn::ConstBufferPtr& bufferPtr) {
149 onSyncData(syncInterest, bufferPtr);
152 m_fetcher->afterSegmentValidated.connect([
this] (
const ndn::Data& data) {
153 auto tag = data.getTag<ndn::lp::IncomingFaceIdTag>();
155 m_incomingFace = *tag;
162 m_fetcher->onError.connect([
this] (uint32_t errorCode,
const std::string& msg) {
163 NDN_LOG_ERROR(
"Cannot fetch sync data, error: " << errorCode <<
", message: " << msg);
168 if (errorCode != SegmentFetcher::ErrorCode::INTEREST_TIMEOUT) {
169 auto after = ndn::time::milliseconds(m_jitter(
m_rng));
170 NDN_LOG_DEBUG(
"Schedule sync Interest after: " << after);
171 m_scheduledSyncInterestId =
m_scheduler.schedule(after, [
this] { sendSyncInterest(); });
177 FullProducer::processWaitingInterests()
179 NDN_LOG_TRACE(
"Processing waiting Interest list, size: " << m_waitingForProcessing.size());
180 if (m_waitingForProcessing.size() == 0) {
184 for (
auto it = m_waitingForProcessing.begin(); it != m_waitingForProcessing.end();) {
185 if (it->second.numTries == std::numeric_limits<uint16_t>::max()) {
186 NDN_LOG_TRACE(
"Interest with hash already marked for deletion, removing now: " <<
187 std::hash<ndn::Name>{}(it->first));
188 it = m_waitingForProcessing.erase(it);
192 it->second.numTries += 1;
193 ndn::Interest interest(it->first);
194 interest.setNonce(it->second.nonce);
196 if (it->second.numTries == std::numeric_limits<uint16_t>::max()) {
197 NDN_LOG_TRACE(
"Removing Interest with hash: " << std::hash<ndn::Name>{}(it->first));
198 it = m_waitingForProcessing.erase(it);
204 NDN_LOG_TRACE(
"Done processing waiting Interest list, size: " << m_waitingForProcessing.size());
208 FullProducer::scheduleProcessWaitingInterests()
211 if (m_waitingForProcessing.size() == 0) {
215 if (!m_interestDelayTimerId) {
216 auto after = ndn::time::milliseconds(m_jitter(
m_rng));
217 NDN_LOG_TRACE(
"Setting a timer to processes waiting Interest(s) in: " << after);
219 m_interestDelayTimerId =
m_scheduler.schedule(after, [=] {
220 NDN_LOG_TRACE(
"Timer has expired, trying to process waiting Interest(s)");
221 processWaitingInterests();
222 scheduleProcessWaitingInterests();
228 FullProducer::onSyncInterest(
const ndn::Name& prefixName,
const ndn::Interest& interest,
229 bool isTimedProcessing)
231 ndn::Name interestName = interest.getName();
232 auto interestNameHash = std::hash<ndn::Name>{}(interestName);
233 NDN_LOG_DEBUG(
"Full sync Interest received, nonce: " << interest.getNonce() <<
234 ", hash: " << interestNameHash);
236 if (isTimedProcessing) {
237 NDN_LOG_TRACE(
"Delayed Interest being processed now");
241 NDN_LOG_DEBUG(
"Answer from memory");
245 ndn::Name nameWithoutSyncPrefix = interestName.getSubName(prefixName.size());
247 if (nameWithoutSyncPrefix.size() == 4) {
249 NDN_LOG_DEBUG(
"Segment not found in memory. Other side will have to restart");
255 if (nameWithoutSyncPrefix.size() != 2) {
256 NDN_LOG_WARN(
"Two components required after sync prefix: /<IBF>/<numCumulativeElements>; received: " << interestName);
260 ndn::name::Component ibltName = interestName[-2];
261 uint64_t numRcvdElements = interestName[-1].toNumber();
265 iblt.initialize(ibltName);
267 catch (
const std::exception& e) {
268 NDN_LOG_WARN(e.what());
272 auto diff =
m_iblt - iblt;
274 NDN_LOG_TRACE(
"Decode, positive: " << diff.positive.size()
275 <<
" negative: " << diff.negative.size() <<
" m_threshold: "
278 auto waitingIt = m_waitingForProcessing.find(interestName);
280 if (!diff.canDecode) {
281 NDN_LOG_DEBUG(
"Cannot decode differences!");
284 if (!isTimedProcessing && waitingIt == m_waitingForProcessing.end()) {
285 NDN_LOG_TRACE(
"Decode failure, adding to waiting Interest list " << interestNameHash);
286 m_waitingForProcessing.emplace(interestName, WaitingEntryInfo{0, interest.getNonce()});
287 scheduleProcessWaitingInterests();
289 else if (isTimedProcessing && waitingIt != m_waitingForProcessing.end()) {
290 if (waitingIt->second.numTries > 1) {
291 NDN_LOG_TRACE(
"Decode failure, still behind. Erasing waiting Interest as we have tried twice");
292 waitingIt->second.numTries = std::numeric_limits<uint16_t>::max();
293 NDN_LOG_DEBUG(
"Waiting Interest has been deleted. Sending new sync interest");
297 NDN_LOG_TRACE(
"Decode failure, still behind, waiting more till the next timer");
301 NDN_LOG_TRACE(
"Decode failure, still behind");
305 if (
m_numOwnElements == numRcvdElements && diff.positive.size() == 0 && diff.negative.size() > 0) {
306 NDN_LOG_TRACE(
"We have nothing to offer and are actually behind");
307 #ifdef PSYNC_WITH_TESTS
308 ++nIbfDecodeFailuresBelowThreshold;
315 if (content.second != 0) {
316 state.addContent(ndn::Name(content.first).appendNumber(content.second));
319 #ifdef PSYNC_WITH_TESTS
320 ++nIbfDecodeFailuresAboveThreshold;
323 if (!state.getContent().empty()) {
324 NDN_LOG_DEBUG(
"Sending entire state: " << state);
326 sendSyncData(interestName, state.wireEncode(), 10_ms);
328 deletePendingInterests(interestName);
331 if (waitingIt != m_waitingForProcessing.end()) {
332 waitingIt->second.numTries = std::numeric_limits<uint16_t>::max();
338 if (diff.positive.size() == 0 && diff.negative.size() == 0) {
339 NDN_LOG_TRACE(
"Saving positive: " << diff.positive.size() <<
" negative: " << diff.negative.size());
341 auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{iblt, {}}).first->second;
342 entry.expirationEvent =
m_scheduler.schedule(interest.getInterestLifetime(),
344 NDN_LOG_TRACE(
"Erase pending Interest " << interest.getNonce());
345 m_pendingEntries.erase(interest.getName());
350 if (isTimedProcessing) {
351 if (waitingIt != m_waitingForProcessing.end()) {
352 waitingIt->second.numTries = std::numeric_limits<uint16_t>::max();
359 if (diff.positive.size() == 0 && diff.negative.size() > 0) {
360 if (!isTimedProcessing && waitingIt == m_waitingForProcessing.end()) {
361 NDN_LOG_TRACE(
"Adding Interest to waiting list: " << interestNameHash);
362 m_waitingForProcessing.emplace(interestName, WaitingEntryInfo{0, interest.getNonce()});
363 scheduleProcessWaitingInterests();
365 else if (isTimedProcessing && waitingIt != m_waitingForProcessing.end()) {
366 if (waitingIt->second.numTries > 1) {
367 NDN_LOG_TRACE(
"Still behind after waiting for Interest " << interestNameHash <<
368 ". Erasing waiting Interest as we have tried twice");
369 waitingIt->second.numTries = std::numeric_limits<uint16_t>::max();
372 NDN_LOG_TRACE(
"Still behind after waiting for Interest " << interestNameHash <<
373 ". Keep waiting for Interest as number of tries is not exhausted");
377 NDN_LOG_TRACE(
"Still behind after waiting for Interest " << interestNameHash);
382 if (diff.positive.size() > 0) {
384 for (
const auto& hash : diff.positive) {
385 auto nameIt =
m_biMap.left.find(hash);
386 if (nameIt !=
m_biMap.left.end()) {
387 ndn::Name nameWithoutSeq = nameIt->second.getPrefix(-1);
390 !isFutureHash(nameWithoutSeq.toUri(), diff.negative)) {
391 state.addContent(nameIt->second);
396 if (!state.getContent().empty()) {
397 NDN_LOG_DEBUG(
"Sending sync content: " << state);
401 if (waitingIt != m_waitingForProcessing.end()) {
402 waitingIt->second.numTries = std::numeric_limits<uint16_t>::max();
409 FullProducer::sendSyncData(
const ndn::Name& name,
const ndn::Block& block,
410 ndn::time::milliseconds syncReplyFreshness)
412 bool isSatisfyingOwnInterest = m_outstandingInterestName == name;
413 if (isSatisfyingOwnInterest && m_fetcher) {
414 NDN_LOG_DEBUG(
"Removing our pending Interest from face (stop fetcher)");
416 m_outstandingInterestName.clear();
419 NDN_LOG_DEBUG(
"Sending sync Data");
422 if (isSatisfyingOwnInterest) {
423 NDN_LOG_DEBUG(
"Renewing sync interest");
429 FullProducer::onSyncData(
const ndn::Interest& interest,
const ndn::ConstBufferPtr& bufferPtr)
431 deletePendingInterests(interest.getName());
436 state.wireDecode(ndn::Block(std::move(decompressed)));
438 catch (
const std::exception& e) {
439 NDN_LOG_ERROR(
"Cannot parse received sync Data: " << e.what());
442 NDN_LOG_DEBUG(
"Sync Data received: " << state);
444 std::vector<MissingDataInfo> updates;
446 for (
const auto& content : state) {
447 ndn::Name prefix = content.getPrefix(-1);
448 uint64_t seq = content.get(content.size() - 1).toNumber();
451 updates.push_back({prefix,
m_prefixes[prefix] + 1, seq, m_incomingFace});
460 if (!updates.empty()) {
463 auto after = ndn::time::milliseconds(m_jitter(
m_rng));
464 m_scheduledSyncInterestId =
m_scheduler.schedule(after, [
this] {
465 NDN_LOG_DEBUG(
"Got updates, renewing sync Interest now");
468 NDN_LOG_DEBUG(
"Schedule sync Interest after: " << after);
469 m_inNoNewDataWaitOutPeriod =
false;
471 processWaitingInterests();
474 NDN_LOG_TRACE(
"No new update, Interest nonce: " << interest.getNonce() <<
475 " , hash: " << std::hash<ndn::Name>{}(interest.getName()));
476 m_inNoNewDataWaitOutPeriod =
true;
480 m_scheduledSyncInterestId =
m_scheduler.schedule(after, [
this] {
481 NDN_LOG_DEBUG(
"Sending sync Interest after no new update");
482 m_inNoNewDataWaitOutPeriod =
false;
485 NDN_LOG_DEBUG(
"Schedule sync after: " << after);
491 FullProducer::satisfyPendingInterests(
const ndn::Name& updatedPrefixWithSeq)
493 NDN_LOG_DEBUG(
"Satisfying full sync Interest: " << m_pendingEntries.size());
495 for (
auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
496 NDN_LOG_TRACE(
"Satisfying pending Interest: " << std::hash<ndn::Name>{}(it->first.getPrefix(-1)));
497 const auto& entry = it->second;
498 auto diff =
m_iblt - entry.iblt;
499 NDN_LOG_TRACE(
"Decoded: " << diff.canDecode <<
" positive: " << diff.positive.size() <<
500 " negative: " << diff.negative.size());
503 bool publishedPrefixInDiff =
false;
504 for (
const auto& hash : diff.positive) {
505 auto nameIt =
m_biMap.left.find(hash);
506 if (nameIt !=
m_biMap.left.end()) {
507 if (updatedPrefixWithSeq == nameIt->second) {
508 publishedPrefixInDiff =
true;
510 state.addContent(nameIt->second);
514 if (!publishedPrefixInDiff) {
515 state.addContent(updatedPrefixWithSeq);
518 NDN_LOG_DEBUG(
"Satisfying sync content: " << state);
520 it = m_pendingEntries.erase(it);
525 FullProducer::isFutureHash(
const ndn::Name& prefix,
const std::set<uint32_t>& negative)
528 ndn::Name(prefix).appendNumber(
m_prefixes[prefix] + 1));
529 return negative.find(nextHash) != negative.end();
533 FullProducer::deletePendingInterests(
const ndn::Name& interestName)
535 auto it = m_pendingEntries.find(interestName);
536 if (it != m_pendingEntries.end()) {
537 NDN_LOG_TRACE(
"Delete pending Interest: " << std::hash<ndn::Name>{}(interestName));
538 it = m_pendingEntries.erase(it);
Full sync logic to synchronize with other nodes, where all nodes want to get all name prefixes synced.
void publishName(const ndn::Name &prefix, std::optional< uint64_t > seq=std::nullopt)
Publish name to let others know.
FullProducer(ndn::Face &face, ndn::KeyChain &keyChain, const ndn::Name &syncPrefix, const Options &opts)
Constructor.
Base class for PartialProducer and FullProducer.
const CompressionScheme m_contentCompression
void sendApplicationNack(const ndn::Name &name)
Sends a data packet with content type nack.
const ndn::Name m_syncPrefix
const ndn::time::milliseconds m_syncReplyFreshness
const size_t m_expectedNumEntries
bool addUserNode(const ndn::Name &prefix)
Adds a user node for synchronization.
std::map< ndn::Name, uint64_t > m_prefixes
uint64_t m_numOwnElements
ndn::Scheduler m_scheduler
static void onRegisterFailed(const ndn::Name &prefix, const std::string &msg)
Logs a message and throws if setting an interest filter fails.
const CompressionScheme m_ibltCompression
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
SegmentPublisher m_segmentPublisher
ndn::random::RandomNumberEngine & m_rng
bool replyFromStore(const ndn::Name &interestName)
Try to reply from memory, return false if we cannot find the segment.
void publish(const ndn::Name &interestName, const ndn::Name &dataName, ndn::span< const uint8_t > buffer, ndn::time::milliseconds freshness)
Put all the segments in memory.
void appendToName(ndn::Name &name) const
Appends self to name.
uint32_t murmurHash3(const void *key, size_t len, uint32_t seed)
constexpr size_t N_HASHCHECK
std::shared_ptr< ndn::Buffer > compress(CompressionScheme scheme, ndn::span< const uint8_t > buffer)
std::shared_ptr< ndn::Buffer > decompress(CompressionScheme scheme, ndn::span< const uint8_t > buffer)
std::function< void(const std::vector< MissingDataInfo > &)> UpdateCallback