full-producer.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2019, The University of Memphis
4  *
5  * This file is part of PSync.
6  * See AUTHORS.md for complete list of PSync authors and contributors.
7  *
8  * PSync is free software: you can redistribute it and/or modify it under the terms
9  * of the GNU Lesser General Public License as published by the Free Software Foundation,
10  * either version 3 of the License, or (at your option) any later version.
11  *
12  * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14  * PURPOSE. See the GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License along with
17  * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18  **/
19 
20 #include "PSync/full-producer.hpp"
21 
22 #include <ndn-cxx/util/logger.hpp>
23 #include <ndn-cxx/util/segment-fetcher.hpp>
24 #include <ndn-cxx/security/validator-null.hpp>
25 
26 #include <cstring>
27 #include <limits>
28 #include <functional>
29 
30 namespace psync {
31 
32 NDN_LOG_INIT(psync.FullProducer);
33 
34 FullProducer::FullProducer(const size_t expectedNumEntries,
35  ndn::Face& face,
36  const ndn::Name& syncPrefix,
37  const ndn::Name& userPrefix,
38  const UpdateCallback& onUpdateCallBack,
39  ndn::time::milliseconds syncInterestLifetime,
40  ndn::time::milliseconds syncReplyFreshness)
41  : ProducerBase(expectedNumEntries, face, syncPrefix, userPrefix, syncReplyFreshness)
42  , m_syncInterestLifetime(syncInterestLifetime)
43  , m_onUpdate(onUpdateCallBack)
44 {
45  int jitter = m_syncInterestLifetime.count() * .20;
46  m_jitter = std::uniform_int_distribution<>(-jitter, jitter);
47 
48  m_registeredPrefix = m_face.setInterestFilter(
49  ndn::InterestFilter(m_syncPrefix).allowLoopback(false),
50  std::bind(&FullProducer::onSyncInterest, this, _1, _2),
51  std::bind(&FullProducer::onRegisterFailed, this, _1, _2));
52 
53  // Should we do this after setInterestFilter success call back
54  // (Currently following ChronoSync's way)
55  sendSyncInterest();
56 }
57 
59 {
60  if (m_fetcher) {
61  m_fetcher->stop();
62  }
63 }
64 
65 void
66 FullProducer::publishName(const ndn::Name& prefix, ndn::optional<uint64_t> seq)
67 {
68  if (m_prefixes.find(prefix) == m_prefixes.end()) {
69  NDN_LOG_WARN("Prefix not added: " << prefix);
70  return;
71  }
72 
73  uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1);
74 
75  NDN_LOG_INFO("Publish: "<< prefix << "/" << newSeq);
76 
77  updateSeqNo(prefix, newSeq);
78 
79  satisfyPendingInterests();
80 }
81 
82 void
83 FullProducer::sendSyncInterest()
84 {
85  // If we send two sync interest one after the other
86  // since there is no new data in the network yet,
87  // when data is available it may satisfy both of them
88  if (m_fetcher) {
89  m_fetcher->stop();
90  }
91 
92  // Sync Interest format for full sync: /<sync-prefix>/<ourLatestIBF>
93  ndn::Name syncInterestName = m_syncPrefix;
94 
95  // Append our latest IBF
96  m_iblt.appendToName(syncInterestName);
97 
98  m_outstandingInterestName = syncInterestName;
99 
100  m_scheduledSyncInterestId =
101  m_scheduler.scheduleEvent(m_syncInterestLifetime / 2 + ndn::time::milliseconds(m_jitter(m_rng)),
102  [this] { sendSyncInterest(); });
103 
104  ndn::Interest syncInterest(syncInterestName);
105 
106  ndn::util::SegmentFetcher::Options options;
107  options.interestLifetime = m_syncInterestLifetime;
108  options.maxTimeout = m_syncInterestLifetime;
109 
110  m_fetcher = ndn::util::SegmentFetcher::start(m_face,
111  syncInterest,
112  ndn::security::v2::getAcceptAllValidator(),
113  options);
114 
115  m_fetcher->onComplete.connect([this, syncInterest] (ndn::ConstBufferPtr bufferPtr) {
116  onSyncData(syncInterest, bufferPtr);
117  });
118 
119  m_fetcher->onError.connect([] (uint32_t errorCode, const std::string& msg) {
120  NDN_LOG_ERROR("Cannot fetch sync data, error: " <<
121  errorCode << " message: " << msg);
122  });
123 
124  NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
125  ", hash: " << std::hash<ndn::Name>{}(syncInterestName));
126 }
127 
128 void
129 FullProducer::onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest)
130 {
131  if (m_segmentPublisher.replyFromStore(interest.getName())) {
132  return;
133  }
134 
135  ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefixName.size());
136  ndn::Name interestName;
137 
138  if (nameWithoutSyncPrefix.size() == 1) {
139  // Get /<prefix>/IBF from /<prefix>/IBF
140  interestName = interest.getName();
141  }
142  else if (nameWithoutSyncPrefix.size() == 3) {
143  // Get /<prefix>/IBF from /<prefix>/IBF/<version>/<segment-no>
144  interestName = interest.getName().getPrefix(-2);
145  }
146  else {
147  return;
148  }
149 
150  ndn::name::Component ibltName = interestName.get(interestName.size()-1);
151 
152  NDN_LOG_DEBUG("Full Sync Interest Received, nonce: " << interest.getNonce() <<
153  ", hash: " << std::hash<ndn::Name>{}(interestName));
154 
156  try {
157  iblt.initialize(ibltName);
158  }
159  catch (const std::exception& e) {
160  NDN_LOG_WARN(e.what());
161  return;
162  }
163 
164  IBLT diff = m_iblt - iblt;
165 
166  std::set<uint32_t> positive;
167  std::set<uint32_t> negative;
168 
169  if (!diff.listEntries(positive, negative)) {
170  NDN_LOG_TRACE("Cannot decode differences, positive: " << positive.size()
171  << " negative: " << negative.size() << " m_threshold: "
172  << m_threshold);
173 
174  // Send all data if greater then threshold, else send positive below as usual
175  // Or send if we can't get neither positive nor negative differences
176  if (positive.size() + negative.size() >= m_threshold ||
177  (positive.size() == 0 && negative.size() == 0)) {
178  State state;
179  for (const auto& content : m_prefixes) {
180  if (content.second != 0) {
181  state.addContent(ndn::Name(content.first).appendNumber(content.second));
182  }
183  }
184 
185  if (!state.getContent().empty()) {
186  m_segmentPublisher.publish(interest.getName(), interest.getName(),
188  }
189 
190  return;
191  }
192  }
193 
194  State state;
195  for (const auto& hash : positive) {
196  ndn::Name prefix = m_hash2prefix[hash];
197  // Don't sync up sequence number zero
198  if (m_prefixes[prefix] != 0 && !isFutureHash(prefix.toUri(), negative)) {
199  state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
200  }
201  }
202 
203  if (!state.getContent().empty()) {
204  NDN_LOG_DEBUG("Sending sync content: " << state);
205  sendSyncData(interestName, state.wireEncode());
206  return;
207  }
208 
209  auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfoFull{iblt, {}}).first->second;
210  entry.expirationEvent = m_scheduler.scheduleEvent(interest.getInterestLifetime(),
211  [this, interest] {
212  NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
213  m_pendingEntries.erase(interest.getName());
214  });
215 }
216 
217 void
218 FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block)
219 {
220  NDN_LOG_DEBUG("Checking if data will satisfy our own pending interest");
221 
222  ndn::Name nameWithIblt;
223  m_iblt.appendToName(nameWithIblt);
224 
225  // Append hash of our IBF so that data name maybe different for each node answering
226  ndn::Name dataName(ndn::Name(name).appendNumber(std::hash<ndn::Name>{}(nameWithIblt)));
227 
228  // checking if our own interest got satisfied
229  if (m_outstandingInterestName == name) {
230  NDN_LOG_DEBUG("Satisfied our own pending interest");
231  // remove outstanding interest
232  if (m_fetcher) {
233  NDN_LOG_DEBUG("Removing our pending interest from face (stop fetcher)");
234  m_fetcher->stop();
235  m_outstandingInterestName = ndn::Name("");
236  }
237 
238  NDN_LOG_DEBUG("Sending Sync Data");
239 
240  // Send data after removing pending sync interest on face
241  m_segmentPublisher.publish(name, dataName, block, m_syncReplyFreshness);
242 
243  NDN_LOG_TRACE("Renewing sync interest");
244  sendSyncInterest();
245  }
246  else {
247  NDN_LOG_DEBUG("Sending Sync Data");
248  m_segmentPublisher.publish(name, dataName, block, m_syncReplyFreshness);
249  }
250 }
251 
252 void
253 FullProducer::onSyncData(const ndn::Interest& interest, const ndn::ConstBufferPtr& bufferPtr)
254 {
255  deletePendingInterests(interest.getName());
256 
257  State state(ndn::Block(std::move(bufferPtr)));
258  std::vector<MissingDataInfo> updates;
259 
260  NDN_LOG_DEBUG("Sync Data Received: " << state);
261 
262  for (const auto& content : state.getContent()) {
263  ndn::Name prefix = content.getPrefix(-1);
264  uint64_t seq = content.get(content.size()-1).toNumber();
265 
266  if (m_prefixes.find(prefix) == m_prefixes.end() || m_prefixes[prefix] < seq) {
267  updates.push_back(MissingDataInfo{prefix, m_prefixes[prefix] + 1, seq});
268  updateSeqNo(prefix, seq);
269  // We should not call satisfyPendingSyncInterests here because we just
270  // got data and deleted pending interest by calling deletePendingFullSyncInterests
271  // But we might have interests not matching to this interest that might not have deleted
272  // from pending sync interest
273  }
274  }
275 
276  // We just got the data, so send a new sync interest
277  if (!updates.empty()) {
278  m_onUpdate(updates);
279  NDN_LOG_TRACE("Renewing sync interest");
280  sendSyncInterest();
281  }
282  else {
283  NDN_LOG_TRACE("No new update, interest nonce: " << interest.getNonce() <<
284  " , hash: " << std::hash<ndn::Name>{}(interest.getName()));
285  }
286 }
287 
288 void
289 FullProducer::satisfyPendingInterests()
290 {
291  NDN_LOG_DEBUG("Satisfying full sync interest: " << m_pendingEntries.size());
292 
293  for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
294  const PendingEntryInfoFull& entry = it->second;
295  IBLT diff = m_iblt - entry.iblt;
296  std::set<uint32_t> positive;
297  std::set<uint32_t> negative;
298 
299  if (!diff.listEntries(positive, negative)) {
300  NDN_LOG_TRACE("Decode failed for pending interest");
301  if (positive.size() + negative.size() >= m_threshold ||
302  (positive.size() == 0 && negative.size() == 0)) {
303  NDN_LOG_TRACE("pos + neg > threshold or no diff can be found, erase pending interest");
304  m_pendingEntries.erase(it++);
305  continue;
306  }
307  }
308 
309  State state;
310  for (const auto& hash : positive) {
311  ndn::Name prefix = m_hash2prefix[hash];
312 
313  if (m_prefixes[prefix] != 0) {
314  state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
315  }
316  }
317 
318  if (!state.getContent().empty()) {
319  NDN_LOG_DEBUG("Satisfying sync content: " << state);
320  sendSyncData(it->first, state.wireEncode());
321  m_pendingEntries.erase(it++);
322  }
323  else {
324  ++it;
325  }
326  }
327 }
328 
329 bool
330 FullProducer::isFutureHash(const ndn::Name& prefix, const std::set<uint32_t>& negative)
331 {
332  uint32_t nextHash = murmurHash3(N_HASHCHECK,
333  ndn::Name(prefix).appendNumber(m_prefixes[prefix] + 1).toUri());
334  for (const auto& nHash : negative) {
335  if (nHash == nextHash) {
336  return true;
337  break;
338  }
339  }
340  return false;
341 }
342 
343 void
344 FullProducer::deletePendingInterests(const ndn::Name& interestName) {
345  for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
346  if (it->first == interestName) {
347  NDN_LOG_TRACE("Delete pending interest: " << interestName);
348  m_pendingEntries.erase(it++);
349  }
350  else {
351  ++it;
352  }
353  }
354 }
355 
356 } // namespace psync
void onRegisterFailed(const ndn::Name &prefix, const std::string &msg) const
Logs a message if setting an interest filter fails.
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
void initialize(const ndn::name::Component &ibltName)
Populate the hash table using the vector representation of IBLT.
Definition: iblt.cpp:92
void addContent(const ndn::Name &prefix)
Definition: state.cpp:30
std::vector< ndn::Name > getContent() const
Definition: state.hpp:46
const ndn::Block & wireEncode() const
Definition: state.cpp:36
bool listEntries(std::set< uint32_t > &positive, std::set< uint32_t > &negative) const
List all the entries in the IBLT.
Definition: iblt.cpp:138
Full sync logic to synchronize with other nodes, where all nodes want to get all name prefixes synced.
uint32_t murmurHash3(uint32_t nHashSeed, const std::vector< unsigned char > &vDataToHash)
Definition: util.cpp:37
void publishName(const ndn::Name &prefix, ndn::optional< uint64_t > seq=ndn::nullopt)
Publish name to let others know.
Invertible Bloom Lookup Table (Invertible Bloom Filter)
Definition: iblt.hpp:80
const size_t N_HASHCHECK
void publish(const ndn::Name &interestName, const ndn::Name &dataName, const ndn::Block &block, ndn::time::milliseconds freshness, const ndn::security::SigningInfo &signingInfo=ndn::security::v2::KeyChain::getDefaultSigningInfo())
Put all the segments in memory.
FullProducer(size_t expectedNumEntries, ndn::Face &face, const ndn::Name &syncPrefix, const ndn::Name &userPrefix, const UpdateCallback &onUpdateCallBack, ndn::time::milliseconds syncInterestLifetime=SYNC_INTEREST_LIFTIME, ndn::time::milliseconds syncReplyFreshness=SYNC_REPLY_FRESHNESS)
constructor
bool replyFromStore(const ndn::Name &interestName)
Try to reply from memory, return false if we cannot find the segment.
std::map< uint32_t, ndn::Name > m_hash2prefix
ndn::random::RandomNumberEngine & m_rng
ndn::Scheduler m_scheduler
std::function< void(const std::vector< MissingDataInfo > &)> UpdateCallback
Definition: consumer.hpp:41
ndn::time::milliseconds m_syncReplyFreshness
SegmentPublisher m_segmentPublisher
void appendToName(ndn::Name &name) const
Appends self to name.
Definition: iblt.cpp:229
Base class for PartialProducer and FullProducer.
std::map< ndn::Name, uint64_t > m_prefixes