partial-producer.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2020, The University of Memphis
4  *
5  * This file is part of PSync.
6  * See AUTHORS.md for complete list of PSync authors and contributors.
7  *
8  * PSync is free software: you can redistribute it and/or modify it under the terms
9  * of the GNU Lesser General Public License as published by the Free Software Foundation,
10  * either version 3 of the License, or (at your option) any later version.
11  *
12  * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14  * PURPOSE. See the GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License along with
17  * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18  **/
19 
21 #include "PSync/detail/state.hpp"
22 
23 #include <ndn-cxx/util/logger.hpp>
24 
25 #include <cstring>
26 #include <limits>
27 
28 namespace psync {
29 
31 
32 PartialProducer::PartialProducer(size_t expectedNumEntries,
33  ndn::Face& face,
34  const ndn::Name& syncPrefix,
35  const ndn::Name& userPrefix,
36  ndn::time::milliseconds helloReplyFreshness,
37  ndn::time::milliseconds syncReplyFreshness,
38  CompressionScheme ibltCompression)
39  : ProducerBase(expectedNumEntries, face, syncPrefix,
40  userPrefix, syncReplyFreshness, ibltCompression)
41  , m_helloReplyFreshness(helloReplyFreshness)
42 {
43  m_registeredPrefix = m_face.registerPrefix(m_syncPrefix,
44  [this] (const ndn::Name& syncPrefix) {
45  m_face.setInterestFilter(ndn::Name(m_syncPrefix).append("hello"),
46  std::bind(&PartialProducer::onHelloInterest, this, _1, _2));
47  m_face.setInterestFilter(ndn::Name(m_syncPrefix).append("sync"),
48  std::bind(&PartialProducer::onSyncInterest, this, _1, _2));
49  },
50  std::bind(&PartialProducer::onRegisterFailed, this, _1, _2));
51 }
52 
53 void
54 PartialProducer::publishName(const ndn::Name& prefix, ndn::optional<uint64_t> seq)
55 {
56  if (m_prefixes.find(prefix) == m_prefixes.end()) {
57  return;
58  }
59 
60  uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1);
61 
62  NDN_LOG_INFO("Publish: " << prefix << "/" << newSeq);
63 
64  updateSeqNo(prefix, newSeq);
65 
66  satisfyPendingSyncInterests(prefix);
67 }
68 
69 void
70 PartialProducer::onHelloInterest(const ndn::Name& prefix, const ndn::Interest& interest)
71 {
72  if (m_segmentPublisher.replyFromStore(interest.getName())) {
73  return;
74  }
75 
76  // Last component or fourth last component (in case of interest with version and segment)
77  // needs to be hello
78  if (interest.getName().get(interest.getName().size()-1).toUri() != "hello" &&
79  interest.getName().get(interest.getName().size()-4).toUri() != "hello") {
80  return;
81  }
82 
83  NDN_LOG_DEBUG("Hello Interest Received, nonce: " << interest);
84 
85  detail::State state;
86  for (const auto& p : m_prefixes) {
87  state.addContent(ndn::Name(p.first).appendNumber(p.second));
88  }
89  NDN_LOG_DEBUG("sending content p: " << state);
90 
91  ndn::Name helloDataName = prefix;
92  m_iblt.appendToName(helloDataName);
93 
94  m_segmentPublisher.publish(interest.getName(), helloDataName,
95  state.wireEncode(), m_helloReplyFreshness);
96 }
97 
98 void
99 PartialProducer::onSyncInterest(const ndn::Name& prefix, const ndn::Interest& interest)
100 {
101  if (m_segmentPublisher.replyFromStore(interest.getName())) {
102  return;
103  }
104 
105  NDN_LOG_DEBUG("Sync Interest Received, nonce: " << interest.getNonce() <<
106  " hash: " << std::hash<std::string>{}(interest.getName().toUri()));
107 
108  ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefix.size());
109  ndn::Name interestName;
110 
111  if (nameWithoutSyncPrefix.size() == 4) {
112  // Get /<prefix>/BF/IBF/ from /<prefix>/BF/IBF (3 components of BF + 1 for IBF)
113  interestName = interest.getName();
114  }
115  else if (nameWithoutSyncPrefix.size() == 6) {
116  // Get <prefix>/BF/IBF/ from /<prefix>/BF/IBF/<version>/<segment-no>
117  interestName = interest.getName().getPrefix(-2);
118  }
119  else {
120  return;
121  }
122 
123  ndn::name::Component bfName, ibltName;
124  unsigned int projectedCount;
125  double falsePositiveProb;
126  try {
127  projectedCount = interestName.get(interestName.size()-4).toNumber();
128  falsePositiveProb = interestName.get(interestName.size()-3).toNumber()/1000.;
129  bfName = interestName.get(interestName.size()-2);
130 
131  ibltName = interestName.get(interestName.size()-1);
132  }
133  catch (const std::exception& e) {
134  NDN_LOG_ERROR("Cannot extract bloom filter and IBF from sync interest: " << e.what());
135  NDN_LOG_ERROR("Format: /<syncPrefix>/sync/<BF-count>/<BF-false-positive-probability>/<BF>/<IBF>");
136  return;
137  }
138 
141  try {
142  bf = detail::BloomFilter(projectedCount, falsePositiveProb, bfName);
143  iblt.initialize(ibltName);
144  }
145  catch (const std::exception& e) {
146  NDN_LOG_WARN(e.what());
147  return;
148  }
149 
150  // get the difference
151  auto diff = m_iblt - iblt;
152 
153  // non-empty positive means we have some elements that the others don't
154  std::set<uint32_t> positive;
155  std::set<uint32_t> negative;
156 
157  NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
158 
159  bool peel = diff.listEntries(positive, negative);
160 
161  NDN_LOG_TRACE("Result of listEntries on the difference: " << peel);
162 
163  if (!peel) {
164  NDN_LOG_DEBUG("Can't decode the difference, sending application Nack");
165  sendApplicationNack(interestName);
166  return;
167  }
168 
169  // generate content for Sync reply
170  detail::State state;
171  NDN_LOG_TRACE("Size of positive set " << positive.size());
172  NDN_LOG_TRACE("Size of negative set " << negative.size());
173  for (const auto& hash : positive) {
174  auto nameIt = m_biMap.left.find(hash);
175  if (nameIt != m_biMap.left.end()) {
176  if (bf.contains(nameIt->second.getPrefix(-1).toUri())) {
177  // generate data
178  state.addContent(nameIt->second);
179  NDN_LOG_DEBUG("Content: " << nameIt->second << " " << nameIt->first);
180  }
181  }
182  }
183 
184  NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << positive.size() + negative.size());
185 
186  if (positive.size() + negative.size() >= m_threshold || !state.getContent().empty()) {
187 
188  // send back data
189  ndn::Name syncDataName = interestName;
190  m_iblt.appendToName(syncDataName);
191 
192  m_segmentPublisher.publish(interest.getName(), syncDataName,
194  return;
195  }
196 
197  auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{bf, iblt, {}}).first->second;
198  entry.expirationEvent = m_scheduler.schedule(interest.getInterestLifetime(),
199  [this, interest] {
200  NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
201  m_pendingEntries.erase(interest.getName());
202  });
203 }
204 
205 void
206 PartialProducer::satisfyPendingSyncInterests(const ndn::Name& prefix) {
207  NDN_LOG_TRACE("size of pending interest: " << m_pendingEntries.size());
208 
209  for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
210  const PendingEntryInfo& entry = it->second;
211 
212  auto diff = m_iblt - entry.iblt;
213  std::set<uint32_t> positive;
214  std::set<uint32_t> negative;
215 
216  bool peel = diff.listEntries(positive, negative);
217 
218  NDN_LOG_TRACE("Result of listEntries on the difference: " << peel);
219 
220  NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
221  NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << positive.size() + negative.size());
222 
223  if (!peel) {
224  NDN_LOG_TRACE("Decoding of differences with stored IBF unsuccessful, deleting pending interest");
225  m_pendingEntries.erase(it++);
226  continue;
227  }
228 
229  detail::State state;
230  if (entry.bf.contains(prefix.toUri()) || positive.size() + negative.size() >= m_threshold) {
231  if (entry.bf.contains(prefix.toUri())) {
232  state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
233  NDN_LOG_DEBUG("sending sync content " << prefix << " " << std::to_string(m_prefixes[prefix]));
234  }
235  else {
236  NDN_LOG_DEBUG("Sending with empty content to send latest IBF to consumer");
237  }
238 
239  // generate sync data and cancel the event
240  ndn::Name syncDataName = it->first;
241  m_iblt.appendToName(syncDataName);
242 
243  m_segmentPublisher.publish(it->first, syncDataName,
245 
246  m_pendingEntries.erase(it++);
247  }
248  else {
249  ++it;
250  }
251  }
252 }
253 
254 } // namespace psync
void appendToName(ndn::Name &name) const
Appends self to name.
Definition: iblt.cpp:187
void sendApplicationNack(const ndn::Name &name)
Sends a data packet with content type nack.
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
Invertible Bloom Lookup Table (Invertible Bloom Filter)
Definition: iblt.hpp:81
const ndn::Block & wireEncode() const
Definition: state.cpp:41
Definition: common.hpp:33
NDN_LOG_INIT(psync.Consumer)
HashNameBiMap m_biMap
Partial sync logic to publish data names.
void publish(const ndn::Name &interestName, const ndn::Name &dataName, const ndn::Block &block, ndn::time::milliseconds freshness, const ndn::security::SigningInfo &signingInfo=ndn::security::SigningInfo())
Put all the segments in memory.
void addContent(const ndn::Name &prefix)
Definition: state.cpp:34
bool listEntries(std::set< uint32_t > &positive, std::set< uint32_t > &negative) const
List all the entries in the IBLT.
Definition: iblt.cpp:137
CompressionScheme m_ibltCompression
bool contains(const std::string &key) const
void publishName(const ndn::Name &prefix, ndn::optional< uint64_t > seq=ndn::nullopt)
Publish name to let subscribed consumers know.
CompressionScheme
Definition: common.hpp:35
bool replyFromStore(const ndn::Name &interestName)
Try to reply from memory, return false if we cannot find the segment.
void initialize(const ndn::name::Component &ibltName)
Populate the hash table using the vector representation of IBLT.
Definition: iblt.cpp:91
const std::vector< ndn::Name > & getContent() const
Definition: state.hpp:48
void onRegisterFailed(const ndn::Name &prefix, const std::string &msg) const
Logs a message if setting an interest filter fails.
PartialProducer(size_t expectedNumEntries, ndn::Face &face, const ndn::Name &syncPrefix, const ndn::Name &userPrefix, ndn::time::milliseconds helloReplyFreshness=HELLO_REPLY_FRESHNESS, ndn::time::milliseconds syncReplyFreshness=SYNC_REPLY_FRESHNESS, CompressionScheme ibltCompression=CompressionScheme::NONE)
constructor
ndn::Scheduler m_scheduler
ndn::time::milliseconds m_syncReplyFreshness
SegmentPublisher m_segmentPublisher
Base class for PartialProducer and FullProducer.
std::map< ndn::Name, uint64_t > m_prefixes