full-producer.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2020, The University of Memphis
4  *
5  * This file is part of PSync.
6  * See AUTHORS.md for complete list of PSync authors and contributors.
7  *
8  * PSync is free software: you can redistribute it and/or modify it under the terms
9  * of the GNU Lesser General Public License as published by the Free Software Foundation,
10  * either version 3 of the License, or (at your option) any later version.
11  *
12  * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14  * PURPOSE. See the GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License along with
17  * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18  **/
19 
20 #include "PSync/full-producer.hpp"
21 
22 #include <ndn-cxx/util/logger.hpp>
23 #include <ndn-cxx/util/segment-fetcher.hpp>
24 #include <ndn-cxx/security/validator-null.hpp>
25 
26 #include <cstring>
27 #include <limits>
28 #include <functional>
29 
30 namespace psync {
31 
33 
34 FullProducer::FullProducer(const size_t expectedNumEntries,
35  ndn::Face& face,
36  const ndn::Name& syncPrefix,
37  const ndn::Name& userPrefix,
38  const UpdateCallback& onUpdateCallBack,
39  ndn::time::milliseconds syncInterestLifetime,
40  ndn::time::milliseconds syncReplyFreshness,
41  CompressionScheme ibltCompression,
42  CompressionScheme contentCompression)
43  : ProducerBase(expectedNumEntries, face, syncPrefix, userPrefix, syncReplyFreshness,
44  ibltCompression, contentCompression)
45  , m_syncInterestLifetime(syncInterestLifetime)
46  , m_onUpdate(onUpdateCallBack)
47  , m_jitter(100, 500)
48 {
49  m_registeredPrefix = m_face.setInterestFilter(
50  ndn::InterestFilter(m_syncPrefix).allowLoopback(false),
51  std::bind(&FullProducer::onSyncInterest, this, _1, _2),
52  std::bind(&FullProducer::onRegisterFailed, this, _1, _2));
53 
54  // Should we do this after setInterestFilter success call back
55  // (Currently following ChronoSync's way)
56  sendSyncInterest();
57 }
58 
// NOTE(review): the signature line of this definition (listing line 59) is not
// present in this listing; presumably this is the body of the destructor.
60 {
// Stop the segment fetcher, if one has been created.
61  if (m_fetcher) {
62  m_fetcher->stop();
63  }
64 }
65 
66 void
67 FullProducer::publishName(const ndn::Name& prefix, ndn::optional<uint64_t> seq)
68 {
69  if (m_prefixes.find(prefix) == m_prefixes.end()) {
70  NDN_LOG_WARN("Prefix not added: " << prefix);
71  return;
72  }
73 
74  uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1);
75 
76  NDN_LOG_INFO("Publish: "<< prefix << "/" << newSeq);
77 
78  updateSeqNo(prefix, newSeq);
79 
80  satisfyPendingInterests();
81 }
82 
83 void
84 FullProducer::sendSyncInterest()
85 {
86  // If we send two sync interest one after the other
87  // since there is no new data in the network yet,
88  // when data is available it may satisfy both of them
89  if (m_fetcher) {
90  m_fetcher->stop();
91  }
92 
93  // Sync Interest format for full sync: /<sync-prefix>/<ourLatestIBF>
94  ndn::Name syncInterestName = m_syncPrefix;
95 
96  // Append our latest IBF
97  m_iblt.appendToName(syncInterestName);
98 
99  m_outstandingInterestName = syncInterestName;
100 
101  m_scheduledSyncInterestId =
102  m_scheduler.schedule(m_syncInterestLifetime / 2 + ndn::time::milliseconds(m_jitter(m_rng)),
103  [this] { sendSyncInterest(); });
104 
105  ndn::Interest syncInterest(syncInterestName);
106 
107  using ndn::util::SegmentFetcher;
108  SegmentFetcher::Options options;
109  options.interestLifetime = m_syncInterestLifetime;
110  options.maxTimeout = m_syncInterestLifetime;
111  options.rttOptions.initialRto = m_syncInterestLifetime;
112 
113  m_fetcher = SegmentFetcher::start(m_face, syncInterest,
114  ndn::security::v2::getAcceptAllValidator(), options);
115 
116  m_fetcher->onComplete.connect([this, syncInterest] (const ndn::ConstBufferPtr& bufferPtr) {
117  onSyncData(syncInterest, bufferPtr);
118  });
119 
120  m_fetcher->onError.connect([this] (uint32_t errorCode, const std::string& msg) {
121  NDN_LOG_ERROR("Cannot fetch sync data, error: " << errorCode << " message: " << msg);
122  if (errorCode == SegmentFetcher::ErrorCode::NACK_ERROR) {
123  auto after = ndn::time::milliseconds(m_jitter(m_rng));
124  NDN_LOG_DEBUG("Schedule sync interest after: " << after);
125  m_scheduledSyncInterestId = m_scheduler.schedule(after, [this] { sendSyncInterest(); });
126  }
127  });
128 
129  NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
130  ", hash: " << std::hash<ndn::Name>{}(syncInterestName));
131 }
132 
// Handle an incoming sync Interest received under prefixName.
//
// Steps, in order:
//  1. Try to answer from the segment store; if that succeeds, stop.
//  2. Derive the /<prefix>/IBF name from the Interest name; Interests with an
//     unexpected number of components after the prefix are ignored.
//  3. Initialize `iblt` from the IBF component and subtract it from m_iblt.
//  4. If the difference cannot be decoded and is either at least m_threshold
//     entries or completely empty, reply with every prefix whose sequence
//     number is non-zero.
//  5. Otherwise reply with the names behind the positive hashes, skipping
//     sequence number zero and anything isFutureHash() reports.
//  6. If there is nothing to send, store the Interest as a pending entry that
//     is erased when the Interest lifetime elapses.
133 void
134 FullProducer::onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest)
135 {
136  // TODO: answer only segments from store.
137  if (m_segmentPublisher.replyFromStore(interest.getName())) {
138  return;
139  }
140 
141  ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefixName.size());
142  ndn::Name interestName;
143 
144  if (nameWithoutSyncPrefix.size() == 1) {
145  // Get /<prefix>/IBF from /<prefix>/IBF
146  interestName = interest.getName();
147  }
148  else if (nameWithoutSyncPrefix.size() == 3) {
149  // Get /<prefix>/IBF from /<prefix>/IBF/<version>/<segment-no>
150  interestName = interest.getName().getPrefix(-2);
151  }
152  else {
153  return;
154  }
155 
// The IBF is the last component of the normalized Interest name.
156  ndn::name::Component ibltName = interestName.get(interestName.size()-1);
157 
158  NDN_LOG_DEBUG("Full Sync Interest Received, nonce: " << interest.getNonce() <<
159  ", hash: " << std::hash<ndn::Name>{}(interestName));
160 
// NOTE(review): `iblt` is used below but its declaration is not visible here
// (the listing jumps from line 160 to 162); presumably a local IBLT is
// declared on the omitted line. Confirm against the real source.
162  try {
163  iblt.initialize(ibltName);
164  }
165  catch (const std::exception& e) {
166  NDN_LOG_WARN(e.what());
167  return;
168  }
169 
170  IBLT diff = m_iblt - iblt;
171 
172  std::set<uint32_t> positive;
173  std::set<uint32_t> negative;
174 
175  if (!diff.listEntries(positive, negative)) {
176  NDN_LOG_TRACE("Cannot decode differences, positive: " << positive.size()
177  << " negative: " << negative.size() << " m_threshold: "
178  << m_threshold);
179 
180  // Send all data if the difference is at least the threshold, or if neither
181  // positive nor negative differences could be decoded; otherwise fall through and send positives as usual
182  if (positive.size() + negative.size() >= m_threshold ||
183  (positive.size() == 0 && negative.size() == 0)) {
184  State state;
185  for (const auto& content : m_prefixes) {
186  if (content.second != 0) {
187  state.addContent(ndn::Name(content.first).appendNumber(content.second));
188  }
189  }
190 
// NOTE(review): this reply is named after interest.getName(), while the reply
// further below uses interestName; the two differ for the 3-component case.
// Confirm this is intended.
191  if (!state.getContent().empty()) {
192  sendSyncData(interest.getName(), state.wireEncode());
193  }
194 
195  return;
196  }
197  }
198 
199  State state;
200  for (const auto& hash : positive) {
201  auto nameIt = m_biMap.left.find(hash);
202  if (nameIt != m_biMap.left.end()) {
203  ndn::Name nameWithoutSeq = nameIt->second.getPrefix(-1);
204  // Don't sync up sequence number zero
205  if (m_prefixes[nameWithoutSeq] != 0 &&
206  !isFutureHash(nameWithoutSeq.toUri(), negative)) {
207  state.addContent(nameIt->second);
208  }
209  }
210  }
211 
212  if (!state.getContent().empty()) {
213  NDN_LOG_DEBUG("Sending sync content: " << state);
214  sendSyncData(interestName, state.wireEncode());
215  return;
216  }
217 
// Nothing to send yet: remember the Interest (keyed by interestName) and
// schedule its removal after the Interest lifetime.
// NOTE(review): the entry is keyed by interestName but the expiration callback
// erases interest.getName(); for a 3-component Interest these names differ, so
// the erase would not match the stored key. Confirm whether this is intended.
218  auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfoFull{iblt, {}}).first->second;
219  entry.expirationEvent = m_scheduler.schedule(interest.getInterestLifetime(),
220  [this, interest] {
221  NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
222  m_pendingEntries.erase(interest.getName());
223  });
224 }
225 
226 void
227 FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block)
228 {
229  NDN_LOG_DEBUG("Checking if data will satisfy our own pending interest");
230 
231  ndn::Name nameWithIblt;
232  m_iblt.appendToName(nameWithIblt);
233 
234  // TODO: Remove appending of hash - serves no purpose to the receiver
235  ndn::Name dataName(ndn::Name(name).appendNumber(std::hash<ndn::Name>{}(nameWithIblt)));
236 
237  auto content = compress(m_contentCompression, block.wire(), block.size());
238 
239  // checking if our own interest got satisfied
240  if (m_outstandingInterestName == name) {
241  NDN_LOG_DEBUG("Satisfied our own pending interest");
242  // remove outstanding interest
243  if (m_fetcher) {
244  NDN_LOG_DEBUG("Removing our pending interest from face (stop fetcher)");
245  m_fetcher->stop();
246  m_outstandingInterestName = ndn::Name("");
247  }
248 
249  NDN_LOG_DEBUG("Sending Sync Data");
250 
251  // Send data after removing pending sync interest on face
252  m_segmentPublisher.publish(name, dataName, content, m_syncReplyFreshness);
253 
254  NDN_LOG_TRACE("Renewing sync interest");
255  sendSyncInterest();
256  }
257  else {
258  NDN_LOG_DEBUG("Sending Sync Data");
259  m_segmentPublisher.publish(name, dataName, content, m_syncReplyFreshness);
260  }
261 }
262 
263 void
264 FullProducer::onSyncData(const ndn::Interest& interest, const ndn::ConstBufferPtr& bufferPtr)
265 {
266  deletePendingInterests(interest.getName());
267 
268  State state;
269  try {
270  auto decompressed = decompress(m_contentCompression, bufferPtr->data(), bufferPtr->size());
271  state.wireDecode(ndn::Block(std::move(decompressed)));
272  }
273  catch (const std::exception& e) {
274  NDN_LOG_ERROR("Cannot parse received Sync Data: " << e.what());
275  return;
276  }
277 
278  std::vector<MissingDataInfo> updates;
279 
280  NDN_LOG_DEBUG("Sync Data Received: " << state);
281 
282  for (const auto& content : state.getContent()) {
283  ndn::Name prefix = content.getPrefix(-1);
284  uint64_t seq = content.get(content.size() - 1).toNumber();
285 
286  if (m_prefixes.find(prefix) == m_prefixes.end() || m_prefixes[prefix] < seq) {
287  updates.push_back(MissingDataInfo{prefix, m_prefixes[prefix] + 1, seq});
288  updateSeqNo(prefix, seq);
289  // We should not call satisfyPendingSyncInterests here because we just
290  // got data and deleted pending interest by calling deletePendingFullSyncInterests
291  // But we might have interests not matching to this interest that might not have deleted
292  // from pending sync interest
293  }
294  }
295 
296  // We just got the data, so send a new sync interest
297  if (!updates.empty()) {
298  m_onUpdate(updates);
299  NDN_LOG_TRACE("Renewing sync interest");
300  sendSyncInterest();
301  }
302  else {
303  NDN_LOG_TRACE("No new update, interest nonce: " << interest.getNonce() <<
304  " , hash: " << std::hash<ndn::Name>{}(interest.getName()));
305  }
306 }
307 
308 void
309 FullProducer::satisfyPendingInterests()
310 {
311  NDN_LOG_DEBUG("Satisfying full sync interest: " << m_pendingEntries.size());
312 
313  for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
314  const PendingEntryInfoFull& entry = it->second;
315  IBLT diff = m_iblt - entry.iblt;
316  std::set<uint32_t> positive;
317  std::set<uint32_t> negative;
318 
319  if (!diff.listEntries(positive, negative)) {
320  NDN_LOG_TRACE("Decode failed for pending interest");
321  if (positive.size() + negative.size() >= m_threshold ||
322  (positive.size() == 0 && negative.size() == 0)) {
323  NDN_LOG_TRACE("pos + neg > threshold or no diff can be found, erase pending interest");
324  it = m_pendingEntries.erase(it);
325  continue;
326  }
327  }
328 
329  State state;
330  for (const auto& hash : positive) {
331  auto nameIt = m_biMap.left.find(hash);
332  if (nameIt != m_biMap.left.end()) {
333  if (m_prefixes[nameIt->second.getPrefix(-1)] != 0) {
334  state.addContent(nameIt->second);
335  }
336  }
337  }
338 
339  if (!state.getContent().empty()) {
340  NDN_LOG_DEBUG("Satisfying sync content: " << state);
341  sendSyncData(it->first, state.wireEncode());
342  it = m_pendingEntries.erase(it);
343  }
344  else {
345  ++it;
346  }
347  }
348 }
349 
350 bool
351 FullProducer::isFutureHash(const ndn::Name& prefix, const std::set<uint32_t>& negative)
352 {
353  uint32_t nextHash = murmurHash3(N_HASHCHECK,
354  ndn::Name(prefix).appendNumber(m_prefixes[prefix] + 1).toUri());
355  for (const auto& nHash : negative) {
356  if (nHash == nextHash) {
357  return true;
358  break;
359  }
360  }
361  return false;
362 }
363 
364 void
365 FullProducer::deletePendingInterests(const ndn::Name& interestName)
366 {
367  auto it = m_pendingEntries.find(interestName);
368  if (it != m_pendingEntries.end()) {
369  NDN_LOG_TRACE("Delete pending interest: " << interestName);
370  it = m_pendingEntries.erase(it);
371  }
372 }
373 
374 } // namespace psync
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
void initialize(const ndn::name::Component &ibltName)
Populate the hash table using the vector representation of IBLT.
Definition: iblt.cpp:85
void addContent(const ndn::Name &prefix)
Definition: state.cpp:32
const ndn::Block & wireEncode() const
Definition: state.cpp:38
NDN_LOG_INIT(psync.Consumer)
HashNameBiMap m_biMap
Full sync logic to synchronize with other nodes, where all nodes want to get all name prefixes synced.
uint32_t murmurHash3(uint32_t nHashSeed, const std::vector< unsigned char > &vDataToHash)
Definition: util.cpp:61
void publishName(const ndn::Name &prefix, ndn::optional< uint64_t > seq=ndn::nullopt)
Publish name to let others know.
Invertible Bloom Lookup Table (Invertible Bloom Filter)
Definition: iblt.hpp:82
const size_t N_HASHCHECK
void publish(const ndn::Name &interestName, const ndn::Name &dataName, const ndn::Block &block, ndn::time::milliseconds freshness, const ndn::security::SigningInfo &signingInfo=ndn::security::v2::KeyChain::getDefaultSigningInfo())
Put all the segments in memory.
std::shared_ptr< ndn::Buffer > decompress(CompressionScheme scheme, const uint8_t *buffer, size_t bufferSize)
Definition: util.cpp:187
CompressionScheme
Definition: util.hpp:51
const std::vector< ndn::Name > & getContent() const
Definition: state.hpp:46
bool replyFromStore(const ndn::Name &interestName)
Try to reply from memory, return false if we cannot find the segment.
std::shared_ptr< ndn::Buffer > compress(CompressionScheme scheme, const uint8_t *buffer, size_t bufferSize)
Definition: util.cpp:132
CompressionScheme m_contentCompression
bool listEntries(std::set< uint32_t > &positive, std::set< uint32_t > &negative) const
List all the entries in the IBLT.
Definition: iblt.cpp:131
FullProducer(size_t expectedNumEntries, ndn::Face &face, const ndn::Name &syncPrefix, const ndn::Name &userPrefix, const UpdateCallback &onUpdateCallBack, ndn::time::milliseconds syncInterestLifetime=SYNC_INTEREST_LIFTIME, ndn::time::milliseconds syncReplyFreshness=SYNC_REPLY_FRESHNESS, CompressionScheme ibltCompression=CompressionScheme::ZLIB, CompressionScheme contentCompression=CompressionScheme::ZLIB)
constructor
void appendToName(ndn::Name &name) const
Appends self to name.
Definition: iblt.cpp:222
ndn::random::RandomNumberEngine & m_rng
void onRegisterFailed(const ndn::Name &prefix, const std::string &msg) const
Logs a message if setting an interest filter fails.
ndn::Scheduler m_scheduler
std::function< void(const std::vector< MissingDataInfo > &)> UpdateCallback
Definition: consumer.hpp:41
ndn::time::milliseconds m_syncReplyFreshness
SegmentPublisher m_segmentPublisher
Base class for PartialProducer and FullProducer.
std::map< ndn::Name, uint64_t > m_prefixes