23 #include <ndn-cxx/security/validator-null.hpp>
24 #include <ndn-cxx/util/logger.hpp>
32 , m_scheduler(m_face.getIoContext())
33 , m_syncPrefix(syncPrefix)
34 , m_helloInterestPrefix(ndn::Name(m_syncPrefix).append(
"hello"))
35 , m_syncInterestPrefix(ndn::Name(m_syncPrefix).append(
"sync"))
36 , m_syncDataContentType(ndn::tlv::ContentType_Blob)
37 , m_onReceiveHelloData(opts.onHelloData)
38 , m_onUpdate(opts.onUpdate)
39 , m_bloomFilter(opts.bfCount, opts.bfFalsePositive)
40 , m_helloInterestLifetime(opts.helloInterestLifetime)
41 , m_syncInterestLifetime(opts.syncInterestLifetime)
42 , m_rng(ndn::random::getRandomNumberEngine())
43 , m_rangeUniformRandom(100, 500)
53 ndn::time::milliseconds helloInterestLifetime,
54 ndn::time::milliseconds syncInterestLifetime)
56 Options{onReceiveHelloData, onUpdate, static_cast<uint32_t>(count), falsePositive,
57 helloInterestLifetime, syncInterestLifetime})
64 auto it = m_prefixes.emplace(prefix, seqNo);
69 NDN_LOG_DEBUG(
"Subscribing prefix: " << prefix);
71 m_subscriptionList.emplace(prefix);
72 m_bloomFilter.
insert(prefix);
74 if (callSyncDataCb && seqNo != 0) {
75 m_onUpdate({{prefix, seqNo, seqNo, 0}});
87 NDN_LOG_DEBUG(
"Unsubscribing prefix: " << prefix);
89 m_prefixes.erase(prefix);
90 m_subscriptionList.erase(prefix);
93 m_bloomFilter.
clear();
95 for (
const auto& item : m_subscriptionList)
96 m_bloomFilter.
insert(item);
104 NDN_LOG_DEBUG(
"Canceling all the scheduled events");
105 m_scheduler.cancelAllEvents();
108 m_syncFetcher->stop();
109 m_syncFetcher.reset();
112 if (m_helloFetcher) {
113 m_helloFetcher->stop();
114 m_helloFetcher.reset();
121 ndn::Interest helloInterest(m_helloInterestPrefix);
122 NDN_LOG_DEBUG(
"Send Hello Interest " << helloInterest);
124 if (m_helloFetcher) {
125 m_helloFetcher->stop();
128 using ndn::SegmentFetcher;
129 SegmentFetcher::Options options;
130 options.interestLifetime = m_helloInterestLifetime;
131 options.maxTimeout = m_helloInterestLifetime;
132 options.rttOptions.initialRto = m_syncInterestLifetime;
134 m_helloFetcher = SegmentFetcher::start(m_face, helloInterest,
135 ndn::security::getAcceptAllValidator(), options);
137 m_helloFetcher->afterSegmentValidated.connect([
this] (
const ndn::Data& data) {
138 if (data.getFinalBlock()) {
139 m_helloDataName = data.getName().getPrefix(-2);
143 m_helloFetcher->onComplete.connect([
this] (
const ndn::ConstBufferPtr& bufferPtr) {
144 onHelloData(bufferPtr);
147 m_helloFetcher->onError.connect([
this] (uint32_t errorCode,
const std::string& msg) {
148 NDN_LOG_TRACE(
"Cannot fetch hello data, error: " << errorCode <<
" message: " << msg);
149 ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
150 NDN_LOG_TRACE(
"Scheduling after " << after);
156 Consumer::onHelloData(
const ndn::ConstBufferPtr& bufferPtr)
158 NDN_LOG_DEBUG(
"On Hello Data");
161 m_iblt = m_helloDataName.getSubName(m_helloDataName.size() - 1, 1);
163 NDN_LOG_TRACE(
"m_iblt: " << std::hash<ndn::Name>{}(m_iblt));
165 detail::State state{ndn::Block(bufferPtr)};
166 std::vector<MissingDataInfo> updates;
167 std::map<ndn::Name, uint64_t> availableSubscriptions;
169 NDN_LOG_DEBUG(
"Hello Data: " << state);
171 for (
const auto& content : state) {
172 const ndn::Name& prefix = content.getPrefix(-1);
173 uint64_t seq = content.get(content.size() - 1).toNumber();
178 updates.push_back({prefix, m_prefixes[prefix] + 1, seq, 0});
179 m_prefixes[prefix] = seq;
181 availableSubscriptions.emplace(prefix, seq);
184 m_onReceiveHelloData(availableSubscriptions);
186 if (!updates.empty()) {
187 NDN_LOG_DEBUG(
"Updating application with missed updates");
195 BOOST_ASSERT(!m_iblt.empty());
197 ndn::Name syncInterestName(m_syncInterestPrefix);
203 syncInterestName.append(m_iblt);
205 ndn::Interest syncInterest(syncInterestName);
207 NDN_LOG_DEBUG(
"sendSyncInterest, nonce: " << syncInterest.getNonce() <<
208 " hash: " << std::hash<ndn::Name>{}(syncInterest.getName()));
211 m_syncFetcher->stop();
214 using ndn::SegmentFetcher;
215 SegmentFetcher::Options options;
216 options.interestLifetime = m_syncInterestLifetime;
217 options.maxTimeout = m_syncInterestLifetime;
218 options.rttOptions.initialRto = m_syncInterestLifetime;
220 m_syncFetcher = SegmentFetcher::start(m_face, syncInterest,
221 ndn::security::getAcceptAllValidator(), options);
223 m_syncFetcher->afterSegmentValidated.connect([
this] (
const ndn::Data& data) {
224 if (data.getFinalBlock()) {
225 m_syncDataName = data.getName().getPrefix(-2);
226 m_syncDataContentType = data.getContentType();
229 if (m_syncDataContentType == ndn::tlv::ContentType_Nack) {
230 NDN_LOG_DEBUG(
"Received application Nack from producer, sending hello again");
235 m_syncFetcher->onComplete.connect([
this] (
const ndn::ConstBufferPtr& bufferPtr) {
236 if (m_syncDataContentType == ndn::tlv::ContentType_Nack) {
237 m_syncDataContentType = ndn::tlv::ContentType_Blob;
240 NDN_LOG_TRACE(
"Segment fetcher got sync data");
241 onSyncData(bufferPtr);
244 m_syncFetcher->onError.connect([
this] (uint32_t errorCode,
const std::string& msg) {
245 NDN_LOG_TRACE(
"Cannot fetch sync data, error: " << errorCode <<
" message: " << msg);
246 if (errorCode == SegmentFetcher::ErrorCode::INTEREST_TIMEOUT) {
250 ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
251 NDN_LOG_TRACE(
"Scheduling sync Interest after: " << after);
258 Consumer::onSyncData(
const ndn::ConstBufferPtr& bufferPtr)
261 m_iblt = m_syncDataName.getSubName(m_syncDataName.size() - 1, 1);
264 std::vector<MissingDataInfo> updates;
266 for (
const auto& content : state) {
267 NDN_LOG_DEBUG(content);
268 const ndn::Name& prefix = content.getPrefix(-1);
269 uint64_t seq = content.get(content.size() - 1).toNumber();
270 if (m_prefixes.find(prefix) == m_prefixes.end() || seq > m_prefixes[prefix]) {
273 updates.push_back({prefix, m_prefixes[prefix] + 1, seq, 0});
274 m_prefixes[prefix] = seq;
279 NDN_LOG_DEBUG(
"Sync Data: " << state);
281 if (!updates.empty()) {
Consumer logic to subscribe to producer's data.
bool addSubscription(const ndn::Name &prefix, uint64_t seqNo, bool callSyncDataCb=true)
Add prefix to subscription list.
bool removeSubscription(const ndn::Name &prefix)
Remove prefix from subscription list.
void stop()
Stop segment fetcher to stop the sync and free resources.
void sendHelloInterest()
send hello interest /<sync-prefix>/hello/
Consumer(ndn::Face &face, const ndn::Name &syncPrefix, const Options &opts)
Constructor.
void sendSyncInterest()
send sync interest /<sync-prefix>/sync/<BF>/<producers-IBF>
bool isSubscribed(const ndn::Name &prefix) const
void insert(const ndn::Name &key)
void appendToName(ndn::Name &name) const
Append our bloom filter to the given name.
std::function< void(const std::vector< MissingDataInfo > &)> UpdateCallback
std::function< void(const std::map< ndn::Name, uint64_t > &)> ReceiveHelloCallback