partial-producer.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2020, The University of Memphis
4  *
5  * This file is part of PSync.
6  * See AUTHORS.md for complete list of PSync authors and contributors.
7  *
8  * PSync is free software: you can redistribute it and/or modify it under the terms
9  * of the GNU Lesser General Public License as published by the Free Software Foundation,
10  * either version 3 of the License, or (at your option) any later version.
11  *
12  * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14  * PURPOSE. See the GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License along with
17  * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18  **/
19 
21 #include "PSync/detail/state.hpp"
22 
23 #include <ndn-cxx/util/logger.hpp>
24 
25 #include <cstring>
26 #include <limits>
27 
28 namespace psync {
29 
31 
32 PartialProducer::PartialProducer(size_t expectedNumEntries,
33  ndn::Face& face,
34  const ndn::Name& syncPrefix,
35  const ndn::Name& userPrefix,
36  ndn::time::milliseconds helloReplyFreshness,
37  ndn::time::milliseconds syncReplyFreshness,
38  CompressionScheme ibltCompression)
39  : ProducerBase(expectedNumEntries, face, syncPrefix,
40  userPrefix, syncReplyFreshness, ibltCompression)
41  , m_helloReplyFreshness(helloReplyFreshness)
42 {
43  m_registeredPrefix = m_face.registerPrefix(m_syncPrefix,
44  [this] (const ndn::Name& syncPrefix) {
45  m_face.setInterestFilter(ndn::Name(m_syncPrefix).append("hello"),
46  std::bind(&PartialProducer::onHelloInterest, this, _1, _2));
47  m_face.setInterestFilter(ndn::Name(m_syncPrefix).append("sync"),
48  std::bind(&PartialProducer::onSyncInterest, this, _1, _2));
49  },
50  std::bind(&PartialProducer::onRegisterFailed, this, _1, _2));
51 }
52 
53 void
54 PartialProducer::publishName(const ndn::Name& prefix, ndn::optional<uint64_t> seq)
55 {
56  if (m_prefixes.find(prefix) == m_prefixes.end()) {
57  return;
58  }
59 
60  uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1);
61 
62  NDN_LOG_INFO("Publish: " << prefix << "/" << newSeq);
63 
64  updateSeqNo(prefix, newSeq);
65 
66  satisfyPendingSyncInterests(prefix);
67 }
68 
69 void
70 PartialProducer::onHelloInterest(const ndn::Name& prefix, const ndn::Interest& interest)
71 {
72  if (m_segmentPublisher.replyFromStore(interest.getName())) {
73  return;
74  }
75 
76  // Last component or fourth last component (in case of interest with version and segment)
77  // needs to be hello
78  if (interest.getName().get(interest.getName().size()-1).toUri() != "hello" &&
79  interest.getName().get(interest.getName().size()-4).toUri() != "hello") {
80  return;
81  }
82 
83  NDN_LOG_DEBUG("Hello Interest Received, nonce: " << interest);
84 
85  State state;
86 
87  for (const auto& prefix : m_prefixes) {
88  state.addContent(ndn::Name(prefix.first).appendNumber(prefix.second));
89  }
90  NDN_LOG_DEBUG("sending content p: " << state);
91 
92  ndn::Name helloDataName = prefix;
93  m_iblt.appendToName(helloDataName);
94 
95  m_segmentPublisher.publish(interest.getName(), helloDataName,
96  state.wireEncode(), m_helloReplyFreshness);
97 }
98 
99 void
100 PartialProducer::onSyncInterest(const ndn::Name& prefix, const ndn::Interest& interest)
101 {
102  if (m_segmentPublisher.replyFromStore(interest.getName())) {
103  return;
104  }
105 
106  NDN_LOG_DEBUG("Sync Interest Received, nonce: " << interest.getNonce() <<
107  " hash: " << std::hash<std::string>{}(interest.getName().toUri()));
108 
109  ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefix.size());
110  ndn::Name interestName;
111 
112  if (nameWithoutSyncPrefix.size() == 4) {
113  // Get /<prefix>/BF/IBF/ from /<prefix>/BF/IBF (3 components of BF + 1 for IBF)
114  interestName = interest.getName();
115  }
116  else if (nameWithoutSyncPrefix.size() == 6) {
117  // Get <prefix>/BF/IBF/ from /<prefix>/BF/IBF/<version>/<segment-no>
118  interestName = interest.getName().getPrefix(-2);
119  }
120  else {
121  return;
122  }
123 
124  ndn::name::Component bfName, ibltName;
125  unsigned int projectedCount;
126  double falsePositiveProb;
127  try {
128  projectedCount = interestName.get(interestName.size()-4).toNumber();
129  falsePositiveProb = interestName.get(interestName.size()-3).toNumber()/1000.;
130  bfName = interestName.get(interestName.size()-2);
131 
132  ibltName = interestName.get(interestName.size()-1);
133  }
134  catch (const std::exception& e) {
135  NDN_LOG_ERROR("Cannot extract bloom filter and IBF from sync interest: " << e.what());
136  NDN_LOG_ERROR("Format: /<syncPrefix>/sync/<BF-count>/<BF-false-positive-probability>/<BF>/<IBF>");
137  return;
138  }
139 
140  BloomFilter bf;
142 
143  try {
144  bf = BloomFilter(projectedCount, falsePositiveProb, bfName);
145  iblt.initialize(ibltName);
146  }
147  catch (const std::exception& e) {
148  NDN_LOG_WARN(e.what());
149  return;
150  }
151 
152  // get the difference
153  IBLT diff = m_iblt - iblt;
154 
155  // non-empty positive means we have some elements that the others don't
156  std::set<uint32_t> positive;
157  std::set<uint32_t> negative;
158 
159  NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
160 
161  bool peel = diff.listEntries(positive, negative);
162 
163  NDN_LOG_TRACE("Result of listEntries on the difference: " << peel);
164 
165  if (!peel) {
166  NDN_LOG_DEBUG("Can't decode the difference, sending application Nack");
167  sendApplicationNack(interestName);
168  return;
169  }
170 
171  // generate content for Sync reply
172  State state;
173  NDN_LOG_TRACE("Size of positive set " << positive.size());
174  NDN_LOG_TRACE("Size of negative set " << negative.size());
175  for (const auto& hash : positive) {
176  auto nameIt = m_biMap.left.find(hash);
177  if (nameIt != m_biMap.left.end()) {
178  if (bf.contains(nameIt->second.getPrefix(-1).toUri())) {
179  // generate data
180  state.addContent(nameIt->second);
181  NDN_LOG_DEBUG("Content: " << nameIt->second << " " << nameIt->first);
182  }
183  }
184  }
185 
186  NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << positive.size() + negative.size());
187 
188  if (positive.size() + negative.size() >= m_threshold || !state.getContent().empty()) {
189 
190  // send back data
191  ndn::Name syncDataName = interestName;
192  m_iblt.appendToName(syncDataName);
193 
194  m_segmentPublisher.publish(interest.getName(), syncDataName,
196  return;
197  }
198 
199  auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{bf, iblt, {}}).first->second;
200  entry.expirationEvent = m_scheduler.schedule(interest.getInterestLifetime(),
201  [this, interest] {
202  NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
203  m_pendingEntries.erase(interest.getName());
204  });
205 }
206 
207 void
208 PartialProducer::satisfyPendingSyncInterests(const ndn::Name& prefix) {
209  NDN_LOG_TRACE("size of pending interest: " << m_pendingEntries.size());
210 
211  for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
212  const PendingEntryInfo& entry = it->second;
213 
214  IBLT diff = m_iblt - entry.iblt;
215  std::set<uint32_t> positive;
216  std::set<uint32_t> negative;
217 
218  bool peel = diff.listEntries(positive, negative);
219 
220  NDN_LOG_TRACE("Result of listEntries on the difference: " << peel);
221 
222  NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
223  NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << positive.size() + negative.size());
224 
225  if (!peel) {
226  NDN_LOG_TRACE("Decoding of differences with stored IBF unsuccessful, deleting pending interest");
227  m_pendingEntries.erase(it++);
228  continue;
229  }
230 
231  State state;
232  if (entry.bf.contains(prefix.toUri()) || positive.size() + negative.size() >= m_threshold) {
233  if (entry.bf.contains(prefix.toUri())) {
234  state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
235  NDN_LOG_DEBUG("sending sync content " << prefix << " " << std::to_string(m_prefixes[prefix]));
236  }
237  else {
238  NDN_LOG_DEBUG("Sending with empty content to send latest IBF to consumer");
239  }
240 
241  // generate sync data and cancel the event
242  ndn::Name syncDataName = it->first;
243  m_iblt.appendToName(syncDataName);
244 
245  m_segmentPublisher.publish(it->first, syncDataName,
247 
248  m_pendingEntries.erase(it++);
249  }
250  else {
251  ++it;
252  }
253  }
254 }
255 
256 } // namespace psync
void sendApplicationNack(const ndn::Name &name)
Sends a data packet with content type nack.
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
void initialize(const ndn::name::Component &ibltName)
Populate the hash table using the vector representation of IBLT.
Definition: iblt.cpp:85
void addContent(const ndn::Name &prefix)
Definition: state.cpp:32
const ndn::Block & wireEncode() const
Definition: state.cpp:38
NDN_LOG_INIT(psync.Consumer)
HashNameBiMap m_biMap
Partial sync logic to publish data names.
Invertible Bloom Lookup Table (Invertible Bloom Filter)
Definition: iblt.hpp:82
void publish(const ndn::Name &interestName, const ndn::Name &dataName, const ndn::Block &block, ndn::time::milliseconds freshness, const ndn::security::SigningInfo &signingInfo=ndn::security::v2::KeyChain::getDefaultSigningInfo())
Put all the segments in memory.
CompressionScheme m_ibltCompression
bool contains(const std::string &key) const
void publishName(const ndn::Name &prefix, ndn::optional< uint64_t > seq=ndn::nullopt)
Publish name to let subscribed consumers know.
CompressionScheme
Definition: util.hpp:51
const std::vector< ndn::Name > & getContent() const
Definition: state.hpp:46
bool replyFromStore(const ndn::Name &interestName)
Try to reply from memory, return false if we cannot find the segment.
bool listEntries(std::set< uint32_t > &positive, std::set< uint32_t > &negative) const
List all the entries in the IBLT.
Definition: iblt.cpp:131
void appendToName(ndn::Name &name) const
Appends self to name.
Definition: iblt.cpp:222
void onRegisterFailed(const ndn::Name &prefix, const std::string &msg) const
Logs a message if setting an interest filter fails.
PartialProducer(size_t expectedNumEntries, ndn::Face &face, const ndn::Name &syncPrefix, const ndn::Name &userPrefix, ndn::time::milliseconds helloReplyFreshness=HELLO_REPLY_FRESHNESS, ndn::time::milliseconds syncReplyFreshness=SYNC_REPLY_FRESHNESS, CompressionScheme ibltCompression=CompressionScheme::NONE)
constructor
ndn::Scheduler m_scheduler
ndn::time::milliseconds m_syncReplyFreshness
SegmentPublisher m_segmentPublisher
Base class for PartialProducer and FullProducer.
std::map< ndn::Name, uint64_t > m_prefixes