22 #include <ndn-cxx/util/logger.hpp> 23 #include <ndn-cxx/util/segment-fetcher.hpp> 24 #include <ndn-cxx/security/validator-null.hpp> 36 const ndn::Name& syncPrefix,
37 const ndn::Name& userPrefix,
39 ndn::time::milliseconds syncInterestLifetime,
40 ndn::time::milliseconds syncReplyFreshness)
41 :
ProducerBase(expectedNumEntries, face, syncPrefix, userPrefix, syncReplyFreshness)
42 , m_syncInterestLifetime(syncInterestLifetime)
43 , m_onUpdate(onUpdateCallBack)
45 int jitter = m_syncInterestLifetime.count() * .20;
46 m_jitter = std::uniform_int_distribution<>(-jitter, jitter);
48 m_registeredPrefix =
m_face.setInterestFilter(
50 std::bind(&FullProducer::onSyncInterest,
this, _1, _2),
69 NDN_LOG_WARN(
"Prefix not added: " << prefix);
73 uint64_t newSeq = seq.value_or(
m_prefixes[prefix] + 1);
75 NDN_LOG_INFO(
"Publish: "<< prefix <<
"/" << newSeq);
79 satisfyPendingInterests();
83 FullProducer::sendSyncInterest()
98 m_outstandingInterestName = syncInterestName;
100 m_scheduledSyncInterestId =
101 m_scheduler.scheduleEvent(m_syncInterestLifetime / 2 + ndn::time::milliseconds(m_jitter(
m_rng)),
102 [
this] { sendSyncInterest(); });
104 ndn::Interest syncInterest(syncInterestName);
106 ndn::util::SegmentFetcher::Options options;
107 options.interestLifetime = m_syncInterestLifetime;
108 options.maxTimeout = m_syncInterestLifetime;
110 m_fetcher = ndn::util::SegmentFetcher::start(
m_face,
112 ndn::security::v2::getAcceptAllValidator(),
115 m_fetcher->onComplete.connect([
this, syncInterest] (ndn::ConstBufferPtr bufferPtr) {
116 onSyncData(syncInterest, bufferPtr);
119 m_fetcher->onError.connect([] (uint32_t errorCode,
const std::string& msg) {
120 NDN_LOG_ERROR(
"Cannot fetch sync data, error: " <<
121 errorCode <<
" message: " << msg);
124 NDN_LOG_DEBUG(
"sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
125 ", hash: " << std::hash<ndn::Name>{}(syncInterestName));
129 FullProducer::onSyncInterest(
const ndn::Name& prefixName,
const ndn::Interest& interest)
135 ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefixName.size());
136 ndn::Name interestName;
138 if (nameWithoutSyncPrefix.size() == 1) {
140 interestName = interest.getName();
142 else if (nameWithoutSyncPrefix.size() == 3) {
144 interestName = interest.getName().getPrefix(-2);
150 ndn::name::Component ibltName = interestName.get(interestName.size()-1);
152 NDN_LOG_DEBUG(
"Full Sync Interest Received, nonce: " << interest.getNonce() <<
153 ", hash: " << std::hash<ndn::Name>{}(interestName));
159 catch (
const std::exception& e) {
160 NDN_LOG_WARN(e.what());
166 std::set<uint32_t> positive;
167 std::set<uint32_t> negative;
170 NDN_LOG_TRACE(
"Cannot decode differences, positive: " << positive.size()
171 <<
" negative: " << negative.size() <<
" m_threshold: " 176 if (positive.size() + negative.size() >=
m_threshold ||
177 (positive.size() == 0 && negative.size() == 0)) {
180 if (content.second != 0) {
181 state.
addContent(ndn::Name(content.first).appendNumber(content.second));
195 for (
const auto& hash : positive) {
198 if (
m_prefixes[prefix] != 0 && !isFutureHash(prefix.toUri(), negative)) {
204 NDN_LOG_DEBUG(
"Sending sync content: " << state);
205 sendSyncData(interestName, state.
wireEncode());
209 auto& entry = m_pendingEntries.emplace(interestName,
PendingEntryInfoFull{iblt, {}}).first->second;
210 entry.expirationEvent =
m_scheduler.scheduleEvent(interest.getInterestLifetime(),
212 NDN_LOG_TRACE(
"Erase Pending Interest " << interest.getNonce());
213 m_pendingEntries.erase(interest.getName());
218 FullProducer::sendSyncData(
const ndn::Name& name,
const ndn::Block& block)
220 NDN_LOG_DEBUG(
"Checking if data will satisfy our own pending interest");
222 ndn::Name nameWithIblt;
226 ndn::Name dataName(ndn::Name(name).appendNumber(std::hash<ndn::Name>{}(nameWithIblt)));
229 if (m_outstandingInterestName == name) {
230 NDN_LOG_DEBUG(
"Satisfied our own pending interest");
233 NDN_LOG_DEBUG(
"Removing our pending interest from face (stop fetcher)");
235 m_outstandingInterestName = ndn::Name(
"");
238 NDN_LOG_DEBUG(
"Sending Sync Data");
243 NDN_LOG_TRACE(
"Renewing sync interest");
247 NDN_LOG_DEBUG(
"Sending Sync Data");
253 FullProducer::onSyncData(
const ndn::Interest& interest,
const ndn::ConstBufferPtr& bufferPtr)
255 deletePendingInterests(interest.getName());
257 State state(ndn::Block(std::move(bufferPtr)));
258 std::vector<MissingDataInfo> updates;
260 NDN_LOG_DEBUG(
"Sync Data Received: " << state);
262 for (
const auto& content : state.getContent()) {
263 ndn::Name prefix = content.getPrefix(-1);
264 uint64_t seq = content.get(content.size()-1).toNumber();
277 if (!updates.empty()) {
279 NDN_LOG_TRACE(
"Renewing sync interest");
283 NDN_LOG_TRACE(
"No new update, interest nonce: " << interest.getNonce() <<
284 " , hash: " << std::hash<ndn::Name>{}(interest.getName()));
289 FullProducer::satisfyPendingInterests()
291 NDN_LOG_DEBUG(
"Satisfying full sync interest: " << m_pendingEntries.size());
293 for (
auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
296 std::set<uint32_t> positive;
297 std::set<uint32_t> negative;
300 NDN_LOG_TRACE(
"Decode failed for pending interest");
301 if (positive.size() + negative.size() >=
m_threshold ||
302 (positive.size() == 0 && negative.size() == 0)) {
303 NDN_LOG_TRACE(
"pos + neg > threshold or no diff can be found, erase pending interest");
304 m_pendingEntries.erase(it++);
310 for (
const auto& hash : positive) {
319 NDN_LOG_DEBUG(
"Satisfying sync content: " << state);
321 m_pendingEntries.erase(it++);
330 FullProducer::isFutureHash(
const ndn::Name& prefix,
const std::set<uint32_t>& negative)
333 ndn::Name(prefix).appendNumber(
m_prefixes[prefix] + 1).toUri());
334 for (
const auto& nHash : negative) {
335 if (nHash == nextHash) {
344 FullProducer::deletePendingInterests(
const ndn::Name& interestName) {
345 for (
auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
346 if (it->first == interestName) {
347 NDN_LOG_TRACE(
"Delete pending interest: " << interestName);
348 m_pendingEntries.erase(it++);
void onRegisterFailed(const ndn::Name &prefix, const std::string &msg) const
Logs a message if setting an interest filter fails.
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
void initialize(const ndn::name::Component &ibltName)
Populate the hash table using the vector representation of IBLT.
void addContent(const ndn::Name &prefix)
std::vector< ndn::Name > getContent() const
const ndn::Block & wireEncode() const
bool listEntries(std::set< uint32_t > &positive, std::set< uint32_t > &negative) const
List all the entries in the IBLT.
Full sync logic to synchronize with other nodes, where all nodes want to get all name prefixes synced.
uint32_t murmurHash3(uint32_t nHashSeed, const std::vector< unsigned char > &vDataToHash)
void publishName(const ndn::Name &prefix, ndn::optional< uint64_t > seq=ndn::nullopt)
Publish name to let others know.
Invertible Bloom Lookup Table (Invertible Bloom Filter)
void publish(const ndn::Name &interestName, const ndn::Name &dataName, const ndn::Block &block, ndn::time::milliseconds freshness, const ndn::security::SigningInfo &signingInfo=ndn::security::v2::KeyChain::getDefaultSigningInfo())
Put all the segments in memory.
FullProducer(size_t expectedNumEntries, ndn::Face &face, const ndn::Name &syncPrefix, const ndn::Name &userPrefix, const UpdateCallback &onUpdateCallBack, ndn::time::milliseconds syncInterestLifetime=SYNC_INTEREST_LIFTIME, ndn::time::milliseconds syncReplyFreshness=SYNC_REPLY_FRESHNESS)
constructor
uint32_t m_expectedNumEntries
bool replyFromStore(const ndn::Name &interestName)
Try to reply from memory, return false if we cannot find the segment.
std::map< uint32_t, ndn::Name > m_hash2prefix
ndn::random::RandomNumberEngine & m_rng
ndn::Scheduler m_scheduler
std::function< void(const std::vector< MissingDataInfo > &)> UpdateCallback
ndn::time::milliseconds m_syncReplyFreshness
SegmentPublisher m_segmentPublisher
void appendToName(ndn::Name &name) const
Appends self to name.
Base class for PartialProducer and FullProducer.
std::map< ndn::Name, uint64_t > m_prefixes