23 #include <ndn-cxx/security/validator-null.hpp> 24 #include <ndn-cxx/util/logger.hpp> 35 double false_positive = 0.001,
36 ndn::time::milliseconds helloInterestLifetime,
37 ndn::time::milliseconds syncInterestLifetime)
39 , m_scheduler(m_face.getIoService())
40 , m_syncPrefix(syncPrefix)
41 , m_helloInterestPrefix(ndn::Name(m_syncPrefix).append(
"hello"))
42 , m_syncInterestPrefix(ndn::Name(m_syncPrefix).append(
"sync"))
43 , m_syncDataContentType(ndn::tlv::ContentType_Blob)
44 , m_onReceiveHelloData(onReceiveHelloData)
45 , m_onUpdate(onUpdate)
46 , m_bloomFilter(count, false_positive)
47 , m_helloInterestLifetime(helloInterestLifetime)
48 , m_syncInterestLifetime(syncInterestLifetime)
49 , m_rng(ndn::random::getRandomNumberEngine())
50 , m_rangeUniformRandom(100, 500)
57 auto it = m_prefixes.emplace(prefix, seqNo);
62 m_subscriptionList.emplace(prefix);
63 m_bloomFilter.
insert(prefix.toUri());
65 if (callSyncDataCb && seqNo != 0) {
66 m_onUpdate({{prefix, seqNo, seqNo}});
75 m_scheduler.cancelAllEvents();
78 m_syncFetcher->stop();
79 m_syncFetcher.reset();
83 m_helloFetcher->stop();
84 m_helloFetcher.reset();
91 ndn::Interest helloInterest(m_helloInterestPrefix);
92 NDN_LOG_DEBUG(
"Send Hello Interest " << helloInterest);
95 m_helloFetcher->stop();
98 using ndn::util::SegmentFetcher;
99 SegmentFetcher::Options options;
100 options.interestLifetime = m_helloInterestLifetime;
101 options.maxTimeout = m_helloInterestLifetime;
102 options.rttOptions.initialRto = m_syncInterestLifetime;
104 m_helloFetcher = SegmentFetcher::start(m_face, helloInterest,
105 ndn::security::getAcceptAllValidator(), options);
107 m_helloFetcher->afterSegmentValidated.connect([
this] (
const ndn::Data& data) {
108 if (data.getFinalBlock()) {
109 m_helloDataName = data.getName().getPrefix(-2);
113 m_helloFetcher->onComplete.connect([
this] (
const ndn::ConstBufferPtr& bufferPtr) {
114 onHelloData(bufferPtr);
117 m_helloFetcher->onError.connect([
this] (uint32_t errorCode,
const std::string& msg) {
118 NDN_LOG_TRACE(
"Cannot fetch hello data, error: " << errorCode <<
" message: " << msg);
119 ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
120 NDN_LOG_TRACE(
"Scheduling after " << after);
126 Consumer::onHelloData(
const ndn::ConstBufferPtr& bufferPtr)
128 NDN_LOG_DEBUG(
"On Hello Data");
131 m_iblt = m_helloDataName.getSubName(m_helloDataName.size() - 1, 1);
133 NDN_LOG_TRACE(
"m_iblt: " << std::hash<std::string>{}(m_iblt.toUri()));
136 std::vector<MissingDataInfo> updates;
137 std::map<ndn::Name, uint64_t> availableSubscriptions;
139 NDN_LOG_DEBUG(
"Hello Data: " << state);
141 for (
const auto& content : state) {
142 const ndn::Name& prefix = content.getPrefix(-1);
143 uint64_t seq = content.get(content.size() - 1).toNumber();
148 updates.push_back({prefix, m_prefixes[prefix] + 1, seq});
149 m_prefixes[prefix] = seq;
151 availableSubscriptions.emplace(prefix, seq);
154 m_onReceiveHelloData(availableSubscriptions);
156 if (!updates.empty()) {
157 NDN_LOG_DEBUG(
"Updating application with missed updates");
165 BOOST_ASSERT(!m_iblt.empty());
167 ndn::Name syncInterestName(m_syncInterestPrefix);
173 syncInterestName.append(m_iblt);
175 ndn::Interest syncInterest(syncInterestName);
177 NDN_LOG_DEBUG(
"sendSyncInterest, nonce: " << syncInterest.getNonce() <<
178 " hash: " << std::hash<std::string>{}(syncInterest.getName().toUri()));
181 m_syncFetcher->stop();
184 using ndn::util::SegmentFetcher;
185 SegmentFetcher::Options options;
186 options.interestLifetime = m_syncInterestLifetime;
187 options.maxTimeout = m_syncInterestLifetime;
188 options.rttOptions.initialRto = m_syncInterestLifetime;
190 m_syncFetcher = SegmentFetcher::start(m_face, syncInterest,
191 ndn::security::getAcceptAllValidator(), options);
193 m_syncFetcher->afterSegmentValidated.connect([
this] (
const ndn::Data& data) {
194 if (data.getFinalBlock()) {
195 m_syncDataName = data.getName().getPrefix(-2);
196 m_syncDataContentType = data.getContentType();
199 if (m_syncDataContentType == ndn::tlv::ContentType_Nack) {
200 NDN_LOG_DEBUG(
"Received application Nack from producer, sending hello again");
205 m_syncFetcher->onComplete.connect([
this] (
const ndn::ConstBufferPtr& bufferPtr) {
206 if (m_syncDataContentType == ndn::tlv::ContentType_Nack) {
207 m_syncDataContentType = ndn::tlv::ContentType_Blob;
210 NDN_LOG_TRACE(
"Segment fetcher got sync data");
211 onSyncData(bufferPtr);
214 m_syncFetcher->onError.connect([
this] (uint32_t errorCode,
const std::string& msg) {
215 NDN_LOG_TRACE(
"Cannot fetch sync data, error: " << errorCode <<
" message: " << msg);
216 if (errorCode == SegmentFetcher::ErrorCode::INTEREST_TIMEOUT) {
220 ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
221 NDN_LOG_TRACE(
"Scheduling sync Interest after: " << after);
228 Consumer::onSyncData(
const ndn::ConstBufferPtr& bufferPtr)
231 m_iblt = m_syncDataName.getSubName(m_syncDataName.size() - 1, 1);
234 std::vector<MissingDataInfo> updates;
236 for (
const auto& content : state) {
237 NDN_LOG_DEBUG(content);
238 const ndn::Name& prefix = content.getPrefix(-1);
239 uint64_t seq = content.get(content.size() - 1).toNumber();
240 if (m_prefixes.find(prefix) == m_prefixes.end() || seq > m_prefixes[prefix]) {
243 updates.push_back({prefix, m_prefixes[prefix] + 1, seq});
244 m_prefixes[prefix] = seq;
249 NDN_LOG_DEBUG(
"Sync Data: " << state);
251 if (!updates.empty()) {
Consumer logic to subscribe to producer's data.
void insert(const std::string &key)
NDN_LOG_INIT(psync.Consumer)
Consumer(const ndn::Name &syncPrefix, ndn::Face &face, const ReceiveHelloCallback &onReceiveHelloData, const UpdateCallback &onUpdate, unsigned int count, double false_positive, ndn::time::milliseconds helloInterestLifetime=HELLO_INTEREST_LIFETIME, ndn::time::milliseconds syncInterestLifetime=SYNC_INTEREST_LIFETIME)
constructor
std::function< void(const std::map< ndn::Name, uint64_t > &)> ReceiveHelloCallback
bool isSubscribed(const ndn::Name &prefix) const
void sendSyncInterest()
send sync interest /<sync-prefix>/sync/<BF>/<producers-IBF>
std::function< void(const std::vector< MissingDataInfo > &)> UpdateCallback
bool addSubscription(const ndn::Name &prefix, uint64_t seqNo, bool callSyncDataCb=true)
Add prefix to subscription list.
void stop()
Stop segment fetcher to stop the sync and free resources.
void sendHelloInterest()
send hello interest /<sync-prefix>/hello/
void appendToName(ndn::Name &name) const
Append our bloom filter to the given name.