24 #include <ndn-cxx/security/validator-null.hpp> 25 #include <ndn-cxx/util/logger.hpp> 26 #include <ndn-cxx/util/segment-fetcher.hpp> 36 const ndn::Name& syncPrefix,
37 const ndn::Name& userPrefix,
39 ndn::time::milliseconds syncInterestLifetime,
40 ndn::time::milliseconds syncReplyFreshness,
43 :
ProducerBase(expectedNumEntries, face, syncPrefix, userPrefix, syncReplyFreshness,
44 ibltCompression, contentCompression)
45 , m_syncInterestLifetime(syncInterestLifetime)
46 , m_onUpdate(onUpdateCallBack)
49 m_registeredPrefix =
m_face.setInterestFilter(
51 std::bind(&FullProducer::onSyncInterest,
this, _1, _2),
70 NDN_LOG_WARN(
"Prefix not added: " << prefix);
74 uint64_t newSeq = seq.value_or(
m_prefixes[prefix] + 1);
76 NDN_LOG_INFO(
"Publish: " << prefix <<
"/" << newSeq);
80 satisfyPendingInterests();
84 FullProducer::sendSyncInterest()
99 m_outstandingInterestName = syncInterestName;
101 m_scheduledSyncInterestId =
102 m_scheduler.schedule(m_syncInterestLifetime / 2 + ndn::time::milliseconds(m_jitter(
m_rng)),
103 [
this] { sendSyncInterest(); });
105 ndn::Interest syncInterest(syncInterestName);
107 using ndn::util::SegmentFetcher;
108 SegmentFetcher::Options options;
109 options.interestLifetime = m_syncInterestLifetime;
110 options.maxTimeout = m_syncInterestLifetime;
111 options.rttOptions.initialRto = m_syncInterestLifetime;
113 m_fetcher = SegmentFetcher::start(
m_face, syncInterest,
114 ndn::security::getAcceptAllValidator(), options);
116 m_fetcher->onComplete.connect([
this, syncInterest] (
const ndn::ConstBufferPtr& bufferPtr) {
117 onSyncData(syncInterest, bufferPtr);
120 m_fetcher->onError.connect([
this] (uint32_t errorCode,
const std::string& msg) {
121 NDN_LOG_ERROR(
"Cannot fetch sync data, error: " << errorCode <<
", message: " << msg);
126 if (errorCode != SegmentFetcher::ErrorCode::INTEREST_TIMEOUT) {
127 auto after = ndn::time::milliseconds(m_jitter(
m_rng));
128 NDN_LOG_DEBUG(
"Schedule sync interest after: " << after);
129 m_scheduledSyncInterestId =
m_scheduler.schedule(after, [
this] { sendSyncInterest(); });
133 NDN_LOG_DEBUG(
"sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
134 ", hash: " << std::hash<ndn::Name>{}(syncInterestName));
138 FullProducer::onSyncInterest(
const ndn::Name& prefixName,
const ndn::Interest& interest)
145 ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefixName.size());
146 ndn::Name interestName;
148 if (nameWithoutSyncPrefix.size() == 1) {
150 interestName = interest.getName();
152 else if (nameWithoutSyncPrefix.size() == 3) {
154 interestName = interest.getName().getPrefix(-2);
160 ndn::name::Component ibltName = interestName.get(interestName.size() - 1);
162 NDN_LOG_DEBUG(
"Full sync Interest received, nonce: " << interest.getNonce() <<
163 ", hash: " << std::hash<ndn::Name>{}(interestName));
169 catch (
const std::exception& e) {
170 NDN_LOG_WARN(e.what());
174 auto diff =
m_iblt - iblt;
176 std::set<uint32_t> positive;
177 std::set<uint32_t> negative;
179 if (!diff.listEntries(positive, negative)) {
180 NDN_LOG_TRACE(
"Cannot decode differences, positive: " << positive.size()
181 <<
" negative: " << negative.size() <<
" m_threshold: " 186 if (positive.size() + negative.size() >=
m_threshold ||
187 (positive.size() == 0 && negative.size() == 0)) {
190 if (content.second != 0) {
191 state.
addContent(ndn::Name(content.first).appendNumber(content.second));
196 sendSyncData(interest.getName(), state.
wireEncode());
204 for (
const auto& hash : positive) {
205 auto nameIt =
m_biMap.left.find(hash);
206 if (nameIt !=
m_biMap.left.end()) {
207 ndn::Name nameWithoutSeq = nameIt->second.getPrefix(-1);
210 !isFutureHash(nameWithoutSeq.toUri(), negative)) {
217 NDN_LOG_DEBUG(
"Sending sync content: " << state);
218 sendSyncData(interestName, state.
wireEncode());
222 auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{iblt, {}}).first->second;
223 entry.expirationEvent =
m_scheduler.schedule(interest.getInterestLifetime(),
225 NDN_LOG_TRACE(
"Erase pending Interest " << interest.getNonce());
226 m_pendingEntries.erase(interest.getName());
231 FullProducer::sendSyncData(
const ndn::Name& name,
const ndn::Block& block)
233 NDN_LOG_DEBUG(
"Checking if data will satisfy our own pending interest");
235 ndn::Name nameWithIblt;
239 ndn::Name dataName(ndn::Name(name).appendNumber(std::hash<ndn::Name>{}(nameWithIblt)));
244 if (m_outstandingInterestName == name) {
245 NDN_LOG_DEBUG(
"Satisfied our own pending interest");
248 NDN_LOG_DEBUG(
"Removing our pending interest from face (stop fetcher)");
250 m_outstandingInterestName = ndn::Name(
"");
253 NDN_LOG_DEBUG(
"Sending sync Data");
258 NDN_LOG_TRACE(
"Renewing sync interest");
262 NDN_LOG_DEBUG(
"Sending sync Data");
268 FullProducer::onSyncData(
const ndn::Interest& interest,
const ndn::ConstBufferPtr& bufferPtr)
270 deletePendingInterests(interest.getName());
275 state.wireDecode(ndn::Block(std::move(decompressed)));
277 catch (
const std::exception& e) {
278 NDN_LOG_ERROR(
"Cannot parse received sync Data: " << e.what());
282 NDN_LOG_DEBUG(
"Sync Data received: " << state);
284 std::vector<MissingDataInfo> updates;
286 for (
const auto& content : state) {
287 ndn::Name prefix = content.getPrefix(-1);
288 uint64_t seq = content.get(content.size() - 1).toNumber();
291 updates.push_back({prefix,
m_prefixes[prefix] + 1, seq});
301 if (!updates.empty()) {
303 NDN_LOG_TRACE(
"Renewing sync interest");
307 NDN_LOG_TRACE(
"No new update, interest nonce: " << interest.getNonce() <<
308 " , hash: " << std::hash<ndn::Name>{}(interest.getName()));
313 FullProducer::satisfyPendingInterests()
315 NDN_LOG_DEBUG(
"Satisfying full sync interest: " << m_pendingEntries.size());
317 for (
auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
318 const auto& entry = it->second;
319 auto diff =
m_iblt - entry.iblt;
320 std::set<uint32_t> positive;
321 std::set<uint32_t> negative;
323 if (!diff.listEntries(positive, negative)) {
324 NDN_LOG_TRACE(
"Decode failed for pending interest");
325 if (positive.size() + negative.size() >=
m_threshold ||
326 (positive.size() == 0 && negative.size() == 0)) {
327 NDN_LOG_TRACE(
"pos + neg > threshold or no diff can be found, erase pending interest");
328 it = m_pendingEntries.erase(it);
334 for (
const auto& hash : positive) {
335 auto nameIt =
m_biMap.left.find(hash);
336 if (nameIt !=
m_biMap.left.end()) {
337 if (
m_prefixes[nameIt->second.getPrefix(-1)] != 0) {
344 NDN_LOG_DEBUG(
"Satisfying sync content: " << state);
346 it = m_pendingEntries.erase(it);
355 FullProducer::isFutureHash(
const ndn::Name& prefix,
const std::set<uint32_t>& negative)
358 ndn::Name(prefix).appendNumber(
m_prefixes[prefix] + 1).toUri());
359 return negative.find(nextHash) != negative.end();
363 FullProducer::deletePendingInterests(
const ndn::Name& interestName)
365 auto it = m_pendingEntries.find(interestName);
366 if (it != m_pendingEntries.end()) {
367 NDN_LOG_TRACE(
"Delete pending interest: " << interestName);
368 it = m_pendingEntries.erase(it);
void appendToName(ndn::Name &name) const
Appends self to name.
FullProducer(size_t expectedNumEntries, ndn::Face &face, const ndn::Name &syncPrefix, const ndn::Name &userPrefix, const UpdateCallback &onUpdateCallBack, ndn::time::milliseconds syncInterestLifetime=SYNC_INTEREST_LIFTIME, ndn::time::milliseconds syncReplyFreshness=SYNC_REPLY_FRESHNESS, CompressionScheme ibltCompression=CompressionScheme::DEFAULT, CompressionScheme contentCompression=CompressionScheme::DEFAULT)
constructor
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
Invertible Bloom Lookup Table (Invertible Bloom Filter)
const ndn::Block & wireEncode() const
NDN_LOG_INIT(psync.Consumer)
Full sync logic to synchronize with other nodes where all nodes wants to get all names prefixes synce...
std::shared_ptr< ndn::Buffer > decompress(CompressionScheme scheme, const uint8_t *buffer, size_t bufferSize)
void publishName(const ndn::Name &prefix, ndn::optional< uint64_t > seq=ndn::nullopt)
Publish name to let others know.
void publish(const ndn::Name &interestName, const ndn::Name &dataName, const ndn::Block &block, ndn::time::milliseconds freshness, const ndn::security::SigningInfo &signingInfo=ndn::security::SigningInfo())
Put all the segments in memory.
void addContent(const ndn::Name &prefix)
std::function< void(const std::vector< MissingDataInfo > &)> UpdateCallback
CompressionScheme m_ibltCompression
std::shared_ptr< ndn::Buffer > compress(CompressionScheme scheme, const uint8_t *buffer, size_t bufferSize)
uint32_t m_expectedNumEntries
bool replyFromStore(const ndn::Name &interestName)
Try to reply from memory, return false if we cannot find the segment.
void initialize(const ndn::name::Component &ibltName)
Populate the hash table using the vector representation of IBLT.
CompressionScheme m_contentCompression
const std::vector< ndn::Name > & getContent() const
ndn::random::RandomNumberEngine & m_rng
void onRegisterFailed(const ndn::Name &prefix, const std::string &msg) const
Logs a message if setting an interest filter fails.
ndn::Scheduler m_scheduler
ndn::time::milliseconds m_syncReplyFreshness
SegmentPublisher m_segmentPublisher
Base class for PartialProducer and FullProducer.
uint32_t murmurHash3(const void *key, size_t len, uint32_t seed)
std::map< ndn::Name, uint64_t > m_prefixes