23 #include <ndn-cxx/security/validator-null.hpp>
24 #include <ndn-cxx/util/logger.hpp>
35 double false_positive = 0.001,
36 ndn::time::milliseconds helloInterestLifetime,
37 ndn::time::milliseconds syncInterestLifetime)
39 , m_scheduler(m_face.getIoService())
40 , m_syncPrefix(syncPrefix)
41 , m_helloInterestPrefix(ndn::Name(m_syncPrefix).append(
"hello"))
42 , m_syncInterestPrefix(ndn::Name(m_syncPrefix).append(
"sync"))
43 , m_syncDataContentType(ndn::tlv::ContentType_Blob)
44 , m_onReceiveHelloData(onReceiveHelloData)
45 , m_onUpdate(onUpdate)
46 , m_bloomFilter(count, false_positive)
47 , m_helloInterestLifetime(helloInterestLifetime)
48 , m_syncInterestLifetime(syncInterestLifetime)
49 , m_rng(ndn::random::getRandomNumberEngine())
50 , m_rangeUniformRandom(100, 500)
57 auto it = m_prefixes.emplace(prefix, seqNo);
62 NDN_LOG_DEBUG(
"Subscribing prefix: " << prefix);
64 m_subscriptionList.emplace(prefix);
65 m_bloomFilter.
insert(prefix);
67 if (callSyncDataCb && seqNo != 0) {
68 m_onUpdate({{prefix, seqNo, seqNo, 0}});
80 NDN_LOG_DEBUG(
"Unsubscribing prefix: " << prefix);
82 m_prefixes.erase(prefix);
83 m_subscriptionList.erase(prefix);
86 m_bloomFilter.
clear();
88 for (
const auto& item : m_subscriptionList)
89 m_bloomFilter.
insert(item);
97 NDN_LOG_DEBUG(
"Canceling all the scheduled events");
98 m_scheduler.cancelAllEvents();
101 m_syncFetcher->stop();
102 m_syncFetcher.reset();
105 if (m_helloFetcher) {
106 m_helloFetcher->stop();
107 m_helloFetcher.reset();
114 ndn::Interest helloInterest(m_helloInterestPrefix);
115 NDN_LOG_DEBUG(
"Send Hello Interest " << helloInterest);
117 if (m_helloFetcher) {
118 m_helloFetcher->stop();
121 using ndn::util::SegmentFetcher;
122 SegmentFetcher::Options options;
123 options.interestLifetime = m_helloInterestLifetime;
124 options.maxTimeout = m_helloInterestLifetime;
125 options.rttOptions.initialRto = m_syncInterestLifetime;
127 m_helloFetcher = SegmentFetcher::start(m_face, helloInterest,
128 ndn::security::getAcceptAllValidator(), options);
130 m_helloFetcher->afterSegmentValidated.connect([
this] (
const ndn::Data& data) {
131 if (data.getFinalBlock()) {
132 m_helloDataName = data.getName().getPrefix(-2);
136 m_helloFetcher->onComplete.connect([
this] (
const ndn::ConstBufferPtr& bufferPtr) {
137 onHelloData(bufferPtr);
140 m_helloFetcher->onError.connect([
this] (uint32_t errorCode,
const std::string& msg) {
141 NDN_LOG_TRACE(
"Cannot fetch hello data, error: " << errorCode <<
" message: " << msg);
142 ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
143 NDN_LOG_TRACE(
"Scheduling after " << after);
149 Consumer::onHelloData(
const ndn::ConstBufferPtr& bufferPtr)
151 NDN_LOG_DEBUG(
"On Hello Data");
154 m_iblt = m_helloDataName.getSubName(m_helloDataName.size() - 1, 1);
156 NDN_LOG_TRACE(
"m_iblt: " << std::hash<ndn::Name>{}(m_iblt));
158 detail::State state{ndn::Block(bufferPtr)};
159 std::vector<MissingDataInfo> updates;
160 std::map<ndn::Name, uint64_t> availableSubscriptions;
162 NDN_LOG_DEBUG(
"Hello Data: " << state);
164 for (
const auto& content : state) {
165 const ndn::Name& prefix = content.getPrefix(-1);
166 uint64_t seq = content.get(content.size() - 1).toNumber();
171 updates.push_back({prefix, m_prefixes[prefix] + 1, seq, 0});
172 m_prefixes[prefix] = seq;
174 availableSubscriptions.emplace(prefix, seq);
177 m_onReceiveHelloData(availableSubscriptions);
179 if (!updates.empty()) {
180 NDN_LOG_DEBUG(
"Updating application with missed updates");
188 BOOST_ASSERT(!m_iblt.empty());
190 ndn::Name syncInterestName(m_syncInterestPrefix);
196 syncInterestName.append(m_iblt);
198 ndn::Interest syncInterest(syncInterestName);
200 NDN_LOG_DEBUG(
"sendSyncInterest, nonce: " << syncInterest.getNonce() <<
201 " hash: " << std::hash<ndn::Name>{}(syncInterest.getName()));
204 m_syncFetcher->stop();
207 using ndn::util::SegmentFetcher;
208 SegmentFetcher::Options options;
209 options.interestLifetime = m_syncInterestLifetime;
210 options.maxTimeout = m_syncInterestLifetime;
211 options.rttOptions.initialRto = m_syncInterestLifetime;
213 m_syncFetcher = SegmentFetcher::start(m_face, syncInterest,
214 ndn::security::getAcceptAllValidator(), options);
216 m_syncFetcher->afterSegmentValidated.connect([
this] (
const ndn::Data& data) {
217 if (data.getFinalBlock()) {
218 m_syncDataName = data.getName().getPrefix(-2);
219 m_syncDataContentType = data.getContentType();
222 if (m_syncDataContentType == ndn::tlv::ContentType_Nack) {
223 NDN_LOG_DEBUG(
"Received application Nack from producer, sending hello again");
228 m_syncFetcher->onComplete.connect([
this] (
const ndn::ConstBufferPtr& bufferPtr) {
229 if (m_syncDataContentType == ndn::tlv::ContentType_Nack) {
230 m_syncDataContentType = ndn::tlv::ContentType_Blob;
233 NDN_LOG_TRACE(
"Segment fetcher got sync data");
234 onSyncData(bufferPtr);
237 m_syncFetcher->onError.connect([
this] (uint32_t errorCode,
const std::string& msg) {
238 NDN_LOG_TRACE(
"Cannot fetch sync data, error: " << errorCode <<
" message: " << msg);
239 if (errorCode == SegmentFetcher::ErrorCode::INTEREST_TIMEOUT) {
243 ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
244 NDN_LOG_TRACE(
"Scheduling sync Interest after: " << after);
251 Consumer::onSyncData(
const ndn::ConstBufferPtr& bufferPtr)
254 m_iblt = m_syncDataName.getSubName(m_syncDataName.size() - 1, 1);
257 std::vector<MissingDataInfo> updates;
259 for (
const auto& content : state) {
260 NDN_LOG_DEBUG(content);
261 const ndn::Name& prefix = content.getPrefix(-1);
262 uint64_t seq = content.get(content.size() - 1).toNumber();
263 if (m_prefixes.find(prefix) == m_prefixes.end() || seq > m_prefixes[prefix]) {
266 updates.push_back({prefix, m_prefixes[prefix] + 1, seq, 0});
267 m_prefixes[prefix] = seq;
272 NDN_LOG_DEBUG(
"Sync Data: " << state);
274 if (!updates.empty()) {
Consumer logic to subscribe to producer's data.
bool addSubscription(const ndn::Name &prefix, uint64_t seqNo, bool callSyncDataCb=true)
Add prefix to subscription list.
bool removeSubscription(const ndn::Name &prefix)
Remove prefix from subscription list.
void stop()
Stop segment fetcher to stop the sync and free resources.
Consumer(const ndn::Name &syncPrefix, ndn::Face &face, const ReceiveHelloCallback &onReceiveHelloData, const UpdateCallback &onUpdate, unsigned int count, double false_positive, ndn::time::milliseconds helloInterestLifetime=HELLO_INTEREST_LIFETIME, ndn::time::milliseconds syncInterestLifetime=SYNC_INTEREST_LIFETIME)
Constructor.
void sendHelloInterest()
Send hello interest /<sync-prefix>/hello/
void sendSyncInterest()
Send sync interest /<sync-prefix>/sync/<BF>/<producers-IBF>
bool isSubscribed(const ndn::Name &prefix) const
void insert(const ndn::Name &key)
void appendToName(ndn::Name &name) const
Append our bloom filter to the given name.
std::function< void(const std::vector< MissingDataInfo > &)> UpdateCallback
std::function< void(const std::map< ndn::Name, uint64_t > &)> ReceiveHelloCallback